forked from EventSaucePHP/EventSauce
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEventSourcedAggregateRootRepository.php
More file actions
112 lines (96 loc) · 3.72 KB
/
EventSourcedAggregateRootRepository.php
File metadata and controls
112 lines (96 loc) · 3.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
<?php
declare(strict_types=1);
namespace EventSauce\EventSourcing;
use Generator;
use Ramsey\Uuid\Uuid;
use Throwable;
use function assert;
use function count;
/**
* @template T of AggregateRoot
*
* @template-implements AggregateRootRepository<T>
*/
class EventSourcedAggregateRootRepository implements AggregateRootRepository
{
/** @var class-string<T> */
private string $aggregateRootClassName;
private MessageRepository $messages;
private MessageDecorator $decorator;
private MessageDispatcher $dispatcher;
private ClassNameInflector $classNameInflector;
/**
* @param class-string<T> $aggregateRootClassName
*/
public function __construct(
string $aggregateRootClassName,
MessageRepository $messageRepository,
?MessageDispatcher $dispatcher = null,
?MessageDecorator $decorator = null,
?ClassNameInflector $classNameInflector = null
) {
$this->aggregateRootClassName = $aggregateRootClassName;
$this->messages = $messageRepository;
$this->dispatcher = $dispatcher ?: new SynchronousMessageDispatcher();
$this->decorator = $decorator ?: new DefaultHeadersDecorator();
$this->classNameInflector = $classNameInflector ?: new DotSeparatedSnakeCaseInflector();
}
/**
* @return T
*/
public function retrieve(AggregateRootId $aggregateRootId): object
{
try {
/** @var AggregateRoot $className */
/** @phpstan-var class-string<T> $className */
$className = $this->aggregateRootClassName;
$events = $this->retrieveAllEvents($aggregateRootId);
return $className::reconstituteFromEvents($aggregateRootId, $events);
} catch (Throwable $throwable) {
throw UnableToReconstituteAggregateRoot::becauseOf($throwable->getMessage(), $throwable);
}
}
private function retrieveAllEvents(AggregateRootId $aggregateRootId): Generator
{
/** @var Generator<Message> $messages */
$messages = $this->messages->retrieveAll($aggregateRootId);
foreach ($messages as $message) {
yield $message->event();
}
return $messages->getReturn();
}
public function persist(object $aggregateRoot): void
{
assert($aggregateRoot instanceof AggregateRoot, 'Expected $aggregateRoot to be an instance of ' . AggregateRoot::class);
$this->persistEvents(
$aggregateRoot->aggregateRootId(),
$aggregateRoot->aggregateRootVersion(),
...$aggregateRoot->releaseEvents()
);
}
public function persistEvents(AggregateRootId $aggregateRootId, int $aggregateRootVersion, object ...$events): void
{
if (count($events) === 0) {
return;
}
// decrease the aggregate root version by the number of raised events
// so the version of each message represents the version at the time
// of recording.
$aggregateRootVersion = $aggregateRootVersion - count($events);
$metadata = [
Header::AGGREGATE_ROOT_ID => $aggregateRootId,
Header::AGGREGATE_ROOT_TYPE => $this->classNameInflector->classNameToType($this->aggregateRootClassName),
];
$messages = array_map(function (object $event) use ($metadata, &$aggregateRootVersion) {
return $this->decorator->decorate(new Message(
$event,
$metadata + [
Header::AGGREGATE_ROOT_VERSION => ++$aggregateRootVersion,
Header::EVENT_ID => Uuid::uuid4()->toString(),
],
));
}, $events);
$this->messages->persist(...$messages);
$this->dispatcher->dispatch(...$messages);
}
}