Skip to content

Commit 0883ca2

Browse files
authored
Keep syncing User Data as long as version changes (temporalio#7320)
## What changed? <!-- Describe what has changed in this PR --> UserDataManger in child partitions constantly fetches data from the parent partition. There is a safely mechanism to prevent busy loop in case the parent returns result immediately instead of only when the data is updated. The safety mechanism did not work well with Versioning 3 because many updates are involved in a single tests. With this update we relax the safety check to only apply when the user data version remains the same and continue to fetch the data asap as long as each time it gets new data. Here, also I'm relaxing the "requested task queue user data for version greater than known version" errors to be thrown only for the root partition, as they can normally happen for child partitions. ## Why? <!-- Tell your future self why have you made these changes --> So that back-to-back Versioning 3 APIs do not have to wait for a long time. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Existing tests. ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) --> Yes.
1 parent 288f4ce commit 0883ca2

File tree

4 files changed

+51
-34
lines changed

4 files changed

+51
-34
lines changed

config/dynamicconfig/development-cass.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ system.enableDeployments:
4040
- value: true
4141
system.enableDeploymentVersions:
4242
- value: true
43+
frontend.workerVersioningRuleAPIs:
44+
- value: true
45+
frontend.enableExecuteMultiOperation:
46+
- value: true
4347
system.enableNexus:
4448
- value: true
4549
component.nexusoperations.callback.endpoint.template:

config/dynamicconfig/development-sql.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ frontend.workerVersioningDataAPIs:
3939
- value: true
4040
frontend.workerVersioningWorkflowAPIs:
4141
- value: true
42+
frontend.workerVersioningRuleAPIs:
43+
- value: true
44+
system.enableDeploymentVersions:
45+
- value: true
46+
system.enableDeployments:
47+
- value: true
48+
frontend.enableExecuteMultiOperation:
49+
- value: true
4250
system.enableNexus:
4351
- value: true
4452
component.nexusoperations.callback.endpoint.template:

service/matching/user_data_manager.go

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ const (
5858
userDataClosed
5959
)
6060

61-
const maxFastUserDataFetches = 10
61+
const maxFastUserDataFetches = 5
6262

6363
type (
6464
userDataManager interface {
@@ -288,9 +288,11 @@ func (m *userDataManagerImpl) fetchUserData(ctx context.Context) error {
288288
// hasFetchedUserData is true if we have gotten a successful reply to GetTaskQueueUserData.
289289
// It's used to control whether we do a long poll or a simple get.
290290
hasFetchedUserData := false
291+
userDataVersionChanged := false
291292

292293
op := func(ctx context.Context) error {
293294
knownUserData, _, _ := m.GetUserData()
295+
userDataVersionChanged = false
294296

295297
callCtx, cancel := context.WithTimeout(ctx, m.config.GetUserDataLongPollTimeout())
296298
defer cancel()
@@ -323,6 +325,7 @@ func (m *userDataManagerImpl) fetchUserData(ctx context.Context) error {
323325
// nil inner fields.
324326
if res.GetUserData() != nil {
325327
m.setUserDataForNonOwningPartition(res.GetUserData())
328+
userDataVersionChanged = res.GetUserData().GetVersion() != knownUserData.GetVersion()
326329
m.logNewUserData("fetched user data from parent", res.GetUserData())
327330
} else {
328331
m.logger.Debug("fetched user data from parent, no change")
@@ -344,7 +347,8 @@ func (m *userDataManagerImpl) fetchUserData(ctx context.Context) error {
344347
// one. But if the remote is broken and returns success immediately, we might end up
345348
// spinning. So enforce a minimum wait time that increases as long as we keep getting
346349
// very fast replies.
347-
if elapsed < m.config.GetUserDataMinWaitTime {
350+
// If the user data version changed it means new data was received so we skip this check.
351+
if !userDataVersionChanged && elapsed < m.config.GetUserDataMinWaitTime {
348352
if fastResponseCounter >= maxFastUserDataFetches {
349353
// maxFastUserDataFetches or more consecutive fast responses, let's throttle!
350354
util.InterruptibleSleep(ctx, minWaitTime-elapsed)
@@ -575,7 +579,28 @@ func (m *userDataManagerImpl) HandleGetUserDataRequest(
575579
} else if err != nil {
576580
return nil, err
577581
}
578-
if req.WaitNewData && userData.GetVersion() == version {
582+
if userData.GetVersion() > version {
583+
resp.UserData = userData
584+
m.logger.Info("returning user data",
585+
tag.NewBoolTag("long-poll", req.WaitNewData),
586+
tag.NewInt64("request-known-version", version),
587+
tag.UserDataVersion(userData.Version),
588+
)
589+
} else if userData != nil && userData.Version < version && m.store != nil {
590+
// When m.store == nil it means this is a non-owner partition, so it is possible
591+
// for the requested version to be greater than the known version if there are
592+
// concurrent user data updates in flight. We do not log an error in that case.
593+
594+
// This is highly unlikely to happen in the owner/root partition but may happen
595+
// due to an edge case in during ownership transfer.
596+
// We rely on client retries in this case to let the system eventually self-heal.
597+
m.logger.Error("requested task queue user data for version greater than known version",
598+
tag.NewInt64("request-known-version", version),
599+
tag.UserDataVersion(userData.Version),
600+
)
601+
return nil, errRequestedVersionTooLarge
602+
}
603+
if req.WaitNewData && userData.GetVersion() <= version {
579604
// long-poll: wait for data to change/appear
580605
select {
581606
case <-ctx.Done():
@@ -590,31 +615,7 @@ func (m *userDataManagerImpl) HandleGetUserDataRequest(
590615
continue
591616
}
592617
}
593-
if userData != nil {
594-
if userData.Version > version {
595-
resp.UserData = userData
596-
m.logger.Info("returning user data",
597-
tag.NewBoolTag("long-poll", req.WaitNewData),
598-
tag.NewInt64("request-known-version", version),
599-
tag.UserDataVersion(userData.Version),
600-
)
601-
} else if userData.Version < version {
602-
if m.store != nil {
603-
// When m.store == nil it means this is a non-owner partition, so it is possible
604-
// for the requested version to be greater than the known version if there are
605-
// concurrent user data updates in flight. We do not log an error in that case.
606-
607-
// This is highly unlikely to happen in the owner/root partition but may happen
608-
// due to an edge case in during ownership transfer.
609-
// We rely on client retries in this case to let the system eventually self-heal.
610-
m.logger.Error("requested task queue user data for version greater than known version",
611-
tag.NewInt64("request-known-version", version),
612-
tag.UserDataVersion(userData.Version),
613-
)
614-
}
615-
return nil, errRequestedVersionTooLarge
616-
}
617-
} else {
618+
if userData == nil {
618619
m.logger.Debug("returning empty user data (no data)", tag.NewBoolTag("long-poll", req.WaitNewData))
619620
}
620621
return resp, nil

service/matching/user_data_manager_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func TestUserData_FetchesOnInit(t *testing.T) {
331331
}).
332332
Return(&matchingservice.GetTaskQueueUserDataResponse{
333333
UserData: data1,
334-
}, nil).MaxTimes(maxFastUserDataFetches)
334+
}, nil).MaxTimes(maxFastUserDataFetches + 1)
335335

336336
m := createUserDataManager(t, controller, tqCfg)
337337
m.config.GetUserDataMinWaitTime = 10 * time.Second // only one fetch
@@ -468,7 +468,9 @@ func TestUserData_RetriesFetchOnUnavailable(t *testing.T) {
468468
}).
469469
Return(&matchingservice.GetTaskQueueUserDataResponse{
470470
UserData: data1,
471-
}, nil).MaxTimes(maxFastUserDataFetches)
471+
}, nil).
472+
// +3 because the counter resets when version changes so the calls with error do not count
473+
MaxTimes(maxFastUserDataFetches + 3)
472474

473475
m := createUserDataManager(t, controller, tqCfg)
474476
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success
@@ -552,7 +554,9 @@ func TestUserData_RetriesFetchOnUnImplemented(t *testing.T) {
552554
}).
553555
Return(&matchingservice.GetTaskQueueUserDataResponse{
554556
UserData: data1,
555-
}, nil).MaxTimes(maxFastUserDataFetches)
557+
}, nil).
558+
// +3 because the counter resets when version changes so the calls with error do not count
559+
MaxTimes(maxFastUserDataFetches + 3)
556560

557561
m := createUserDataManager(t, controller, tqCfg)
558562
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success
@@ -621,7 +625,7 @@ func TestUserData_FetchesUpTree(t *testing.T) {
621625
}).
622626
Return(&matchingservice.GetTaskQueueUserDataResponse{
623627
UserData: data1,
624-
}, nil).MaxTimes(maxFastUserDataFetches)
628+
}, nil).MaxTimes(maxFastUserDataFetches + 1)
625629

626630
m := createUserDataManager(t, controller, tqCfg)
627631
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success
@@ -672,7 +676,7 @@ func TestUserData_FetchesActivityToWorkflow(t *testing.T) {
672676
}).
673677
Return(&matchingservice.GetTaskQueueUserDataResponse{
674678
UserData: data1,
675-
}, nil).MaxTimes(maxFastUserDataFetches)
679+
}, nil).MaxTimes(maxFastUserDataFetches + 1)
676680

677681
m := createUserDataManager(t, controller, tqCfg)
678682
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success
@@ -727,7 +731,7 @@ func TestUserData_FetchesStickyToNormal(t *testing.T) {
727731
}).
728732
Return(&matchingservice.GetTaskQueueUserDataResponse{
729733
UserData: data1,
730-
}, nil).MaxTimes(maxFastUserDataFetches)
734+
}, nil).MaxTimes(maxFastUserDataFetches + 1)
731735

732736
m := createUserDataManager(t, controller, tqCfg)
733737
m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success

0 commit comments

Comments
 (0)