Skip to content

Commit 6bf61a0

Browse files
authored
add expired tasks state to the metric (temporalio#8059)
## What changed? Added a "task_expire_stage" tag to the "ExpiredTasksPerTaskQueueCounter" metric. Possible values: "read", "memory", "invalid". ## Why? For fairness, we are removing the TTL. We expect this to result in reading many more expired tasks. To confirm this, we want a metric tag that allows us to separate tasks that were already expired when we read them from tasks dropped at other stages. ## How did you test it? - [X] built
1 parent 53ad671 commit 6bf61a0

File tree

5 files changed

+29
-6
lines changed

5 files changed

+29
-6
lines changed

common/metrics/tags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const (
4141
// See server.api.enums.v1.ReplicationTaskType
4242
replicationTaskType = "replicationTaskType"
4343
replicationTaskPriority = "replicationTaskPriority"
44+
taskExpireStage = "task_expire_stage"
4445
versioningBehavior = "versioning_behavior"
4546
isFirstAttempt = "first-attempt"
4647
workflowStatus = "workflow_status"
@@ -477,3 +478,7 @@ func ToUnversionedTag(version string) Tag {
477478
}
478479
return &tagImpl{key: toUnversioned, value: falseValue}
479480
}
481+
482+
// TaskExpireStageReadTag is the taskExpireStage tag with value "read". The task
// readers attach it to ExpiredTasksPerTaskQueueCounter when a task is found to
// be expired at the moment it is read.
var TaskExpireStageReadTag Tag = &tagImpl{key: taskExpireStage, value: "read"}
483+
// TaskExpireStageMemoryTag is the taskExpireStage tag with value "memory". It is
// attached to ExpiredTasksPerTaskQueueCounter when a task that was already
// handed to the matcher/poller is later found to be expired.
var TaskExpireStageMemoryTag Tag = &tagImpl{key: taskExpireStage, value: "memory"}
484+
// TaskInvalidTag is the taskExpireStage tag with value "invalid". It is used
// for tasks that failed validation but are not reported as expired by
// IsTaskExpired.
var TaskInvalidTag Tag = &tagImpl{key: taskExpireStage, value: "invalid"}

service/matching/physical_task_queue_manager.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,10 @@ func (c *physicalTaskQueueManagerImpl) PollTask(
338338
// there. In that case, go back for another task.
339339
// If we didn't do this, the task would be rejected when we call RecordXTaskStarted on
340340
// history, but this is more efficient.
341+
341342
if task.event != nil && IsTaskExpired(task.event.AllocatedTaskInfo) {
342-
c.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1)
343+
// task is expired while polling
344+
c.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1, metrics.TaskExpireStageMemoryTag)
343345
task.finish(nil, false)
344346
continue
345347
}
@@ -382,7 +384,9 @@ func (c *physicalTaskQueueManagerImpl) ProcessSpooledTask(
382384
) error {
383385
if !c.taskValidator.maybeValidate(task.event.AllocatedTaskInfo, c.queue.TaskType()) {
384386
task.finish(nil, false)
385-
c.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1)
387+
388+
var invalidTaskTag = getInvalidTaskTag(task)
389+
c.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1, invalidTaskTag)
386390
// Don't try to set read level here because it may have been advanced already.
387391
return nil
388392
}

service/matching/pri_matcher.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,10 @@ func (tm *priTaskMatcher) forwardTask(task *internalTask) error {
187187
maybeValid := tm.validator.maybeValidate(task.event.AllocatedTaskInfo, tm.fwdr.partition.TaskType())
188188
if !maybeValid {
189189
task.finish(nil, false)
190-
tm.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1)
190+
var invalidTaskTag = getInvalidTaskTag(task)
191+
192+
// The task failed validation: tag it as expired in memory or as invalid.
193+
tm.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1, invalidTaskTag)
191194
return nil
192195
}
193196

@@ -243,7 +246,9 @@ func (tm *priTaskMatcher) validateTasksOnRoot(lim quotas.RateLimiter, retrier ba
243246
if !maybeValid {
244247
// We found an invalid one, complete it and go back for another immediately.
245248
task.finish(nil, false)
246-
tm.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1)
249+
var invalidStageTag = getInvalidTaskTag(task)
250+
tm.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1, invalidStageTag)
251+
247252
retrier.Reset()
248253
} else {
249254
// Task was valid, put it back and slow down checking.
@@ -611,3 +616,10 @@ func (tm *priTaskMatcher) emitForwardedSourceStats(
611616
metrics.LocalToLocalMatchPerTaskQueueCounter.With(tm.metricsHandler).Record(1)
612617
}
613618
}
619+
620+
// getInvalidTaskTag chooses the metrics tag to record for a task that failed
// validation: TaskExpireStageMemoryTag when IsTaskExpired reports the task's
// AllocatedTaskInfo as expired, TaskInvalidTag otherwise.
//
// task.event is dereferenced without a nil check, so callers must pass a task
// whose event is set.
func getInvalidTaskTag(task *internalTask) metrics.Tag {
621+
if IsTaskExpired(task.event.AllocatedTaskInfo) {
622+
return metrics.TaskExpireStageMemoryTag
623+
}
624+
return metrics.TaskInvalidTag
625+
}

service/matching/pri_task_reader.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ func (tr *priTaskReader) processTaskBatch(tasks []*persistencespb.AllocatedTaskI
247247
tr.readLevel = max(tr.readLevel, t.TaskId)
248248

249249
if IsTaskExpired(t) {
250-
metrics.ExpiredTasksPerTaskQueueCounter.With(tr.backlogMgr.metricsHandler).Record(1)
250+
// task expired when we read it
251+
metrics.ExpiredTasksPerTaskQueueCounter.With(tr.backlogMgr.metricsHandler).Record(1, metrics.TaskExpireStageReadTag)
251252
return true
252253
}
253254

service/matching/task_reader.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ func (tr *taskReader) addTasksToBuffer(
254254
) error {
255255
for _, t := range tasks {
256256
if IsTaskExpired(t) {
257-
metrics.ExpiredTasksPerTaskQueueCounter.With(tr.taggedMetricsHandler()).Record(1)
257+
// The task was already expired when addTasksToBuffer was called, i.e. at read time.
258+
metrics.ExpiredTasksPerTaskQueueCounter.With(tr.taggedMetricsHandler()).Record(1, metrics.TaskExpireStageReadTag)
258259
// Also increment readLevel for expired tasks otherwise it could result in
259260
// looping over the same tasks if all tasks read in the batch are expired
260261
tr.backlogMgr.taskAckManager.setReadLevel(t.GetTaskId())

0 commit comments

Comments
 (0)