Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 73 additions & 60 deletions src/intersect_orchestrator/app/core/campaign_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class CampaignState:
resolved_output_values: dict[uuid.UUID, Any] = field(default_factory=dict)
"""Maps output Value IDs from completed tasks to their resolved runtime values.
Persists across task groups so upstream outputs flow into downstream inputs."""
lock: threading.Lock = field(default_factory=threading.Lock)
"""Per-campaign lock to serialise broker callback threads."""


class CampaignOrchestrator:
Expand Down Expand Up @@ -154,7 +156,11 @@ def submit_campaign(self, campaign: Campaign) -> IntersectCampaignId:

self._repository.create_campaign(campaign_id, campaign, campaign_state)

self._start_next_iteration(state)
# Hold the per-campaign lock during the initial dispatch so that
# a fast service reply arriving in a broker callback thread cannot race with
# the first iteration setup.
with state.lock:
self._start_next_iteration(state)
return campaign_id

def cancel_campaign(self, campaign_id: IntersectCampaignId) -> bool:
Expand Down Expand Up @@ -254,57 +260,59 @@ def handle_request_reply_broker_message(

node_id = headers.request_id

# Check that the reply matches one of the currently active tasks
if state.current_group_index >= len(state.task_group_executions):
logger.error('Campaign has no active task group for ID: %s', campaign_id)
self._handle_request_reply_service_error(
state,
headers.source,
'No active task group',
failed_step_id=node_id,
)
return

execution = state.task_group_executions[state.current_group_index]
if node_id not in execution.active_tasks:
logger.error(
'Node ID %s from message is not among active tasks for campaign ID: %s',
node_id,
campaign_id,
)
self._handle_request_reply_service_error(
state,
headers.source,
'Node ID from message does not match any active task',
failed_step_id=node_id,
)
return
# Acquire the per-campaign lock to serialise concurrent broker reply threads.
with state.lock:
# Check that the reply matches one of the currently active tasks
if state.current_group_index >= len(state.task_group_executions):
logger.error('Campaign has no active task group for ID: %s', campaign_id)
self._handle_request_reply_service_error(
state,
headers.source,
'No active task group',
failed_step_id=node_id,
)
return

if content_type == 'application/json':
try:
payload = json.loads(message)
except json.JSONDecodeError:
logger.exception('Message claimed to be JSON but was not')
execution = state.task_group_executions[state.current_group_index]
if node_id not in execution.active_tasks:
logger.error(
'Node ID %s from message is not among active tasks for campaign ID: %s',
node_id,
campaign_id,
)
self._handle_request_reply_service_error(
state,
headers.source,
'Message claimed to be JSON but was not',
'Node ID from message does not match any active task',
failed_step_id=node_id,
)
return
else:
payload = message

if headers.has_error:
self._handle_request_reply_service_error(
state,
headers.source,
str(payload),
failed_step_id=node_id,
)
return
if content_type == 'application/json':
try:
payload = json.loads(message)
except json.JSONDecodeError:
logger.exception('Message claimed to be JSON but was not')
self._handle_request_reply_service_error(
state,
headers.source,
'Message claimed to be JSON but was not',
failed_step_id=node_id,
)
return
else:
payload = message

self._complete_step(state, node_id, message)
if headers.has_error:
self._handle_request_reply_service_error(
state,
headers.source,
str(payload),
failed_step_id=node_id,
)
return

self._complete_step(state, node_id, message)

def handle_event_broker_message(
self, message: bytes, content_type: str, raw_headers: dict[str, str]
Expand Down Expand Up @@ -364,25 +372,30 @@ def handle_event_broker_message(
if current_state is None:
continue

if current_state.current_group_index >= len(current_state.task_group_executions):
continue
# Acquire the per-campaign lock to serialise concurrent broker callback threads.
# Without it, two threads can both see the same event task in
# active_tasks and both call _complete_step, causing double-dispatch
# of dependent tasks (and the second reply killing the campaign).
with current_state.lock:
if current_state.current_group_index >= len(current_state.task_group_executions):
continue

execution = current_state.task_group_executions[current_state.current_group_index]
if task_id not in execution.active_tasks:
continue
execution = current_state.task_group_executions[current_state.current_group_index]
if task_id not in execution.active_tasks:
continue

self._record_task_event(
campaign_id=current_state.campaign_run_id,
task_group_id=execution.task_group_id,
task_id=task_id,
event_type='TASK_EVENT_RECEIVED',
payload={
'source': headers.source,
'capability_name': headers.capability_name,
'event_name': headers.event_name,
},
)
self._complete_step(current_state, task_id, message)
self._record_task_event(
campaign_id=current_state.campaign_run_id,
task_group_id=execution.task_group_id,
task_id=task_id,
event_type='TASK_EVENT_RECEIVED',
payload={
'source': headers.source,
'capability_name': headers.capability_name,
'event_name': headers.event_name,
},
)
self._complete_step(current_state, task_id, message)

def _handle_request_reply_service_error(
self,
Expand Down
Loading