
Commit 92dde13

feat: Implementing online_read for MilvusOnlineStore (feast-dev#4996)
1 parent 0145e55 commit 92dde13

File tree

9 files changed, +431 -60 lines changed


docs/SUMMARY.md

+1
@@ -116,6 +116,7 @@
 * [Hazelcast](reference/online-stores/hazelcast.md)
 * [ScyllaDB](reference/online-stores/scylladb.md)
 * [SingleStore](reference/online-stores/singlestore.md)
+* [Milvus](reference/online-stores/milvus.md)
 * [Registries](reference/registries/README.md)
 * [Local](reference/registries/local.md)
 * [S3](reference/registries/s3.md)
docs/reference/online-stores/milvus.md

+64
@@ -0,0 +1,64 @@
# Milvus online store

## Description

The [Milvus](https://milvus.io/) online store provides support for materializing feature values into Milvus.

* The data model used to store feature values in Milvus is described in more detail [here](../../specs/online\_store\_format.md).

## Getting started

In order to use this online store, you'll need to install the Milvus extra (along with the dependency needed for the offline store of choice). E.g.

`pip install 'feast[milvus]'`

You can get started by using any of the other templates (e.g. `feast init -t gcp` or `feast init -t snowflake` or `feast init -t aws`), and then swapping in Milvus as the online store as seen below in the examples.

## Examples

Connecting to a local Milvus instance:

{% code title="feature_store.yaml" %}
```yaml
project: my_feature_repo
registry: data/registry.db
provider: local
online_store:
  type: milvus
  path: "data/online_store.db"
  connection_string: "localhost:6379"
  embedding_dim: 128
  index_type: "FLAT"
  metric_type: "COSINE"
  username: "username"
  password: "password"
```
{% endcode %}

The full set of configuration options is available in [MilvusOnlineStoreConfig](https://rtd.feast.dev/en/latest/#feast.infra.online_stores.milvus.MilvusOnlineStoreConfig).
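As a quick sanity check after materialization, reads go through the standard Feast API and are served by this store's `online_read`. A minimal sketch, assuming a feature view named `city_embeddings` keyed on `item_id` (both names are illustrative):

```python
from feast import FeatureStore

# Assumes the repository is configured with the Milvus online store shown above.
store = FeatureStore(repo_path=".")

online_features = store.get_online_features(
    features=["city_embeddings:vector", "city_embeddings:state"],
    entity_rows=[{"item_id": 0}],
).to_dict()
print(online_features)
```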
## Functionality Matrix

The set of functionality supported by online stores is described in detail [here](overview.md#functionality).
Below is a matrix indicating which functionality is supported by the Milvus online store.

|                                                            | Milvus |
| :--------------------------------------------------------- | :----- |
| write feature values to the online store                   | yes    |
| read feature values from the online store                  | yes    |
| update infrastructure (e.g. tables) in the online store    | yes    |
| teardown infrastructure (e.g. tables) in the online store  | yes    |
| generate a plan of infrastructure changes                  | no     |
| support for on-demand transforms                           | yes    |
| readable by Python SDK                                     | yes    |
| readable by Java                                           | no     |
| readable by Go                                             | no     |
| support for entityless feature views                       | yes    |
| support for concurrent writing to the same key             | yes    |
| support for ttl (time to live) at retrieval                | yes    |
| support for deleting expired data                          | yes    |
| collocated by feature view                                 | no     |
| collocated by feature service                              | no     |
| collocated by entity key                                   | yes    |

To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix).

examples/rag/README.md

+3 -4
@@ -37,9 +37,8 @@ The RAG architecture combines retrieval of documents (using vector search) with
 
 3. Materialize features into the online store:
 
-```bash
-python -c "from datetime import datetime; from feast import FeatureStore; store = FeatureStore(repo_path='.')"
-python -c "store.materialize_incremental(datetime.utcnow())"
+```python
+store.write_to_online_store(feature_view_name='city_embeddings', df=df)
 ```
 4. Run a query:
 
@@ -61,7 +60,7 @@ feast apply
 store.write_to_online_store(feature_view_name='city_embeddings', df=df)
 ```
 
--Inspect retrieved features using Python:
+- Inspect retrieved features using Python:
 ```python
 context_data = store.retrieve_online_documents_v2(
     features=[
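The retrieval snippet above is cut off by the diff context. For orientation, a hedged sketch of how such a call is usually completed; the feature names, embedding size, and `top_k` below are assumptions for illustration, not the README's verbatim content:

```python
# Illustrative query embedding; the RAG example produces this with an embedding model.
query_vector = [0.05] * 384

context_data = store.retrieve_online_documents_v2(
    features=[
        "city_embeddings:vector",
        "city_embeddings:state",
        "city_embeddings:sentence_chunks",
    ],
    query=query_vector,
    top_k=3,
).to_df()
print(context_data)
```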

examples/rag/milvus-quickstart.ipynb

+1 -1
@@ -461,7 +461,7 @@
 }
 ],
 "source": [
-"! feast apply "
+"! feast apply"
 ]
 },
 {

sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py

+153 -52
@@ -4,7 +4,6 @@
 
 from pydantic import StrictStr
 from pymilvus import (
-    Collection,
     CollectionSchema,
     DataType,
     FieldSchema,
@@ -20,13 +19,13 @@
 )
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.infra.online_stores.vector_store import VectorStoreConfig
-from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
 from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
 from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
 from feast.protos.feast.types.Value_pb2 import Value as ValueProto
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.type_map import (
     PROTO_VALUE_TO_VALUE_TYPE_MAP,
+    VALUE_TYPE_TO_PROTO_VALUE_MAP,
     feast_value_type_to_python_type,
 )
 from feast.types import (
@@ -35,6 +34,7 @@
     ComplexFeastType,
     PrimitiveFeastType,
     ValueType,
+    from_feast_type,
 )
 from feast.utils import (
     _serialize_vector_to_float_list,
@@ -146,9 +146,7 @@ def _get_or_create_collection(
         collection_name = _table_id(config.project, table)
         if collection_name not in self._collections:
             # Create a composite key by combining entity fields
-            composite_key_name = (
-                "_".join([field.name for field in table.entity_columns]) + "_pk"
-            )
+            composite_key_name = _get_composite_key_name(table)
 
             fields = [
                 FieldSchema(
@@ -251,9 +249,8 @@ def online_write_batch(
             ).hex()
             # to recover the entity key just run:
             # deserialize_entity_key(bytes.fromhex(entity_key_str), entity_key_serialization_version=3)
-            composite_key_name = (
-                "_".join([str(value) for value in entity_key.join_keys]) + "_pk"
-            )
+            composite_key_name = _get_composite_key_name(table)
+
             timestamp_int = int(to_naive_utc(timestamp).timestamp() * 1e6)
             created_ts_int = (
                 int(to_naive_utc(created_ts).timestamp() * 1e6) if created_ts else 0
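The in-code comment above points at the entity-key round trip used for the composite key. A small sketch of that recovery path, assuming the helpers in `feast.infra.key_encoding_utils` and a hypothetical join key `item_id`:

```python
from feast.infra.key_encoding_utils import deserialize_entity_key, serialize_entity_key
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto

# Hypothetical entity key with a single join key "item_id".
entity_key = EntityKeyProto(join_keys=["item_id"], entity_values=[ValueProto(int64_val=0)])

entity_key_str = serialize_entity_key(entity_key, entity_key_serialization_version=3).hex()
recovered = deserialize_entity_key(
    bytes.fromhex(entity_key_str), entity_key_serialization_version=3
)
assert list(recovered.join_keys) == ["item_id"]
```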
@@ -293,8 +290,133 @@ def online_read(
         table: FeatureView,
         entity_keys: List[EntityKeyProto],
         requested_features: Optional[List[str]] = None,
+        full_feature_names: bool = False,
     ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
-        raise NotImplementedError
+        self.client = self._connect(config)
+        collection_name = _table_id(config.project, table)
+        collection = self._get_or_create_collection(config, table)
+
+        composite_key_name = _get_composite_key_name(table)
+
+        output_fields = (
+            [composite_key_name]
+            + (requested_features if requested_features else [])
+            + ["created_ts", "event_ts"]
+        )
+        assert all(
+            field in [f["name"] for f in collection["fields"]]
+            for field in output_fields
+        ), (
+            f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema"
+        )
+        composite_entities = []
+        for entity_key in entity_keys:
+            entity_key_str = serialize_entity_key(
+                entity_key,
+                entity_key_serialization_version=config.entity_key_serialization_version,
+            ).hex()
+            composite_entities.append(entity_key_str)
+
+        query_filter_for_entities = (
+            f"{composite_key_name} in ["
+            + ", ".join([f"'{e}'" for e in composite_entities])
+            + "]"
+        )
+        self.client.load_collection(collection_name)
+        results = self.client.query(
+            collection_name=collection_name,
+            filter=query_filter_for_entities,
+            output_fields=output_fields,
+        )
+        # Group hits by composite key.
+        grouped_hits: Dict[str, Any] = {}
+        for hit in results:
+            key = hit.get(composite_key_name)
+            grouped_hits.setdefault(key, []).append(hit)
+
+        # Map the features to their Feast types.
+        feature_name_feast_primitive_type_map = {
+            f.name: f.dtype for f in table.features
+        }
+        # Build a dictionary mapping composite key -> (res_ts, res)
+        results_dict: Dict[
+            str, Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
+        ] = {}
+
+        # here we need to map the data stored as characters back into the protobuf value
+        for hit in results:
+            key = hit.get(composite_key_name)
+            # Only take one hit per composite key (adjust if you need aggregation)
+            if key not in results_dict:
+                res = {}
+                res_ts = None
+                for field in output_fields:
+                    val = ValueProto()
+                    field_value = hit.get(field, None)
+                    if field_value is None and ":" in field:
+                        _, field_short = field.split(":", 1)
+                        field_value = hit.get(field_short)
+
+                    if field in ["created_ts", "event_ts"]:
+                        res_ts = datetime.fromtimestamp(field_value / 1e6)
+                    elif field == composite_key_name:
+                        # We do not return the composite key value
+                        pass
+                    else:
+                        feature_feast_primitive_type = (
+                            feature_name_feast_primitive_type_map.get(
+                                field, PrimitiveFeastType.INVALID
+                            )
+                        )
+                        feature_fv_dtype = from_feast_type(feature_feast_primitive_type)
+                        proto_attr = VALUE_TYPE_TO_PROTO_VALUE_MAP.get(feature_fv_dtype)
+                        if proto_attr:
+                            if proto_attr == "bytes_val":
+                                setattr(val, proto_attr, field_value.encode())
+                            elif proto_attr in [
+                                "int32_val",
+                                "int64_val",
+                                "float_val",
+                                "double_val",
+                            ]:
+                                setattr(
+                                    val,
+                                    proto_attr,
+                                    type(getattr(val, proto_attr))(field_value),
+                                )
+                            elif proto_attr in [
+                                "int32_list_val",
+                                "int64_list_val",
+                                "float_list_val",
+                                "double_list_val",
+                            ]:
+                                setattr(
+                                    val,
+                                    proto_attr,
+                                    list(
+                                        map(
+                                            type(getattr(val, proto_attr)).__args__[0],
+                                            field_value,
+                                        )
+                                    ),
+                                )
+                            else:
+                                setattr(val, proto_attr, field_value)
+                        else:
+                            raise ValueError(
+                                f"Unsupported ValueType: {feature_feast_primitive_type} with feature view value {field_value} for feature {field} with value {field_value}"
+                            )
+                    # res[field] = val
+                    key_to_use = field.split(":", 1)[-1] if ":" in field else field
+                    res[key_to_use] = val
+                results_dict[key] = (res_ts, res if res else None)
+
+        # Map the results back into a list matching the original order of composite_keys.
+        result_list = [
+            results_dict.get(key, (None, None)) for key in composite_entities
+        ]
+
+        return result_list
 
     def update(
         self,
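For orientation, the lookup pattern the new `online_read` builds on can be sketched directly against pymilvus; the collection name, key field, and hex strings below are illustrative placeholders, not values from a real deployment:

```python
from pymilvus import MilvusClient

# Illustrative Milvus Lite database file, matching the docs example above.
client = MilvusClient("data/online_store.db")

composite_key_name = "item_id_pk"        # "<join keys>_pk", as built by _get_composite_key_name
entity_key_hexes = ["a1b2c3", "d4e5f6"]  # placeholders for serialize_entity_key(...).hex()

filter_expr = (
    f"{composite_key_name} in [" + ", ".join(f"'{e}'" for e in entity_key_hexes) + "]"
)

collection_name = "my_feature_repo_city_embeddings"  # "<project>_<feature view>", see _table_id
client.load_collection(collection_name)
rows = client.query(
    collection_name=collection_name,
    filter=filter_expr,
    output_fields=[composite_key_name, "event_ts", "created_ts"],
)
print(rows)
```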
@@ -362,11 +484,7 @@ def retrieve_online_documents_v2(
             "params": {"nprobe": 10},
         }
 
-        composite_key_name = (
-            "_".join([str(field.name) for field in table.entity_columns]) + "_pk"
-        )
-        # features_str = ", ".join([f"'{f}'" for f in requested_features])
-        # expr = f" && feature_name in [{features_str}]"
+        composite_key_name = _get_composite_key_name(table)
 
         output_fields = (
             [composite_key_name]
@@ -452,6 +570,10 @@ def _table_id(project: str, table: FeatureView) -> str:
     return f"{project}_{table.name}"
 
 
+def _get_composite_key_name(table: FeatureView) -> str:
+    return "_".join([field.name for field in table.entity_columns]) + "_pk"
+
+
 def _extract_proto_values_to_dict(
     input_dict: Dict[str, Any],
     vector_cols: List[str],
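For intuition, the extracted `_get_composite_key_name` helper simply concatenates the feature view's entity column names; the join keys here are hypothetical:

```python
# Mirrors the helper's logic for a feature view keyed on ["user_id", "item_id"].
join_keys = ["user_id", "item_id"]
composite_key_name = "_".join(join_keys) + "_pk"
assert composite_key_name == "user_id_item_id_pk"
```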
@@ -462,6 +584,13 @@ def _extract_proto_values_to_dict(
         for k in PROTO_VALUE_TO_VALUE_TYPE_MAP.keys()
         if k is not None and "list" in k and "string" not in k
     ]
+    numeric_types = [
+        "double_val",
+        "float_val",
+        "int32_val",
+        "int64_val",
+        "bool_val",
+    ]
     output_dict = {}
     for feature_name, feature_values in input_dict.items():
         for proto_val_type in PROTO_VALUE_TO_VALUE_TYPE_MAP:
@@ -475,10 +604,18 @@ def _extract_proto_values_to_dict(
                 else:
                     vector_values = getattr(feature_values, proto_val_type).val
             else:
-                if serialize_to_string and proto_val_type != "string_val":
+                if (
+                    serialize_to_string
+                    and proto_val_type not in ["string_val"] + numeric_types
+                ):
                     vector_values = feature_values.SerializeToString().decode()
                 else:
-                    vector_values = getattr(feature_values, proto_val_type)
+                    if not isinstance(feature_values, str):
+                        vector_values = str(
+                            getattr(feature_values, proto_val_type)
+                        )
+                    else:
+                        vector_values = getattr(feature_values, proto_val_type)
             output_dict[feature_name] = vector_values
         else:
             if serialize_to_string:
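As I read the new branch, numeric scalar protos are no longer routed through `SerializeToString` when `serialize_to_string` is set; they are stored as the string form of the raw value instead. A small illustrative sketch of the difference:

```python
from feast.protos.feast.types.Value_pb2 import Value as ValueProto

v = ValueProto(int64_val=7)

# New path for numeric scalar types: a readable string of the value.
as_stored_now = str(v.int64_val)                    # "7"

# Old path serialized the whole proto and decoded the bytes to str.
as_stored_before = v.SerializeToString().decode()   # opaque bytes-as-text, not "7"
```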
@@ -487,39 +624,3 @@ def _extract_proto_values_to_dict(
             output_dict[feature_name] = feature_values
 
     return output_dict
-
-
-class MilvusTable(InfraObject):
-    """
-    A Milvus collection managed by Feast.
-
-    Attributes:
-        host: The host of the Milvus server.
-        port: The port of the Milvus server.
-        name: The name of the collection.
-    """
-
-    host: str
-    port: int
-
-    def __init__(self, host: str, port: int, name: str):
-        super().__init__(name)
-        self.host = host
-        self.port = port
-        self._connect()
-
-    def _connect(self):
-        raise NotImplementedError
-
-    def to_infra_object_proto(self) -> InfraObjectProto:
-        # Implement serialization if needed
-        raise NotImplementedError
-
-    def update(self):
-        # Implement update logic if needed
-        raise NotImplementedError
-
-    def teardown(self):
-        collection = Collection(name=self.name)
-        if collection.exists():
-            collection.drop()
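Putting the pieces together, a minimal end-to-end check of the new read path could look like this; it assumes a repo such as the RAG example above has already been applied with `feast apply`, and the feature view, column, and entity names are placeholders:

```python
from datetime import datetime, timezone

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Hypothetical rows for a feature view named "city_embeddings" keyed on "item_id".
df = pd.DataFrame(
    {
        "item_id": [0],
        "state": ["NY"],
        "vector": [[0.1] * 128],
        "event_timestamp": [datetime.now(timezone.utc)],
    }
)
store.write_to_online_store(feature_view_name="city_embeddings", df=df)

# This read is served by the MilvusOnlineStore.online_read added in this commit.
print(
    store.get_online_features(
        features=["city_embeddings:state"],
        entity_rows=[{"item_id": 0}],
    ).to_dict()
)
```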

0 commit comments
