forked from cadence-workflow/cadence-python-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
83 lines (65 loc) · 2.67 KB
/
client.py
File metadata and controls
83 lines (65 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import os
import socket
from typing import TypedDict, Unpack, Any, cast
from grpc import ChannelCredentials, Compression
from cadence._internal.rpc.yarpc import YarpcMetadataInterceptor
from cadence.api.v1.service_worker_pb2_grpc import WorkerAPIStub
from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel
from cadence.data_converter import DataConverter, DefaultDataConverter
class ClientOptions(TypedDict, total=False):
    """Keyword options accepted by ``Client``.

    The dict is declared with ``total=False`` so that every key is optional
    for the type checker. At runtime ``_validate_and_copy_defaults`` rejects
    options without ``target`` or ``domain`` and fills every other missing
    key from ``_DEFAULT_OPTIONS``.
    """

    # --- required at runtime -------------------------------------------
    # Target string handed to the gRPC channel factory.
    target: str
    # Domain name; exposed through ``Client.domain``.
    domain: str

    # --- client identity and payload conversion ------------------------
    # Exposed through ``Client.identity``; defaults to "<pid>@<hostname>".
    identity: str
    # Exposed through ``Client.data_converter``.
    data_converter: DataConverter

    # --- values handed to YarpcMetadataInterceptor ---------------------
    service_name: str
    caller_name: str

    # --- gRPC channel configuration ------------------------------------
    # A truthy value selects a secure channel, otherwise an insecure one.
    credentials: ChannelCredentials | None
    # Extra channel options, keyed by option name.
    channel_arguments: dict[str, Any]
    compression: Compression
    # User interceptors; the YARPC metadata interceptor is appended after them.
    interceptors: list[ClientInterceptor]
# Fallback values that _validate_and_copy_defaults installs for every option
# the caller did not supply. "target" and "domain" have no entry here: they
# are mandatory and validated separately.
#
# NOTE: this mapping is built once, when the module is imported, so the
# default identity reflects the pid/hostname observed at import time.
_DEFAULT_OPTIONS: ClientOptions = ClientOptions(
    data_converter=DefaultDataConverter(),
    identity=f"{os.getpid()}@{socket.gethostname()}",
    service_name="cadence-frontend",
    caller_name="cadence-client",
    channel_arguments={},
    credentials=None,
    compression=Compression.NoCompression,
    interceptors=[],
)
class Client:
    """Client for a Cadence service reached through a gRPC channel.

    Constructing a client validates the supplied options, creates the gRPC
    channel and builds the worker API stub on top of that channel. The
    client owns the channel; release it either with ``await client.close()``
    or by using the client as an async context manager::

        async with Client(target="host:port", domain="my-domain") as client:
            ...
    """

    def __init__(self, **kwargs: Unpack[ClientOptions]) -> None:
        """Create the client from keyword options.

        Args:
            **kwargs: Any keys of ``ClientOptions``. ``target`` and
                ``domain`` are mandatory; all other keys fall back to the
                module defaults.

        Raises:
            ValueError: If ``target`` or ``domain`` is missing.
        """
        # ClientOptions(**kwargs) builds a fresh dict, so filling in the
        # defaults never touches a mapping owned by the caller.
        self._options = _validate_and_copy_defaults(ClientOptions(**kwargs))
        self._channel = _create_channel(self._options)
        self._worker_stub = WorkerAPIStub(self._channel)

    async def __aenter__(self) -> "Client":
        """Enter ``async with``; the channel already exists, so return self."""
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        """Leave ``async with``: close the channel, never suppress errors."""
        await self.close()

    @property
    def data_converter(self) -> DataConverter:
        """The data converter configured for this client."""
        return self._options["data_converter"]

    @property
    def domain(self) -> str:
        """The domain this client was configured with."""
        return self._options["domain"]

    @property
    def identity(self) -> str:
        """The identity string configured for this client."""
        return self._options["identity"]

    @property
    def worker_stub(self) -> WorkerAPIStub:
        """The worker API stub bound to this client's channel."""
        return self._worker_stub

    async def close(self) -> None:
        """Close the underlying gRPC channel."""
        await self._channel.close()
def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions:
    """Check the mandatory options and fill every missing one with its default.

    ``options`` is completed in place and the same object is returned.

    Args:
        options: Options supplied by the caller.

    Returns:
        ``options``, now containing a value for every key that has a default.

    Raises:
        ValueError: If ``target`` or ``domain`` is absent.
    """
    # "target" is checked before "domain" so the first error reported is
    # stable when both are missing.
    for required in ("target", "domain"):
        if required not in options:
            raise ValueError(f"{required} must be specified")

    # A TypedDict only accepts literal keys; the cast permits the dynamic
    # assignment below.
    filled = cast(dict, options)
    for key, default in _DEFAULT_OPTIONS.items():
        if key in filled:
            continue
        # Mutable defaults are copied so that one client's options never
        # alias the module-level dict/list shared by every other client.
        if isinstance(default, (dict, list)):
            default = default.copy()
        filled[key] = default
    return options
def _create_channel(options: ClientOptions) -> Channel:
    """Build the gRPC channel described by ``options``.

    Args:
        options: Fully populated client options (defaults already applied).

    Returns:
        A secure channel when ``options["credentials"]`` is truthy,
        otherwise an insecure channel. Both are created for
        ``options["target"]`` with the configured channel arguments,
        compression and interceptors.
    """
    # Build a new list so the caller's interceptor list is never mutated;
    # the YARPC metadata interceptor goes after the user-supplied ones.
    interceptors = [
        *options["interceptors"],
        YarpcMetadataInterceptor(options["service_name"], options["caller_name"]),
    ]

    # gRPC takes channel options as a sequence of (key, value) pairs.
    # Passing the dict itself would make gRPC iterate over its keys only,
    # so any non-empty ``channel_arguments`` would be rejected.
    channel_arguments = list(options["channel_arguments"].items())

    credentials = options["credentials"]
    if credentials:
        return secure_channel(
            options["target"],
            credentials,
            channel_arguments,
            options["compression"],
            interceptors,
        )
    return insecure_channel(
        options["target"],
        channel_arguments,
        options["compression"],
        interceptors,
    )