Skip to content

Commit 76d8b04

Browse files
committed
feat: symfony messenger delays
1 parent 2cef487 commit 76d8b04

File tree

10 files changed

+3076
-2945
lines changed

10 files changed

+3076
-2945
lines changed

packages/Laravel/tests/Queue/LaravelQueueFinalFailureStrategyTest.php

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@
44

55
namespace Test\Ecotone\Laravel\Queue;
66

7+
use DateTimeImmutable;
78
use Ecotone\Laravel\Queue\LaravelQueueMessageChannelBuilder;
89
use Ecotone\Lite\EcotoneLite;
910
use Ecotone\Messaging\Attribute\Asynchronous;
1011
use Ecotone\Messaging\Config\ConfigurationException;
1112
use Ecotone\Messaging\Config\ServiceConfiguration;
1213
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
1314
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
15+
use Ecotone\Messaging\MessageHeaders;
1416
use Ecotone\Modelling\Attribute\CommandHandler;
17+
use Ecotone\Modelling\Attribute\QueryHandler;
1518
use Exception;
1619
use Illuminate\Database\Schema\Blueprint;
1720
use Illuminate\Foundation\Http\Kernel;
@@ -84,6 +87,29 @@ public function test_release_failure_strategy_releases_message_on_exception()
8487
);
8588
}
8689

90+
public function test_sending_with_delay_using_datetime()
91+
{
92+
$container = $this->getContainer();
93+
$delayedService = new DelayedService();
94+
$container->instance(DelayedService::class, $delayedService);
95+
96+
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
97+
[DelayedService::class],
98+
$container,
99+
ServiceConfiguration::createWithAsynchronicityOnly()
100+
->withExtensionObjects([
101+
LaravelQueueMessageChannelBuilder::create('async', 'database'),
102+
])
103+
);
104+
105+
$ecotoneTestSupport->sendCommandWithRoutingKey('execute.delayed_command', new DelayedCommand('test_1'), metadata: [
106+
MessageHeaders::DELIVERY_DELAY => (new DateTimeImmutable())->modify('+1 second'),
107+
]);
108+
$ecotoneTestSupport->run('async', ExecutionPollingMetadata::createWithTestingSetup(maxExecutionTimeInMilliseconds: 2000));
109+
110+
$this->assertEquals(['test_1'], $delayedService->getMessages());
111+
}
112+
87113
private function getContainer(): ContainerInterface
88114
{
89115
$app = require __DIR__ . '/../Application/bootstrap/app.php';
@@ -109,3 +135,32 @@ public function execute(FailingCommand $command): void
109135
throw new Exception('Failing');
110136
}
111137
}
138+
139+
class DelayedCommand
140+
{
141+
public function __construct(public readonly string $payload)
142+
{
143+
}
144+
}
145+
146+
class DelayedService
147+
{
148+
/** @var string[] */
149+
private array $messages = [];
150+
151+
#[Asynchronous('async')]
152+
#[CommandHandler('execute.delayed_command', 'delayed_endpoint')]
153+
public function execute(DelayedCommand $command): void
154+
{
155+
$this->messages[] = $command->payload;
156+
}
157+
158+
/**
159+
* @return string[]
160+
*/
161+
#[QueryHandler('get.delayed_messages')]
162+
public function getMessages(): array
163+
{
164+
return $this->messages;
165+
}
166+
}

packages/Symfony/.gitignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,8 @@ composer.lock
88
phpunit.xml
99
# Ignore generated container files
1010
/var/cache/
11-
/tests/phpunit/*/var/cache/
11+
/tests/phpunit/*/var/cache/
12+
# Ignore generated reference files
13+
config/reference.php
14+
tests/phpunit/config/reference.php
15+
tests/phpunit/*/config/reference.php

packages/Symfony/SymfonyBundle/Messenger/SymfonyMessageConverter.php

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@
44

55
namespace Ecotone\SymfonyBundle\Messenger;
66

7+
use DateTimeInterface;
78
use Ecotone\Messaging\Conversion\ConversionService;
89
use Ecotone\Messaging\Conversion\MediaType;
910
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;
1011
use Ecotone\Messaging\Handler\Type;
1112
use Ecotone\Messaging\Message;
1213
use Ecotone\Messaging\MessageConverter\HeaderMapper;
1314
use Ecotone\Messaging\MessageHeaders;
15+
use Ecotone\Messaging\Scheduling\DatePoint;
16+
use Ecotone\Messaging\Scheduling\Duration;
17+
use Ecotone\Messaging\Scheduling\TimeSpan;
1418
use Ecotone\Messaging\Support\MessageBuilder;
1519
use Symfony\Component\Messenger\Envelope;
1620
use Symfony\Component\Messenger\Stamp\DelayStamp;
@@ -53,12 +57,39 @@ public function convertToSymfonyMessage(Message $message, bool $withDelay): Enve
5357
$envelopeToSend = new Envelope($payload, [new MetadataStamp($headers)]);
5458

5559
if ($message->getHeaders()->containsKey(MessageHeaders::DELIVERY_DELAY) && $withDelay) {
56-
$envelopeToSend = $envelopeToSend->with(new DelayStamp($message->getHeaders()->get(MessageHeaders::DELIVERY_DELAY)));
60+
$deliveryDelay = $this->convertDeliveryDelayToMilliseconds(
61+
$message->getHeaders()->get(MessageHeaders::DELIVERY_DELAY),
62+
$message->getHeaders()->getTimestamp()
63+
);
64+
if ($deliveryDelay !== null) {
65+
$envelopeToSend = $envelopeToSend->with(new DelayStamp($deliveryDelay));
66+
}
5767
}
5868

5969
return $envelopeToSend;
6070
}
6171

72+
private function convertDeliveryDelayToMilliseconds(mixed $deliveryDelay, int $messageTimestamp): ?int
73+
{
74+
if ($deliveryDelay instanceof DateTimeInterface) {
75+
$deliveryDelay = DatePoint::createFromInterface($deliveryDelay)->durationSince(DatePoint::createFromTimestamp($messageTimestamp));
76+
}
77+
78+
if ($deliveryDelay instanceof Duration) {
79+
$deliveryDelay = $deliveryDelay->inMilliseconds();
80+
}
81+
82+
if ($deliveryDelay instanceof TimeSpan) {
83+
$deliveryDelay = $deliveryDelay->toMilliseconds();
84+
}
85+
86+
if ($deliveryDelay !== null && $deliveryDelay < 0) {
87+
return null;
88+
}
89+
90+
return $deliveryDelay;
91+
}
92+
6293
public function convertFromSymfonyMessage(Envelope $symfonyEnvelope, TransportInterface $symfonyTransport): Message
6394
{
6495
$headers = $symfonyEnvelope->last(MetadataStamp::class)->getMetadata();

0 commit comments

Comments
 (0)