Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .devcontainer/.dev_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ kafka_enable_dlq: True
input_config_path: input_dummy.yaml
original_aem_pack_topic: original-aempacks
derived_aem_pack_topic: derived-aempacks
aem_pack_processing_event_topic: aempack-processing-events

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The event in the names is redundant, so a more compact

Suggested change
aem_pack_processing_event_topic: aempack-processing-events
aem_pack_processing_topic: aempack-processing

is better or, slightly less generic, aem_pack_processing_status_topic

worker_id: test-worker
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ repos:
- id: no-commit-to-branch
args: [--branch, dev, --branch, int, --branch, main]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.13
rev: v0.15.17
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ The service requires the following configuration parameters:
"derived-aempacks"
```

- <a id="properties/aem_pack_processing_event_topic"></a>**`aem_pack_processing_event_topic`** *(string, required)*: Topic for AEMPack processing-lifecycle (status) events, e.g. processing failures, and later successes.

Examples:
```json
"aempack-processing-events"
```

- <a id="properties/original_aem_pack_topic"></a>**`original_aem_pack_topic`** *(string, required)*: Topic informing about new ingress AEMPacks.

Examples:
Expand Down
9 changes: 9 additions & 0 deletions config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
"title": "Derived Aem Pack Topic",
"type": "string"
},
"aem_pack_processing_event_topic": {
"description": "Topic for AEMPack processing-lifecycle (status) events, e.g. processing failures, and later successes.",
"examples": [
"aempack-processing-events"
],
"title": "Aem Pack Processing Event Topic",
"type": "string"
},
"original_aem_pack_topic": {
"description": "Topic informing about new ingress AEMPacks.",
"examples": [
Expand Down Expand Up @@ -287,6 +295,7 @@
},
"required": [
"derived_aem_pack_topic",
"aem_pack_processing_event_topic",
"original_aem_pack_topic",
"service_instance_id",
"kafka_servers",
Expand Down
1 change: 1 addition & 0 deletions example_config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
aem_pack_processing_event_topic: aempack-processing-events
db_name: dev
derived_aem_pack_topic: derived-aempacks
generate_correlation_id: true
Expand Down
973 changes: 493 additions & 480 deletions lock/requirements-dev.txt

Large diffs are not rendered by default.

439 changes: 224 additions & 215 deletions lock/requirements.txt

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/ets/adapters/inbound/temporary_event_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ class AEMPackEventConfig(BaseSettings):

class OriginalAEMPack(AEMPack):
"""Model for the incoming AEMPack payload."""

version: int = Field(
default=...,
description="Version assigned by the publishing service (RS), incremented on each republish.",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not point to the concrete service and republishing actually doesn't increase the version, only a genuinely new version of the AEMPack does.

Suggested change
description="Version assigned by the publishing service (RS), incremented on each republish.",
description="Current version of the AEMPack. Used to resolve republishing conflicts.",

or something like that.

)
25 changes: 24 additions & 1 deletion src/ets/adapters/outbound/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
from pydantic_settings import BaseSettings

from ets.core import models
from ets.core.models import AEMPack
from ets.core.models import AEMPack, AEMPackFailedEvent
from ets.ports.outbound.dao import (
FailedEventDao,
ModelDao,
RouteDao,
WorkflowDao,
Expand Down Expand Up @@ -59,6 +60,14 @@ class AEMPackDaoConfig(BaseSettings):
description="Topic for events informing about derived AEMPacks.",
examples=["derived-aempacks"],
)
aem_pack_processing_event_topic: str = Field(
default=...,
description=(
"Topic for AEMPack processing-lifecycle (status) events, e.g. processing"
" failures, and later successes."
),
examples=["aempack-processing-events"],
)
Comment on lines +63 to +70

@mephenor mephenor Jun 17, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned on the yaml file:
Drop the redundant event in the name and change it to aem_pack_processing_status_topic



async def get_aem_pack_dao(
Expand All @@ -74,3 +83,17 @@ async def get_aem_pack_dao(
autopublish=True,
indexes=[MongoDbIndex(fields={"pid": 1, "model_name": 1})],
)


async def get_failed_event_dao(
*, dao_publisher_factory: DaoPublisherFactoryProtocol, topic: str
) -> FailedEventDao:
"""Construct an outbox DAO for AEMPack processing-failure events."""
return await dao_publisher_factory.get_dao(
name="aem_pack_failed_events",
id_field="id",
dto_model=AEMPackFailedEvent,
dto_to_event=lambda event: event.model_dump(mode="json"),
event_topic=topic,
autopublish=True,
)
Comment on lines +88 to +99

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be coupled to only the failure event, there should be events fired when an AEMPack has been successfully put into the queue and when one has successfully been processed.
We don't need events on picking up an AEMPack from the queue, only when it reaches a final state.

The collection name and model should be more more generic, something in the direction of status_events.

115 changes: 79 additions & 36 deletions src/ets/adapters/outbound/incoming_aem_pack_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
from pydantic import UUID4
from pymongo import ReturnDocument
from pymongo.asynchronous.collection import AsyncCollection
from pymongo.errors import DuplicateKeyError

from ets.constants import (
FAILED_AT_FIELD,
NEEDS_REPROCESSING_FIELD,
PROCESSED_AT_FIELD,
PROCESSOR_FIELD,
TOMBSTONE_FIELD,
VERSION_FIELD,
)
from ets.core.models import AEMPack, IncomingAEMPack
from ets.ports.outbound.incoming_aem_pack_queue import IncomingAEMPackQueuePort
Expand All @@ -52,44 +55,64 @@ def __init__(
self._worker_id = worker_id

async def queue(self, aem_pack: AEMPack) -> None:
"""Upsert an AEMPack into the queue."""
"""Upsert an AEMPack into the queue if its version is newer than the stored one.

The incoming version is compared against any document already stored for the
same id. The document is only (over)written when the incoming version is
strictly higher; equal or lower versions are rejected and logged (the likely
cause is a republish on the RS side). Accepting a newer version also resets
``failed_at``, so a previously failed pack is reprocessed under the new version.
"""
doc = aem_pack.model_dump(mode="json")
doc.pop("id")
doc["correlation_id"] = str(get_correlation_id())

await self._collection.find_one_and_update(
filter={"_id": aem_pack.id},
update=[
{
"$set": {
**doc,
# Preserve the current processor so the in-flight instance can still
# complete and mark the doc as done; it will be requeued via needs_reprocessing.
PROCESSOR_FIELD: {
"$cond": {
"if": f"${PROCESSOR_FIELD}",
"then": f"${PROCESSOR_FIELD}",
"else": None,
}
},
NEEDS_REPROCESSING_FIELD: {
"$or": [
{"$ne": [f"${PROCESSOR_FIELD}", None]},
{"$ne": [f"${PROCESSED_AT_FIELD}", None]},
]
},
PROCESSED_AT_FIELD: {
"$cond": {
"if": f"${PROCESSED_AT_FIELD}",
"then": f"${PROCESSED_AT_FIELD}",
"else": None,
}
},
incoming_version = doc[VERSION_FIELD]

try:
await self._collection.find_one_and_update(
# No match when the stored version is >= incoming: upsert then tries to
# insert a duplicate _id, which surfaces as DuplicateKeyError (rejection).
filter={"_id": aem_pack.id, VERSION_FIELD: {"$lt": incoming_version}},
update=[
{
"$set": {
**doc,
# Preserve the current processor so the in-flight instance can still
# complete and mark the doc as done; it will be requeued via needs_reprocessing.
PROCESSOR_FIELD: {
"$cond": {
"if": f"${PROCESSOR_FIELD}",
"then": f"${PROCESSOR_FIELD}",
"else": None,
}
},
NEEDS_REPROCESSING_FIELD: {
"$or": [
{"$ne": [f"${PROCESSOR_FIELD}", None]},
{"$ne": [f"${PROCESSED_AT_FIELD}", None]},
]
},
PROCESSED_AT_FIELD: {
"$cond": {
"if": f"${PROCESSED_AT_FIELD}",
"then": f"${PROCESSED_AT_FIELD}",
"else": None,
}
},
# A newer version clears any prior failure so the pack is
# reprocessed instead of staying parked as failed.
FAILED_AT_FIELD: None,
Comment on lines +95 to +104

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a bit of an inconsistency here with how marking as failed is treated:
This resets the failed_at field, but preserves the processed_at field, so the corresponding AEMPack is still marked as done, but the failure is masked now and it's not picked up for reprocessing.

The easiest way to solve this would probably be to extend the if clause in the conditional set for the PROCESSED_AT_FIELD to include a check that FAILED_AT_FIELD is currently None.
This would probably need another look in the mongo docs, so we can be sure the check is performed before FAILED_AT_FIELD is set to None.

If that doesn't work out, another way would be to include the FAILED_AT_FIELD in the checks for the claim_next and mark_all_for_reprocessing logic.

}
}
}
],
upsert=True,
)
],
upsert=True,
)
except DuplicateKeyError:
log.info(
"AEMPack %s version %s is not newer than the stored version; rejecting.",
aem_pack.id,
incoming_version,
)

async def claim_next(self) -> IncomingAEMPack | None:
"""Claim the next available AEMPack for processing."""
Expand Down Expand Up @@ -185,8 +208,28 @@ async def free(self, aem_pack_id: UUID4) -> None:
)

async def mark_all_for_reprocessing(self) -> None:
"""Flag all processed AEMPacks for reprocessing."""
"""Flag all processed AEMPacks for reprocessing.

Failed AEMPacks are included and their ``failed_at`` is cleared: a config
change may be exactly what fixes the transformation that previously failed,
so they are retried fresh under the new config.
"""
await self._collection.update_many(
{PROCESSED_AT_FIELD: {"$ne": None}, TOMBSTONE_FIELD: {"$ne": True}},
{"$set": {NEEDS_REPROCESSING_FIELD: True}},
{"$set": {NEEDS_REPROCESSING_FIELD: True, FAILED_AT_FIELD: None}},
)

async def mark_as_failed(self, aem_pack_id: UUID4) -> None:
"""Mark an AEMPack as failed when data derivation raises an exception.
It is marked as processed for the sake of state management to ensure
that it is not picked up again for processing.
"""
await self._collection.update_one(
{"_id": aem_pack_id},
{
"$set": {
FAILED_AT_FIELD: now_utc_ms_prec(),
}
},
)
await self.mark_processed(aem_pack_id)
Comment on lines +227 to +235

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To guard against possible race conditions and make it one call instead of two, inline mark_processed into the update_one call above.

2 changes: 2 additions & 0 deletions src/ets/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@
PROCESSOR_FIELD = "processor"
PROCESSED_AT_FIELD = "processed_at"
NEEDS_REPROCESSING_FIELD = "needs_reprocessing"
FAILED_AT_FIELD = "failed_at"
VERSION_FIELD = "version"
TOMBSTONE_FIELD = "is_deleted"
Loading
Loading