2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -18,7 +18,7 @@ jobs:
java-version: 21
- uses: vemonet/setup-spark@v1
with:
-          spark-version: '3.5.3'
+          spark-version: '4.0.0'
hadoop-version: '3'
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
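The CI change above simply bumps the Spark distribution installed by setup-spark from 3.5.3 to 4.0.0. As a purely illustrative, hypothetical addition (not part of this PR), a small guard test could fail fast if the pyspark package resolved in the environment ever drifts from the pinned Spark line:

# Hypothetical guard test: the workflow pins Spark 4.0.0, so the imported
# pyspark package should report a matching major version.
import pyspark

def test_pyspark_major_version():
    assert pyspark.__version__.split(".")[0] == "4"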
4 changes: 2 additions & 2 deletions tests/_core/test_dataset.py
@@ -79,7 +79,7 @@ def test_wrong_type(spark: SparkSession):

def test_inherrited_functions(spark: SparkSession):
df = create_empty_dataset(spark, A)

-    assert hasattr(df, "_jdf")
+    df.distinct()
cached1: DataSet[A] = df.cache()
cached2: DataSet[A] = df.persist(StorageLevel.MEMORY_AND_DISK)
@@ -107,7 +107,7 @@ def test_schema_property_of_dataset(spark: SparkSession):

def test_initialize_dataset_implements(spark: SparkSession):
with pytest.raises(NotImplementedError):
-        DataSetImplements()
+        DataSetImplements()  # type: ignore


def test_reduce(spark: SparkSession):
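These tests rely on a small typedspark schema (referred to as `A`) and on `create_empty_dataset`; their definitions sit outside this diff. The sketch below is an assumed, simplified reconstruction of that setup, illustrating why the inherited-functions test can exercise public DataFrame methods such as `distinct()` and `cache()` rather than asserting on the JVM-only `_jdf` handle, which is not guaranteed to exist on Spark 4 / Spark Connect DataFrames:

# Assumed shape of the schema used by these tests; the real `A` may differ.
from pyspark.sql.types import LongType, StringType
from typedspark import Column, DataSet, Schema, create_empty_dataset

class A(Schema):
    a: Column[LongType]
    b: Column[StringType]

def test_inherited_functions_sketch(spark):
    df: DataSet[A] = create_empty_dataset(spark, A)
    df.distinct()  # public DataFrame API, available in classic and Connect mode
    df.cache()     # no reliance on the JVM-backed `_jdf` attribute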
5 changes: 4 additions & 1 deletion tests/conftest.py
@@ -10,7 +10,10 @@ def spark():
"""Fixture for creating a spark session."""
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
os.environ.pop("SPARK_REMOTE", None)
os.environ.pop("PYSPARK_CONNECT_MODE_ENABLED", None)

-    spark = SparkSession.Builder().getOrCreate()
+    SparkSession._instantiatedSession = None  # clear any existing session
+    spark = SparkSession.builder.master("local[2]").getOrCreate()
yield spark
spark.stop()
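The fixture now clears `SPARK_REMOTE` and `PYSPARK_CONNECT_MODE_ENABLED` so that a leftover Spark Connect configuration cannot redirect `getOrCreate()`, resets the cached `_instantiatedSession`, and pins an explicit `local[2]` master. A minimal, hypothetical test that exercises the fixture might look like this:

# Hypothetical smoke test for the fixture above: with the Connect-mode
# environment variables cleared, this runs on a classic local[2] session.
def test_spark_fixture_smoke(spark):
    df = spark.createDataFrame([(1, "a"), (2, "b")], schema="id LONG, name STRING")
    assert df.count() == 2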
20 changes: 16 additions & 4 deletions typedspark/_core/dataset.py
@@ -2,12 +2,12 @@

from __future__ import annotations

-from copy import deepcopy
from typing import Callable, Generic, List, Literal, Optional, Type, TypeVar, Union, cast, overload

from pyspark import StorageLevel
from pyspark.sql import Column as SparkColumn
from pyspark.sql import DataFrame
+from pyspark.sql.types import StructType
from typing_extensions import Concatenate, ParamSpec

from typedspark._core.validate_schema import validate_schema
@@ -44,6 +44,11 @@ def birthday(df: DataSetImplements[Age, T]) -> DataSet[T]:

_schema_annotations: Type[_Implementation]

+    def __new__(cls, *args, **kwargs):
+        raise NotImplementedError(
+            "DataSetImplements should solely be used as a type annotation; it is never initialized."
+        )

def __init__(self):
raise NotImplementedError(
"DataSetImplements should solely be used as a type annotation, it is never initialized."
@@ -184,6 +189,12 @@ def __new__(cls, dataframe: DataFrame) -> DataSet[_Schema]:
be difficult to access. Subsequently, we perform schema validation, if
the schema annotations are provided.
"""
+        try:
+            schema_snapshot: StructType = StructType.fromJson(dataframe.schema.jsonValue())
+        except Exception:
+            # last-ditch: still try the property
+            schema_snapshot = dataframe.schema  # type: ignore

dataframe = cast(DataSet, dataframe)
dataframe.__class__ = DataSet

@@ -194,13 +205,14 @@ def __new__(cls, dataframe: DataFrame) -> DataSet[_Schema]:
# then we use the class' schema annotations to validate the schema and add metadata
if hasattr(cls, "_schema_annotations"):
dataframe._schema_annotations = cls._schema_annotations # type: ignore
+            dataframe._schema_snapshot = schema_snapshot  # type: ignore[attr-defined]
dataframe._validate_schema()
-            dataframe._add_schema_metadata()

return dataframe # type: ignore

def __init__(self, dataframe: DataFrame):
-        pass
+        # pylint: disable=unused-argument
+        self._add_schema_metadata()

def __class_getitem__(cls, item):
"""Allows us to define a schema for the ``DataSet``.
@@ -216,7 +228,7 @@ def _validate_schema(self) -> None:
"""Validates the schema of the ``DataSet`` against the schema annotations."""
validate_schema(
self._schema_annotations.get_structtype(),
-            deepcopy(self.schema),
+            self._schema_snapshot,  # type: ignore
self._schema_annotations.get_schema_name(),
)

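The core of this change: `__new__` now captures a snapshot of the incoming DataFrame's schema via a JSON round-trip before `__class__` is reassigned, and `_validate_schema` validates against that snapshot instead of `deepcopy(self.schema)`, since the docstring notes the schema may be difficult to access after the class swap. A standalone sketch of the round-trip, using a made-up schema, shows that it yields an independent copy much like the deepcopy it replaces:

# Illustration only (not library code): a StructType serialized to JSON and
# parsed back is an independent, equal copy of the original schema.
from pyspark.sql.types import LongType, StructField, StructType

original = StructType([StructField("id", LongType(), nullable=False)])
snapshot = StructType.fromJson(original.jsonValue())

assert snapshot == original      # same schema content
assert snapshot is not original  # but a separate object, like a deep copy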