forked from cadence-workflow/cadence-python-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelper.py
More file actions
43 lines (35 loc) · 1.52 KB
/
helper.py
File metadata and controls
43 lines (35 loc) · 1.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from contextlib import asynccontextmanager
import pathlib
from typing import AsyncGenerator, Unpack, cast
from google.protobuf.proto_json import serialize, parse
from msgspec import json
from cadence import Registry
from cadence.api.v1 import history
from cadence.client import ClientOptions, Client
from cadence.worker import WorkerOptions, Worker
# Domain name constant. It is not referenced by the code visible in this
# module; presumably other test modules/fixtures import it — confirm.
DOMAIN_NAME = "test-domain"
class CadenceHelper:
    """Helper that builds clients/workers and loads or saves history files.

    History files are addressed relative to ``fspath``: the final path
    component of ``fspath`` is replaced by the requested file name, so the
    files live next to whatever file ``fspath`` points at.
    """

    def __init__(self, options: ClientOptions, test_name: str, fspath: str) -> None:
        """Store the settings used by the other helper methods.

        Args:
            options: Keyword options unpacked into ``Client`` by ``client()``.
            test_name: Passed to ``Worker`` as its second positional argument
                (presumably the task list name -- confirm against ``Worker``).
            fspath: Path of a file; history files are resolved as siblings
                of it.
        """
        self.options = options
        self.test_name = test_name
        self.fspath = fspath

    @asynccontextmanager
    async def worker(
        self, registry: Registry, **kwargs: Unpack[WorkerOptions]
    ) -> AsyncGenerator[Worker, None]:
        """Yield a ``Worker`` bound to a newly created client.

        Both the client and the worker are entered as async context managers
        and are exited (worker first, then client) when the caller's block
        ends.

        Args:
            registry: Registry handed to the ``Worker`` constructor.
            **kwargs: Additional ``WorkerOptions`` forwarded to ``Worker``.
        """
        async with (
            self.client() as client,
            Worker(client, self.test_name, registry, **kwargs) as w,
        ):
            yield w

    def load_history(self, path: str) -> history.History:
        """Read a JSON history file stored next to ``fspath`` and parse it.

        Args:
            path: File name that replaces the last component of ``fspath``.

        Returns:
            The ``history.History`` message parsed from the decoded JSON.
        """
        file = pathlib.Path(self.fspath).with_name(path)
        as_json = json.decode(file.read_bytes())
        return cast(history.History, parse(history.History, as_json))

    def write_history(self, path: str, h: history.History) -> None:
        """Serialize ``h`` to JSON and write it next to ``fspath``.

        This isn't useful in a test, but it's useful while writing tests.

        Args:
            path: File name that replaces the last component of ``fspath``.
            h: History message to serialize (proto field names are preserved).
        """
        file = pathlib.Path(self.fspath).with_name(path)
        # Encode before touching the file: opening with "wb" truncates it, so
        # a serialization error must not be able to wipe an existing history.
        as_json = json.encode(serialize(h, preserving_proto_field_name=True))
        file.write_bytes(as_json)

    def client(self) -> Client:
        """Return a new ``Client`` built from the stored options."""
        return Client(**self.options)