Skip to content

Commit 4a70a01

Browse files
authored
feat: per-handler error channel and delayed retry for async endpoints (fixes #659) (#665)
* additional features * delayed retry * fail-fast on #[ErrorChannel] / #[DelayedRetry] placed directly on async handler methods Both attributes have no effect when placed alongside #[Asynchronous] on a handler method; they must be passed via #[Asynchronous(asynchronousExecution: [...])] for the polling consumer to pick them up. Detect the misplacement at compile time and throw a ConfigurationException pointing the user at the correct form. * reject #[DelayedRetry] on inbound channel adapters; allow #[InstantRetry] on #[Scheduled] Inbound Channel Adapters (Kafka, AMQP inbound, #[Scheduled]) consume from external systems and have no source Message Channel for the framework to reschedule a delayed retry into — fail-fast at bootstrap with a descriptive error pointing at #[ErrorChannel] and/or #[InstantRetry] as the supported alternatives. InstantRetryAttributeModule now also accepts #[ChannelAdapter] (parent of #[Scheduled]) in addition to #[MessageConsumer], so the recommended workaround works docker-free. * test: KafkaConsumer dead letter replay parity with Dbal scheduled test Mirrors Test\Ecotone\Dbal\Integration\DeadLetterTest::test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler — same shape, real Kafka transport. Verifies that a failure on a #[KafkaConsumer] lands in the DBAL Dead Letter, replyAll() routes it back to the same consumer's handler with the original payload, and the handler runs successfully on the second attempt. * verify replyAll() triggers KafkaConsumer handler synchronously The DLQ handler routes via MessagingEntrypoint, which is a synchronous in-process dispatch — invocations should be 2 immediately after replyAll(), no need to run() the consumer again. Drop the redundant second poll and assert directly. Cuts the test runtime from ~35s to ~4s. * refactor: consolidate ChannelAdapter into MessageConsumer; inline test fixtures; extract exception factory - ChannelAdapter now extends MessageConsumer (and MessageConsumer extends IdentifiedAnnotation), so checks throughout the framework reference a single base class instead of two. - Move all introduced test fixtures inline (anonymous classes for single-use, named classes within the same test file for multi-use). Drops 9 separate fixture files. - Extract ConfigurationException/LicensingException messages for Error Channel + Delayed Retry placement validations into an ErrorChannelExceptionMessages factory; keeps the validation logic readable. - Add a second run() check to dead-letter replay tests (Kafka + Dbal) verifying the replayed message is not re-consumed. * extract more exception messages into ErrorChannelExceptionMessages factory Move 6 more inline exception strings into the factory so the validation logic stays focused on intent rather than text: - DbalDeadLetterHandler: "cannot reply ... no polledChannelName/inboundRequestChannel/routingSlip" - DelayedRetryErrorHandler: "Failed to handle Error Message via Retry Configuration ..." - InstantRetryAttributeModule: "InstantRetry only on Inbound Channel Adapter" + Enterprise licence check - MessagingSystemConfiguration: "asynchronousExecution requires Enterprise" - MessagingGatewayModule: gateway-level Error Channel + DelayedRetry licence checks * update OpenTelemetry tracing tests for new inbound channel adapter span name #[Scheduled] handlers now set INBOUND_REQUEST_CHANNEL, so TracerInterceptor produces "Receiving from inbound channel adapter: <channel>" rather than the old fallback "Endpoint: <endpointId> produced Message". The new format is consistent with pollable channel spans ("Receiving from channel: <channel>") and tells the user where the message went, not just which endpoint produced it. * make multi-tenant projection test deterministic by polling all queued messages The async multi-tenant projection test relies on \$ecotone->run() processing both tenants' queued messages on a single call so each tenant's first message triggers lazy projection initialization. The default polling metadata was not guaranteed to drain the queue in CI, leaving tenant_b's projection state UNINITIALIZED and the in_progress_tickets table absent in tenant_b's database. Pass explicit ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 2) so both tenants' events are reliably processed in one run() call, ensuring lazy init fires for each tenant before queries run.
1 parent 0c0282c commit 4a70a01

33 files changed

Lines changed: 1529 additions & 82 deletions

packages/Dbal/src/Recoverability/DbalDeadLetterConsoleCommand.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@ public function show(DeadLetterGateway $deadLetterGateway, string $messageId, bo
5353
[
5454
['Message Id', $message->getHeaders()->getMessageId()],
5555
['Failed At', $this->convertTimestampToReadableFormat($message->getHeaders()->getTimestamp())],
56-
['Channel Name', $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME)],
56+
['Channel Name', $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME)
57+
? $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME)
58+
: ($message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL)
59+
? $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL)
60+
: 'Unknown')],
5761
['Type', $message->getHeaders()->containsKey(MessageHeaders::TYPE_ID) ? $message->getHeaders()->get(MessageHeaders::TYPE_ID) : 'Unknown'],
5862
['Stacktrace', $this->getReadableStacktrace($message->getHeaders()->get(ErrorContext::EXCEPTION_STACKTRACE), $fullDetails)],
5963
]

