Skip to content

Commit 8f8f734

Browse files
authored
fix: support EventStreamEmitter inside #[ProjectionFlush] (#662)
* fix: support EventStreamEmitter inside #[ProjectionFlush] Calling EventStreamEmitter::emit() / linkTo() from a #[ProjectionFlush] method threw "Header projection.name does not exists" because the flush dispatch in EcotoneProjectorExecutor skipped the header context that project() sets up (projection.name, projection.live, streamBasedSourced) and never wrapped the dispatch in MessageHeadersPropagatorInterceptor. flush() now mirrors project()'s header context — projection.name stays enterprise-gated via withProjectionName() — and wraps the dispatch in storeHeaders() so EventStreamEmitter's PropagateHeaders gateway can merge those headers into outbound emit/linkTo calls. ProjectorExecutor gains an isRebuilding flag so projection.live is set to false during rebuild, suppressing flush-time emits via the existing live-filter. * refactor: make ProjectorExecutor::flush parameters required Drop the default values on $userState and $isRebuilding so callers must explicitly pass both — the rebuild flag in particular should always come from the surrounding flow rather than silently defaulting to live mode.
1 parent 030570d commit 8f8f734

6 files changed

Lines changed: 230 additions & 9 deletions

File tree

packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,29 @@ public function delete(): void
8080
}
8181
}
8282

83-
public function flush(mixed $userState = null): void
83+
public function flush(mixed $userState, bool $isRebuilding): void
8484
{
85-
if ($this->flushChannel) {
86-
$this->messagingEntrypoint->sendWithHeaders([], $this->withProjectionName([
87-
ProjectingHeaders::PROJECTION_STATE => $userState,
88-
]), $this->flushChannel);
85+
if (! $this->flushChannel) {
86+
return;
8987
}
88+
89+
$headers = $this->withProjectionName([
90+
ProjectingHeaders::PROJECTION_STATE => $userState,
91+
ProjectingHeaders::PROJECTION_LIVE => $this->isLive && ! $isRebuilding,
92+
MessageHeaders::STREAM_BASED_SOURCED => true,
93+
]);
94+
95+
$requestMessage = MessageBuilder::withPayload([])
96+
->setMultipleHeaders($headers)
97+
->build();
98+
99+
$flushChannel = $this->flushChannel;
100+
$this->messageHeadersPropagatorInterceptor->storeHeaders(
101+
function () use ($headers, $flushChannel): void {
102+
$this->messagingEntrypoint->sendWithHeaders([], $headers, $flushChannel);
103+
},
104+
$requestMessage
105+
);
90106
}
91107

92108
public function reset(?string $partitionKey = null): void

packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public function delete(): void
7373
// No deletion needed
7474
}
7575

76-
public function flush(mixed $userState = null): void
76+
public function flush(mixed $userState, bool $isRebuilding): void
7777
{
7878
// No flushing needed
7979
}

packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public function delete(): void
4040
$this->projectedEvents = [];
4141
}
4242

43-
public function flush(mixed $userState = null): void
43+
public function flush(mixed $userState, bool $isRebuilding): void
4444
{
4545
}
4646

packages/Ecotone/src/Projecting/ProjectingManager.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public function executePartitionBatch(?string $partitionKeyValue = null, bool $c
102102
$batchProcessedEvents++;
103103
}
104104
if ($batchProcessedEvents > 0) {
105-
$this->projectorExecutor->flush($userState);
105+
$this->projectorExecutor->flush($userState, $shouldReset);
106106
}
107107

108108
$totalProcessedEvents += $batchProcessedEvents;

packages/Ecotone/src/Projecting/ProjectorExecutor.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ interface ProjectorExecutor
1818
public function project(Event $event, mixed $userState = null, bool $isRebuilding = false): mixed;
1919
public function init(): void;
2020
public function delete(): void;
21-
public function flush(mixed $userState = null): void;
21+
public function flush(mixed $userState, bool $isRebuilding): void;
2222
public function reset(?string $partitionKey = null): void;
2323
}

packages/PdoEventSourcing/tests/Projecting/Partitioned/EmittingEventsProjectionTest.php

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,26 @@
77

88
namespace Test\Ecotone\EventSourcing\Projecting\Partitioned;
99

