Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
493ac83
fix: Pass headers into TiledWriter
DiamondJoseph Nov 21, 2025
8fbe37b
test: In-progress system tests
DiamondJoseph Nov 21, 2025
910c7d6
Add keycloak config using commadline
ZohebShaikh Nov 24, 2025
0477234
Merge commit 'cef2b55a22416c1b60058559eed44d8e7eadb2da' into authn-co…
DiamondJoseph Nov 24, 2025
863f808
fix(tests): Prevent spurious metadata when Tiled not configured
DiamondJoseph Nov 24, 2025
74db417
fix: Metadata dimensions
DiamondJoseph Nov 24, 2025
457730a
fix: Fix sent access tags
DiamondJoseph Nov 24, 2025
e2245af
fix: typing
DiamondJoseph Nov 24, 2025
f36252b
Pass access blob in expected form
DiamondJoseph Nov 25, 2025
e5b3d9a
subscribe to TiledWriter only when token available
DiamondJoseph Nov 25, 2025
3e3cb88
fix typing of TiledConstructor
DiamondJoseph Nov 25, 2025
c35ac3f
Pass beamline to make decision
DiamondJoseph Nov 26, 2025
1720b85
Cast proposal and visit to int
DiamondJoseph Nov 27, 2025
8eb6ab9
Fix tests
DiamondJoseph Nov 27, 2025
2b52dfe
Merge commit 'd8a989cc6e2740e9eb3e8adf356a2c1b5ce41d3b' into authn-co…
DiamondJoseph Nov 27, 2025
cf3f818
Revert "test: add system tests for tiled integration"
DiamondJoseph Nov 27, 2025
e3690a8
fix system tests
DiamondJoseph Nov 27, 2025
a82c547
Temporarily disable system tests in ci
DiamondJoseph Nov 27, 2025
7ddb65d
Fix erroneous full queue error
keithralphs Nov 28, 2025
a184bee
fix: Access blob keys names
DiamondJoseph Dec 2, 2025
2422686
Standardise outputs and inputs of authz
DiamondJoseph Dec 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ on:
branches:
- main
tags:
- '*'
- "*"
pull_request:

jobs:

lint:
uses: ./.github/workflows/_tox.yml
with:
Expand All @@ -32,8 +31,8 @@ jobs:
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

system-test:
uses: ./.github/workflows/_system_test.yml
# system-test:
# uses: ./.github/workflows/_system_test.yml

container:
needs: test
Expand All @@ -48,7 +47,6 @@ jobs:
docs:
uses: ./.github/workflows/_docs.yml


dist:
uses: ./.github/workflows/_dist.yml

Expand Down
10 changes: 10 additions & 0 deletions src/blueapi/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
DodalSource,
EnvironmentConfig,
PlanSource,
TiledConfig,
)
from blueapi.core.protocols import DeviceManager
from blueapi.utils import (
Expand Down Expand Up @@ -116,6 +117,7 @@ class BlueskyContext:
run_engine: RunEngine = field(
default_factory=lambda: RunEngine(context_managers=[])
)
tiled_conf: TiledConfig | None = field(default=None, init=False, repr=False)
numtracker: NumtrackerClient | None = field(default=None, init=False, repr=False)
path_provider: PathProvider | None = None
plans: dict[str, Plan] = field(default_factory=dict)
Expand Down Expand Up @@ -168,6 +170,14 @@ def _update_scan_num(md: dict[str, Any]) -> int:
"the devices. Remove this path provider to use numtracker."
)

if (tiled_conf := configuration.tiled) is not None and tiled_conf.enabled:
if configuration.env.metadata is None:
raise InvalidConfigError(
"Tiled has been configured but `instrument` metadata is not set- "
"this field is required to make authorization decisions."
)
self.tiled_conf = tiled_conf

