Skip to content

Commit aca7169

Browse files
authored
feat(spark): add unix date and timestamp functions (apache#19892)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#19891 - Part of apache#15914 ## Rationale for this change Add spark unix date and timestamp functions. ## What changes are included in this PR? New spark functions: - https://spark.apache.org/docs/latest/api/sql/index.html#unix_date - https://spark.apache.org/docs/latest/api/sql/index.html#unix_seconds - https://spark.apache.org/docs/latest/api/sql/index.html#unix_millis - https://spark.apache.org/docs/latest/api/sql/index.html#unix_micros ## Are these changes tested? yes in slt ## Are there any user-facing changes? yes
1 parent af77197 commit aca7169

3 files changed

Lines changed: 349 additions & 0 deletions

File tree

datafusion/spark/src/function/datetime/mod.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod next_day;
3030
pub mod time_trunc;
3131
pub mod to_utc_timestamp;
3232
pub mod trunc;
33+
pub mod unix;
3334

3435
use datafusion_expr::ScalarUDF;
3536
use datafusion_functions::make_udf_function;
@@ -55,6 +56,22 @@ make_udf_function!(next_day::SparkNextDay, next_day);
5556
make_udf_function!(time_trunc::SparkTimeTrunc, time_trunc);
5657
make_udf_function!(to_utc_timestamp::SparkToUtcTimestamp, to_utc_timestamp);
5758
make_udf_function!(trunc::SparkTrunc, trunc);
59+
make_udf_function!(unix::SparkUnixDate, unix_date);
60+
make_udf_function!(
61+
unix::SparkUnixTimestamp,
62+
unix_micros,
63+
unix::SparkUnixTimestamp::microseconds
64+
);
65+
make_udf_function!(
66+
unix::SparkUnixTimestamp,
67+
unix_millis,
68+
unix::SparkUnixTimestamp::milliseconds
69+
);
70+
make_udf_function!(
71+
unix::SparkUnixTimestamp,
72+
unix_seconds,
73+
unix::SparkUnixTimestamp::seconds
74+
);
5875

5976
pub mod expr_fn {
6077
use datafusion_functions::export_functions;
@@ -142,6 +159,26 @@ pub mod expr_fn {
142159
"Interpret a given timestamp `ts` in timezone `tz` and then convert it to UTC timezone.",
143160
ts tz
144161
));
162+
export_functions!((
163+
unix_date,
164+
"Returns the number of days since epoch (1970-01-01) for the given date `dt`.",
165+
dt
166+
));
167+
export_functions!((
168+
unix_micros,
169+
"Returns the number of microseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp `ts`.",
170+
ts
171+
));
172+
export_functions!((
173+
unix_millis,
174+
"Returns the number of milliseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp `ts`.",
175+
ts
176+
));
177+
export_functions!((
178+
unix_seconds,
179+
"Returns the number of seconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp `ts`.",
180+
ts
181+
));
145182
}
146183

