-
Notifications
You must be signed in to change notification settings - Fork 200
Description
from faust import App, Record
from faust.web import Request, Response, View
import asyncio
app = App('ness', broker='kafka://localhost')
data_queues = {}
async def process_message(stream):
async for record in stream:
yield record
async def process_events(stream):
async for event in stream.events():
event = stream.current_event
message = event.message
topic = event.message.topic
print(f"Received message {message} from topic {topic}")
agents = {}
topics = {}
for i in ['test-30']:
agents[i] = app.agent(channel=i, name=app.topic(i))(process_message)
topics[i] = app.topic(i)
app.start()
@app.page('/get_data/{topic_name}')
async def get_data(self, request: Request, topic_name) -> Response:
print(topic_name)
if topic_name not in agents:
return self.json({'error': 'Topic not found'}, status=404)
agent = agents[topic_name]
data = process_events(agent.stream())
return self.json({'topic_name': topic_name, 'message': 'data'})
if name == 'main':
import sys
if len(sys.argv) < 2:
sys.argv.extend(['worker', '-l', 'info'])
app.main()
Versions
- Python version: 3.11
- Faust version: faust-streaming 0.11.1
- Operating system macOS
- Kafka version 2.8
- RocksDB version (if applicable) None