From 5c4dd910c8af0e470a85f17a0eb2d9afc396fcca Mon Sep 17 00:00:00 2001 From: Dhanunjaya Elluri Date: Mon, 20 Jan 2025 16:31:51 +0100 Subject: [PATCH 1/5] feat: add `to_string` method to Spark `Expr.dt` --- narwhals/_spark_like/expr_dt.py | 50 ++++++++++++++++++++++ tests/conftest.py | 1 + tests/expr_and_series/dt/to_string_test.py | 11 +---- 3 files changed, 53 insertions(+), 9 deletions(-) diff --git a/narwhals/_spark_like/expr_dt.py b/narwhals/_spark_like/expr_dt.py index ad5d58b829..d68edcad65 100644 --- a/narwhals/_spark_like/expr_dt.py +++ b/narwhals/_spark_like/expr_dt.py @@ -15,6 +15,56 @@ class SparkLikeExprDateTimeNamespace: def __init__(self: Self, expr: SparkLikeExpr) -> None: self._compliant_expr = expr + def to_string(self: Self, format: str) -> SparkLikeExpr: # noqa: A002 + def _format_iso_week_with_day(_input: Column) -> Column: + """Format datetime as ISO week string with day.""" + year = F.date_format(_input, "YYYY") + week = F.lpad(F.weekofyear(_input).cast("string"), 2, "0") + day = F.dayofweek(_input) + # Adjust Sunday from 1 to 7 + day = F.when(day == 1, 7).otherwise(day - 1) + return F.concat(year, F.lit("-W"), week, F.lit("-"), day.cast("string")) + + def _format_iso_week(_input: Column) -> Column: + """Format datetime as ISO week string.""" + year = F.date_format(_input, "YYYY") + week = F.lpad(F.weekofyear(_input).cast("string"), 2, "0") + return F.concat(year, F.lit("-W"), week) + + def _format_iso_datetime(_input: Column) -> Column: + """Format datetime as ISO datetime with microseconds.""" + date_part = F.date_format(_input, "yyyy-MM-dd") + time_part = F.date_format(_input, "HH:mm:ss") + micros = F.unix_micros(_input) % 1_000_000 + micros_str = F.lpad(micros.cast("string"), 6, "0") + return F.concat(date_part, F.lit("T"), time_part, F.lit("."), micros_str) + + def _to_string(_input: Column) -> Column: + # Handle special formats + if format == "%G-W%V": + return _format_iso_week(_input) + if format == "%G-W%V-%u": + return _format_iso_week_with_day(_input) + if format in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S%.f"): + return _format_iso_datetime(_input) + + # Standard format conversions + java_fmt = ( + format.replace("%Y", "yyyy") + .replace("%m", "MM") + .replace("%d", "dd") + .replace("%H", "HH") + .replace("%M", "mm") + .replace("%S", "ss") + ) + return F.date_format(_input, java_fmt) + + return self._compliant_expr._from_call( + _to_string, + "to_string", + returns_scalar=self._compliant_expr._returns_scalar, + ) + def date(self: Self) -> SparkLikeExpr: return self._compliant_expr._from_call( F.to_date, diff --git a/tests/conftest.py b/tests/conftest.py index 857af06f0d..cb55fb4f85 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -159,6 +159,7 @@ def pyspark_lazy_constructor() -> Callable[[Any], IntoFrame]: # pragma: no cove .config("spark.sql.shuffle.partitions", "2") # common timezone for all tests environments .config("spark.sql.session.timeZone", "UTC") + .config("spark.sql.legacy.timeParserPolicy", "LEGACY") .getOrCreate() ) diff --git a/tests/expr_and_series/dt/to_string_test.py b/tests/expr_and_series/dt/to_string_test.py index 6428e35a63..e05351224f 100644 --- a/tests/expr_and_series/dt/to_string_test.py +++ b/tests/expr_and_series/dt/to_string_test.py @@ -59,11 +59,7 @@ def test_dt_to_string_series(constructor_eager: ConstructorEager, fmt: str) -> N ], ) @pytest.mark.skipif(is_windows(), reason="pyarrow breaking on windows") -def test_dt_to_string_expr( - constructor: Constructor, fmt: str, request: pytest.FixtureRequest -) -> None: - 
if "pyspark" in str(constructor): - request.applymarker(pytest.mark.xfail) +def test_dt_to_string_expr(constructor: Constructor, fmt: str) -> None: input_frame = nw.from_native(constructor(data)) expected_col = [datetime.strftime(d, fmt) for d in data["a"]] @@ -141,7 +137,7 @@ def test_dt_to_string_iso_local_datetime_expr( expected: str, request: pytest.FixtureRequest, ) -> None: - if ("pyspark" in str(constructor)) or "duckdb" in str(constructor): + if "duckdb" in str(constructor): request.applymarker(pytest.mark.xfail) df = constructor({"a": [data]}) @@ -178,10 +174,7 @@ def test_dt_to_string_iso_local_date_expr( constructor: Constructor, data: datetime, expected: str, - request: pytest.FixtureRequest, ) -> None: - if "pyspark" in str(constructor): - request.applymarker(pytest.mark.xfail) df = constructor({"a": [data]}) result = nw.from_native(df).with_columns( nw.col("a").dt.to_string("%Y-%m-%d").alias("b") From baf84f03de5f3214faa46e938efb66b3ac408e7c Mon Sep 17 00:00:00 2001 From: Dhanunjaya Elluri Date: Wed, 22 Jan 2025 18:39:01 +0000 Subject: [PATCH 2/5] refactor(spark): centralize datetime format conversion --- narwhals/_spark_like/expr_dt.py | 15 ++++-------- narwhals/_spark_like/utils.py | 41 +++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/narwhals/_spark_like/expr_dt.py b/narwhals/_spark_like/expr_dt.py index d68edcad65..abaa0af4c4 100644 --- a/narwhals/_spark_like/expr_dt.py +++ b/narwhals/_spark_like/expr_dt.py @@ -4,6 +4,8 @@ from pyspark.sql import functions as F # noqa: N812 +from narwhals._spark_like.utils import strptime_to_pyspark_format + if TYPE_CHECKING: from pyspark.sql import Column from typing_extensions import Self @@ -48,16 +50,9 @@ def _to_string(_input: Column) -> Column: if format in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S%.f"): return _format_iso_datetime(_input) - # Standard format conversions - java_fmt = ( - format.replace("%Y", "yyyy") - .replace("%m", "MM") - .replace("%d", "dd") - .replace("%H", "HH") - .replace("%M", "mm") - .replace("%S", "ss") - ) - return F.date_format(_input, java_fmt) + # Convert Python format to PySpark format + pyspark_fmt = strptime_to_pyspark_format(format) + return F.date_format(_input, pyspark_fmt) return self._compliant_expr._from_call( _to_string, diff --git a/narwhals/_spark_like/utils.py b/narwhals/_spark_like/utils.py index dbea477207..0c7c552762 100644 --- a/narwhals/_spark_like/utils.py +++ b/narwhals/_spark_like/utils.py @@ -3,6 +3,7 @@ from functools import lru_cache from typing import TYPE_CHECKING from typing import Any +from typing import overload from pyspark.sql import functions as F # noqa: N812 @@ -195,3 +196,43 @@ def _var(_input: Column | str, ddof: int, np_version: tuple[int, ...]) -> Column input_col = F.col(_input) if isinstance(_input, str) else _input return var(input_col, ddof=ddof) + + +@overload +def strptime_to_pyspark_format(format: None) -> None: ... + + +@overload +def strptime_to_pyspark_format(format: str) -> str: ... 
+ + +def strptime_to_pyspark_format(format: str | None) -> str | None: # noqa: A002 + """Converts a Python strptime datetime format string to a PySpark datetime format string.""" + if format is None: + return None + + # see https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html + # and https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior + format_mapping = { + "%Y": "yyyy", # Year with century (4 digits) + "%y": "yy", # Year without century (2 digits) + "%m": "MM", # Month (01-12) + "%d": "dd", # Day of the month (01-31) + "%H": "HH", # Hour (24-hour clock) (00-23) + "%I": "hh", # Hour (12-hour clock) (01-12) + "%M": "mm", # Minute (00-59) + "%S": "ss", # Second (00-59) + "%f": "S", # Microseconds -> Milliseconds + "%p": "a", # AM/PM + "%a": "E", # Abbreviated weekday name + "%A": "E", # Full weekday name + "%j": "D", # Day of the year + "%z": "Z", # Timezone offset + "%s": "X", # Unix timestamp + } + + # Replace Python format specifiers with PySpark specifiers + pyspark_format = format + for py_format, spark_format in format_mapping.items(): + pyspark_format = pyspark_format.replace(py_format, spark_format) + return pyspark_format.replace("T", " ") From 5b96b0ca65f816496e705b6b17f0868ab1e34c85 Mon Sep 17 00:00:00 2001 From: Dhanunjaya Elluri Date: Wed, 22 Jan 2025 20:05:11 +0100 Subject: [PATCH 3/5] refactor(spark): centralize datetime format conversion --- narwhals/_spark_like/expr_str.py | 44 ++------------------------------ 1 file changed, 2 insertions(+), 42 deletions(-) diff --git a/narwhals/_spark_like/expr_str.py b/narwhals/_spark_like/expr_str.py index c8cc3cadfa..7198189557 100644 --- a/narwhals/_spark_like/expr_str.py +++ b/narwhals/_spark_like/expr_str.py @@ -1,10 +1,11 @@ from __future__ import annotations from typing import TYPE_CHECKING -from typing import overload from pyspark.sql import functions as F # noqa: N812 +from narwhals._spark_like.utils import strptime_to_pyspark_format + if TYPE_CHECKING: from pyspark.sql import Column from typing_extensions import Self @@ -123,44 +124,3 @@ def to_datetime(self: Self, format: str | None) -> SparkLikeExpr: # noqa: A002 "to_datetime", returns_scalar=self._compliant_expr._returns_scalar, ) - - -@overload -def strptime_to_pyspark_format(format: None) -> None: ... - - -@overload -def strptime_to_pyspark_format(format: str) -> str: ... 
- - -def strptime_to_pyspark_format(format: str | None) -> str | None: # noqa: A002 - """Converts a Python strptime datetime format string to a PySpark datetime format string.""" - # Mapping from Python strptime format to PySpark format - if format is None: - return None - - # see https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html - # and https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior - format_mapping = { - "%Y": "y", # Year with century - "%y": "y", # Year without century - "%m": "M", # Month - "%d": "d", # Day of the month - "%H": "H", # Hour (24-hour clock) 0-23 - "%I": "h", # Hour (12-hour clock) 1-12 - "%M": "m", # Minute - "%S": "s", # Second - "%f": "S", # Microseconds -> Milliseconds - "%p": "a", # AM/PM - "%a": "E", # Abbreviated weekday name - "%A": "E", # Full weekday name - "%j": "D", # Day of the year - "%z": "Z", # Timezone offset - "%s": "X", # Unix timestamp - } - - # Replace Python format specifiers with PySpark specifiers - pyspark_format = format - for py_format, spark_format in format_mapping.items(): - pyspark_format = pyspark_format.replace(py_format, spark_format) - return pyspark_format.replace("T", " ") From 93a45d501d374a6333c70bbb69d6ea88f9219ef3 Mon Sep 17 00:00:00 2001 From: Francesco Bruzzesi <42817048+FBruzzesi@users.noreply.github.com> Date: Thu, 23 Jan 2025 08:46:16 +0100 Subject: [PATCH 4/5] fix: raise on selectors addition (#1854) --- narwhals/_arrow/selectors.py | 10 ---------- narwhals/_dask/selectors.py | 10 ---------- narwhals/_pandas_like/selectors.py | 10 ---------- narwhals/selectors.py | 29 ++++++++++++++++++++++++++++- tests/selectors_test.py | 19 ++++++++++--------- 5 files changed, 38 insertions(+), 40 deletions(-) diff --git a/narwhals/_arrow/selectors.py b/narwhals/_arrow/selectors.py index 48e837ec7e..36feb5d562 100644 --- a/narwhals/_arrow/selectors.py +++ b/narwhals/_arrow/selectors.py @@ -2,7 +2,6 @@ from typing import TYPE_CHECKING from typing import Any -from typing import NoReturn from typing import Sequence from narwhals._arrow.expr import ArrowExpr @@ -178,12 +177,3 @@ def __invert__(self: Self) -> ArrowSelector: ).all() - self ) - - def __rsub__(self: Self, other: Any) -> NoReturn: - raise NotImplementedError - - def __rand__(self: Self, other: Any) -> NoReturn: - raise NotImplementedError - - def __ror__(self: Self, other: Any) -> NoReturn: - raise NotImplementedError diff --git a/narwhals/_dask/selectors.py b/narwhals/_dask/selectors.py index 703e248600..9e6cc63021 100644 --- a/narwhals/_dask/selectors.py +++ b/narwhals/_dask/selectors.py @@ -2,7 +2,6 @@ from typing import TYPE_CHECKING from typing import Any -from typing import NoReturn from narwhals._dask.expr import DaskExpr from narwhals.utils import import_dtypes_module @@ -186,12 +185,3 @@ def __invert__(self: Self) -> DaskSelector: ).all() - self ) - - def __rsub__(self: Self, other: Any) -> NoReturn: - raise NotImplementedError - - def __rand__(self: Self, other: Any) -> NoReturn: - raise NotImplementedError - - def __ror__(self: Self, other: Any) -> NoReturn: - raise NotImplementedError diff --git a/narwhals/_pandas_like/selectors.py b/narwhals/_pandas_like/selectors.py index c4ddfff36a..b3518283f8 100644 --- a/narwhals/_pandas_like/selectors.py +++ b/narwhals/_pandas_like/selectors.py @@ -2,7 +2,6 @@ from typing import TYPE_CHECKING from typing import Any -from typing import NoReturn from narwhals._pandas_like.expr import PandasLikeExpr from narwhals.utils import import_dtypes_module @@ -189,12 +188,3 @@ def __invert__(self: 
Self) -> PandasSelector: ).all() - self ) - - def __rsub__(self, other: Any) -> NoReturn: - raise NotImplementedError - - def __rand__(self, other: Any) -> NoReturn: - raise NotImplementedError - - def __ror__(self, other: Any) -> NoReturn: - raise NotImplementedError diff --git a/narwhals/selectors.py b/narwhals/selectors.py index e67424281e..dabf6f83f2 100644 --- a/narwhals/selectors.py +++ b/narwhals/selectors.py @@ -1,12 +1,39 @@ from __future__ import annotations +from typing import TYPE_CHECKING from typing import Any +from typing import NoReturn from narwhals.expr import Expr from narwhals.utils import flatten +if TYPE_CHECKING: + from typing_extensions import Self -class Selector(Expr): ... + +class Selector(Expr): + def _to_expr(self: Self) -> Expr: + return Expr( + to_compliant_expr=self._to_compliant_expr, + is_order_dependent=self._is_order_dependent, + changes_length=self._changes_length, + aggregates=self._aggregates, + ) + + def __add__(self: Self, other: Any) -> Expr: # type: ignore[override] + if isinstance(other, Selector): + msg = "unsupported operand type(s) for op: ('Selector' + 'Selector')" + raise TypeError(msg) + return self._to_expr() + other # type: ignore[no-any-return] + + def __rsub__(self: Self, other: Any) -> NoReturn: + raise NotImplementedError + + def __rand__(self: Self, other: Any) -> NoReturn: + raise NotImplementedError + + def __ror__(self: Self, other: Any) -> NoReturn: + raise NotImplementedError def by_dtype(*dtypes: Any) -> Expr: diff --git a/tests/selectors_test.py b/tests/selectors_test.py index 80aa648032..36ea15a4f2 100644 --- a/tests/selectors_test.py +++ b/tests/selectors_test.py @@ -1,7 +1,7 @@ from __future__ import annotations -import pandas as pd -import pyarrow as pa +import re + import pytest import narwhals.stable.v1 as nw @@ -103,16 +103,17 @@ def test_set_ops( assert sorted(result) == expected -@pytest.mark.parametrize("invalid_constructor", [pd.DataFrame, pa.table]) -def test_set_ops_invalid( - invalid_constructor: Constructor, request: pytest.FixtureRequest -) -> None: - if "duckdb" in str(invalid_constructor): - request.applymarker(pytest.mark.xfail) - df = nw.from_native(invalid_constructor(data)) +def test_set_ops_invalid(constructor: Constructor) -> None: + df = nw.from_native(constructor(data)) with pytest.raises((NotImplementedError, ValueError)): df.select(1 - numeric()) with pytest.raises((NotImplementedError, ValueError)): df.select(1 | numeric()) with pytest.raises((NotImplementedError, ValueError)): df.select(1 & numeric()) + + with pytest.raises( + TypeError, + match=re.escape("unsupported operand type(s) for op: ('Selector' + 'Selector')"), + ): + df.select(boolean() + numeric()) From a476ba68e0ae24f098e8ca1b1b06e6f82995fa6c Mon Sep 17 00:00:00 2001 From: Dhanunjaya Elluri Date: Thu, 23 Jan 2025 11:09:36 +0100 Subject: [PATCH 5/5] fix: remove spark config UTC and LEGACY --- narwhals/_spark_like/expr_dt.py | 4 ++-- tests/conftest.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/narwhals/_spark_like/expr_dt.py b/narwhals/_spark_like/expr_dt.py index abaa0af4c4..be9f768bd5 100644 --- a/narwhals/_spark_like/expr_dt.py +++ b/narwhals/_spark_like/expr_dt.py @@ -20,7 +20,7 @@ def __init__(self: Self, expr: SparkLikeExpr) -> None: def to_string(self: Self, format: str) -> SparkLikeExpr: # noqa: A002 def _format_iso_week_with_day(_input: Column) -> Column: """Format datetime as ISO week string with day.""" - year = F.date_format(_input, "YYYY") + year = F.date_format(_input, "yyyy") week = 
F.lpad(F.weekofyear(_input).cast("string"), 2, "0") day = F.dayofweek(_input) # Adjust Sunday from 1 to 7 @@ -29,7 +29,7 @@ def _format_iso_week_with_day(_input: Column) -> Column: def _format_iso_week(_input: Column) -> Column: """Format datetime as ISO week string.""" - year = F.date_format(_input, "YYYY") + year = F.date_format(_input, "yyyy") week = F.lpad(F.weekofyear(_input).cast("string"), 2, "0") return F.concat(year, F.lit("-W"), week) diff --git a/tests/conftest.py b/tests/conftest.py index a1a6226940..b7daf1d3c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -150,6 +150,9 @@ def pyspark_lazy_constructor() -> Callable[[Any], IntoFrame]: # pragma: no cove "ignore", r"Using fork\(\) can cause Polars", category=RuntimeWarning ) + # common timezone for all tests environments + os.environ["TZ"] = "UTC" + session = ( SparkSession.builder.appName("unit-tests") .master("local[1]") @@ -157,9 +160,6 @@ def pyspark_lazy_constructor() -> Callable[[Any], IntoFrame]: # pragma: no cove # executing one task at a time makes the tests faster .config("spark.default.parallelism", "1") .config("spark.sql.shuffle.partitions", "2") - # common timezone for all tests environments - .config("spark.sql.session.timeZone", "UTC") - .config("spark.sql.legacy.timeParserPolicy", "LEGACY") .getOrCreate() )
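
A minimal end-to-end sketch of what this series enables, assuming a local
SparkSession and the public narwhals API as of these patches; the frame and
column names ("a", "b") are illustrative, not taken from the diffs above.

    from datetime import datetime

    import narwhals as nw
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    # One-row PySpark DataFrame with a single datetime column "a".
    sdf = spark.createDataFrame([(datetime(2020, 1, 2, 3, 4, 5),)], ["a"])

    # `dt.to_string` now reaches the Spark backend added in PATCH 1/5: the ISO
    # formats "%G-W%V", "%G-W%V-%u" and "%Y-%m-%dT%H:%M:%S.%f" take the
    # dedicated helpers, while any other format string is converted via
    # strptime_to_pyspark_format and handed to F.date_format.
    out = (
        nw.from_native(sdf)
        .with_columns(nw.col("a").dt.to_string("%Y-%m-%dT%H:%M:%S.%f").alias("b"))
        .to_native()
    )
    out.show(truncate=False)  # b == "2020-01-02T03:04:05.000000"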