import json

from typing_extensions import Literal

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langchain_ollama import ChatOllama
from langgraph.graph import START, END, StateGraph

from ollama_deep_researcher.configuration import Configuration, SearchAPI
from ollama_deep_researcher.utils import (
    deduplicate_and_format_sources,
    tavily_search,
    format_sources,
    perplexity_search,
    duckduckgo_search,
    searxng_search,
    strip_thinking_tokens,
    get_config_value,
)
from ollama_deep_researcher.state import SummaryState, SummaryStateInput, SummaryStateOutput
from ollama_deep_researcher.prompts import query_writer_instructions, summarizer_instructions, reflection_instructions, get_current_date
from ollama_deep_researcher.lmstudio import ChatLMStudio

# Nodes
def generate_query(state: SummaryState, config: RunnableConfig):
    """LangGraph node that generates a search query based on the research topic.

    Uses an LLM to create an optimized search query for web research based on
    the user's research topic. Supports both LMStudio and Ollama as LLM providers.

    Args:
        state: Current graph state containing the research topic
        config: Configuration for the runnable, including LLM provider settings

    Returns:
        Dictionary with state update, including search_query key containing the generated query
    """

    # Format the prompt
    current_date = get_current_date()
    formatted_prompt = query_writer_instructions.format(
        current_date=current_date,
        research_topic=state.research_topic
    )

    # Generate a query
    configurable = Configuration.from_runnable_config(config)

    # Choose the appropriate LLM based on the provider
    if configurable.llm_provider == "lmstudio":
        llm_json_mode = ChatLMStudio(
            base_url=configurable.lmstudio_base_url,
            model=configurable.local_llm,
            temperature=0,
            format="json"
        )
    else:  # Default to Ollama
        llm_json_mode = ChatOllama(
            base_url=configurable.ollama_base_url,
            model=configurable.local_llm,
            temperature=0,
            format="json"
        )

    result = llm_json_mode.invoke(
        [SystemMessage(content=formatted_prompt),
         HumanMessage(content="Generate a query for web search:")]
    )

    # Get the content
    content = result.content

    # Parse the JSON response and get the query
    try:
        query = json.loads(content)
        search_query = query['query']
    except (json.JSONDecodeError, KeyError):
        # If parsing fails or the key is not found, use a fallback query
        if configurable.strip_thinking_tokens:
            content = strip_thinking_tokens(content)
        search_query = content
    return {"search_query": search_query}

def web_research(state: SummaryState, config: RunnableConfig):
    """LangGraph node that performs web research using the generated search query.

    Executes a web search using the configured search API (tavily, perplexity,
    duckduckgo, or searxng) and formats the results for further processing.

    Args:
        state: Current graph state containing the search query and research loop count
        config: Configuration for the runnable, including search API settings

    Returns:
        Dictionary with state update, including sources_gathered, research_loop_count, and web_research_results
    """

    # Configure
    configurable = Configuration.from_runnable_config(config)

    # Get the search API
    search_api = get_config_value(configurable.search_api)

    # Search the web
    if search_api == "tavily":
        search_results = tavily_search(state.search_query, fetch_full_page=configurable.fetch_full_page, max_results=1)
        search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, fetch_full_page=configurable.fetch_full_page)
    elif search_api == "perplexity":
        search_results = perplexity_search(state.search_query, state.research_loop_count)
        search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, fetch_full_page=configurable.fetch_full_page)
    elif search_api == "duckduckgo":
        search_results = duckduckgo_search(state.search_query, max_results=3, fetch_full_page=configurable.fetch_full_page)
        search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, fetch_full_page=configurable.fetch_full_page)
    elif search_api == "searxng":
        search_results = searxng_search(state.search_query, max_results=3, fetch_full_page=configurable.fetch_full_page)
        search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, fetch_full_page=configurable.fetch_full_page)
    else:
        raise ValueError(f"Unsupported search API: {configurable.search_api}")

    return {
        "sources_gathered": [format_sources(search_results)],
        "research_loop_count": state.research_loop_count + 1,
        "web_research_results": [search_str],
    }

def summarize_sources(state: SummaryState, config: RunnableConfig):
    """LangGraph node that summarizes web research results.

    Uses an LLM to create or update a running summary based on the newest web research
    results, integrating them with any existing summary.

    Args:
        state: Current graph state containing research topic, running summary,
            and web research results
        config: Configuration for the runnable, including LLM provider settings

    Returns:
        Dictionary with state update, including running_summary key containing the updated summary
    """

    # Existing summary
    existing_summary = state.running_summary

    # Most recent web research
    most_recent_web_research = state.web_research_results[-1]

    # Build the human message
    if existing_summary:
        human_message_content = (
            f"<Existing Summary> \n {existing_summary} \n </Existing Summary>\n\n"
            f"<New Context> \n {most_recent_web_research} \n </New Context>\n\n"
            f"Update the Existing Summary with the New Context on this topic: \n <User Input> \n {state.research_topic} \n </User Input>\n\n"
        )
    else:
        human_message_content = (
            f"<Context> \n {most_recent_web_research} \n </Context>\n\n"
            f"Create a Summary using the Context on this topic: \n <User Input> \n {state.research_topic} \n </User Input>\n\n"
        )

    # Run the LLM
    configurable = Configuration.from_runnable_config(config)

    # Choose the appropriate LLM based on the provider
    if configurable.llm_provider == "lmstudio":
        llm = ChatLMStudio(
            base_url=configurable.lmstudio_base_url,
            model=configurable.local_llm,
            temperature=0
        )
    else:  # Default to Ollama
        llm = ChatOllama(
            base_url=configurable.ollama_base_url,
            model=configurable.local_llm,
            temperature=0
        )

    result = llm.invoke(
        [SystemMessage(content=summarizer_instructions),
         HumanMessage(content=human_message_content)]
    )

    # Strip thinking tokens if configured
    running_summary = result.content
    if configurable.strip_thinking_tokens:
        running_summary = strip_thinking_tokens(running_summary)

    return {"running_summary": running_summary}

def reflect_on_summary(state: SummaryState, config: RunnableConfig):
    """LangGraph node that identifies knowledge gaps and generates follow-up queries.

    Analyzes the current summary to identify areas for further research and generates
    a new search query to address those gaps. Uses structured output to extract
    the follow-up query in JSON format.

    Args:
        state: Current graph state containing the running summary and research topic
        config: Configuration for the runnable, including LLM provider settings

    Returns:
        Dictionary with state update, including search_query key containing the generated follow-up query
    """

    # Generate a query
    configurable = Configuration.from_runnable_config(config)

    # Choose the appropriate LLM based on the provider
    if configurable.llm_provider == "lmstudio":
        llm_json_mode = ChatLMStudio(
            base_url=configurable.lmstudio_base_url,
            model=configurable.local_llm,
            temperature=0,
            format="json"
        )
    else:  # Default to Ollama
        llm_json_mode = ChatOllama(
            base_url=configurable.ollama_base_url,
            model=configurable.local_llm,
            temperature=0,
            format="json"
        )

    result = llm_json_mode.invoke(
        [SystemMessage(content=reflection_instructions.format(research_topic=state.research_topic)),
         HumanMessage(content=f"Reflect on our existing knowledge: \n === \n {state.running_summary} \n === \n And now identify a knowledge gap and generate a follow-up web search query:")]
    )

    # Parse the JSON response and extract the follow-up query
    try:
        reflection_content = json.loads(result.content)
        # Get the follow-up query
        query = reflection_content.get('follow_up_query')
        # Check if the query is None or empty
        if not query:
            # Use a fallback query
            return {"search_query": f"Tell me more about {state.research_topic}"}
        return {"search_query": query}
    except (json.JSONDecodeError, KeyError, AttributeError):
        # If parsing fails or the key is not found, use a fallback query
        return {"search_query": f"Tell me more about {state.research_topic}"}

def finalize_summary(state: SummaryState):
    """LangGraph node that finalizes the research summary.

    Prepares the final output by deduplicating and formatting sources, then
    combining them with the running summary to create a well-structured
    research report with proper citations.

    Args:
        state: Current graph state containing the running summary and sources gathered

    Returns:
        Dictionary with state update, including running_summary key containing the formatted final summary with sources
    """

    # Deduplicate sources before joining
    seen_sources = set()
    unique_sources = []
    for source in state.sources_gathered:
        # Split the source into lines and process each individually
        for line in source.split('\n'):
            # Only process non-empty lines
            if line.strip() and line not in seen_sources:
                seen_sources.add(line)
                unique_sources.append(line)

    # Join the deduplicated sources
    all_sources = "\n".join(unique_sources)
    final_summary = f"## Summary\n{state.running_summary}\n\n### Sources:\n{all_sources}"
    return {"running_summary": final_summary}

def route_research(state: SummaryState, config: RunnableConfig) -> Literal["finalize_summary", "web_research"]:
    """LangGraph routing function that determines the next step in the research flow.

    Controls the research loop by deciding whether to continue gathering information
    or to finalize the summary based on the configured maximum number of research loops.

    Args:
        state: Current graph state containing the research loop count
        config: Configuration for the runnable, including max_web_research_loops setting

    Returns:
        String literal indicating the next node to visit ("web_research" or "finalize_summary")
    """

    configurable = Configuration.from_runnable_config(config)
    # research_loop_count is incremented inside web_research, so a strict
    # comparison runs exactly max_web_research_loops searches (<= would run
    # one extra loop)
    if state.research_loop_count < configurable.max_web_research_loops:
        return "web_research"
    else:
        return "finalize_summary"
# Add nodes and edges
builder = StateGraph(SummaryState, input=SummaryStateInput, output=SummaryStateOutput, config_schema=Configuration)
builder.add_node("generate_query", generate_query)
builder.add_node("web_research", web_research)
builder.add_node("summarize_sources", summarize_sources)
builder.add_node("reflect_on_summary", reflect_on_summary)
builder.add_node("finalize_summary", finalize_summary)
# Add edges
builder.add_edge(START, "generate_query")
builder.add_edge("generate_query", "web_research")
builder.add_edge("web_research", "summarize_sources")
builder.add_edge("summarize_sources", "reflect_on_summary")
builder.add_conditional_edges("reflect_on_summary", route_research)
builder.add_edge("finalize_summary", END)
graph = builder.compile()
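
# Example invocation (a minimal sketch, not part of the original module; it
# assumes an Ollama server is reachable at the configured base URL and that
# the Configuration defaults are usable as-is):
if __name__ == "__main__":
    output = graph.invoke({"research_topic": "the history of the transistor"})
    print(output["running_summary"])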