Skip to content

Commit 3224571

Browse files
test: Fix for integration tests (feast-dev#5126)
* chore: Fixing Milvus integration test configuration Signed-off-by: Francisco Javier Arceo <[email protected]> * chore: Fix Milvus integration tests Signed-off-by: ntkathole <[email protected]> * Make sure all entities have values Signed-off-by: ntkathole <[email protected]> * Fixed test_retrieve_online_milvus_documents Signed-off-by: ntkathole <[email protected]> --------- Signed-off-by: Francisco Javier Arceo <[email protected]> Signed-off-by: ntkathole <[email protected]> Co-authored-by: Francisco Javier Arceo <[email protected]>
1 parent e846dea commit 3224571

20 files changed

+393
-171
lines changed

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ ci = [
135135
"pytest-mock==1.10.4",
136136
"pytest-env",
137137
"Sphinx>4.0.0,<7",
138-
"testcontainers==4.8.2",
138+
"testcontainers==4.9.0",
139139
"python-keycloak==4.2.2",
140140
"pre-commit<3.3.2",
141141
"assertpy==1.1",

sdk/python/feast/cli.py

+1
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
884884
"hazelcast",
885885
"ikv",
886886
"couchbase",
887+
"milvus",
887888
],
888889
case_sensitive=False,
889890
),

sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py

