Skip to content

Commit 180eedb

Browse files
committed
fix(queuev2): broaden RTrim guard to catch raise-upper case
The previous guard used Less(prevUpper) which only caught trims that shrank the upper bound below prevUpper. It missed the case where RTrimBySize kept some prefetched tasks (raising the bound above prevUpper) but dropped the newest ones. The re-advance on lines 392-399 then claimed coverage for the dropped tail, causing GetTask to miss tasks that exist in the DB. Changed to !Equal(prevUpper): any change means putTasks called updateExclusiveUpperBound (= a trim occurred), so skip re-advance in all cases. Test: TestCachedQueueReader_PrefetchTrimRaisesUpperBoundAbovePrevUpper simulates inject filling 1 slot during DB fetch, DB returning 3 tasks, RTrim keeping 3 oldest and dropping the 4th, confirming the guard fires even though the trimmed bound is above prevUpper. Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
1 parent 6463d67 commit 180eedb

2 files changed

Lines changed: 63 additions & 9 deletions

File tree

service/history/queuev2/queue_reader_cached.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -371,15 +371,14 @@ func (q *cachedQueueReader) prefetch() error {
371371
prevUpper := q.exclusiveUpperBound
372372
q.putTasks(resp.Tasks)
373373

374-
// putTasks calls RTrimBySize, which can shrink exclusiveUpperBound if
375-
// concurrent Inject calls filled the cache between the capacity snapshot
376-
// and the write-lock acquisition above. When a trim occurred, the upper
377-
// bound already reflects the actual cache contents. Re-advancing it to
378-
// exclusiveMaxKey or NextTaskKey would claim coverage for tasks that were
379-
// just evicted, causing GetTask cache hits to return fewer tasks than exist
380-
// in the DB for that range.
381-
if q.exclusiveUpperBound.Less(prevUpper) {
382-
q.logger.Debug("prefetch skipped upper-bound advance: cache trimmed after concurrent inject",
374+
// putTasks calls RTrimBySize, which calls updateExclusiveUpperBound when a
375+
// trim occurs. Any change to exclusiveUpperBound here means the cache no
376+
// longer holds a contiguous prefix up to NextTaskKey or exclusiveMaxKey, so
377+
// re-advancing would create false coverage. Use !Equal (not Less) to catch
378+
// both the shrink case and the case where the trim kept some DB tasks,
379+
// raising the bound above prevUpper but below the target ceiling.
380+
if !q.exclusiveUpperBound.Equal(prevUpper) {
381+
q.logger.Debug("prefetch skipped upper-bound advance: cache trimmed",
383382
tag.Dynamic("trimmedUpper", q.exclusiveUpperBound),
384383
tag.Dynamic("prevUpper", prevUpper),
385384
)

service/history/queuev2/queue_reader_cached_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1545,6 +1545,61 @@ func TestCachedQueueReader_PrefetchTrimPreventsUpperBoundAdvance(t *testing.T) {
15451545
"trimmed upper bound should be less than prevUpper")
15461546
}
15471547

1548+
// TestCachedQueueReader_PrefetchTrimRaisesUpperBoundAbovePrevUpper verifies the
1549+
// complementary case: when RTrimBySize keeps some prefetched tasks (raising
1550+
// exclusiveUpperBound above prevUpper) but drops the newest ones, the prefetch
1551+
// must NOT re-advance to NextTaskKey or exclusiveMaxKey. Doing so would claim
1552+
// coverage for the dropped tasks, causing GetTask hits to miss them.
1553+
func TestCachedQueueReader_PrefetchTrimRaisesUpperBoundAbovePrevUpper(t *testing.T) {
1554+
now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
1555+
ts := clock.NewMockedTimeSourceAt(now)
1556+
ctrl := gomock.NewController(t)
1557+
opts := defaultTestOptions()
1558+
// MaxSize=3. prevUpper = now+1min. DB returns 3 tasks (now+2m, now+3m, now+4m).
1559+
// During the DB round-trip, a simulated concurrent inject adds 1 task (now+30s, below prevUpper).
1560+
// After putTasks: queue = inject(30s) + db(2m) + db(3m) + db(4m) = 4 tasks > MaxSize=3.
1561+
// RTrimBySize(3) keeps 3 oldest: inject(30s), db(2m), db(3m); drops db(4m).
1562+
// newUpper = db(3m).Next() > prevUpper (now+1min) → !Equal fires → skip re-advance.
1563+
opts.MaxSize = dynamicproperties.GetIntPropertyFn(3)
1564+
1565+
injectTask := newTestTask(now.Add(30*time.Second), 0)
1566+
dbTask1 := newTestTask(now.Add(2*time.Minute), 1)
1567+
dbTask2 := newTestTask(now.Add(3*time.Minute), 2)
1568+
dbTask3 := newTestTask(now.Add(4*time.Minute), 3) // will be dropped
1569+
prevUpper := persistence.NewHistoryTaskKey(now.Add(1*time.Minute), 0)
1570+
1571+
base := NewMockQueueReader(ctrl)
1572+
queue := newInMemQueue()
1573+
r := newCachedQueueReaderWithOptions(base, queue, opts, ts, testlogger.New(t), metrics.NoopScope)
1574+
r.exclusiveUpperBound = prevUpper
1575+
1576+
base.EXPECT().GetTask(gomock.Any(), gomock.Any()).DoAndReturn(
1577+
func(_ context.Context, _ *GetTaskRequest) (*GetTaskResponse, error) {
1578+
r.mu.Lock()
1579+
r.queue.PutTasks([]persistence.Task{injectTask})
1580+
r.mu.Unlock()
1581+
nextKey := persistence.NewHistoryTaskKey(now.Add(5*time.Minute), 0)
1582+
return &GetTaskResponse{
1583+
Tasks: []persistence.Task{dbTask1, dbTask2, dbTask3},
1584+
Progress: &GetTaskProgress{NextTaskKey: nextKey},
1585+
}, nil
1586+
},
1587+
)
1588+
1589+
err := r.prefetch()
1590+
assert.NoError(t, err)
1591+
1592+
// Queue should hold the 3 oldest: injectTask, dbTask1, dbTask2. dbTask3 dropped.
1593+
assert.Equal(t, 3, r.queue.Len())
1594+
// Upper bound must be dbTask2.Next() — NOT advanced to NextTaskKey (now+5min).
1595+
wantUpper := dbTask2.GetTaskKey().Next()
1596+
assert.Equal(t, wantUpper, r.exclusiveUpperBound,
1597+
"upper bound must not advance past trimmed point even when trim raises bound above prevUpper")
1598+
// Confirm it is above prevUpper (the scenario the original Less check missed).
1599+
assert.True(t, r.exclusiveUpperBound.Greater(prevUpper),
1600+
"trimmed upper is above prevUpper — the case the Less-only guard missed")
1601+
}
1602+
15481603
func TestCachedQueueReader_PrefetchGapDetected(t *testing.T) {
15491604
now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
15501605
ts := clock.NewMockedTimeSourceAt(now)

0 commit comments

Comments
 (0)