Skip to content

Commit b38c880

Browse files
committed
feat(queuev2): add read level sync loop to cachedScheduledQueue
Adds a readLevelSyncLoop goroutine to cachedScheduledQueue that ticks every TimerProcessorCacheReadLevelSyncInterval (default 1s) and calls reader.UpdateReadLevel(virtualQueueManager.GetMinReadLevel()). This keeps the cache lower bound within ~1s of actual processing progress, compared to the previous ~30s lag from updateQueueStateFn which is gated on DB writes. New config: TimerProcessorCacheReadLevelSyncInterval (global, default 1s). Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
1 parent a2adcb6 commit b38c880

4 files changed

Lines changed: 60 additions & 2 deletions

File tree

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3064,6 +3064,14 @@ const (
30643064
// Default value: 1s (1*time.Second)
30653065
// Allowed filters: N/A
30663066
TimerProcessorCacheTimeEvictionInterval
3067+
// TimerProcessorCacheReadLevelSyncInterval is how often the cached scheduled
3068+
// queue syncs the read level from the virtual queue manager to the cache reader,
3069+
// evicting tasks the processor has already passed.
3070+
// KeyName: history.timerProcessorCacheReadLevelSyncInterval
3071+
// Value type: Duration
3072+
// Default value: 1s (1*time.Second)
3073+
// Allowed filters: N/A
3074+
TimerProcessorCacheReadLevelSyncInterval
30673075
// TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer
30683076
// failover queue processing. The actual jitter interval used will be a random duration between
30693077
// 0 and the max interval so that timer failover queue across different shards won't start at
@@ -5754,6 +5762,11 @@ var DurationKeys = map[DurationKey]DynamicDuration{
57545762
Description: "TimerProcessorCacheTimeEvictionInterval is how often the time-based eviction loop fires",
57555763
DefaultValue: time.Second,
57565764
},
5765+
TimerProcessorCacheReadLevelSyncInterval: {
5766+
KeyName: "history.timerProcessorCacheReadLevelSyncInterval",
5767+
Description: "TimerProcessorCacheReadLevelSyncInterval is how often the cached scheduled queue syncs the read level to the cache reader",
5768+
DefaultValue: time.Second,
5769+
},
57575770
TransferProcessorFailoverMaxStartJitterInterval: {
57585771
KeyName: "history.transferProcessorFailoverMaxStartJitterInterval",
57595772
Description: "TransferProcessorFailoverMaxStartJitterInterval is the max jitter interval for starting transfer failover queue processing. The actual jitter interval used will be a random duration between 0 and the max interval so that timer failover queue across different shards won't start at the same time",

service/history/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ type Config struct {
166166
TimerProcessorCacheEvictionSafeWindow dynamicproperties.DurationPropertyFn
167167
TimerProcessorCacheMinPrefetchInterval dynamicproperties.DurationPropertyFn
168168
TimerProcessorCacheTimeEvictionInterval dynamicproperties.DurationPropertyFn
169+
TimerProcessorCacheReadLevelSyncInterval dynamicproperties.DurationPropertyFn
169170

170171
// TransferQueueProcessor settings
171172
TransferTaskBatchSize dynamicproperties.IntPropertyFn
@@ -481,6 +482,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
481482
TimerProcessorCacheEvictionSafeWindow: dc.GetDurationProperty(dynamicproperties.TimerProcessorCacheEvictionSafeWindow),
482483
TimerProcessorCacheMinPrefetchInterval: dc.GetDurationProperty(dynamicproperties.TimerProcessorCacheMinPrefetchInterval),
483484
TimerProcessorCacheTimeEvictionInterval: dc.GetDurationProperty(dynamicproperties.TimerProcessorCacheTimeEvictionInterval),
485+
TimerProcessorCacheReadLevelSyncInterval: dc.GetDurationProperty(dynamicproperties.TimerProcessorCacheReadLevelSyncInterval),
484486
TransferTaskBatchSize: dc.GetIntProperty(dynamicproperties.TransferTaskBatchSize),
485487
TransferTaskDeleteBatchSize: dc.GetIntProperty(dynamicproperties.TransferTaskDeleteBatchSize),
486488
TransferProcessorFailoverMaxStartJitterInterval: dc.GetDurationProperty(dynamicproperties.TransferProcessorFailoverMaxStartJitterInterval),

service/history/config/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func TestNewConfig(t *testing.T) {
148148
"TimerProcessorCacheEvictionSafeWindow": {dynamicproperties.TimerProcessorCacheEvictionSafeWindow, time.Second},
149149
"TimerProcessorCacheMinPrefetchInterval": {dynamicproperties.TimerProcessorCacheMinPrefetchInterval, time.Second},
150150
"TimerProcessorCacheTimeEvictionInterval": {dynamicproperties.TimerProcessorCacheTimeEvictionInterval, time.Second},
151+
"TimerProcessorCacheReadLevelSyncInterval": {dynamicproperties.TimerProcessorCacheReadLevelSyncInterval, time.Second},
151152
"TransferTaskBatchSize": {dynamicproperties.TransferTaskBatchSize, 47},
152153
"TransferTaskDeleteBatchSize": {dynamicproperties.TransferTaskDeleteBatchSize, 48},
153154
"TransferProcessorCompleteTransferFailureRetryCount": {dynamicproperties.TransferProcessorCompleteTransferFailureRetryCount, 49},

service/history/queuev2/queue_scheduled_cached.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,20 @@ package queuev2
2424

2525
import (
2626
"context"
27+
"sync"
2728

29+
"github.com/uber/cadence/common/clock"
30+
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
2831
hcommon "github.com/uber/cadence/service/history/common"
2932
)
3033

3134
type cachedScheduledQueue struct {
3235
*scheduledQueue
33-
reader CachedQueueReader
36+
reader CachedQueueReader
37+
readLevelSyncCancel context.CancelFunc
38+
readLevelSyncWg sync.WaitGroup
39+
readLevelInterval dynamicproperties.DurationPropertyFn
40+
clock clock.TimeSource
3441
}
3542

3643
func newCachedScheduledQueue(inner *scheduledQueue, reader CachedQueueReader) Queue {
@@ -44,7 +51,13 @@ func newCachedScheduledQueue(inner *scheduledQueue, reader CachedQueueReader) Qu
4451
reader.UpdateReadLevel(inner.base.virtualQueueManager.GetMinReadLevel())
4552
}
4653

47-
return &cachedScheduledQueue{scheduledQueue: inner, reader: reader}
54+
config := inner.base.shard.GetConfig()
55+
return &cachedScheduledQueue{
56+
scheduledQueue: inner,
57+
reader: reader,
58+
readLevelInterval: config.TimerProcessorCacheReadLevelSyncInterval,
59+
clock: inner.base.timeSource,
60+
}
4861
}
4962

5063
func (q *cachedScheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
@@ -55,9 +68,38 @@ func (q *cachedScheduledQueue) NotifyNewTask(clusterName string, info *hcommon.N
5568
func (q *cachedScheduledQueue) Start() {
5669
q.reader.Start()
5770
q.scheduledQueue.Start()
71+
72+
ctx, cancel := context.WithCancel(context.Background())
73+
q.readLevelSyncCancel = cancel
74+
q.readLevelSyncWg.Add(1)
75+
go q.readLevelSyncLoop(ctx)
5876
}
5977

6078
func (q *cachedScheduledQueue) Stop() {
79+
q.readLevelSyncCancel()
80+
q.readLevelSyncWg.Wait()
81+
6182
q.scheduledQueue.Stop()
6283
q.reader.Stop()
6384
}
85+
86+
// readLevelSyncLoop periodically syncs the virtual queue manager's current read
87+
// level to the cache reader, evicting tasks the processor has already passed.
88+
// This runs more frequently than updateQueueStateFn (which is gated on DB writes)
89+
// so the cache lower bound stays close to actual processing progress.
90+
func (q *cachedScheduledQueue) readLevelSyncLoop(ctx context.Context) {
91+
defer q.readLevelSyncWg.Done()
92+
93+
timer := q.clock.NewTimer(q.readLevelInterval())
94+
defer timer.Stop()
95+
96+
for {
97+
select {
98+
case <-ctx.Done():
99+
return
100+
case <-timer.Chan():
101+
q.reader.UpdateReadLevel(q.base.virtualQueueManager.GetMinReadLevel())
102+
timer.Reset(q.readLevelInterval())
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)