Skip to content

Commit 757a88a

Browse files
Chibi Vikram and claude committed
feat: Add suspend/resume support for RPA invocations in evaluations
This change implements the suspend/resume pattern for RPA process invocations within the evaluation runtime, ensuring evaluations can pause when waiting for external job completion and resume after the job finishes.

Key Changes:

1. Functions Runtime (src/uipath/functions/runtime.py):
   - Added _detect_langgraph_interrupt() method to detect LangGraph's __interrupt__ field and extract trigger information
   - Modified execute() to check for interrupts and return SUSPENDED status
   - Converts InvokeProcess objects to UiPathResumeTrigger for the serverless executor

2. Evaluation Runtime (src/uipath/_cli/_evals/_runtime.py):
   - Added SUSPENDED status detection after agent execution
   - Populates agentExecutionOutput with trigger data when suspended
   - Skips evaluator execution for suspended runs
   - Publishes EvalRunUpdatedEvent with suspended status to the event bus

3. Test Agent (samples/event-trigger/test_suspend_resume_agent.py):
   - Updated to use MemorySaver checkpointer (required for LangGraph interrupts)
   - Returns raw dict to preserve the __interrupt__ field for runtime detection

The implementation follows the established pattern from UiPathResumableRuntime and ensures proper trigger extraction and status handling throughout the evaluation lifecycle.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 2da6547 commit 757a88a

3 files changed

Lines changed: 254 additions & 0 deletions

File tree

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
"""Test agent that demonstrates suspend/resume pattern with RPA process invocation."""
2+
3+
from typing import TypedDict
4+
5+
from langgraph.graph import END, START, StateGraph
6+
from langgraph.types import interrupt
7+
from pydantic import BaseModel
8+
9+
from uipath.platform.common import InvokeProcess
10+
11+
12+
class Input(BaseModel):
    """Input for the test agent."""

    # Free-text query; forwarded verbatim into the RPA process's input_arguments.
    query: str
16+
17+
18+
class Output(BaseModel):
    """Output from the test agent."""

    # Final human-readable summary of the run.
    result: str
    # Raw output returned by the invoked RPA process, if any.
    # NOTE(review): this model is not referenced by main(), which deliberately
    # returns a raw dict to preserve __interrupt__ — presumably kept for
    # schema/documentation purposes; confirm before removing.
    process_output: dict | None = None
23+
24+
25+
class State(TypedDict):
    """Agent state threaded through the graph nodes."""

    # Incoming user query (copied from Input.query by main()).
    query: str
    # Output of the interrupted RPA process; None until the job completes.
    process_result: dict | None
    # Human-readable summary built by format_output after resume.
    final_result: str
31+
32+
33+
def prepare_input(state: State) -> State:
    """Announce the upcoming RPA call; the state flows through untouched."""
    query = state["query"]
    print(f"Preparing to call RPA process with query: {query}")
    return state
37+
38+
39+
def call_rpa_process(state: State) -> State:
    """Invoke the RPA process; execution suspends at interrupt()."""
    print("Calling RPA process - execution will suspend here")

    # interrupt() hands the InvokeProcess payload to the runtime, which
    # suspends here. The serverless executor detects the SUSPENDED status,
    # polls the job, and resumes this node with the job's output.
    request = InvokeProcess(
        name="TestProcess",  # Replace with actual process name
        input_arguments={"query": state["query"], "timestamp": "2024-01-08"},
        process_folder_path="Shared",  # Replace with actual folder
        process_folder_key=None,
    )
    job_output = interrupt(request)

    print(f"RPA process completed with result: {job_output}")

    return {**state, "process_result": job_output}
58+
59+
60+
def format_output(state: State) -> State:
    """Compose the final human-readable result string after the RPA call."""
    rpa_result = state.get("process_result", {})
    summary = f"Processed query '{state['query']}' via RPA. Result: {rpa_result}"
    return {**state, "final_result": summary}
69+
70+
71+
# Assemble the linear workflow: START -> prepare -> call_rpa -> format -> END.
builder = StateGraph(State)

for _node_name, _node_fn in (
    ("prepare", prepare_input),
    ("call_rpa", call_rpa_process),
    ("format", format_output),
):
    builder.add_node(_node_name, _node_fn)

