-
Notifications
You must be signed in to change notification settings - Fork 20.8k
Expand file tree
/
Copy pathworkflow_entry.py
More file actions
524 lines (468 loc) · 20.1 KB
/
workflow_entry.py
File metadata and controls
524 lines (468 loc) · 20.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
import logging
import time
from collections.abc import Generator, Mapping, Sequence
from typing import Any, cast
from configs import dify_config
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context
from core.app.workflow.layers.llm_quota import LLMQuotaLayer
from core.app.workflow.layers.observability import ObservabilityLayer
from core.workflow.node_factory import DifyNodeFactory
from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
from dify_graph.constants import ENVIRONMENT_VARIABLE_NODE_ID
from dify_graph.entities import GraphInitParams
from dify_graph.entities.graph_config import NodeConfigDictAdapter
from dify_graph.errors import WorkflowNodeRunFailedError
from dify_graph.file.models import File
from dify_graph.graph import Graph
from dify_graph.graph_engine import GraphEngine, GraphEngineConfig
from dify_graph.graph_engine.command_channels import InMemoryChannel
from dify_graph.graph_engine.layers import DebugLoggingLayer, ExecutionLimitsLayer
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_engine.protocols.command_channel import CommandChannel
from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent
from dify_graph.nodes import NodeType
from dify_graph.nodes.base.node import Node
from dify_graph.runtime import ChildGraphNotFoundError, GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool
from extensions.otel.runtime import is_instrument_flag_enabled
from factories import file_factory
from models.workflow import Workflow
logger = logging.getLogger(__name__)
class _WorkflowChildEngineBuilder:
    """Builds nested GraphEngine instances for child graphs spawned by a parent engine."""

    @staticmethod
    def _has_node_id(graph_config: Mapping[str, Any], node_id: str) -> bool | None:
        """
        Return whether `graph_config["nodes"]` contains the given node id.

        Returns `None` when the nodes payload shape is unexpected, so graph-level
        validation can surface the original configuration error.
        """
        candidates = graph_config.get("nodes")
        if not isinstance(candidates, list):
            return None
        for entry in candidates:
            # Any malformed entry makes the whole payload ambiguous; defer to
            # graph-level validation rather than guessing.
            if not isinstance(entry, Mapping):
                return None
            entry_id = entry.get("id")
            if isinstance(entry_id, str) and entry_id == node_id:
                return True
        return False

    def build_child_engine(
        self,
        *,
        workflow_id: str,
        graph_init_params: GraphInitParams,
        graph_runtime_state: GraphRuntimeState,
        graph_config: Mapping[str, Any],
        root_node_id: str,
        layers: Sequence[object] = (),
    ) -> GraphEngine:
        """Construct a child GraphEngine rooted at `root_node_id` inside `graph_config`.

        Fails fast with ChildGraphNotFoundError when the root node is definitely
        absent; an ambiguous nodes payload (None from `_has_node_id`) is left for
        Graph.init to validate and report.
        """
        factory = DifyNodeFactory(
            graph_init_params=graph_init_params,
            graph_runtime_state=graph_runtime_state,
        )
        if self._has_node_id(graph_config=graph_config, node_id=root_node_id) is False:
            raise ChildGraphNotFoundError(f"child graph root node '{root_node_id}' not found")
        subgraph = Graph.init(
            graph_config=graph_config,
            node_factory=factory,
            root_node_id=root_node_id,
        )
        engine = GraphEngine(
            workflow_id=workflow_id,
            graph=subgraph,
            graph_runtime_state=graph_runtime_state,
            command_channel=InMemoryChannel(),
            config=GraphEngineConfig(),
            child_engine_builder=self,
        )
        # Child engines always enforce LLM quota, then any caller-supplied layers.
        engine.layer(LLMQuotaLayer())
        for extra in layers:
            engine.layer(cast(GraphEngineLayer, extra))
        return engine
