-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathabsurd_worker.py
More file actions
65 lines (46 loc) · 1.91 KB
/
absurd_worker.py
File metadata and controls
65 lines (46 loc) · 1.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""Long-running worker that completes rps.stub tasks with a static response."""
from __future__ import annotations
import json
import logging
import sys
from pathlib import Path
from typing import Any
SDK_SRC = Path(__file__).parent / "third_party" / "absurd" / "sdks" / "python" / "src"
if str(SDK_SRC) not in sys.path:
sys.path.insert(0, str(SDK_SRC))
from absurd_sdk import Absurd # type: ignore
from log_config import setup_logging
from paper import USER_MESSAGE, run_agent
logger = logging.getLogger(__name__)
RESULT_PAYLOAD = {"move": "rock", "rationale": "rock beats scissors"}
app = Absurd()
# psycopg disables autocommit by default; enable it so queue setup doesn't hold locks
app._conn.autocommit = True
@app.register_task(name="rps.stub")
def rps_stub_task(_params: Any, _ctx: Any) -> dict[str, str]:
logger.info("Executing rps.stub task with params: %s", _params)
logger.debug("Task result: %s", RESULT_PAYLOAD)
return RESULT_PAYLOAD
@app.register_task(name="rps.llm")
def rps_llm_task(_params: Any, _ctx: Any) -> dict[str, str]:
logger.info("Executing rps.llm task with params: %s", _params)
move_choice = run_agent(USER_MESSAGE)
payload = move_choice.model_dump()
logger.debug("rps.llm payload: %s", payload)
action_trace = {"tool_call": {"name": "emit_move", "arguments": payload}}
logger.action("tool_call\n%s", json.dumps(action_trace, indent=2))
return payload
def main() -> None:
setup_logging()
logger.info("Starting absurd worker for task: rps.stub")
# Ensure the queue exists before starting the worker
try:
logger.info("Creating queue if it doesn't exist")
app.create_queue()
logger.info("Queue created successfully")
except Exception as e:
logger.info("Queue already exists or failed to create: %s", e)
logger.info("Starting worker polling loop")
app.start_worker()
if __name__ == "__main__":
main()