forked from MervinPraison/PraisonAI
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
697 lines (577 loc) · 26.7 KB
/
main.py
File metadata and controls
697 lines (577 loc) · 26.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
import os
import time
import json
import logging
import threading
from praisonaiagents._logging import get_logger
from typing import List, Optional, Dict, Any, Union, Literal, Type
from pydantic import BaseModel, ConfigDict
import asyncio
def _rich():
    """Import and return the Rich display primitives on first use.

    Returns:
        Tuple of (Console, Panel, Text, Markdown) classes. Repeat calls are
        cheap: Python caches the modules in ``sys.modules`` after the first
        import, so this only defers the cost of importing Rich at startup.
    """
    from rich.console import Console as _Console
    from rich.markdown import Markdown as _Markdown
    from rich.panel import Panel as _Panel
    from rich.text import Text as _Text
    return _Console, _Panel, _Text, _Markdown
# Token metrics are optional: the telemetry extra may not be installed, in
# which case TokenMetrics stays None (TaskOutput.token_metrics degrades too).
try:
    from .telemetry.token_collector import TokenMetrics
except ImportError:
    TokenMetrics = None
# Logging is already configured in _logging.py via __init__.py
# Global list of error messages appended by display_error()/adisplay_error();
# guarded by a lock because display calls may arrive from worker threads.
error_logs = []
_error_logs_lock = threading.Lock()
# Separate registries for sync and async callbacks, keyed by display type
# (e.g. 'interaction', 'tool_call'); both guarded by _callbacks_lock.
sync_display_callbacks = {}
async_display_callbacks = {}
_callbacks_lock = threading.Lock()
# Global approval callback registry: a single callback consulted before
# dangerous tool operations (set via register_approval_callback).
approval_callback = None
_approval_callback_lock = threading.Lock()
# ─────────────────────────────────────────────────────────────────────────────
# PraisonAI Unique Color Palette: "Elegant Intelligence"
# Creates a visual narrative flow: Agent → Task → Working → Response
# ─────────────────────────────────────────────────────────────────────────────
PRAISON_COLORS = {
    # Agent identity - grounded, stable
    "agent": "#86A789",  # Soft Sage Green
    "agent_text": "#D2E3C8",  # Light sage for text
    # Task/Question - input, attention-grabbing
    "task": "#FF9B9B",  # Warm Coral
    "task_text": "#FFE5E5",  # Light coral for text
    # Working/Processing - action, energy
    "working": "#FFB347",  # Amber
    "working_text": "#FFF3E0",  # Light amber for text
    # Response/Output - completion, calm
    "response": "#4A90D9",  # Ocean Blue
    "response_text": "#E3F2FD",  # Light blue for text
    # Tool calls - special action
    "tool": "#9B7EDE",  # Violet Accent
    "tool_text": "#EDE7F6",  # Light violet for text
    # Reasoning - thought process
    "reasoning": "#78909C",  # Blue Gray
    "reasoning_text": "#ECEFF1",  # Light gray for text
    # Error/Warning - alert
    "error": "#E57373",  # Alert Red
    "error_text": "#FFEBEE",  # Light red for text
    # Metrics - meta information
    "metrics": "#B4B4B3",  # Cool Gray
    "metrics_text": "#FAFAFA",  # Near white for text
}
# Status animation frames for the "Working" indicator (cycled by phase index
# in display_working_status)
WORKING_FRAMES = ["●○○", "○●○", "○○●", "○●○"]
WORKING_PHASES = [
    "Analyzing query...",
    "Processing context...",
    "Generating response...",
    "Finalizing output...",
]
# Public API of this module (callback registries, palette constants, display helpers)
__all__ = [
    'error_logs',
    'register_display_callback',
    'register_approval_callback',
    'add_display_callback',  # Simplified alias
    'add_approval_callback',  # Simplified alias
    'sync_display_callbacks',
    'async_display_callbacks',
    'execute_callback',
    'approval_callback',
    # Color palette and animation constants
    'PRAISON_COLORS',
    'WORKING_FRAMES',
    'WORKING_PHASES',
    # Display functions
    'display_interaction',
    'display_instruction',
    'display_tool_call',
    'display_error',
    'display_generating',
    'display_reasoning_steps',
    'display_working_status',
    'display_self_reflection',
]
# P3/G2: Supported callback types documentation
# Standard callbacks: tool_call, interaction, error, llm_start, llm_end
# Autonomy callbacks (PraisonAI unique):
#   - autonomy_iteration: {iteration, max_iterations, stage}
#   - autonomy_stage_change: {from_stage, to_stage}
#   - autonomy_doom_loop: {iteration, recovery_action}
#   - autonomy_complete: {completion_reason, iterations, duration_seconds}
#   - retry: {attempt, max_attempts, error, retry_in_seconds}
SUPPORTED_CALLBACK_TYPES = [
    'tool_call', 'interaction', 'error', 'llm_start', 'llm_end', 'llm_content',
    'autonomy_iteration', 'autonomy_stage_change', 'autonomy_doom_loop', 'autonomy_complete',
    'retry',
]
def register_display_callback(display_type: str, callback_fn, is_async: bool = False):
    """Register a callback for a display event type.

    Args:
        display_type (str): Type of display event ('interaction',
            'self_reflection', 'tool_call', ...)
        callback_fn: The callable to invoke for that event
        is_async (bool): True to store it in the async registry instead of
            the sync one
    """
    # Pick the registry first, then mutate it under the shared lock.
    registry = async_display_callbacks if is_async else sync_display_callbacks
    with _callbacks_lock:
        registry[display_type] = callback_fn
