DM-54533 Replace enqueue webhook with Kafka consumer #80

Merged
atanikan merged 3 commits into main from tickets/DM-54533 on Jun 2, 2026

atanikan commented Apr 30, 2026

Contributor
  • Drop Flask /notify endpoint, gunicorn entrypoint, and NOTIFICATION_SECRET.
  • Consume RGW S3 ObjectCreated notifications (plain JSON, no Avro/SASL -- matches the pattern in lsst-dm/prompt_processing activator.py) via an aiokafka.AIOKafkaConsumer driven by asyncio.run(main()).
  • Add object_names_from_notification() that filters ObjectCreated:* events and maps records to "<profile><bucket>/<urldecoded key>" for Info.from_path.
  • Idempotent enqueue: SET NX EX ENQ:<path> guard before lpush so Kafka redelivery (rebalance, pod restart) and operator re-trigger tooling do not double-enqueue. Skipped duplicates log "Skipping duplicate enqueue". The existing "Enqueued <path> to <bucket>" line is preserved so existing Loki gap-detection scripts continue to parse cleanly.
  • At-least-once: enable_auto_commit=False with a two-stage try block -- parse failures (malformed JSON, tombstones) are committed as poison pills; process failures (Redis errors) leave the offset uncommitted so the message is redelivered on next poll or after pod restart.
  • Dockerfile.enqueue: drop gunicorn/flask, add aiokafka; entrypoint is now a plain Python process. Required env: REDIS_HOST REDIS_PASSWORD KAFKA_CLUSTER KAFKA_TOPIC. Optional: DATASET_REGEXP PROFILE KAFKA_GROUP_ID KAFKA_OFFSET_RESET.
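
For readers unfamiliar with the notification format, the record-to-path mapping described above could look roughly like the sketch below. It is not the code from this PR. It assumes the standard S3 event layout (`Records[].eventName`, `Records[].s3.bucket.name`, `Records[].s3.object.key`), tolerates an optional `s3:` prefix on the event name, and treats `profile` as an opaque string prefix; the function body and the default for `profile` are illustrative assumptions.

```python
import json
from urllib.parse import unquote


def object_names_from_notification(payload, profile=""):
    """Return "<profile><bucket>/<urldecoded key>" for each ObjectCreated record.

    Illustrative sketch only; the real function in src/enqueue.py may differ.
    """
    message = json.loads(payload)  # raises on malformed JSON or a None tombstone
    names = []
    for record in message.get("Records", []):
        # Assumption: event names may or may not carry an "s3:" prefix.
        event = record.get("eventName", "").removeprefix("s3:")
        if not event.startswith("ObjectCreated:"):
            continue  # ignore deletes and other event types
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded; unquote_plus may be needed instead if the
        # sender encodes spaces as "+".
        key = unquote(record["s3"]["object"]["key"])
        names.append(f"{profile}{bucket}/{key}")
    return names
```

Any exception raised here (bad JSON, missing fields) would fall into the "parse failure" stage described in the at-least-once bullet.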

atanikan and others added 2 commits April 30, 2026 12:38
- Drop Flask /notify endpoint, gunicorn entrypoint, and NOTIFICATION_SECRET.
- Consume RGW S3 ObjectCreated notifications (plain JSON, no Avro/SASL --
  matches the pattern in lsst-dm/prompt_processing activator.py) via an
  aiokafka.AIOKafkaConsumer driven by asyncio.run(main()).
- Add object_names_from_notification() that filters ObjectCreated:* events
  and maps records to "<profile><bucket>/<urldecoded key>" for Info.from_path.
- Idempotent enqueue: SET NX EX ENQ:<path> guard before lpush so Kafka
  redelivery (rebalance, pod restart) and operator re-trigger tooling do
  not double-enqueue. Skipped duplicates log "Skipping duplicate enqueue".
  The existing "Enqueued <path> to <bucket>" line is preserved so existing
  Loki gap-detection scripts continue to parse cleanly.
- At-least-once: enable_auto_commit=False with a two-stage try block --
  parse failures (malformed JSON, tombstones) are committed as poison
  pills; process failures (Redis errors) leave the offset uncommitted so
  the message is redelivered on next poll or after pod restart.
- Dockerfile.enqueue: drop gunicorn/flask, add aiokafka; entrypoint is now
  a plain Python process. Required env: REDIS_HOST REDIS_PASSWORD
  KAFKA_CLUSTER KAFKA_TOPIC. Optional: DATASET_REGEXP PROFILE
  KAFKA_GROUP_ID KAFKA_OFFSET_RESET.

Made-with: Cursor

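
The two-stage try block from the at-least-once bullet can be illustrated with a small, library-free sketch. The names `handle_message`, `parse`, and `process` are hypothetical and not taken from src/enqueue.py; the point is only the commit decision: parse failures are committed as poison pills, process failures are not.

```python
import json
import logging

log = logging.getLogger("enqueue")


def handle_message(value, parse, process):
    """Return True if the offset should be committed, False to allow redelivery.

    Hypothetical helper: `parse` turns the raw message value into a list of
    paths, `process` enqueues one path (e.g. via Redis).
    """
    # Stage 1: parse. Malformed JSON or a tombstone (value is None) will never
    # succeed on retry, so the message is committed as a poison pill.
    try:
        paths = parse(value)
    except Exception:
        log.exception("Dropping unparseable message")
        return True

    # Stage 2: process. A failure here (e.g. Redis unreachable) may be
    # transient, so the offset is left uncommitted for redelivery.
    try:
        for path in paths:
            process(path)
    except Exception:
        log.exception("Processing failed; leaving offset uncommitted")
        return False
    return True
```

With `enable_auto_commit=False`, a consumer loop would call `await consumer.commit()` only when this returns True. Note that in aiokafka a bare `commit()` covers everything consumed so far, so a loop that keeps reading after a False result would also need to rewind (for example with `consumer.seek`) or stop; how this PR handles that is not shown in the description.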
The enqueue-side dedupe key used to inherit FILE_RETENTION (7 days), which is
longer than the Kafka topic retention and longer than the operator workflow
needs.  Anything Kafka could possibly redeliver is bounded by the topic
retention, and operators occasionally need to force a re-enqueue of a path
that recently failed ingest -- waiting a week is too long.

Introduce a dedicated ENQUEUE_DEDUPE_TTL = 24h so the marker decays on its
own within a day; operators can still force an immediate re-enqueue by
deleting the ENQ:<path> key (the manual trigger script in
usdf-embargo-deploy grows a --force flag that does this via
`kubectl exec redis-0 -- redis-cli DEL`).

Co-authored-by: Cursor <cursoragent@cursor.com>
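
As a rough illustration of the dedupe marker and the force path described in the two commit messages, the sketch below uses the redis-py style calls `set(..., nx=True, ex=...)`, `lpush`, and `delete`. The function names, the `queue` argument, and the marker value `"1"` are assumptions for illustration; only the `ENQ:<path>` key shape, the 24h TTL, and the log line wording come from the text above.

```python
import logging

log = logging.getLogger("enqueue")

ENQUEUE_DEDUPE_TTL = 24 * 60 * 60  # seconds (24h, per the commit message)


def enqueue_once(r, queue, path):
    """LPUSH path onto queue unless an ENQ:<path> marker already exists.

    `r` is a redis-py style client; `queue` is a hypothetical list key.
    """
    # SET NX EX creates the marker only if absent and lets it expire on its
    # own. redis-py returns True on success and None when NX blocks the write.
    if not r.set(f"ENQ:{path}", "1", nx=True, ex=ENQUEUE_DEDUPE_TTL):
        log.info("Skipping duplicate enqueue %s", path)
        return False
    r.lpush(queue, path)
    log.info("Enqueued %s to %s", path, queue)
    return True


def force_reenqueue(r, queue, path):
    """Python analogue of the operator's `redis-cli DEL` followed by a re-trigger."""
    r.delete(f"ENQ:{path}")
    return enqueue_once(r, queue, path)
```

One thing a fuller version might consider: if `lpush` fails after the marker is set, the marker would suppress the redelivered message until it expires or is deleted, so deleting the marker on failure may be worthwhile.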

ctslater left a comment

Member

I like this a lot, the locking scheme is very simple. I just have one documentation request.

Comment thread src/enqueue.py Outdated
atanikan commented Jun 2, 2026

Contributor Author

Merging it after reviews.

atanikan merged commit d487f40 into main on Jun 2, 2026
4 checks passed
atanikan deleted the tickets/DM-54533 branch on June 2, 2026 19:52

2 participants