147184
pub fn functions() -> Vec<Arc<ScalarUDF>> {
@@ -163,5 +200,9 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
163200
time_trunc(),
164201
to_utc_timestamp(),
165202
trunc(),
203+
unix_date(),
204+
unix_micros(),
205+
unix_millis(),
206+
unix_seconds(),
166207
]
167208
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::sync::Arc;
20+
21+
use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
22+
use datafusion_common::types::logical_date;
23+
use datafusion_common::utils::take_function_args;
24+
use datafusion_common::{Result, internal_err};
25+
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
26+
use datafusion_expr::{
27+
Coercion, ColumnarValue, Expr, ExprSchemable, ReturnFieldArgs, ScalarFunctionArgs,
28+
ScalarUDFImpl, Signature, TypeSignatureClass, Volatility,
29+
};
30+
31+
/// Returns the number of days since epoch (1970-01-01) for the given date.
32+
/// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_date>
33+
#[derive(Debug, PartialEq, Eq, Hash)]
34+
pub struct SparkUnixDate {
35+
signature: Signature,
36+
}
37+
38+
impl Default for SparkUnixDate {
39+
fn default() -> Self {
40+
Self::new()
41+
}
42+
}
43+
44+
impl SparkUnixDate {
45+
pub fn new() -> Self {
46+
Self {
47+
signature: Signature::coercible(
48+
vec![Coercion::new_exact(TypeSignatureClass::Native(
49+
logical_date(),
50+
))],
51+
Volatility::Immutable,
52+
),
53+
}
54+
}
55+
}
56+
57+
impl ScalarUDFImpl for SparkUnixDate {
58+
fn as_any(&self) -> &dyn Any {
59+
self
60+
}
61+
62+
fn name(&self) -> &str {
63+
"unix_date"
64+
}
65+
66+
fn signature(&self) -> &Signature {
67+
&self.signature
68+
}
69+
70+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
71+
internal_err!("return_field_from_args should be used instead")
72+
}
73+
74+
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
75+
let nullable = args.arg_fields[0].is_nullable();
76+
Ok(Arc::new(Field::new(self.name(), DataType::Int32, nullable)))
77+
}
78+
79+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
80+
internal_err!("invoke_with_args should not be called on SparkUnixDate")
81+
}
82+
83+
fn simplify(
84+
&self,
85+
args: Vec<Expr>,
86+
info: &SimplifyContext,
87+
) -> Result<ExprSimplifyResult> {
88+
let [date] = take_function_args(self.name(), args)?;
89+
Ok(ExprSimplifyResult::Simplified(
90+
date.cast_to(&DataType::Date32, info.schema())?
91+
.cast_to(&DataType::Int32, info.schema())?,
92+
))
93+
}
94+
}
95+
96+
#[derive(Debug, PartialEq, Eq, Hash)]
97+
pub struct SparkUnixTimestamp {
98+
time_unit: TimeUnit,
99+
signature: Signature,
100+
name: &'static str,
101+
}
102+
103+
impl SparkUnixTimestamp {
104+
pub fn new(name: &'static str, time_unit: TimeUnit) -> Self {
105+
Self {
106+
signature: Signature::coercible(
107+
vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
108+
Volatility::Immutable,
109+
),
110+
time_unit,
111+
name,
112+
}
113+
}
114+
115+
/// Returns the number of microseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp.
116+
/// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_micros>
117+
pub fn microseconds() -> Self {
118+
Self::new("unix_micros", TimeUnit::Microsecond)
119+
}
120+
121+
/// Returns the number of milliseconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp.
122+
/// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_millis>
123+
pub fn milliseconds() -> Self {
124+
Self::new("unix_millis", TimeUnit::Millisecond)
125+
}
126+
127+
/// Returns the number of seconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp.
128+
/// <https://spark.apache.org/docs/latest/api/sql/index.html#unix_seconds>
129+
pub fn seconds() -> Self {
130+
Self::new("unix_seconds", TimeUnit::Second)
131+
}
132+
}
133+
134+
impl ScalarUDFImpl for SparkUnixTimestamp {
135+
fn as_any(&self) -> &dyn Any {
136+
self
137+
}
138+
139+
fn name(&self) -> &str {
140+
self.name
141+
}
142+
143+
fn signature(&self) -> &Signature {
144+
&self.signature
145+
}
146+
147+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
148+
internal_err!("return_field_from_args should be used instead")
149+
}
150+
151+
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
152+
let nullable = args.arg_fields[0].is_nullable();
153+
Ok(Arc::new(Field::new(self.name(), DataType::Int64, nullable)))
154+
}
155+
156+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
157+
internal_err!("invoke_with_args should not be called on `{}`", self.name())
158+
}
159+
160+
fn simplify(
161+
&self,
162+
args: Vec<Expr>,
163+
info: &SimplifyContext,
164+
) -> Result<ExprSimplifyResult> {
165+
let [ts] = take_function_args(self.name(), args)?;
166+
Ok(ExprSimplifyResult::Simplified(
167+
ts.cast_to(
168+
&DataType::Timestamp(self.time_unit, Some("UTC".into())),
169+
info.schema(),
170+
)?
171+
.cast_to(&DataType::Int64, info.schema())?,
172+
))
173+
}
174+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Unix Date tests
19+
20+
query I
21+
SELECT unix_date('1970-01-02'::date);
22+
----
23+
1
24+
25+
query I
26+
SELECT unix_date('1900-01-02'::date);
27+
----
28+
-25566
29+
30+
31+
query I
32+
SELECT unix_date(arrow_cast('1970-01-02', 'Date64'));
33+
----
34+
1
35+
36+
query I
37+
SELECT unix_date(NULL::date);
38+
----
39+
NULL
40+
41+
query error Function 'unix_date' requires TypeSignatureClass::Native\(LogicalType\(Native\(Date\), Date\)\), but received String \(DataType: Utf8View\)
42+
SELECT unix_date('1970-01-02'::string);
43+
44+
# Unix Micro Tests
45+
46+
query I
47+
SELECT unix_micros('1970-01-01 00:00:01Z'::timestamp);
48+
----
49+
1000000
50+
51+
query I
52+
SELECT unix_micros('1900-01-01 00:00:01Z'::timestamp);
53+
----
54+
-2208988799000000
55+
56+
query I
57+
SELECT unix_micros(arrow_cast('1970-01-01 00:00:01+02:00', 'Timestamp(Microsecond, None)'));
58+
----
59+
-7199000000
60+
61+
query I
62+
SELECT unix_micros(arrow_cast('1970-01-01 00:00:01Z', 'Timestamp(Second, None)'));
63+
----
64+
1000000
65+
66+
query I
67+
SELECT unix_micros(NULL::timestamp);
68+
----
69+
NULL
70+
71+
query error Function 'unix_micros' requires TypeSignatureClass::Timestamp, but received String \(DataType: Utf8View\)
72+
SELECT unix_micros('1970-01-01 00:00:01Z'::string);
73+
74+
75+
# Unix Millis Tests
76+
77+
query I
78+
SELECT unix_millis('1970-01-01 00:00:01Z'::timestamp);
79+
----
80+
1000
81+
82+
query I
83+
SELECT unix_millis('1900-01-01 00:00:01Z'::timestamp);
84+
----
85+
-2208988799000
86+
87+
query I
88+
SELECT unix_millis(arrow_cast('1970-01-01 00:00:01+02:00', 'Timestamp(Microsecond, None)'));
89+
----
90+
-7199000
91+
92+
query I
93+
SELECT unix_millis(arrow_cast('1970-01-01 00:00:01Z', 'Timestamp(Second, None)'));
94+
----
95+
1000
96+
97+
query I
98+
SELECT unix_millis(NULL::timestamp);
99+
----
100+
NULL
101+
102+
query error Function 'unix_millis' requires TypeSignatureClass::Timestamp, but received String \(DataType: Utf8View\)
103+
SELECT unix_millis('1970-01-01 00:00:01Z'::string);
104+
105+
106+
# Unix Seconds Tests
107+
108+
query I
109+
SELECT unix_seconds('1970-01-01 00:00:01Z'::timestamp);
110+
----
111+
1
112+
113+
query I
114+
SELECT unix_seconds('1900-01-01 00:00:01Z'::timestamp);
115+
----
116+
-2208988799
117+
118+
query I
119+
SELECT unix_seconds(arrow_cast('1970-01-01 00:00:01+02:00', 'Timestamp(Microsecond, None)'));
120+
----
121+
-7199
122+
123+
query I
124+
SELECT unix_seconds(arrow_cast('1970-01-01 00:00:01Z', 'Timestamp(Second, None)'));
125+
----
126+
1
127+
128+
query I
129+
SELECT unix_seconds(NULL::timestamp);
130+
----
131+
NULL
132+
133+
query error Function 'unix_seconds' requires TypeSignatureClass::Timestamp, but received String \(DataType: Utf8View\)
134+
SELECT unix_seconds('1970-01-01 00:00:01Z'::string);

0 commit comments

Comments
 (0)