Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Prevent taskIdBlock data race #7470

Draft
wants to merge 1 commit into
base: priority
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions service/matching/backlog_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,14 @@ func (c *backlogManagerImpl) TotalApproximateBacklogCount() int64 {
}

func (c *backlogManagerImpl) InternalStatus() []*taskqueuespb.InternalTaskQueueStatus {
currentTaskIDBlock := c.taskWriter.getTaskIDBlock()
return []*taskqueuespb.InternalTaskQueueStatus{
&taskqueuespb.InternalTaskQueueStatus{
ReadLevel: c.taskAckManager.getReadLevel(),
AckLevel: c.taskAckManager.getAckLevel(),
TaskIdBlock: &taskqueuepb.TaskIdBlock{
// TODO(pri): this is a data race, it should only be read by taskWriterLoop
StartId: c.taskWriter.taskIDBlock.start,
EndId: c.taskWriter.taskIDBlock.end,
StartId: currentTaskIDBlock.start,
EndId: currentTaskIDBlock.end,
},
LoadedTasks: c.taskAckManager.getBacklogCountHint(),
MaxReadLevel: c.db.GetMaxReadLevel(subqueueZero),
Expand Down
15 changes: 7 additions & 8 deletions service/matching/pri_backlog_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,7 @@
}

func (c *priBacklogManagerImpl) InternalStatus() []*taskqueuespb.InternalTaskQueueStatus {
// TODO(pri): this is a data race, it should only be read by taskWriterLoop
idBlock := &taskqueuepb.TaskIdBlock{
StartId: c.taskWriter.taskIDBlock.start,
EndId: c.taskWriter.taskIDBlock.end,
}
currentTaskIDBlock := c.taskWriter.currentTaskIDBlock()

Check warning on line 326 in service/matching/pri_backlog_manager.go

View check run for this annotation

Codecov / codecov/patch

service/matching/pri_backlog_manager.go#L326

Added line #L326 was not covered by tests

c.subqueueLock.Lock()
defer c.subqueueLock.Unlock()
Expand All @@ -336,9 +332,12 @@
for i, r := range c.subqueues {
readLevel, ackLevel := r.getLevels()
status[i] = &taskqueuespb.InternalTaskQueueStatus{
ReadLevel: readLevel,
AckLevel: ackLevel,
TaskIdBlock: idBlock,
ReadLevel: readLevel,
AckLevel: ackLevel,
TaskIdBlock: &taskqueuepb.TaskIdBlock{
StartId: currentTaskIDBlock.start,
EndId: currentTaskIDBlock.end,
},

Check warning on line 340 in service/matching/pri_backlog_manager.go

View check run for this annotation

Codecov / codecov/patch

service/matching/pri_backlog_manager.go#L335-L340

Added lines #L335 - L340 were not covered by tests
LoadedTasks: int64(r.getLoadedTasks()),
MaxReadLevel: c.db.GetMaxReadLevel(i),
ApproximateBacklogCount: c.db.getApproximateBacklogCount(i),
Expand Down
26 changes: 19 additions & 7 deletions service/matching/pri_task_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import (
"context"
"sync/atomic"
"time"

enumspb "go.temporal.io/api/enums/v1"
Expand All @@ -52,12 +53,13 @@

// priTaskWriter writes tasks persistence split among subqueues
priTaskWriter struct {
backlogMgr *priBacklogManagerImpl
config *taskQueueConfig
db *taskQueueDB
logger log.Logger
appendCh chan *writeTaskRequest
taskIDBlock taskIDBlock
backlogMgr *priBacklogManagerImpl
config *taskQueueConfig
db *taskQueueDB
logger log.Logger
appendCh chan *writeTaskRequest
taskIDBlock taskIDBlock
lastTaskIDBlock taskIDBlock // copy of the last taskIDBlock for safe concurrent access via currentTaskIDBlock()
}
)

Expand Down Expand Up @@ -91,7 +93,6 @@
subqueue int,
taskInfo *persistencespb.TaskInfo,
) error {

select {
case <-w.backlogMgr.tqCtx.Done():
return errShutdown
Expand Down Expand Up @@ -174,6 +175,7 @@
return err
}
w.taskIDBlock = rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize)
w.lastTaskIDBlock = w.taskIDBlock
w.backlogMgr.initState(state, nil)
return nil
}
Expand All @@ -185,6 +187,9 @@

var reqs []*writeTaskRequest
for {
atomic.StoreInt64(&w.lastTaskIDBlock.start, w.taskIDBlock.start)
atomic.StoreInt64(&w.lastTaskIDBlock.end, w.taskIDBlock.end)

select {
case request := <-w.appendCh:
// read a batch of requests from the channel
Expand Down Expand Up @@ -249,3 +254,10 @@
}
return rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize), nil
}

func (w *priTaskWriter) currentTaskIDBlock() taskIDBlock {
return taskIDBlock{
start: atomic.LoadInt64(&w.lastTaskIDBlock.start),
end: atomic.LoadInt64(&w.lastTaskIDBlock.end),
}

Check warning on line 262 in service/matching/pri_task_writer.go

View check run for this annotation

Codecov / codecov/patch

service/matching/pri_task_writer.go#L258-L262

Added lines #L258 - L262 were not covered by tests
}
26 changes: 20 additions & 6 deletions service/matching/task_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package matching
import (
"context"
"errors"
"sync/atomic"
"time"

enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -54,12 +55,13 @@ type (

// taskWriter writes tasks sequentially to persistence
taskWriter struct {
backlogMgr *backlogManagerImpl
config *taskQueueConfig
db *taskQueueDB
logger log.Logger
appendCh chan *writeTaskRequest
taskIDBlock taskIDBlock
backlogMgr *backlogManagerImpl
config *taskQueueConfig
db *taskQueueDB
logger log.Logger
appendCh chan *writeTaskRequest
taskIDBlock taskIDBlock
lastTaskIDBlock taskIDBlock // copy of the last taskIDBlock for safe concurrent access via getTaskIDBlock()
}
)

Expand Down Expand Up @@ -99,6 +101,7 @@ func (w *taskWriter) initReadWriteState() error {
return err
}
w.taskIDBlock = rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize)
w.lastTaskIDBlock = w.taskIDBlock
w.backlogMgr.taskAckManager.setAckLevel(state.ackLevel)

return nil
Expand Down Expand Up @@ -182,6 +185,9 @@ func (w *taskWriter) taskWriterLoop() {
w.backlogMgr.SetInitializedError(err)

for {
atomic.StoreInt64(&w.lastTaskIDBlock.start, w.taskIDBlock.start)
atomic.StoreInt64(&w.lastTaskIDBlock.end, w.taskIDBlock.end)

select {
case request := <-w.appendCh:
// read a batch of requests from the channel
Expand Down Expand Up @@ -248,3 +254,11 @@ func (w *taskWriter) allocTaskIDBlock(prevBlockEnd int64) (taskIDBlock, error) {
}
return rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize), nil
}

// getTaskIDBlock returns the latest taskIDBlock. Safe to be called concurrently:
// it reads only lastTaskIDBlock, which taskWriterLoop publishes with atomic
// stores, rather than touching taskIDBlock (owned exclusively by the loop).
//
// NOTE(review): the two fields are loaded with separate atomic operations, so a
// concurrent caller may see a start/end pair spanning two different blocks;
// looks intentional for best-effort status output — confirm.
func (w *taskWriter) getTaskIDBlock() taskIDBlock {
	var snapshot taskIDBlock
	snapshot.start = atomic.LoadInt64(&w.lastTaskIDBlock.start)
	snapshot.end = atomic.LoadInt64(&w.lastTaskIDBlock.end)
	return snapshot
}
Loading