-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy path_stream.py
More file actions
98 lines (81 loc) · 3.43 KB
/
Copy path_stream.py
File metadata and controls
98 lines (81 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""StreamProcessor: parse newline-delimited JSON output from various CLIs."""
from __future__ import annotations
import json
class StreamProcessor:
"""Process streaming JSON output from various CLIs.
Recognized formats:
- Claude / Cursor: a single ``{"type": "result", "result": ...}`` line
- Gemini stream: ``init`` then assistant ``message`` lines, ending with ``result``
- Codex stream: ``thread.started`` then ``item.completed`` lines, ending with ``turn.completed``
"""
def __init__(self):
self.result_json = None
self.gemini_parts = []
self.codex_messages = []
self.is_gemini = False
self.is_codex = False
def process_line(self, line: str) -> bool:
"""Process one line. Returns True when a terminal event is reached."""
line = line.strip()
if not line or self.result_json is not None:
return False
try:
data = json.loads(line)
except json.JSONDecodeError:
# Some CLIs prepend a non-JSON banner to the first event line. Gemini
# CLI, for example, emits "MCP issues detected. Run /mcp list for
# status." glued onto the `init` JSON when an MCP server is
# unreachable. Without recovery the `init` line never parses,
# is_gemini stays False, and all assistant text is dropped (empty
# result). Recover by parsing from the first brace; a line with no
# JSON object (or one whose JSON starts at column 0 yet still failed)
# is ignored, as before.
brace = line.find("{")
if brace <= 0:
return False
try:
data = json.loads(line[brace:])
except json.JSONDecodeError:
return False
if data.get("type") == "init":
self.is_gemini = True
return False
if data.get("type") == "thread.started":
self.is_codex = True
return False
if self.is_gemini and data.get("type") == "message" and data.get("role") == "assistant":
content = data.get("content", "")
if isinstance(content, str):
self.gemini_parts.append(content)
return False
if self.is_codex and data.get("type") == "item.completed":
item = data.get("item", {})
if item.get("type") == "agent_message" and isinstance(item.get("text"), str):
self.codex_messages.append(item["text"])
return False
# Codex: turn.completed signals end
if self.is_codex and data.get("type") == "turn.completed":
self.result_json = {
"type": "result",
"result": "\n".join(self.codex_messages),
"status": "success",
}
return True
# Result type signals completion
if data.get("type") == "result":
if self.is_gemini:
self.result_json = {
"type": "result",
"result": "".join(self.gemini_parts),
"status": data.get("status", "success"),
}
else:
self.result_json = data
return True
# Fallback: first valid JSON without type field
if "type" not in data:
self.result_json = data
return True
return False
def get_result(self):
return self.result_json