Skip to content

Commit 3bd9e50

Browse files
ethowitzvassili-zarouba
authored andcommitted
dataflow-expression: Add support for the EXTRACT built-in function
- This commit adds support for the PostgreSQL built-in `EXTRACT` function. This is a combination of Ethan's and Vassili's commits, squashed for easier review. Fixes: REA-4370 Change-Id: I9676fe63ab0416e95f9b698e66b02055adccafee Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7676 Tested-by: Buildkite CI Reviewed-by: Jason Brown <[email protected]>
1 parent 256c09e commit 3bd9e50

File tree

23 files changed

+894
-96
lines changed

23 files changed

+894
-96
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dataflow-expression/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,6 @@ tokio = { workspace = true, features = ["full"] }
3131
tokio-postgres = { workspace = true, features = ["with-chrono-0_4", "with-eui48-1", "with-uuid-0_8", "with-serde_json-1", "with-bit-vec-0_6"] }
3232
postgres = { workspace = true, features = ["with-chrono-0_4", "with-eui48-1", "with-uuid-0_8", "with-serde_json-1", "with-bit-vec-0_6"] }
3333
mysql_async = { workspace = true }
34+
anyhow = { workspace = true }
35+
regex = { workspace = true }
36+
bytes = { workspace = true }

dataflow-expression/src/eval/builtins.rs

Lines changed: 237 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ use std::ops::{Add, Div, Mul, Sub};
55
use std::str::FromStr;
66

77
use chrono::{
8-
Datelike, LocalResult, Month, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Timelike, Weekday,
8+
Datelike, Duration, LocalResult, Month, NaiveDate, NaiveDateTime, NaiveTime, Offset, TimeZone,
9+
Timelike, Weekday,
910
};
1011
use chrono_tz::Tz;
1112
use itertools::Either;
1213
use mysql_time::MySqlTime;
13-
use readyset_data::{DfType, DfValue};
14-
use readyset_errors::{invalid_query_err, unsupported, ReadySetError, ReadySetResult};
14+
use nom_sql::TimestampField;
15+
use readyset_data::{DfType, DfValue, TimestampTz};
16+
use readyset_errors::{internal, invalid_query_err, unsupported, ReadySetError, ReadySetResult};
1517
use readyset_util::math::integer_rnd;
1618
use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
1719
use rust_decimal::Decimal;
@@ -21,8 +23,15 @@ use vec1::Vec1;
2123

2224
use crate::{BuiltinFunction, Expr};
2325

26+
const MICROS_IN_SECOND: u32 = 1_000_000;
27+
const MILLIS_IN_SECOND: u32 = 1_000;
2428
const NANOS_IN_MICRO: u32 = 1_000;
29+
const NANOS_IN_SECOND: u32 = 1_000_000_000;
2530
const NANOS_IN_MILLI: u32 = 1_000_000;
31+
const SECONDS_IN_HOUR: u64 = 60 * 60;
32+
const SECONDS_IN_MINUTE: u64 = 60;
33+
const SECONDS_IN_DAY: u64 = SECONDS_IN_HOUR * 24;
34+
const MINUTES_IN_HOUR: u64 = 60;
2635