def register_approval_callback(callback_fn):
    """Register a global approval callback function for dangerous tool operations.

    Args:
        callback_fn: Function that takes (function_name, arguments, risk_level)
            and returns ApprovalDecision
    """
    global approval_callback
    # Rebinds the module-level name; readers should access it via the module
    # attribute rather than a from-import copy, which would go stale.
    with _approval_callback_lock:
        approval_callback = callback_fn
# Simplified aliases (consistent naming convention); same callables, so the
# behavior of the register_* functions applies unchanged.
add_display_callback = register_display_callback
add_approval_callback = register_approval_callback
def execute_sync_callback(display_type: str, **kwargs):
    """Execute the registered synchronous callback for a display type.

    Used to trigger callbacks even when verbose=False (no rendering happens
    here). Keyword arguments are filtered down to what the callback's
    signature accepts, for backward compatibility with older callbacks.

    Args:
        display_type (str): Type of display event
        **kwargs: Arguments to pass to the callback function
    """
    # Snapshot the callback under the lock, but invoke it OUTSIDE the lock:
    # holding _callbacks_lock while running arbitrary user code would deadlock
    # if the callback tries to (re)register a callback.
    with _callbacks_lock:
        callback = sync_display_callbacks.get(display_type)
    if callback is None:
        return
    import inspect
    sig = inspect.signature(callback)
    if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()):
        # Callback accepts **kwargs, so pass all arguments
        supported_kwargs = kwargs
    else:
        # Only pass arguments that the callback signature supports
        supported_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters}
    callback(**supported_kwargs)
async def execute_callback(display_type: str, **kwargs):
    """Execute both sync and async callbacks for a given display type.

    The sync callback runs in the default executor so it cannot block the
    event loop; the async callback is awaited directly. Keyword arguments are
    filtered to each callback's signature for backward compatibility.

    Args:
        display_type (str): Type of display event
        **kwargs: Arguments to pass to the callback functions
    """
    import inspect

    def _accepted_kwargs(fn):
        # Pass everything through for **kwargs callbacks; otherwise restrict
        # to the names the signature declares.
        sig = inspect.signature(fn)
        if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()):
            return kwargs
        return {k: v for k, v in kwargs.items() if k in sig.parameters}

    # Snapshot both callbacks under the lock, then release it before invoking
    # anything: user code must never run while _callbacks_lock is held, and a
    # threading lock must not be held across an await.
    with _callbacks_lock:
        sync_callback = sync_display_callbacks.get(display_type)
        async_callback = async_display_callbacks.get(display_type)
    if sync_callback:
        supported_kwargs = _accepted_kwargs(sync_callback)
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated here since Python 3.10.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, lambda: sync_callback(**supported_kwargs))
    if async_callback:
        await async_callback(**_accepted_kwargs(async_callback))
