-
Notifications
You must be signed in to change notification settings - Fork 298
Expand file tree
/
Copy pathinternal_nexus_task_poller.go
More file actions
246 lines (218 loc) · 8.67 KB
/
internal_nexus_task_poller.go
File metadata and controls
246 lines (218 loc) · 8.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
package internal
import (
"context"
"time"
"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/log"
)
// nexusTaskPoller polls a task queue for Nexus tasks and hands each non-empty
// poll response to a nexusTaskHandler for execution and reporting.
type nexusTaskPoller struct {
	// basePoller carries the shared poller state populated in newNexusTaskPoller
	// (metrics handler, stop channel, build ID / deployment versioning settings,
	// capabilities, poll time tracker, worker instance key).
	basePoller
	// namespace is sent as the Namespace of every PollNexusTaskQueue request.
	namespace string
	// taskQueueName is the polled task queue; it also tags the schedule-to-start metric.
	taskQueueName string
	// identity is sent as the worker Identity on every poll request.
	identity string
	// service is the client used to issue PollNexusTaskQueue RPCs.
	service workflowservice.WorkflowServiceClient
	// taskHandler builds operation contexts, executes tasks, and its client is
	// used for the RespondNexusTask* RPCs.
	taskHandler *nexusTaskHandler
	// logger is used for debug trace logging in poll and ProcessTask.
	logger log.Logger
	// numPollerMetric is incremented/decremented around each poll RPC.
	numPollerMetric *numPollerMetric
	// pollerGroupTracker hands out a poller group ID per poll and is updated
	// with the poller group infos returned by the server.
	pollerGroupTracker *pollerGroupTracker
}
// nexusTask wraps a PollNexusTaskQueue response so poll can return it as a
// taskForWorker and ProcessTask can unwrap it again.
type nexusTask struct {
	// task is the raw poll response; poll only wraps responses that carry a
	// non-empty task token.
	task *workflowservice.PollNexusTaskQueueResponse
}
// Compile-time assertion that *nexusTaskPoller implements taskPoller.
var _ taskPoller = (*nexusTaskPoller)(nil)
// newNexusTaskPoller builds a Nexus task poller for the task queue named in
// params. Namespace, identity, logger, metrics handler, stop channel and the
// versioning/deployment settings are copied out of params; the poller also gets
// its own num_poller metric (PollerTypeNexusTask) and poller group tracker.
func newNexusTaskPoller(
	taskHandler *nexusTaskHandler,
	service workflowservice.WorkflowServiceClient,
	params workerExecutionParameters,
) *nexusTaskPoller {
	// Evaluated in the same order as the original single composite literal.
	buildID := params.getBuildID()
	inFlightPolls := newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeNexusTask)
	groupTracker := newPollerGroupTracker()

	return &nexusTaskPoller{
		basePoller: basePoller{
			metricsHandler:               params.MetricsHandler,
			stopC:                        params.WorkerStopChannel,
			workerBuildID:                buildID,
			useBuildIDVersioning:         params.UseBuildIDForVersioning,
			workerDeploymentVersion:      params.DeploymentOptions.Version,
			capabilities:                 params.capabilities,
			pollTimeTracker:              params.pollTimeTracker,
			workerInstanceKey:            params.workerInstanceKey,
			workerPollCompleteOnShutdown: params.workerPollCompleteOnShutdown,
		},
		namespace:          params.Namespace,
		taskQueueName:      params.TaskQueue,
		identity:           params.Identity,
		service:            service,
		taskHandler:        taskHandler,
		logger:             params.Logger,
		numPollerMetric:    inFlightPolls,
		pollerGroupTracker: groupTracker,
	}
}
// pollNexusTaskQueue sends one PollNexusTaskQueue RPC through the workflow
// service client. The num_poller metric is incremented before the call and
// decremented after it returns, so each call is counted while it is in flight.
func (ntp *nexusTaskPoller) pollNexusTaskQueue(ctx context.Context, request *workflowservice.PollNexusTaskQueueRequest) (*workflowservice.PollNexusTaskQueueResponse, error) {
	inFlight := ntp.numPollerMetric
	inFlight.increment()
	defer inFlight.decrement()

	resp, err := ntp.service.PollNexusTaskQueue(ctx, request)
	return resp, err
}
// poll issues one PollNexusTaskQueue request for this worker's task queue.
//
// It returns the RPC error if the poll fails, (nil, nil) when the server
// answered without a task token, and otherwise the response wrapped in a
// *nexusTask. Each call leases a poller group ID for the duration of the
// request and feeds the response's poller group infos back into the tracker.
func (ntp *nexusTaskPoller) poll(ctx context.Context) (taskForWorker, error) {
	traceLog(func() {
		ntp.logger.Debug("nexusTaskPoller::Poll")
	})

	groupID := ntp.pollerGroupTracker.getNextGroupId()
	defer ntp.pollerGroupTracker.release(groupID)

	versionCaps := &commonpb.WorkerVersionCapabilities{
		BuildId:              ntp.workerBuildID,
		UseVersioning:        ntp.useBuildIDVersioning,
		DeploymentSeriesName: ntp.workerDeploymentVersion.DeploymentName,
	}
	deploymentOpts := workerDeploymentOptionsToProto(ntp.useBuildIDVersioning, ntp.workerDeploymentVersion)

	response, err := ntp.pollNexusTaskQueue(ctx, &workflowservice.PollNexusTaskQueueRequest{
		Namespace:                 ntp.namespace,
		TaskQueue:                 &taskqueuepb.TaskQueue{Name: ntp.taskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
		Identity:                  ntp.identity,
		WorkerVersionCapabilities: versionCaps,
		DeploymentOptions:         deploymentOpts,
		WorkerInstanceKey:         ntp.workerInstanceKey,
		PollerGroupId:             groupID,
	})
	if err != nil {
		return nil, err
	}

	// Generated proto getters are nil-safe, so a nil response flows through
	// here exactly like an empty one.
	ntp.pollerGroupTracker.updateGroups(response.GetPollerGroupInfos())

	if len(response.GetTaskToken()) > 0 {
		ntp.pollTimeTracker.recordPollSuccess(metrics.PollerTypeNexusTask)
		return &nexusTask{task: response}, nil
	}

	// No operation info is available on empty poll. Emit using base scope.
	ntp.metricsHandler.Counter(metrics.NexusPollNoTaskCounter).Inc(1)
	return nil, nil
}
// PollTask polls a new task by running poll through doPoll (presumably the
// wrapper promoted from the embedded basePoller — confirm there how stop and
// errors are handled).
func (ntp *nexusTaskPoller) PollTask() (taskForWorker, error) {
	pollOnce := ntp.poll
	return ntp.doPoll(pollOnce)
}
// ProcessTask processes a new task.
//
// It returns errStop when the poller is stopping and nil for a poll response
// without a request. Otherwise it executes the task and reports the outcome to
// the server, returning the execution error or the reporting error, if any.
// Along the way it records the schedule-to-start, execution and end-to-end
// latency timers and, for failed tasks, NexusTaskExecutionFailedCounter tagged
// with a failure category.
func (ntp *nexusTaskPoller) ProcessTask(task interface{}) error {
	if ntp.stopping() {
		return errStop
	}

	response := task.(*nexusTask).task
	if response.GetRequest() == nil {
		// We didn't get a request, poll must have timed out.
		traceLog(func() {
			ntp.logger.Debug("Empty Nexus poll response")
		})
		return nil
	}

	executionStartTime := time.Now()
	// A nil scheduled time converts to the Unix epoch via AsTime and would yield
	// a bogus multi-decade latency, so the latencies measured from it are only
	// recorded when the server set it.
	scheduledTime := response.GetRequest().GetScheduledTime()

	// Schedule-to-start (from the time the request hit the frontend).
	// Note that this metric does not include the service and operation name as
	// they are not relevant when polling from the Nexus task queue.
	if scheduledTime != nil {
		ntp.metricsHandler.
			WithTags(metrics.TaskQueueTags(ntp.taskQueueName)).
			Timer(metrics.NexusTaskScheduleToStartLatency).
			Record(executionStartTime.Sub(scheduledTime.AsTime()))
	}

	nctx, handlerErr := ntp.taskHandler.newNexusOperationContext(response)
	if handlerErr != nil {
		// No operation context could be built, so report the failure directly.
		// Going through reportCompletion gives this RPC the same timeout, retry
		// policy and RPC metrics as a normal completion instead of an unbounded
		// context.Background() call.
		failedRequest, err := ntp.taskHandler.fillInFailure(
			response.TaskToken,
			handlerErr,
			getEffectiveTemporalFailureResponses(response.GetRequest().GetCapabilities().GetTemporalFailureResponses()),
			response.GetPollerGroupId(),
		)
		if err != nil {
			return err
		}
		return ntp.reportCompletion(nil, failedRequest)
	}

	// Process the nexus task.
	res, failure, err := ntp.taskHandler.ExecuteContext(nctx, response)
	// Execution latency (in-SDK processing time).
	nctx.metricsHandler.Timer(metrics.NexusTaskExecutionLatency).Record(time.Since(executionStartTime))

	// Count every form of failure: an internal error processing the task, a
	// failure from the user handler, or a start response carrying an operation
	// error/failure. An empty tag means nothing is counted.
	var failureTag string
	switch {
	case err != nil:
		failureTag = "internal_sdk_error"
		if err == errNexusTaskTimeout {
			failureTag = "timeout"
		}
		nctx.log.Error("Error processing nexus task", "error", err)
	case failure != nil:
		//lint:ignore SA1019 handle legacy operation error format for backward compatibility.
		if taskErr := failure.GetError(); taskErr != nil {
			failureTag = "handler_error_" + taskErr.GetErrorType()
		} else if f := failure.GetFailure(); f != nil {
			// Failure must contain a NexusHandlerFailureInfo
			failureTag = "handler_error_" + f.GetNexusHandlerFailureInfo().GetType()
		}
	default:
		// GetResponse is nil-safe, unlike direct field access on res.
		startOp := res.GetResponse().GetStartOperation()
		//lint:ignore SA1019 handle legacy operation error format for backward compatibility.
		if opErr := startOp.GetOperationError(); opErr != nil {
			failureTag = "operation_" + opErr.GetOperationState()
		} else if f := startOp.GetFailure(); f != nil {
			if f.GetApplicationFailureInfo() != nil {
				failureTag = "operation_" + string(nexus.OperationStateFailed)
			} else if f.GetCanceledFailureInfo() != nil {
				failureTag = "operation_" + string(nexus.OperationStateCanceled)
			}
		}
	}
	if failureTag != "" {
		nctx.metricsHandler.
			WithTags(metrics.NexusTaskFailureTags(failureTag)).
			Counter(metrics.NexusTaskExecutionFailedCounter).
			Inc(1)
	}

	// Let the poller machinery drop the task, nothing to report back.
	// This is only expected due to context deadline errors.
	if err != nil {
		return err
	}

	if reportErr := ntp.reportCompletion(res, failure); reportErr != nil {
		traceLog(func() {
			ntp.logger.Debug("reportNexusTaskComplete failed", tagError, reportErr)
		})
		return reportErr
	}

	// E2E latency, from frontend until we finished reporting completion.
	if scheduledTime != nil {
		nctx.metricsHandler.
			Timer(metrics.NexusTaskEndToEndLatency).
			Record(time.Since(scheduledTime.AsTime()))
	}
	return nil
}
// reportCompletion sends the outcome of a Nexus task to the server.
//
// When failure is non-nil it is sent via RespondNexusTaskFailed; otherwise
// completion is sent via RespondNexusTaskCompleted. The RPC runs on a fresh
// gRPC context built by newGRPCContext with the default retry parameters and an
// RPC metrics handler; the context is cancelled before returning. The returned
// error is the RPC error, if any.
func (ntp *nexusTaskPoller) reportCompletion(
	completion *workflowservice.RespondNexusTaskCompletedRequest,
	failure *workflowservice.RespondNexusTaskFailedRequest,
) error {
	// No workflow or activity tags to report.
	// Task queue expected to be empty for Respond*Task... requests.
	noneTags := metrics.RPCTags(metrics.NoneTagValue, metrics.NoneTagValue, metrics.NoneTagValue)
	baseCtx := context.Background()
	rpcCtx, cancel := newGRPCContext(
		baseCtx,
		grpcMetricsHandler(ntp.metricsHandler.WithTags(noneTags)),
		defaultGrpcRetryParameters(baseCtx),
	)
	defer cancel()

	svc := ntp.taskHandler.client.WorkflowService()
	var rpcErr error
	if failure == nil {
		_, rpcErr = svc.RespondNexusTaskCompleted(rpcCtx, completion)
	} else {
		_, rpcErr = svc.RespondNexusTaskFailed(rpcCtx, failure)
	}
	return rpcErr
}