Skip to content

Commit 4ca732f

Browse files
committed
Pull-based indexer
1 parent bc5e8c3 commit 4ca732f

File tree

2 files changed

+31
-22
lines changed

2 files changed

+31
-22
lines changed

.devcontainer/python/devcontainer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
}
3838
},
3939
"containerEnv": {
40-
"GOROOT2": "./python"
40+
"NATS_URL": "http://nats-server:4222"
4141
},
4242
"shutdownAction": "stopCompose"
4343
}

python/nats-indexer.py

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
from typing import List
23
import nats
34
import asyncio
45

@@ -7,34 +8,42 @@
78
import config
89

910

10-
async def process(msg):
11-
print(f"Received a message")
12-
13-
json_data = json.loads(msg.data.decode())
14-
prepared_search_docs = prepare(SearchDoc(
15-
title=json_data["title"],
16-
author=json_data["author"],
17-
content=json_data["content"],
18-
excerpt=json_data["excerpt"],
19-
date=json_data["date"],
20-
language=json_data["language"],
21-
url=json_data["url"],
22-
))
23-
11+
async def process(msgs: List[nats.aio.msg.Msg]):
12+
jsonmsgs = []
13+
for msg in msgs:
14+
json_data = json.loads(msg.data.decode())
15+
jsonmsgs.append(SearchDoc(
16+
title=json_data["title"],
17+
author=json_data["author"],
18+
content=json_data["content"],
19+
excerpt=json_data["excerpt"],
20+
date=json_data["date"],
21+
language=json_data["language"],
22+
url=json_data["url"],
23+
))
24+
prepared_search_docs = prepare(jsonmsgs)
2425
add(prepared_search_docs)
2526

2627

2728
async def run():
2829
nc = await nats.connect(config.NATS_URL)
2930
js = nc.jetstream()
3031

31-
await js.subscribe(
32-
config.NATS_STREAM_NAME,
33-
config=nats.js.api.ConsumerConfig(durable_name=config.NATS_CONSUMER_NAME,
34-
ack_policy=nats.js.api.AckPolicy.NONE),
35-
cb=process,
36-
manual_ack=False
37-
)
32+
psub = await js.pull_subscribe("*",
33+
stream=config.NATS_STREAM_NAME,
34+
durable=config.NATS_CONSUMER_NAME)
35+
36+
while True:
37+
try:
38+
msgs = await psub.fetch(10, timeout=5)
39+
print(f"Fetched {len(msgs)} messages")
40+
await process(msgs)
41+
for msg in msgs:
42+
await msg.ack()
43+
except asyncio.TimeoutError:
44+
print("No new messages, waiting...")
45+
continue
46+
3847

3948

4049
if __name__ == "__main__":

0 commit comments

Comments
 (0)