Skip to content

Commit 9a87f47

Browse files
Make query task queue admission atomic
1 parent 9e8dace commit 9a87f47

4 files changed

Lines changed: 339 additions & 7 deletions

File tree

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
namespace App\Support;
4+
5+
use RuntimeException;
6+
use Throwable;
7+
8+
final class QueryTaskQueueUnavailableException extends RuntimeException
9+
{
10+
public function __construct(
11+
public readonly string $namespace,
12+
public readonly string $taskQueue,
13+
string $reason,
14+
?Throwable $previous = null,
15+
) {
16+
parent::__construct(sprintf(
17+
'Query task queue [%s] in namespace [%s] is unavailable: %s',
18+
$taskQueue,
19+
$namespace,
20+
$reason,
21+
), 0, $previous);
22+
}
23+
}

app/Support/WorkflowQueryTaskBroker.php

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
namespace App\Support;
44

55
use App\Models\WorkerRegistration;
6+
use Closure;
7+
use Illuminate\Contracts\Cache\LockProvider;
8+
use Illuminate\Contracts\Cache\LockTimeoutException;
69
use Illuminate\Contracts\Cache\Repository as CacheRepository;
710
use Illuminate\Support\Carbon;
811
use Illuminate\Support\Collection;
@@ -60,6 +63,14 @@ public function query(
6063
$exception->getMessage(),
6164
429,
6265
);
66+
} catch (QueryTaskQueueUnavailableException $exception) {
67+
return $this->queryFailed(
68+
$run,
69+
$queryName,
70+
'query_task_queue_unavailable',
71+
$exception->getMessage(),
72+
503,
73+
);
6374
}
6475

6576
$result = $this->waitForResult((string) $task['query_task_id']);
@@ -292,6 +303,25 @@ private function claimNext(
292303
string $taskQueue,
293304
string $leaseOwner,
294305
array $supportedWorkflowTypes,
306+
): ?array {
307+
$task = $this->withQueueLock(
308+
$namespace,
309+
$taskQueue,
310+
fn (): ?array => $this->claimNextPendingTask($namespace, $taskQueue, $leaseOwner, $supportedWorkflowTypes),
311+
);
312+
313+
return is_array($task) ? $this->queryTaskPayload($task) : null;
314+
}
315+
316+
/**
317+
* @param list<string> $supportedWorkflowTypes
318+
* @return array<string, mixed>|null
319+
*/
320+
private function claimNextPendingTask(
321+
string $namespace,
322+
string $taskQueue,
323+
string $leaseOwner,
324+
array $supportedWorkflowTypes,
295325
): ?array {
296326
$ids = $this->pendingTaskIds($namespace, $taskQueue);
297327
$remaining = [];
@@ -332,7 +362,7 @@ private function claimNext(
332362
)),
333363
);
334364

335-
return $this->queryTaskPayload($task);
365+
return $task;
336366
}
337367

338368
$this->storePendingTaskIds($namespace, $taskQueue, $remaining);
@@ -512,15 +542,17 @@ private function putTask(array $task): void
512542

513543
private function appendPendingTask(string $namespace, string $taskQueue, string $queryTaskId): void
514544
{
515-
$ids = $this->pendingTaskIds($namespace, $taskQueue);
545+
$this->withQueueLock($namespace, $taskQueue, function () use ($namespace, $taskQueue, $queryTaskId): void {
546+
$ids = $this->pendingTaskIds($namespace, $taskQueue);
516547

517-
if (! in_array($queryTaskId, $ids, true) && count($ids) >= $this->maxPendingPerQueue()) {
518-
throw new QueryTaskQueueFullException($namespace, $taskQueue, $this->maxPendingPerQueue());
519-
}
548+
if (! in_array($queryTaskId, $ids, true) && count($ids) >= $this->maxPendingPerQueue()) {
549+
throw new QueryTaskQueueFullException($namespace, $taskQueue, $this->maxPendingPerQueue());
550+
}
520551

521-
$ids[] = $queryTaskId;
552+
$ids[] = $queryTaskId;
522553

523-
$this->storePendingTaskIds($namespace, $taskQueue, array_values(array_unique($ids)));
554+
$this->storePendingTaskIds($namespace, $taskQueue, array_values(array_unique($ids)));
555+
});
524556
}
525557

