Skip to content

Commit 239cdad

Browse files
authored
update feast op to handle feast 0.31 - latest (#344)
* update feast op to handle feast 0.31 - latest * fix magic mock fixtures for feast * feast update version * move to base test requirements file * remove unneeded component feature service * remove unneeded imports * update tox to install correct versions of packages for tests regardless of what is on the system * fixes for working with pandas 1.5.2 and cudf 22.12.00 * tensorflow-gpu is no longer a valid PyPI package; must use tensorflow
1 parent e3e91c3 commit 239cdad

File tree

6 files changed

+190
-20
lines changed

6 files changed

+190
-20
lines changed

merlin/systems/dag/ops/feast.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime
12
from typing import List
23

34
import numpy as np
@@ -82,7 +83,7 @@ def from_feature_view(
8283

8384
output_schema = Schema([])
8485
for feature in feature_view.features:
85-
feature_dtype, is_list, is_ragged = feast_2_numpy[feature.dtype]
86+
feature_dtype, is_list, is_ragged = feast_2_numpy[feature.dtype.to_value_type()]
8687

8788
if is_list:
8889
mh_features.append(feature.name)
@@ -165,6 +166,9 @@ def __init__(
165166
self.output_prefix = output_prefix
166167

167168
self.store = FeatureStore(repo_path=repo_path)
169+
# add feature view to the online store
170+
self.store.materialize_incremental(datetime.now(), feature_views=[self.entity_view])
171+
168172
super().__init__()
169173

170174
def __getstate__(self):

requirements/test-gpu.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
-r test.txt
22

3-
tensorflow-gpu<=2.9.0
3+
tensorflow
44
faiss-gpu==1.7.2

requirements/test.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ testbook==0.4.2
1414

1515
# packages necessary to run tests and push PRs
1616
tritonclient
17-
feast==0.18.1
17+
feast==0.31
1818
xgboost==1.6.2
1919
implicit==0.6.0
2020

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
#
2+
# Copyright (c) 2023, NVIDIA CORPORATION.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
import os
17+
from datetime import datetime
18+
19+
import numpy as np
20+
import pytest
21+
22+
from merlin.core.dispatch import make_df
23+
from merlin.schema import ColumnSchema, Schema
24+
from merlin.systems.dag import Ensemble
25+
from merlin.systems.dag.ops.feast import QueryFeast # noqa
26+
from merlin.table import Device, TensorTable # noqa
27+
28+
feast = pytest.importorskip("feast") # noqa
29+
30+
31+
def test_feast_integration(tmpdir):
    """End-to-end check that ``QueryFeast`` serves features from a real Feast repo.

    The test scaffolds a Feast project inside ``tmpdir`` with the ``feast`` CLI,
    swaps the generated example definitions for an ``item_features`` feature
    view backed by a parquet file written here, applies and materializes the
    repo, and then runs an ``Ensemble`` containing ``QueryFeast`` to compare
    the looked-up features against the source dataframe.

    Parameters
    ----------
    tmpdir
        pytest-provided temporary directory in which the Feast project is created.

    Raises
    ------
    subprocess.CalledProcessError
        If any ``feast`` CLI invocation exits with a non-zero status.
    """
    # Local imports keep this block self-contained; the module header only
    # imports ``os`` and ``datetime``.
    import subprocess
    from datetime import timezone

    project_name = "test"
    # Argument lists + ``cwd`` avoid interpolating paths into a shell string,
    # and ``check=True`` makes a failing CLI call fail the test immediately
    # instead of surfacing later as an unrelated error.
    subprocess.run(["feast", "init", project_name], cwd=str(tmpdir), check=True)

    feast_repo = os.path.join(tmpdir, f"{project_name}")
    feature_repo_path = os.path.join(feast_repo, "feature_repo/")

    # Remove the example definitions and data generated by ``feast init`` so
    # that only the feature view written below is registered.
    for generated in ("example_repo.py", "data/driver_stats.parquet"):
        generated_path = os.path.join(feature_repo_path, generated)
        if os.path.exists(generated_path):
            os.remove(generated_path)

    df_path = os.path.join(feature_repo_path, "data/", "item_features.parquet")
    feat_file_path = os.path.join(feature_repo_path, "item_features.py")

    item_features = make_df(
        {
            "item_id": [1, 2, 3, 4, 5, 6, 7, 8, 9],
            "item_id_raw": [1, 2, 3, 4, 5, 6, 7, 8, 9],
            "item_category": [
                [1, 11],
                [2, 12],
                [3, 13],
                [4, 14],
                [5, 15],
                [6, 16],
                [7, 17],
                [8, 18],
                [9, 19],
            ],
            "item_brand": [1, 2, 3, 4, 5, 6, 7, 8, 9],
        }
    )
    # Round-trip through TensorTable before adding the timestamp columns.
    item_features = TensorTable.from_df(item_features).to_df()
    item_features["datetime"] = datetime.now()
    item_features["datetime"] = item_features["datetime"].astype("datetime64[ns]")
    item_features["created"] = datetime.now()
    item_features["created"] = item_features["created"].astype("datetime64[ns]")

    item_features.to_parquet(df_path)

    # The generated module is picked up by ``feast apply``; its contents must
    # start at column 0 because it is executed as top-level Python.
    with open(feat_file_path, "w", encoding="utf-8") as file:
        file.write(
            f"""
from datetime import timedelta
from feast import Entity, Field, FeatureView, ValueType
from feast.types import Int64, Array
from feast.infra.offline_stores.file_source import FileSource

item_features = FileSource(
    path="{df_path}",
    timestamp_field="datetime",
    created_timestamp_column="created",
)

item = Entity(name="item_id", value_type=ValueType.INT64, join_keys=["item_id"],)

item_features_view = FeatureView(
    name="item_features",
    entities=[item],
    ttl=timedelta(0),
    schema=[
        Field(name="item_category", dtype=Array(Int64)),
        Field(name="item_brand", dtype=Int64),
        Field(name="item_id_raw", dtype=Int64),
    ],
    online=True,
    source=item_features,
    tags=dict(),
)
"""
        )

    subprocess.run(["feast", "apply"], cwd=feature_repo_path, check=True)
    # Same format the shell version produced with ``date -u``.
    current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
    subprocess.run(
        ["feast", "materialize", "1995-01-01T01:01:01", current_time],
        cwd=feature_repo_path,
        check=True,
    )

    feature_store = feast.FeatureStore(feature_repo_path)

    # check the information is loaded and correctly querying
    feature_refs = [
        "item_features:item_id_raw",
        "item_features:item_category",
        "item_features:item_brand",
    ]
    feat_df = feature_store.get_historical_features(
        features=feature_refs,
        entity_df=make_df({"item_id": [1], "event_timestamp": [datetime.now()]}, device="cpu"),
    ).to_df()
    assert all(feat_df["item_id_raw"] == 1)

    # create and run ensemble with feast operator
    request_schema = Schema([ColumnSchema("item_id", dtype=np.int64)])
    graph = ["item_id"] >> QueryFeast.from_feature_view(
        store=feature_store,
        view="item_features",
        column="item_id",
        output_prefix="item",
        include_id=True,
    )
    ensemble = Ensemble(graph, request_schema)
    result = ensemble.transform(TensorTable.from_df(make_df({"item_id": [1, 2]})))
    columns = ["item_id_raw", "item_brand", "item_category"]
    if result.device == Device.GPU:
        for column in columns:
            if column == "item_category":
                # List columns are compared through their flattened values
                # and offsets rather than with a direct ``==``.
                assert (
                    result.to_df()[column]._column.leaves()
                    == item_features.iloc[0:2][column]._column.leaves()
                ).all()
                assert (
                    result.to_df()[column]._column.offsets
                    == item_features.iloc[0:2][column]._column.offsets
                ).all()
            else:
                assert (result.to_df()[column] == item_features.iloc[0:2][column]).all()
    else:
        assert result.to_df()[columns].equals(item_features.iloc[0:2][columns])

tests/unit/systems/ops/feast/test_op.py

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,33 +43,42 @@ def test_feast_from_feature_view(tmpdir):
4343
MagicMock(side_effect=QueryFeast),
4444
) as qf_init:
4545
input_source = feast.FileSource(
46-
path=tmpdir,
46+
path=str(tmpdir),
4747
event_timestamp_column="datetime",
4848
created_timestamp_column="created",
4949
)
50+
item_id = feast.Entity(
51+
name="item_id",
52+
value_type=feast.ValueType.INT32,
53+
join_keys=["item_id"],
54+
)
5055
feature_view = feast.FeatureView(
5156
name="item_features",
52-
entities=["item_id"],
57+
entities=[item_id],
5358
ttl=timedelta(seconds=100),
54-
features=[
55-
feast.Feature(name="int_feature", dtype=feast.ValueType.INT32),
56-
feast.Feature(name="float_feature", dtype=feast.ValueType.FLOAT),
57-
feast.Feature(name="int_list_feature", dtype=feast.ValueType.INT32_LIST),
58-
feast.Feature(name="float_list_feature", dtype=feast.ValueType.FLOAT_LIST),
59+
schema=[
60+
feast.Field(name="int_feature", dtype=feast.types.Int32),
61+
feast.Field(name="float_feature", dtype=feast.types.Float32),
62+
feast.Field(name="int_list_feature", dtype=feast.types.Array(feast.types.Int32)),
63+
feast.Field(
64+
name="float_list_feature", dtype=feast.types.Array(feast.types.Float32)
65+
),
5966
],
6067
online=True,
61-
input=input_source,
68+
source=input_source,
6269
tags={},
6370
)
6471
fs = feast.FeatureStore("repo_path")
6572
fs.repo_path = "repo_path"
66-
fs._registry = feast.feature_store.Registry(None, None)
67-
fs.list_entities = MagicMock(
68-
return_value=[feast.Entity(name="item_id", value_type=feast.ValueType.INT32)]
69-
)
70-
fs.get_feature_view = MagicMock(return_value=feature_view)
71-
fs._registry.get_feature_view = MagicMock(return_value=feature_view)
72-
73+
fs._registry = feast.feature_store.Registry(None, None, "repo_path")
74+
fs.list_entities = MagicMock(return_value=[item_id])
75+
76+
feast.FeatureStore._registry = MagicMock(return_value=fs._registry)
77+
feast.FeatureStore.get_feature_view = MagicMock(return_value=feature_view)
78+
feast.FeatureStore._registry.get_feature_view = MagicMock(return_value=feature_view)
79+
feast.FeatureStore.materialize_incremental = MagicMock(return_value=None)
80+
fs._registry._list_feature_views = MagicMock(return_value=feature_view)
81+
fs._get_feature_view = MagicMock(return_value=feature_view)
7382
expected_input_schema = Schema(
7483
column_schemas=[ColumnSchema(name="item_id", dtype=np.int32)]
7584
)
@@ -122,7 +131,7 @@ def test_feast_from_feature_view(tmpdir):
122131

123132
@pytest.mark.parametrize("is_ragged", [True, False])
124133
@pytest.mark.parametrize("prefix", ["prefix", ""])
125-
def test_feast_transform(prefix, is_ragged):
134+
def test_feast_transform(tmpdir, prefix, is_ragged):
126135
mocked_resp = OnlineResponse(
127136
online_response_proto=ServingService_pb2.GetOnlineFeaturesResponse(
128137
metadata=ServingService_pb2.GetOnlineFeaturesResponseMetadata(
@@ -134,10 +143,18 @@ def test_feast_transform(prefix, is_ragged):
134143
ServingService_pb2.GetOnlineFeaturesResponse.FeatureVector(
135144
values=[
136145
Value_pb2.Value(int32_val=1),
146+
]
147+
),
148+
ServingService_pb2.GetOnlineFeaturesResponse.FeatureVector(
149+
values=[
137150
Value_pb2.Value(float_val=1.0),
151+
]
152+
),
153+
ServingService_pb2.GetOnlineFeaturesResponse.FeatureVector(
154+
values=[
138155
Value_pb2.Value(float_list_val=Value_pb2.FloatList(val=[1.0, 2.0, 3.0])),
139156
]
140-
)
157+
),
141158
],
142159
)
143160
)

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ sitepackages=true
4848
setenv =
4949
TF_GPU_ALLOCATOR=cuda_malloc_async
5050
deps =
51+
-rrequirements/test-gpu.txt
5152
pytest
5253
pytest-cov
5354
commands =

0 commit comments

Comments
 (0)