-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathdynamic.py
More file actions
77 lines (61 loc) · 1.94 KB
/
dynamic.py
File metadata and controls
77 lines (61 loc) · 1.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
Ingest local images with a dynamic pipeline, then search them with a text query.

The same pipeline can also be run as a service:

```sh
vechord --db postgresql://postgres:postgres@127.0.0.1:5432
```

Then send the request to the endpoint `POST /api/run`.
"""
from os import environ
from pathlib import Path
from uuid import UUID
from tqdm import tqdm
from vechord.model import InputType, ResourceRequest, RunRequest
from vechord.pipeline import DynamicPipeline
from vechord.registry import VechordRegistry
# API keys come from the environment; `environ.get` yields None when unset.
VOYAGE_API_KEY = environ.get("VOYAGE_API_KEY")
GEMINI_API_KEY = environ.get("GEMINI_API_KEY")

# Name used for the registry and for every RunRequest built in this file.
namespace = "dynamic"

vr = VechordRegistry(
    namespace=namespace,
    url="postgresql://postgres:postgres@172.17.0.1:5432/",
)

# Steps for ingestion: embed the input, then index it.
ingest_steps = [
    ResourceRequest(
        kind="multimodal-emb",
        provider="voyage",
        args={"api_key": VOYAGE_API_KEY},
    ),
    ResourceRequest(
        kind="index",
        provider="vectorchord",
        args={"vector": {}},
    ),
]

# Steps for querying: embed the query, then search with topk=10.
search_steps = [
    ResourceRequest(
        kind="multimodal-emb",
        provider="voyage",
        args={"api_key": VOYAGE_API_KEY},
    ),
    ResourceRequest(
        kind="search",
        provider="vectorchord",
        args={"vector": {"topk": 10}},
    ),
]

# Filled by `ingest`: maps the uid returned for each run to its source file.
file_uuids: dict[UUID, Path] = {}
async def ingest(files: list[Path]):
    """Run each file through the ingest pipeline and record the returned uid.

    A pipeline is built once from ``ingest_steps``. For every path in
    ``files`` the file's bytes are submitted as an ``InputType.IMAGE``
    request, and the ``uid`` of the awaited result is stored in the
    module-level ``file_uuids`` mapping with the path as its value.

    Args:
        files: Paths of the files to ingest; iterated under a tqdm
            progress bar.
    """
    pipeline = DynamicPipeline.from_steps(ingest_steps)
    for path in tqdm(files):
        image_request = RunRequest(
            name=namespace,
            data=path.read_bytes(),
            input_type=InputType.IMAGE,
        )
        result = await pipeline.run(request=image_request, vr=vr)
        # Remember which file produced this uid so results can be mapped back.
        file_uuids[result.uid] = path
async def search(query: str):
    """Run a text query through the search pipeline and return its result.

    Args:
        query: Query text; it is UTF-8 encoded before being submitted.

    Returns:
        Whatever the awaited ``DynamicPipeline.run`` call returns for a
        pipeline built from ``search_steps``.
    """
    pipeline = DynamicPipeline.from_steps(search_steps)
    text_request = RunRequest(name=namespace, data=query.encode("utf-8"))
    return await pipeline.run(request=text_request, vr=vr)
async def main():
    """Ingest the JPEG files in ``~/Pictures`` and print the matches for "cat".

    Everything runs inside the ``vr`` registry's async context. Each chunk
    of the search result is mapped back to its source file through
    ``file_uuids``; a chunk whose ``doc_id`` is not in that mapping is
    printed as "Unknown file".
    """
    async with vr:
        # Named `picture_dir` rather than `dir` to avoid shadowing the builtin.
        picture_dir = Path.home() / "Pictures"
        # `Path.glob` returns a lazy generator. Materialize it so `ingest`
        # receives the list its signature declares and tqdm can read a
        # length for the progress-bar total.
        images = list(picture_dir.glob("*.jpg"))
        await ingest(images)
        res = await search("cat")
        for item in res.chunks:
            print("=>", file_uuids.get(item.doc_id, "Unknown file"))
if __name__ == "__main__":
    # Script entry point: asyncio is imported here because, in the code
    # shown, it is only used to drive `main` when the file is executed.
    import asyncio

    asyncio.run(main())