2736
macro_rules! try_cast_or_none {
2837
($df_value:expr, $to_ty:expr, $from_ty:expr) => {{
@@ -272,6 +281,220 @@ fn date_trunc(precision: DateTruncPrecision, dt: NaiveDateTime) -> ReadySetResul
272281
}
273282
}
274283

284+
// Source:
285+
// https://github.com/postgres/postgres/blob/c6cf6d353c2865d82356ac86358622a101fde8ca/src/interfaces/ecpg/pgtypeslib/dt_common.c#L581-L582
286+
fn ymd_to_julian_date(y: i32, m: i32, d: i32) -> i32 {
287+
let month: i32;
288+
let year: i32;
289+
290+
if m > 2 {
291+
month = m + 1;
292+
year = y + 4800;
293+
} else {
294+
month = m + 13;
295+
year = y + 4799;
296+
}
297+
298+
let century = year / 100;
299+
let mut julian_date = year * 365 - 32167;
300+
julian_date += year / 4 - century + century / 4;
301+
julian_date += 7834 * month / 256 + d;
302+
303+
julian_date
304+
}
305+
306+
#[inline(always)]
307+
fn years_term(year: i32, term: i32) -> i32 {
308+
if year > 0 {
309+
(year + (term - 1)) / term
310+
} else {
311+
-(((term - 1) - (year - 1)) / term)
312+
}
313+
}
314+
315+
#[inline(always)]
316+
fn years_decade(year: i32) -> i32 {
317+
if year >= 0 {
318+
year / 10
319+
} else {
320+
-((8 - (year - 1)) / 10)
321+
}
322+
}
323+
324+
#[inline(always)]
325+
fn seconds_term(secs: u32, nanos: u32, units_per_sec: u32) -> Decimal {
326+
Decimal::from(secs * units_per_sec)
327+
+ (Decimal::from(nanos) / Decimal::from(NANOS_IN_SECOND / units_per_sec))
328+
}
329+
330+
#[inline(always)]
331+
fn hms_to_seconds(h: u32, m: u32, s: u32) -> u32 {
332+
((h * MINUTES_IN_HOUR as u32) + m) * SECONDS_IN_MINUTE as u32 + s
333+
}
334+
335+
#[inline(always)]
336+
fn hms_with_nanos_to_days(h: u32, m: u32, s: u32, nanos: u32) -> Decimal {
337+
seconds_term(hms_to_seconds(h, m, s), nanos, 1) / Decimal::from(SECONDS_IN_DAY).round()
338+
}
339+
340+
#[inline(always)]
341+
fn tz_to_seconds(dt: &NaiveDateTime) -> i32 {
342+
dt.and_utc().timezone().fix().local_minus_utc()
343+
}
344+
345+
/* The error message we compose here, is compatible with Postgres 15, but not with 13/14
346+
*/
347+
fn invalid_extract_call_error_message(cal_type: &str, field: TimestampField) -> String {
348+
let mut msg = format!(
349+
"ERROR: unit \"{}\" not supported for type {}",
350+
format!("{field}").to_lowercase(),
351+
cal_type
352+
);
353+
if cal_type.eq_ignore_ascii_case("time") || cal_type.eq_ignore_ascii_case("timestamp") {
354+
msg.push_str(" without time zone");
355+
}
356+
msg
357+
}
358+
359+
fn invalid_extract_call_error(calendar_type: &str, field: TimestampField) -> ReadySetError {
360+
invalid_query_err!(
361+
"{}",
362+
invalid_extract_call_error_message(calendar_type, field)
363+
)
364+
}
365+
366+
fn extract_from_time(field: TimestampField, tm: &MySqlTime) -> ReadySetResult<DfValue> {
367+
macro_rules! seconds_term {
368+
($tm:expr, $units_in_second:expr) => {
369+
seconds_term(
370+
$tm.seconds() as u32,
371+
($tm.microseconds() * NANOS_IN_MICRO),
372+
$units_in_second,
373+
)
374+
};
375+
}
376+
377+
let result: DfValue = match field {
378+
TimestampField::Hour => tm.hour().into(),
379+
TimestampField::Minute => tm.minutes().into(),
380+
TimestampField::Second => seconds_term!(tm, 1).into(),
381+
TimestampField::Milliseconds => seconds_term!(tm, MILLIS_IN_SECOND).into(),
382+
TimestampField::Microseconds => seconds_term!(tm, MICROS_IN_SECOND).round().into(),
383+
TimestampField::Epoch => Duration::from(*tm).num_seconds().into(),
384+
_ => return Err(invalid_extract_call_error("time", field)),
385+
};
386+
387+
Ok(result)
388+
}
389+
390+
fn extract_from_timestamptz(field: TimestampField, tz: &TimestampTz) -> ReadySetResult<DfValue> {
391+
macro_rules! has_time_else_error {
392+
($tz:expr, $field:expr) => {
393+
if $tz.has_date_only() {
394+
return Err(invalid_extract_call_error("date", $field));
395+
}
396+
};
397+
}
398+
399+
macro_rules! has_timezone_else_error {
400+
($tz:expr, $field:expr) => {
401+
if !$tz.has_timezone() {
402+
return Err(invalid_extract_call_error(
403+
if $tz.has_date_only() {
404+
"date"
405+
} else {
406+
"timestamp"
407+
},
408+
$field,
409+
));
410+
}
411+
};
412+
}
413+
414+
macro_rules! seconds_term {
415+
($dt_utc:expr, $units_in_second:expr) => {
416+
seconds_term($dt_utc.second(), $dt_utc.nanosecond(), $units_in_second)
417+
};
418+
}
419+
420+
let dt_utc = tz.to_chrono().naive_utc();
421+
let result: DfValue = match field {
422+
TimestampField::Millennium => years_term(dt_utc.year(), 1000).into(),
423+
TimestampField::Century => years_term(dt_utc.year(), 100).into(),
424+
TimestampField::Decade => years_decade(dt_utc.year()).into(),
425+
TimestampField::Year => {
426+
let year = dt_utc.year();
427+
if year < 0 { year - 1 } else { year }.into()
428+
}
429+
TimestampField::Isoyear => {
430+
let year = dt_utc.iso_week().year();
431+
if year <= 0 { year - 1 } else { year }.into()
432+
}
433+
TimestampField::Quarter => (dt_utc.month0() / 3 + 1).into(),
434+
TimestampField::Month => dt_utc.month().into(),
435+
TimestampField::Week => dt_utc.iso_week().week().into(),
436+
TimestampField::Day => dt_utc.day().into(),
437+
TimestampField::Dow => dt_utc.weekday().num_days_from_sunday().into(),
438+
TimestampField::Isodow => dt_utc.weekday().number_from_monday().into(),
439+
TimestampField::Doy => dt_utc.ordinal().into(),
440+
TimestampField::Hour => {
441+
has_time_else_error!(tz, field);
442+
dt_utc.hour().into()
443+
}
444+
TimestampField::Minute => {
445+
has_time_else_error!(tz, field);
446+
dt_utc.minute().into()
447+
}
448+
TimestampField::Second => {
449+
has_time_else_error!(tz, field);
450+
seconds_term!(dt_utc, 1).into()
451+
}
452+
TimestampField::Milliseconds => {
453+
has_time_else_error!(tz, field);
454+
seconds_term!(dt_utc, MILLIS_IN_SECOND).into()
455+
}
456+
TimestampField::Microseconds => {
457+
has_time_else_error!(tz, field);
458+
seconds_term!(dt_utc, MICROS_IN_SECOND).round().into()
459+
}
460+
TimestampField::Epoch => (Decimal::from(dt_utc.and_utc().timestamp_micros())
461+
/ Decimal::from(MICROS_IN_SECOND))
462+
.round_dp(6)
463+
.into(),
464+
TimestampField::Julian => {
465+
let julian_date: Decimal =
466+
ymd_to_julian_date(dt_utc.year(), dt_utc.month() as i32, dt_utc.day() as i32)
467+
.into();
468+
if tz.has_date_only() {
469+
julian_date
470+
} else {
471+
julian_date
472+
+ hms_with_nanos_to_days(
473+
dt_utc.hour(),
474+
dt_utc.minute(),
475+
dt_utc.second(),
476+
dt_utc.nanosecond(),
477+
)
478+
}
479+
.into()
480+
}
481+
TimestampField::Timezone => {
482+
has_timezone_else_error!(tz, field);
483+
tz_to_seconds(&dt_utc).into()
484+
}
485+
TimestampField::TimezoneHour => {
486+
has_timezone_else_error!(tz, field);
487+
(tz_to_seconds(&dt_utc) / SECONDS_IN_HOUR as i32).into()
488+
}
489+
TimestampField::TimezoneMinute => {
490+
has_timezone_else_error!(tz, field);
491+
((tz_to_seconds(&dt_utc) % SECONDS_IN_HOUR as i32) / SECONDS_IN_MINUTE as i32).into()
492+
}
493+
};
494+
495+
Ok(result)
496+
}
497+
275498
/// Format the given time value according to the given `format_string`, using the [MySQL date
276499
/// formatting rules][mysql-docs]. Since these rules don't match up well with anything available in
277500
/// the Rust crate ecosystem, this is done manually.
@@ -1033,6 +1256,17 @@ impl BuiltinFunction {
10331256
date_trunc(precision, datetime.naive_utc()).unwrap(),
10341257
))
10351258
}
1259+
BuiltinFunction::Extract(field, expr) => {
1260+
let ts = non_null!(expr.eval(record)?);
1261+
if let DfValue::TimestampTz(tz) = ts {
1262+
extract_from_timestamptz(*field, &tz)
1263+
} else if let DfValue::Time(tm) = ts {
1264+
extract_from_time(*field, &tm)
1265+
} else {
1266+
internal!("EXTRACT function input expected to be DfValue::TimestampTz or DfValue::Time. Found {}", ts);
1267+
}
1268+
.and_then(|value| value.coerce_to(ty, &DfType::Unknown))
1269+
}
10361270
}
10371271
}
10381272
}

