Open
Description
I'm tinkering with an automatic Pydantic (de)serialization integration. Here is what I have so far:
import dataclasses
from typing import Any
import dramatiq
from dramatiq.results import Results
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, validate_call
def pydantic_convert(data, type_annotation):
"""Use Pydantic to convert `data` to the type specified by `type_annotation`."""
return pydantic_task.create_model("", x=(type_annotation, ...))(x=data).x # type: ignore
@dataclasses.dataclass(frozen=True)
class PydanticMessage[R](dramatiq.Message[R]):
"""Use Pydantic to convert the task result to the actor's return annotation type."""
def get_result(self, *args, **kwargs) -> R:
res = super().get_result(*args, **kwargs)
actor_fn = broker.get_actor(self.actor_name).fn
return pydantic_convert(res, actor_fn.__annotations__["return"])
class PydanticActor[R](dramatiq.Actor[..., R]):
def message_with_options(self, *, args: tuple = (), kwargs: dict[str, Any] | None = None, **options) -> PydanticMessage[R]:
message = super().message_with_options(args=jsonable_encoder(args), kwargs=jsonable_encoder(kwargs), **options)
return PydanticMessage(**message.asdict())
def pydantic_task():
def decorator(func):
validated_func = validate_call(validate_return=True)(func)
return dramatiq.actor(actor_class=PydanticActor)(validated_func)
return decorator
##########
class X(BaseModel):
x: int
@pydantic_task()
def example(x: X) -> X:
return X(x=x.x + 1)
- It needs access to the
broker
instance to fetch the actor function's return type - It needs a custom
Message
class to makeget_result
convert the result to the actor function's return type
Metadata
Assignees
Labels
No labels