forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshared.py
66 lines (53 loc) · 2.14 KB
/
shared.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import dataclasses
from typing import Any, Optional, Type
import temporalio.converter
from temporalio.api.common.v1 import Payload
from temporalio.converter import (
CompositePayloadConverter,
DefaultPayloadConverter,
EncodingPayloadConverter,
)
class GreetingInput:
def __init__(self, name: str) -> None:
self.name = name
class GreetingOutput:
def __init__(self, result: str) -> None:
self.result = result
class GreetingEncodingPayloadConverter(EncodingPayloadConverter):
@property
def encoding(self) -> str:
return "text/my-greeting-encoding"
def to_payload(self, value: Any) -> Optional[Payload]:
if isinstance(value, GreetingInput):
return Payload(
metadata={"encoding": self.encoding.encode(), "is_input": b"true"},
data=value.name.encode(),
)
elif isinstance(value, GreetingOutput):
return Payload(
metadata={"encoding": self.encoding.encode()},
data=value.result.encode(),
)
else:
return None
def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
if payload.metadata.get("is_input") == b"true":
# Confirm proper type hint if present
assert not type_hint or type_hint is GreetingInput
return GreetingInput(payload.data.decode())
else:
assert not type_hint or type_hint is GreetingOutput
return GreetingOutput(payload.data.decode())
class GreetingPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Just add ours as first before the defaults
super().__init__(
GreetingEncodingPayloadConverter(),
# TODO(cretz): Make this list available without instantiation - https://github.com/temporalio/sdk-python/issues/139
*DefaultPayloadConverter().converters.values(),
)
# Use the default data converter, but change the payload converter.
greeting_data_converter = dataclasses.replace(
temporalio.converter.default(),
payload_converter_class=GreetingPayloadConverter,
)