def _clean_display_content(content: str, max_length: int = 20000) -> str:
    """Normalize arbitrary content for terminal display.

    Drops any line containing a base64 payload (e.g. inline images), truncates
    the result to ``max_length`` characters with a ``...`` suffix, and strips
    surrounding whitespace.

    Args:
        content: Arbitrary value; coerced to str before processing.
        max_length: Maximum number of characters kept before truncation.

    Returns:
        The cleaned string, or "" for empty/whitespace-only input.
    """
    if not content or not str(content).strip():
        logging.debug(f"Empty content received in _clean_display_content: {repr(content)}")
        return ""
    text = str(content)
    # Filter out base64-bearing lines so encoded blobs don't flood the terminal
    if "base64" in text:
        text = '\n'.join(line for line in text.split('\n') if "base64" not in line)
    if len(text) > max_length:
        text = text[:max_length] + "..."
    return text.strip()
def display_interaction(message, response, markdown=True, generation_time=None, console=None, agent_name=None, agent_role=None, agent_tools=None, task_name=None, task_description=None, task_id=None, metrics=None):
    """Synchronous version of display_interaction.

    Displays the task/message inline (📝 prefix) and the response in a panel
    with semantic colors and an optional metrics footer, using PraisonAI's
    unique color palette. Registered 'interaction' sync callbacks fire here;
    note that an empty response returns early BEFORE callbacks run.

    Args:
        message: Task/prompt text, or a multimodal list of {"type", "text"} parts
            (only the first text part is shown).
        response: Model response text; empty responses skip display entirely.
        markdown: Render message/response as Markdown when True.
        generation_time: Seconds taken; shown in the response panel title.
        console: Rich Console to print to; a new one is created when None.
        agent_name/agent_role/agent_tools: Agent context forwarded to callbacks.
        task_name/task_description/task_id: Task context forwarded to callbacks.
        metrics: Optional dict for the footer; keys read are 'tokens_in',
            'tokens_out', 'cost', 'model' (note the plural token keys).
    """
    Console, Panel, Text, Markdown = _rich()
    if console is None:
        console = Console()
    if isinstance(message, list):
        # Multimodal payload: display only the first text part
        text_content = next((item["text"] for item in message if item["type"] == "text"), "")
        message = text_content
    message = _clean_display_content(str(message))
    response = _clean_display_content(str(response))
    # Skip display if response is empty (common with Gemini tool calls)
    if not response or not response.strip():
        return
    # Execute synchronous callbacks
    execute_sync_callback(
        'interaction',
        message=message,
        response=response,
        markdown=markdown,
        generation_time=generation_time,
        agent_name=agent_name,
        agent_role=agent_role,
        agent_tools=agent_tools,
        task_name=task_name,
        task_description=task_description,
        task_id=task_id,
        metrics=metrics
    )
    # Build response title with time
    response_title = "Response"
    if generation_time:
        response_title = f"Response ({generation_time:.1f}s)"
    # Build response content with optional metrics footer
    response_content = response
    if metrics and isinstance(metrics, dict):
        # Add dashed separator and compact metrics line
        tokens_in = metrics.get('tokens_in', 0)
        tokens_out = metrics.get('tokens_out', 0)
        cost = metrics.get('cost', 0)
        model = metrics.get('model', '')
        metrics_line = f"\n\n─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─\n📊 {tokens_in} tokens in · {tokens_out} out"
        if cost > 0:
            metrics_line += f" · ${cost:.4f}"
        if model:
            metrics_line += f" · {model}"
        response_content = response + metrics_line
    # Task is inline (less visual weight), Response keeps panel (it's the main content)
    # Format: 📝 Task message
    task_prefix = "[bold #FF9B9B]📝[/]"
    if markdown:
        console.print(f"{task_prefix} {message}\n")
        console.print(Panel.fit(Markdown(response_content), title=response_title, border_style=PRAISON_COLORS["response"]))
    else:
        console.print(f"{task_prefix} [bold {PRAISON_COLORS['task_text']}]{message}[/]\n")
        console.print(Panel.fit(Text(response_content, style=f"bold {PRAISON_COLORS['response_text']}"), title=response_title, border_style=PRAISON_COLORS["response"]))
def display_self_reflection(message: str, console=None):
    """Render an agent self-reflection message in a magenta-bordered panel.

    Fires the 'self_reflection' sync callback with the cleaned message; does
    nothing for empty/whitespace-only input.
    """
    if not message or not message.strip():
        return
    Console, Panel, Text, _Md = _rich()
    if console is None:
        console = Console()
    cleaned = _clean_display_content(str(message))
    # Callbacks fire before rendering
    execute_sync_callback('self_reflection', message=cleaned)
    reflection_panel = Panel.fit(
        Text(cleaned, style="bold yellow"),
        title="Self Reflection",
        border_style="magenta",
    )
    console.print(reflection_panel)
