Skip to content

Commit 7c7da40

Browse files
Expose recent queue flow on task queue APIs
Expose recent queue flow on task queue APIs
1 parent 8cb95fb commit 7c7da40

3 files changed

Lines changed: 59 additions & 8 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -821,7 +821,7 @@ heartbeat, complete, and fail responses include `run_closed_reason` and
821821

822822
### Task Queues
823823
- `GET /api/task-queues` — List task queues
824-
- `GET /api/task-queues/{name}` — Task queue details and pollers
824+
- `GET /api/task-queues/{name}` — Task queue details, pollers, and recent add/dispatch flow
825825

826826
Task queue responses include an `admission` object so operators can separate
827827
worker-local capacity from server-side queue and query-task admission limits. Workflow

app/Http/Controllers/Api/TaskQueueController.php

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
use App\Support\WorkflowQueryTaskBroker;
1010
use Illuminate\Http\JsonResponse;
1111
use Illuminate\Http\Request;
12+
use Workflow\V2\Enums\TaskType;
13+
use Workflow\V2\Models\WorkflowTask;
1214
use Workflow\V2\Support\StandaloneWorkerVisibility;
1315

1416
class TaskQueueController
@@ -35,7 +37,7 @@ public function index(Request $request): JsonResponse
3537
$payload = [
3638
'namespace' => $snapshot->namespace,
3739
'task_queues' => array_map(function ($detail) use ($namespace): array {
38-
$summary = $detail->toSummaryArray();
40+
$summary = $this->withRecentTaskFlow($namespace, $detail->name, $detail->toSummaryArray());
3941
$summary['pollers'] = $detail->pollers();
4042
$summary = $this->withAdmission($namespace, $summary);
4143
unset($summary['pollers']);
@@ -56,13 +58,20 @@ public function show(Request $request, string $taskQueue): JsonResponse
5658
$namespace = (string) $request->attributes->get('namespace');
5759

5860
return ControlPlaneProtocol::json(
59-
$this->withAdmission($namespace, StandaloneWorkerVisibility::queueDetail(
61+
$this->withAdmission(
6062
$namespace,
61-
$taskQueue,
62-
WorkerRegistration::class,
63-
now(),
64-
$this->workerStaleAfterSeconds(),
65-
)->toArray()),
63+
$this->withRecentTaskFlow(
64+
$namespace,
65+
$taskQueue,
66+
StandaloneWorkerVisibility::queueDetail(
67+
$namespace,
68+
$taskQueue,
69+
WorkerRegistration::class,
70+
now(),
71+
$this->workerStaleAfterSeconds(),
72+
)->toArray(),
73+
),
74+
),
6675
);
6776
}
6877

@@ -541,6 +550,42 @@ private function taskAdmissionStatus(int $activeWorkerCount, int $configuredSlot
541550
return $leasedCount >= $configuredSlots ? 'saturated' : 'accepting';
542551
}
543552

553+
/**
554+
* @param array<string, mixed> $payload
555+
* @return array<string, mixed>
556+
*/
557+
private function withRecentTaskFlow(string $namespace, string $taskQueue, array $payload): array
558+
{
559+
$stats = is_array($payload['stats'] ?? null) ? $payload['stats'] : [];
560+
$stats = array_merge($stats, $this->recentTaskFlow($namespace, $taskQueue));
561+
$payload['stats'] = $stats;
562+
563+
return $payload;
564+
}
565+
566+
/**
567+
* @return array{tasks_added_last_minute: int, tasks_dispatched_last_minute: int}
568+
*/
569+
private function recentTaskFlow(string $namespace, string $taskQueue): array
570+
{
571+
$windowStart = now()->subMinute();
572+
573+
$query = WorkflowTask::query()
574+
->where('namespace', $namespace)
575+
->where('queue', $taskQueue)
576+
->whereIn('task_type', [TaskType::Workflow->value, TaskType::Activity->value]);
577+
578+
return [
579+
'tasks_added_last_minute' => (clone $query)
580+
->where('created_at', '>=', $windowStart)
581+
->count(),
582+
'tasks_dispatched_last_minute' => (clone $query)
583+
->whereNotNull('last_dispatched_at')
584+
->where('last_dispatched_at', '>=', $windowStart)
585+
->count(),
586+
];
587+
}
588+
544589
private function workerStaleAfterSeconds(): int
545590
{
546591
$configured = config('server.workers.stale_after_seconds');

tests/Feature/TaskQueueVisibilityTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ public function test_it_reports_workflow_backlog_stale_pollers_and_expired_workf
110110
->assertHeader('X-Durable-Workflow-Control-Plane-Version', '2')
111111
->assertJsonPath('name', 'external-workflows')
112112
->assertJsonPath('stats.approximate_backlog_count', 1)
113+
->assertJsonPath('stats.tasks_added_last_minute', 2)
114+
->assertJsonPath('stats.tasks_dispatched_last_minute', 2)
113115
->assertJsonPath('stats.workflow_tasks.ready_count', 1)
114116
->assertJsonPath('stats.workflow_tasks.leased_count', 1)
115117
->assertJsonPath('stats.workflow_tasks.expired_lease_count', 1)
@@ -153,6 +155,8 @@ public function test_it_reports_workflow_backlog_stale_pollers_and_expired_workf
153155

154156
$list->assertOk()
155157
->assertJsonPath('task_queues.0.name', 'external-workflows')
158+
->assertJsonPath('task_queues.0.stats.tasks_added_last_minute', 2)
159+
->assertJsonPath('task_queues.0.stats.tasks_dispatched_last_minute', 2)
156160
->assertJsonPath('task_queues.0.admission.workflow_tasks.configured_slot_count', 10)
157161
->assertJsonPath('task_queues.0.admission.query_tasks.status', 'full')
158162
->assertJsonMissingPath('task_queues.0.pollers');
@@ -214,6 +218,8 @@ public function test_it_reports_activity_backlog_and_current_activity_attempt_le
214218
$describe->assertOk()
215219
->assertHeader('X-Durable-Workflow-Control-Plane-Version', '2')
216220
->assertJsonPath('stats.approximate_backlog_count', 1)
221+
->assertJsonPath('stats.tasks_added_last_minute', 2)
222+
->assertJsonPath('stats.tasks_dispatched_last_minute', 2)
217223
->assertJsonPath('stats.workflow_tasks.ready_count', 0)
218224
->assertJsonPath('stats.activity_tasks.ready_count', 1)
219225
->assertJsonPath('stats.activity_tasks.leased_count', 1)

0 commit comments

Comments
 (0)