Skip to content

Commit 0d30342

Browse files
authored
feat(client) Add methods to TaskbrokerApp to hide registry (#560)
Now that we have the TaskbrokerApp the registry serves little purpose. These methods enable us to refactor it away. Refs STREAM-716
1 parent 6dd430c commit 0d30342

6 files changed

Lines changed: 95 additions & 8 deletions

File tree

.github/workflows/python-example-image.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
uses: getsentry/action-build-and-push-images@a53f146fc1ea3cb404f2dcf7378f5b60dd98d3ca
2323
with:
2424
image_name: 'taskbroker-python-example'
25-
platforms: linux/${{ matrix.platform }}
25+
platforms: linux/amd64
2626
dockerfile_path: './clients/python/Dockerfile.example'
2727
dockerfile_context: './clients/python'
2828
ghcr: true

clients/python/src/examples/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919

2020
# Create a namespace and register tasks
21-
exampletasks = app.taskregistry.create_namespace("examples")
21+
exampletasks = app.create_namespace("examples")
2222

2323

2424
@exampletasks.register(name="examples.simple_task")

clients/python/src/taskbroker_client/app.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1+
import datetime
12
import importlib
23
from collections.abc import Iterable
34
from typing import Any
45

56
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation
67

8+
from taskbroker_client.constants import DEFAULT_PROCESSING_DEADLINE
79
from taskbroker_client.imports import import_string
810
from taskbroker_client.metrics import MetricsBackend
9-
from taskbroker_client.registry import TaskRegistry
11+
from taskbroker_client.registry import TaskNamespace, TaskRegistry
12+
from taskbroker_client.retry import Retry
1013
from taskbroker_client.router import TaskRouter
14+
from taskbroker_client.task import Task
1115
from taskbroker_client.types import AtMostOnceStore, ProducerFactory
1216

1317

@@ -72,6 +76,39 @@ def set_config(self, config: dict[str, Any]) -> None:
7276
if key in self._config:
7377
self._config[key] = value
7478

79+
def create_namespace(
80+
self,
81+
name: str,
82+
*,
83+
retry: Retry | None = None,
84+
expires: int | datetime.timedelta | None = None,
85+
processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE,
86+
app_feature: str | None = None,
87+
) -> TaskNamespace:
88+
"""
89+
Create a task namespace.
90+
91+
Namespaces are mapped onto topics through the configured router allowing
92+
infrastructure to be scaled based on a region's requirements.
93+
94+
Namespaces can define default behavior for tasks defined within a namespace.
95+
"""
96+
return self._taskregistry.create_namespace(
97+
name=name,
98+
retry=retry,
99+
expires=expires,
100+
processing_deadline_duration=processing_deadline_duration,
101+
app_feature=app_feature,
102+
)
103+
104+
def get_task(self, namespace: str, task: str) -> Task[Any, Any]:
105+
"""Fetch a task by namespace and name."""
106+
return self._taskregistry.get(namespace).get(task)
107+
108+
def get_namespace(self, namespace: str) -> TaskNamespace:
109+
"""Fetch a task by namespace and name."""
110+
return self._taskregistry.get(namespace)
111+
75112
def set_modules(self, modules: Iterable[str]) -> None:
76113
"""
77114
Set the list of modules containing tasks to be loaded by workers and schedulers.

clients/python/src/taskbroker_client/scheduler/runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ def add(self, key: str, task_config: ScheduleConfig) -> None:
195195
except ValueError:
196196
raise ValueError("Invalid task name. Must be in the format namespace:taskname")
197197

198-
task = self._app.taskregistry.get_task(namespace, taskname)
198+
task = self._app.get_task(namespace, taskname)
199199
entry = ScheduleEntry(key=key, task=task, schedule=task_config["schedule"])
200200
self._entries.append(entry)
201201
self._heap = []

clients/python/src/taskbroker_client/worker/workerchild.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,18 +101,18 @@ def child_process(
101101
"""
102102
app = import_app(app_module)
103103
app.load_modules()
104-
taskregistry = app.taskregistry
105104
metrics = app.metrics
106105

107106
def _get_known_task(activation: TaskActivation) -> Task[Any, Any] | None:
108-
if not taskregistry.contains(activation.namespace):
107+
try:
108+
namespace = app.get_namespace(activation.namespace)
109+
except KeyError:
109110
logger.error(
110111
"taskworker.invalid_namespace",
111112
extra={"namespace": activation.namespace, "taskname": activation.taskname},
112113
)
113114
return None
114115

115-
namespace = taskregistry.get(activation.namespace)
116116
if not namespace.contains(activation.taskname):
117117
logger.error(
118118
"taskworker.invalid_taskname",
@@ -421,7 +421,7 @@ def record_task_execution(
421421
},
422422
)
423423

424-
namespace = taskregistry.get(activation.namespace)
424+
namespace = app.get_namespace(activation.namespace)
425425
metrics.incr(
426426
"taskworker.cogs.usage",
427427
value=int(execution_duration * 1000),

clients/python/tests/test_app.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import pytest
12
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation
23

34
from taskbroker_client.app import TaskbrokerApp
5+
from taskbroker_client.retry import Retry
46
from taskbroker_client.router import TaskRouter
7+
from taskbroker_client.task import Task
58

69
from .conftest import StubAtMostOnce, producer_factory
710

@@ -47,3 +50,50 @@ def test_should_attempt_at_most_once() -> None:
4750
app.at_most_once_store(at_most)
4851
assert app.should_attempt_at_most_once(activation)
4952
assert not app.should_attempt_at_most_once(activation)
53+
54+
55+
def test_create_namespace() -> None:
56+
app = TaskbrokerApp(name="acme", producer_factory=producer_factory, router_class=StubRouter())
57+
ns = app.create_namespace("test")
58+
assert ns.name == "test"
59+
assert ns.topic == "honk"
60+
61+
retry = Retry(times=3)
62+
ns = app.create_namespace(
63+
"test-two",
64+
retry=retry,
65+
expires=60 * 10,
66+
processing_deadline_duration=60,
67+
app_feature="anvils",
68+
)
69+
assert ns.default_retry == retry
70+
assert ns.default_processing_deadline_duration == 60
71+
assert ns.default_expires == 60 * 10
72+
assert ns.name == "test-two"
73+
assert ns.application == "acme"
74+
assert ns.topic == "honk"
75+
assert ns.app_feature == "anvils"
76+
77+
fetched = app.get_namespace("test-two")
78+
assert fetched == ns
79+
80+
with pytest.raises(KeyError):
81+
app.get_namespace("invalid")
82+
83+
84+
def test_get_task() -> None:
85+
app = TaskbrokerApp(name="acme", producer_factory=producer_factory, router_class=StubRouter())
86+
ns = app.create_namespace(name="tests")
87+
88+
@ns.register(name="test.simpletask")
89+
def simple_task() -> None:
90+
raise NotImplementedError
91+
92+
task = app.get_task(ns.name, "test.simpletask")
93+
assert isinstance(task, Task)
94+
95+
with pytest.raises(KeyError):
96+
app.get_task("nope", "test.simpletask")
97+
98+
with pytest.raises(KeyError):
99+
app.get_task(ns.name, "nope")

0 commit comments

Comments
 (0)