Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/blueapi/core/bluesky_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class DataEvent(BlueapiBaseModel):

name: str
doc: Mapping[str, Any]
task_id: str


@runtime_checkable
Expand Down
5 changes: 4 additions & 1 deletion src/blueapi/worker/task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,10 @@ def _on_document(self, name: str, document: Mapping[str, Any]) -> None:

correlation_id = self._current.request_id
self._data_events.publish(
DataEvent(name=name, doc=document), correlation_id
DataEvent(
name=name, task_id=self._current.task_id, doc=document
),
correlation_id,
)
else:
raise ValueError(
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_tests/client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def test_run_task_fails_on_failing_event(
),
),
ProgressEvent(task_id="foo"),
DataEvent(name="start", doc={}),
DataEvent(name="start", doc={}, task_id="0000-1111"),
],
)
def test_run_task_calls_event_callback(
Expand Down
7 changes: 5 additions & 2 deletions tests/unit_tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,9 @@ def test_plan_output_formatting():

def test_event_formatting():
data = DataEvent(
name="start", doc={"foo": "bar", "fizz": {"buzz": (1, 2, 3), "hello": "world"}}
name="start",
doc={"foo": "bar", "fizz": {"buzz": (1, 2, 3), "hello": "world"}},
task_id="0000-1111",
)
worker = WorkerEvent(
state=WorkerState.RUNNING,
Expand All @@ -672,7 +674,8 @@ def test_event_formatting():
data,
(
"""{"name": "start", "doc": """
"""{"foo": "bar", "fizz": {"buzz": [1, 2, 3], "hello": "world"}}}\n"""
"""{"foo": "bar", "fizz": {"buzz": [1, 2, 3], "hello": "world"}}, """
""""task_id": "0000-1111"}\n"""
),
)
_assert_matching_formatting(OutputFormat.COMPACT, data, "Data Event: start\n")
Expand Down
8 changes: 4 additions & 4 deletions tests/unit_tests/worker/test_task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,10 @@ def test_worker_and_data_events_produce_in_order(
errors=[],
warnings=[],
),
DataEvent(name="start", doc={}),
DataEvent(name="descriptor", doc={}),
DataEvent(name="event", doc={}),
DataEvent(name="stop", doc={}),
DataEvent(name="start", doc={}, task_id="0000-1111"),
DataEvent(name="descriptor", doc={}, task_id="0000-1111"),
DataEvent(name="event", doc={}, task_id="0000-1111"),
DataEvent(name="stop", doc={}, task_id="0000-1111"),
WorkerEvent(
state=WorkerState.IDLE,
task_status=TaskStatus(
Expand Down
Loading