-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent_node.py
More file actions
91 lines (80 loc) · 3.35 KB
/
agent_node.py
File metadata and controls
91 lines (80 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from fastapi import FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
import httpx, uvicorn, psutil, asyncio, time, socket, random
# Base URL of the discovery service used for registration and peer lookup.
DISCOVERY = "http://localhost:8500"
# Agent name with a random three-digit suffix, drawn once at import time.
AGENT_NAME = f"agent-{random.randint(100,999)}"
# Listening port drawn at random from 8600-8700 (inclusive), once at import time.
PORT = random.randint(8600, 8700)
# FastAPI application object; its title embeds the agent name.
app = FastAPI(title=f"{AGENT_NAME} Peer Agent")
class RPC(BaseModel):
    """Envelope of an incoming JSON-RPC request."""

    # Protocol version string as sent by the caller (its value is not checked).
    jsonrpc: str
    # Name of the method to invoke.
    method: str
    # Optional method arguments.
    params: dict | None = None
    # Optional request id. JSON-RPC 2.0 allows a string or a number here, so
    # both are accepted (a numeric id such as 1 was previously rejected).
    id: str | int | None = None
# -------------------- RPC methods --------------------
async def handle_info(_):
    """Report this agent's name and the host's current load.

    The single positional argument (the RPC params) is ignored.

    Returns:
        dict with keys "agent" (this agent's name), "cpu" (CPU utilisation
        percent sampled over a 0.1 s window) and "mem" (virtual-memory
        utilisation percent).
    """
    # cpu_percent(interval=0.1) blocks for the whole sampling window; run it
    # in a worker thread so the event loop keeps serving other requests.
    cpu = await asyncio.to_thread(psutil.cpu_percent, interval=0.1)
    mem = psutil.virtual_memory().percent
    return {"agent": AGENT_NAME, "cpu": cpu, "mem": mem}
async def handle_work(params):
    """Simulate a unit of work on the "text" entry of *params*.

    Pauses for a random 0.2-0.6 s and then returns a dict naming this agent
    together with the processed text (an empty string when "text" is absent).
    """
    text = params.get("text", "")
    pause = random.uniform(0.2, 0.6)
    await asyncio.sleep(pause)
    reply = {"agent": AGENT_NAME}
    reply["result"] = f"Processed: {text}"
    return reply
# delegate work to least-loaded peer
async def handle_delegate(params):
    """Forward a "work" request to the least-loaded peer agent.

    Fetches the active agents from the discovery service, asks every peer
    (every agent except this one) for its "info", picks the peer with the
    lowest CPU percentage (ties broken by the lowest measured latency) and
    sends it a "work" RPC carrying *params*.

    Args:
        params: dict passed through unchanged as the params of the "work" call.

    Returns:
        {"delegated_to": <stats of the chosen peer>, "peer_result": <its result>}
        on success; otherwise a dict with a "note" key explaining why nothing
        was delegated (no peers, all peers unreachable, or the delegated call
        itself failed).
    """
    async with httpx.AsyncClient() as client:
        listing = await client.get(f"{DISCOVERY}/services")
        agents = listing.json()["active_agents"]
        peers = [v["url"] for k, v in agents.items() if k != AGENT_NAME]
        if not peers:
            return {"note": "no peers available"}

        async def ping_peer(url):
            """Return the peer's info dict plus latency and url, or None on failure."""
            # perf_counter is monotonic, so the elapsed time cannot go
            # negative if the wall clock is adjusted mid-request.
            start = time.perf_counter()
            try:
                r = await client.post(
                    f"http://{url}/rpc",
                    json={"jsonrpc": "2.0", "method": "info", "params": {}, "id": "1"},
                    timeout=2,
                )
                data = r.json()["result"]
                if "cpu" not in data:
                    # Without a load figure the peer cannot be ranked.
                    return None
                data["latency_ms"] = round((time.perf_counter() - start) * 1000, 2)
                data["url"] = url
                return data
            except Exception:
                # Deliberately best-effort: any peer that cannot be queried
                # or answers with something unexpected is simply skipped.
                return None

        replies = await asyncio.gather(*(ping_peer(u) for u in peers))
        stats = [s for s in replies if s]
        if not stats:
            return {"note": "all peers unreachable"}

        # min() returns the first minimal element, matching a stable sort's [0].
        target = min(stats, key=lambda s: (s["cpu"], s["latency_ms"]))
        payload = {"jsonrpc": "2.0", "method": "work", "params": params, "id": "2"}
        try:
            r = await client.post(f"http://{target['url']}/rpc", json=payload)
            res = r.json()["result"]
        except (httpx.HTTPError, KeyError, TypeError, ValueError):
            # The peer may have gone away since the ping, or replied with an
            # error object that has no "result"; report it instead of raising.
            return {"note": "delegation failed", "delegated_to": target}
        return {"delegated_to": target, "peer_result": res}
# Dispatch table: JSON-RPC method name -> coroutine function implementing it.
METHODS = {
    "info": handle_info,
    "work": handle_work,
    "delegate": handle_delegate,
}
@app.post("/rpc")
async def rpc(req: Request):
    """JSON-RPC 2.0 endpoint: validate the request and dispatch it via METHODS.

    Args:
        req: incoming HTTP request whose body is the JSON-RPC envelope.

    Returns:
        A JSON-RPC response dict. On success "result" holds the handler's
        dict with an added "rpc_latency_ms" entry (handler run time in
        milliseconds). On failure "error" holds code -32700 (body is not
        valid JSON), -32600 (body is not a valid request envelope) or
        -32601 (unknown method).
    """
    try:
        body = await req.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        return {"jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error"}, "id": None}

    try:
        call = RPC(**body)
    except (TypeError, ValueError):
        # TypeError: the body is not a JSON object. ValueError: pydantic's
        # ValidationError (a ValueError subclass) for missing/ill-typed fields.
        return {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid Request"}, "id": None}

    handler = METHODS.get(call.method)
    if handler is None:
        return {"jsonrpc":"2.0","error":{"code":-32601,"message":"not found"},"id":call.id}

    # Monotonic clock: immune to wall-clock adjustments while the handler runs.
    start = time.perf_counter()
    result = await handler(call.params or {})
    result["rpc_latency_ms"] = round((time.perf_counter() - start) * 1000, 2)
    return {"jsonrpc":"2.0","result":result,"id":call.id}
# -------------------- discovery + heartbeat --------------------
async def register():
    """Announce this agent to the discovery service and print a confirmation.

    Posts the agent's name and its "127.0.0.1:<PORT>" address to the
    discovery service's /register endpoint.
    """
    async with httpx.AsyncClient() as session:
        url = f"127.0.0.1:{PORT}"
        announcement = {"name": AGENT_NAME, "url": url}
        endpoint = f"{DISCOVERY}/register"
        await session.post(endpoint, json=announcement)
        print(f"✅ {AGENT_NAME} registered at {url}")
async def heartbeat():
    """Re-post this agent's registration to discovery every 10 seconds, forever.

    A failed post never stops the loop: the failure is reported on stdout
    and the next attempt happens after the usual 10-second pause.
    """
    async with httpx.AsyncClient() as c:
        url = f"127.0.0.1:{PORT}"
        while True:
            try:
                await c.post(f"{DISCOVERY}/register", json={"name": AGENT_NAME, "url": url})
            except Exception as exc:
                # Still best-effort, but no longer silent: otherwise nothing
                # shows that the registration has stopped being refreshed.
                print(f"{AGENT_NAME} heartbeat failed: {exc!r}")
            await asyncio.sleep(10)
# Strong reference to the running heartbeat task. The event loop itself only
# keeps weak references to tasks, so an unreferenced task may be
# garbage-collected before it finishes.
_heartbeat_task = None


@app.on_event("startup")
async def startup():
    """Register with discovery, then start the background heartbeat task."""
    global _heartbeat_task
    await register()
    _heartbeat_task = asyncio.create_task(heartbeat())
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces at the port
    # drawn at import time.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=PORT,
    )