Skip to content

feat: add to_string method to SparkLikeExprDateTimeNamespace #1842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
43 changes: 43 additions & 0 deletions narwhals/_spark_like/expr_dt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from typing import TYPE_CHECKING

from narwhals._spark_like.utils import strptime_to_pyspark_format

if TYPE_CHECKING:
from sqlframe.base.column import Column
from typing_extensions import Self
Expand All @@ -13,6 +15,47 @@ class SparkLikeExprDateTimeNamespace:
def __init__(self: Self, expr: SparkLikeExpr) -> None:
self._compliant_expr = expr

def to_string(self: Self, format: str) -> SparkLikeExpr:
F = self._compliant_expr._F # noqa: N806

def _format_iso_week_with_day(_input: Column) -> Column:
"""Format datetime as ISO week string with day."""
year = F.date_format(_input, "yyyy")
week = F.lpad(F.weekofyear(_input).cast("string"), 2, "0")
day = F.dayofweek(_input)
# Adjust Sunday from 1 to 7
day = F.when(day == 1, 7).otherwise(day - 1)
return F.concat(year, F.lit("-W"), week, F.lit("-"), day.cast("string"))

def _format_iso_week(_input: Column) -> Column:
"""Format datetime as ISO week string."""
year = F.date_format(_input, "yyyy")
week = F.lpad(F.weekofyear(_input).cast("string"), 2, "0")
return F.concat(year, F.lit("-W"), week)

def _format_iso_datetime(_input: Column) -> Column:
"""Format datetime as ISO datetime with microseconds."""
date_part = F.date_format(_input, "yyyy-MM-dd")
time_part = F.date_format(_input, "HH:mm:ss")
micros = F.unix_micros(_input) % 1_000_000
micros_str = F.lpad(micros.cast("string"), 6, "0")
return F.concat(date_part, F.lit("T"), time_part, F.lit("."), micros_str)

def _to_string(_input: Column) -> Column:
# Handle special formats
if format == "%G-W%V":
return _format_iso_week(_input)
if format == "%G-W%V-%u":
return _format_iso_week_with_day(_input)
if format in {"%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S%.f"}:
return _format_iso_datetime(_input)

# Convert Python format to PySpark format
pyspark_fmt = strptime_to_pyspark_format(format)
return F.date_format(_input, pyspark_fmt)

return self._compliant_expr._from_call(_to_string)

def date(self: Self) -> SparkLikeExpr:
return self._compliant_expr._from_call(self._compliant_expr._F.to_date)

Expand Down
33 changes: 2 additions & 31 deletions narwhals/_spark_like/expr_str.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from functools import partial
from typing import TYPE_CHECKING

from narwhals._spark_like.utils import strptime_to_pyspark_format

if TYPE_CHECKING:
from sqlframe.base.column import Column
from typing_extensions import Self
Expand Down Expand Up @@ -117,34 +119,3 @@ def to_datetime(self: Self, format: str | None) -> SparkLikeExpr:

def is_naive_format(format: str) -> bool:
return not any(x in format for x in ("%s", "%z", "Z"))


def strptime_to_pyspark_format(format: str) -> str:
"""Converts a Python strptime datetime format string to a PySpark datetime format string."""
# Mapping from Python strptime format to PySpark format

# see https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
# and https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior
format_mapping = {
"%Y": "y", # Year with century
"%y": "y", # Year without century
"%m": "M", # Month
"%d": "d", # Day of the month
"%H": "H", # Hour (24-hour clock) 0-23
"%I": "h", # Hour (12-hour clock) 1-12
"%M": "m", # Minute
"%S": "s", # Second
"%f": "S", # Microseconds -> Milliseconds
"%p": "a", # AM/PM
"%a": "E", # Abbreviated weekday name
"%A": "E", # Full weekday name
"%j": "D", # Day of the year
"%z": "Z", # Timezone offset
"%s": "X", # Unix timestamp
}

