Skip to content

Commit d8e8685

Browse files
authored
VersionMembershipCache: Metrics and refactorings! (temporalio#8894)
## What changed? - Added metrics, such as cache hits and cache misses, so that we can understand if the currently set TTL for this cache (of 1 second) is too low or too high. - Also did some re-factorings: While working on this, I realized that it was much simpler to add a new wrapper with a metrics handler, that specifically served the use case of understanding cache hits and missed, rather than use any of the existing implementations of caches that have a metrics handler attached to them. (See *NewWithMetrics*) - Thus, I took some inspiration from the way *newEventsCache* was implemented and came up with this. ## Why? - Explained above. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks - None. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Introduces a typed `VersionMembershipCache` wrapper with metrics and refactors callers to use it. > > - New `common/worker_versioning/version_membership_cache.go`: typed cache API with `Get`/`Put`, emits `cache_requests` and `cache_miss` with `cache_type=version_membership` and op scopes `VersionMembershipCacheGet`/`Put` > - Metrics: add `VersionMembershipCacheTypeTagValue` and new operation scopes in `metrics/metric_defs.go` > - Refactor: replace `cache.Cache` with `VersionMembershipCache` in worker versioning validation and history APIs (`startworkflow`, `signalwithstartworkflow`, `resetworkflow`, `multioperation`, `updateworkflowoptions`), update DI to provide wrapped cache in `service/history/fx.go` > - History engine constructors and factory updated to accept the new interface > - Tests: add lightweight in-memory and noop implementations; update unit tests to use the new interface > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 1ce3a10. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 32f2b95 commit d8e8685

13 files changed

Lines changed: 251 additions & 107 deletions

File tree

common/metrics/metric_defs.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ const (
4343

4444
MutableStateCacheTypeTagValue = "mutablestate"
4545
EventsCacheTypeTagValue = "events"
46+
VersionMembershipCacheTypeTagValue = "version_membership"
4647
NexusEndpointRegistryReadThroughCacheTypeTagValue = "nexus_endpoint_registry_readthrough"
4748

4849
InvalidHistoryURITagValue = "invalid_history_uri"
@@ -450,6 +451,10 @@ const (
450451
OperationMemoryScheduledQueueProcessorScope = "MemoryScheduledQueueProcessor"
451452
// OperationOutboundQueueProcessorScope is a scope for the outbound queue processor.
452453
OperationOutboundQueueProcessorScope = "OutboundQueueProcessor"
454+
// VersionMembershipCacheGetScope is the scope used by version membership cache
455+
VersionMembershipCacheGetScope = "VersionMembershipCacheGet"
456+
// VersionMembershipCachePutScope is the scope used by version membership cache
457+
VersionMembershipCachePutScope = "VersionMembershipCachePut"
453458
)
454459

455460
// Matching Scope
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
//nolint:staticcheck
2+
package worker_versioning
3+
4+
import (
5+
enumspb "go.temporal.io/api/enums/v1"
6+
"go.temporal.io/server/common/cache"
7+
"go.temporal.io/server/common/metrics"
8+
)
9+
10+
// VersionMembershipCache is used to cache results of Matching's CheckTaskQueueVersionMembership
11+
// calls (used internally by the worker versioning pinned override validation).
12+
//
13+
// Implementations are expected to be safe for concurrent use.
14+
type (
15+
VersionMembershipCache interface {
16+
// Get returns (isMember, ok). ok=false means there was no cached value.
17+
Get(
18+
namespaceID string,
19+
taskQueue string,
20+
taskQueueType enumspb.TaskQueueType,
21+
deploymentName string,
22+
buildID string,
23+
) (isMember bool, ok bool)
24+
25+
Put(
26+
namespaceID string,
27+
taskQueue string,
28+
taskQueueType enumspb.TaskQueueType,
29+
deploymentName string,
30+
buildID string,
31+
isMember bool,
32+
)
33+
}
34+
35+
versionMembershipCacheKey struct {
36+
namespaceID string
37+
taskQueue string
38+
taskQueueType enumspb.TaskQueueType
39+
deploymentName string
40+
buildID string
41+
}
42+
43+
VersionMembershipCacheImpl struct {
44+
cache.Cache
45+
metricsHandler metrics.Handler
46+
}
47+
)
48+
49+
// NewVersionMembershipCache wraps the provided cache with a typed API and metrics.
50+
func NewVersionMembershipCache(c cache.Cache, metricsHandler metrics.Handler) VersionMembershipCache {
51+
h := metricsHandler.WithTags(metrics.CacheTypeTag(metrics.VersionMembershipCacheTypeTagValue))
52+
return &VersionMembershipCacheImpl{
53+
Cache: c,
54+
metricsHandler: h,
55+
}
56+
}
57+
58+
func (c *VersionMembershipCacheImpl) Get(
59+
namespaceID string,
60+
taskQueue string,
61+
taskQueueType enumspb.TaskQueueType,
62+
deploymentName string,
63+
buildID string,
64+
) (isMember bool, ok bool) {
65+
handler := c.metricsHandler.WithTags(metrics.OperationTag(metrics.VersionMembershipCacheGetScope), metrics.NamespaceIDTag(namespaceID))
66+
metrics.CacheRequests.With(handler).Record(1)
67+
68+
key := versionMembershipCacheKey{
69+
namespaceID: namespaceID,
70+
taskQueue: taskQueue,
71+
taskQueueType: taskQueueType,
72+
deploymentName: deploymentName,
73+
buildID: buildID,
74+
}
75+
v := c.Cache.Get(key)
76+
if v == nil {
77+
metrics.CacheMissCounter.With(handler).Record(1)
78+
return false, false
79+
}
80+
isMember, ok = v.(bool)
81+
if !ok {
82+
// Unexpected type: treat as miss to avoid false positives.
83+
metrics.CacheMissCounter.With(handler).Record(1)
84+
return false, false
85+
}
86+
return isMember, true
87+
}
88+
89+
func (c *VersionMembershipCacheImpl) Put(
90+
namespaceID string,
91+
taskQueue string,
92+
taskQueueType enumspb.TaskQueueType,
93+
deploymentName string,
94+
buildID string,
95+
isMember bool,
96+
) {
97+
handler := c.metricsHandler.WithTags(metrics.OperationTag(metrics.VersionMembershipCachePutScope), metrics.NamespaceIDTag(namespaceID))
98+
metrics.CacheRequests.With(handler).Record(1)
99+
100+
key := versionMembershipCacheKey{
101+
namespaceID: namespaceID,
102+
taskQueue: taskQueue,
103+
taskQueueType: taskQueueType,
104+
deploymentName: deploymentName,
105+
buildID: buildID,
106+
}
107+
c.Cache.Put(key, isMember)
108+
}

common/worker_versioning/worker_versioning.go

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"go.temporal.io/server/api/matchingservice/v1"
2121
persistencespb "go.temporal.io/server/api/persistence/v1"
2222
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
23-
"go.temporal.io/server/common/cache"
2423
"go.temporal.io/server/common/namespace"
2524
"go.temporal.io/server/common/persistence/visibility/manager"
2625
"go.temporal.io/server/common/resource"
@@ -530,28 +529,25 @@ func ExtractVersioningBehaviorFromOverride(override *workflowpb.VersioningOverri
530529
func validatePinnedVersionInTaskQueue(ctx context.Context,
531530
pinnedVersion *deploymentpb.WorkerDeploymentVersion,
532531
matchingClient resource.MatchingClient,
533-
versionMembershipCache cache.Cache,
532+
versionMembershipCache VersionMembershipCache,
534533
tq string,
535534
tqType enumspb.TaskQueueType,
536535
namespaceID string) error {
537536

538537
// Check if we have recently queried matching to validate if this version exists in the task queue.
539-
key := versionMembershipCacheKey{
540-
namespaceID: namespaceID,
541-
taskQueue: tq,
542-
taskQueueType: tqType,
543-
deploymentName: pinnedVersion.DeploymentName,
544-
buildID: pinnedVersion.BuildId,
545-
}
546-
if cached := versionMembershipCache.Get(key); cached != nil {
547-
if isMember, ok := cached.(bool); ok {
548-
if isMember {
549-
return nil
550-
}
551-
return serviceerror.NewFailedPrecondition(
552-
"Pinned version is not present in the task queue",
553-
)
538+
if isMember, ok := versionMembershipCache.Get(
539+
namespaceID,
540+
tq,
541+
tqType,
542+
pinnedVersion.DeploymentName,
543+
pinnedVersion.BuildId,
544+
); ok {
545+
if isMember {
546+
return nil
554547
}
548+
return serviceerror.NewFailedPrecondition(
549+
"Pinned version is not present in the task queue",
550+
)
555551
}
556552

557553
resp, err := matchingClient.CheckTaskQueueVersionMembership(ctx, &matchingservice.CheckTaskQueueVersionMembershipRequest{
@@ -565,7 +561,14 @@ func validatePinnedVersionInTaskQueue(ctx context.Context,
565561
}
566562

567563
// Add result to cache
568-
versionMembershipCache.Put(key, resp.GetIsMember())
564+
versionMembershipCache.Put(
565+
namespaceID,
566+
tq,
567+
tqType,
568+
pinnedVersion.DeploymentName,
569+
pinnedVersion.BuildId,
570+
resp.GetIsMember(),
571+
)
569572
if !resp.GetIsMember() {
570573
return serviceerror.NewFailedPrecondition(
571574
"Pinned version is not present in the task queue",
@@ -574,18 +577,10 @@ func validatePinnedVersionInTaskQueue(ctx context.Context,
574577
return nil
575578
}
576579

577-
type versionMembershipCacheKey struct {
578-
namespaceID string
579-
taskQueue string
580-
taskQueueType enumspb.TaskQueueType
581-
deploymentName string
582-
buildID string
583-
}
584-
585580
func ValidateVersioningOverride(ctx context.Context,
586581
override *workflowpb.VersioningOverride,
587582
matchingClient resource.MatchingClient,
588-
versionMembershipCache cache.Cache,
583+
versionMembershipCache VersionMembershipCache,
589584
tq string,
590585
tqType enumspb.TaskQueueType,
591586
namespaceID string) error {

0 commit comments

Comments
 (0)