for _edge_src, _edge_dst in (
    (START, "prepare"),
    ("prepare", "call_rpa"),
    ("call_rpa", "format"),
    ("format", END),
):
    builder.add_edge(_edge_src, _edge_dst)
81+
82+
83+
def main(input_data: Input):
    """Run the graph once and return the raw LangGraph result dict.

    The raw dict is returned (rather than an Output model) so that a pending
    __interrupt__ entry survives for the runtime to detect. On suspension the
    dict carries __interrupt__ trigger data; on completion it carries
    final_result.
    """
    import uuid

    from langgraph.checkpoint.memory import MemorySaver

    # IMPORTANT: interrupt() only works with a checkpointer, which lets
    # LangGraph persist state across the suspend/resume boundary.
    compiled = builder.compile(checkpointer=MemorySaver())

    # Each invocation gets its own thread id so checkpoints never collide.
    run_config = {"configurable": {"thread_id": f"agent-{uuid.uuid4()}"}}

    initial_state = {
        "query": input_data.query,
        "process_result": None,
        "final_result": "",
    }

    # Return the raw dict — the runtime inspects it for __interrupt__ and
    # creates the UiPath trigger when present.
    return compiled.invoke(initial_state, run_config)

src/uipath/_cli/_evals/_runtime.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,14 @@ async def _execute_eval(
432432
agent_execution_output = await self.execute_runtime(
433433
eval_item, execution_id, runtime
434434
)
435+
436+
logger.info(
437+
f"DEBUG: Agent execution result status: {agent_execution_output.result.status}"
438+
)
439+
logger.info(
440+
f"DEBUG: Agent execution result trigger: {agent_execution_output.result.trigger}"
441+
)
442+
435443
except Exception as e:
436444
if self.context.verbose:
437445
if isinstance(e, EvaluationRuntimeException):
@@ -467,6 +475,58 @@ async def _execute_eval(
467475
)
468476
raise
469477

478+
# Check if execution was suspended (e.g., waiting for RPA job completion)
479+
if (
480+
agent_execution_output.result.status
481+
== UiPathRuntimeStatus.SUSPENDED
482+
):
483+
# For suspended executions, we don't run evaluators yet
484+
# The serverless executor should save the triggers and resume later
485+
logger.info(
486+
f"Evaluation execution suspended for eval '{eval_item.name}' (id: {eval_item.id})"
487+
)
488+
489+
# Extract triggers from result
490+
triggers = []
491+
if agent_execution_output.result.trigger:
492+
triggers.append(agent_execution_output.result.trigger)
493+
if agent_execution_output.result.triggers:
494+
triggers.extend(agent_execution_output.result.triggers)
495+
496+
# IMPORTANT: Always include execution output with triggers when suspended
497+
# This ensures triggers are visible in the output JSON for serverless executor
498+
evaluation_run_results.agent_execution_output = (
499+
convert_eval_execution_output_to_serializable(
500+
agent_execution_output
501+
)
502+
)
503+
504+
# Publish suspended status event
505+
await self.event_bus.publish(
506+
EvaluationEvents.UPDATE_EVAL_RUN,
507+
EvalRunUpdatedEvent(
508+
execution_id=execution_id,
509+
eval_item=eval_item,
510+
eval_results=[],
511+
success=True, # Not failed, just suspended
512+
agent_output={
513+
"status": "suspended",
514+
"triggers": [
515+
t.model_dump(by_alias=True) for t in triggers
516+
],
517+
},
518+
agent_execution_time=agent_execution_output.execution_time,
519+
spans=agent_execution_output.spans,
520+
logs=agent_execution_output.logs,
521+
exception_details=None,
522+
),
523+
wait_for_completion=False,
524+
)
525+
526+
# Return partial results with trigger information
527+
# The evaluation will be completed when resumed
528+
return evaluation_run_results
529+
470530
if self.context.verbose:
471531
evaluation_run_results.agent_execution_output = (
472532
convert_eval_execution_output_to_serializable(

src/uipath/functions/runtime.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
UiPathRuntimeError,
2424
)
2525
from uipath.runtime.schema import UiPathRuntimeSchema
26+
from uipath.runtime.resumable.trigger import (
27+
UiPathResumeTrigger,
28+
UiPathResumeTriggerType,
29+
UiPathResumeTriggerName,
30+
)
2631

2732
from .schema_gen import get_type_schema
2833
from .type_conversion import (
@@ -124,6 +129,71 @@ async def _execute_function(
124129

125130
return convert_from_class(result) if result is not None else {}
126131

132+
def _detect_langgraph_interrupt(
    self, output: dict[str, Any]
) -> UiPathResumeTrigger | None:
    """Detect LangGraph's __interrupt__ field and extract an InvokeProcess trigger.

    LangGraph's interrupt() creates an __interrupt__ field in the output dict:
    {
        "query": "...",
        "final_result": "",
        "__interrupt__": [Interrupt(value=InvokeProcess(...), id="...")]
    }

    We extract the InvokeProcess from the interrupt and convert it to a UiPath trigger.

    Args:
        output: Raw result dict returned by the executed function.

    Returns:
        A UiPathResumeTrigger describing the job to wait on, or None when the
        output does not represent an interrupt (normal completion or any
        unexpected shape — detection is deliberately best-effort).
    """
    try:
        if not isinstance(output, dict):
            return None

        # Check for LangGraph's __interrupt__ field
        if "__interrupt__" not in output:
            return None

        interrupts = output["__interrupt__"]
        if not interrupts or not isinstance(interrupts, list):
            logger.warning("__interrupt__ field exists but is not a list")
            return None

        # Only the first interrupt is honored; concurrent interrupts are
        # not supported by this detection path.
        interrupt_obj = interrupts[0]
        if not hasattr(interrupt_obj, "value"):
            logger.warning("Interrupt object missing 'value' attribute")
            return None

        invoke_process = interrupt_obj.value

        # Duck-typed check: anything exposing name + input_arguments is
        # treated as an InvokeProcess (avoids a hard import dependency).
        if not (
            hasattr(invoke_process, "name")
            and hasattr(invoke_process, "input_arguments")
        ):
            logger.warning(
                f"Interrupt value is not InvokeProcess (type: {type(invoke_process)})"
            )
            return None

        logger.info(
            f"Detected LangGraph interrupt - suspending execution for process: {invoke_process.name}"
        )

        # Convert InvokeProcess to UiPath trigger.
        # NOTE(review): item_key is a locally generated placeholder, not a
        # real Orchestrator job key — confirm the resume/polling path
        # tolerates a fabricated key.
        # NOTE(review): `uuid` must be imported at module level in this
        # file; the visible import hunk does not add it — verify.
        return UiPathResumeTrigger(
            trigger_type=UiPathResumeTriggerType.JOB,
            trigger_name=UiPathResumeTriggerName.JOB,
            item_key=f"job-{uuid.uuid4()}",  # Generate unique job key
            folder_path=getattr(invoke_process, "process_folder_path", "Shared"),
            payload={
                "process_name": invoke_process.name,
                "input_arguments": invoke_process.input_arguments or {},
                "folder_key": getattr(invoke_process, "process_folder_key", None),
            },
        )
    except Exception as e:
        # Best-effort detection: never let interrupt inspection crash the run.
        logger.warning(f"Failed to detect LangGraph interrupt: {e}")
        return None
196+
127197
async def execute(
128198
self,
129199
input: dict[str, Any] | None = None,
@@ -134,6 +204,21 @@ async def execute(
134204
func = self._load_function()
135205
output = await self._execute_function(func, input or {})
136206

207+
logger.info(f"Output type: {type(output)}, has __interrupt__: {'__interrupt__' in output if isinstance(output, dict) else False}")
208+
209+
# Check if output represents a LangGraph interrupt (suspend)
210+
trigger = self._detect_langgraph_interrupt(output)
211+
logger.info(f"Trigger detected: {trigger}")
212+
if trigger:
213+
logger.info(
214+
f"Detected LangGraph interrupt - suspending execution with trigger: {trigger.item_key}"
215+
)
216+
return UiPathRuntimeResult(
217+
output=None, # No final output yet (suspended)
218+
status=UiPathRuntimeStatus.SUSPENDED,
219+
trigger=trigger,
220+
)
221+
137222
return UiPathRuntimeResult(
138223
output=output,
139224
status=UiPathRuntimeStatus.SUCCESSFUL,

0 commit comments

Comments
 (0)