+22-5
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig):
8888
"""
8989

9090
type: Literal["milvus"] = "milvus"
91-
path: Optional[StrictStr] = "data/online_store.db"
91+
path: Optional[StrictStr] = "online_store.db"
9292
host: Optional[StrictStr] = "localhost"
9393
port: Optional[int] = 19530
9494
index_type: Optional[str] = "FLAT"
@@ -245,6 +245,8 @@ def online_write_batch(
245245
collection = self._get_or_create_collection(config, table)
246246
vector_cols = [f.name for f in table.features if f.vector_index]
247247
entity_batch_to_insert = []
248+
unique_entities: dict[str, dict[str, Any]] = {}
249+
required_fields = {field["name"] for field in collection["fields"]}
248250
for entity_key, values_dict, timestamp, created_ts in data:
249251
# need to construct the composite primary key also need to handle the fact that entities are a list
250252
entity_key_str = serialize_entity_key(
@@ -278,12 +280,22 @@ def online_write_batch(
278280
"created_ts": created_ts_int,
279281
}
280282
single_entity_record.update(values_dict)
281-
entity_batch_to_insert.append(single_entity_record)
283+
# Ensure all required fields exist, setting missing ones to empty strings
284+
for field in required_fields:
285+
if field not in single_entity_record:
286+
single_entity_record[field] = ""
287+
# Store only the latest event timestamp per entity
288+
if (
289+
entity_key_str not in unique_entities
290+
or unique_entities[entity_key_str]["event_ts"] < timestamp_int
291+
):
292+
unique_entities[entity_key_str] = single_entity_record
282293

283294
if progress:
284295
progress(1)
285296

286-
self.client.insert(
297+
entity_batch_to_insert = list(unique_entities.values())
298+
self.client.upsert(
287299
collection_name=collection["collection_name"],
288300
data=entity_batch_to_insert,
289301
)
@@ -551,11 +563,16 @@ def retrieve_online_documents_v2(
551563
field, PrimitiveFeastType.INVALID
552564
) in [
553565
PrimitiveFeastType.STRING,
566+
PrimitiveFeastType.BYTES,
567+
]:
568+
res[field] = ValueProto(string_val=str(field_value))
569+
elif entity_name_feast_primitive_type_map.get(
570+
field, PrimitiveFeastType.INVALID
571+
) in [
554572
PrimitiveFeastType.INT64,
555573
PrimitiveFeastType.INT32,
556-
PrimitiveFeastType.BYTES,
557574
]:
558-
res[field] = ValueProto(string_val=field_value)
575+
res[field] = ValueProto(int64_val=int(field_value))
559576
elif field == composite_key_name:
560577
pass
561578
elif isinstance(field_value, bytes):

sdk/python/feast/infra/online_stores/milvus_online_store/milvus_repo_configuration.py

+3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from tests.integration.feature_repos.integration_test_repo_config import (
22
IntegrationTestRepoConfig,
33
)
4+
from tests.integration.feature_repos.repo_configuration import MILVUS_CONFIG
45
from tests.integration.feature_repos.universal.online_store.milvus import (
56
MilvusOnlineStoreCreator,
67
)
@@ -10,3 +11,5 @@
1011
online_store="milvus", online_store_creator=MilvusOnlineStoreCreator
1112
),
1213
]
14+
15+
AVAILABLE_ONLINE_STORES = {"milvus": (MILVUS_CONFIG, MilvusOnlineStoreCreator)}

sdk/python/feast/repo_config.py

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
"feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore": "feast.infra.online_stores.elasticsearch_online_store.ElasticSearchOnlineStore",
6262
"feast.infra.online_stores.contrib.singlestore_online_store.singlestore.SingleStoreOnlineStore": "feast.infra.online_stores.singlestore_online_store.singlestore.SingleStoreOnlineStore",
6363
"feast.infra.online_stores.contrib.qdrant.QdrantOnlineStore": "feast.infra.online_stores.cqdrant.QdrantOnlineStore",
64+
"feast.infra.online_stores.contrib.milvus.MilvusOnlineStore": "feast.infra.online_stores.milvus.MilvusOnlineStore",
6465
}
6566

6667
ONLINE_STORE_CLASS_FOR_TYPE = {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Byte-compiled / optimized / DLL files
2+
__pycache__/
3+
*.py[cod]
4+
*.pyo
5+
*.pyd
6+
7+
# C extensions
8+
*.so
9+
10+
# Distribution / packaging
11+
.Python
12+
env/
13+
venv/
14+
ENV/
15+
env.bak/
16+
venv.bak/
17+
*.egg-info/
18+
dist/
19+
build/
20+
.venv
21+
22+
# Pytest
23+
.cache
24+
*.cover
25+
*.log
26+
.coverage
27+
nosetests.xml
28+
coverage.xml
29+
*.hypothesis/
30+
*.pytest_cache/
31+
32+
# Jupyter Notebook
33+
.ipynb_checkpoints
34+
35+
# IDEs and Editors
36+
.vscode/
37+
.idea/
38+
*.swp
39+
*.swo
40+
*.sublime-workspace
41+
*.sublime-project
42+
43+
# OS generated files
44+
.DS_Store
45+
Thumbs.db
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Feast Quickstart
2+
If you haven't already, check out the quickstart guide on Feast's website (http://docs.feast.dev/quickstart), which
3+
uses this repo. A quick view of what's in this repository's `feature_repo/` directory:
4+
5+
* `data/` contains raw demo parquet data
6+
* `feature_repo/example_repo.py` contains demo feature definitions
7+
* `feature_repo/feature_store.yaml` contains a demo setup configuring where data sources are
8+
* `feature_repo/test_workflow.py` showcases how to run all key Feast commands, including defining, retrieving, and pushing features.
9+
10+
You can run the overall workflow with `python test_workflow.py`.
11+
12+
## To move from this into a more production ready workflow:
13+
> See more details in [Running Feast in production](https://docs.feast.dev/how-to-guides/running-feast-in-production)
14+
15+
1. First: you should start with a different Feast template, which delegates to a more scalable offline store.
16+
- For example, running `feast init -t gcp`
17+
or `feast init -t aws` or `feast init -t snowflake`.
18+
- You can see your options if you run `feast init --help`.
19+
2. `feature_store.yaml` points to a local file as a registry. You'll want to set up a remote file (e.g. in S3/GCS) or a
20+
SQL registry. See [registry docs](https://docs.feast.dev/getting-started/concepts/registry) for more details.
21+
3. This example uses a file [offline store](https://docs.feast.dev/getting-started/components/offline-store)
22+
to generate training data. It does not scale. We recommend instead using a data warehouse such as BigQuery,
23+
Snowflake, or Redshift. There is experimental support for Spark as well.
24+
4. Set up CI/CD and separate dev, staging, and prod environments to automatically update the registry as you change Feast feature definitions. See [docs](https://docs.feast.dev/how-to-guides/running-feast-in-production#1.-automatically-deploying-changes-to-your-feature-definitions).
25+
5. (optional) Regularly scheduled materialization to power low latency feature retrieval (e.g. via Airflow). See [Batch data ingestion](https://docs.feast.dev/getting-started/concepts/data-ingestion#batch-data-ingestion)
26+
for more details.
27+
6. (optional) Deploy feature server instances with `feast serve` to expose endpoints to retrieve online features.
28+
- See [Python feature server](https://docs.feast.dev/reference/feature-servers/python-feature-server) for details.
29+
- Use cases can also directly call the Feast client to fetch features as per [Feature retrieval](https://docs.feast.dev/getting-started/concepts/feature-retrieval)

sdk/python/feast/templates/milvus/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from feast.file_utils import replace_str_in_file
2+
3+
4+
def bootstrap():
    """Populate the template's ``feature_repo`` with demo data and fill in placeholders.

    Per the original note on this function, it is invoked automatically from
    ``init_repo()`` during ``feast init``. It:

    1. creates ``feature_repo/data`` next to this file if it does not exist,
    2. generates hourly driver statistics for five demo driver ids covering the
       15 days that end at the current (truncated) hour and writes them to
       ``driver_stats.parquet`` in that directory,
    3. substitutes the ``%PROJECT_NAME%``, ``%PARQUET_PATH%`` and
       ``%LOGGING_PATH%`` placeholders inside ``feature_repo/example_repo.py``.
    """
    # Imports are kept local so that importing this module stays cheap.
    import pathlib
    from datetime import datetime, timedelta

    from feast.driver_test_data import create_driver_hourly_stats_df

    # The template directory name doubles as the project name.
    template_dir = pathlib.Path(__file__).parent.absolute()
    repo_path = template_dir / "feature_repo"
    project_name = template_dir.name

    data_path = repo_path / "data"
    data_path.mkdir(exist_ok=True)

    # Truncate "now" to the top of the hour, then look back 15 days.
    window_end = datetime.now().replace(microsecond=0, second=0, minute=0)
    window_start = window_end - timedelta(days=15)

    driver_ids = [1001, 1002, 1003, 1004, 1005]
    stats_df = create_driver_hourly_stats_df(driver_ids, window_start, window_end)

    driver_stats_path = data_path / "driver_stats.parquet"
    stats_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True)

    # Paths written into the example repo are relative to the feature repo root.
    example_py_file = repo_path / "example_repo.py"
    substitutions = (
        ("%PROJECT_NAME%", str(project_name)),
        ("%PARQUET_PATH%", str(driver_stats_path.relative_to(repo_path))),
        ("%LOGGING_PATH%", str(data_path.relative_to(repo_path))),
    )
    for placeholder, replacement in substitutions:
        replace_str_in_file(example_py_file, placeholder, replacement)
34+
35+
36+
if __name__ == "__main__":
37+
bootstrap()

0 commit comments

Comments
 (0)