5
5
namespace Stackkit \LaravelGoogleCloudTasksQueue ;
6
6
7
7
use Closure ;
8
+ use Exception ;
8
9
use Illuminate \Support \Str ;
9
10
10
11
use function Safe \json_decode ;
23
24
use Illuminate \Contracts \Queue \Queue as QueueContract ;
24
25
use Stackkit \LaravelGoogleCloudTasksQueue \Events \TaskCreated ;
25
26
27
+ /**
28
+ * @phpstan-import-type QueueConfig from CloudTasksConnector
29
+ * @phpstan-import-type JobShape from CloudTasksJob
30
+ * @phpstan-import-type JobBeforeDispatch from CloudTasksJob
31
+ *
32
+ * @phpstan-type JobOptions array{
33
+ * job?: Closure|string|object,
34
+ * delay?: ?int
35
+ * }
36
+ */
26
37
class CloudTasksQueue extends LaravelQueue implements QueueContract
27
38
{
28
- private static ?Closure $ handlerUrlCallback = null ;
39
+ protected static ?Closure $ handlerUrlCallback = null ;
29
40
30
- private static ?Closure $ taskHeadersCallback = null ;
41
+ protected static ?Closure $ taskHeadersCallback = null ;
31
42
32
43
/** @var (Closure(IncomingTask): WorkerOptions)|null */
33
- private static ?Closure $ workerOptionsCallback = null ;
44
+ protected static ?Closure $ workerOptionsCallback = null ;
34
45
35
- public function __construct (public array $ config , public CloudTasksClient $ client , public $ dispatchAfterCommit = false )
36
- {
46
+ /**
47
+ * @param QueueConfig $config
48
+ */
49
+ public function __construct (
50
+ public array $ config ,
51
+ public CloudTasksClient $ client ,
52
+ // @phpstan-ignore-next-line
53
+ public $ dispatchAfterCommit = false ,
54
+ ) {
37
55
//
38
56
}
39
57
@@ -92,15 +110,20 @@ public function size($queue = null): int
92
110
/**
93
111
* Push a new job onto the queue.
94
112
*
95
- * @param string|object $job
113
+ * @param string|Closure|JobBeforeDispatch $job
96
114
* @param mixed $data
97
115
* @param string|null $queue
98
- * @return void
116
+ * @return mixed
99
117
*/
100
118
public function push ($ job , $ data = '' , $ queue = null )
101
119
{
102
- if (! ($ job instanceof Closure)) {
103
- $ job ->queue = $ queue ?? $ job ->queue ?? $ this ->config ['queue ' ];
120
+ if (! $ queue ) {
121
+ $ queue = $ this ->getQueueForJob ($ job );
122
+ }
123
+
124
+ if (is_object ($ job ) && ! $ job instanceof Closure) {
125
+ /** @var JobBeforeDispatch $job */
126
+ $ job ->queue = $ queue ;
104
127
}
105
128
106
129
return $ this ->enqueueUsing (
@@ -119,6 +142,7 @@ function ($payload, $queue) use ($job) {
119
142
*
120
143
* @param string $payload
121
144
* @param string|null $queue
145
+ * @param JobOptions $options
122
146
* @return string
123
147
*/
124
148
public function pushRaw ($ payload , $ queue = null , array $ options = [])
@@ -133,13 +157,18 @@ public function pushRaw($payload, $queue = null, array $options = [])
133
157
* Push a new job onto the queue after a delay.
134
158
*
135
159
* @param \DateTimeInterface|\DateInterval|int $delay
136
- * @param string|object $job
160
+ * @param Closure| string|JobBeforeDispatch $job
137
161
* @param mixed $data
138
162
* @param string|null $queue
139
- * @return void
163
+ * @return mixed
140
164
*/
141
165
public function later ($ delay , $ job , $ data = '' , $ queue = null )
142
166
{
167
+ // Laravel pls fix your typehints
168
+ if (! $ queue ) {
169
+ $ queue = $ this ->getQueueForJob ($ job );
170
+ }
171
+
143
172
return $ this ->enqueueUsing (
144
173
$ job ,
145
174
$ this ->createPayload ($ job , $ queue , $ data ),
@@ -157,7 +186,7 @@ function ($payload, $queue, $delay) use ($job) {
157
186
* @param string|null $queue
158
187
* @param string $payload
159
188
* @param \DateTimeInterface|\DateInterval|int $delay
160
- * @param string|object $job
189
+ * @param Closure| string|object|null $job
161
190
* @return string
162
191
*/
163
192
protected function pushToCloudTasks ($ queue , $ payload , $ delay , mixed $ job )
@@ -166,6 +195,7 @@ protected function pushToCloudTasks($queue, $payload, $delay, mixed $job)
166
195
167
196
$ payload = (array ) json_decode ($ payload , true );
168
197
198
+ /** @var JobShape $payload */
169
199
$ task = tap (new Task )->setName ($ this ->taskName ($ queue , $ payload ['displayName ' ]));
170
200
171
201
$ payload = $ this ->enrichPayloadWithAttempts ($ payload );
@@ -200,33 +230,43 @@ private function taskName(string $queueName, string $displayName): string
200
230
);
201
231
}
202
232
203
- private function enrichPayloadWithAttempts (
204
- array $ payload ,
205
- ): array {
233
+ /**
234
+ * @param JobShape $payload
235
+ * @return JobShape
236
+ */
237
+ private function enrichPayloadWithAttempts (array $ payload ): array
238
+ {
206
239
$ payload ['internal ' ] = [
207
240
'attempts ' => $ payload ['internal ' ]['attempts ' ] ?? 0 ,
208
241
];
209
242
210
243
return $ payload ;
211
244
}
212
245
213
- /** @param string|object $job */
214
- public function addPayloadToTask (array $ payload , Task $ task , mixed $ job ): Task
246
+ /**
247
+ * @param Closure|string|object|null $job
248
+ * @param JobShape $payload
249
+ */
250
+ public function addPayloadToTask (array $ payload , Task $ task , $ job ): Task
215
251
{
216
252
$ headers = $ this ->headers ($ payload );
217
253
218
254
if (! empty ($ this ->config ['app_engine ' ])) {
219
255
$ path = \Safe \parse_url (route ('cloud-tasks.handle-task ' ), PHP_URL_PATH );
220
256
257
+ if (! is_string ($ path )) {
258
+ throw new Exception ('Something went wrong parsing the route. ' );
259
+ }
260
+
221
261
$ appEngineRequest = new AppEngineHttpRequest ;
222
262
$ appEngineRequest ->setRelativeUri ($ path );
223
263
$ appEngineRequest ->setHttpMethod (HttpMethod::POST );
224
264
$ appEngineRequest ->setBody (json_encode ($ payload ));
225
265
$ appEngineRequest ->setHeaders ($ headers );
226
266
227
- if (! empty ($ service = $ this ->config ['app_engine_service ' ])) {
267
+ if (! empty ($ this ->config ['app_engine_service ' ])) {
228
268
$ routing = new AppEngineRouting ;
229
- $ routing ->setService ($ service );
269
+ $ routing ->setService ($ this -> config [ ' app_engine_service ' ] );
230
270
$ appEngineRequest ->setAppEngineRouting ($ routing );
231
271
}
232
272
@@ -239,7 +279,7 @@ public function addPayloadToTask(array $payload, Task $task, mixed $job): Task
239
279
$ httpRequest ->setHeaders ($ headers );
240
280
241
281
$ token = new OidcToken ;
242
- $ token ->setServiceAccountEmail ($ this ->config ['service_account_email ' ]);
282
+ $ token ->setServiceAccountEmail ($ this ->config ['service_account_email ' ] ?? '' );
243
283
$ httpRequest ->setOidcToken ($ token );
244
284
$ task ->setHttpRequest ($ httpRequest );
245
285
}
@@ -267,7 +307,9 @@ public function release(CloudTasksJob $job, int $delay = 0): void
267
307
);
268
308
}
269
309
270
- /** @param string|object $job */
310
+ /**
311
+ * @param Closure|string|object|null $job
312
+ */
271
313
public function getHandler (mixed $ job ): string
272
314
{
273
315
if (static ::$ handlerUrlCallback ) {
@@ -280,11 +322,11 @@ public function getHandler(mixed $job): string
280
322
281
323
$ handler = rtrim ($ this ->config ['handler ' ], '/ ' );
282
324
283
- if (str_ends_with ($ handler , '/ ' .config ('cloud-tasks.uri ' ))) {
325
+ if (str_ends_with ($ handler , '/ ' .config ()-> string ( 'cloud-tasks.uri ' ))) {
284
326
return $ handler ;
285
327
}
286
328
287
- return $ handler .'/ ' .config ('cloud-tasks.uri ' );
329
+ return $ handler .'/ ' .config ()-> string ( 'cloud-tasks.uri ' );
288
330
}
289
331
290
332
/**
@@ -299,4 +341,19 @@ private function headers(mixed $payload): array
299
341
300
342
return (static ::$ taskHeadersCallback )($ payload );
301
343
}
344
+
345
+ /**
346
+ * @param Closure|string|JobBeforeDispatch $job
347
+ */
348
+ private function getQueueForJob (mixed $ job ): string
349
+ {
350
+ if (is_object ($ job ) && ! $ job instanceof Closure) {
351
+ /** @var JobBeforeDispatch $job */
352
+ if (! empty ($ job ->queue )) {
353
+ return $ job ->queue ;
354
+ }
355
+ }
356
+
357
+ return $ this ->config ['queue ' ];
358
+ }
302
359
}
0 commit comments