Skip to content

Commit 1045b78

Browse files
authored
Propagate Nexus Operation-Timeout header (temporalio#6676)
## What changed? <!-- Describe what has changed in this PR --> Added support to propagate the new `Operation-Timeout` header. If this timeout is shorter than the context timeout, the context timeout will be reduced. ## Why? <!-- Tell your future self why have you made these changes --> ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> New tests
1 parent d92011a commit 1045b78

File tree

9 files changed

+228
-48
lines changed

9 files changed

+228
-48
lines changed

common/nexus/util.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in
13+
// all copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
// THE SOFTWARE.
22+
23+
package nexus
24+
25+
import (
26+
"strconv"
27+
"time"
28+
)
29+
30+
// FormatDuration converts a duration into a string representation in millisecond resolution.
31+
// TODO: replace this with the version exported from the Nexus SDK
32+
func FormatDuration(d time.Duration) string {
33+
return strconv.FormatInt(d.Milliseconds(), 10) + "ms"
34+
}

components/nexusoperations/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
3939
`RequestTimeout is the timeout for making a single nexus start or cancel request.`,
4040
)
4141

42+
var MinOperationTimeout = dynamicconfig.NewNamespaceDurationSetting(
43+
"componenet.nexusoperations.limit.operation.timeout.min",
44+
0,
45+
`MinOperationTimeout is the minimum time remaining for an operation to complete for the server to make
46+
RPCs. If the remaining operation timeout is less than this value, a non-retryable timeout error will be returned.`,
47+
)
48+
4249
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
4350
"component.nexusoperations.limit.operation.concurrency",
4451
// Temporary limit due to a persistence limitation, this will be increased when we change persistence to accept
@@ -136,6 +143,7 @@ var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
136143
type Config struct {
137144
Enabled dynamicconfig.BoolPropertyFn
138145
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
146+
MinOperationTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
139147
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
140148
MaxServiceNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
141149
MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
@@ -152,6 +160,7 @@ func ConfigProvider(dc *dynamicconfig.Collection) *Config {
152160
return &Config{
153161
Enabled: dynamicconfig.EnableNexus.Get(dc),
154162
RequestTimeout: RequestTimeout.Get(dc),
163+
MinOperationTimeout: MinOperationTimeout.Get(dc),
155164
MaxConcurrentOperations: MaxConcurrentOperations.Get(dc),
156165
MaxServiceNameLength: MaxServiceNameLength.Get(dc),
157166
MaxOperationNameLength: MaxOperationNameLength.Get(dc),

components/nexusoperations/executors.go

Lines changed: 70 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ import (
5050
"go.uber.org/fx"
5151
)
5252

53+
var ErrOperationTimeoutBelowMin = errors.New("remaining operation timeout is less than required minimum")
54+
5355
// ClientProvider provides a nexus client for a given endpoint.
5456
type ClientProvider func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexus.Client, error)
5557

@@ -143,7 +145,6 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
143145
return err
144146
}
145147

146-
header := nexus.Header(args.header)
147148
if e.Config.CallbackURLTemplate() == "unset" {
148149
return serviceerror.NewInternal(fmt.Sprintf("dynamic config %q is unset", CallbackURLTemplate.Key().String()))
149150
}
@@ -190,23 +191,38 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
190191
return fmt.Errorf("%w: %w", queues.NewUnprocessableTaskError("failed to generate a callback token"), err)
191192
}
192193

193-
callCtx, cancel := context.WithTimeout(
194-
ctx,
195-
e.Config.RequestTimeout(ns.Name().String(), task.EndpointName),
196-
)
194+
header := nexus.Header(args.header)
195+
callTimeout := e.Config.RequestTimeout(ns.Name().String(), task.EndpointName)
196+
if args.scheduleToCloseTimeout > 0 {
197+
opTimeout := args.scheduleToCloseTimeout - time.Since(args.scheduledTime)
198+
callTimeout = min(callTimeout, opTimeout)
199+
if opTimeoutHeader := header.Get(nexus.HeaderOperationTimeout); opTimeoutHeader == "" {
200+
if header == nil {
201+
header = make(nexus.Header, 1)
202+
}
203+
header[nexus.HeaderOperationTimeout] = opTimeout.String()
204+
}
205+
}
206+
207+
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
197208
defer cancel()
198209

199-
// Make the call and record metrics.
200210
startTime := time.Now()
201-
rawResult, callErr := client.StartOperation(callCtx, args.operation, args.payload, nexus.StartOperationOptions{
202-
Header: header,
203-
CallbackURL: callbackURL,
204-
RequestID: args.requestID,
205-
CallbackHeader: nexus.Header{
206-
commonnexus.CallbackTokenHeader: token,
207-
},
208-
Links: []nexus.Link{args.nexusLink},
209-
})
211+
var rawResult *nexus.ClientStartOperationResult[*nexus.LazyValue]
212+
var callErr error
213+
if callTimeout < e.Config.MinOperationTimeout(ns.Name().String()) {
214+
callErr = ErrOperationTimeoutBelowMin
215+
} else {
216+
rawResult, callErr = client.StartOperation(callCtx, args.operation, args.payload, nexus.StartOperationOptions{
217+
Header: header,
218+
CallbackURL: callbackURL,
219+
RequestID: args.requestID,
220+
CallbackHeader: nexus.Header{
221+
commonnexus.CallbackTokenHeader: token,
222+
},
223+
Links: []nexus.Link{args.nexusLink},
224+
})
225+
}
210226

