diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto
index 9c31851823..53dc95c737 100644
--- a/protos/feast/core/DataSource.proto
+++ b/protos/feast/core/DataSource.proto
@@ -226,6 +226,9 @@ message DataSource {

     // Format of files at `path` (e.g. parquet, avro, etc)
     string file_format = 4;
+
+    // Date Format of date partition column (e.g. %Y-%m-%d)
+    string date_partition_column_format = 5;
   }

   // Defines configuration for custom third-party data sources.
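The new field carries a Python `strftime` pattern, so the rendered partition literal can match however the table is physically partitioned. A minimal standard-library sketch of the behaviour change (dates are illustrative):

```python
from datetime import datetime

event_time = datetime(2021, 1, 1)

# Previously the partition bound was always rendered with a hard-coded "%Y-%m-%d":
assert event_time.strftime("%Y-%m-%d") == "2021-01-01"

# A table partitioned as effective_date=20210101 can now be matched by
# setting date_partition_column_format="%Y%m%d" on the source:
assert event_time.strftime("%Y%m%d") == "20210101"
```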
" @@ -185,8 +197,27 @@ def get_historical_features( entity_df_event_timestamp_range, ) + spark_query_context = [ + SparkFeatureViewQueryContext( + **asdict(context), + min_date_partition=datetime.fromisoformat( + context.min_event_timestamp + ).strftime(date_format) + if context.min_event_timestamp is not None + else None, + max_date_partition=datetime.fromisoformat( + context.max_event_timestamp + ).strftime(date_format), + ) + for date_format, context in zip( + date_partition_column_formats, query_context + ) + ] + query = offline_utils.build_point_in_time_query( - feature_view_query_contexts=query_context, + feature_view_query_contexts=cast( + List[offline_utils.FeatureViewQueryContext], spark_query_context + ), left_table_query_string=tmp_entity_df_table_name, entity_df_event_timestamp_col=event_timestamp_col, entity_df_columns=entity_schema.keys(), @@ -644,13 +675,13 @@ def _cast_data_frame( FROM {{ featureview.table_subquery }} WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' {% if featureview.date_partition_column != "" and featureview.date_partition_column is not none %} - AND {{ featureview.date_partition_column }} <= '{{ featureview.max_event_timestamp[:10] }}' + AND {{ featureview.date_partition_column }} <= '{{ featureview.max_date_partition }}' {% endif %} {% if featureview.ttl == 0 %}{% else %} AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' {% if featureview.date_partition_column != "" and featureview.date_partition_column is not none %} - AND {{ featureview.date_partition_column }} >= '{{ featureview.min_event_timestamp[:10] }}' + AND {{ featureview.date_partition_column }} >= '{{ featureview.min_date_partition }}' {% endif %} {% endif %} ), diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index 7ad331239f..f16446a4e4 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -46,6 +46,7 @@ def __init__( owner: Optional[str] = "", timestamp_field: Optional[str] = None, date_partition_column: Optional[str] = None, + date_partition_column_format: Optional[str] = "%Y-%m-%d", ): """Creates a SparkSource object. @@ -97,6 +98,7 @@ def __init__( query=query, path=path, file_format=file_format, + date_partition_column_format=date_partition_column_format, ) @property @@ -127,6 +129,13 @@ def file_format(self): """ return self.spark_options.file_format + @property + def date_partition_column_format(self): + """ + Returns the date partition column format of this feature data source. 
+ """ + return self.spark_options.date_partition_column_format + @staticmethod def from_proto(data_source: DataSourceProto) -> Any: assert data_source.HasField("spark_options") @@ -139,6 +148,7 @@ def from_proto(data_source: DataSourceProto) -> Any: query=spark_options.query, path=spark_options.path, file_format=spark_options.file_format, + date_partition_column_format=spark_options.date_partition_column_format, date_partition_column=data_source.date_partition_column, timestamp_field=data_source.timestamp_field, created_timestamp_column=data_source.created_timestamp_column, @@ -240,6 +250,7 @@ def __init__( query: Optional[str], path: Optional[str], file_format: Optional[str], + date_partition_column_format: Optional[str] = "%Y-%m-%d", ): # Check that only one of the ways to load a spark dataframe can be used. We have # to treat empty string and null the same due to proto (de)serialization. @@ -261,6 +272,7 @@ def __init__( self._query = query self._path = path self._file_format = file_format + self._date_partition_column_format = date_partition_column_format @property def table(self): @@ -294,6 +306,14 @@ def file_format(self): def file_format(self, file_format): self._file_format = file_format + @property + def date_partition_column_format(self): + return self._date_partition_column_format + + @date_partition_column_format.setter + def date_partition_column_format(self, date_partition_column_format): + self._date_partition_column_format = date_partition_column_format + @classmethod def from_proto(cls, spark_options_proto: DataSourceProto.SparkOptions): """ @@ -308,6 +328,7 @@ def from_proto(cls, spark_options_proto: DataSourceProto.SparkOptions): query=spark_options_proto.query, path=spark_options_proto.path, file_format=spark_options_proto.file_format, + date_partition_column_format=spark_options_proto.date_partition_column_format, ) return spark_options @@ -323,6 +344,7 @@ def to_proto(self) -> DataSourceProto.SparkOptions: query=self.query, path=self.path, file_format=self.file_format, + date_partition_column_format=self.date_partition_column_format, ) return spark_options_proto diff --git a/sdk/python/feast/protos/feast/core/DataSource_pb2.py b/sdk/python/feast/protos/feast/core/DataSource_pb2.py index 68bee8d760..ae03c7d0c4 100644 --- a/sdk/python/feast/protos/feast/core/DataSource_pb2.py +++ b/sdk/python/feast/protos/feast/core/DataSource_pb2.py @@ -19,7 +19,7 @@ from feast.protos.feast.core import Feature_pb2 as feast_dot_core_dot_Feature__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x66\x65\x61st/core/DataSource.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1b\x66\x65\x61st/core/DataFormat.proto\x1a\x17\x66\x65\x61st/types/Value.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\"\xc0\x16\n\nDataSource\x12\x0c\n\x04name\x18\x14 \x01(\t\x12\x0f\n\x07project\x18\x15 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x17 \x01(\t\x12.\n\x04tags\x18\x18 \x03(\x0b\x32 .feast.core.DataSource.TagsEntry\x12\r\n\x05owner\x18\x19 \x01(\t\x12/\n\x04type\x18\x01 \x01(\x0e\x32!.feast.core.DataSource.SourceType\x12?\n\rfield_mapping\x18\x02 \x03(\x0b\x32(.feast.core.DataSource.FieldMappingEntry\x12\x17\n\x0ftimestamp_field\x18\x03 \x01(\t\x12\x1d\n\x15\x64\x61te_partition_column\x18\x04 \x01(\t\x12 \n\x18\x63reated_timestamp_column\x18\x05 \x01(\t\x12\x1e\n\x16\x64\x61ta_source_class_type\x18\x11 \x01(\t\x12,\n\x0c\x62\x61tch_source\x18\x1a \x01(\x0b\x32\x16.feast.core.DataSource\x12/\n\x04meta\x18\x32 
diff --git a/sdk/python/feast/protos/feast/core/DataSource_pb2.py b/sdk/python/feast/protos/feast/core/DataSource_pb2.py
index 68bee8d760..ae03c7d0c4 100644
--- a/sdk/python/feast/protos/feast/core/DataSource_pb2.py
+++ b/sdk/python/feast/protos/feast/core/DataSource_pb2.py
@@ -19,7 +19,7 @@
 from feast.protos.feast.core import Feature_pb2 as feast_dot_core_dot_Feature__pb2


-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x66\x65\x61st/core/DataSource.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1b\x66\x65\x61st/core/DataFormat.proto\x1a\x17\x66\x65\x61st/types/Value.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\"\xc0\x16\n\nDataSource\x12\x0c\n\x04name\x18\x14 \x01(\t\x12\x0f\n\x07project\x18\x15 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x17 \x01(\t\x12.\n\x04tags\x18\x18 \x03(\x0b\x32 .feast.core.DataSource.TagsEntry\x12\r\n\x05owner\x18\x19 \x01(\t\x12/\n\x04type\x18\x01 \x01(\x0e\x32!.feast.core.DataSource.SourceType\x12?\n\rfield_mapping\x18\x02 \x03(\x0b\x32(.feast.core.DataSource.FieldMappingEntry\x12\x17\n\x0ftimestamp_field\x18\x03 \x01(\t\x12\x1d\n\x15\x64\x61te_partition_column\x18\x04 \x01(\t\x12 \n\x18\x63reated_timestamp_column\x18\x05 \x01(\t\x12\x1e\n\x16\x64\x61ta_source_class_type\x18\x11 \x01(\t\x12,\n\x0c\x62\x61tch_source\x18\x1a \x01(\x0b\x32\x16.feast.core.DataSource\x12/\n\x04meta\x18\x32 \x01(\x0b\x32!.feast.core.DataSource.SourceMeta\x12:\n\x0c\x66ile_options\x18\x0b \x01(\x0b\x32\".feast.core.DataSource.FileOptionsH\x00\x12\x42\n\x10\x62igquery_options\x18\x0c \x01(\x0b\x32&.feast.core.DataSource.BigQueryOptionsH\x00\x12<\n\rkafka_options\x18\r \x01(\x0b\x32#.feast.core.DataSource.KafkaOptionsH\x00\x12@\n\x0fkinesis_options\x18\x0e \x01(\x0b\x32%.feast.core.DataSource.KinesisOptionsH\x00\x12\x42\n\x10redshift_options\x18\x0f \x01(\x0b\x32&.feast.core.DataSource.RedshiftOptionsH\x00\x12I\n\x14request_data_options\x18\x12 \x01(\x0b\x32).feast.core.DataSource.RequestDataOptionsH\x00\x12\x44\n\x0e\x63ustom_options\x18\x10 \x01(\x0b\x32*.feast.core.DataSource.CustomSourceOptionsH\x00\x12\x44\n\x11snowflake_options\x18\x13 \x01(\x0b\x32\'.feast.core.DataSource.SnowflakeOptionsH\x00\x12:\n\x0cpush_options\x18\x16 \x01(\x0b\x32\".feast.core.DataSource.PushOptionsH\x00\x12<\n\rspark_options\x18\x1b \x01(\x0b\x32#.feast.core.DataSource.SparkOptionsH\x00\x12<\n\rtrino_options\x18\x1e \x01(\x0b\x32#.feast.core.DataSource.TrinoOptionsH\x00\x12>\n\x0e\x61thena_options\x18# \x01(\x0b\x32$.feast.core.DataSource.AthenaOptionsH\x00\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x33\n\x11\x46ieldMappingEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x82\x01\n\nSourceMeta\x12:\n\x16\x65\x61rliestEventTimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x38\n\x14latestEventTimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x1a\x65\n\x0b\x46ileOptions\x12+\n\x0b\x66ile_format\x18\x01 \x01(\x0b\x32\x16.feast.core.FileFormat\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x1c\n\x14s3_endpoint_override\x18\x03 \x01(\t\x1a/\n\x0f\x42igQueryOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a,\n\x0cTrinoOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a\xae\x01\n\x0cKafkaOptions\x12\x1f\n\x17kafka_bootstrap_servers\x18\x01 \x01(\t\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x30\n\x0emessage_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x12<\n\x19watermark_delay_threshold\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x1a\x66\n\x0eKinesisOptions\x12\x0e\n\x06region\x18\x01 \x01(\t\x12\x13\n\x0bstream_name\x18\x02 \x01(\t\x12/\n\rrecord_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x1aQ\n\x0fRedshiftOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\t\x1aT\n\rAthenaOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\x12\x13\n\x0b\x64\x61ta_source\x18\x04 \x01(\t\x1aX\n\x10SnowflakeOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\tJ\x04\x08\x05\x10\x06\x1aO\n\x0cSparkOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0c\n\x04path\x18\x03 \x01(\t\x12\x13\n\x0b\x66ile_format\x18\x04 \x01(\t\x1a,\n\x13\x43ustomSourceOptions\x12\x15\n\rconfiguration\x18\x01 \x01(\x0c\x1a\xf7\x01\n\x12RequestDataOptions\x12Z\n\x11\x64\x65precated_schema\x18\x02 \x03(\x0b\x32?.feast.core.DataSource.RequestDataOptions.DeprecatedSchemaEntry\x12)\n\x06schema\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x1aT\n\x15\x44\x65precatedSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0e\x32\x1b.feast.types.ValueType.Enum:\x02\x38\x01J\x04\x08\x01\x10\x02\x1a\x13\n\x0bPushOptionsJ\x04\x08\x01\x10\x02\"\xf8\x01\n\nSourceType\x12\x0b\n\x07INVALID\x10\x00\x12\x0e\n\nBATCH_FILE\x10\x01\x12\x13\n\x0f\x42\x41TCH_SNOWFLAKE\x10\x08\x12\x12\n\x0e\x42\x41TCH_BIGQUERY\x10\x02\x12\x12\n\x0e\x42\x41TCH_REDSHIFT\x10\x05\x12\x10\n\x0cSTREAM_KAFKA\x10\x03\x12\x12\n\x0eSTREAM_KINESIS\x10\x04\x12\x11\n\rCUSTOM_SOURCE\x10\x06\x12\x12\n\x0eREQUEST_SOURCE\x10\x07\x12\x0f\n\x0bPUSH_SOURCE\x10\t\x12\x0f\n\x0b\x42\x41TCH_TRINO\x10\n\x12\x0f\n\x0b\x42\x41TCH_SPARK\x10\x0b\x12\x10\n\x0c\x42\x41TCH_ATHENA\x10\x0c\x42\t\n\x07optionsJ\x04\x08\x06\x10\x0b\"=\n\x0e\x44\x61taSourceList\x12+\n\x0b\x64\x61tasources\x18\x01 \x03(\x0b\x32\x16.feast.core.DataSourceBT\n\x10\x66\x65\x61st.proto.coreB\x0f\x44\x61taSourceProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x66\x65\x61st/core/DataSource.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1b\x66\x65\x61st/core/DataFormat.proto\x1a\x17\x66\x65\x61st/types/Value.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\"\xe6\x16\n\nDataSource\x12\x0c\n\x04name\x18\x14 \x01(\t\x12\x0f\n\x07project\x18\x15 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x17 \x01(\t\x12.\n\x04tags\x18\x18 \x03(\x0b\x32 .feast.core.DataSource.TagsEntry\x12\r\n\x05owner\x18\x19 \x01(\t\x12/\n\x04type\x18\x01 \x01(\x0e\x32!.feast.core.DataSource.SourceType\x12?\n\rfield_mapping\x18\x02 \x03(\x0b\x32(.feast.core.DataSource.FieldMappingEntry\x12\x17\n\x0ftimestamp_field\x18\x03 \x01(\t\x12\x1d\n\x15\x64\x61te_partition_column\x18\x04 \x01(\t\x12 \n\x18\x63reated_timestamp_column\x18\x05 \x01(\t\x12\x1e\n\x16\x64\x61ta_source_class_type\x18\x11 \x01(\t\x12,\n\x0c\x62\x61tch_source\x18\x1a \x01(\x0b\x32\x16.feast.core.DataSource\x12/\n\x04meta\x18\x32 \x01(\x0b\x32!.feast.core.DataSource.SourceMeta\x12:\n\x0c\x66ile_options\x18\x0b \x01(\x0b\x32\".feast.core.DataSource.FileOptionsH\x00\x12\x42\n\x10\x62igquery_options\x18\x0c \x01(\x0b\x32&.feast.core.DataSource.BigQueryOptionsH\x00\x12<\n\rkafka_options\x18\r \x01(\x0b\x32#.feast.core.DataSource.KafkaOptionsH\x00\x12@\n\x0fkinesis_options\x18\x0e \x01(\x0b\x32%.feast.core.DataSource.KinesisOptionsH\x00\x12\x42\n\x10redshift_options\x18\x0f \x01(\x0b\x32&.feast.core.DataSource.RedshiftOptionsH\x00\x12I\n\x14request_data_options\x18\x12 \x01(\x0b\x32).feast.core.DataSource.RequestDataOptionsH\x00\x12\x44\n\x0e\x63ustom_options\x18\x10 \x01(\x0b\x32*.feast.core.DataSource.CustomSourceOptionsH\x00\x12\x44\n\x11snowflake_options\x18\x13 \x01(\x0b\x32\'.feast.core.DataSource.SnowflakeOptionsH\x00\x12:\n\x0cpush_options\x18\x16 \x01(\x0b\x32\".feast.core.DataSource.PushOptionsH\x00\x12<\n\rspark_options\x18\x1b \x01(\x0b\x32#.feast.core.DataSource.SparkOptionsH\x00\x12<\n\rtrino_options\x18\x1e \x01(\x0b\x32#.feast.core.DataSource.TrinoOptionsH\x00\x12>\n\x0e\x61thena_options\x18# \x01(\x0b\x32$.feast.core.DataSource.AthenaOptionsH\x00\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x33\n\x11\x46ieldMappingEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x82\x01\n\nSourceMeta\x12:\n\x16\x65\x61rliestEventTimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x38\n\x14latestEventTimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x1a\x65\n\x0b\x46ileOptions\x12+\n\x0b\x66ile_format\x18\x01 \x01(\x0b\x32\x16.feast.core.FileFormat\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x1c\n\x14s3_endpoint_override\x18\x03 \x01(\t\x1a/\n\x0f\x42igQueryOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a,\n\x0cTrinoOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a\xae\x01\n\x0cKafkaOptions\x12\x1f\n\x17kafka_bootstrap_servers\x18\x01 \x01(\t\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x30\n\x0emessage_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x12<\n\x19watermark_delay_threshold\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x1a\x66\n\x0eKinesisOptions\x12\x0e\n\x06region\x18\x01 \x01(\t\x12\x13\n\x0bstream_name\x18\x02 \x01(\t\x12/\n\rrecord_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x1aQ\n\x0fRedshiftOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\t\x1aT\n\rAthenaOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\x12\x13\n\x0b\x64\x61ta_source\x18\x04 \x01(\t\x1aX\n\x10SnowflakeOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\tJ\x04\x08\x05\x10\x06\x1au\n\x0cSparkOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0c\n\x04path\x18\x03 \x01(\t\x12\x13\n\x0b\x66ile_format\x18\x04 \x01(\t\x12$\n\x1c\x64\x61te_partition_column_format\x18\x05 \x01(\t\x1a,\n\x13\x43ustomSourceOptions\x12\x15\n\rconfiguration\x18\x01 \x01(\x0c\x1a\xf7\x01\n\x12RequestDataOptions\x12Z\n\x11\x64\x65precated_schema\x18\x02 \x03(\x0b\x32?.feast.core.DataSource.RequestDataOptions.DeprecatedSchemaEntry\x12)\n\x06schema\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x1aT\n\x15\x44\x65precatedSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0e\x32\x1b.feast.types.ValueType.Enum:\x02\x38\x01J\x04\x08\x01\x10\x02\x1a\x13\n\x0bPushOptionsJ\x04\x08\x01\x10\x02\"\xf8\x01\n\nSourceType\x12\x0b\n\x07INVALID\x10\x00\x12\x0e\n\nBATCH_FILE\x10\x01\x12\x13\n\x0f\x42\x41TCH_SNOWFLAKE\x10\x08\x12\x12\n\x0e\x42\x41TCH_BIGQUERY\x10\x02\x12\x12\n\x0e\x42\x41TCH_REDSHIFT\x10\x05\x12\x10\n\x0cSTREAM_KAFKA\x10\x03\x12\x12\n\x0eSTREAM_KINESIS\x10\x04\x12\x11\n\rCUSTOM_SOURCE\x10\x06\x12\x12\n\x0eREQUEST_SOURCE\x10\x07\x12\x0f\n\x0bPUSH_SOURCE\x10\t\x12\x0f\n\x0b\x42\x41TCH_TRINO\x10\n\x12\x0f\n\x0b\x42\x41TCH_SPARK\x10\x0b\x12\x10\n\x0c\x42\x41TCH_ATHENA\x10\x0c\x42\t\n\x07optionsJ\x04\x08\x06\x10\x0b\"=\n\x0e\x44\x61taSourceList\x12+\n\x0b\x64\x61tasources\x18\x01 \x03(\x0b\x32\x16.feast.core.DataSourceBT\n\x10\x66\x65\x61st.proto.coreB\x0f\x44\x61taSourceProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3')

 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -34,7 +34,7 @@
   _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._options = None
   _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_options = b'8\001'
   _globals['_DATASOURCE']._serialized_start=189
-  _globals['_DATASOURCE']._serialized_end=3069
+  _globals['_DATASOURCE']._serialized_end=3107
   _globals['_DATASOURCE_TAGSENTRY']._serialized_start=1436
   _globals['_DATASOURCE_TAGSENTRY']._serialized_end=1479
   _globals['_DATASOURCE_FIELDMAPPINGENTRY']._serialized_start=1481
@@ -58,17 +58,17 @@
   _globals['_DATASOURCE_SNOWFLAKEOPTIONS']._serialized_start=2315
   _globals['_DATASOURCE_SNOWFLAKEOPTIONS']._serialized_end=2403
   _globals['_DATASOURCE_SPARKOPTIONS']._serialized_start=2405
-  _globals['_DATASOURCE_SPARKOPTIONS']._serialized_end=2484
-  _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_start=2486
-  _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_end=2530
-  _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_start=2533
-  _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_end=2780
-  _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_start=2690
-  _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_end=2774
-  _globals['_DATASOURCE_PUSHOPTIONS']._serialized_start=2782
-  _globals['_DATASOURCE_PUSHOPTIONS']._serialized_end=2801
-  _globals['_DATASOURCE_SOURCETYPE']._serialized_start=2804
-  _globals['_DATASOURCE_SOURCETYPE']._serialized_end=3052
-  _globals['_DATASOURCELIST']._serialized_start=3071
-  _globals['_DATASOURCELIST']._serialized_end=3132
+  _globals['_DATASOURCE_SPARKOPTIONS']._serialized_end=2522
+  _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_start=2524
+  _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_end=2568
+  _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_start=2571
+  _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_end=2818
+  _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_start=2728
+  _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_end=2812
+  _globals['_DATASOURCE_PUSHOPTIONS']._serialized_start=2820
+  _globals['_DATASOURCE_PUSHOPTIONS']._serialized_end=2839
+  _globals['_DATASOURCE_SOURCETYPE']._serialized_start=2842
+  _globals['_DATASOURCE_SOURCETYPE']._serialized_end=3090
+  _globals['_DATASOURCELIST']._serialized_start=3109
+  _globals['_DATASOURCELIST']._serialized_end=3170
 # @@protoc_insertion_point(module_scope)
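At the proto level the new field is an ordinary singular string (field 5 of `SparkOptions`), so the regenerated bindings treat it like any other scalar: it defaults to the empty string and round-trips through serialization. A quick sketch against the regenerated module (the table name is hypothetical):

```python
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

options = DataSourceProto.SparkOptions(
    table="my_db.driver_stats",  # hypothetical table
    file_format="parquet",
    date_partition_column_format="%Y%m%d",
)

parsed = DataSourceProto.SparkOptions.FromString(options.SerializeToString())
assert parsed.date_partition_column_format == "%Y%m%d"

# Proto3 scalar default: an unset field reads back as "".
assert DataSourceProto.SparkOptions().date_partition_column_format == ""
```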
diff --git a/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi b/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi
index aadec3fad4..39d37ed904 100644
--- a/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi
+++ b/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi
@@ -360,6 +360,7 @@ class DataSource(google.protobuf.message.Message):
         QUERY_FIELD_NUMBER: builtins.int
         PATH_FIELD_NUMBER: builtins.int
         FILE_FORMAT_FIELD_NUMBER: builtins.int
+        DATE_PARTITION_COLUMN_FORMAT_FIELD_NUMBER: builtins.int
         table: builtins.str
         """Table name"""
         query: builtins.str
@@ -368,6 +369,8 @@ class DataSource(google.protobuf.message.Message):
         """Path from which spark can read the table, this is an alternative to `table`"""
         file_format: builtins.str
         """Format of files at `path` (e.g. parquet, avro, etc)"""
+        date_partition_column_format: builtins.str
+        """Date Format of date partition column (e.g. %Y-%m-%d)"""
         def __init__(
             self,
             *,
@@ -375,8 +378,9 @@
             query: builtins.str = ...,
             path: builtins.str = ...,
             file_format: builtins.str = ...,
+            date_partition_column_format: builtins.str = ...,
         ) -> None: ...
-        def ClearField(self, field_name: typing_extensions.Literal["file_format", b"file_format", "path", b"path", "query", b"query", "table", b"table"]) -> None: ...
+        def ClearField(self, field_name: typing_extensions.Literal["date_partition_column_format", b"date_partition_column_format", "file_format", b"file_format", "path", b"path", "query", b"query", "table", b"table"]) -> None: ...

     class CustomSourceOptions(google.protobuf.message.Message):
         """Defines configuration for custom third-party data sources."""
diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark.py b/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark.py
index 307ba4058c..938514a2ca 100644
--- a/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark.py
+++ b/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark.py
@@ -1,6 +1,10 @@
 from datetime import datetime
 from unittest.mock import MagicMock, patch

+import pandas as pd
+
+from feast.entity import Entity
+from feast.feature_view import FeatureView, Field
 from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
     SparkOfflineStore,
     SparkOfflineStoreConfig,
@@ -10,6 +14,7 @@
 )
 from feast.infra.offline_stores.offline_store import RetrievalJob
 from feast.repo_config import RepoConfig
+from feast.types import Float32, ValueType


 @patch(
@@ -248,3 +253,89 @@ def test_pull_latest_from_table_without_nested_timestamp_or_query_and_date_parti

     assert isinstance(retrieval_job, RetrievalJob)
     assert retrieval_job.query.strip() == expected_query.strip()
+
+
+@patch(
+    "feast.infra.offline_stores.contrib.spark_offline_store.spark.get_spark_session_or_start_new_with_repoconfig"
+)
+def test_get_historical_features(mock_get_spark_session):
+    mock_spark_session = MagicMock()
+    mock_get_spark_session.return_value = mock_spark_session
+
+    test_repo_config = RepoConfig(
+        project="test_project",
+        registry="test_registry",
+        provider="local",
+        offline_store=SparkOfflineStoreConfig(type="spark"),
+    )
+
+    test_data_source1 = SparkSource(
+        name="test_nested_batch_source1",
+        description="test_nested_batch_source",
+        table="offline_store_database_name.offline_store_table_name1",
+        timestamp_field="nested_timestamp",
+        field_mapping={
+            "event_header.event_published_datetime_utc": "nested_timestamp",
+        },
+        date_partition_column="effective_date",
+        date_partition_column_format="%Y%m%d",
+    )
+
+    test_data_source2 = SparkSource(
+        name="test_nested_batch_source2",
+        description="test_nested_batch_source",
+        table="offline_store_database_name.offline_store_table_name2",
+        timestamp_field="nested_timestamp",
+        field_mapping={
+            "event_header.event_published_datetime_utc": "nested_timestamp",
+        },
+        date_partition_column="effective_date",
+    )
+
+    test_feature_view1 = FeatureView(
+        name="test_feature_view1",
+        entities=_mock_entity(),
+        schema=[
+            Field(name="feature1", dtype=Float32),
+        ],
+        source=test_data_source1,
+    )
+
+    test_feature_view2 = FeatureView(
+        name="test_feature_view2",
+        entities=_mock_entity(),
+        schema=[
+            Field(name="feature2", dtype=Float32),
+        ],
+        source=test_data_source2,
+    )
+
+    # Create a DataFrame with the required event_timestamp column
+    entity_df = pd.DataFrame(
+        {"event_timestamp": [datetime(2021, 1, 1)], "driver_id": [1]}
+    )
+
+    mock_registry = MagicMock()
+    retrieval_job = SparkOfflineStore.get_historical_features(
+        config=test_repo_config,
+        feature_views=[test_feature_view2, test_feature_view1],
+        feature_refs=["test_feature_view2:feature2", "test_feature_view1:feature1"],
+        entity_df=entity_df,
+        registry=mock_registry,
+        project="test_project",
+    )
+    query = retrieval_job.query.strip()
+
+    assert "effective_date <= '2021-01-01'" in query
+    assert "effective_date <= '20210101'" in query
+
+
+def _mock_entity():
+    return [
+        Entity(
+            name="driver_id",
+            join_keys=["driver_id"],
+            description="Driver ID",
+            value_type=ValueType.INT64,
+        )
+    ]
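For a concrete picture of what the test asserts on, the partition predicate appended by `pull_latest_from_table_or_query` can be previewed in isolation; this sketch re-evaluates the inline conditional from the earlier hunk with illustrative dates:

```python
from datetime import datetime

date_partition_column = "effective_date"
date_partition_column_format = "%Y%m%d"
start_date, end_date = datetime(2021, 1, 1), datetime(2021, 1, 7)

# Same expression as the inline conditional in the WHERE clause above.
partition_filter = (
    " AND " + date_partition_column
    + " >= '" + start_date.strftime(date_partition_column_format) + "'"
    + " AND " + date_partition_column
    + " <= '" + end_date.strftime(date_partition_column_format) + "' "
    if date_partition_column != "" and date_partition_column is not None
    else ""
)
print(partition_filter)
# ->  AND effective_date >= '20210101' AND effective_date <= '20210107'
```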