Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions daft/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
to_date,
to_datetime,
convert_time_zone,
convert_timezone,
from_utc_timestamp,
to_utc_timestamp,
replace_time_zone,
date_trunc,
trunc,
Expand Down Expand Up @@ -349,6 +352,7 @@
"conv",
"convert_image",
"convert_time_zone",
"convert_timezone",
"cos",
"cosh",
"cosine_distance",
Expand Down Expand Up @@ -414,6 +418,7 @@
"floor",
"format",
"from_unixtime",
"from_utc_timestamp",
"get",
"great_circle_distance",
"guess_mime_type",
Expand Down Expand Up @@ -580,6 +585,7 @@
"to_upper_camel_case",
"to_upper_kebab_case",
"to_upper_snake_case",
"to_utc_timestamp",
"tokenize_decode",
"tokenize_encode",
"total_days",
Expand Down
84 changes: 84 additions & 0 deletions daft/functions/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,90 @@ def replace_time_zone(expr: Expression, timezone: str | None = None) -> Expressi
return Expression._call_builtin_scalar_fn("replace_time_zone", expr, timezone)


def from_utc_timestamp(expr: Expression, timezone: str) -> Expression:
    """Converts a UTC timestamp into the wall-clock time of the given timezone.

    Equivalent to Spark's ``from_utc_timestamp``. Any timezone label on the input is
    ignored: its value is read as a UTC instant, and the output is a tz-naive
    Timestamp that shows the local wall-clock time in ``timezone``.

    Args:
        expr: Timestamp expression to be read as UTC.
        timezone: Name of the target timezone (e.g. ``"America/Los_Angeles"`` or ``"+01:00"``).

    Returns:
        Expression: A tz-naive Timestamp expression holding the wall-clock time in ``timezone``.

    Note:
        Every UTC instant corresponds to exactly one local time, so this direction
        has no DST gap or overlap cases to resolve. The strict DST handling that
        differs from Spark applies only to :func:`to_utc_timestamp` (see its
        docstring).

    Examples:
        >>> import daft
        >>> from datetime import datetime
        >>> from daft.functions import from_utc_timestamp
        >>> df = daft.from_pydict({"ts": [datetime(2017, 7, 14, 2, 40)]})
        >>> df = df.with_column("local", from_utc_timestamp(df["ts"], "Europe/London"))
        >>> df.to_pydict()["local"]
        [datetime.datetime(2017, 7, 14, 3, 40)]
    """
    builtin_name = "from_utc_timestamp"
    return Expression._call_builtin_scalar_fn(builtin_name, expr, timezone=timezone)


def to_utc_timestamp(expr: Expression, timezone: str) -> Expression:
    """Converts a wall-clock timestamp in the given timezone into the matching UTC instant.

    Equivalent to Spark's ``to_utc_timestamp``. The wall-clock value of the input is
    read as local time in ``timezone``, and the corresponding UTC instant is returned
    as a tz-naive Timestamp.

    Args:
        expr: Timestamp expression whose wall-clock value is read in ``timezone``.
        timezone: Name of the source timezone.

    Returns:
        Expression: A tz-naive Timestamp expression holding the UTC instant.

    Note:
        Daft is stricter than Spark around DST transitions. A local wall-clock
        value that does not exist (e.g. inside the spring-forward gap) or that
        occurs twice (e.g. inside the fall-back overlap) raises a ``ValueError``
        instead of being resolved silently. Spark moves past the gap and uses the
        pre-transition offset for an overlap. To reproduce Spark's results, filter
        out or pre-shift such inputs before calling.

    Examples:
        >>> import daft
        >>> from datetime import datetime
        >>> from daft.functions import to_utc_timestamp
        >>> df = daft.from_pydict({"ts": [datetime(2017, 7, 14, 3, 40)]})
        >>> df = df.with_column("utc", to_utc_timestamp(df["ts"], "Europe/London"))
        >>> df.to_pydict()["utc"]
        [datetime.datetime(2017, 7, 14, 2, 40)]
    """
    builtin_name = "to_utc_timestamp"
    return Expression._call_builtin_scalar_fn(builtin_name, expr, timezone=timezone)


