-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy pathnexus_service.py
169 lines (137 loc) · 6.28 KB
/
nexus_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
Notes:
Sync operations:
---------------
Implementations are free to make arbitrary network calls, or perform CPU-bound
computations such as this one. Total execution duration must not exceed 10s. To
perform Temporal client calls such as signaling/querying/listing workflows, use
self.client.
Workflow operations:
---------------------
The task queue defaults to the task queue being used by the Nexus worker.
"""
from __future__ import annotations
import nexusrpc.handler
import temporalio.common
import temporalio.nexus.handler
from nexus.handler.dbclient import MyDBClient
from nexus.handler.workflows import HelloWorkflow
from nexus.service import interface
from nexus.service.interface import (
EchoInput,
EchoOutput,
HelloInput,
HelloOutput,
)
# Inheriting from the protocol here is optional. Users who do it will get the
# operation definition itself type-checked in situ against the interface (*).
# Call-sites using instances of the operation are always type-checked.
#
# (*) However, in VSCode/Pyright this is done only when type-checking is set to
# 'strict'.
class EchoOperation(nexusrpc.handler.Operation[EchoInput, EchoOutput]):
def __init__(self, service: MyNexusService):
self.service = service
async def start(
self, input: EchoInput, options: nexusrpc.handler.StartOperationOptions
) -> EchoOutput:
return EchoOutput(message=f"Echo {input.message}!")
async def cancel(
self, token: str, options: nexusrpc.handler.CancelOperationOptions
) -> None:
raise NotImplementedError
async def fetch_info(
self, token: str, options: nexusrpc.handler.FetchOperationInfoOptions
) -> nexusrpc.handler.OperationInfo:
raise NotImplementedError
async def fetch_result(
self, token: str, options: nexusrpc.handler.FetchOperationResultOptions
) -> EchoOutput:
raise NotImplementedError
# Inheriting from the protocol here is optional. Users who do it will get the
# operation definition itself type-checked in situ against the interface (*).
# Call-sites using instances of the operation are always type-checked.
#
# (*) However, in VSCode/Pyright this is done only when type-checking is set to
# 'strict'.
class HelloOperation: # (nexusrpc.handler.Operation[HelloInput, HelloOutput]):
def __init__(self, service: "MyNexusService"):
self.service = service
async def start(
self, input: HelloInput, options: nexusrpc.handler.StartOperationOptions
) -> temporalio.nexus.handler.AsyncWorkflowOperationResult[HelloOutput]:
self.service.db_client.execute("<some query>")
workflow_id = "default-workflow-id"
return await temporalio.nexus.handler.start_workflow(
HelloWorkflow.run, input, workflow_id, options
)
async def cancel(
self, token: str, options: nexusrpc.handler.CancelOperationOptions
) -> None:
return await temporalio.nexus.handler.cancel_workflow(token, options)
async def fetch_info(
self, token: str, options: nexusrpc.handler.FetchOperationInfoOptions
) -> nexusrpc.handler.OperationInfo:
return await temporalio.nexus.handler.fetch_workflow_info(token, options)
async def fetch_result(
self, token: str, options: nexusrpc.handler.FetchOperationResultOptions
) -> HelloOutput:
return await temporalio.nexus.handler.fetch_workflow_result(token, options)
class EchoOperation3(nexusrpc.handler.AbstractOperation[EchoInput, EchoOutput]):
async def start(
self, input: EchoInput, options: nexusrpc.handler.StartOperationOptions
) -> EchoOutput:
return EchoOutput(message=f"Echo {input.message}! [from base class variant]")
@nexusrpc.handler.service(interface=interface.MyNexusService)
class MyNexusService:
def __init__(self, db_client: MyDBClient):
# An example of something that might be held by the service instance.
self.db_client = db_client
# --------------------------------------------------------------------------
# Operations defined by explicitly implementing the Operation interface.
#
@nexusrpc.handler.operation
def echo(self) -> nexusrpc.handler.Operation[EchoInput, EchoOutput]:
return EchoOperation(self)
@nexusrpc.handler.operation
def hello(self) -> nexusrpc.handler.Operation[HelloInput, HelloOutput]:
return HelloOperation(self)
@nexusrpc.handler.operation
def echo3(self) -> nexusrpc.handler.Operation[EchoInput, EchoOutput]:
return EchoOperation3()
# --------------------------------------------------------------------------
# Operations defined by providing the start method only, using the
# "shorthand" decorators.
#
# Note that a start method defined this way has access to the service
# instance, but not to the operation instance (users who need the latter
# should implement the Operation interface directly).
@nexusrpc.handler.sync_operation
async def echo2(
self, input: EchoInput, _: nexusrpc.handler.StartOperationOptions
) -> EchoOutput:
return EchoOutput(message=f"Echo {input.message} [via shorthand]!")
# --------------------------------------------------------------------------
# Operations defined by providing the start method only, using the
# "shorthand" decorators.
#
# Note that a start method defined this way has access to the service
# instance, but not to the operation instance (users who need the latter
# should implement the Operation interface directly).
@temporalio.nexus.handler.workflow_operation
async def hello2(
self, input: HelloInput, options: nexusrpc.handler.StartOperationOptions
) -> temporalio.nexus.handler.AsyncWorkflowOperationResult[HelloOutput]:
self.db_client.execute("<some query>")
workflow_id = "default-workflow-id"
input.name += " [via shorthand]"
return await temporalio.nexus.handler.start_workflow(
HelloWorkflow.run, input, workflow_id, options
)
if __name__ == "__main__":
# Check run-time type annotations resulting from the decorators.
service = MyNexusService(MyDBClient())
print("echo:", temporalio.common._type_hints_from_func(service.echo2().start))
print(
"hello:", temporalio.common._type_hints_from_func(service.hello2().fetch_result)
)