10+
use Ecotone\EventSourcing\Attribute\FromAggregateStream;
1011
use Ecotone\EventSourcing\Attribute\FromStream;
1112
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
1213
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
1314
use Ecotone\EventSourcing\Attribute\ProjectionReset;
15+
use Ecotone\EventSourcing\Attribute\ProjectionState;
1416
use Ecotone\EventSourcing\EventStore;
1517
use Ecotone\EventSourcing\EventStreamEmitter;
1618
use Ecotone\Lite\EcotoneLite;
1719
use Ecotone\Messaging\Attribute\Parameter\Reference;
1820
use Ecotone\Messaging\Config\ModulePackageList;
1921
use Ecotone\Messaging\Config\ServiceConfiguration;
22+
use Ecotone\Messaging\Support\LicensingException;
2023
use Ecotone\Modelling\Attribute\EventHandler;
2124
use Ecotone\Modelling\Attribute\QueryHandler;
2225
use Ecotone\Projecting\Attribute\Partitioned;
2326
use Ecotone\Projecting\Attribute\ProjectionDeployment;
27+
use Ecotone\Projecting\Attribute\ProjectionFlush;
2428
use Ecotone\Projecting\Attribute\ProjectionV2;
29+
use Ecotone\Projecting\ProjectionRegistry;
2530
use Ecotone\Test\LicenceTesting;
2631
use Enqueue\Dbal\DbalConnectionFactory;
2732