# Replace Python format specifiers with PySpark specifiers
pyspark_format = format
for py_format, spark_format in format_mapping.items():
pyspark_format = pyspark_format.replace(py_format, spark_format)
return pyspark_format.replace("T", " ")
41 changes: 41 additions & 0 deletions narwhals/_spark_like/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import TYPE_CHECKING
from typing import Any
from typing import Sequence
from typing import overload

from narwhals.exceptions import UnsupportedDTypeError
from narwhals.utils import Implementation
Expand Down Expand Up @@ -269,3 +270,43 @@ def import_window(implementation: Implementation, /) -> type[Any]:
return import_module(
f"sqlframe.{_BaseSession().execution_dialect_name}.window"
).Window


@overload
def strptime_to_pyspark_format(format: None) -> None: ...


@overload
def strptime_to_pyspark_format(format: str) -> str: ...


def strptime_to_pyspark_format(format: str | None) -> str | None:
"""Converts a Python strptime datetime format string to a PySpark datetime format string."""
if format is None:
return None

# see https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
# and https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior
format_mapping = {
"%Y": "yyyy", # Year with century (4 digits)
"%y": "yy", # Year without century (2 digits)
"%m": "MM", # Month (01-12)
"%d": "dd", # Day of the month (01-31)
"%H": "HH", # Hour (24-hour clock) (00-23)
"%I": "hh", # Hour (12-hour clock) (01-12)
"%M": "mm", # Minute (00-59)
"%S": "ss", # Second (00-59)
"%f": "S", # Microseconds -> Milliseconds
"%p": "a", # AM/PM
"%a": "E", # Abbreviated weekday name
"%A": "E", # Full weekday name
"%j": "D", # Day of the year
"%z": "Z", # Timezone offset
"%s": "X", # Unix timestamp
}

# Replace Python format specifiers with PySpark specifiers
pyspark_format = format
for py_format, spark_format in format_mapping.items():
pyspark_format = pyspark_format.replace(py_format, spark_format)
return pyspark_format.replace("T", " ")
5 changes: 3 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,16 @@ def pyspark_lazy_constructor() -> Callable[[Any], IntoFrame]: # pragma: no cove
"ignore", r"Using fork\(\) can cause Polars", category=RuntimeWarning
)

# common timezone for all tests environments
os.environ["TZ"] = "UTC"

session = (
SparkSession.builder.appName("unit-tests") # pyright: ignore[reportAttributeAccessIssue]
.master("local[1]")
.config("spark.ui.enabled", "false")
# executing one task at a time makes the tests faster
.config("spark.default.parallelism", "1")
.config("spark.sql.shuffle.partitions", "2")
# common timezone for all tests environments
.config("spark.sql.session.timeZone", "UTC")
.getOrCreate()
)

Expand Down
11 changes: 2 additions & 9 deletions tests/expr_and_series/dt/to_string_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ def test_dt_to_string_series(constructor_eager: ConstructorEager, fmt: str) -> N
],
)
@pytest.mark.skipif(is_windows(), reason="pyarrow breaking on windows")
def test_dt_to_string_expr(
constructor: Constructor, fmt: str, request: pytest.FixtureRequest
) -> None:
if "pyspark" in str(constructor):
request.applymarker(pytest.mark.xfail)
def test_dt_to_string_expr(constructor: Constructor, fmt: str) -> None:
input_frame = nw.from_native(constructor(data))

expected_col = [datetime.strftime(d, fmt) for d in data["a"]]
Expand Down Expand Up @@ -141,7 +137,7 @@ def test_dt_to_string_iso_local_datetime_expr(
expected: str,
request: pytest.FixtureRequest,
) -> None:
if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
if "duckdb" in str(constructor):
request.applymarker(pytest.mark.xfail)
df = constructor({"a": [data]})

Expand Down Expand Up @@ -178,10 +174,7 @@ def test_dt_to_string_iso_local_date_expr(
constructor: Constructor,
data: datetime,
expected: str,
request: pytest.FixtureRequest,
) -> None:
if "pyspark" in str(constructor):
request.applymarker(pytest.mark.xfail)
df = constructor({"a": [data]})
result = nw.from_native(df).with_columns(
nw.col("a").dt.to_string("%Y-%m-%d").alias("b")
Expand Down
Loading