Skip to content

Commit 43cb335

Browse files
Adds supporting dispatch requests for raw types / non-pydantic objects
1 parent 674d438 commit 43cb335

2 files changed

Lines changed: 221 additions & 0 deletions

File tree

src/intersect_orchestrator/app/core/campaign_orchestrator.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,14 @@ def _build_task_request_payload(
722722
``resolved_output_values`` maps output Value UUIDs from completed tasks to
723723
their runtime values. When a task's input Value ID matches a key in that
724724
dict, the resolved value is used instead of the schema default.
725+
726+
Supports two schema shapes:
727+
* Object schema (``{"type": "object", "properties": {...}}``): builds a
728+
dict payload with property values extracted from resolved values or defaults.
729+
* Primitive schema (``{"type": "string"}``, ``{"type": "integer"}``, etc.):
730+
sends the raw value directly (e.g., ``"uuid-value"`` instead of
731+
``{"uuid": "uuid-value"}``). This is needed for capabilities that expect
732+
a raw primitive argument rather than a Pydantic model.
725733
"""
726734
# TODO: Need to return b'null' for content type application/json
727735
# and empty byte string for non-json content types
@@ -731,6 +739,25 @@ def _build_task_request_payload(
731739
if resolved_output_values is None:
732740
resolved_output_values = {}
733741

742+
schema_type = task.input.json_schema.get('type', 'object')
743+
744+
# Handle non-object schema types (string, integer, number, boolean, array)
745+
if schema_type != 'object':
746+
# For primitive types, use the single value's resolved value or schema default
747+
if task.input.values:
748+
value = task.input.values[0]
749+
if value.id in resolved_output_values:
750+
raw_value = resolved_output_values[value.id]
751+
elif 'default' in task.input.json_schema:
752+
raw_value = task.input.json_schema['default']
753+
else:
754+
return b'null'
755+
return json.dumps(raw_value).encode('utf-8')
756+
elif 'default' in task.input.json_schema:
757+
return json.dumps(task.input.json_schema['default']).encode('utf-8')
758+
return b'null'
759+
760+
# Handle object schema type (existing logic)
734761
properties = task.input.json_schema.get('properties', {})
735762
payload: dict[str, Any] = {}
736763

tests/unit/test_campaign_orchestrator.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,3 +1043,197 @@ def test_cross_group_output_resolves_to_downstream_group_input() -> None:
10431043
_reply_headers(campaign_id, task_update),
10441044
)
10451045
assert _event_types(client.broadcasts)[-1] == 'CAMPAIGN_COMPLETE'
1046+
1047+
1048+
def test_dispatch_request_with_raw_string_schema() -> None:
1049+
"""Test that tasks with a non-object schema (e.g., string) send the raw value, not a wrapped object.
1050+
1051+
When a target capability expects a raw string (like `uuid: str`) instead of an object,
1052+
the orchestrator should send `"the-value"` rather than `{"uuid": "the-value"}`.
1053+
"""
1054+
client = FakeClient()
1055+
orchestrator = CampaignOrchestrator(client)
1056+
1057+
campaign_id = uuid.uuid4()
1058+
step_id = uuid.uuid4()
1059+
campaign = Campaign(
1060+
id=campaign_id,
1061+
run_id=campaign_id,
1062+
name='test-campaign',
1063+
user='test-user',
1064+
description='Test campaign for raw string schema',
1065+
task_groups=[
1066+
{
1067+
'id': str(uuid.uuid4()),
1068+
'tasks': [
1069+
{
1070+
'id': str(step_id),
1071+
'hierarchy': 'org.fac.system.subsystem.service',
1072+
'capability': 'dial',
1073+
'operation_id': 'get_workflow_data',
1074+
'output': None,
1075+
'input': {
1076+
'schema': {
1077+
'type': 'string',
1078+
'default': 'deadbeefdeadbeefdeadbeef',
1079+
},
1080+
'values': [
1081+
{'id': str(uuid.uuid4()), 'var': 'uuid'},
1082+
],
1083+
},
1084+
'task_dependencies': [],
1085+
'task_objectives': None,
1086+
}
1087+
],
1088+
'group_dependencies': [],
1089+
'objectives': [],
1090+
}
1091+
],
1092+
)
1093+
1094+
orchestrator.submit_campaign(campaign)
1095+
1096+
assert len(client.control_plane_manager.published) == 1
1097+
published_payload = client.control_plane_manager.published[0][1]
1098+
# Should be a raw JSON string, not an object
1099+
assert json.loads(published_payload.decode('utf-8')) == 'deadbeefdeadbeefdeadbeef'
1100+
1101+
1102+
def test_dispatch_request_with_raw_integer_schema() -> None:
1103+
"""Test that tasks with an integer schema send the raw integer value."""
1104+
client = FakeClient()
1105+
orchestrator = CampaignOrchestrator(client)
1106+
1107+
campaign_id = uuid.uuid4()
1108+
step_id = uuid.uuid4()
1109+
campaign = Campaign(
1110+
id=campaign_id,
1111+
run_id=campaign_id,
1112+
name='test-campaign',
1113+
user='test-user',
1114+
description='Test campaign for raw integer schema',
1115+
task_groups=[
1116+
{
1117+
'id': str(uuid.uuid4()),
1118+
'tasks': [
1119+
{
1120+
'id': str(step_id),
1121+
'hierarchy': 'org.fac.system.subsystem.service',
1122+
'capability': 'SomeCapability',
1123+
'operation_id': 'some_operation',
1124+
'output': None,
1125+
'input': {
1126+
'schema': {
1127+
'type': 'integer',
1128+
'default': 42,
1129+
},
1130+
'values': [
1131+
{'id': str(uuid.uuid4()), 'var': 'count'},
1132+
],
1133+
},
1134+
'task_dependencies': [],
1135+
'task_objectives': None,
1136+
}
1137+
],
1138+
'group_dependencies': [],
1139+
'objectives': [],
1140+
}
1141+
],
1142+
)
1143+
1144+
orchestrator.submit_campaign(campaign)
1145+
1146+
assert len(client.control_plane_manager.published) == 1
1147+
published_payload = client.control_plane_manager.published[0][1]
1148+
# Should be a raw JSON integer, not an object
1149+
assert json.loads(published_payload.decode('utf-8')) == 42
1150+
1151+
1152+
def test_dispatch_request_with_raw_string_from_resolved_value() -> None:
1153+
"""Test that a raw string schema uses resolved values from previous tasks.
1154+
1155+
When task B depends on task A's output and task B expects a raw string,
1156+
the orchestrator should send the resolved string value directly.
1157+
"""
1158+
client = FakeClient()
1159+
orchestrator = CampaignOrchestrator(client)
1160+
1161+
campaign_id = uuid.uuid4()
1162+
tg1_id = uuid.uuid4()
1163+
tg2_id = uuid.uuid4()
1164+
task_init = uuid.uuid4()
1165+
task_get_data = uuid.uuid4()
1166+
shared_value_id = uuid.uuid4()
1167+
1168+
campaign = Campaign(
1169+
id=campaign_id,
1170+
run_id=campaign_id,
1171+
name='test-campaign',
1172+
user='test-user',
1173+
description='Test campaign with raw string from resolved value',
1174+
task_groups=[
1175+
{
1176+
'id': str(tg1_id),
1177+
'tasks': [
1178+
{
1179+
'id': str(task_init),
1180+
'hierarchy': 'org.fac.system.subsystem.service',
1181+
'capability': 'dial',
1182+
'operation_id': 'init_workflow',
1183+
'output': {
1184+
'schema': {
1185+
'type': 'string',
1186+
},
1187+
'values': [{'id': str(shared_value_id), 'var': 'workflow_id'}],
1188+
},
1189+
'input': None,
1190+
'task_dependencies': [],
1191+
'task_objectives': None,
1192+
}
1193+
],
1194+
'group_dependencies': [],
1195+
'objectives': [],
1196+
},
1197+
{
1198+
'id': str(tg2_id),
1199+
'tasks': [
1200+
{
1201+
'id': str(task_get_data),
1202+
'hierarchy': 'org.fac.system.subsystem.service',
1203+
'capability': 'dial',
1204+
'operation_id': 'get_workflow_data',
1205+
'output': None,
1206+
'input': {
1207+
'schema': {
1208+
'type': 'string',
1209+
'default': 'placeholder',
1210+
},
1211+
'values': [{'id': str(shared_value_id), 'var': 'uuid'}],
1212+
},
1213+
'task_dependencies': [],
1214+
'task_objectives': None,
1215+
}
1216+
],
1217+
'group_dependencies': [str(tg1_id)],
1218+
'objectives': [],
1219+
},
1220+
],
1221+
)
1222+
1223+
orchestrator.submit_campaign(campaign)
1224+
1225+
# Group 1: complete task_init with real workflow_id
1226+
real_workflow_id = 'cafebabecafebabecafebabe'
1227+
orchestrator.handle_request_reply_broker_message(
1228+
json.dumps(real_workflow_id).encode(),
1229+
'application/json',
1230+
_reply_headers(campaign_id, task_init),
1231+
)
1232+
1233+
# Group 2 task_get_data should have been dispatched with the real workflow_id as raw string
1234+
assert len(client.control_plane_manager.published) == 2
1235+
task_get_data_payload = json.loads(client.control_plane_manager.published[1][1].decode())
1236+
# Should be the raw string, not {"uuid": "cafebabecafebabecafebabe"}
1237+
assert task_get_data_payload == real_workflow_id, (
1238+
f"Expected raw string '{real_workflow_id}' but got: {task_get_data_payload}"
1239+
)

0 commit comments

Comments
 (0)