|
25 | 25 | ) |
26 | 26 |
|
27 | 27 |
|
28 | | -# Number of concurrent workers for parallel classification |
29 | | -CLASSIFICATION_CONCURRENCY = 6 |
| 28 | +# Number of concurrent workers for parallel classification. |
| 29 | +# Must not exceed the connection pool size (default 4) to avoid pool |
| 30 | +# exhaustion, which silently leaves images stuck in STAGED. |
| 31 | +CLASSIFICATION_CONCURRENCY = 4 |
30 | 32 |
|
31 | 33 | # Hard timeout per image to avoid a single stuck S3 read / decode hanging the whole batch. |
32 | 34 | # If exceeded, the image is marked as REJECTED with a generic "Classification failed". |
@@ -788,7 +790,36 @@ async def classify_with_commit(image_record: dict[str, Any]) -> dict[str, Any]: |
788 | 790 |
|
789 | 791 | # Process all images in parallel with controlled concurrency |
790 | 792 | tasks = [classify_with_commit(image) for image in images] |
791 | | - await asyncio.gather(*tasks, return_exceptions=True) |
| 793 | + gather_results = await asyncio.gather(*tasks, return_exceptions=True) |
| 794 | + |
| 795 | + # gather(return_exceptions=True) returns worker exceptions as results instead of |
| 796 | + # raising them (e.g. a pool timeout before the inner try/except); unchecked, these leave images stuck in STAGED with no log. |
| 797 | + for image_record, result_or_exc in zip(images, gather_results): |
| 798 | + if isinstance(result_or_exc, BaseException): |
| 799 | + image_id = image_record["id"] |
| 800 | + log.error( |
| 801 | + f"Classification worker failed for image {image_id} " |
| 802 | + f"(batch {batch_id}): {result_or_exc!r}" |
| 803 | + ) |
| 804 | + # Best-effort: mark the image as rejected so it isn't stuck |
| 805 | + try: |
| 806 | + async with db_pool.connection() as err_conn: |
| 807 | + await ImageClassifier._update_image_status( |
| 808 | + err_conn, |
| 809 | + image_id |
| 810 | + if isinstance(image_id, uuid.UUID) |
| 811 | + else uuid.UUID(image_id), |
| 812 | + ImageStatus.REJECTED, |
| 813 | + _format_classification_failure_reason(result_or_exc), |
| 814 | + ) |
| 815 | + await err_conn.commit() |
| 816 | + async with results_lock: |
| 817 | + results["rejected"] += 1 |
| 818 | + except Exception as cleanup_err: |
| 819 | + log.error( |
| 820 | + f"Failed to mark image {image_id} as rejected " |
| 821 | + f"after worker error: {cleanup_err}" |
| 822 | + ) |
792 | 823 |
|
793 | 824 | log.info( |
794 | 825 | f"Parallel classification complete for batch {batch_id}: " |
|
0 commit comments