def display_instruction(message: str, console=None, agent_name: str = None, agent_role: str = None, agent_tools: List[str] = None):
    """Show the agent-info panel and, at DEBUG log level, the raw instruction.

    Fires the 'instruction' sync callback with the cleaned message and agent
    context; does nothing for empty/whitespace-only input.
    """
    if not message or not message.strip():
        return
    Console, Panel, Text, _Md = _rich()
    if console is None:
        console = Console()
    cleaned = _clean_display_content(str(message))
    # Callbacks fire before rendering
    execute_sync_callback('instruction', message=cleaned, agent_name=agent_name, agent_role=agent_role, agent_tools=agent_tools)
    if agent_name:
        # Assemble the agent-info panel line by line
        info_lines = [f"[bold #FF9B9B]👤 Agent:[/] [#FFE5E5]{agent_name}[/]"]
        if agent_role:
            info_lines.append(f"[bold #B4B4B3]Role:[/] [#FFE5E5]{agent_role}[/]")
        if agent_tools:
            tools_str = ", ".join(f"[italic #B4D4FF]{tool}[/]" for tool in agent_tools)
            info_lines.append(f"[bold #86A789]Tools:[/] {tools_str}")
        console.print(Panel("\n".join(info_lines), border_style="#D2E3C8", title="[bold]Agent Info[/]", title_align="left", padding=(1, 2)))
    # The instruction text itself is only shown when the logger runs at DEBUG
    if get_logger().getEffectiveLevel() == logging.DEBUG:
        console.print(Panel.fit(Text(cleaned, style="bold blue"), title="Instruction", border_style="cyan"))
def display_tool_call(message: str, console=None, tool_name: str = None, tool_input: dict = None, tool_output: str = None, elapsed_time: float = None, success: bool = True):
    """Display tool call information in PraisonAI's unique timeline format.

    Uses ▸ prefix, inline timing [X.Xs], and status icons ✓/✗ for a clean,
    scannable tool activity display. Callbacks always fire, even when
    console is None (non-verbose mode).

    Args:
        message: The tool call message (legacy format)
        console: Rich console for output; when None, nothing is printed
        tool_name: Name of the tool being called
        tool_input: Input arguments to the tool
        tool_output: Output from the tool (if available)
        elapsed_time: Time taken for tool execution in seconds
        success: Whether the tool call succeeded
    """
    logging.debug(f"display_tool_call called with message: {repr(message)}")
    if not message or not message.strip():
        logging.debug("Empty message in display_tool_call, returning early")
        return
    message = _clean_display_content(str(message))
    logging.debug(f"Cleaned message in display_tool_call: {repr(message)}")
    # Execute synchronous callbacks (always, even when console is None)
    execute_sync_callback('tool_call', message=message, tool_name=tool_name, tool_input=tool_input, tool_output=tool_output)
    # Only print if console is provided (verbose mode)
    if console is None:
        return
    if not tool_name:
        # Legacy format - simple inline
        console.print(f"[bold #86A789]▸[/] {message}")
        return
    # Format: ▸ tool_name(args) → result [X.Xs] ✓
    args_str = ""
    if tool_input:
        # Truncate long values for display
        args_parts = []
        for k, v in tool_input.items():
            v_str = str(v)
            if len(v_str) > 50:
                v_str = v_str[:47] + "..."
            args_parts.append(f"{k}={repr(v_str) if isinstance(v, str) else v_str}")
        args_str = ", ".join(args_parts)
    status_icon = "[green]✓[/]" if success else "[red]✗[/]"
    # BUGFIX: compare against None so a legitimate 0.0s elapsed time is still shown
    # (the old truthiness check silently dropped it)
    time_str = f"[dim][{elapsed_time:.1f}s][/]" if elapsed_time is not None else ""
    # Base tool call line
    tool_line = f"[bold #86A789]▸[/] [#B4D4FF]{tool_name}[/]({args_str})"
    # Add output inline with arrow if available
    if tool_output:
        output_str = str(tool_output)
        if len(output_str) > 80:
            output_str = output_str[:77] + "..."
        tool_line += f" [dim]→[/] [italic]{output_str}[/]"
    # Append trailing decorations without the double space the old code
    # emitted when no timing was available
    tool_line += f" {time_str} {status_icon}" if time_str else f" {status_icon}"
    console.print(tool_line)
