Skip to content

Commit 684793b

Browse files
Handles job retries via release in Cloud Run Jobs
Modifies the Cloud Run Job worker to handle job retries by releasing jobs back to Cloud Tasks instead of relying on Cloud Run's retry mechanism. This prevents duplicate job executions when retries are configured on the Cloud Run Job itself, as Laravel manages retries by pushing a new task back to Cloud Tasks when a job is released. This matches the same behaviour as the HTTP request handler Updates tests to reflect the change.
1 parent e8dc1a7 commit 684793b

File tree

2 files changed

+36
-46
lines changed

2 files changed

+36
-46
lines changed

src/Commands/WorkCloudRunJob.php

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66

77
use Exception;
88
use Illuminate\Console\Command;
9-
10-
use function Safe\base64_decode;
11-
129
use Illuminate\Container\Container;
1310
use Illuminate\Queue\WorkerOptions;
1411
use Illuminate\Support\Facades\Storage;
@@ -18,6 +15,8 @@
1815
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksJob;
1916
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksQueue;
2017

18+
use function Safe\base64_decode;
19+
2120
/**
2221
* Artisan command to process Cloud Tasks jobs via Cloud Run Jobs.
2322
*
@@ -100,27 +99,18 @@ public function handle(): int
10099
/** @var Worker $worker */
101100
$worker = app('cloud-tasks.worker');
102101

103-
// Use rescue to catch any errors during processing, similar to TaskHandler
104-
$failed = false;
105-
rescue(function () use ($worker, $job, $task, $config) {
106-
$worker->process(
107-
connectionName: $job->getConnectionName(),
108-
job: $job,
109-
options: CloudTasksQueue::getWorkerOptionsCallback()
110-
? (CloudTasksQueue::getWorkerOptionsCallback())($task)
111-
: $this->getWorkerOptions($config)
112-
);
113-
}, function () use (&$failed) {
114-
$failed = true;
115-
});
116-
117-
if ($failed || $job->hasFailed()) {
118-
$this->error('Job processing failed.');
119-
120-
return self::FAILURE;
121-
}
102+
// We manually manage retries by releasing jobs (which pushes a new task back to Cloud Tasks),
103+
// so we never want to return a failure exit code as that will result in duplicate job attempts
104+
// if retries are configured on the cloud run job.
105+
rescue(fn () => $worker->process(
106+
connectionName: $job->getConnectionName(),
107+
job: $job,
108+
options: CloudTasksQueue::getWorkerOptionsCallback()
109+
? (CloudTasksQueue::getWorkerOptionsCallback())($task)
110+
: $this->getWorkerOptions($config)
111+
));
122112

123-
$this->info('Job processed successfully.');
113+
$this->info('Job processed.');
124114

