-
Notifications
You must be signed in to change notification settings - Fork 860
[WIP] Task Processing Optimization #7332
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
DefaultValue: false, | ||
}, | ||
TimerProcessorInMemoryQueue: { | ||
KeyName: "history.timerProcessorInMemoryQueue", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to rename it history.enableTimerProcessorInMemoryQueue
to make it a boolean explicitly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're using pending task tracker to track all the pending tasks in-memory and uses this component to get the ack level of history queues. The in-memory tasks introduced in this PR are NOT tracked by that component, so it's possible that the ack level becomes larger than a pending task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And RemoveScheduledTasksAfter
should be implemented in a way that invokes Cancel
method of task interface on tasks in pending task tracker.
} | ||
} | ||
|
||
func (r *reschedulerImpl) RemoveScheduledTasksAfter(t time.Time) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not necessary. Task has Cancel
method. If it's canceled, it will be ignored by rescheduler.
ts := info.Tasks[i].GetVisibilityTimestamp() | ||
if ts.Before(nextTime) { | ||
nextTime = ts | ||
readLevel := q.base.newVirtualSliceState.Range.InclusiveMinTaskKey.GetScheduledTime() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the current max read level. The current read level is in virtual slice component. If virtual queue is still trying to read tasks from the database, the task can be submitted twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes don't seem to be necessary.
What changed?
Timer task queue now keeps incoming tasks in an in-memory queue until the next execution iteration
Why?
To reduce load on DB by avoiding agressively polling it for next timer tasks
How did you test it?
[WIP]
Potential risks
[WIP]