def display_error(message: str, console=None):
    """Print an error panel in the semantic error colors and record the
    message in the global error_logs list.

    Fires the 'error' sync callback; does nothing for empty input.
    """
    if not message or not message.strip():
        return
    Console, Panel, Text, _Md = _rich()
    if console is None:
        console = Console()
    cleaned = _clean_display_content(str(message))
    # Callbacks fire before rendering
    execute_sync_callback('error', message=cleaned)
    error_panel = Panel.fit(
        Text(cleaned, style=f"bold {PRAISON_COLORS['error_text']}"),
        title="⚠ Error",
        border_style=PRAISON_COLORS["error"],
    )
    console.print(error_panel)
    # Keep a global record for later inspection; the lock guards appends
    # from worker threads
    with _error_logs_lock:
        error_logs.append(cleaned)
def display_generating(content: str = "", start_time: Optional[float] = None):
    """Build (not print) a 'Generating...' panel for use with Rich.Live.

    Args:
        content: Partial content streamed so far; empty input returns None.
        start_time: Optional epoch seconds; elapsed time is shown in the title.

    Returns:
        A Rich Panel, or None for empty content.
    """
    if not content or not str(content).strip():
        logging.debug("Empty content in display_generating, returning early")
        return None
    elapsed_str = f" {time.time() - start_time:.1f}s" if start_time is not None else ""
    cleaned = _clean_display_content(str(content))
    # Callbacks fire before the panel is built
    execute_sync_callback('generating', content=cleaned, elapsed_time=elapsed_str.strip() if elapsed_str else None)
    _Console, Panel, _Text, Markdown = _rich()
    return Panel(Markdown(cleaned), title=f"Generating...{elapsed_str}", border_style=PRAISON_COLORS["response"])
def display_reasoning_steps(steps: List[str], console=None):
    """Render the agent's reasoning steps as a numbered-circle list panel.

    Uses ①②③ circled numbers (falling back to (n) beyond 20 steps) for a
    distinctive, scannable view of the thought process. Each step is cleaned
    and clipped to 80 characters.

    Args:
        steps: Ordered list of reasoning step descriptions.
        console: Rich console for output; a fresh one is created when omitted.
    """
    if not steps:
        return
    Console, Panel, Text, _Md = _rich()
    if console is None:
        console = Console()
    # Circled digits cover the first 20 steps
    circle_numbers = "①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳"
    rendered = []
    for idx, raw_step in enumerate(steps):
        marker = circle_numbers[idx] if idx < len(circle_numbers) else f"({idx + 1})"
        step_text = _clean_display_content(str(raw_step))
        if len(step_text) > 80:
            step_text = step_text[:77] + "..."
        rendered.append(f"{marker} {step_text}")
    console.print(Panel.fit(
        Text("\n".join(rendered), style=f"italic {PRAISON_COLORS['reasoning_text']}"),
        title="Reasoning",
        border_style=PRAISON_COLORS["reasoning"],
    ))
def display_working_status(phase: int = 0, status_text: str = None, console=None):
    """Build the animated "Working ●○○" status panel.

    Shows PraisonAI's distinctive pulsing-dot indicator with a phase-specific
    status line. The panel is returned for the caller to render (typically
    via Rich.Live); nothing is printed here.

    Args:
        phase: Current animation phase; wraps around WORKING_FRAMES/PHASES.
        status_text: Optional override for the phase description.
        console: Unused; retained for backward compatibility with callers.
            (The previous implementation instantiated a Console here that was
            never used — that dead work is removed.)

    Returns:
        Panel object for use with Rich.Live
    """
    _Console, Panel, Text, _Md = _rich()
    # Get current frame and phase text (modulo so any phase index is valid)
    frame = WORKING_FRAMES[phase % len(WORKING_FRAMES)]
    phase_text = status_text or WORKING_PHASES[phase % len(WORKING_PHASES)]
    # Build working status display
    working_display = f"Working {frame} {phase_text}"
    return Panel.fit(
        Text(working_display, style=f"bold {PRAISON_COLORS['working_text']}"),
        title="Status",
        border_style=PRAISON_COLORS["working"]
    )
