Skip to content

Commit 6637d7f

Browse files
untitaker and claude authored
feat(taskbroker): Dual-write new parameters_bytes (#602)
* feat(taskbroker): Dual-write parameters_bytes (msgpack) alongside parameters (JSON)

  Introduces the new parameters_bytes field on TaskActivation so tasks can carry raw bytes via msgpack, and adds the worker-side reader that prefers parameters_bytes with a fallback to the legacy JSON parameters field.

  The producer dual-writes both fields by default so this rolls out in a single commit regardless of worker/broker upgrade order. The TASKBROKER_CLIENT_PARAMETERS_FORMAT env var (both|json|msgpack) narrows this once everything is on the new reader.

  First step toward STREAM-882 (taskbroker passthrough mode for arbitrary Kafka topics): https://linear.app/getsentry/issue/STREAM-882

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ci: Satisfy clippy, black, and mypy on dual-write branch

  - upkeep.rs: move #[allow(deprecated)] from individual rstest-expanded tests (where it gets swallowed by the macro expansion) to the test module as a whole.
  - task.py: black formatting.
  - pyproject.toml: tell mypy to ignore missing msgpack stubs.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ref(upkeep tests): Scope allow(deprecated) to the specific lines

  Replaces the module-wide allow with three narrow block-scoped allows exactly where the tests touch the legacy parameters field, so we keep deprecation warnings active for everything else in the test module.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* switch to sum of both

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9543ea6 commit 6637d7f

14 files changed

Lines changed: 266 additions & 58 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ sentry = { version = "0.41.0", default-features = false, features = [
4242
"tracing",
4343
"logs"
4444
] }
45-
sentry_protos = "0.8.5"
45+
sentry_protos = "0.8.13"
4646
serde = "1.0.214"
4747
serde_yaml = "0.9.34"
4848
sha2 = "0.10.8"

clients/python/pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ readme = "README.md"
66
dependencies = [
77
"sentry-arroyo>=2.38.7",
88
"sentry-sdk[http2]>=2.43.0",
9-
"sentry-protos>=0.8.5",
9+
"sentry-protos>=0.8.13",
1010
"confluent_kafka>=2.3.0",
1111
"cronsim>=2.6",
1212
"grpcio>=1.67.0",
13+
"msgpack>=1.0.0",
1314
"orjson>=3.10.10",
1415
"protobuf>=5.28.3",
1516
"redis>=3.4.1",

clients/python/src/taskbroker_client/constants.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,16 @@ class CompressionType(Enum):
6666

6767
ZSTD = "zstd"
6868
PLAINTEXT = "plaintext"
69+
70+
71+
class ParametersFormat(Enum):
    """
    Selects which parameter fields the producer fills in on TaskActivation:
    the legacy `parameters` field (JSON), the new `parameters_bytes` field
    (msgpack), or both of them.

    Chosen through the `TASKBROKER_CLIENT_PARAMETERS_FORMAT` environment
    variable; BOTH is the default.
    """

    # Populate `parameters` (JSON) and `parameters_bytes` (msgpack).
    BOTH = "both"
    # Populate only the legacy JSON `parameters` field.
    JSON = "json"
    # Populate only the msgpack `parameters_bytes` field.
    MSGPACK = "msgpack"

clients/python/src/taskbroker_client/task.py

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22

33
import base64
44
import datetime
5+
import os
56
import time
67
from collections.abc import Callable, Collection, Mapping, MutableMapping
78
from functools import update_wrapper
89
from typing import TYPE_CHECKING, Any, Generic, ParamSpec, TypeVar
910
from uuid import uuid4
1011

12+
import msgpack
1113
import orjson
1214
import sentry_sdk
1315
import zstandard as zstd
@@ -22,9 +24,22 @@
2224
DEFAULT_PROCESSING_DEADLINE,
2325
MAX_PARAMETER_BYTES_BEFORE_COMPRESSION,
2426
CompressionType,
27+
ParametersFormat,
2528
)
2629
from taskbroker_client.retry import Retry
2730

31+
32+
def _get_parameters_format() -> ParametersFormat:
    """
    Resolve the producer's parameter encoding mode from the environment.

    Reads `TASKBROKER_CLIENT_PARAMETERS_FORMAT` and maps it onto a
    `ParametersFormat` member. Matching is case-insensitive and ignores
    surrounding whitespace. When the variable is unset the result is
    `ParametersFormat.BOTH`.

    Raises:
        ValueError: if the variable is set to anything other than one of the
            `ParametersFormat` values.
    """
    raw = os.environ.get("TASKBROKER_CLIENT_PARAMETERS_FORMAT", ParametersFormat.BOTH.value)
    try:
        return ParametersFormat(raw.strip().lower())
    except ValueError:
        # `from None` keeps the enum's own "is not a valid ParametersFormat"
        # error out of the traceback; the message below already names the
        # offending value and every accepted choice.
        raise ValueError(
            f"Invalid TASKBROKER_CLIENT_PARAMETERS_FORMAT={raw!r}. "
            f"Expected one of: {', '.join(f.value for f in ParametersFormat)}"
        ) from None
41+
42+
2843
if TYPE_CHECKING:
2944
from taskbroker_client.registry import TaskNamespace
3045

@@ -197,37 +212,58 @@ def create_activation(
197212
f"The `{key}` header value is of type {type(value)}"
198213
)
199214

200-
parameters_json = orjson.dumps({"args": args, "kwargs": kwargs})
201-
if (
202-
len(parameters_json) > MAX_PARAMETER_BYTES_BEFORE_COMPRESSION
203-
or self.compression_type == CompressionType.ZSTD
204-
):
205-
# Worker uses this header to determine if the parameters are decompressed
215+
parameters_format = _get_parameters_format()
216+
data = {"args": args, "kwargs": kwargs}
217+
218+
msgpack_bytes = (
219+
msgpack.packb(data, use_bin_type=True)
220+
if parameters_format in (ParametersFormat.BOTH, ParametersFormat.MSGPACK)
221+
else b""
222+
)
223+
# JSON can't encode some values msgpack can (e.g. raw bytes). In
224+
# JSON-only mode we surface the TypeError; in BOTH mode we silently
225+
# skip the legacy field so msgpack-aware workers can still run.
226+
json_bytes: bytes | None = None
227+
if parameters_format in (ParametersFormat.BOTH, ParametersFormat.JSON):
228+
try:
229+
json_bytes = orjson.dumps(data)
230+
except TypeError:
231+
if parameters_format == ParametersFormat.JSON:
232+
raise
233+
234+
should_compress = (
235+
self.compression_type == CompressionType.ZSTD
236+
or (len(msgpack_bytes) + len(json_bytes or b""))
237+
> MAX_PARAMETER_BYTES_BEFORE_COMPRESSION
238+
)
239+
240+
if should_compress:
206241
headers["compression-type"] = CompressionType.ZSTD.value
207242
start_time = time.perf_counter()
208-
parameters_str = base64.b64encode(zstd.compress(parameters_json)).decode("utf8")
209-
end_time = time.perf_counter()
243+
parameters_bytes_val = zstd.compress(msgpack_bytes) if msgpack_bytes else b""
244+
parameters_str = (
245+
base64.b64encode(zstd.compress(json_bytes)).decode("utf8") if json_bytes else ""
246+
)
247+
elapsed = time.perf_counter() - start_time
210248

249+
metric_tags = {
250+
"namespace": self._namespace.name,
251+
"taskname": self.name,
252+
"topic": self._namespace.topic,
253+
}
211254
self.namespace.metrics.distribution(
212255
"taskworker.producer.compressed_parameters_size",
213-
len(parameters_str),
214-
tags={
215-
"namespace": self._namespace.name,
216-
"taskname": self.name,
217-
"topic": self._namespace.topic,
218-
},
256+
len(parameters_bytes_val) or len(parameters_str),
257+
tags=metric_tags,
219258
)
220259
self.namespace.metrics.distribution(
221260
"taskworker.producer.compression_time",
222-
end_time - start_time,
223-
tags={
224-
"namespace": self._namespace.name,
225-
"taskname": self.name,
226-
"topic": self._namespace.topic,
227-
},
261+
elapsed,
262+
tags=metric_tags,
228263
)
229264
else:
230-
parameters_str = parameters_json.decode("utf8")
265+
parameters_bytes_val = msgpack_bytes
266+
parameters_str = json_bytes.decode("utf8") if json_bytes else ""
231267

232268
return TaskActivation(
233269
id=uuid4().hex,
@@ -236,6 +272,7 @@ def create_activation(
236272
taskname=self.name,
237273
headers=headers,
238274
parameters=parameters_str,
275+
parameters_bytes=parameters_bytes_val,
239276
retry_state=self._create_retry_state(),
240277
received_at=received_at,
241278
processing_deadline_duration=processing_deadline,

clients/python/src/taskbroker_client/worker/workerchild.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing import Any
1313

1414
# XXX: Don't import any modules that will import django here, do those within child_process
15+
import msgpack
1516
import orjson
1617
import sentry_sdk
1718
import zstandard as zstd
@@ -59,17 +60,28 @@ def timeout_alarm(
5960
signal.signal(signal.SIGALRM, original)
6061

6162

62-
def load_parameters(data: str, headers: dict[str, str]) -> dict[str, Any]:
63+
def load_parameters(activation: TaskActivation) -> dict[str, Any]:
64+
headers = dict(activation.headers)
6365
compression_type = headers.get("compression-type", None)
66+
67+
# Prefer new msgpack field
68+
if activation.parameters_bytes:
69+
data = activation.parameters_bytes
70+
if compression_type == CompressionType.ZSTD.value:
71+
data = zstd.decompress(data)
72+
return msgpack.unpackb(data, raw=False)
73+
74+
# Legacy JSON fallback
75+
data_str = activation.parameters
6476
if not compression_type or compression_type == CompressionType.PLAINTEXT.value:
65-
return orjson.loads(data)
77+
return orjson.loads(data_str)
6678
elif compression_type == CompressionType.ZSTD.value:
67-
return orjson.loads(zstd.decompress(base64.b64decode(data)))
79+
return orjson.loads(zstd.decompress(base64.b64decode(data_str)))
6880
else:
6981
logger.error(
7082
"Unsupported compression type: %s. Continuing with plaintext.", compression_type
7183
)
72-
return orjson.loads(data)
84+
return orjson.loads(data_str)
7385

7486

7587
def status_name(status: TaskActivationStatus.ValueType) -> str:
@@ -314,12 +326,12 @@ def _execute_activation(
314326
context_hooks: Sequence[ContextHook] = (),
315327
) -> None:
316328
"""Invoke a task function with the activation parameters."""
317-
headers = {k: v for k, v in activation.headers.items()}
318-
parameters = load_parameters(activation.parameters, headers)
329+
parameters = load_parameters(activation)
319330

320331
args = parameters.get("args", [])
321332
kwargs = parameters.get("kwargs", {})
322333

334+
headers = dict(activation.headers)
323335
transaction = sentry_sdk.continue_trace(
324336
environ_or_headers=headers,
325337
op="queue.task.taskworker",

clients/python/tests/test_registry.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from concurrent.futures import Future
33
from unittest.mock import Mock
44

5+
import msgpack
56
import orjson
67
import pytest
78
import zstandard as zstd
@@ -174,12 +175,14 @@ def simple_task_with_compression(param: str) -> None:
174175

175176
expected_params = {"args": ["test_arg"], "kwargs": {"test_key": "test_value"}}
176177

177-
decoded_data = base64.b64decode(activation.parameters.encode("utf-8"))
178-
decompressed_data = zstd.decompress(decoded_data)
179-
actual_params = orjson.loads(decompressed_data)
178+
decompressed_data = zstd.decompress(activation.parameters_bytes)
179+
actual_params = msgpack.unpackb(decompressed_data, raw=False)
180180

181181
assert actual_params == expected_params
182182

183+
legacy_decompressed = zstd.decompress(base64.b64decode(activation.parameters.encode("utf-8")))
184+
assert orjson.loads(legacy_decompressed) == expected_params
185+
183186

184187
def test_namespace_send_task_with_auto_compression() -> None:
185188
namespace = TaskNamespace(
@@ -204,12 +207,14 @@ def simple_task_with_compression(param: str) -> None:
204207

205208
expected_params = {"args": big_args, "kwargs": {"test_key": "test_value"}}
206209

207-
decoded_data = base64.b64decode(activation.parameters.encode("utf-8"))
208-
decompressed_data = zstd.decompress(decoded_data)
209-
actual_params = orjson.loads(decompressed_data)
210+
decompressed_data = zstd.decompress(activation.parameters_bytes)
211+
actual_params = msgpack.unpackb(decompressed_data, raw=False)
210212

211213
assert actual_params == expected_params
212214

215+
legacy_decompressed = zstd.decompress(base64.b64decode(activation.parameters.encode("utf-8")))
216+
assert orjson.loads(legacy_decompressed) == expected_params
217+
213218

214219
def test_namespace_send_task_with_retry() -> None:
215220
namespace = TaskNamespace(

clients/python/tests/test_task.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Any
55
from unittest.mock import patch
66

7+
import msgpack
78
import orjson
89
import pytest
910
import sentry_sdk
@@ -78,9 +79,9 @@ def test_func(*args: Any, **kwargs: Any) -> None:
7879

7980
activation = call_params.args[0]
8081
assert activation.expires == 10
81-
assert activation.parameters == orjson.dumps(
82-
{"args": ["arg2"], "kwargs": {"org_id": 2}}
83-
).decode("utf-8")
82+
expected_params = {"args": ["arg2"], "kwargs": {"org_id": 2}}
83+
assert msgpack.unpackb(activation.parameters_bytes, raw=False) == expected_params
84+
assert orjson.loads(activation.parameters) == expected_params
8485

8586

8687
def test_apply_async_countdown(task_namespace: TaskNamespace) -> None:
@@ -99,9 +100,9 @@ def test_func(*args: Any, **kwargs: Any) -> None:
99100

100101
activation = call_params.args[0]
101102
assert activation.delay == 600
102-
assert activation.parameters == orjson.dumps(
103-
{"args": ["arg2"], "kwargs": {"org_id": 2}}
104-
).decode("utf-8")
103+
expected_params = {"args": ["arg2"], "kwargs": {"org_id": 2}}
104+
assert msgpack.unpackb(activation.parameters_bytes, raw=False) == expected_params
105+
assert orjson.loads(activation.parameters) == expected_params
105106

106107

107108
def test_delay_immediate_mode(task_namespace: TaskNamespace) -> None:
@@ -268,11 +269,14 @@ def with_parameters(one: str, two: int, org_id: int) -> None:
268269
raise NotImplementedError
269270

270271
activation = with_parameters.create_activation(["one", 22], {"org_id": 99})
271-
params = orjson.loads(activation.parameters)
272+
params = msgpack.unpackb(activation.parameters_bytes, raw=False)
272273
assert params["args"]
273274
assert params["args"] == ["one", 22]
274275
assert params["kwargs"] == {"org_id": 99}
275276

277+
json_params = orjson.loads(activation.parameters)
278+
assert json_params == params
279+
276280

277281
def test_create_activation_tracing(task_namespace: TaskNamespace) -> None:
278282
@task_namespace.register(name="test.parameters")

0 commit comments

Comments
 (0)