def convert_timezone(target_timezone: str, source_timestamp: Expression) -> Expression:
    """Spark-compatible spelling of :func:`convert_time_zone`.

    Spark orders the arguments as ``(target_timezone, source_timestamp)``, the
    opposite of Daft's :func:`convert_time_zone`. This alias takes no
    ``from_timezone`` argument, so the source timestamp must already carry a
    timezone.

    Args:
        target_timezone: Name of the timezone to convert to.
        source_timestamp: A tz-aware Timestamp expression.

    Returns:
        Expression: Timestamp expression in the target timezone.
    """
    # Swap into Daft's (expression, timezone) argument order before delegating.
    converted = convert_time_zone(source_timestamp, target_timezone)
    return converted


def date_trunc(interval: str, expr: Expression, relative_to: Expression | None = None) -> Expression:
"""Truncates the datetime column to the specified interval.

Expand Down
1 change: 1 addition & 0 deletions src/daft-functions-temporal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ chrono-tz = {workspace = true}
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-schema = {path = "../daft-schema", default-features = false}
paste = "1.0.15"
serde = {workspace = true}
typetag = {workspace = true}
Expand Down
6 changes: 4 additions & 2 deletions src/daft-functions-temporal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod date_arithmetic;
pub mod date_construction;
pub mod date_navigation;
pub mod epoch_conversions;
mod time;
pub mod time;
mod to_string;
mod total;
pub mod truncate;
Expand All @@ -25,7 +25,7 @@ use epoch_conversions::{
DateFromUnixDate, FromUnixtime, TimestampMicros, TimestampMillis, TimestampSeconds,
};
use serde::{Deserialize, Serialize};
use time::{ConvertTimeZone, ReplaceTimeZone, Time};
use time::{ConvertTimeZone, FromUtcTimestamp, ReplaceTimeZone, Time, ToUtcTimestamp};
pub use to_string::ToString;
use truncate::Truncate;
use unix_timestamp::UnixTimestamp;
Expand Down Expand Up @@ -111,6 +111,8 @@ impl FunctionModule for TemporalFunctions {
parent.add_fn(ToString);
parent.add_fn(ConvertTimeZone);
parent.add_fn(ReplaceTimeZone);
parent.add_fn(FromUtcTimestamp);
parent.add_fn(ToUtcTimestamp);
parent.add_fn(Truncate);
parent.add_fn(TotalDays);
parent.add_fn(TotalHours);
Expand Down
185 changes: 184 additions & 1 deletion src/daft-functions-temporal/src/time.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
use std::sync::Arc;

use arrow_array::{
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use common_error::DaftError;
use daft_core::prelude::TimeUnit;
use daft_core::prelude::{AsArrow, TimeUnit};
use daft_dsl::functions::{UnaryArg, prelude::*};
use daft_schema::time_unit::{
naive_datetime_to_timestamp, naive_local_to_timestamp, parse_timezone,
timestamp_to_naive_datetime, timestamp_to_naive_local,
};

// Build an Arrow timestamp array with the physical type that matches the
// logical Daft TimeUnit. Using the typed PrimitiveArray variants (rather than
// Int64Array) keeps the Arrow physical dtype aligned with the Field dtype.
fn build_timestamp_array(values: Vec<Option<i64>>, time_unit: TimeUnit) -> arrow_array::ArrayRef {
match time_unit {
TimeUnit::Seconds => Arc::new(TimestampSecondArray::from(values)),
TimeUnit::Milliseconds => Arc::new(TimestampMillisecondArray::from(values)),
TimeUnit::Microseconds => Arc::new(TimestampMicrosecondArray::from(values)),
TimeUnit::Nanoseconds => Arc::new(TimestampNanosecondArray::from(values)),
}
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct Time;
Expand Down Expand Up @@ -149,3 +171,164 @@ impl ScalarUDF for ReplaceTimeZone {
))
}
}

// --- FromUtcTimestamp ---
//
// Spark semantics: interprets the input as a UTC instant (regardless of any tz
// label) and returns a tz-naive timestamp representing the wall-clock time in
// the given timezone. The output i64 encodes that wall-clock as if it were
// UTC, so a downstream reader formatting the result without a tz sees the
// shifted local time.

/// Scalar function exposed under the name `from_utc_timestamp`.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct FromUtcTimestamp;

/// Arguments shared by `from_utc_timestamp` and `to_utc_timestamp`; both
/// implementations destructure their inputs into this struct.
#[derive(FunctionArgs)]
struct UtcConversionArgs<T> {
// The timestamp value (a `Series` at evaluation time, an `ExprRef` when
// computing the return field).
input: T,
// Timezone name as supplied by the caller; validated with `parse_timezone`.
timezone: String,
}

#[typetag::serde]
impl ScalarUDF for FromUtcTimestamp {
    /// Name under which this function is exposed.
    fn name(&self) -> &'static str {
        "from_utc_timestamp"
    }

    /// Reads every value of `input` as a UTC instant and re-encodes the
    /// wall-clock time in `timezone` as a tz-naive timestamp.
    ///
    /// Nulls are passed through. The result keeps the input's name and time
    /// unit and carries no timezone.
    ///
    /// # Errors
    ///
    /// Returns a `TypeError` when `input` is not a Timestamp, and propagates
    /// errors from argument extraction, timezone parsing and the timestamp
    /// conversion helpers.
    fn call(
        &self,
        inputs: FunctionArgs<Series>,
        _ctx: &daft_dsl::functions::scalar::EvalContext,
    ) -> DaftResult<Series> {
        let UtcConversionArgs { input, timezone } = inputs.try_into()?;
        let target_tz = parse_timezone(&timezone)?;

        // Only the time unit is needed; any timezone label on the input is ignored.
        let time_unit = match input.data_type() {
            DataType::Timestamp(unit, _) => *unit,
            other => {
                return Err(DaftError::TypeError(format!(
                    "Expected timestamp input to from_utc_timestamp, got {}",
                    other
                )));
            }
        };

        let timestamps = input.timestamp()?;
        let utc_values = timestamps.as_arrow()?;

        let mut local_values: Vec<Option<i64>> = Vec::with_capacity(utc_values.len());
        for slot in utc_values {
            // Shift to the local wall-clock, then store that wall-clock back as a raw timestamp.
            let shifted = slot
                .map(|utc| {
                    let wall_clock = timestamp_to_naive_local(utc, time_unit, &target_tz);
                    naive_datetime_to_timestamp(wall_clock, time_unit)
                })
                .transpose()?;
            local_values.push(shifted);
        }

        let out_array = build_timestamp_array(local_values, time_unit);
        let out_field = Field::new(
            input.name().to_string(),
            DataType::Timestamp(time_unit, None),
        );
        Series::from_arrow(Arc::new(out_field), out_array)
    }

    /// Derives the output field without touching data: checks that `timezone`
    /// parses and that the input resolves to a Timestamp, then returns a field
    /// with the input's name and time unit and no timezone.
    fn get_return_field(
        &self,
        inputs: FunctionArgs<ExprRef>,
        schema: &Schema,
    ) -> DaftResult<Field> {
        let UtcConversionArgs { input, timezone } = inputs.try_into()?;
        parse_timezone(&timezone)?;
        let field = input.to_field(schema)?;
        match &field.dtype {
            DataType::Timestamp(unit, _) => {
                let out_dtype = DataType::Timestamp(*unit, None);
                Ok(Field::new(field.name, out_dtype))
            }
            other => Err(DaftError::TypeError(format!(
                "Expected timestamp input to from_utc_timestamp, got {}",
                other
            ))),
        }
    }
}

// --- ToUtcTimestamp ---
//
// Spark semantics: interprets the input wall-clock as being in the given
// timezone and returns the equivalent UTC instant as a tz-naive timestamp.
// For tz-aware inputs the wall-clock is first extracted via the input's own
// timezone label, then re-interpreted in the `timezone` argument (matches
// Spark's behavior of always treating the displayed wall-clock as the input).

/// Scalar function exposed under the name `to_utc_timestamp`.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ToUtcTimestamp;

#[typetag::serde]
impl ScalarUDF for ToUtcTimestamp {
/// Name under which this function is exposed; the Python wrapper invokes it
/// by this same string.
fn name(&self) -> &'static str {
"to_utc_timestamp"
}

/// Converts each wall-clock timestamp in `input` to a tz-naive UTC timestamp,
/// reading the wall-clock as local time in `timezone`.
///
/// Nulls are passed through. The result keeps the input's name and time unit
/// and carries no timezone. Returns a `TypeError` when `input` is not a
/// Timestamp and propagates errors from the helper calls.
fn call(
&self,
inputs: FunctionArgs<Series>,
_ctx: &daft_dsl::functions::scalar::EvalContext,
) -> DaftResult<Series> {
let UtcConversionArgs { input, timezone } = inputs.try_into()?;
let tz_parsed = parse_timezone(&timezone)?;

let DataType::Timestamp(time_unit, input_tz) = input.data_type().clone() else {
return Err(DaftError::TypeError(format!(
"Expected timestamp input to to_utc_timestamp, got {}",
input.data_type()
)));
};

// Parse the input's own timezone label when it has one; a tz-naive input yields `None`.
let input_tz_parsed = input_tz.as_deref().map(parse_timezone).transpose()?;

let ts_array = input.timestamp()?;
let physical = ts_array.as_arrow()?;

let mut values: Vec<Option<i64>> = Vec::with_capacity(physical.len());
for opt in physical {
match opt {
None => values.push(None),
Some(ts) => {
// Wall-clock value: through the input's timezone when tz-aware,
// otherwise the stored value converted directly.
let naive_local = match &input_tz_parsed {
Some(in_tz) => timestamp_to_naive_local(ts, time_unit, in_tz),
None => timestamp_to_naive_datetime(ts, time_unit),
};
// Re-interpret that wall-clock in `timezone`. The `?` surfaces the helper's
// error; per the Python docstring this is a ValueError for wall-clock values
// in a DST gap or overlap.
let utc_ts =
naive_local_to_timestamp(naive_local, time_unit, &tz_parsed, &timezone)?;
values.push(Some(utc_ts));
Comment on lines +297 to +303

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 DST transition behavior diverges from Spark

naive_local_to_timestamp propagates DaftError::ValueError for both ambiguous times (DST fall-back) and nonexistent times (DST spring-forward). Spark instead silently resolves ambiguous times to the pre-transition offset and advances past the gap for nonexistent times.

A user migrating from Spark who has timestamps around a DST boundary will get a runtime error where Spark would have produced a result. The difference is worth documenting in the docstring even if the strict behavior is intentional.

@BABTUNA BABTUNA May 12, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented in the to_utc_timestamp docstring. Keeping the strict behavior for now. silently picking a side on DST transitions can be just as surprising as erroring.

}
}
}

// The output field keeps the input's name and time unit but has no timezone.
let arrow_arr = build_timestamp_array(values, time_unit);
Series::from_arrow(
Arc::new(Field::new(
input.name().to_string(),
DataType::Timestamp(time_unit, None),
)),
arrow_arr,
)
}

/// Derives the output field without touching data: checks that `timezone`
/// parses and that the input resolves to a Timestamp, then returns a field
/// with the input's name and time unit and no timezone.
fn get_return_field(
&self,
inputs: FunctionArgs<ExprRef>,
schema: &Schema,
) -> DaftResult<Field> {
let UtcConversionArgs { input, timezone } = inputs.try_into()?;
parse_timezone(&timezone)?;
let field = input.to_field(schema)?;
let DataType::Timestamp(timeunit, _) = &field.dtype else {
return Err(DaftError::TypeError(format!(
"Expected timestamp input to to_utc_timestamp, got {}",
field.dtype
)));
};
Ok(Field::new(field.name, DataType::Timestamp(*timeunit, None)))
}
}
Loading
Loading