docs: add Python data handling section #4378
| --- | ||
| id: data-conversion | ||
| title: Payload conversion - Python SDK | ||
| sidebar_label: Payload conversion | ||
| slug: /develop/python/data-handling/data-conversion | ||
| toc_max_heading_level: 2 | ||
| tags: | ||
| - Data Converters | ||
| - Python SDK | ||
| - Temporal SDKs | ||
| description: Customize how Temporal serializes application objects using Payload Converters in the Python SDK, including Pydantic and custom type examples. | ||
| --- | ||
|
|
||
| Payload Converters serialize your application objects into a `Payload` and deserialize them back. | ||
| A `Payload` is a binary form with metadata that Temporal uses to transport data. | ||
|
|
||
| By default, Temporal uses a `DefaultPayloadConverter` that handles `None`, `bytes`, protobuf messages, and anything JSON-serializable. | ||
| You only need a custom Payload Converter when your application uses types that aren't natively supported. | ||
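As a stdlib-only illustration of the limit the default JSON layer inherits, `json.dumps` rejects types it does not recognize, such as `ipaddress.IPv4Address` (the type used in the custom-converter examples later on):

```python
import ipaddress
import json

# Plain JSON serialization has no idea what an IPv4Address is...
try:
    json.dumps(ipaddress.IPv4Address("127.0.0.1"))
    unsupported = False
except TypeError:
    unsupported = True
# ...which is exactly the situation that calls for a custom Payload Converter.
```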
|
|
||
| ## Default supported types | ||
|
|
||
| The default Data Converter supports converting multiple types including: | ||
|
|
||
| - `None` | ||
| - `bytes` | ||
| - `google.protobuf.message.Message` — As JSON when encoding, but can decode binary proto from other languages | ||
| - Anything that can be converted to JSON including: | ||
| - Anything that [`json.dump`](https://docs.python.org/3/library/json.html#json.dump) supports natively | ||
| - [dataclasses](https://docs.python.org/3/library/dataclasses.html) | ||
| - Iterables, including some that `json.dump` does not support by default, such as `set` | ||
| - [IntEnum and StrEnum](https://docs.python.org/3/library/enum.html) subclasses | ||
| - [UUID](https://docs.python.org/3/library/uuid.html) | ||
|
|
||
| Although Workflows, Updates, Signals, and Queries can all be defined with multiple input parameters, users are strongly | ||
| encouraged to use a single `dataclass` or Pydantic model parameter so that fields with defaults can be easily added | ||
| without breaking compatibility. | ||
| Similar advice applies to return values. | ||
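A minimal sketch of that advice (the names here are hypothetical): wrapping inputs in a single dataclass lets you add defaulted fields later without breaking existing callers.

```python
from dataclasses import dataclass


@dataclass
class GreetInput:
    name: str
    # Added in a later release; the default keeps old call sites working.
    greeting: str = "Hello"


# Callers written before `greeting` existed still construct a valid input.
old_style = GreetInput(name="World")
```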
|
|
||
| Classes with generics may not have their type parameters resolved, because the current implementation does not perform generic type resolution; use concrete types instead. | ||
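The gap is visible with the standard library's own introspection tools: a parameterized hint like `list[int]` carries its element type, while a bare `list` gives a hint-driven decoder nothing to reconstruct elements with.

```python
from typing import get_args

# A concrete parameterization exposes the element type...
concrete_args = get_args(list[int])

# ...but an unparameterized generic does not.
bare_args = get_args(list)
```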
|
|
||
| ## Use Pydantic models | ||
|
|
||
| To use Pydantic model instances, install Pydantic and set the Pydantic Data Converter when creating Client instances: | ||
|
|
||
| ```python | ||
| from temporalio.client import Client | ||
| from temporalio.contrib.pydantic import pydantic_data_converter | ||
|
|
||
| client = await Client.connect(..., data_converter=pydantic_data_converter) | ||
| ``` | ||
|
|
||
| This Data Converter supports conversion of all [types supported by Pydantic](https://docs.pydantic.dev/latest/api/standard_library_types/) to and from JSON. | ||
| In addition to Pydantic models, supported types include: | ||
|
|
||
| - Everything that [`json.dumps()`](https://docs.python.org/3/library/json.html#py-to-json-table) supports by default. | ||
| - Several standard library types that `json.dumps()` does not support, including dataclasses, types from the datetime module, sets, UUID, etc. | ||
| - Custom types composed of any of these, with any degree of nesting. | ||
| For example, a list of Pydantic models with `datetime` fields. | ||
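For example, a nested model with a `datetime` field round-trips through Pydantic's own JSON machinery, which is what this converter builds on (a sketch assuming Pydantic v2 is installed; the model names are illustrative):

```python
from datetime import datetime
from typing import List

from pydantic import BaseModel


class Event(BaseModel):
    name: str
    at: datetime


class Batch(BaseModel):
    events: List[Event]


batch = Batch(events=[Event(name="deploy", at=datetime(2024, 1, 1, 12, 0))])
# Serialize to JSON and reconstruct the fully typed object graph.
restored = Batch.model_validate_json(batch.model_dump_json())
```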
|
|
||
| See the [Pydantic documentation](https://docs.pydantic.dev/latest/api/standard_library_types/) for full details. | ||
|
|
||
| :::note | ||
|
|
||
| Pydantic v1 isn't supported by this Data Converter. | ||
| If you aren't yet able to upgrade from Pydantic v1, see https://github.com/temporalio/samples-python/tree/main/pydantic_converter/v1 for limited v1 support. | ||
|
|
||
| ::: | ||
|
|
||
| `datetime.date`, `datetime.time`, and `datetime.datetime` can only be used with the Pydantic Data Converter. | ||
|
|
||
| ## How the default converter works | ||
|
|
||
| The default converter is a `CompositePayloadConverter` that holds an ordered list of `EncodingPayloadConverter` instances: on serialization, each is tried in order until one handles the value. | ||
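That first-match dispatch can be pictured with plain functions (an illustrative sketch, not the SDK's internals): each converter returns `None` to decline a value, and the composite takes the first non-`None` result.

```python
from typing import Any, Callable, List, Optional, Tuple

# A "converter" maps a value to (encoding, bytes), or None if it declines.
Converter = Callable[[Any], Optional[Tuple[str, bytes]]]


def bytes_converter(value: Any) -> Optional[Tuple[str, bytes]]:
    return ("binary/plain", value) if isinstance(value, bytes) else None


def text_converter(value: Any) -> Optional[Tuple[str, bytes]]:
    return ("text/plain", value.encode()) if isinstance(value, str) else None


def composite_to_payload(value: Any, converters: List[Converter]) -> Tuple[str, bytes]:
    # Try each converter in order; the first one that handles the value wins.
    for convert in converters:
        payload = convert(value)
        if payload is not None:
            return payload
    raise TypeError(f"No converter handles {type(value)!r}")


payload = composite_to_payload("hi", [bytes_converter, text_converter])
```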
|
|
||
| Payload Converters can be customized independently of a Payload Codec. | ||
|
|
||
| ## Custom Payload Converters | ||
|
|
||
| To handle custom data types, create a new `EncodingPayloadConverter`. | ||
| For example, to support `IPv4Address` types: | ||
|
|
||
| ```python | ||
| import dataclasses | ||
| import ipaddress | ||
| from typing import Any, Optional, Type | ||
| from temporalio.api.common.v1 import Payload | ||
| from temporalio.converter import CompositePayloadConverter, DataConverter, DefaultPayloadConverter, EncodingPayloadConverter | ||
| | ||
| class IPv4AddressEncodingPayloadConverter(EncodingPayloadConverter): | ||
| @property | ||
| def encoding(self) -> str: | ||
| return "text/ipv4-address" | ||
|
|
||
| def to_payload(self, value: Any) -> Optional[Payload]: | ||
| if isinstance(value, ipaddress.IPv4Address): | ||
| return Payload( | ||
| metadata={"encoding": self.encoding.encode()}, | ||
| data=str(value).encode(), | ||
| ) | ||
| else: | ||
| return None | ||
|
|
||
| def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any: | ||
| assert not type_hint or type_hint is ipaddress.IPv4Address | ||
| return ipaddress.IPv4Address(payload.data.decode()) | ||
|
|
||
| class IPv4AddressPayloadConverter(CompositePayloadConverter): | ||
| def __init__(self) -> None: | ||
| # Just add ours as first before the defaults | ||
| super().__init__( | ||
| IPv4AddressEncodingPayloadConverter(), | ||
| *DefaultPayloadConverter.default_encoding_payload_converters, | ||
| ) | ||
|
|
||
| my_data_converter = dataclasses.replace( | ||
| DataConverter.default, | ||
| payload_converter_class=IPv4AddressPayloadConverter, | ||
| ) | ||
| ``` | ||
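Stripped of SDK types, the converter's round trip is just string encode/decode of the address (a stdlib-only mirror of the `to_payload`/`from_payload` pair above, with illustrative function names):

```python
import ipaddress


def address_to_bytes(value: ipaddress.IPv4Address) -> bytes:
    # Mirrors to_payload: serialize the address as UTF-8 text.
    return str(value).encode()


def address_from_bytes(data: bytes) -> ipaddress.IPv4Address:
    # Mirrors from_payload: parse the text back into a typed address.
    return ipaddress.IPv4Address(data.decode())


addr = ipaddress.IPv4Address("10.0.0.1")
round_tripped = address_from_bytes(address_to_bytes(addr))
```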
|
|
||
| ### Customize the JSON converter for custom types | ||
|
|
||
| If you need your custom type to work in lists, unions, and other collections, customize the existing JSON converter instead of adding a new encoding converter. | ||
| The JSON converter is the last in the list, so it handles any otherwise unknown type. | ||
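Because the JSON converter ultimately delegates to the standard `json` machinery, the encoder half can be demonstrated with the stdlib alone (`json.JSONEncoder` standing in here for the SDK's encoder base class):

```python
import ipaddress
import json


class IPv4AddressEncoder(json.JSONEncoder):
    def default(self, o):
        # Emit a string form for addresses; defer everything else to the base class.
        if isinstance(o, ipaddress.IPv4Address):
            return str(o)
        return super().default(o)


encoded = json.dumps(
    {"addrs": [ipaddress.IPv4Address("127.0.0.1")]}, cls=IPv4AddressEncoder
)
```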
|
|
||
| Customize serialization with a custom `json.JSONEncoder` and deserialization with a custom `JSONTypeConverter`: | ||
|
|
||
| ```python | ||
| import dataclasses | ||
| import ipaddress | ||
| from typing import Any, Optional, Type, Union | ||
| from temporalio.converter import AdvancedJSONEncoder, CompositePayloadConverter, DataConverter, DefaultPayloadConverter, JSONPlainPayloadConverter, JSONTypeConverter, _JSONTypeConverterUnhandled | ||
| | ||
| class IPv4AddressJSONEncoder(AdvancedJSONEncoder): | ||
| def default(self, o: Any) -> Any: | ||
| if isinstance(o, ipaddress.IPv4Address): | ||
| return str(o) | ||
| return super().default(o) | ||
|
|
||
| class IPv4AddressJSONTypeConverter(JSONTypeConverter): | ||
| def to_typed_value( | ||
| self, hint: Type, value: Any | ||
| ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]: | ||
| if issubclass(hint, ipaddress.IPv4Address): | ||
| return ipaddress.IPv4Address(value) | ||
| return JSONTypeConverter.Unhandled | ||
|
|
||
| class IPv4AddressPayloadConverter(CompositePayloadConverter): | ||
| def __init__(self) -> None: | ||
| # Replace default JSON plain with our own that has our encoder and type | ||
| # converter | ||
| json_converter = JSONPlainPayloadConverter( | ||
| encoder=IPv4AddressJSONEncoder, | ||
| custom_type_converters=[IPv4AddressJSONTypeConverter()], | ||
| ) | ||
| super().__init__( | ||
| *[ | ||
| c if not isinstance(c, JSONPlainPayloadConverter) else json_converter | ||
| for c in DefaultPayloadConverter.default_encoding_payload_converters | ||
| ] | ||
| ) | ||
|
|
||
| my_data_converter = dataclasses.replace( | ||
| DataConverter.default, | ||
| payload_converter_class=IPv4AddressPayloadConverter, | ||
| ) | ||
| ``` | ||
|
|
||
| Now `IPv4Address` can be used in type hints including collections, optionals, etc. | ||
| --- | ||
| id: data-encryption | ||
| title: Payload encryption - Python SDK | ||
| sidebar_label: Payload encryption | ||
| slug: /develop/python/data-handling/data-encryption | ||
| toc_max_heading_level: 2 | ||
| tags: | ||
| - Security | ||
| - Encryption | ||
| - Codec Server | ||
| - Python SDK | ||
| - Temporal SDKs | ||
| description: Encrypt data sent to and from the Temporal Service using a custom Payload Codec in the Python SDK. | ||
| --- | ||
|
|
||
| Payload Codecs transform `Payload` bytes after serialization (by the Payload Converter) and before the data is sent to the Temporal Service. | ||
| Unlike Payload Converters, codecs run outside the Workflow sandbox, so they can use non-deterministic operations and call external services. | ||
|
|
||
| The most common use case is encryption: encrypting payloads before they reach the Temporal Service so that sensitive data is never stored in plaintext. | ||
|
|
||
| ## PayloadCodec interface | ||
|
|
||
| Implement a `PayloadCodec` with `encode()` and `decode()` methods. | ||
| These should loop through all of a Workflow's payloads, perform your marshaling, compression, or encryption steps in order, and set an `"encoding"` metadata field. | ||
|
|
||
| In this example, the `encode` method compresses a payload using Python's [cramjam](https://github.com/milesgranger/cramjam) library to provide `snappy` compression. | ||
| The `decode()` method reverses the `encode()` logic: | ||
|
|
||
| ```python | ||
| from typing import Iterable, List | ||
| import cramjam | ||
| from temporalio.api.common.v1 import Payload | ||
| from temporalio.converter import PayloadCodec | ||
|
|
||
| class EncryptionCodec(PayloadCodec): | ||
| async def encode(self, payloads: Iterable[Payload]) -> List[Payload]: | ||
| return [ | ||
| Payload( | ||
| metadata={ | ||
| "encoding": b"binary/snappy", | ||
| }, | ||
| data=(bytes(cramjam.snappy.compress(p.SerializeToString()))), | ||
| ) | ||
| for p in payloads | ||
| ] | ||
|
|
||
| async def decode(self, payloads: Iterable[Payload]) -> List[Payload]: | ||
| ret: List[Payload] = [] | ||
| for p in payloads: | ||
| if p.metadata.get("encoding", b"").decode() != "binary/snappy": | ||
| ret.append(p) | ||
| continue | ||
| ret.append(Payload.FromString(bytes(cramjam.snappy.decompress(p.data)))) | ||
| return ret | ||
| ``` | ||
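If cramjam is not available, the same encode/decode shape can be exercised with the standard library's `zlib` (substituted for snappy purely for illustration; the metadata tag and pass-through check mirror the codec above):

```python
import zlib
from typing import Dict, Tuple


def encode_one(data: bytes) -> Tuple[Dict[str, bytes], bytes]:
    # Mirror of the codec's encode step: tag the payload and compress it.
    return {"encoding": b"binary/zlib"}, zlib.compress(data)


def decode_one(metadata: Dict[str, bytes], data: bytes) -> bytes:
    # Mirror of decode: leave payloads we did not produce untouched.
    if metadata.get("encoding") != b"binary/zlib":
        return data
    return zlib.decompress(data)


meta, compressed = encode_one(b"hello" * 100)
restored = decode_one(meta, compressed)
```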
|
|
||
| ## Configure the codec on the Data Converter | ||
|
|
||
| Pass a `data_converter` argument to `Client.connect()` that adds your Payload Codec to the default Data Converter: | ||
|
|
||
| ```python | ||
| import dataclasses | ||
| import temporalio.converter | ||
| from temporalio.client import Client | ||
| from codec import EncryptionCodec | ||
|
|
||
| client = await Client.connect( | ||
| "localhost:7233", | ||
| data_converter=dataclasses.replace( | ||
| temporalio.converter.default(), payload_codec=EncryptionCodec() | ||
| ), | ||
| ) | ||
| ``` | ||
|
|
||
| For reference, see the [Encryption](https://github.com/temporalio/samples-python/tree/main/encryption) sample. | ||
|
|
||
| ## Codec Server | ||
|
|
||
| A Codec Server is an HTTP server that runs your `PayloadCodec` remotely, so that the Temporal Web UI and CLI can decode encrypted payloads for display. | ||
|
|
||
| For more information, see [Codec Server](/codec-server). | ||
| --- | ||||||
| id: data-handling | ||||||
| title: Data handling - Python SDK | ||||||
| sidebar_label: Data handling | ||||||
| slug: /develop/python/data-handling | ||||||
| description: | ||||||
| Learn how Temporal handles data through the Data Converter, including payload conversion, encryption, and large | ||||||
| payload storage. | ||||||
| toc_max_heading_level: 3 | ||||||
| tags: | ||||||
| - Python SDK | ||||||
| - Temporal SDKs | ||||||
| - Data Converters | ||||||
| --- | ||||||
|
|
||||||
| import { CaptionedImage } from '@site/src/components'; | ||||||
|
|
||||||
| All data sent to and from the Temporal Service passes through the **Data Converter**. The Data Converter has three | ||||||
| layers that handle different concerns: | ||||||
|
|
||||||
| <CaptionedImage | ||||||
| src="/diagrams/data-converter-flow-with-external-storage.svg" | ||||||
| title="The Flow of Data through a Data Converter" | ||||||
| /> | ||||||
|
|
||||||
| Of these three layers, only the PayloadConverter is required. Temporal uses a default PayloadConverter that handles JSON | ||||||
| serialization. The PayloadCodec and ExternalStorage layers are optional. You only need to customize these layers when | ||||||
| your application requires non-JSON types, encryption, or payload offloading. | ||||||
|
|
||||||
| | | [PayloadConverter](/develop/python/data-handling/data-conversion) | [PayloadCodec](/develop/python/data-handling/data-encryption) | [ExternalStorage](/develop/python/data-handling/large-payload-storage) | | ||||||
| | ------------------------- | ----------------------------------------------------------------- | ------------------------------------------------------------- | ---------------------------------------------------------------------- | | ||||||
| | **Purpose** | Serialize application data to bytes | Transform encoded payloads (encrypt, compress) | Offload large payloads to external store | | ||||||
| | **Must be deterministic** | Yes | No | No | | ||||||
| **Default** | JSON serialization | None (passthrough) | None (passthrough) | ||||||