feat: Add date_partition_column_format for spark source #5273

Merged (3 commits) on Apr 25, 2025
protos/feast/core/DataSource.proto (3 additions, 0 deletions)
@@ -226,6 +226,9 @@ message DataSource {

// Format of files at `path` (e.g. parquet, avro, etc)
string file_format = 4;

// Date format of the date partition column (e.g. %Y-%m-%d)
string date_partition_column_format = 5;

Reviewer: did you run make protos?

@joeyutong (Contributor Author), Apr 24, 2025:
No, I have only run make compile-protos-python

Reviewer: can you run that command please? It should generate some required files.

@joeyutong (Contributor Author):
I have run make protos. It generates the proto docs/html in dist/grpc.

Reviewer: okay, do you mind making a follow-up PR?

@joeyutong (Contributor Author):
It seems that the dist directory is included in the .gitignore file

}

// Defines configuration for custom third-party data sources.
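For context, here is a minimal sketch of how the new option would be set when declaring a source. The import path, source name, table, and column names are illustrative assumptions, not taken from this PR:

# Hypothetical SparkSource whose partitions are laid out as
# event_date=20250424 rather than the default event_date=2025-04-24.
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

driver_stats_source = SparkSource(
    name="driver_hourly_stats",             # illustrative source name
    table="feature_store.driver_stats",     # illustrative Hive table
    timestamp_field="event_timestamp",
    date_partition_column="event_date",
    date_partition_column_format="%Y%m%d",  # the new field added by this PR
)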
@@ -2,8 +2,9 @@
import tempfile
import uuid
import warnings
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast

import numpy as np
import pandas
@@ -54,6 +55,12 @@ class SparkOfflineStoreConfig(FeastConfigBaseModel):
""" AWS Region if applicable for s3-based staging locations"""


@dataclass(frozen=True)
class SparkFeatureViewQueryContext(offline_utils.FeatureViewQueryContext):
min_date_partition: Optional[str]
max_date_partition: str


class SparkOfflineStore(OfflineStore):
@staticmethod
def pull_latest_from_table_or_query(
@@ -100,6 +107,7 @@ def pull_latest_from_table_or_query(
aliases_as_string = ", ".join(aliases)

date_partition_column = data_source.date_partition_column
date_partition_column_format = data_source.date_partition_column_format

start_date_str = _format_datetime(start_date)
end_date_str = _format_datetime(end_date)
@@ -111,7 +119,7 @@
SELECT {fields_as_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS feast_row_
FROM {from_expression} t1
WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}'){" AND " + date_partition_column + " >= '" + start_date.strftime("%Y-%m-%d") + "' AND " + date_partition_column + " <= '" + end_date.strftime("%Y-%m-%d") + "' " if date_partition_column != "" and date_partition_column is not None else ""}
WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}'){" AND " + date_partition_column + " >= '" + start_date.strftime(date_partition_column_format) + "' AND " + date_partition_column + " <= '" + end_date.strftime(date_partition_column_format) + "' " if date_partition_column != "" and date_partition_column is not None else ""}
) t2
WHERE feast_row_ = 1
"""
@@ -135,8 +143,12 @@ def get_historical_features(
full_feature_names: bool = False,
) -> RetrievalJob:
assert isinstance(config.offline_store, SparkOfflineStoreConfig)
date_partition_column_formats = []
for fv in feature_views:
assert isinstance(fv.batch_source, SparkSource)
date_partition_column_formats.append(
fv.batch_source.date_partition_column_format
)

warnings.warn(
"The spark offline store is an experimental feature in alpha development. "
@@ -185,8 +197,27 @@
entity_df_event_timestamp_range,
)

spark_query_context = [
SparkFeatureViewQueryContext(
**asdict(context),
min_date_partition=datetime.fromisoformat(
context.min_event_timestamp
).strftime(date_format)
if context.min_event_timestamp is not None
else None,
max_date_partition=datetime.fromisoformat(
context.max_event_timestamp
).strftime(date_format),
)
for date_format, context in zip(
date_partition_column_formats, query_context
)
]

query = offline_utils.build_point_in_time_query(
feature_view_query_contexts=query_context,
feature_view_query_contexts=cast(
List[offline_utils.FeatureViewQueryContext], spark_query_context
),
left_table_query_string=tmp_entity_df_table_name,
entity_df_event_timestamp_col=event_timestamp_col,
entity_df_columns=entity_schema.keys(),
@@ -644,13 +675,13 @@ def _cast_data_frame(
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.date_partition_column != "" and featureview.date_partition_column is not none %}
AND {{ featureview.date_partition_column }} <= '{{ featureview.max_event_timestamp[:10] }}'
AND {{ featureview.date_partition_column }} <= '{{ featureview.max_date_partition }}'
{% endif %}

{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
{% if featureview.date_partition_column != "" and featureview.date_partition_column is not none %}
AND {{ featureview.date_partition_column }} >= '{{ featureview.min_event_timestamp[:10] }}'
AND {{ featureview.date_partition_column }} >= '{{ featureview.min_date_partition }}'
{% endif %}
{% endif %}
),
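To illustrate what the template change above buys, a rough sketch follows (timestamps, format, and column name are made up) of how the new min_date_partition / max_date_partition values are derived from the query context and end up in the partition-pruning predicate:

from datetime import datetime

# Illustrative values mirroring the query context fields.
min_event_timestamp = "2025-04-01T00:00:00"
max_event_timestamp = "2025-04-24T23:59:59"
date_partition_column_format = "%Y%m%d"  # taken from the SparkSource

min_date_partition = datetime.fromisoformat(min_event_timestamp).strftime(
    date_partition_column_format
)
max_date_partition = datetime.fromisoformat(max_event_timestamp).strftime(
    date_partition_column_format
)

print(min_date_partition, max_date_partition)  # 20250401 20250424

# The Jinja template then renders predicates such as:
#   AND event_date >= '20250401' AND event_date <= '20250424'
# whereas the old [:10] slice could only produce '2025-04-01'-style bounds.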
@@ -46,6 +46,7 @@ def __init__(
owner: Optional[str] = "",
timestamp_field: Optional[str] = None,
date_partition_column: Optional[str] = None,
date_partition_column_format: Optional[str] = "%Y-%m-%d",
):
"""Creates a SparkSource object.

@@ -97,6 +98,7 @@ def __init__(
query=query,
path=path,
file_format=file_format,
date_partition_column_format=date_partition_column_format,
)

@property
@@ -127,6 +129,13 @@ def file_format(self):
"""
return self.spark_options.file_format

@property
def date_partition_column_format(self):
"""
Returns the date partition column format of this feature data source.
"""
return self.spark_options.date_partition_column_format

@staticmethod
def from_proto(data_source: DataSourceProto) -> Any:
assert data_source.HasField("spark_options")
@@ -139,6 +148,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
query=spark_options.query,
path=spark_options.path,
file_format=spark_options.file_format,
date_partition_column_format=spark_options.date_partition_column_format,
date_partition_column=data_source.date_partition_column,
timestamp_field=data_source.timestamp_field,
created_timestamp_column=data_source.created_timestamp_column,
@@ -240,6 +250,7 @@ def __init__(
query: Optional[str],
path: Optional[str],
file_format: Optional[str],
date_partition_column_format: Optional[str] = "%Y-%m-%d",
):
# Check that only one of the ways to load a spark dataframe can be used. We have
# to treat empty string and null the same due to proto (de)serialization.
@@ -261,6 +272,7 @@
self._query = query
self._path = path
self._file_format = file_format
self._date_partition_column_format = date_partition_column_format

@property
def table(self):
@@ -294,6 +306,14 @@ def file_format(self):
def file_format(self, file_format):
self._file_format = file_format

@property
def date_partition_column_format(self):
return self._date_partition_column_format

@date_partition_column_format.setter
def date_partition_column_format(self, date_partition_column_format):
self._date_partition_column_format = date_partition_column_format

@classmethod
def from_proto(cls, spark_options_proto: DataSourceProto.SparkOptions):
"""
@@ -308,6 +328,7 @@ def from_proto(cls, spark_options_proto: DataSourceProto.SparkOptions):
query=spark_options_proto.query,
path=spark_options_proto.path,
file_format=spark_options_proto.file_format,
date_partition_column_format=spark_options_proto.date_partition_column_format,
)

return spark_options
@@ -323,6 +344,7 @@ def to_proto(self) -> DataSourceProto.SparkOptions:
query=self.query,
path=self.path,
file_format=self.file_format,
date_partition_column_format=self.date_partition_column_format,
)

return spark_options_proto
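As a quick sanity check of the serialization changes above, a sketch along these lines (run next to the SparkOptions class defined in this file; the table name is illustrative) should show the new field surviving a proto round trip:

# query/path/file_format are left unset on purpose; the constructor only
# allows one of table/query/path to be provided.
options = SparkOptions(
    table="feature_store.driver_stats",
    query=None,
    path=None,
    file_format=None,
    date_partition_column_format="%Y%m%d",
)

restored = SparkOptions.from_proto(options.to_proto())
assert restored.date_partition_column_format == "%Y%m%d"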