2
2
3
3
namespace Stackkit \LaravelGoogleCloudTasksQueue ;
4
4
5
+ use Google \Cloud \Tasks \V2 \Attempt ;
5
6
use Google \Cloud \Tasks \V2 \CloudTasksClient ;
7
+ use Google \Cloud \Tasks \V2 \RetryConfig ;
6
8
use Illuminate \Http \Request ;
7
9
use Illuminate \Queue \Events \JobFailed ;
8
10
use Illuminate \Queue \Worker ;
@@ -14,6 +16,16 @@ class TaskHandler
14
16
private $ publicKey ;
15
17
private $ config ;
16
18
19
+ /**
20
+ * @var CloudTasksQueue
21
+ */
22
+ private $ queue ;
23
+
24
+ /**
25
+ * @var RetryConfig
26
+ */
27
+ private $ retryConfig = null ;
28
+
17
29
public function __construct (CloudTasksClient $ client , Request $ request , OpenIdVerificator $ publicKey )
18
30
{
19
31
$ this ->client = $ client ;
@@ -31,6 +43,8 @@ public function handle($task = null)
31
43
32
44
$ this ->loadQueueConnectionConfiguration ($ task );
33
45
46
+ $ this ->setQueue ();
47
+
34
48
$ this ->authorizeRequest ();
35
49
36
50
$ this ->listenForEvents ();
@@ -48,6 +62,11 @@ private function loadQueueConnectionConfiguration($task)
48
62
);
49
63
}
50
64
65
+ private function setQueue ()
66
+ {
67
+ $ this ->queue = new CloudTasksQueue ($ this ->config , $ this ->client );
68
+ }
69
+
51
70
/**
52
71
* @throws CloudTasksException
53
72
*/
@@ -122,29 +141,63 @@ private function listenForEvents()
122
141
*/
123
142
private function handleTask ($ task )
124
143
{
125
- $ job = new CloudTasksJob ($ task );
144
+ $ job = new CloudTasksJob ($ task , $ this ->queue );
145
+
146
+ $ this ->loadQueueRetryConfig ();
126
147
127
148
$ job ->setAttempts (request ()->header ('X-CloudTasks-TaskRetryCount ' ) + 1 );
128
149
$ job ->setQueue (request ()->header ('X-Cloudtasks-Queuename ' ));
129
- $ job ->setMaxTries ($ this ->getQueueMaxTries ($ job ));
150
+ $ job ->setMaxTries ($ this ->retryConfig ->getMaxAttempts ());
151
+
152
+ // If the job is being attempted again we also check if a
153
+ // max retry duration has been set. If that duration
154
+ // has passed, it should stop trying altogether.
155
+ if ($ job ->attempts () > 1 ) {
156
+ $ job ->setRetryUntil ($ this ->getRetryUntilTimestamp ($ job ));
157
+ }
130
158
131
159
$ worker = $ this ->getQueueWorker ();
132
160
133
161
$ worker ->process ($ this ->config ['connection ' ], $ job , new WorkerOptions ());
134
162
}
135
163
136
- private function getQueueMaxTries ( CloudTasksJob $ job )
164
+ private function loadQueueRetryConfig ( )
137
165
{
138
166
$ queueName = $ this ->client ->queueName (
139
167
$ this ->config ['project ' ],
140
168
$ this ->config ['location ' ],
141
- $ job -> getQueue ( )
169
+ request ()-> header ( ' X-Cloudtasks-Queuename ' )
142
170
);
143
171
144
- return $ this ->client
145
- ->getQueue ($ queueName )
146
- ->getRetryConfig ()
147
- ->getMaxAttempts ();
172
+ $ this ->retryConfig = $ this ->client ->getQueue ($ queueName )->getRetryConfig ();
173
+ }
174
+
175
+ private function getRetryUntilTimestamp (CloudTasksJob $ job )
176
+ {
177
+ $ task = $ this ->client ->getTask (
178
+ $ this ->client ->taskName (
179
+ $ this ->config ['project ' ],
180
+ $ this ->config ['location ' ],
181
+ $ job ->getQueue (),
182
+ request ()->header ('X-Cloudtasks-Taskname ' )
183
+ )
184
+ );
185
+
186
+ $ attempt = $ task ->getFirstAttempt ();
187
+
188
+ if (!$ attempt instanceof Attempt) {
189
+ return null ;
190
+ }
191
+
192
+ if (! $ this ->retryConfig ->hasMaxRetryDuration ()) {
193
+ return null ;
194
+ }
195
+
196
+ $ maxDurationInSeconds = $ this ->retryConfig ->getMaxRetryDuration ()->getSeconds ();
197
+
198
+ $ firstAttemptTimestamp = $ attempt ->getDispatchTime ()->toDateTime ()->getTimestamp ();
199
+
200
+ return $ firstAttemptTimestamp + $ maxDurationInSeconds ;
148
201
}
149
202
150
203
/**
0 commit comments