# Async versions with 'a' prefix
async def adisplay_interaction(message, response, markdown=True, generation_time=None, console=None, agent_name=None, agent_role=None, agent_tools=None, task_name=None, task_description=None, task_id=None, metrics=None):
    """Async version of display_interaction.

    Args:
        message: Task/prompt text, or a multimodal list of {"type", "text"} parts.
        response: Model response text.
        markdown: Render message/response as Markdown when True.
        generation_time: Seconds taken; printed as a dim line before the panels.
        console: Rich Console to print to; a new one is created when None.
        agent_name/agent_role/agent_tools: Agent context forwarded to callbacks.
        task_name/task_description/task_id: Task context forwarded to callbacks.
        metrics: Optional dict with tokens_in/tokens_out/cost/model — added for
            parity with the sync display_interaction; forwarded to callbacks
            (default None keeps the call backward compatible).
    """
    Console, Panel, Text, Markdown = _rich()
    if console is None:
        console = Console()
    if isinstance(message, list):
        # Multimodal payload: display only the first text part
        text_content = next((item["text"] for item in message if item["type"] == "text"), "")
        message = text_content
    message = _clean_display_content(str(message))
    response = _clean_display_content(str(response))
    # Execute callbacks (sync callback runs in an executor; async is awaited)
    await execute_callback(
        'interaction',
        message=message,
        response=response,
        markdown=markdown,
        generation_time=generation_time,
        agent_name=agent_name,
        agent_role=agent_role,
        agent_tools=agent_tools,
        task_name=task_name,
        task_description=task_description,
        task_id=task_id,
        metrics=metrics
    )
    # Rest of the display logic...
    if generation_time:
        console.print(Text(f"Response generated in {generation_time:.1f}s", style="dim"))
    if markdown:
        console.print(Panel.fit(Markdown(message), title="Task", border_style="cyan"))
        console.print(Panel.fit(Markdown(response), title="Response", border_style="cyan"))
    else:
        console.print(Panel.fit(Text(message, style="bold green"), title="Task", border_style="cyan"))
        console.print(Panel.fit(Text(response, style="bold blue"), title="Response", border_style="cyan"))
async def adisplay_self_reflection(message: str, console=None):
    """Async version of display_self_reflection."""
    if not message or not message.strip():
        return
    Console, Panel, Text, _Md = _rich()
    if console is None:
        console = Console()
    cleaned = _clean_display_content(str(message))
    # Fire sync + async callbacks before rendering
    await execute_callback('self_reflection', message=cleaned)
    reflection_panel = Panel.fit(
        Text(cleaned, style="bold yellow"),
        title="Self Reflection",
        border_style="magenta",
    )
    console.print(reflection_panel)
async def adisplay_instruction(message: str, console=None, agent_name: str = None, agent_role: str = None, agent_tools: List[str] = None):
    """Async version of display_instruction."""
    if not message or not message.strip():
        return
    Console, Panel, Text, _Md = _rich()
    if console is None:
        console = Console()
    cleaned = _clean_display_content(str(message))
    # Fire sync + async callbacks before rendering
    await execute_callback('instruction', message=cleaned, agent_name=agent_name, agent_role=agent_role, agent_tools=agent_tools)
    if agent_name:
        # Assemble the agent-info panel line by line
        info_lines = [f"[bold #FF9B9B]👤 Agent:[/] [#FFE5E5]{agent_name}[/]"]
        if agent_role:
            info_lines.append(f"[bold #B4B4B3]Role:[/] [#FFE5E5]{agent_role}[/]")
        if agent_tools:
            tools_str = ", ".join(f"[italic #B4D4FF]{tool}[/]" for tool in agent_tools)
            info_lines.append(f"[bold #86A789]Tools:[/] {tools_str}")
        console.print(Panel("\n".join(info_lines), border_style="#D2E3C8", title="[bold]Agent Info[/]", title_align="left", padding=(1, 2)))
    # The instruction text itself is only shown when the logger runs at DEBUG
    if get_logger().getEffectiveLevel() == logging.DEBUG:
        console.print(Panel.fit(Text(cleaned, style="bold blue"), title="Instruction", border_style="cyan"))