@@ -359,4 +364,204 @@ public function delete(#[Reference] EventStore $eventStore): void
359364
}
360365
};
361366
}
367+
368+
public function test_partitioned_projection_flush_emits_events_using_event_stream_emitter(): void
369+
{
370+
$projection = $this->createFlushEmittingProjection();
371+
372+
$ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore(
373+
classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class],
374+
containerOrAvailableServices: [
375+
$projection,
376+
new TicketEventConverter(),
377+
new TicketListUpdatedConverter(),
378+
DbalConnectionFactory::class => $this->getConnectionFactory(),
379+
],
380+
configuration: ServiceConfiguration::createWithDefaults()
381+
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
382+
->withNamespaces([
383+
'Test\Ecotone\EventSourcing\Fixture\Ticket',
384+
]),
385+
pathToRootCatalog: __DIR__ . '/../../',
386+
runForProductionEventStore: true,
387+
licenceKey: LicenceTesting::VALID_LICENCE,
388+
);
389+
390+
$ecotone
391+
->sendCommand(new RegisterTicket('1', 'Johnny', 'alert'))
392+
->sendCommand(new RegisterTicket('2', 'Jane', 'info'));
393+
394+
$eventStore = $ecotone->getGateway(EventStore::class);
395+
$emittedEvents = $eventStore->load('projection_flush_emitting_projection');
396+
397+
self::assertCount(2, $emittedEvents);
398+
}
399+
400+
public function test_rebuild_should_not_emit_events_from_flush_method(): void
401+
{
402+
$projection = $this->createFlushEmittingProjection();
403+
404+
$ecotone = EcotoneLite::bootstrapFlowTestingWithEventStore(
405+
classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class],
406+
containerOrAvailableServices: [
407+
$projection,
408+
new TicketEventConverter(),
409+
new TicketListUpdatedConverter(),
410+
DbalConnectionFactory::class => $this->getConnectionFactory(),
411+
],
412+
configuration: ServiceConfiguration::createWithDefaults()
413+
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
414+
->withNamespaces([
415+
'Test\Ecotone\EventSourcing\Fixture\Ticket',
416+
]),
417+
pathToRootCatalog: __DIR__ . '/../../',
418+
runForProductionEventStore: true,
419+
licenceKey: LicenceTesting::VALID_LICENCE,
420+
);
421+
422+
$ecotone
423+
->sendCommand(new RegisterTicket('1', 'Johnny', 'alert'))
424+
->sendCommand(new RegisterTicket('2', 'Jane', 'info'));
425+
426+
$eventStore = $ecotone->getGateway(EventStore::class);
427+
self::assertCount(2, $eventStore->load('projection_flush_emitting_projection'));
428+
429+
$ecotone->resetProjection('flush_emitting_projection');
430+
self::assertEmpty($projection->getTickets());
431+
$emittedCountBeforeRebuild = $eventStore->hasStream('projection_flush_emitting_projection')
432+
? count($eventStore->load('projection_flush_emitting_projection'))
433+
: 0;
434+
435+
$ecotone->getGateway(ProjectionRegistry::class)->get('flush_emitting_projection')->prepareRebuild();
436+
437+
self::assertNotEmpty($projection->getTickets());
438+
$emittedCountAfterRebuild = $eventStore->hasStream('projection_flush_emitting_projection')
439+
? count($eventStore->load('projection_flush_emitting_projection'))
440+
: 0;
441+
self::assertSame($emittedCountBeforeRebuild, $emittedCountAfterRebuild);
442+
}
443+
444+
public function test_global_flush_with_projection_state_requires_enterprise_licence(): void
445+
{
446+
$projection = new #[ProjectionV2('global_flush_state_projection'), \Ecotone\EventSourcing\Attribute\FromStream(Ticket::class)] class () {
447+
#[EventHandler(endpointId: 'globalFlushStateProjection.addTicket')]
448+
public function addTicket(TicketWasRegistered $event, #[ProjectionState] array $ticket = []): array
449+
{
450+
$ticket['ticketId'] = $event->getTicketId();
451+
return $ticket;
452+
}
453+
454+
#[ProjectionFlush]
455+
public function flush(#[ProjectionState] array $ticket, EventStreamEmitter $emitter): void
456+
{
457+
if (! isset($ticket['ticketId'])) {
458+
return;
459+
}
460+
$emitter->emit([new TicketListUpdated($ticket['ticketId'])]);
461+
}
462+
};
463+
464+
$this->expectException(LicensingException::class);
465+
$this->expectExceptionMessage('Using #[ProjectionState] in #[ProjectionFlush] methods requires Ecotone Enterprise licence.');
466+
467+
EcotoneLite::bootstrapFlowTestingWithEventStore(
468+
classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class],
469+
containerOrAvailableServices: [
470+
$projection,
471+
new TicketEventConverter(),
472+
new TicketListUpdatedConverter(),
473+
DbalConnectionFactory::class => $this->getConnectionFactory(),
474+
],
475+
configuration: ServiceConfiguration::createWithDefaults()
476+
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
477+
->withNamespaces([
478+
'Test\Ecotone\EventSourcing\Fixture\Ticket',
479+
]),
480+
pathToRootCatalog: __DIR__ . '/../../',
481+
runForProductionEventStore: true,
482+
);
483+
}
484+
485+
public function test_partitioned_flush_emitter_pattern_requires_enterprise_licence(): void
486+
{
487+
$projection = $this->createFlushEmittingProjection();
488+
489+
$this->expectException(LicensingException::class);
490+
$this->expectExceptionMessageMatches('/Enterprise licence/');
491+
492+
EcotoneLite::bootstrapFlowTestingWithEventStore(
493+
classesToResolve: [get_class($projection), TicketListUpdatedConverter::class, TicketListUpdated::class],
494+
containerOrAvailableServices: [
495+
$projection,
496+
new TicketEventConverter(),
497+
new TicketListUpdatedConverter(),
498+
DbalConnectionFactory::class => $this->getConnectionFactory(),
499+
],
500+
configuration: ServiceConfiguration::createWithDefaults()
501+
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE, ModulePackageList::EVENT_SOURCING_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
502+
->withNamespaces([
503+
'Test\Ecotone\EventSourcing\Fixture\Ticket',
504+
]),
505+
pathToRootCatalog: __DIR__ . '/../../',
506+
runForProductionEventStore: true,
507+
);
508+
}
509+
510+
private function createFlushEmittingProjection(): object
511+
{
512+
return new #[ProjectionV2('flush_emitting_projection'), Partitioned, FromAggregateStream(Ticket::class)] class () {
513+
public array $tickets = [];
514+
515+
#[EventHandler(endpointId: 'flushEmittingProjection.addTicket')]
516+
public function addTicket(TicketWasRegistered $event, #[ProjectionState] array $ticket = []): array
517+
{
518+
$ticket['ticketId'] = $event->getTicketId();
519+
$ticket['status'] = 'open';
520+
$this->tickets[$event->getTicketId()] = $ticket;
521+
return $ticket;
522+
}
523+
524+
#[EventHandler(endpointId: 'flushEmittingProjection.closeTicket')]
525+
public function closeTicket(TicketWasClosed $event, #[ProjectionState] array $ticket): array
526+
{
527+
$ticket['status'] = 'closed';
528+
$this->tickets[$event->getTicketId()] = $ticket;
529+
return $ticket;
530+
}
531+
532+
#[ProjectionFlush]
533+
public function flush(#[ProjectionState] array $ticket, EventStreamEmitter $emitter): void
534+
{
535+
if (! isset($ticket['ticketId'])) {
536+
return;
537+
}
538+
539+
$emitter->emit([new TicketListUpdated($ticket['ticketId'])]);
540+
}
541+
542+
#[QueryHandler('getFlushEmittingProjectionTickets')]
543+
public function getTickets(): array
544+
{
545+
return $this->tickets;
546+
}
547+
548+
#[ProjectionReset]
549+
public function reset(#[Reference] EventStore $eventStore): void
550+
{
551+
$this->tickets = [];
552+
if ($eventStore->hasStream('projection_flush_emitting_projection')) {
553+
$eventStore->delete('projection_flush_emitting_projection');
554+
}
555+
}
556+
557+
#[ProjectionDelete]
558+
public function delete(#[Reference] EventStore $eventStore): void
559+
{
560+
$this->tickets = [];
561+
if ($eventStore->hasStream('projection_flush_emitting_projection')) {
562+
$eventStore->delete('projection_flush_emitting_projection');
563+
}
564+
}
565+
};
566+
}
362567
}

0 commit comments

Comments
 (0)