|
4 | 4 |
|
5 | 5 | namespace Stackkit\LaravelGoogleCloudTasksQueue;
|
6 | 6 |
|
| 7 | +use Illuminate\Contracts\Debug\ExceptionHandler; |
| 8 | +use Illuminate\Queue\Events\JobTimedOut; |
7 | 9 | use Illuminate\Queue\Worker as LaravelWorker;
|
8 | 10 | use Illuminate\Queue\WorkerOptions;
|
| 11 | +use Symfony\Component\ErrorHandler\Error\FatalError; |
9 | 12 |
|
10 | 13 | /**
|
11 | 14 | * Custom worker class to handle specific requirements for Google Cloud Tasks.
|
|
14 | 17 | * integrate with Google Cloud Tasks, particularly focusing on job timeout
|
15 | 18 | * handling and graceful shutdowns to avoid interrupting the HTTP lifecycle.
|
16 | 19 | *
|
17 |
| - * Firstly, the 'supportsAsyncSignals', 'listenForSignals', and 'registerTimeoutHandler' methods |
18 |
| - * are protected and called within the queue while(true) loop. We want (and need!) to have that |
19 |
| - * too in order to support job timeouts. So, to make it work, we create a public method that |
20 |
| - * can call the private signal methods. |
21 |
| - * |
22 |
| - * Secondly, we need to override the 'kill' method because it tends to kill the server process (artisan serve, octane), |
23 |
| - * as well as abort the HTTP request from Cloud Tasks. This is not the desired behavior. |
24 |
| - * Instead, it should just fire the WorkerStopped event and return a normal status code. |
| 20 | + * Firstly, normally job timeouts are handled using the pcntl extension. Since we |
| 21 | + * are running in an HTTP environment, we can't use those functions. An alternative |
| 22 | + * method is using set_time_limit and when PHP throws the fatal 'Maximum execution time exceeded' error, |
| 23 | + * we will handle the job error like how Laravel would if the pcntl alarm had gone off. |
25 | 24 | */
|
26 | 25 | class Worker extends LaravelWorker
|
27 | 26 | {
|
28 | 27 | public function process($connectionName, $job, WorkerOptions $options): void
|
29 | 28 | {
|
30 |
| - if ($this->supportsAsyncSignals()) { |
31 |
| - $this->listenForSignals(); |
| 29 | + assert($job instanceof CloudTasksJob); |
32 | 30 |
|
33 |
| - $this->registerTimeoutHandler($job, $options); |
34 |
| - } |
| 31 | + set_time_limit(max($this->timeoutForJob($job, $options), 0)); |
| 32 | + |
| 33 | + app(ExceptionHandler::class)->reportable( |
| 34 | + fn (FatalError $error) => $this->onFatalError($error, $job, $options) |
| 35 | + ); |
35 | 36 |
|
36 | 37 | parent::process($connectionName, $job, $options);
|
37 | 38 | }
|
38 | 39 |
|
39 |
| - public function kill($status = 0, $options = null): void |
| 40 | + private function onFatalError(FatalError $error, CloudTasksJob $job, WorkerOptions $options): bool |
40 | 41 | {
|
41 |
| - parent::stop($status, $options); |
| 42 | + if (fnmatch('Maximum execution time * exceeded', $error->getMessage())) { |
| 43 | + $this->onJobTimedOut($job, $options); |
42 | 44 |
|
43 |
| - // When running tests, we cannot run exit because it will kill the PHPunit process. |
44 |
| - // So, to still test that the application has exited, we will simply rely on the |
45 |
| - // WorkerStopped event that is fired when the worker is stopped. |
46 |
| - if (! app()->runningUnitTests()) { |
47 |
| - exit($status); |
| 45 | + return false; |
48 | 46 | }
|
49 | 47 |
|
| 48 | + return true; |
| 49 | + } |
| 50 | + |
| 51 | + private function onJobTimedOut(CloudTasksJob $job, WorkerOptions $options): void |
| 52 | + { |
| 53 | + $this->markJobAsFailedIfWillExceedMaxAttempts( |
| 54 | + $job->getConnectionName(), $job, (int) $options->maxTries, $e = $this->timeoutExceededException($job) |
| 55 | + ); |
| 56 | + |
| 57 | + $this->markJobAsFailedIfWillExceedMaxExceptions( |
| 58 | + $job->getConnectionName(), $job, $e |
| 59 | + ); |
| 60 | + |
| 61 | + $this->markJobAsFailedIfItShouldFailOnTimeout( |
| 62 | + $job->getConnectionName(), $job, $e |
| 63 | + ); |
| 64 | + |
| 65 | + $this->events->dispatch(new JobTimedOut( |
| 66 | + $job->getConnectionName(), $job |
| 67 | + )); |
| 68 | + |
| 69 | + if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) { |
| 70 | + $job->release($this->calculateBackoff($job, $options)); |
| 71 | + } |
50 | 72 | }
|
51 | 73 | }
|
0 commit comments