-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfinal_graph.py
More file actions
176 lines (137 loc) Β· 6.43 KB
/
final_graph.py
File metadata and controls
176 lines (137 loc) Β· 6.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from langgraph.graph import StateGraph, START, END
from graph_states import FinalState
from langgraph.checkpoint.memory import InMemorySaver
from research_graph import create_research_graph
from forum_graph import create_forum_graph
from IPython.display import display,Image
import base64
from IPython.display import display, Markdown, Image
import uuid # Import uuid here as well for generating thread_id if needed elsewhere
import base64
from IPython.display import display, Markdown, Image
import uuid # Import uuid here as well for generating thread_id if needed elsewhere
def render_bettafish_report(final_state):
"""
Renders the Final Report in the style of BettaFish using the
structured data from your new 'Round Digest' architecture.
"""
# Extract data from the state
# Note: In LangGraph, accessing values depends on if you have the dict or the object
# We assume 'final_state' is the dictionary returned by .invoke() or .get_state().values
report = final_state.get("final_report")
digests = final_state.get("round_digests", [])
raw_references = final_state.get("references", [])
references = set(tuple(r) for r in raw_references)
# If using the 'final_report' key from the graph output, it might be inside a dict
# Adjust extraction if needed based on your specific return statement
if isinstance(report, dict) and "structured_response" in report:
report = report["structured_response"]
# --- 1. Header & Executive Summary ---
md_output = f"""
# π¦ Strategic Decision Matrix: {report.title}
---
### π Executive Summary
{report.executive_summary}
---
### π₯ The Debate Scorecard (Round-by-Round Analysis)
This matrix tracks the flow of dominance throughout the debate phases.
| Round | **Agent A (Thesis)** | **Agent B (Antithesis)** | **Winner** |
| :--- | :--- | :--- | :--- |
"""
# --- 2. The Battle Matrix (Iterating through Round Digests) ---
for digest in digests:
# Format the winner with an icon
winner_display = "βοΈ Draw"
if "Agent A" in digest.winner_of_round or "Bull" in digest.winner_of_round or "Pro" in digest.winner_of_round:
winner_display = "π **Agent A**"
elif "Agent B" in digest.winner_of_round or "Bear" in digest.winner_of_round or "Con" in digest.winner_of_round:
winner_display = "π **Agent B**"
# Join list arguments into bullet points for the table
pro_args = "<br>".join([f"β’ {arg}" for arg in digest.key_arguments_pro])
con_args = "<br>".join([f"β’ {arg}" for arg in digest.key_arguments_con])
row = f"| **{digest.round_number}** | {pro_args} | {con_args} | {winner_display} |\n"
md_output += row
sorted_references = sorted(list(references))
# --- 3. Consensus & Unique Angles ---
md_output += "\n\n### π€ Consensus & Novelty\n"
md_output += "**Agreed Reality:**\n"
for p in report.consensus_points:
md_output += f"- {p}\n"
md_output += "\n**Unique Perspectives Uncovered:**\n"
for p in report.unique_perspectives:
md_output += f"- {p}\n"
md_output += "\n\n### π Bibliography & Data Sources\n"
if sorted_references:
for idx, ref in enumerate(sorted_references, 1):
# Check if it looks like a URL or just a string
if ref[1].startswith("http"):
md_output += f"{idx}. [{ref[0]}]({ref[1]})\n"
else:
md_output += f"{idx}. {ref[0]}\n"
else:
md_output += "*No specific data sources cited in the structured output.*"
# # --- 4. Render Markdown ---
# display(Markdown(md_output))
# # --- 5. Render The Argument Map (Mermaid) ---
# print("\nVisualizing the Logic Tree...")
# Combine the accumulated mermaid subgraphs into one main graph
raw_mermaid = final_state.get("running_mermaid_graph", "")
# If the accumulator didn't add the header, add it now
if "graph TD" not in raw_mermaid and "flowchart" not in raw_mermaid:
full_mermaid = f"graph TD\n{raw_mermaid}"
else:
full_mermaid = raw_mermaid
# # Clean up any potential markdown code fences from the LLM
# full_mermaid = full_mermaid.replace("```mermaid", "").replace("```", "").strip()
# # Render via API
# graphbytes = full_mermaid.encode("utf8")
# base64_bytes = base64.b64encode(graphbytes)
# base64_string = base64_bytes.decode("ascii")
# url = "https://mermaid.ink/img/" + base64_string
# display(Image(url=url))
md_output = f"{md_output}\n\n## Visual Logic Map\n```mermaid\n{full_mermaid}\n```"
# --- 6. Save to File ---
with open("strategic_report.md", "w") as f:
f.write(md_output)
# f.write("\n\n## Visual Logic Map\n```mermaid\n" + full_mermaid + "\n```")
return md_output
async def create_final_graph():
research_graph = await create_research_graph()
forum_graph = await create_forum_graph()
final_blueprint = StateGraph(FinalState)
final_blueprint.add_node("Research Graph",research_graph)
final_blueprint.add_node("Forum Graph",forum_graph)
final_blueprint.add_edge(START,"Research Graph")
final_blueprint.add_edge("Research Graph","Forum Graph")
final_blueprint.add_edge("Forum Graph",END)
final_graph = final_blueprint.compile(checkpointer=InMemorySaver())
display(Image(final_graph.get_graph().draw_mermaid_png()))
return final_graph
if __name__ == "__main__":
import asyncio
async def main():
final_graph = await create_final_graph()
thread_id = str(uuid.uuid4())
config = {"configurable":{"thread_id":thread_id},"recursion_limit":100}
initial_state = FinalState(
query="Is AI investment in 2024 a bubble or undervalued opportunity?",
vector_store="",
query_limit=0,
max_rounds=1,
current_round=1,
step=0,
round_digests=[],
running_mermaid_graph="",
debate_history=[],
final_report=None,
messages = [],
references = []
)
async for parent,child in final_graph.astream(initial_state,config=config,stream_mode="updates",subgraphs=True):
print(child.keys())
snapshot = final_graph.get_state(config={"configurable":{"thread_id":thread_id}})
if snapshot.values:
render_bettafish_report(snapshot.values)
else:
print("No state found. Did the graph finish?")
asyncio.run(main())