def find_device(self, addr: str | list[str]) -> Device | None:
"""
Find a device in this context, allows for recursive search.
Expand Down
69 changes: 53 additions & 16 deletions src/blueapi/service/interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections.abc import Mapping
from functools import cache
from queue import Full
from typing import Any

from bluesky.callbacks.tiled_writer import TiledWriter
Expand All @@ -8,7 +9,7 @@
from tiled.client import from_uri

from blueapi.cli.scratch import get_python_environment
from blueapi.config import ApplicationConfig, OIDCConfig, StompConfig, TiledConfig
from blueapi.config import ApplicationConfig, OIDCConfig, StompConfig
from blueapi.core.context import BlueskyContext
from blueapi.core.event import EventStream
from blueapi.log import set_up_logging
Expand All @@ -20,7 +21,8 @@
TaskRequest,
WorkerTask,
)
from blueapi.worker.event import TaskStatusEnum, WorkerState
from blueapi.utils.serialization import access_blob
from blueapi.worker.event import TaskStatusEnum, WorkerEvent, WorkerState
from blueapi.worker.task import Task
from blueapi.worker.task_worker import TaskWorker, TrackableTask

Expand Down Expand Up @@ -87,16 +89,6 @@ def stomp_client() -> StompClient | None:
return None


@cache
def tiled_writer() -> TiledWriter | None:
tiled_config: TiledConfig = config().tiled
if tiled_config.enabled:
client = from_uri(str(tiled_config.url), api_key=tiled_config.api_key)
return TiledWriter(client, batch_size=1)
else:
return None


def setup(config: ApplicationConfig) -> None:
"""Creates and starts a worker with supplied config"""
set_config(config)
Expand All @@ -105,8 +97,6 @@ def setup(config: ApplicationConfig) -> None:
# Eagerly initialize worker and messaging connection
worker()
stomp_client()
if writer := tiled_writer():
context().run_engine.subscribe(writer)


def teardown() -> None:
Expand All @@ -116,7 +106,6 @@ def teardown() -> None:
context.cache_clear()
worker.cache_clear()
stomp_client.cache_clear()
tiled_writer.cache_clear()


def _publish_event_streams(
Expand Down Expand Up @@ -158,10 +147,20 @@ def get_device(name: str) -> DeviceModel:

def submit_task(task_request: TaskRequest) -> str:
"""Submit a task to be run on begin_task"""
metadata: dict[str, Any] = {
"instrument_session": task_request.instrument_session,
}
if context().tiled_conf is not None:
md = config().env.metadata
# We raise an InvalidConfigError if this isn't set
assert md
metadata["tiled_access_tags"] = [
access_blob(task_request.instrument_session, md.instrument)
]
task = Task(
name=task_request.name,
params=task_request.params,
metadata={"instrument_session": task_request.instrument_session},
metadata=metadata,
)
return worker().submit_task(task)

Expand All @@ -175,8 +174,46 @@ def begin_task(
task: WorkerTask, pass_through_headers: Mapping[str, str] | None = None
) -> WorkerTask:
"""Trigger a task. Will fail if the worker is busy"""
active_task = worker().get_active_task()
if active_task is not None and not active_task.is_complete:
raise Full()
if nt := context().numtracker:
nt.set_headers(pass_through_headers or {})

def unset_headers_when_task_finished(
event: WorkerEvent, correlation_id: str | None
) -> None:
if (
event.task_status
and event.task_status.task_id == task.task_id
and event.task_status.task_complete
):
nt.set_headers({})

worker().worker_events.subscribe(unset_headers_when_task_finished)

if tiled_config := context().tiled_conf:
tiled_client = from_uri(
str(tiled_config.url),
api_key=tiled_config.api_key,
headers=pass_through_headers,
)
tiled_writer_token = context().run_engine.subscribe(
TiledWriter(tiled_client, batch_size=1)
)

def remove_callback_when_task_finished(
event: WorkerEvent, correlation_id: str | None
) -> None:
if (
event.task_status
and event.task_status.task_id == task.task_id
and event.task_status.task_complete
):
context().run_engine.unsubscribe(tiled_writer_token)

worker().worker_events.subscribe(remove_callback_when_task_finished)

if task.task_id is not None:
worker().begin_task(task.task_id)
return task
Expand Down
20 changes: 20 additions & 0 deletions src/blueapi/utils/serialization.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json
import re
from typing import Any

from pydantic import BaseModel
Expand All @@ -24,3 +26,21 @@ def serialize(obj: Any) -> Any:
return serialize(obj.__pydantic_model__)
else:
return obj


_INSTRUMENT_SESSION_AUTHZ_REGEX: str = r"^[a-zA-Z]{2}(\d+)-(\d+)$"


def access_blob(instrument_session: str, beamline: str) -> str:
m = re.search(_INSTRUMENT_SESSION_AUTHZ_REGEX, instrument_session)
if m is None:
raise ValueError(
"Unable to extract proposal and visit from "
f"instrument session {instrument_session}"
)
blob = {
"proposal": int(m.group(1)),
"visit": int(m.group(2)),
"beamline": beamline,
}
return json.dumps(blob)
5 changes: 4 additions & 1 deletion tests/system_tests/test_blueapi_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,10 @@ def test_instrument_session_propagated(client: BlueapiClient):
response = client.create_task(_SIMPLE_TASK)
trackable_task = client.get_task(response.task_id)
assert trackable_task.task.metadata == {
"instrument_session": FAKE_INSTRUMENT_SESSION
"instrument_session": FAKE_INSTRUMENT_SESSION,
"tiled_access_tags": [
'{"proposal_number": 12345, "visit_number": 1, "beamline": "adsim"}',
],
}


Expand Down
40 changes: 40 additions & 0 deletions tests/unit_tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@
from pytest import LogCaptureFixture

from blueapi.config import (
ApplicationConfig,
DeviceManagerSource,
DeviceSource,
DodalSource,
EnvironmentConfig,
MetadataConfig,
PlanSource,
TiledConfig,
)
from blueapi.core import BlueskyContext, is_bluesky_compatible_device
from blueapi.core.context import DefaultFactory, generic_bounds, qualified_name
from blueapi.core.protocols import DeviceConnectResult, DeviceManager
from blueapi.utils.connect_devices import _establish_device_connections
from blueapi.utils.invalid_config_error import InvalidConfigError

SIM_MOTOR_NAME = "sim"
ALT_MOTOR_NAME = "alt"
Expand Down Expand Up @@ -837,3 +840,40 @@ def test_non_device_manager_errors(empty_context: BlueskyContext):
imp_mod.side_effect = lambda mod: dev_mod if mod == "foo.bar" else None
with pytest.raises(ValueError, match="not a device manager"):
empty_context.with_config(env)


def test_setup_without_tiled_not_makes_tiled_inserter():
config = TiledConfig(enabled=False)
context = BlueskyContext(
ApplicationConfig(
tiled=config,
env=EnvironmentConfig(metadata=MetadataConfig(instrument="ixx")),
)
)
assert context.tiled_conf is None


def test_setup_default_not_makes_tiled_inserter():
context = BlueskyContext(ApplicationConfig())
assert context.tiled_conf is None


@pytest.mark.parametrize("api_key", [None, "foo"])
def test_setup_with_tiled_makes_tiled_inserter(api_key: str | None):
config = TiledConfig(enabled=True, api_key=api_key)
context = BlueskyContext(
ApplicationConfig(
tiled=config,
env=EnvironmentConfig(metadata=MetadataConfig(instrument="ixx")),
)
)
assert context.tiled_conf == config


@pytest.mark.parametrize("api_key", [None, "foo"])
def test_must_have_instrument_set_for_tiled(api_key: str | None):
config = TiledConfig(enabled=True, api_key=api_key)
with pytest.raises(InvalidConfigError):
BlueskyContext(
ApplicationConfig(tiled=config, env=EnvironmentConfig(metadata=None))
)
58 changes: 25 additions & 33 deletions tests/unit_tests/service/test_interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import uuid
from dataclasses import dataclass
from typing import Any
from unittest.mock import ANY, MagicMock, Mock, patch

import pytest
Expand All @@ -25,7 +27,6 @@
PlanSource,
ScratchConfig,
StompConfig,
TiledConfig,
)
from blueapi.core.context import BlueskyContext
from blueapi.service import interface
Expand Down Expand Up @@ -316,10 +317,19 @@ def test_get_tasks(get_tasks_mock: MagicMock):
assert interface.get_tasks() == tasks


@pytest.mark.parametrize("tiled_enabled", [True, False])
@patch("blueapi.service.interface.context")
def test_get_task_by_id(context_mock: MagicMock):
@patch("blueapi.service.interface.config")
def test_get_task_by_id(
config_mock: MagicMock, context_mock: MagicMock, tiled_enabled: bool
):
context = BlueskyContext()
context.register_plan(my_plan)
if tiled_enabled:
context.tiled_conf = MagicMock()
config_mock.return_value = ApplicationConfig(
env=EnvironmentConfig(metadata=MetadataConfig(instrument="ixx"))
)
context_mock.return_value = context

task_id = interface.submit_task(
Expand All @@ -329,15 +339,25 @@ def test_get_task_by_id(context_mock: MagicMock):
)
)

expected_metadata: dict[str, Any] = {
"instrument_session": FAKE_INSTRUMENT_SESSION,
}

if tiled_enabled:
expected_access_tag = {
"proposal_number": 12345,
"visit_number": 1,
"beamline": "ixx",
}
expected_metadata["tiled_access_tags"] = [json.dumps(expected_access_tag)]

assert interface.get_task_by_id(task_id) == TrackableTask.model_construct(
task_id=task_id,
request_id=ANY,
task=Task(
name="my_plan",
params={},
metadata={
"instrument_session": FAKE_INSTRUMENT_SESSION,
},
metadata=expected_metadata,
),
is_complete=False,
is_pending=True,
Expand Down Expand Up @@ -509,34 +529,6 @@ def test_setup_with_numtracker_makes_start_document_provider():
clear_path_provider()


def test_setup_without_tiled_not_makes_tiled_inserter():
with patch("blueapi.service.interface.from_uri") as from_uri:
conf = ApplicationConfig()
interface.setup(conf)

assert from_uri.call_count == 0


def test_setup_with_tiled_makes_tiled_inserter():
with patch("blueapi.service.interface.from_uri") as from_uri:
conf = ApplicationConfig(tiled=TiledConfig(enabled=True))
interface.setup(conf)

assert from_uri.call_count == 1
assert from_uri.call_args.args == ("http://localhost:8407/",)
assert from_uri.call_args.kwargs == {"api_key": None}


def test_setup_with_tiled_api_key_makes_tiled_inserter():
with patch("blueapi.service.interface.from_uri") as from_uri:
conf = ApplicationConfig(tiled=TiledConfig(enabled=True, api_key="foobarbaz"))
interface.setup(conf)

assert from_uri.call_count == 1
assert from_uri.call_args.args == ("http://localhost:8407/",)
assert from_uri.call_args.kwargs == {"api_key": "foobarbaz"}


def test_setup_with_numtracker_raises_if_provider_is_defined_in_device_module():
conf = ApplicationConfig(
env=EnvironmentConfig(
Expand Down
Loading
Loading