packages/Dbal/src/Recoverability/DbalDeadLetterHandler.php

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,12 +245,18 @@ private function initialize(): void
245245
private function replyWithoutInitialization(string $messageId, MessagingEntrypointService $messagingEntrypoint): void
246246
{
247247
$message = $this->show($messageId);
248-
if (! $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) && ! $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP)) {
249-
throw InvalidArgumentException::create("Can not reply to message {$messageId}, as it does not contain either `polledChannelName` or `routingSlip` header. Please add one of them, so Message can be routed back to the original channel.");
248+
$hasPolledChannel = $message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME);
249+
$hasInboundRequestChannel = $message->getHeaders()->containsKey(MessageHeaders::INBOUND_REQUEST_CHANNEL);
250+
$hasRoutingSlip = $message->getHeaders()->containsKey(MessageHeaders::ROUTING_SLIP);
251+
252+
if (! $hasPolledChannel && ! $hasInboundRequestChannel && ! $hasRoutingSlip) {
253+
throw InvalidArgumentException::create(\Ecotone\Messaging\Config\Annotation\ModuleConfiguration\ErrorChannelExceptionMessages::cannotReplyToDeadLetterMessage($messageId));
250254
}
251255

252-
if ($message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME)) {
256+
if ($hasPolledChannel) {
253257
$entrypoint = $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME);
258+
} elseif ($hasInboundRequestChannel) {
259+
$entrypoint = $message->getHeaders()->get(MessageHeaders::INBOUND_REQUEST_CHANNEL);
254260
} else {
255261
// This allows to replay Error Message stored for synchronous calls (non asynchronous)
256262
$routingSlip = $message->getHeaders()->resolveRoutingSlip();

packages/Dbal/tests/Integration/DeadLetterTest.php

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Ecotone\Lite\Test\FlowTestSupport;
1111
use Ecotone\Messaging\Config\ModulePackageList;
1212
use Ecotone\Messaging\Config\ServiceConfiguration;
13+
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
1314
use Ecotone\Messaging\Endpoint\PollingMetadata;
1415
use Ecotone\Messaging\Handler\Recoverability\ErrorContext;
1516
use Ecotone\Messaging\MessageHeaders;
@@ -203,6 +204,81 @@ public function test_same_event_is_stored_in_dead_letter_twice_for_different_end
203204
$this->assertErrorMessageCount($ecotone, 0);
204205
}
205206

