outbox event publishing failure, queue management in erroneous cases #21

Open
sbilge wants to merge 2 commits into dev from feature/error_marking

@sbilge sbilge commented Jun 16, 2026

This PR adds failure handling and outside visibility for AEMPack data derivation.
Includes:

  • Data-derivation failures are now caught as a typed DataDerivationError: the processing loop aborts the current AEMPack without publishing partial results and without crashing the service instance
  • An outbox failure event (AEMPackFailedEvent) is published on a processing-status topic — so downstream observers learn about failures.
  • Failed AEMPacks are marked (failed_at) so they are not claimed for processing again. The mark is reset — and the pack retried — when a higher version arrives, or when the config changes.
  • Version-gated queuing: an incoming AEMPack is only (re)stored when its version is higher than the stored one; equal or lower versions are rejected and logged

It does not include:

  • a success event, nor "queued"/"claimed" events
  • handling of model derivation failures (only workflow data-step failures are caught)
  • versioning/provenance on derived (published) AEMPacks — the new version is on incoming packs only and is not propagated downstream
  • removal of the AEMPack.id / (pid, model_name) identity harmonization
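
A minimal sketch of the version-gated queuing and failure-reset behavior described above, using an in-memory dict instead of the real registry. `StoredPack` and `upsert_if_newer` are hypothetical names for illustration only and are not taken from this PR:

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class StoredPack:
    """Hypothetical stand-in for a stored AEMPack record."""

    version: int
    failed_at: datetime | None = None


def upsert_if_newer(store: dict[str, StoredPack], key: str, version: int) -> bool:
    """Store the incoming pack only if its version is strictly higher.

    Returns True if the pack was (re)stored, False if it was rejected.
    """
    current = store.get(key)
    if current is not None and version <= current.version:
        # Equal or lower versions are rejected; the real service also logs this.
        return False
    # A fresh record carries no failure mark, so the pack is eligible again.
    store[key] = StoredPack(version=version)
    return True
```

A failed pack at version 1 stays parked when version 1 arrives again, and becomes claimable once version 2 is stored.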

@coveralls

Coverage Report for CI Build 27622313161

Coverage increased (+0.3%) to 92.478%

Details

  • Coverage increased (+0.3%) from the base build.
  • Patch coverage: 2 uncovered changes across 1 file (56 of 58 lines covered, 96.55%).
  • No coverage regressions found.

Uncovered Changes

File | Changed | Covered | %
src/ets/core/aem_pack_registry.py | 20 | 18 | 90.0%
Total (10 files) | 58 | 56 | 96.55%

Coverage Regressions

No coverage regressions found.


Coverage Stats

Coverage Status
Relevant Lines: 1037
Covered Lines: 959
Line Coverage: 92.48%
Coverage Strength: 0.92 hits per line

💛 - Coveralls

@sbilge sbilge requested a review from mephenor June 16, 2026 13:52
input_config_path: input_dummy.yaml
original_aem_pack_topic: original-aempacks
derived_aem_pack_topic: derived-aempacks
aem_pack_processing_event_topic: aempack-processing-events

The "event" in the name is redundant, so something more compact is better:

Suggested change
- aem_pack_processing_event_topic: aempack-processing-events
+ aem_pack_processing_topic: aempack-processing

or, slightly less generic, aem_pack_processing_status_topic


version: int = Field(
    default=...,
    description="Version assigned by the publishing service (RS), incremented on each republish.",

This should not name the concrete service. Also, republishing doesn't actually increase the version; only a genuinely new version of the AEMPack does.

Suggested change
- description="Version assigned by the publishing service (RS), incremented on each republish.",
+ description="Current version of the AEMPack. Used to resolve republishing conflicts.",

or something like that.

Comment on lines +88 to +99
async def get_failed_event_dao(
    *, dao_publisher_factory: DaoPublisherFactoryProtocol, topic: str
) -> FailedEventDao:
    """Construct an outbox DAO for AEMPack processing-failure events."""
    return await dao_publisher_factory.get_dao(
        name="aem_pack_failed_events",
        id_field="id",
        dto_model=AEMPackFailedEvent,
        dto_to_event=lambda event: event.model_dump(mode="json"),
        event_topic=topic,
        autopublish=True,
    )

This should not be coupled to the failure event alone. Events should also be fired when an AEMPack has been successfully put into the queue and when one has been successfully processed.
We don't need events on picking up an AEMPack from the queue, only when it reaches a final state.

The collection name and model should be more generic, something in the direction of status_events.

Comment on lines +63 to +70
aem_pack_processing_event_topic: str = Field(
    default=...,
    description=(
        "Topic for AEMPack processing-lifecycle (status) events, e.g. processing"
        " failures, and later successes."
    ),
    examples=["aempack-processing-events"],
)

@mephenor mephenor Jun 17, 2026

As mentioned on the yaml file:
Drop the redundant event in the name and change it to aem_pack_processing_status_topic

Comment thread src/ets/core/models.py
Comment on lines +327 to +362
class AEMPackProcessingEvent(BaseModel):
    """Base for AEMPack processing-lifecycle events published on the status channel.

    Siblings (e.g. a future succeeded event) share these fields so they can sit on
    the same topic and be correlated back to the originating incoming AEMPack.
    """

    id: UUID4 = Field(
        default_factory=uuid4, description="Unique identifier of the event."
    )
    pid: str = Field(
        default=...,
        description="Shared identifier of the incoming AEMPack and its derived packs.",
    )
    model_name: str = Field(
        default=...,
        description="Name of the model the AEMPack being processed conforms to.",
    )
    version: int = Field(
        default=..., description="Version of the incoming AEMPack this event concerns."
    )


class AEMPackFailedEvent(AEMPackProcessingEvent):
    """Published when data derivation fails for an incoming AEMPack."""

    transformation_step: str | None = Field(
        default=None,
        description="Name of the workflow step that failed, if known.",
    )
    error_type: str = Field(
        default=..., description="Class name of the error that caused the failure."
    )
    error_message: str = Field(
        default=..., description="Human-readable message of the underlying error."
    )

These could be merged into one model for use by the outbox by making all error-related fields optional.
For easier disambiguation at the recipient, an enum- or literal-valued field for the processing state could be included, covering the "Queued", "Failed" and "Processed" states.
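
A rough sketch of what such a merged model could look like. It uses stdlib dataclasses rather than Pydantic purely to stay dependency-free; `AEMPackStatusEvent` and `ProcessingStatus` are hypothetical names, and the check that failed events carry error details is an extra assumption, not something requested in this thread:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from uuid import UUID, uuid4


class ProcessingStatus(str, Enum):
    """Final states reported on the status topic (hypothetical enum)."""

    QUEUED = "Queued"
    FAILED = "Failed"
    PROCESSED = "Processed"


@dataclass(frozen=True)
class AEMPackStatusEvent:
    """Single event model covering all reported states."""

    pid: str
    model_name: str
    version: int
    status: ProcessingStatus
    # Error-related fields are optional and only filled for failures.
    transformation_step: str | None = None
    error_type: str | None = None
    error_message: str | None = None
    id: UUID = field(default_factory=uuid4)

    def __post_init__(self) -> None:
        # Assumption: a failed event without error details is not useful.
        if self.status is ProcessingStatus.FAILED and not (
            self.error_type and self.error_message
        ):
            raise ValueError("Failed events need error_type and error_message.")
```

Recipients can then branch on `status` alone instead of inspecting which optional fields happen to be set.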

Comment on lines +344 to +354
try:
    return runner.run_workflow(
        data=aem_pack.data,
        annotation=_AnnotationModel.model_validate(aem_pack.annotation),
    )
except WorkflowExecutionError as error:
    raise DataDerivationError(
        pid=aem_pack.pid,
        model_name=aem_pack.model_name,
        error=error,
    ) from error

Is the step_name accessible on the WorkflowExecutionError?
Promoting that to a field on the DataDerivationError would be a bit more transparent than the current approach.

Comment on lines 341 to 343
runner: WorkflowRunner = WorkflowRunner(
    workflow=workflow.workflow, input_model=input_schema
)

So this is the step that could potentially fail because it internally rederives the schemas we've already produced during model derivation?

Can't we just wrap that in a try-except and set the step_name to runner_init or something like that?

Can we assume schema equivalence between this and the previously derived schema or do we need to re-check that?

Also, we could theoretically cache the naive runner, but don't we also store the data transformation results on the runner?
This could cause some problems depending on how it's accessed.

Seems like this needs a solution at two different levels:

  1. Basic error handling and potentially caching in the EMTS
  2. Some level of internal redesign on the metldata level, further decoupling data and model related functionality.

Can we get away with just wrapping this in a try-except here for now and deal with the redundancy/potential but probably unlikely inconsistencies later?
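
For illustration, the "just wrap it" option could look roughly like the sketch below. `FailingRunner` and `build_runner` are hypothetical stand-ins, the `DataDerivationError` signature with a `step_name` argument is assumed rather than taken from the diff, and catching a broad `Exception` is a placeholder because the concrete error types raised during runner construction are not known here:

```python
class DataDerivationError(RuntimeError):
    """Simplified stand-in for the error type in this PR (signature assumed)."""

    def __init__(self, *, pid: str, model_name: str, step_name: str, error: Exception):
        super().__init__(f"{step_name} failed for {pid!r} ({model_name!r}): {error}")
        self.pid = pid
        self.model_name = model_name
        self.step_name = step_name
        self.error = error


class FailingRunner:
    """Hypothetical runner whose construction fails, e.g. on schema rederivation."""

    def __init__(self, *, workflow, input_model):
        raise ValueError("schema rederivation failed")


def build_runner(runner_cls, *, workflow, input_model, pid: str, model_name: str):
    """Construct the runner, reporting init failures as data-derivation errors."""
    try:
        return runner_cls(workflow=workflow, input_model=input_model)
    except Exception as error:  # placeholder: narrow once the real types are known
        raise DataDerivationError(
            pid=pid, model_name=model_name, step_name="runner_init", error=error
        ) from error
```

This leaves the redundancy with model derivation untouched and only makes sure an init failure travels the same path as a failing workflow step.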

Comment on lines +31 to +32
AEMPack being transformed for logging and audit context; the failing
workflow step is available on the wrapped ``error``.

As mentioned elsewhere:
It would make some sense to make this slightly more transparent by also exposing the workflow step directly on this error.

There's some code in the current diff that tries to get it from the wrapped exception via getattr with a None fallback, but this seems like it should always be present, so this requirement should be encoded directly into the error type.
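
A sketch of how the requirement could be encoded in the error type. The `WorkflowExecutionError` below is a local stand-in that is assumed to carry a `step_name` attribute (whether the real one does is the open question above), and `from_workflow_error` is a hypothetical helper:

```python
class WorkflowExecutionError(RuntimeError):
    """Local stand-in; the real error is assumed to expose ``step_name``."""

    def __init__(self, message: str, *, step_name: str):
        super().__init__(message)
        self.step_name = step_name


class DataDerivationError(RuntimeError):
    """Data derivation failed; the failing step is a required field."""

    def __init__(self, *, pid: str, model_name: str, step_name: str, error: Exception):
        super().__init__(
            f"Data derivation failed for AEMPack {pid!r} (model {model_name!r})"
            f" in step {step_name!r}: {error}"
        )
        self.pid = pid
        self.model_name = model_name
        self.step_name = step_name
        self.error = error

    @classmethod
    def from_workflow_error(
        cls, *, pid: str, model_name: str, error: WorkflowExecutionError
    ) -> "DataDerivationError":
        # No getattr fallback: a missing step name surfaces as an AttributeError.
        return cls(
            pid=pid, model_name=model_name, step_name=error.step_name, error=error
        )
```

With `step_name` required, the code that builds the failure event can read it directly from the error instead of probing the wrapped exception.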

Comment on lines +227 to +235
await self._collection.update_one(
    {"_id": aem_pack_id},
    {
        "$set": {
            FAILED_AT_FIELD: now_utc_ms_prec(),
        }
    },
)
await self.mark_processed(aem_pack_id)

To guard against possible race conditions and make it one call instead of two, inline mark_processed into the update_one call above.
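
Roughly, the combined update document could be built as sketched below. This assumes that `mark_processed` only sets the processed-at timestamp and that the field constants resolve to `failed_at` and `processed_at`; both are guesses from the surrounding discussion, and `build_mark_failed_update` is a hypothetical helper:

```python
from datetime import datetime, timezone

# Assumed values of the field-name constants used in the registry.
FAILED_AT_FIELD = "failed_at"
PROCESSED_AT_FIELD = "processed_at"


def build_mark_failed_update(now: datetime) -> dict:
    """Build one update document that sets both timestamps together."""
    return {
        "$set": {
            FAILED_AT_FIELD: now,
            PROCESSED_AT_FIELD: now,
        }
    }


# Intended use inside the registry method (not executed here):
#   await self._collection.update_one(
#       {"_id": aem_pack_id}, build_mark_failed_update(now_utc_ms_prec())
#   )
update = build_mark_failed_update(datetime.now(timezone.utc))
```

Since both fields are written in one `update_one` call, no other writer can observe the pack as failed but not yet processed.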

Comment on lines +95 to +104
PROCESSED_AT_FIELD: {
    "$cond": {
        "if": f"${PROCESSED_AT_FIELD}",
        "then": f"${PROCESSED_AT_FIELD}",
        "else": None,
    }
},
# A newer version clears any prior failure so the pack is
# reprocessed instead of staying parked as failed.
FAILED_AT_FIELD: None,

There's a bit of an inconsistency here with how marking as failed is treated:
This resets the failed_at field, but preserves the processed_at field, so the corresponding AEMPack is still marked as done, but the failure is masked now and it's not picked up for reprocessing.

The easiest way to solve this would probably be to extend the if clause in the conditional set for the PROCESSED_AT_FIELD to include a check that FAILED_AT_FIELD is currently None.
This would probably need another look in the mongo docs, so we can be sure the check is performed before FAILED_AT_FIELD is set to None.

If that doesn't work out, another way would be to include the FAILED_AT_FIELD in the checks for the claim_next and mark_all_for_reprocessing logic.
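
One possible shape for the first option is sketched below. The field names are assumed, `build_reset_stage` and `expected_processed_at` are hypothetical helpers, and whether the `$cond` sees the pre-update value of the failure field within the same `$set` stage still needs to be confirmed against the MongoDB documentation, as noted above. The `$ifNull` wrapper is there so that a missing field is treated the same as an explicit null:

```python
from __future__ import annotations

from datetime import datetime

# Assumed values of the field-name constants used in the registry.
FAILED_AT_FIELD = "failed_at"
PROCESSED_AT_FIELD = "processed_at"


def build_reset_stage() -> dict:
    """Build a pipeline-style $set stage for the newer-version update."""
    failed_is_unset = {"$eq": [{"$ifNull": [f"${FAILED_AT_FIELD}", None]}, None]}
    return {
        "$set": {
            PROCESSED_AT_FIELD: {
                "$cond": {
                    # Keep processed_at only if it is set AND no failure is recorded.
                    "if": {"$and": [f"${PROCESSED_AT_FIELD}", failed_is_unset]},
                    "then": f"${PROCESSED_AT_FIELD}",
                    "else": None,
                }
            },
            FAILED_AT_FIELD: None,
        }
    }


def expected_processed_at(doc: dict) -> datetime | None:
    """Plain-Python mirror of the intended outcome, for reasoning and tests."""
    processed_at = doc.get(PROCESSED_AT_FIELD)
    if processed_at and doc.get(FAILED_AT_FIELD) is None:
        return processed_at
    return None
```

Under this rule a previously failed pack ends up with both fields cleared and is picked up again, while a pack that was processed without failure keeps its timestamp as before.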
