11"""SQLite implementation of UiPathResumableStorageProtocol."""
22
33import json
4- from typing import cast
4+ from typing import Any , cast
55
66from langgraph .checkpoint .sqlite .aio import AsyncSqliteSaver
77from pydantic import BaseModel
1414
1515
1616class SqliteResumableStorage :
17- """SQLite storage for resume triggers."""
17+ """SQLite storage for resume triggers and arbitrary kv pairs ."""
1818
1919 def __init__ (
20- self , memory : AsyncSqliteSaver , table_name : str = "__uipath_resume_triggers"
20+ self ,
21+ memory : AsyncSqliteSaver ,
22+ rs_table_name : str = "__uipath_resume_triggers" ,
23+ kv_table_name : str = "__uipath_runtime_kv" ,
2124 ):
2225 self .memory = memory
23- self .table_name = table_name
26+ self .rs_table_name = rs_table_name
27+ self .kv_table_name = kv_table_name
2428 self ._initialized = False
2529
30+ def _dump_value (self , value : str | dict [str , Any ] | BaseModel | None ) -> str | None :
31+ if value is None :
32+ return None
33+ if isinstance (value , BaseModel ):
34+ return json .dumps (value .model_dump ())
35+ if isinstance (value , dict ):
36+ return json .dumps (value )
37+ return value
38+
39+ def _load_value (self , raw : str | None ) -> Any :
40+ if raw is None :
41+ return None
42+ try :
43+ return json .loads (raw )
44+ except Exception :
45+ return raw
46+
2647 async def _ensure_table (self ) -> None :
27- """Create table if needed."""
48+ """Create tables if needed."""
2849 if self ._initialized :
2950 return
3051
3152 await self .memory .setup ()
3253 async with self .memory .lock , self .memory .conn .cursor () as cur :
33- await cur .execute (f"""
34- CREATE TABLE IF NOT EXISTS { self .table_name } (
54+ await cur .execute (
55+ f"""
56+ CREATE TABLE IF NOT EXISTS { self .rs_table_name } (
3557 id INTEGER PRIMARY KEY AUTOINCREMENT,
58+ runtime_id TEXT NOT NULL,
3659 type TEXT NOT NULL,
3760 name TEXT NOT NULL,
3861 key TEXT,
@@ -41,75 +64,146 @@ async def _ensure_table(self) -> None:
4164 payload TEXT,
4265 timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
4366 )
44- """ )
67+ """
68+ )
69+
70+ await cur .execute (
71+ f"""
72+ CREATE TABLE IF NOT EXISTS { self .kv_table_name } (
73+ runtime_id TEXT NOT NULL,
74+ namespace TEXT NOT NULL,
75+ key TEXT NOT NULL,
76+ value TEXT,
77+ timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')),
78+ PRIMARY KEY (runtime_id, namespace, key)
79+ )
80+ """
81+ )
82+
4583 await self .memory .conn .commit ()
46- self ._initialized = True
4784
48- async def save_trigger (self , trigger : UiPathResumeTrigger ) -> None :
49- """Save resume trigger to database."""
85+ self ._initialized = True
86+
87+ async def save_trigger (self , runtime_id : str , trigger : UiPathResumeTrigger ) -> None :
88+ """Save resume trigger to database (scoped by runtime_id)."""
5089 await self ._ensure_table ()
5190
5291 trigger_key = (
5392 trigger .api_resume .inbox_id if trigger .api_resume else trigger .item_key
5493 )
55- payload = trigger .payload
56- if payload :
57- payload = (
58- (
59- payload .model_dump ()
60- if isinstance (payload , BaseModel )
61- else json .dumps (payload )
62- )
63- if isinstance (payload , dict )
64- else str (payload )
65- )
94+ payload_text = self ._dump_value (trigger .payload )
6695
6796 async with self .memory .lock , self .memory .conn .cursor () as cur :
6897 await cur .execute (
69- f"INSERT INTO { self .table_name } (type, key, name, payload, folder_path, folder_key) VALUES (?, ?, ?, ?, ?, ?)" ,
98+ f"""
99+ INSERT INTO { self .rs_table_name }
100+ (runtime_id, type, key, name, payload, folder_path, folder_key)
101+ VALUES (?, ?, ?, ?, ?, ?, ?)
102+ """ ,
70103 (
104+ runtime_id ,
71105 trigger .trigger_type .value ,
72106 trigger_key ,
73107 trigger .trigger_name .value ,
74- payload ,
108+ payload_text ,
75109 trigger .folder_path ,
76110 trigger .folder_key ,
77111 ),
78112 )
79113 await self .memory .conn .commit ()
80114
81- async def get_latest_trigger (self ) -> UiPathResumeTrigger | None :
82- """Get most recent trigger from database."""
115+ async def get_latest_trigger (self , runtime_id : str ) -> UiPathResumeTrigger | None :
116+ """Get most recent trigger for runtime_id from database."""
83117 await self ._ensure_table ()
84118
85119 async with self .memory .lock , self .memory .conn .cursor () as cur :
86- await cur .execute (f"""
120+ await cur .execute (
121+ f"""
87122 SELECT type, key, name, folder_path, folder_key, payload
88- FROM { self .table_name }
123+ FROM { self .rs_table_name }
124+ WHERE runtime_id = ?
89125 ORDER BY timestamp DESC
90126 LIMIT 1
91- """ )
127+ """ ,
128+ (runtime_id ,),
129+ )
92130 result = await cur .fetchone ()
93131
94- if not result :
95- return None
132+ if not result :
133+ return None
134+
135+ trigger_type , key , name , folder_path , folder_key , payload_text = cast (
136+ tuple [str , str , str , str | None , str | None , str | None ], tuple (result )
137+ )
138+
139+ payload = self ._load_value (payload_text )
96140
97- trigger_type , key , name , folder_path , folder_key , payload = cast (
98- tuple [str , str , str , str , str , str ], tuple (result )
141+ resume_trigger = UiPathResumeTrigger (
142+ trigger_type = UiPathResumeTriggerType (trigger_type ),
143+ trigger_name = UiPathResumeTriggerName (name ),
144+ item_key = key ,
145+ folder_path = folder_path ,
146+ folder_key = folder_key ,
147+ payload = payload ,
148+ )
149+
150+ if resume_trigger .trigger_type == UiPathResumeTriggerType .API :
151+ resume_trigger .api_resume = UiPathApiTrigger (
152+ inbox_id = resume_trigger .item_key ,
153+ request = resume_trigger .payload ,
99154 )
100155
101- resume_trigger = UiPathResumeTrigger (
102- trigger_type = UiPathResumeTriggerType (trigger_type ),
103- trigger_name = UiPathResumeTriggerName (name ),
104- item_key = key ,
105- folder_path = folder_path ,
106- folder_key = folder_key ,
107- payload = payload ,
156+ return resume_trigger
157+
158+ async def save_value (
159+ self ,
160+ runtime_id : str ,
161+ namespace : str ,
162+ key : str ,
163+ value : Any ,
164+ ) -> None :
165+ """Save arbitrary key-value pair to database."""
166+ assert (
167+ isinstance (value , str )
168+ or isinstance (value , dict )
169+ or isinstance (value , BaseModel )
170+ ), "Value must be str, dict, BaseModel."
171+
172+ await self ._ensure_table ()
173+
174+ value_text = self ._dump_value (value )
175+
176+ async with self .memory .lock , self .memory .conn .cursor () as cur :
177+ await cur .execute (
178+ f"""
179+ INSERT INTO { self .kv_table_name } (runtime_id, namespace, key, value)
180+ VALUES (?, ?, ?, ?)
181+ ON CONFLICT(runtime_id, namespace, key)
182+ DO UPDATE SET
183+ value = excluded.value,
184+ timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
185+ """ ,
186+ (runtime_id , namespace , key , value_text ),
108187 )
188+ await self .memory .conn .commit ()
109189
110- if resume_trigger .trigger_type == UiPathResumeTriggerType .API :
111- resume_trigger .api_resume = UiPathApiTrigger (
112- inbox_id = resume_trigger .item_key , request = resume_trigger .payload
113- )
190+ async def get_value (self , runtime_id : str , namespace : str , key : str ) -> Any :
191+ """Get arbitrary key-value pair from database (scoped by runtime_id + namespace)."""
192+ await self ._ensure_table ()
193+
194+ async with self .memory .lock , self .memory .conn .cursor () as cur :
195+ await cur .execute (
196+ f"""
197+ SELECT value
198+ FROM { self .kv_table_name }
199+ WHERE runtime_id = ? AND namespace = ? AND key = ?
200+ LIMIT 1
201+ """ ,
202+ (runtime_id , namespace , key ),
203+ )
204+ row = await cur .fetchone ()
205+
206+ if not row :
207+ return None
114208
115- return resume_trigger
209+ return self . _load_value ( cast ( str | None , row [ 0 ]))
0 commit comments