526558
/**
@@ -554,6 +586,29 @@ private function storePendingTaskIds(string $namespace, string $taskQueue, array
554586
$this->store()->put($this->queueKey($namespace, $taskQueue), array_values($ids), now()->addSeconds($this->taskTtlSeconds()));
555587
}
556588

589+
/**
590+
* @template TReturn
591+
*
592+
* @param Closure(): TReturn $callback
593+
* @return TReturn
594+
*/
595+
private function withQueueLock(string $namespace, string $taskQueue, Closure $callback): mixed
596+
{
597+
$store = $this->store()->getStore();
598+
599+
if (! $store instanceof LockProvider) {
600+
throw new QueryTaskQueueUnavailableException($namespace, $taskQueue, 'The configured polling cache store does not support atomic locks.');
601+
}
602+
603+
try {
604+
return $store
605+
->lock($this->queueLockKey($namespace, $taskQueue), $this->queueLockTtlSeconds())
606+
->block($this->queueLockWaitSeconds(), $callback);
607+
} catch (LockTimeoutException $exception) {
608+
throw new QueryTaskQueueUnavailableException($namespace, $taskQueue, 'Timed out waiting for the query task queue lock.', $exception);
609+
}
610+
}
611+
557612
private function queryFailed(
558613
WorkflowRun $run,
559614
string $queryName,
@@ -637,6 +692,16 @@ private function maxPendingPerQueue(): int
637692
return max(1, min(10000, (int) config('server.query_tasks.max_pending_per_queue', 1024)));
638693
}
639694

695+
private function queueLockTtlSeconds(): int
696+
{
697+
return 10;
698+
}
699+
700+
private function queueLockWaitSeconds(): int
701+
{
702+
return 5;
703+
}
704+
640705
private function staleAfterSeconds(): int
641706
{
642707
return max(1, (int) config('server.workers.stale_after_seconds', 60));
@@ -657,6 +722,11 @@ private function queueKey(string $namespace, string $taskQueue): string
657722
return self::CACHE_PREFIX.'queue:'.sha1($namespace.'|'.$taskQueue);
658723
}
659724

725+
private function queueLockKey(string $namespace, string $taskQueue): string
726+
{
727+
return self::CACHE_PREFIX.'queue-lock:'.sha1($namespace.'|'.$taskQueue);
728+
}
729+
660730
private function store(): CacheRepository
661731
{
662732
return $this->cache->store();

tests/Feature/WorkflowQueryTaskBrokerTest.php

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
use App\Models\WorkerRegistration;
66
use App\Support\ControlPlaneProtocol;
77
use App\Support\QueryTaskQueueFullException;
8+
use App\Support\ServerPollingCache;
89
use App\Support\WorkerProtocol;
910
use App\Support\WorkflowQueryTaskBroker;
1011
use Illuminate\Foundation\Testing\RefreshDatabase;
12+
use Illuminate\Support\Facades\File;
1113
use Illuminate\Support\Facades\Queue;
14+
use Symfony\Component\Process\Process;
1215
use Tests\Feature\Concerns\ServerTestHelpers;
1316
use Tests\TestCase;
1417
use Workflow\Serializers\Serializer;
@@ -190,6 +193,101 @@ public function test_control_plane_query_reports_queue_full_when_pending_limit_i
190193
->assertJsonPath('control_plane.operation_name', 'status');
191194
}
192195

