1+ import json5
2+ from Agently .utils import Lexer
3+ from .utils import ComponentABC
4+ from Agently .utils import find_json , DataGenerator
5+
6+ class Realtime (ComponentABC ):
7+ def __init__ (self , agent : object ):
8+ self .agent = agent
9+ self ._is_enable = False
10+ self ._is_init = False
11+ self ._output_data_frame = None
12+ self ._target_keys = []
13+ self ._buffer = ""
14+ self ._data_generator = DataGenerator ()
15+ self ._streamed_keys = []
16+ self ._ongoing_key = None
17+ self ._ongoing_value = None
18+
19+ def use_realtime (self ):
20+ self ._is_enable = True
21+ return self .agent
22+
23+ def __get_output_data_frame (self , prompt_output :dict , prefix :str = "" ):
24+ result = {}
25+ for key , value in prompt_output .items ():
26+ if isinstance (key , dict ):
27+ prefix += f"{ key } ."
28+ result .update ({ key : self .__get_output_data_frame (value , prefix ) })
29+ else :
30+ self ._target_keys .append (prefix + key )
31+ result .update ({ key : None })
32+ return result
33+
34+ async def __scan_realtime_value (self , realtime_value :dict , prefix :str = "" , output_data_frame_pointer :dict = {}):
35+ for key , value in realtime_value .items ():
36+ if isinstance (value , dict ):
37+ if key in output_data_frame_pointer :
38+ await self .__scan_realtime_value (value , prefix + f"{ key } ." , output_data_frame_pointer [key ])
39+ else :
40+ if self ._ongoing_key == (prefix + key ):
41+ self ._ongoing_value = value
42+ if self ._ongoing_key != prefix + key :
43+ if (prefix + key ) not in self ._streamed_keys and value not in (None , "" ):
44+ if self ._ongoing_key != None :
45+ await self .agent .call_event_listeners (f"key_stop:{ self ._ongoing_key } " , None )
46+ self ._ongoing_key = prefix + key
47+ await self .agent .call_event_listeners (f"key_start:{ self ._ongoing_key } " , None )
48+ self ._streamed_keys .append (prefix + key )
49+ self ._ongoing_value = value
50+ if (prefix + key ) in self ._streamed_keys or value in (None , "" ):
51+ continue
52+
53+ async def _suffix (self , event :str , data :any ):
54+ if not self ._is_enable or "type" not in self .agent .request .response_cache or self .agent .request .response_cache ["type" ] != "JSON" :
55+ return None
56+ else :
57+ if not self ._is_init :
58+ prompt_output = self .agent .request .response_cache ["prompt" ]["output" ]
59+ if isinstance (prompt_output , dict ):
60+ self ._output_data_frame = self .__get_output_data_frame (prompt_output )
61+ self ._is_init = True
62+ if event == "response:delta" :
63+ self ._buffer += data
64+ realtime_json_str = find_json (self ._buffer )
65+ if realtime_json_str != None :
66+ lexer = Lexer ()
67+ lexer .append_string (realtime_json_str )
68+ realtime_value = json5 .loads (lexer .complete_json ())
69+ if self ._output_data_frame == None :
70+ await self .agent .call_event_listeners (
71+ "response:realtime" ,
72+ {
73+ "key" : None ,
74+ "value" : realtime_value ,
75+ "complete_value" : realtime_value ,
76+ },
77+ )
78+ else :
79+ if isinstance (realtime_value , dict ):
80+ await self .__scan_realtime_value (realtime_value , "" , self ._output_data_frame )
81+ if self ._ongoing_key != None :
82+ await self .agent .call_event_listeners (
83+ "response:realtime" ,
84+ {
85+ "key" : self ._ongoing_key ,
86+ "value" : self ._ongoing_value ,
87+ "complete_value" : realtime_value ,
88+ },
89+ )
90+
91+ def export (self ):
92+ return {
93+ "prefix" : None ,
94+ "suffix" : self ._suffix ,
95+ "alias" : {
96+ "use_realtime" : { "func" : self .use_realtime }
97+ },
98+ }
99+
100+ def export ():
101+ return ("Realtime" , Realtime )
0 commit comments