
feat(queuev2): cached timer queue reader with prefetch and time-based eviction#7962

Open
arzonus wants to merge 12 commits into cadence-workflow:master from arzonus:implement-cached-queue-reader-and-scheduled-queue

Conversation


arzonus (Contributor) commented Apr 16, 2026

What changed?

Adds a cached layer over the timer (scheduled) queue reader that keeps a look-ahead window of upcoming tasks in memory, eliminating repeated DB reads on the hot path. Relates to #7953.

Replaces four stacked PRs (#7954, #7955, #7956, #7957) with a single consolidated draft for easier review iteration.

Components:

InMemQueue (queue_mem.go) — sorted in-memory task store with PutTasks, GetTasks, LookAHead, LTrim, RTrimBySize, Clear. Deduplicates inserts by task key. RTrimBySize nils evicted interface slots for GC.

CachedQueueReader (queue_reader_cached.go) — wraps any QueueReader with an in-memory look-ahead cache. Three rollout modes: disabled (default), shadow (cache runs, DB always wins, mismatches logged), enabled (cache serves reads). Key behaviours: prefetch loop, time-eviction loop, Inject for new tasks, UpdateReadLevel for ack-level eviction. injectAllowedAfter is computed once at construction (now + WarmupGracePeriod) — no mutex needed.

CachedScheduledQueue (queue_scheduled_cached.go) — thin wrapper wiring NotifyNewTask → Inject, updateQueueStateFn → UpdateReadLevel, and reader lifecycle into the existing scheduled queue.

Config — nine new global dynamic config properties (no ShardID filter; they apply uniformly per host). Feature flag: TimerProcessorEnableCachedScheduledQueue (default: off).

Simulation scenarios — two new scenarios (enabled + shadow mode) requiring #7952 for infra.

Why?

The timer queue currently issues a DB round-trip for every virtual slice read. For scheduled tasks most reads land within a predictable look-ahead window. A pre-fetched in-memory cache removes those round-trips in the common case.

How did you test it?

go test -race -count=100 -timeout=600s ./service/history/queuev2/...
make pr GEN_DIR=service/history/queuev2
make build

Unit test coverage for new code: 87.4%. Tests cover prefetch, eviction, shadow mismatch, inject races, lifecycle, RTrim guard (both shrink and raise cases).

Simulation:

./simulation/history/run.sh --scenario queuev2_scheduled_cache_shadow --dockerfile-suffix .local
./simulation/history/run.sh --scenario queuev2_scheduled_cache_enabled --dockerfile-suffix .local

Potential risks

  • Feature-flagged behind TimerProcessorEnableCachedScheduledQueue (default: false) and TimerProcessorCachedQueueReaderMode (default: disabled). Zero code path changes when flags are off.
  • When enabled: two background goroutines per shard (prefetch + time-eviction). Memory bounded by TimerProcessorCacheMaxSize (default: 1000 tasks per shard).

Release notes

N/A — internal change behind a feature flag.

Documentation Changes

N/A

Review threads on service/history/queuev2/queue_reader_cached.go and queue_mem.go (several marked Outdated after force-pushes).

arzonus force-pushed the implement-cached-queue-reader-and-scheduled-queue branch from 180eedb to 206586b (April 16, 2026 07:29), from 206586b to 308bee5 (April 16, 2026 08:20), and from 308bee5 to 4bee2d2 (April 16, 2026 10:34).
Comment on lines +427 to +432
// putTasks adds tasks to the cache and enforces the size cap.
// Caller must hold q.mu.
// putTasks adds tasks to the cache and enforces the size cap.
// Returns true if RTrimBySize fired and updated exclusiveUpperBound,
// meaning the caller must not re-advance the bound.
// Caller must hold q.mu.

💡 Quality: Duplicate doc comment on putTasks

Lines 427-428 and 429-432 both start with // putTasks adds tasks to the cache and enforces the size cap. and // Caller must hold q.mu.. The old comment block (lines 427-428) was left in place when the new, expanded comment block (lines 429-432) was added.

Suggested fix:

Remove the duplicate lines 427-428, keeping only the fuller comment block (lines 429-432):

// putTasks adds tasks to the cache and enforces the size cap.
// Returns true if RTrimBySize fired and updated exclusiveUpperBound,
// meaning the caller must not re-advance the bound.
// Caller must hold q.mu.
func (q *cachedQueueReader) putTasks(tasks []persistence.Task) bool {


arzonus added 11 commits April 17, 2026 12:41
… eviction

Adds a cached layer over the timer (scheduled) queue reader that keeps
a look-ahead window of upcoming tasks in memory, eliminating repeated
DB reads on the hot path. Relates to cadence-workflow#7953.

## Components

### InMemQueue (queue_mem.go)
Sorted in-memory task store. GetTasks guards PageSize<=0 and breaks early.
RTrimBySize nils evicted slots for GC and resets exclusiveUpperBound when
the queue is emptied.
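A minimal sketch of these semantics, using plain int64 keys in place of history task keys (method names mirror the PR; everything else here is illustrative):

```go
package main

import (
	"fmt"
	"sort"
)

// memQueue is a minimal sketch of the sorted in-memory task store.
// Plain int64 keys stand in for history task keys.
type memQueue struct {
	keys []int64
	seen map[int64]bool
}

func newMemQueue() *memQueue {
	return &memQueue{seen: map[int64]bool{}}
}

// PutTasks inserts keys in sorted position, deduplicating by key.
func (q *memQueue) PutTasks(ks []int64) {
	for _, k := range ks {
		if q.seen[k] {
			continue
		}
		q.seen[k] = true
		i := sort.Search(len(q.keys), func(i int) bool { return q.keys[i] >= k })
		q.keys = append(q.keys, 0)
		copy(q.keys[i+1:], q.keys[i:]) // copy handles overlap like memmove
		q.keys[i] = k
	}
}

// RTrimBySize evicts from the tail until at most max keys remain.
// (The real queue also nils evicted interface slots so the GC can
// reclaim the tasks, and resets its upper bound when emptied.)
func (q *memQueue) RTrimBySize(max int) {
	if max < 0 {
		max = 0
	}
	for len(q.keys) > max {
		k := q.keys[len(q.keys)-1]
		delete(q.seen, k)
		q.keys = q.keys[:len(q.keys)-1]
	}
}

func main() {
	q := newMemQueue()
	q.PutTasks([]int64{5, 1, 3, 1})
	fmt.Println(q.keys) // [1 3 5]
	q.RTrimBySize(2)
	fmt.Println(q.keys) // [1 3]
}
```

The fast path in the real store appends directly when the new key is larger than the current tail, so sorted prefetch pages avoid the binary search entirely.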

### CachedQueueReader (queue_reader_cached.go)
Three modes: disabled (default), shadow, enabled.
  isEnabled/isShadow — strict single-value checks
  isDisabled — catch-all: returns true for 'disabled' and any unrecognised
               mode (Warn log), ensuring misconfigurations default to safe

Key design:
- prefetchLoop: fires after WarmupGracePeriod; returns nil for cache-full
  so the loop reschedules quietly without Warn noise
- RTrim guard: after putTasks, if exclusiveUpperBound changed (!Equal, catches
  both shrink and raise cases), skip re-advancing to avoid false coverage
- putTasks resets exclusiveUpperBound to MinimumHistoryTaskKey when RTrim
  empties the cache (MaxSize<=0)
- injectAllowedAfter: set once in constructor (now+WarmupGracePeriod),
  no mutex needed; isInWarmup() is a plain clock.Now().Before() check
- ctx/cancel initialized in constructor to avoid nil-cancel race with Stop
- LookAHead uses strict Less on upper bound (exclusive) and checks
  inclusiveLowerBound so evicted tasks fall through to DB

Shadow mode comparison:
  findMismatchesInShadow — pure function returning findMismatchesInShadowResult
    with MissingFromCache, ExtraInCache, NextKeyMismatch, HasMismatches
  reportShadowComparison — logs + increments metric from result
  getTaskInShadow — orchestrates DB fetch, live cache re-read, and both above
  preFetchLowerBound filters eviction between snapshot and live re-read to
  suppress false mismatch warnings
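The pure comparison step might look like this in miniature (the shadowResult fields follow the commit message; int64 keys and the function body are illustrative):

```go
package main

import "fmt"

// shadowResult classifies differences between the DB page and the
// cache snapshot for the same range.
type shadowResult struct {
	MissingFromCache []int64 // real divergence: DB has it, cache lost it
	ExtraInCache     []int64 // usually benign: Inject raced ahead of the DB read
}

// findMismatchesInShadow is a pure function over the two key sets,
// which keeps it trivially unit-testable.
func findMismatchesInShadow(dbKeys, cacheKeys []int64) shadowResult {
	inCache := map[int64]bool{}
	for _, k := range cacheKeys {
		inCache[k] = true
	}
	inDB := map[int64]bool{}
	for _, k := range dbKeys {
		inDB[k] = true
	}
	var res shadowResult
	for _, k := range dbKeys {
		if !inCache[k] {
			res.MissingFromCache = append(res.MissingFromCache, k)
		}
	}
	for _, k := range cacheKeys {
		if !inDB[k] {
			res.ExtraInCache = append(res.ExtraInCache, k)
		}
	}
	return res
}

func main() {
	res := findMismatchesInShadow([]int64{1, 2, 3}, []int64{2, 3, 4})
	fmt.Println(res.MissingFromCache, res.ExtraInCache) // [1] [4]
}
```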

### Config (service/history/config, dynamicconfig/dynamicproperties)
Nine new global dynamic config properties (no ShardID filter).
Feature flag: TimerProcessorEnableCachedScheduledQueue (default: off).

### CachedScheduledQueue (queue_scheduled_cached.go)
Wires NotifyNewTask→Inject, updateQueueStateFn→UpdateReadLevel,
and reader lifecycle into the existing scheduled queue.

### Simulation scenarios
Two new scenarios (enabled + shadow mode) requiring cadence-workflow#7952 for infra.

## Tests
Coverage: 87.4% on new code. Tests cover prefetch scheduling, time
eviction, shadow mismatch detection (bidirectional, NextTaskKey,
preFetchLowerBound filtering), inject races, lifecycle, all three modes,
RTrim guards (shrink, raise, empty-cache reset), LookAHead bounds,
warmup gating, unknown mode fallback, compareTasksInShadow unit tests.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
GetTask warmup bypass
  GetTask now delegates to the base reader when isInWarmup() is true,
  same as disabled mode. Previously the prefetch loop had not fully
  populated the cache yet, but GetTask would still attempt to serve from
  it. The result was stale/partial responses and spurious 'cache misses'
  that were really just warmup bypasses.

ExtraInCache excluded from HasMismatches
  Tasks in the cache but absent from the DB are benign: Inject writes
  tasks to the cache as soon as NotifyNewTask fires, before the DB
  read in the shadow path completes. Due to task independence, having
  extra tasks in cache causes no harm. ExtraInCache is still tracked and
  logged at Debug for observability but no longer sets HasMismatches or
  triggers the mismatch Warn/counter.

Reason tag on bound advances
  updateExclusiveUpperBound and updateInclusiveLowerBound now accept a
  reason string logged alongside prevBound/newBound:
    prefetch-partial-page, prefetch-full-page — prefetch advancing window
    gap-detected-reset — gap reset after concurrent bound change
    rtrim-shrink, rtrim-empty — RTrim narrowing or clearing cache
    time-eviction — periodic time-based eviction loop
    ack-level-update — UpdateReadLevel from queue state persist

Tests
  GetTask_Enabled and GetTask_Shadow set r.injectAllowedAfter = time.Time{}
  so unit tests skip the warmup check and exercise cache logic directly.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…dLevel

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
1. extraInCache moved to mismatchTags (logged on mismatch only)
2. All mismatch log fields prefixed shadowMismatch.*
3. HasMismatches restored to include ExtraInCache — still a mismatch
   even if benign, for consistent observability
4. ack-level-update → read-level-update in updateInclusiveLowerBound
5. LookAHead now delegates to base during warmup, matching GetTask

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Collapse the partial/full-page if/else into a single target computation:
  target := exclusiveMaxKey (partial page)
  target = NextTaskKey     (full page)

One updateExclusiveUpperBound call, one reason tag 'prefetch-advance'.
Drop the verbose comment block — the early return on trim is self-explanatory.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…efetch

putTasks now returns true when RTrimBySize fired and updated the upper
bound. prefetch uses this directly instead of comparing exclusiveUpperBound
before and after the call, removing the prevUpper variable and the
verbose !Equal guard.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
… reset

updateExclusiveUpperBound: now logs inclusiveLowerBound + cacheSize
for context, and records the size histogram.

updateInclusiveLowerBound: now a pure setter with log + metrics.
No advance guard, no LTrim — safe to call for resets too.

advanceInclusiveLowerBound: new helper wrapping the advance-only
logic (upper-bound cap, advance guard, LTrim) then calls
updateInclusiveLowerBound.

Gap detection: uses updateInclusiveLowerBound directly after Clear()
since going backwards is intentional and no LTrim is needed.

timeEvict + UpdateReadLevel: fixed to call advanceInclusiveLowerBound
(were calling updateInclusiveLowerBound after the rename, which skipped
the LTrim — tasks below the eviction point were never removed).

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
…fy Inject guard

Gap detection now calls updateInclusiveLowerBound (the simple setter)
instead of assigning inclusiveLowerBound directly. No LTrim needed since
the queue is already cleared, and the setter accepts backwards resets.

Inject: combine isDisabled and isInWarmup into a single guard condition.
Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
ExtraInCache (tasks in cache not in DB) is benign given task independence.
It now logs at Info without incrementing the mismatch counter, so it is
still observable without polluting the Warn metric used for alerting.

MissingFromCache and NextKeyMismatch are real divergences: they log at Warn
and increment the counter as before.

shadowMismatch.dbTaskCount and shadowMismatch.cacheTaskCount moved into the
mismatch tag set so they are only emitted when there is something to act on.
The match path logs a bare Debug with no extra fields.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
Adds a readLevelSyncLoop goroutine to cachedScheduledQueue that ticks
every TimerProcessorCacheReadLevelSyncInterval (default 1s) and calls
reader.UpdateReadLevel(virtualQueueManager.GetMinReadLevel()).

This keeps the cache lower bound within ~1s of actual processing
progress, compared to the previous ~30s lag from updateQueueStateFn
which is gated on DB writes.

New config: TimerProcessorCacheReadLevelSyncInterval (global, default 1s).

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
@arzonus arzonus force-pushed the implement-cached-queue-reader-and-scheduled-queue branch from 09e3939 to b38c880 Compare April 17, 2026 10:42
…vent startup false coverage

At shard takeover, the cached reader starts with inclusiveLowerBound =
MinimumHistoryTaskKey and exclusiveUpperBound = MinimumHistoryTaskKey.
After the first prefetch, exclusiveUpperBound jumps to now + MaxLookAheadWindow
but inclusiveLowerBound stays at MinimumHistoryTaskKey until timeEvict
catches up (~60s).

During this window, isRangeCovered returns true for ALL historical tasks
(from the beginning of time to the cache ceiling). Tasks from the previous
shard owner exist in DB but not in the fresh cache. GetTask returns 0,
the virtual slice pops its progress entry, and those tasks are permanently
skipped at shard takeover.

Fix: after the first prefetch, advance inclusiveLowerBound to the fetch
anchor (now - EvictionSafeWindow). Tasks before that anchor now correctly
miss the cache and fall through to the DB.

Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
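The hazard is easiest to see with a toy coverage check (int64 bounds stand in for task keys; isRangeCovered here is a simplification of the real check):

```go
package main

import "fmt"

// window sketches the cache's claimed coverage interval
// [inclusiveLowerBound, exclusiveUpperBound).
type window struct {
	inclusiveLowerBound int64
	exclusiveUpperBound int64
}

func (w window) isRangeCovered(lo, hi int64) bool {
	return lo >= w.inclusiveLowerBound && hi <= w.exclusiveUpperBound
}

func main() {
	// Fresh shard takeover: lower bound still at the minimum key,
	// first prefetch has raised the upper bound to now + look-ahead.
	now := int64(1000)
	w := window{inclusiveLowerBound: 0, exclusiveUpperBound: now + 60}
	fmt.Println(w.isRangeCovered(100, 200)) // true — but the cache holds no such tasks

	// Fix: after the first prefetch, advance the lower bound to the
	// fetch anchor (now - eviction safe window) so historical ranges
	// miss the cache and fall through to the DB.
	evictionSafeWindow := int64(30)
	w.inclusiveLowerBound = now - evictionSafeWindow
	fmt.Println(w.isRangeCovered(100, 200)) // false — range now served from DB
}
```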

gitar-bot Bot commented Apr 17, 2026

CI failed: flaky integration tests emitted background logs after test completion. These 'logged too late' errors indicate incomplete lifecycle teardown for the global ratelimiter component.

Overview

Multiple integration tests failed due to background goroutine logs emitted after the tests completed, specifically originating from the global-ratelimiter's backgroundUpdateLoop. This pattern indicates a race condition between test cleanup and component shutdown.

Failures

Flaky Integration Test Teardown (confidence: high)

  • Type: flaky_test
  • Affected jobs: 71836747679
  • Related to change: no
  • Root cause: The testlogger detects debug logs (e.g., "update tick") from the global-ratelimiter package occurring after the test t.done signal. The background goroutines are not being explicitly stopped in the TearDown phase of the affected test suites (TestTaskListSuite, TestWorkflowIDRateLimitIntegrationSuite, etc.).
  • Suggested fix: Ensure the global-ratelimiter components are stopped during test cleanup. Review TearDownTest or TearDownSuite in the failing integration tests to ensure all asynchronous background workers are signaled to shut down before the test context is closed.

Summary

  • Change-related failures: 0
  • Infrastructure/flaky failures: 1 (Flaky teardown in multiple integration test suites)
  • Recommended action: The reported failures are pre-existing flakiness related to test lifecycle management, not the current PR changes. Developers should verify if the PR changes exacerbate this race condition, otherwise, this is likely an environment or infrastructure-test-harness issue.
Code Review 👍 Approved with suggestions 9 resolved / 10 findings

Introduces a cached timer queue reader with prefetch and eviction, resolving several concurrency and performance issues with bulk inserts and dynamic config reads. Please address the duplicate doc comments on putTasks.

💡 Quality: Duplicate doc comment on putTasks

📄 service/history/queuev2/queue_reader_cached.go:427-432

Lines 427-428 and 429-432 both start with // putTasks adds tasks to the cache and enforces the size cap. and // Caller must hold q.mu.. The old comment block (lines 427-428) was left in place when the new, expanded comment block (lines 429-432) was added.

Suggested fix
Remove the duplicate lines 427-428, keeping only the fuller comment block (lines 429-432):

// putTasks adds tasks to the cache and enforces the size cap.
// Returns true if RTrimBySize fired and updated exclusiveUpperBound,
// meaning the caller must not re-advance the bound.
// Caller must hold q.mu.
func (q *cachedQueueReader) putTasks(tasks []persistence.Task) bool {
✅ 9 resolved
Edge Case: putTasks doesn't reset exclusiveUpperBound when RTrim empties cache

📄 service/history/queuev2/queue_reader_cached.go:426-435 📄 service/history/queuev2/queue_mem.go:191-195
When RTrimBySize(maxSize) is called with maxSize <= 0 (e.g. a dynamic config misconfiguration), it empties the queue and returns (MinimumHistoryTaskKey, true). In putTasks, the guard trimmed && newUpper.Greater(persistence.MinimumHistoryTaskKey) evaluates to true && false = false, so exclusiveUpperBound is not reset. The cache then thinks it covers [inclusiveLowerBound, oldExclusiveUpperBound) but holds zero tasks, causing GetTask to return empty results for ranges that may contain tasks in the DB.

This only triggers with MaxSize <= 0, which is unlikely with the default of 1000, but a defensive check would prevent silent data loss on misconfiguration.

Quality: Mode string comparison has no validation for unknown values

📄 service/history/queuev2/queue_reader_cached.go:271-272
isDisabled() checks Mode() == "disabled" and isShadow() checks Mode() == "shadow". Any typo or unknown mode value (e.g. "enabeld") silently falls through to the enabled code path, which serves reads from cache. This could cause unintended behavior if someone misconfigures the dynamic config string.

Consider adding a log warning or defaulting to disabled for unknown values, particularly in GetTask and LookAHead where the mode drives correctness.

Edge Case: Shadow mode false mismatches from cache eviction between reads

📄 service/history/queuev2/queue_reader_cached.go:604-618
In getTaskInShadow, the initial cache snapshot is taken under RLock (line 588-589), then the lock is released before querying the DB. If a mismatch is found, the cache is re-read (line 641-647) to filter benign races. However, between the two cache reads, timeEvict or UpdateReadLevel can advance inclusiveLowerBound and LTrim the cache, removing tasks that were in the original snapshot. A DB task that was in the first snapshot but got evicted before the second read would appear as a realMissingKey, creating a false mismatch alarm.

Since shadow mode is observational-only (DB result is always returned), this doesn't affect correctness — only alert noise. But spurious mismatch counters could erode confidence in the cache during rollout.

Quality: InMemQueue.PutTasks is O(n*m) for unsorted bulk inserts

📄 service/history/queuev2/queue_mem.go:98-102
PutTasks calls putTask in a loop, and each putTask for an out-of-order insert does a binary search + copy shift. When inserting m tasks into a queue of n elements, and the tasks aren't pre-sorted, this is O(m * (log n + n)) in the worst case. For prefetch pages (up to 100 tasks by default into a 1000-element queue), this is acceptable, but if page sizes or cache sizes grow it could become a bottleneck.

The fast-path append optimization handles the common case of sorted inserts well. This is just a note for future scaling — a batch sort-merge would be O((n+m) log(n+m)).
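A batch sort-merge along the lines suggested could look like this (illustrative, with int64 keys standing in for task keys):

```go
package main

import (
	"fmt"
	"sort"
)

// mergeBatch inserts a batch of keys into an already-sorted slice by
// sorting the batch once (O(m log m)) and doing a single linear merge
// with dedup (O(n+m)), instead of m separate binary-search-plus-shift
// inserts, which are O(m*n) worst case for unsorted batches.
func mergeBatch(sorted, batch []int64) []int64 {
	sort.Slice(batch, func(i, j int) bool { return batch[i] < batch[j] })
	out := make([]int64, 0, len(sorted)+len(batch))
	i, j := 0, 0
	for i < len(sorted) || j < len(batch) {
		var k int64
		switch {
		case j >= len(batch) || (i < len(sorted) && sorted[i] <= batch[j]):
			k = sorted[i]
			i++
		default:
			k = batch[j]
			j++
		}
		// Deduplicate: skip a key equal to the last one emitted.
		if n := len(out); n == 0 || out[n-1] != k {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	fmt.Println(mergeBatch([]int64{1, 4, 9}, []int64{7, 2, 4})) // [1 2 4 7 9]
}
```

At the defaults (100-task pages into a 1000-element queue) the existing per-task insert is fine; this shape only pays off if page or cache sizes grow substantially.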

Edge Case: Non-atomic dynamic config Mode() reads can cause inconsistency

📄 service/history/queuev2/queue_reader_cached.go:567 📄 service/history/queuev2/queue_reader_cached.go:594
In GetTask (line 567), isDisabled() is checked first, then isShadow() is checked at line 594. Both call q.options.Mode() independently. Since this is a dynamic config property that can change between calls, a mode transition (e.g., shadow→enabled) between the two reads could cause the code to serve a cache result directly when the operator intended shadow mode, or vice versa. The window is small and the impact is transient (one request), but it's worth noting.

Similarly, prefetch() checks isDisabled() at line 303, and Inject() checks it at line 529 — both independently reading the mode.

...and 4 more resolved from earlier reviews


Rules ✅ All requirements met

Repository Rules

GitHub Issue Linking Requirement: The PR description links to issue #7953, satisfying the issue linking requirement.
PR Description Quality Standards: The PR description contains all required sections with substantive content and specific testing commands.