125115
return self::SUCCESS;
126116
}
@@ -190,7 +180,7 @@ private function fetchPayloadFromStorage(string $payloadPath): ?string
190180
*/
191181
private function getWorkerOptions(array $config): WorkerOptions
192182
{
193-
$options = new WorkerOptions;
183+
$options = new WorkerOptions();
194184

195185
if (isset($config['backoff'])) {
196186
$options->backoff = $config['backoff'];

tests/CloudRunJobTest.php

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public function it_can_run_a_job_via_the_command(): void
7878
{
7979
// Arrange
8080
Event::fake(JobOutput::class);
81-
$payload = $this->createPayload(new SimpleJob);
81+
$payload = $this->createPayload(new SimpleJob());
8282
$this->setEnvVars($payload, 'test-task-name');
8383

8484
// Act
@@ -95,7 +95,7 @@ public function it_extracts_connection_from_payload(): void
9595
Event::fake(JobOutput::class);
9696

9797
// Create a job with a specific connection
98-
$job = new SimpleJob;
98+
$job = new SimpleJob();
9999
$job->connection = 'my-cloudtasks-connection';
100100
$payload = $this->createPayload($job);
101101
$this->setEnvVars($payload, 'test-task-name');
@@ -121,7 +121,7 @@ public function it_fails_without_payload(): void
121121
public function it_fails_without_task_name(): void
122122
{
123123
// Arrange
124-
$payload = $this->createPayload(new SimpleJob);
124+
$payload = $this->createPayload(new SimpleJob());
125125
putenv('CLOUD_TASKS_PAYLOAD='.$payload);
126126

127127
// Act & Assert
@@ -143,13 +143,13 @@ public function it_handles_failing_jobs(): void
143143
{
144144
// Arrange
145145
Event::fake(JobOutput::class);
146-
$payload = $this->createPayload(new FailingJob);
146+
$payload = $this->createPayload(new FailingJob());
147147
$this->setEnvVars($payload, 'test-task-name');
148148

149-
// Act
150-
$this->artisan('cloud-tasks:work-job');
149+
// Act & Assert - The command should always return success, just like the HTTP handler.
150+
// Retries are managed by Laravel via $job->release(), not by Cloud Run Jobs retry mechanism.
151+
$this->artisan('cloud-tasks:work-job')->assertSuccessful();
151152

152-
// Assert - The job should process but the command may return failure due to exception
153153
Event::assertDispatched(JobOutput::class);
154154
}
155155

@@ -158,7 +158,7 @@ public function it_can_handle_encrypted_jobs(): void
158158
{
159159
// Arrange
160160
Event::fake(JobOutput::class);
161-
$payload = $this->createPayload(new EncryptedJob);
161+
$payload = $this->createPayload(new EncryptedJob());
162162
$this->setEnvVars($payload, 'test-task-name');
163163

164164
// Act
@@ -177,7 +177,7 @@ public function uses_worker_options_callback(): void
177177
return new WorkerOptions(maxTries: 10);
178178
});
179179

180-
$payload = $this->createPayload(new SimpleJob);
180+
$payload = $this->createPayload(new SimpleJob());
181181
$this->setEnvVars($payload, 'test-task-name');
182182

183183
// Act
@@ -201,7 +201,7 @@ public function cloud_run_job_target_creates_http_request_to_run_api(): void
201201
$this->setConfigValue('cloud_run_job_region', 'europe-west1');
202202

203203
// Act
204-
$this->dispatch(new SimpleJob);
204+
$this->dispatch(new SimpleJob());
205205

206206
// Assert
207207
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -221,7 +221,7 @@ public function cloud_run_job_target_uses_location_as_default_region(): void
221221
// Not setting cloud_run_job_region - should default to location
222222

223223
// Act
224-
$this->dispatch(new SimpleJob);
224+
$this->dispatch(new SimpleJob());
225225

226226
// Assert
227227
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -241,7 +241,7 @@ public function cloud_run_job_target_posts_with_post_method(): void
241241
$this->setConfigValue('cloud_run_job_name', 'my-worker-job');
242242

243243
// Act
244-
$this->dispatch(new SimpleJob);
244+
$this->dispatch(new SimpleJob());
245245

246246
// Assert
247247
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -258,7 +258,7 @@ public function cloud_run_job_target_includes_container_overrides_with_env_vars(
258258
$this->setConfigValue('cloud_run_job_name', 'my-worker-job');
259259

260260
// Act
261-
$this->dispatch(new SimpleJob);
261+
$this->dispatch(new SimpleJob());
262262

263263
// Assert
264264
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -278,7 +278,7 @@ public function cloud_run_job_target_includes_base64_encoded_payload_in_env(): v
278278
$this->setConfigValue('cloud_run_job_name', 'my-worker-job');
279279

280280
// Act
281-
$this->dispatch(new SimpleJob);
281+
$this->dispatch(new SimpleJob());
282282

283283
// Assert
284284
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -307,7 +307,7 @@ public function cloud_run_job_target_includes_task_name_in_env(): void
307307
$this->setConfigValue('cloud_run_job_name', 'my-worker-job');
308308

309309
// Act
310-
$this->dispatch(new SimpleJob);
310+
$this->dispatch(new SimpleJob());
311311

312312
// Assert
313313
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -334,7 +334,7 @@ public function cloud_run_job_target_sets_oauth_token_with_correct_scope(): void
334334
$this->setConfigValue('cloud_run_job_name', 'my-worker-job');
335335

336336
// Act
337-
$this->dispatch(new SimpleJob);
337+
$this->dispatch(new SimpleJob());
338338

339339
// Assert
340340
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -354,7 +354,7 @@ public function cloud_run_job_target_respects_dispatch_deadline(): void
354354
$this->setConfigValue('dispatch_deadline', 1800);
355355

356356
// Act
357-
$this->dispatch(new SimpleJob);
357+
$this->dispatch(new SimpleJob());
358358

359359
// Assert
360360
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -374,7 +374,7 @@ public function incoming_task_returns_task_name_from_constructor(): void
374374
'displayName' => 'SimpleJob',
375375
'job' => 'Illuminate\\Queue\\CallQueuedHandler@call',
376376
'data' => [
377-
'command' => serialize(new SimpleJob),
377+
'command' => serialize(new SimpleJob()),
378378
],
379379
'internal' => [
380380
'attempts' => 0,
@@ -392,7 +392,7 @@ public function incoming_task_returns_task_name_from_constructor(): void
392392
public function incoming_task_extracts_connection_from_payload(): void
393393
{
394394
// Arrange
395-
$job = new SimpleJob;
395+
$job = new SimpleJob();
396396
$job->connection = 'my-cloudtasks-connection';
397397

398398
$payload = json_encode([
@@ -428,7 +428,7 @@ public function payload_below_threshold_is_passed_directly_in_env(): void
428428
$this->setConfigValue('payload_threshold', 100000); // 100KB threshold
429429

430430
// Act
431-
$this->dispatch(new SimpleJob);
431+
$this->dispatch(new SimpleJob());
432432

433433
// Assert - should use CLOUD_TASKS_PAYLOAD directly since payload is below threshold
434434
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -458,7 +458,7 @@ public function payload_above_threshold_is_stored_in_disk(): void
458458
$this->setConfigValue('payload_threshold', 1); // 1 byte threshold
459459

460460
// Act
461-
$this->dispatch(new SimpleJob);
461+
$this->dispatch(new SimpleJob());
462462

463463
// Assert - should use CLOUD_TASKS_PAYLOAD_PATH since payload exceeds threshold
464464
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
@@ -485,7 +485,7 @@ public function worker_can_process_job_from_payload_path(): void
485485
// Arrange
486486
Event::fake(JobOutput::class);
487487
Storage::fake('local');
488-
$payload = $this->createPayload(new SimpleJob);
488+
$payload = $this->createPayload(new SimpleJob());
489489
$path = 'cloud-tasks-payloads/test-task.json';
490490

491491
// Store payload in fake storage

0 commit comments

Comments
 (0)