async def adisplay_tool_call(message: str, console=None):
    """Async version of display_tool_call."""
    logging.debug(f"adisplay_tool_call called with message: {repr(message)}")
    if not message or not message.strip():
        logging.debug("Empty message in adisplay_tool_call, returning early")
        return
    Console, Panel, Text, _Md = _rich()
    if console is None:
        console = Console()
    cleaned = _clean_display_content(str(message))
    logging.debug(f"Cleaned message in adisplay_tool_call: {repr(cleaned)}")
    # Fire sync + async callbacks before rendering
    await execute_callback('tool_call', message=cleaned)
    tool_panel = Panel.fit(Text(cleaned, style="bold cyan"), title="Tool Call", border_style="green")
    console.print(tool_panel)
async def adisplay_error(message: str, console=None):
    """Async version of display_error."""
    if not message or not message.strip():
        return
    Console, Panel, Text, _Md = _rich()
    if console is None:
        console = Console()
    cleaned = _clean_display_content(str(message))
    # Fire sync + async callbacks before rendering
    await execute_callback('error', message=cleaned)
    console.print(Panel.fit(Text(cleaned, style="bold red"), title="Error", border_style="red"))
    # Record globally; the lock guards appends from worker threads
    with _error_logs_lock:
        error_logs.append(cleaned)
async def adisplay_generating(content: str = "", start_time: Optional[float] = None):
    """Async version of display_generating."""
    if not content or not str(content).strip():
        logging.debug("Empty content in adisplay_generating, returning early")
        return None
    elapsed_str = f" {time.time() - start_time:.1f}s" if start_time is not None else ""
    cleaned = _clean_display_content(str(content))
    # Fire sync + async callbacks before building the panel
    await execute_callback('generating', content=cleaned, elapsed_time=elapsed_str.strip() if elapsed_str else None)
    _Console, Panel, _Text, Markdown = _rich()
    return Panel(Markdown(cleaned), title=f"Generating...{elapsed_str}", border_style="green")
def clean_triple_backticks(text: str) -> str:
    """Remove triple backticks and surrounding json fences from a string.

    Strips a leading ```json or bare ``` fence (checked in that order) and a
    trailing ``` fence, whitespace-trimming after each removal. Text without
    fences is returned stripped but otherwise unchanged.
    """
    result = text.strip()
    # Check the longer, json-specific fence first so "```json" is consumed whole.
    for fence in ("```json", "```"):
        if result.startswith(fence):
            result = result[len(fence):].strip()
    if result.endswith("```"):
        result = result[:-len("```")].strip()
    return result
class ReflectionOutput(BaseModel):
    """Structured result of an agent self-reflection pass."""
    # Free-text reflection on the answer's quality
    reflection: str
    # "yes" when the agent judges its own answer acceptable, else "no"
    satisfactory: Literal["yes", "no"]
class TaskOutput(BaseModel):
    """Result object produced when a task finishes.

    Carries the raw LLM text plus optional structured forms (a JSON dict
    and/or a Pydantic model) and metadata about the producing agent.
    """
    # arbitrary_types_allowed lets token_metrics hold a non-pydantic type
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # Description of the task this output answers
    description: str
    # Optional short summary of the result
    summary: Optional[str] = None
    # Raw output text (always present)
    raw: str
    # Structured result when output_format == "Pydantic"
    pydantic: Optional[BaseModel] = None
    # Structured result when output_format == "JSON"
    json_dict: Optional[Dict[str, Any]] = None
    # Name of the agent that produced this output
    agent: str
    output_format: Literal["RAW", "JSON", "Pydantic"] = "RAW"
    # Token usage for this task; the forward ref resolves to None when the
    # telemetry extra is not installed (see the TokenMetrics import guard).
    token_metrics: Optional['TokenMetrics'] = None  # Add token metrics field
    def json(self) -> Optional[str]:
        """Return json_dict serialized to a JSON string, or None.

        Only returns a value when output_format is "JSON" and json_dict is
        non-empty. NOTE(review): shadows pydantic's BaseModel.json().
        """
        if self.output_format == "JSON" and self.json_dict:
            return json.dumps(self.json_dict)
        return None
    def to_dict(self) -> dict:
        """Merge json_dict and the pydantic model dump into one plain dict.

        When both are set, pydantic fields overwrite matching json_dict keys.
        """
        output_dict = {}
        if self.json_dict:
            output_dict.update(self.json_dict)
        if self.pydantic:
            output_dict.update(self.pydantic.model_dump())
        return output_dict
    def __str__(self):
        # Prefer the structured representations, falling back to raw text
        if self.pydantic:
            return str(self.pydantic)
        elif self.json_dict:
            return json.dumps(self.json_dict)
        else:
            return self.raw