dataflow-expression/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::fmt::{self, Display, Formatter};
1111

1212
pub use eval::builtins::DateTruncPrecision;
1313
use itertools::Itertools;
14+
use nom_sql::TimestampField;
1415
pub use readyset_data::Dialect;
1516
use readyset_data::{DfType, DfValue};
1617
use serde::{Deserialize, Serialize};
@@ -112,6 +113,9 @@ pub enum BuiltinFunction {
112113

113114
/// [`date_trunc`](https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC)
114115
DateTrunc(Expr, Expr),
116+
117+
/// [`date_trunc`](https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC)
118+
Extract(TimestampField, Expr),
115119
}
116120

117121
impl BuiltinFunction {
@@ -146,6 +150,7 @@ impl BuiltinFunction {
146150
Least { .. } => "least",
147151
ArrayToString { .. } => "array_to_string",
148152
DateTrunc { .. } => "date_trunc",
153+
Extract { .. } => "extract",
149154
}
150155
}
151156
}
@@ -244,6 +249,9 @@ impl Display for BuiltinFunction {
244249
DateTrunc(field, source) => {
245250
write!(f, "({}, {})", field, source)
246251
}
252+
Extract(field, expr) => {
253+
write!(f, "({} FROM {})", field, expr)
254+
}
247255
}
248256
}
249257
}

