Skip to content

Commit fff5e7e

Browse files
committed
Batch SQS message processing in jobs to improve efficiency. Add error filtering to process monitor.
1 parent f1a7f3f commit fff5e7e

File tree

3 files changed

+53
-21
lines changed

3 files changed

+53
-21
lines changed

app/Jobs/TesseractOcrProcessJob.php

Lines changed: 24 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -88,20 +88,35 @@ public function handle(SqsClient $sqs): void
8888
->where('processed', 0)
8989
->orderBy('id')
9090
->chunkById(1000, function ($files) use ($sqs, $queueUrl, $updatesQueueUrl, &$sentCount) {
91+
$batch = [];
9192
foreach ($files as $file) {
92-
$payload = [
93-
'ocrQueueFileId' => $file->id,
94-
'subjectId' => $file->subject_id,
95-
'access_uri' => $file->access_uri,
96-
'updatesQueueUrl' => $updatesQueueUrl,
93+
$batch[] = [
94+
'Id' => (string) $file->id,
95+
'MessageBody' => json_encode([
96+
'ocrQueueFileId' => $file->id,
97+
'subjectId' => $file->subject_id,
98+
'access_uri' => $file->access_uri,
99+
'updatesQueueUrl' => $updatesQueueUrl,
100+
]),
97101
];
98102

99-
$sqs->sendMessage([
103+
if (count($batch) === 10) {
104+
$sqs->sendMessageBatch([
105+
'QueueUrl' => $queueUrl,
106+
'Entries' => $batch,
107+
]);
108+
$sentCount += 10;
109+
$batch = [];
110+
}
111+
}
112+
113+
// Send remaining messages
114+
if (! empty($batch)) {
115+
$sqs->sendMessageBatch([
100116
'QueueUrl' => $queueUrl,
101-
'MessageBody' => json_encode($payload),
117+
'Entries' => $batch,
102118
]);
103-
104-
$sentCount++;
119+
$sentCount += count($batch);
105120
}
106121
}, 'id');
107122

app/Jobs/ZooniverseExportProcessImagesJob.php

Lines changed: 28 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -87,22 +87,38 @@ public function handle(SqsClient $sqs): void
8787
->where('processed', 0)
8888
->orderBy('id')
8989
->chunkById(1000, function ($files) use ($sqs, $queueUrl, $updatesQueueUrl, $processDir, &$sentCount) {
90-
foreach ($files as $file) {
91-
$payload = [
92-
'processDir' => $processDir,
93-
'accessURI' => $file->access_uri,
94-
'subjectId' => $file->subject_id,
95-
's3Bucket' => config('filesystems.disks.s3.bucket'),
96-
'updatesQueueUrl' => $updatesQueueUrl,
97-
'queueId' => $this->exportQueue->id,
90+
$batch = [];
91+
foreach ($files as $index => $file) {
92+
$batch[] = [
93+
'Id' => (string) $file->id, // Required: unique string for the batch
94+
'MessageBody' => json_encode([
95+
'processDir' => $processDir,
96+
'accessURI' => $file->access_uri,
97+
'subjectId' => $file->subject_id,
98+
's3Bucket' => config('filesystems.disks.s3.bucket'),
99+
'updatesQueueUrl' => $updatesQueueUrl,
100+
'queueId' => $this->exportQueue->id,
101+
]),
98102
];
99103

100-
$sqs->sendMessage([
104+
// SQS Batch limit is 10
105+
if (count($batch) === 10) {
106+
$sqs->sendMessageBatch([
107+
'QueueUrl' => $queueUrl,
108+
'Entries' => $batch,
109+
]);
110+
$sentCount += count($batch);
111+
$batch = [];
112+
}
113+
}
114+
115+
// Send remaining messages (less than 10)
116+
if (count($batch) > 0) {
117+
$sqs->sendMessageBatch([
101118
'QueueUrl' => $queueUrl,
102-
'MessageBody' => json_encode($payload),
119+
'Entries' => $batch,
103120
]);
104-
105-
$sentCount++;
121+
$sentCount += count($batch);
106122
}
107123
}, 'id');
108124

app/Livewire/ProcessMonitor.php

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -59,6 +59,7 @@ private function getOcrQueues(): Collection
5959
->select(['id', 'expedition_id', 'project_id', 'total', 'stage'])
6060
->with(['expedition:id,title', 'project:id,title'])
6161
->withCount(['files as processed_files' => fn ($q) => $q->where('processed', 1)])
62+
->where('error', 0)
6263
->orderBy('id')
6364
->get();
6465
}

0 commit comments

Comments
 (0)