211227
methodTag := metrics.NexusMethodTag("StartOperation")
212228
namespaceTag := metrics.NamespaceTag(ns.Name().String())
@@ -259,6 +275,8 @@ type startArgs struct {
259275
requestID string
260276
endpointName string
261277
endpointID string
278+
scheduledTime time.Time
279+
scheduleToCloseTimeout time.Duration
262280
header map[string]string
263281
payload *commonpb.Payload
264282
nexusLink nexus.Link
@@ -294,6 +312,8 @@ func (e taskExecutor) loadOperationArgs(
294312
if err != nil {
295313
return nil
296314
}
315+
args.scheduledTime = event.EventTime.AsTime()
316+
args.scheduleToCloseTimeout = event.GetNexusOperationScheduledEventAttributes().GetScheduleToCloseTimeout().AsDuration()
297317
args.payload = event.GetNexusOperationScheduledEventAttributes().GetInput()
298318
args.header = event.GetNexusOperationScheduledEventAttributes().GetNexusHeader()
299319
args.nexusLink = ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{
@@ -391,6 +411,9 @@ func (e taskExecutor) handleStartOperationError(env hsm.Environment, node *hsm.N
391411
// Following practices from workflow task completion payload size limit enforcement, we do not retry this
392412
// operation if the response body is too large.
393413
return handleNonRetryableStartOperationError(env, node, operation, callErr.Error())
414+
} else if errors.Is(callErr, ErrOperationTimeoutBelowMin) {
415+
// Operation timeout is not retryable
416+
return handleNonRetryableStartOperationError(env, node, operation, callErr.Error())
394417
}
395418
return TransitionAttemptFailed.Apply(operation, EventAttemptFailed{
396419
Time: env.Now(),
@@ -514,15 +537,21 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro
514537
return fmt.Errorf("failed to get handle for operation: %w", err)
515538
}
516539

517-
callCtx, cancel := context.WithTimeout(
518-
ctx,
519-
e.Config.RequestTimeout(ns.Name().String(), task.EndpointName),
520-
)
540+
callTimeout := e.Config.RequestTimeout(ns.Name().String(), task.EndpointName)
541+
if args.scheduleToCloseTimeout > 0 {
542+
opTimeout := args.scheduleToCloseTimeout - time.Since(args.scheduledTime)
543+
callTimeout = min(callTimeout, opTimeout)
544+
}
545+
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
521546
defer cancel()
522547

523-
// Make the call and record metrics.
548+
var callErr error
524549
startTime := time.Now()
525-
callErr := handle.Cancel(callCtx, nexus.CancelOperationOptions{})
550+
if callTimeout < e.Config.MinOperationTimeout(ns.Name().String()) {
551+
callErr = ErrOperationTimeoutBelowMin
552+
} else {
553+
callErr = handle.Cancel(callCtx, nexus.CancelOperationOptions{})
554+
}
526555

527556
methodTag := metrics.NexusMethodTag("CancelOperation")
528557
namespaceTag := metrics.NamespaceTag(ns.Name().String())
@@ -546,6 +575,8 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro
546575

547576
type cancelArgs struct {
548577
service, operation, operationID, endpointID, endpointName string
578+
scheduledTime time.Time
579+
scheduleToCloseTimeout time.Duration
549580
}
550581

551582
// loadArgsForCancelation loads state from the operation state machine that's the parent of the cancelation machine the
@@ -566,6 +597,8 @@ func (e taskExecutor) loadArgsForCancelation(ctx context.Context, env hsm.Enviro
566597
args.operationID = op.OperationId
567598
args.endpointID = op.EndpointId
568599
args.endpointName = op.Endpoint
600+
args.scheduledTime = op.ScheduledTime.AsTime()
601+
args.scheduleToCloseTimeout = op.ScheduleToCloseTimeout.AsDuration()
569602
return nil
570603
})
571604
return
@@ -575,6 +608,13 @@ func (e taskExecutor) saveCancelationResult(ctx context.Context, env hsm.Environ
575608
return env.Access(ctx, ref, hsm.AccessWrite, func(n *hsm.Node) error {
576609
return hsm.MachineTransition(n, func(c Cancelation) (hsm.TransitionOutput, error) {
577610
if callErr != nil {
611+
if errors.Is(callErr, ErrOperationTimeoutBelowMin) {
612+
return TransitionCancelationFailed.Apply(c, EventCancelationFailed{
613+
Time: env.Now(),
614+
Err: callErr,
615+
Node: n,
616+
})
617+
}
578618
var handlerErr *nexus.HandlerError
579619
if errors.As(callErr, &handlerErr) {
580620
if !isRetryableHandlerError(handlerErr.Type) {
@@ -647,6 +687,9 @@ func startCallOutcomeTag(callCtx context.Context, result *nexus.ClientStartOpera
647687
var opFailedError *nexus.UnsuccessfulOperationError
648688

649689
if callErr != nil {
690+
if errors.Is(callErr, ErrOperationTimeoutBelowMin) {
691+
return "operation-timeout"
692+
}
650693
if callCtx.Err() != nil {
651694
return "request-timeout"
652695
}
@@ -666,6 +709,9 @@ func startCallOutcomeTag(callCtx context.Context, result *nexus.ClientStartOpera
666709
func cancelCallOutcomeTag(callCtx context.Context, callErr error) string {
667710
var handlerErr *nexus.HandlerError
668711
if callErr != nil {
712+
if errors.Is(callErr, ErrOperationTimeoutBelowMin) {
713+
return "operation-timeout"
714+
}
669715
if callCtx.Err() != nil {
670716
return "request-timeout"
671717
}
@@ -709,5 +755,8 @@ func isDestinationDown(err error) bool {
709755
if errors.Is(err, ErrResponseBodyTooLarge) {
710756
return false
711757
}
758+
if errors.Is(err, ErrOperationTimeoutBelowMin) {
759+
return false
760+
}
712761
return true
713762
}

0 commit comments

Comments
 (0)