Skip to content

Commit 7f0e6ad

Browse files
authored
feat: Encoding (#143)
* feat: Add encoding types * feat: Add encoding logic to publish_task * chore: Bump Python SDK version * feat: Modify models to be explicit about task input being raw * fix: Refactor encoding base types to be classes instead of just functions * feat: Add Passthrough encoders * feat: Add Pydantic encoders * chore: Update encoder exports * feat: Improve worker handler structure, typing, and add encoding * test: Add encoder tests * test: Update tests to account for new encoding ergonomics * fix: Fix poor type handling in Avro and Pydantic task update schemas * chore: Bump package version * chore: Sync updated Avro schemas * test: Update e2e tests to account for new encoders * fix: Fix bug causing task update data to be possibly mixed up and corrupted by adding a unique constant identifier that gets validated on ingestion * chore: Bump cargo version * chore: Bump all versions to 0.4.0 * docs: Update docs to account for new changes * chore: Fix versions for alpha release * feat: Add custom encoding error * feat: Add JSON dictionary encoders * feat: Add improved error handling to Pydantic encoders * feat: Add string encoders * feat: Add polymorphic encoder factory so we can adapt to the most common types * docs: Update task documentation to use polymorphic encoder * test: Remove explicit encoders from e2e tests * test: Add tests for all new encoders * test: Create a new task for user-friendly error * feat: Add new type-hinting based encoding logic to publisher and worker * feat: Export all default encoders in Taco * docs: Remove most explicit encoding from setup page * Update .gitignore
1 parent c5a0c1a commit 7f0e6ad

44 files changed

Lines changed: 1332 additions & 289 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,5 @@
1515
.benchmarks/
1616
__pycache__/
1717
/client_sdks/python/tacoq.egg-info
18+
.vscode/settings.json
19+
client_sdks/python/.vscode/settings.json

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client_sdks/python/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "tacoq"
3-
version = "0.3.0b"
3+
version = "0.4.0-alpha.1"
44
description = "Python SDK to interact with the TacoQ task queue."
55
readme = "README.md"
66
authors = [
@@ -22,7 +22,6 @@ dependencies = [
2222
"fastavro>=1.10.0",
2323
"opentelemetry-api>=1.30.0",
2424
"pydantic>=2.10.5",
25-
"pytest-timeout>=2.3.1",
2625
"tenacity>=9.0.0",
2726
"uuid>=1.30",
2827
"watchfiles>=1.0.3",
@@ -61,6 +60,7 @@ dev = [
6160
"pytest>=8.3.5",
6261
"pytest-asyncio>=0.25.3",
6362
"pytest-xdist>=3.6.1",
63+
"pytest-timeout>=2.3.1",
6464
"opentelemetry-sdk>=1.30.0",
6565
"opentelemetry-exporter-otlp>=1.30.0",
6666
]
Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
1+
from tacoq.core.encoding import (
2+
Data,
3+
Decoder,
4+
Encoder,
5+
PassthroughDecoder,
6+
PassthroughEncoder,
7+
PydanticDecoder,
8+
PydanticEncoder,
9+
StringDecoder,
10+
StringEncoder,
11+
JsonDictDecoder,
12+
JsonDictEncoder,
13+
create_encoder,
14+
create_decoder,
15+
)
116
from tacoq.core.infra.broker import BrokerConfig
2-
from tacoq.relay import RelayConfig, RelayClient, RelayStates
3-
from tacoq.core.models import Task, TaskInput, TaskOutput
4-
from tacoq.core.telemetry import TracerManager, LoggerManager
17+
from tacoq.core.models import Task, TaskRawInput, TaskRawOutput
18+
from tacoq.core.telemetry import LoggerManager, TracerManager
519
from tacoq.publisher import PublisherClient
20+
from tacoq.relay import RelayClient, RelayConfig, RelayStates
621
from tacoq.worker import WorkerApplication, WorkerApplicationConfig
722

823
__all__ = [
@@ -14,8 +29,21 @@
1429
"WorkerApplication",
1530
"WorkerApplicationConfig",
1631
"Task",
17-
"TaskInput",
18-
"TaskOutput",
32+
"TaskRawInput",
33+
"TaskRawOutput",
1934
"TracerManager",
2035
"LoggerManager",
36+
"PydanticEncoder",
37+
"PydanticDecoder",
38+
"PassthroughEncoder",
39+
"PassthroughDecoder",
40+
"Encoder",
41+
"Decoder",
42+
"Data",
43+
"StringEncoder",
44+
"StringDecoder",
45+
"JsonDictEncoder",
46+
"JsonDictDecoder",
47+
"create_encoder",
48+
"create_decoder",
2149
]
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from tacoq.core.encoding.models import (
2+
Encoder,
3+
Decoder,
4+
EncodingError,
5+
Data,
6+
)
7+
8+
from tacoq.core.encoding.pydantic import PydanticDecoder, PydanticEncoder
9+
from tacoq.core.encoding.passthrough import (
10+
PassthroughDecoder,
11+
PassthroughEncoder,
12+
)
13+
from tacoq.core.encoding.json_dict import JsonDictDecoder, JsonDictEncoder
14+
from tacoq.core.encoding.string import StringDecoder, StringEncoder
15+
from tacoq.core.encoding.polymorphic import create_encoder, create_decoder
16+
17+
__all__ = [
18+
"EncodingError",
19+
"Encoder",
20+
"Decoder",
21+
"Data",
22+
"create_encoder",
23+
"create_decoder",
24+
"PydanticDecoder",
25+
"PydanticEncoder",
26+
"PassthroughDecoder",
27+
"PassthroughEncoder",
28+
"JsonDictDecoder",
29+
"JsonDictEncoder",
30+
"StringDecoder",
31+
"StringEncoder",
32+
]
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
"""Encoders/Decoders that translate between Python dictionaries and JSON bytes."""
2+
3+
import json
4+
from typing import Dict, Any
5+
from tacoq.core.encoding.models import Encoder, Decoder, EncodingError
6+
7+
8+
class JsonDictEncoder(Encoder[Dict[str, Any]]):
    """Encodes Python dictionaries to JSON bytes.

    ### Usage:
    ```python
    from tacoq.core.encoding import JsonDictEncoder

    encoder = JsonDictEncoder()
    encoded_data = encoder.encode({"name": "John", "age": 30})
    ```
    """

    def encode(self, data: Dict[str, Any]) -> bytes:
        """Serializes a dictionary with `json.dumps` and encodes it as UTF-8.

        ### Arguments:
        - data: The dictionary to encode

        ### Returns:
        The encoded JSON bytes

        ### Raises:
        - EncodingError: If serializing or encoding the data fails. The
          underlying exception is attached as `__cause__`.
        """
        try:
            return json.dumps(data).encode("utf-8")
        except Exception as e:
            # Chain explicitly so the original failure (e.g. a non-serializable
            # value) stays attached to the EncodingError as its direct cause.
            raise EncodingError(f"Error encoding message: {str(e)}") from e
36+
37+
38+
class JsonDictDecoder(Decoder[Dict[str, Any]]):
    """Decodes JSON bytes to Python dictionaries.

    ### Usage:
    ```python
    from tacoq.core.encoding import JsonDictDecoder

    decoder = JsonDictDecoder()
    decoded_data = decoder.decode(b'{"name": "John", "age": 30}')
    ```
    """

    def decode(self, data: bytes) -> Dict[str, Any]:
        """Decodes UTF-8 bytes and parses them with `json.loads`.

        ### Arguments:
        - data: The JSON bytes to decode

        ### Returns:
        The decoded dictionary

        ### Raises:
        - EncodingError: If the bytes are not valid UTF-8 or not valid JSON.
          The underlying exception is attached as `__cause__`.
        """
        try:
            # NOTE(review): the parsed value is returned as-is; a JSON payload
            # whose top level is not an object would not be a dict here.
            return json.loads(data.decode("utf-8"))
        except Exception as e:
            # Chain explicitly so the original decode/parse failure stays
            # attached to the EncodingError as its direct cause.
            raise EncodingError(f"Error decoding message: {str(e)}") from e
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Interfaces for byte encoders/decoders."""
2+
3+
from typing import Generic, TypeVar
4+
5+
Data = TypeVar("Data")
6+
""" The type of the data to encode/decode. """
7+
8+
9+
class Encoder(Generic[Data]):
    """Interface for objects that turn a value of type `Data` into bytes."""

    def encode(self, data: Data) -> bytes:
        """Encodes an object from the output of the task into bytes.

        The base implementation has no body; the concrete encoders
        are expected to override this method.
        """
15+
16+
17+
class Decoder(Generic[Data]):
    """Interface for objects that turn bytes back into a value of type `Data`."""

    def decode(self, data: bytes) -> Data:
        """Decodes an object from bytes into the input of the task.

        The base implementation has no body; the concrete decoders
        are expected to override this method.
        """
23+
24+
25+
class EncodingError(Exception):
    """Signals a failure while encoding or decoding a message."""
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""Generic passthrough encoders and decoders that you can use if you need your encoding
2+
logic to be more complex than what can fit within the standard `Encoder`
3+
and `Decoder` interfaces.
4+
"""
5+
6+
from tacoq.core.encoding.models import Encoder, Decoder
7+
8+
9+
class PassthroughEncoder(Encoder[bytes]):
    """No-op encoder: the bytes handed in are the bytes handed back.

    ### Usage:
    ```python
    from tacoq.core.encoding import PassthroughEncoder

    encoder = PassthroughEncoder()
    encoded_data = encoder.encode(b"Hello, world!")
    # encoded_data will be b"Hello, world!"
    ```
    """

    def encode(self, data: bytes) -> bytes:
        """Returns `data` as-is, without copying or transforming it.

        ### Arguments:
        - data: The bytes to encode

        ### Returns:
        The very same bytes object that was passed in
        """
        return data
32+
33+
34+
class PassthroughDecoder(Decoder[bytes]):
    """No-op decoder: the bytes handed in are the bytes handed back.

    ### Usage:
    ```python
    from tacoq.core.encoding import PassthroughDecoder

    decoder = PassthroughDecoder()
    decoded_data = decoder.decode(b"Hello, world!")
    # decoded_data will be b"Hello, world!"
    ```
    """

    def decode(self, data: bytes) -> bytes:
        """Returns `data` as-is, without copying or transforming it.

        ### Arguments:
        - data: The bytes to decode

        ### Returns:
        The very same bytes object that was passed in
        """
        return data
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""Factory functions for creating encoders and decoders based on type."""
2+
3+
from inspect import isclass
4+
from typing import Any, Type, TypeVar, Union, cast, get_origin
5+
6+
from pydantic import BaseModel
7+
from tacoq.core.encoding.json_dict import JsonDictDecoder, JsonDictEncoder
8+
from tacoq.core.encoding.models import Decoder, Encoder, EncodingError
9+
from tacoq.core.encoding.passthrough import PassthroughDecoder, PassthroughEncoder
10+
from tacoq.core.encoding.pydantic import PydanticDecoder, PydanticEncoder
11+
from tacoq.core.encoding.string import StringDecoder, StringEncoder
12+
13+
SUPPORTED_DATA_TYPES = (bytes, str, dict, list, BaseModel)
14+
15+
T = TypeVar("T", bound=Union[bytes, str, dict[str, Any], list[Any], BaseModel])
16+
17+
18+
def create_encoder(data_type: Type[T]) -> Encoder[T]:
    """Picks the built-in encoder that matches `data_type`.

    The checks run in order: `bytes`, `str`, `dict` (bare or parameterized),
    then any class deriving from pydantic's `BaseModel`.

    ### Arguments:
    - data_type: The type to create an encoder for

    ### Returns:
    An encoder instance appropriate for the given type

    ### Raises:
    - EncodingError: If no suitable encoder is found for the type
    """
    if data_type is bytes:
        return cast(Encoder[T], PassthroughEncoder())

    if data_type is str:
        return cast(Encoder[T], StringEncoder())

    # Covers both the bare `dict` and parameterized forms such as dict[str, Any].
    is_dict_type = data_type is dict or get_origin(data_type) is dict
    if is_dict_type:
        return cast(Encoder[T], JsonDictEncoder())

    # `isclass` first: `issubclass` rejects anything that is not a class.
    is_pydantic_model = isclass(data_type) and issubclass(data_type, BaseModel)
    if is_pydantic_model:
        return cast(Encoder[T], PydanticEncoder())

    raise EncodingError(
        f"No encoder found for type {data_type}. Available encoder types: {SUPPORTED_DATA_TYPES}. If yours isn't one of these, perhaps you should implement your own encoder?"
    )
43+
44+
45+
def create_decoder(data_type: Type[T]) -> Decoder[T]:
    """Creates an appropriate decoder based on the input type.

    The checks run in order: `bytes`, `str`, `dict` (bare or parameterized),
    then any class deriving from pydantic's `BaseModel`.

    ### Arguments:
    - data_type: The type to create a decoder for

    ### Returns:
    A decoder instance appropriate for the given type

    ### Raises:
    - EncodingError: If no suitable decoder is found for the type
    """

    if data_type is bytes:
        return cast(Decoder[T], PassthroughDecoder())
    elif data_type is str:
        return cast(Decoder[T], StringDecoder())
    elif data_type is dict or get_origin(data_type) is dict:
        return cast(Decoder[T], JsonDictDecoder())
    # Guard with `isclass` (as `create_encoder` does): `issubclass` raises
    # TypeError for non-class type hints such as `list[int]`, which would
    # escape as a TypeError instead of the documented EncodingError.
    elif isclass(data_type) and issubclass(data_type, BaseModel):
        return cast(Decoder[T], PydanticDecoder(model=data_type))
    else:
        raise EncodingError(
            f"No decoder found for type {data_type}. Available decoder types: {SUPPORTED_DATA_TYPES}. If yours isn't one of these, perhaps you should implement your own decoder?"
        )

0 commit comments

Comments
 (0)