class WorkflowEntry:
    """Entry point for executing a workflow graph.

    Supports three modes:
    - full workflow runs via ``run()`` (driven by a configured ``GraphEngine``),
    - single-step debugging of one node inside an existing workflow
      (``single_step_run``),
    - running a standalone node without a persisted workflow (``run_free_node``).
    """

    def __init__(
        self,
        tenant_id: str,
        app_id: str,
        workflow_id: str,
        graph_config: Mapping[str, Any],
        graph: Graph,
        user_id: str,
        user_from: UserFrom,
        invoke_from: InvokeFrom,
        call_depth: int,
        variable_pool: VariablePool,
        graph_runtime_state: GraphRuntimeState,
        command_channel: CommandChannel | None = None,
    ) -> None:
        """
        Init workflow entry
        :param tenant_id: tenant id
        :param app_id: app id
        :param workflow_id: workflow id
        :param graph_config: workflow graph config
        :param graph: workflow graph
        :param user_id: user id
        :param user_from: user from
        :param invoke_from: invoke from
        :param call_depth: call depth (validated against WORKFLOW_CALL_MAX_DEPTH)
        :param variable_pool: variable pool
        :param graph_runtime_state: pre-created graph runtime state
        :param command_channel: command channel for external control (optional, defaults to InMemoryChannel)
        """
        # check call depth — guard against runaway workflow-in-workflow recursion
        workflow_call_max_depth = dify_config.WORKFLOW_CALL_MAX_DEPTH
        if call_depth > workflow_call_max_depth:
            raise ValueError(f"Max workflow call depth {workflow_call_max_depth} reached.")
        # Use provided command channel or default to InMemoryChannel
        if command_channel is None:
            command_channel = InMemoryChannel()
        self.command_channel = command_channel
        # Builder that lets the engine spawn nested engines for child graphs.
        self._child_engine_builder = _WorkflowChildEngineBuilder()
        self.graph_engine = GraphEngine(
            workflow_id=workflow_id,
            graph=graph,
            graph_runtime_state=graph_runtime_state,
            command_channel=command_channel,
            config=GraphEngineConfig(
                min_workers=dify_config.GRAPH_ENGINE_MIN_WORKERS,
                max_workers=dify_config.GRAPH_ENGINE_MAX_WORKERS,
                scale_up_threshold=dify_config.GRAPH_ENGINE_SCALE_UP_THRESHOLD,
                scale_down_idle_time=dify_config.GRAPH_ENGINE_SCALE_DOWN_IDLE_TIME,
            ),
            child_engine_builder=self._child_engine_builder,
        )
        # Add debug logging layer when in debug mode
        if dify_config.DEBUG:
            logger.info("Debug mode enabled - adding DebugLoggingLayer to GraphEngine")
            debug_layer = DebugLoggingLayer(
                level="DEBUG",
                include_inputs=True,
                include_outputs=True,
                include_process_data=False,  # Process data can be very verbose
                logger_name=f"GraphEngine.Debug.{workflow_id[:8]}",  # Use workflow ID prefix for unique logger
            )
            self.graph_engine.layer(debug_layer)
        # Add execution limits layer (max steps / max wall-clock time per run)
        limits_layer = ExecutionLimitsLayer(
            max_steps=dify_config.WORKFLOW_MAX_EXECUTION_STEPS, max_time=dify_config.WORKFLOW_MAX_EXECUTION_TIME
        )
        self.graph_engine.layer(limits_layer)
        self.graph_engine.layer(LLMQuotaLayer())
        # Add observability layer when OTel is enabled
        if dify_config.ENABLE_OTEL or is_instrument_flag_enabled():
            self.graph_engine.layer(ObservabilityLayer())

    def run(self) -> Generator[GraphEngineEvent, None, None]:
        """Run the workflow, yielding engine events as they are produced.

        A GenerateTaskStoppedError ends the stream silently (deliberate stop);
        any other exception is logged and surfaced as a GraphRunFailedEvent.
        """
        graph_engine = self.graph_engine
        try:
            # run workflow
            generator = graph_engine.run()
            yield from generator
        except GenerateTaskStoppedError:
            # Stopped on purpose by the caller; not an error condition.
            pass
        except Exception as e:
            logger.exception("Unknown Error when workflow entry running")
            yield GraphRunFailedEvent(error=str(e))
            return

    @classmethod
    def single_step_run(
        cls,
        *,
        workflow: Workflow,
        node_id: str,
        user_id: str,
        user_inputs: Mapping[str, Any],
        variable_pool: VariablePool,
        variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
    ) -> tuple[Node, Generator[GraphNodeEventBase, None, None]]:
        """
        Single step run workflow node
        :param workflow: Workflow instance
        :param node_id: node id
        :param user_id: user id
        :param user_inputs: user inputs
        :param variable_pool: variable pool to read from and populate
        :param variable_loader: loader used to backfill variables missing from the pool
        :return: the instantiated node and a generator of its run events
        """
        node_config = workflow.get_node_config_by_id(node_id)
        node_config_data = node_config["data"]
        # Get node type
        node_type = node_config_data.type
        # init graph init params and runtime state
        graph_init_params = GraphInitParams(
            workflow_id=workflow.id,
            graph_config=workflow.graph_dict,
            run_context=build_dify_run_context(
                tenant_id=workflow.tenant_id,
                app_id=workflow.app_id,
                user_id=user_id,
                user_from=UserFrom.ACCOUNT,
                invoke_from=InvokeFrom.DEBUGGER,
            ),
            call_depth=0,
        )
        graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
        # init workflow run state
        node_factory = DifyNodeFactory(
            graph_init_params=graph_init_params,
            graph_runtime_state=graph_runtime_state,
        )
        node = node_factory.create_node(node_config)
        node_cls = type(node)
        try:
            # variable selector to variable mapping
            variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(
                graph_config=workflow.graph_dict, config=node_config
            )
        except NotImplementedError:
            # Node types without variable selectors simply have no mapping.
            variable_mapping = {}
        # Loading missing variable from draft var here, and set it into
        # variable_pool.
        load_into_variable_pool(
            variable_loader=variable_loader,
            variable_pool=variable_pool,
            variable_mapping=variable_mapping,
            user_inputs=user_inputs,
        )
        # NOTE(review): datasource nodes skip user-input mapping — presumably
        # their inputs come from elsewhere; confirm against datasource node impl.
        if node_type != NodeType.DATASOURCE:
            cls.mapping_user_inputs_to_variable_pool(
                variable_mapping=variable_mapping,
                user_inputs=user_inputs,
                variable_pool=variable_pool,
                tenant_id=workflow.tenant_id,
            )
        try:
            generator = cls._traced_node_run(node)
        except Exception as e:
            logger.exception(
                "error while running node, workflow_id=%s, node_id=%s, node_type=%s, node_version=%s",
                workflow.id,
                node.id,
                node.node_type,
                node.version(),
            )
            raise WorkflowNodeRunFailedError(node=node, err_msg=str(e))
        return node, generator

    @staticmethod
    def _create_single_node_graph(
        node_id: str,
        node_data: dict[str, Any],
        node_width: int = 114,
        node_height: int = 514,
    ) -> dict[str, Any]:
        """
        Create a minimal graph structure for testing a single node in isolation.
        :param node_id: ID of the target node
        :param node_data: configuration data for the target node
        :param node_width: width for UI layout (default: 114)
        :param node_height: height for UI layout (default: 514)
        :return: graph dictionary with start node and target node
        """
        node_config = {
            "id": node_id,
            "width": node_width,
            "height": node_height,
            "type": "custom",
            "data": node_data,
        }
        start_node_config = {
            "id": "start",
            "width": node_width,
            "height": node_height,
            "type": "custom",
            "data": {
                "type": NodeType.START,
                "title": "Start",
                "desc": "Start",
            },
        }
        # Single edge: start -> target node.
        return {
            "nodes": [start_node_config, node_config],
            "edges": [
                {
                    "source": "start",
                    "target": node_id,
                    "sourceHandle": "source",
                    "targetHandle": "target",
                }
            ],
        }

    @classmethod
    def run_free_node(
        cls, node_data: dict[str, Any], node_id: str, tenant_id: str, user_id: str, user_inputs: dict[str, Any]
    ) -> tuple[Node, Generator[GraphNodeEventBase, None, None]]:
        """
        Run free node

        NOTE: only parameter_extractor/question_classifier are supported

        :param node_data: node data
        :param node_id: node id
        :param tenant_id: tenant id
        :param user_id: user id
        :param user_inputs: user inputs
        :return: the instantiated node and a generator of its run events
        """
        # Create a minimal graph for single node execution
        graph_dict = cls._create_single_node_graph(node_id, node_data)
        node_type = NodeType(node_data.get("type", ""))
        if node_type not in {NodeType.PARAMETER_EXTRACTOR, NodeType.QUESTION_CLASSIFIER}:
            raise ValueError(f"Node type {node_type} not supported")
        # "1" selects node version 1 from the type->version->class mapping.
        node_cls = NODE_TYPE_CLASSES_MAPPING[node_type]["1"]
        if not node_cls:
            raise ValueError(f"Node class not found for node type {node_type}")
        # init variable pool (empty — free nodes have no prior workflow state)
        variable_pool = VariablePool(
            system_variables=SystemVariable.default(),
            user_inputs={},
            environment_variables=[],
        )
        # init graph init params and runtime state
        graph_init_params = GraphInitParams(
            workflow_id="",
            graph_config=graph_dict,
            run_context=build_dify_run_context(
                tenant_id=tenant_id,
                app_id="",
                user_id=user_id,
                user_from=UserFrom.ACCOUNT,
                invoke_from=InvokeFrom.DEBUGGER,
            ),
            call_depth=0,
        )
        graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
        # init workflow run state
        node_config = NodeConfigDictAdapter.validate_python({"id": node_id, "data": node_data})
        node_factory = DifyNodeFactory(
            graph_init_params=graph_init_params,
            graph_runtime_state=graph_runtime_state,
        )
        node = node_factory.create_node(node_config)
        try:
            # variable selector to variable mapping
            try:
                variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(
                    graph_config=graph_dict, config=node_config
                )
            except NotImplementedError:
                variable_mapping = {}
            cls.mapping_user_inputs_to_variable_pool(
                variable_mapping=variable_mapping,
                user_inputs=user_inputs,
                variable_pool=variable_pool,
                tenant_id=tenant_id,
            )
            generator = cls._traced_node_run(node)
            return node, generator
        except Exception as e:
            logger.exception(
                "error while running node, node_id=%s, node_type=%s, node_version=%s",
                node.id,
                node.node_type,
                node.version(),
            )
            raise WorkflowNodeRunFailedError(node=node, err_msg=str(e))

    @staticmethod
    def handle_special_values(value: Mapping[str, Any] | None) -> Mapping[str, Any] | None:
        """Recursively convert File objects inside `value` to plain dicts."""
        # NOTE(QuantumGhost): Avoid using this function in new code.
        # Keep values structured as long as possible and only convert to dict
        # immediately before serialization (e.g., JSON serialization) to maintain
        # data integrity and type information.
        result = WorkflowEntry._handle_special_values(value)
        return result if isinstance(result, Mapping) or result is None else dict(result)

    @staticmethod
    def _handle_special_values(value: Any) -> Any:
        # Recursive worker: walks dicts/lists and replaces File instances with
        # their dict representation; all other values pass through unchanged.
        if value is None:
            return value
        if isinstance(value, dict):
            res = {}
            for k, v in value.items():
                res[k] = WorkflowEntry._handle_special_values(v)
            return res
        if isinstance(value, list):
            res_list = []
            for item in value:
                res_list.append(WorkflowEntry._handle_special_values(item))
            return res_list
        if isinstance(value, File):
            return value.to_dict()
        return value

    @classmethod
    def mapping_user_inputs_to_variable_pool(
        cls,
        *,
        variable_mapping: Mapping[str, Sequence[str]],
        user_inputs: Mapping[str, Any],
        variable_pool: VariablePool,
        tenant_id: str,
    ) -> None:
        """Copy user-supplied input values into the variable pool per the mapping.

        `variable_mapping` maps "node_id.variable.key" strings to variable
        selectors (node id + key path). File-shaped inputs are materialized via
        file_factory before being added to the pool.
        """
        # NOTE(QuantumGhost): This logic should remain synchronized with
        # the implementation of `load_into_variable_pool`, specifically the logic about
        # variable existence checking.
        # WARNING(QuantumGhost): The semantics of this method are not clearly defined,
        # and multiple parts of the codebase depend on its current behavior.
        # Modify with caution.
        for node_variable, variable_selector in variable_mapping.items():
            # fetch node id and variable key from node_variable
            node_variable_list = node_variable.split(".")
            # NOTE(review): str.split always yields at least one element, so this
            # check looks unreachable — kept as-is per the warning above.
            if len(node_variable_list) < 1:
                raise ValueError(f"Invalid node variable {node_variable}")
            # Everything after the first segment (the node id) is the variable key.
            node_variable_key = ".".join(node_variable_list[1:])
            # A value must come from user inputs (full or short key) or already
            # exist in the variable pool.
            if (node_variable_key not in user_inputs and node_variable not in user_inputs) and not variable_pool.get(
                variable_selector
            ):
                raise ValueError(f"Variable key {node_variable} not found in user inputs.")
            # environment variable already exist in variable pool, not from user inputs
            if variable_pool.get(variable_selector) and variable_selector[0] == ENVIRONMENT_VARIABLE_NODE_ID:
                continue
            # fetch variable node id from variable selector
            variable_node_id = variable_selector[0]
            variable_key_list = variable_selector[1:]
            variable_key_list = list(variable_key_list)
            # get input value
            # NOTE(review): falsy-but-present values (0, "", False) under the full
            # key fall through to the short key here — presumably intentional
            # legacy behavior; confirm before changing (see warning above).
            input_value = user_inputs.get(node_variable)
            if not input_value:
                input_value = user_inputs.get(node_variable_key)
            if input_value is None:
                continue
            # A dict with "type" and "transfer_method" keys is treated as a file payload.
            if isinstance(input_value, dict) and "type" in input_value and "transfer_method" in input_value:
                input_value = file_factory.build_from_mapping(mapping=input_value, tenant_id=tenant_id)
            # A list of such dicts is treated as a list of file payloads.
            if (
                isinstance(input_value, list)
                and all(isinstance(item, dict) for item in input_value)
                and all("type" in item and "transfer_method" in item for item in input_value)
            ):
                input_value = file_factory.build_from_mappings(mappings=input_value, tenant_id=tenant_id)
            # append variable and value to variable pool
            if variable_node_id != ENVIRONMENT_VARIABLE_NODE_ID:
                # In single run, the input_value is set as the LLM's structured output value within the variable_pool.
                if len(variable_key_list) == 2 and variable_key_list[0] == "structured_output":
                    input_value = {variable_key_list[1]: input_value}
                    variable_key_list = variable_key_list[0:1]
                    # Support for a single node to reference multiple structured_output variables
                    current_variable = variable_pool.get([variable_node_id] + variable_key_list)
                    if current_variable and isinstance(current_variable.value, dict):
                        # Merge with any structured_output entries added earlier in this loop.
                        input_value = current_variable.value | input_value
                variable_pool.add([variable_node_id] + variable_key_list, input_value)

    @staticmethod
    def _traced_node_run(node: Node) -> Generator[GraphNodeEventBase, None, None]:
        """
        Wraps a node's run method with OpenTelemetry tracing and returns a generator.
        """
        # Wrap node.run() with ObservabilityLayer hooks to produce node-level spans
        layer = ObservabilityLayer()
        layer.on_graph_start()
        node.ensure_execution_id()

        def _gen() -> Generator[GraphNodeEventBase, None, None]:
            error: Exception | None = None
            layer.on_node_run_start(node)
            try:
                yield from node.run()
            except Exception as exc:
                # Record the failure for the span, then propagate to the consumer.
                error = exc
                raise
            finally:
                layer.on_node_run_end(node, error)

        return _gen()