dataflow-expression/src/lower.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,13 @@ impl Expr {
862862

863863
Ok(Self::Call { func, ty })
864864
}
865+
AstExpr::Call(FunctionExpr::Extract { field, expr }) => {
866+
let expr = Self::lower(*expr, dialect, context)?;
867+
let ty = DfType::Numeric { prec: 20, scale: 6 };
868+
let func = Box::new(BuiltinFunction::Extract(field, expr));
869+
870+
Ok(Self::Call { func, ty })
871+
}
865872
AstExpr::Call(call) => internal!(
866873
"Unexpected (aggregate?) call node in project expression: {:?}",
867874
Sensitive(&call)

dataflow-expression/tests/common/mod.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,8 @@ pub fn parse_lower_eval(
2929
expr: &str,
3030
parser_dialect: nom_sql::Dialect,
3131
expr_dialect: dataflow_expression::Dialect,
32-
) -> DfValue {
32+
) -> ReadySetResult<DfValue> {
3333
let ast = parse_expr(parser_dialect, expr).unwrap();
3434
let lowered = Expr::lower(ast, expr_dialect, &TestLowerContext).unwrap();
35-
match lowered.eval::<DfValue>(&[]) {
36-
Ok(res) => res,
37-
Err(e) => {
38-
panic!("Error evaluating `{expr}`: {e}")
39-
}
40-
}
35+
lowered.eval::<DfValue>(&[])
4136
}

dataflow-expression/tests/mysql_oracle.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ async fn compare_eval(expr: &str, conn: &mut Conn) {
7979
expr,
8080
nom_sql::Dialect::MySQL,
8181
dataflow_expression::Dialect::DEFAULT_MYSQL,
82-
);
82+
)
83+
.unwrap_or_else(|e| panic!("Error evaluating `{expr}`: {e}"));
8384
assert_eq!(
8485
our_result, mysql_result,
8586
"mismatched results for {expr} (left: us, right: mysql)"
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Seeds for failure cases proptest has generated in the past. It is
2+
# automatically read and these particular cases re-run before any
3+
# novel cases are generated.
4+
#
5+
# It is recommended to check this file in to source control so that
6+
# everyone who runs the test benefits from these saved cases.
7+
cc 2babad91d07b8955fc644887a196de3f10a2d5906841ecf2266022cab7ada9df # shrinks to field = Century, datetime = 1970-01-01T00:00:00
8+
cc 74a8620219bfa0ce332d4888bd451e13ebae34411b31cb3c698100aa36b7ed36 # shrinks to field = Second, datetime = 1970-01-01T00:00:01
9+
cc 7cb714461ce8352e2586db8c9d70dc6473741533cff95ab38f93945034ee9082 # shrinks to field = Isoyear, datetime = 1970-01-01T00:00:00
10+
cc 0e4282717b1b027255dedda6a563a9e476c859217c68eb85c662c7fd9f222e0a # shrinks to field = Decade, datetime = 1970-01-01T00:00:00
11+
cc 03b2ab6ce5e6c9a8625203de6bff831de34734dbccce73558e8cc34113459319 # shrinks to field = Epoch, datetime = 1970-01-01T00:00:01
12+
cc c11828b40b19ecaf688bf5e0da47f3615913ada65a2c103f9d06f06a6c6415d2 # shrinks to field = Dow, datetime = 2035-09-09T00:00:00
13+
cc d893989c83aaae04070d3d4f3a5268baaee176330458a0a73cb738a64940cc08 # shrinks to field = Julian, datetime = 2038-01-01T00:00:01
14+
cc f0e758dc44d25cc02f50841e54d5a9ce2808d9d7c3d2bf3d499eba04b893b9f2 # shrinks to field = Julian, datetime = 1970-01-01T00:00:09
15+
cc ad2e5c4942b266e05f9337bef7b482985b0dd83d986c2be7265a3cb45385931f # shrinks to field = Century, date = -0001-01-01
16+
cc ae8a16e5a382edc488ecb5d21c31ac5f9c816e47d9b15f1e86bfe1951a218655 # shrinks to field = Isoyear, date = -0001-01-03
17+
cc 0d521b15cabc8ee12b89ec2532e9b59923158340de74ca00a7580fb78f7c68ba # shrinks to field = Decade, date = -0001-01-02
18+
cc b9588ba5419366556017de48721258ee0f8568893550e25503c14d637b1744fb # shrinks to field = Century, date = -4290-01-01

0 commit comments

Comments
 (0)