196+
public function test_concurrent_query_task_enqueues_are_atomic_for_file_cache_backend(): void
197+
{
198+
$cachePath = sys_get_temp_dir().'/dw-server-query-task-race-'.bin2hex(random_bytes(5));
199+
$readyDir = $cachePath.'-ready';
200+
$barrierPath = $cachePath.'.release';
201+
$processCount = 8;
202+
$limit = 3;
203+
$processes = [];
204+
205+
File::ensureDirectoryExists($cachePath);
206+
File::ensureDirectoryExists($readyDir);
207+
208+
config([
209+
'cache.default' => 'file',
210+
'server.polling.cache_path' => $cachePath,
211+
'server.query_tasks.max_pending_per_queue' => $limit,
212+
]);
213+
214+
try {
215+
for ($i = 0; $i < $processCount; $i++) {
216+
$process = new Process([
217+
PHP_BINARY,
218+
base_path('tests/Fixtures/query_task_enqueue_worker.php'),
219+
$cachePath,
220+
$barrierPath,
221+
$readyDir,
222+
(string) $limit,
223+
'default',
224+
'python-queries',
225+
'worker-'.$i,
226+
], base_path());
227+
$process->setTimeout(30);
228+
$process->start();
229+
230+
$processes[] = $process;
231+
}
232+
233+
$this->waitForReadyQueryTaskEnqueueWorkers($readyDir, $processCount, $processes);
234+
235+
touch($barrierPath);
236+
237+
$results = array_map(
238+
fn (Process $process): array => $this->queryTaskEnqueueWorkerResult($process),
239+
$processes,
240+
);
241+
242+
$errors = array_values(array_filter(
243+
$results,
244+
static fn (array $result): bool => ($result['status'] ?? null) === 'error',
245+
));
246+
247+
$this->assertSame([], $errors);
248+
249+
$enqueuedIds = array_values(array_map(
250+
static fn (array $result): string => (string) $result['query_task_id'],
251+
array_filter($results, static fn (array $result): bool => ($result['status'] ?? null) === 'enqueued'),
252+
));
253+
$fullResults = array_values(array_filter(
254+
$results,
255+
static fn (array $result): bool => ($result['status'] ?? null) === 'full',
256+
));
257+
258+
$this->assertCount($limit, $enqueuedIds);
259+
$this->assertCount($processCount - $limit, $fullResults);
260+
261+
/** @var ServerPollingCache $cache */
262+
$cache = app(ServerPollingCache::class);
263+
$store = $cache->store();
264+
$queueIds = $store->get('server:workflow-query-task:queue:'.sha1('default|python-queries'));
265+
266+
$this->assertIsArray($queueIds);
267+
sort($queueIds);
268+
sort($enqueuedIds);
269+
270+
$this->assertSame($enqueuedIds, $queueIds);
271+
272+
foreach ($queueIds as $queryTaskId) {
273+
$task = $store->get('server:workflow-query-task:task:'.$queryTaskId);
274+
275+
$this->assertIsArray($task);
276+
$this->assertSame('pending', $task['status'] ?? null);
277+
}
278+
} finally {
279+
foreach ($processes as $process) {
280+
if ($process->isRunning()) {
281+
$process->stop(0);
282+
}
283+
}
284+
285+
File::deleteDirectory($cachePath);
286+
File::deleteDirectory($readyDir);
287+
@unlink($barrierPath);
288+
}
289+
}
290+
193291
private function startRemoteWorkflow(string $workflowId): WorkflowRun
194292
{
195293
$start = $this->postJson('/api/workflows', [
@@ -236,4 +334,51 @@ private function registerPythonWorker(
236334
],
237335
);
238336
}
337+
338+
/**
339+
* @param list<Process> $processes
340+
*/
341+
private function waitForReadyQueryTaskEnqueueWorkers(string $readyDir, int $expected, array $processes): void
342+
{
343+
$deadline = microtime(true) + 15;
344+
345+
while ($this->readyQueryTaskEnqueueWorkerCount($readyDir) < $expected && microtime(true) < $deadline) {
346+
foreach ($processes as $process) {
347+
if (! $process->isRunning()) {
348+
$this->fail("Query-task enqueue worker exited before the barrier.\n".$process->getOutput().$process->getErrorOutput());
349+
}
350+
}
351+
352+
usleep(10000);
353+
}
354+
355+
$this->assertSame($expected, $this->readyQueryTaskEnqueueWorkerCount($readyDir));
356+
}
357+
358+
private function readyQueryTaskEnqueueWorkerCount(string $readyDir): int
359+
{
360+
return count(glob($readyDir.'/*.ready') ?: []);
361+
}
362+
363+
/**
364+
* @return array<string, mixed>
365+
*/
366+
private function queryTaskEnqueueWorkerResult(Process $process): array
367+
{
368+
$process->wait();
369+
370+
$output = trim($process->getOutput());
371+
$decoded = json_decode($output, true);
372+
373+
if (! $process->isSuccessful() || ! is_array($decoded)) {
374+
return [
375+
'status' => 'error',
376+
'exit_code' => $process->getExitCode(),
377+
'stdout' => $output,
378+
'stderr' => trim($process->getErrorOutput()),
379+
];
380+
}
381+
382+
return $decoded;
383+
}
239384
}

0 commit comments

Comments
 (0)