207+
public function test_inbound_channel_adapter_failure_lands_in_dead_letter_and_replays_back_to_handler(): void
208+
{
209+
$handler = new class () {
210+
public const ENDPOINT_ID = 'failingInboundAdapter';
211+
public const REQUEST_CHANNEL = 'failingInboundAdapterRequestChannel';
212+
213+
public bool $shouldFail = true;
214+
public int $invocations = 0;
215+
/** @var string[] */
216+
public array $processedPayloads = [];
217+
private bool $hasEmitted = false;
218+
219+
#[\Ecotone\Messaging\Attribute\Scheduled(self::REQUEST_CHANNEL, self::ENDPOINT_ID)]
220+
#[\Ecotone\Messaging\Attribute\Poller(executionTimeLimitInMilliseconds: 1, handledMessageLimit: 1)]
221+
public function emit(): ?string
222+
{
223+
if ($this->hasEmitted) {
224+
return null;
225+
}
226+
$this->hasEmitted = true;
227+
228+
return 'first-payload';
229+
}
230+
231+
#[\Ecotone\Messaging\Attribute\ServiceActivator(self::REQUEST_CHANNEL)]
232+
public function handle(string $payload): void
233+
{
234+
$this->invocations++;
235+
if ($this->shouldFail) {
236+
throw new \RuntimeException('simulated');
237+
}
238+
$this->processedPayloads[] = $payload;
239+
}
240+
};
241+
$connectionFactory = $this->getConnectionFactory();
242+
243+
$ecotone = EcotoneLite::bootstrapFlowTesting(
244+
containerOrAvailableServices: [
245+
$handler,
246+
DbalConnectionFactory::class => $connectionFactory,
247+
'managerRegistry' => $connectionFactory,
248+
],
249+
configuration: ServiceConfiguration::createWithDefaults()
250+
->withEnvironment('prod')
251+
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
252+
->withDefaultErrorChannel(DbalDeadLetterBuilder::STORE_CHANNEL),
253+
classesToResolve: [$handler::class],
254+
pathToRootCatalog: __DIR__ . '/../../',
255+
);
256+
257+
$ecotone->run('failingInboundAdapter', ExecutionPollingMetadata::createWithTestingSetup(
258+
amountOfMessagesToHandle: 1,
259+
failAtError: false,
260+
));
261+
262+
$this->assertErrorMessageCount($ecotone, 1);
263+
$this->assertSame(1, $handler->invocations, 'Handler must have been invoked once before the failure was captured');
264+
$this->assertSame([], $handler->processedPayloads);
265+
266+
$handler->shouldFail = false;
267+
$this->replyAllErrorMessages($ecotone);
268+
269+
$this->assertErrorMessageCount($ecotone, 0);
270+
$this->assertSame(2, $handler->invocations, 'replyAll() must synchronously re-invoke the handler via MessagingEntrypoint — no second run() needed');
271+
$this->assertSame(['first-payload'], $handler->processedPayloads, 'Replayed Message must carry the original payload back to the handler');
272+
273+
$ecotone->run('failingInboundAdapter', ExecutionPollingMetadata::createWithTestingSetup(
274+
amountOfMessagesToHandle: 1,
275+
failAtError: false,
276+
));
277+
278+
$this->assertSame(2, $handler->invocations, 'Subsequent polls must not re-process the replayed Message (emit() returns null after the first emission)');
279+
$this->assertSame(['first-payload'], $handler->processedPayloads);
280+
}
281+
206282
private function assertErrorMessageCount(FlowTestSupport $ecotone, int $amount, string $deadLetterReference = DeadLetterGateway::class): void
207283
{
208284
$gateway = $ecotone->getGateway(DeadLetterGateway::class);

packages/Ecotone/src/Messaging/Attribute/Asynchronous.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@ class Asynchronous
1515
{
1616
private string|array $channelName;
1717
/** @var AsynchronousEndpointAttribute[] */
18-
private array $endpointAnnotations;
18+
private array $asynchronousExecution;
1919

2020
/**
21-
* @param AsynchronousEndpointAttribute[] $endpointAnnotations
21+
* @param AsynchronousEndpointAttribute[] $asynchronousExecution Attributes scoped to the asynchronous execution context — applied when the polling consumer processes the Message, not at the synchronous bus call.
2222
*/
23-
public function __construct(string|array $channelName, array $endpointAnnotations = [])
23+
public function __construct(string|array $channelName, array $asynchronousExecution = [])
2424
{
2525
Assert::notNullAndEmpty($channelName, 'Channel name can not be empty string');
26-
Assert::allInstanceOfType($endpointAnnotations, AsynchronousEndpointAttribute::class);
26+
Assert::allInstanceOfType($asynchronousExecution, AsynchronousEndpointAttribute::class);
2727
$this->channelName = $channelName;
28-
$this->endpointAnnotations = $endpointAnnotations;
28+
$this->asynchronousExecution = $asynchronousExecution;
2929
}
3030

3131
public function getChannelName(): array
@@ -36,8 +36,8 @@ public function getChannelName(): array
3636
/**
3737
* @return AsynchronousEndpointAttribute[]
3838
*/
39-
public function getEndpointAnnotations(): array
39+
public function getAsynchronousExecution(): array
4040
{
41-
return $this->endpointAnnotations;
41+
return $this->asynchronousExecution;
4242
}
4343
}

packages/Ecotone/src/Messaging/Attribute/ChannelAdapter.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
/**
88
* licence Apache-2.0
99
*/
10-
abstract class ChannelAdapter extends IdentifiedAnnotation
10+
abstract class ChannelAdapter extends MessageConsumer
1111
{
1212
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\Messaging\Attribute;
6+
7+
use Attribute;
8+
use Ecotone\Messaging\Support\Assert;
9+
10+
/**
11+
* licence Enterprise
12+
*/
13+
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
14+
final class DelayedRetry implements AsynchronousEndpointAttribute
15+
{
16+
public function __construct(
17+
public readonly int $initialDelayMs,
18+
public readonly int $multiplier = 1,
19+
public readonly ?int $maxDelayMs = null,
20+
public readonly ?int $maxAttempts = 3,
21+
public readonly ?string $deadLetterChannel = null,
22+
) {
23+
Assert::isTrue($initialDelayMs > 0, 'DelayedRetry initialDelayMs must be greater than 0');
24+
Assert::isTrue($multiplier > 0, 'DelayedRetry multiplier must be greater than 0');
25+
Assert::isTrue($maxAttempts === null || $maxAttempts > 0, 'DelayedRetry maxAttempts must be null (unlimited) or greater than 0');
26+
Assert::isTrue($deadLetterChannel === null || $deadLetterChannel !== '', 'DelayedRetry deadLetterChannel must be null or a non-empty channel name');
27+
}
28+
29+
public static function generateChannelName(string $handlerEndpointId): string
30+
{
31+
return 'ecotone.retry.' . $handlerEndpointId;
32+
}
33+
34+
public static function generateGatewayChannelName(string $gatewayInterfaceFqn): string
35+
{
36+
return 'ecotone.retry.gateway.' . str_replace('\\', '.', $gatewayInterfaceFqn);
37+
}
38+
}

packages/Ecotone/src/Messaging/Attribute/ErrorChannel.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* licence Enterprise
1212
*/
1313
#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_METHOD)]
14-
class ErrorChannel
14+
class ErrorChannel implements AsynchronousEndpointAttribute
1515
{
1616
/**
1717
* @param string $errorChannelName Name of the error channel to send Message too

packages/Ecotone/src/Messaging/Attribute/MessageConsumer.php

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,12 @@
88

99
#[Attribute(Attribute::TARGET_METHOD)]
1010
/**
11+
* Base attribute for any Inbound Channel Adapter consuming from an external system
12+
* (Kafka, AMQP, scheduled tasks, etc.). Subclasses include #[KafkaConsumer], #[RabbitConsumer],
13+
* and #[ChannelAdapter] (the base for #[Scheduled]).
14+
*
1115
* licence Apache-2.0
1216
*/
13-
class MessageConsumer
17+
class MessageConsumer extends IdentifiedAnnotation
1418
{
15-
private string $endpointId;
16-
17-
public function __construct(string $endpointId)
18-
{
19-
$this->endpointId = $endpointId;
20-
}
21-
22-
public function getEndpointId(): string
23-
{
24-
return $this->endpointId;
25-
}
2619
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Ecotone\Messaging\Config\Annotation\ModuleConfiguration;
6+
7+
/**
8+
* licence Apache-2.0
9+
*
10+
* @internal
11+
*/
12+
final class ErrorChannelExceptionMessages
13+
{
14+
public static function delayedRetryOnInboundChannelAdapter(string $className, string $methodName): string
15+
{
16+
return "#[DelayedRetry] cannot be used on an Inbound Channel Adapter `{$className}::{$methodName}`. "
17+
. 'Inbound Channel Adapters consume from external systems (Kafka, AMQP, scheduled tasks) and have no source Message Channel for the framework to reschedule a delayed retry into. '
18+
. 'Use #[ErrorChannel] to capture the failure for later replay (e.g. from a Dead Letter), and optionally combine it with #[InstantRetry] for in-process retries before forwarding to the Error Channel.';
19+
}
20+
21+
public static function errorChannelDirectlyOnAsyncHandlerMethod(string $endpointId): string
22+
{
23+
return "Asynchronous handler `{$endpointId}` has `#[ErrorChannel]` placed directly on the handler method — this has no effect on async handlers. "
24+
. "Pass it via the #[Asynchronous] attribute instead: `#[Asynchronous('channel', asynchronousExecution: [new ErrorChannel('...')])]` so the polling consumer routes failures correctly.";
25+
}
26+
27+
public static function delayedRetryDirectlyOnAsyncHandlerMethod(string $endpointId): string
28+
{
29+
return "Asynchronous handler `{$endpointId}` has `#[DelayedRetry]` placed directly on the handler method — this has no effect on async handlers. "
30+
. "Pass it via the #[Asynchronous] attribute instead: `#[Asynchronous('channel', asynchronousExecution: [new DelayedRetry(...)])]` so the polling consumer applies the retry policy correctly.";
31+
}
32+
33+
public static function errorChannelAndDelayedRetryMutuallyExclusiveOnHandler(string $endpointId): string
34+
{
35+
return "Handler `{$endpointId}` declares both #[ErrorChannel] and #[DelayedRetry] in #[Asynchronous] asynchronousExecution — these are mutually exclusive. "
36+
. 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.';
37+
}
38+
39+
public static function errorChannelAndDelayedRetryMutuallyExclusiveOnGateway(string $gatewayInterfaceFqn): string
40+
{
41+
return "Gateway `{$gatewayInterfaceFqn}` declares both #[ErrorChannel] and #[DelayedRetry] — these are mutually exclusive. "
42+
. 'Use #[ErrorChannel] to send failures to a channel you control, OR #[DelayedRetry] to have Ecotone manage the retry+dead-letter flow with a generated channel.';
43+
}
44+
45+
public static function cannotReplyToDeadLetterMessage(string $messageId): string
46+
{
47+
return "Can not reply to message {$messageId}, as it does not contain `polledChannelName`, `inboundRequestChannel` or `routingSlip` header. "
48+
. 'Please add one of them, so Message can be routed back to the original channel.';
49+
}
50+
51+
public static function delayedRetryRequiresPolledChannelName(string $originalErrorMessage): string
52+
{
53+
return 'Failed to handle Error Message via Retry Configuration, as it does not contain information about origination channel from which it was polled. Original error message: '
54+
. $originalErrorMessage;
55+
}
56+
57+
public static function instantRetryNotOnInboundChannelAdapter(string $className, string $methodName): string
58+
{
59+
return "InstantRetry attribute can only be used on Inbound Channel Adapter methods (annotated with MessageConsumer e.g. #[KafkaConsumer], #[RabbitConsumer], #[Scheduled]). "
60+
. "'{$className}::{$methodName}' has none.";
61+
}
62+
63+
public static function instantRetryRequiresEnterprise(): string
64+
{
65+
return 'Instant retry attribute is available only for Ecotone Enterprise.';
66+
}
67+
68+
public static function asynchronousExecutionRequiresEnterprise(string $endpointId): string
69+
{
70+
return "Endpoint annotations on #[Asynchronous] attribute for endpoint `{$endpointId}` require Ecotone Enterprise licence.";
71+
}
72+
73+
public static function gatewayErrorChannelRequiresEnterprise(string $interfaceFqn, string $methodName): string
74+
{
75+
return "Gateway {$interfaceFqn}::{$methodName} is marked with synchronous Error Channel. This functionality is available as part of Ecotone Enterprise.";
76+
}
77+
78+
public static function gatewayDelayedRetryRequiresEnterprise(string $interfaceFqn, string $methodName): string
79+
{
80+
return "Gateway {$interfaceFqn}::{$methodName} is marked with #[DelayedRetry]. This functionality is available as part of Ecotone Enterprise.";
81+
}
82+
}

0 commit comments

Comments
 (0)