Skip to content

Commit 2a6c603

Browse files
authored
Unify channel access (#307)
Add a Channels class that unifies how publishers distribute work across channels and synchronize them.
1 parent ee74b5b commit 2a6c603

File tree

6 files changed

+234
-30
lines changed

6 files changed

+234
-30
lines changed

src/Common/EventStore/Integration/ForkPool/ForwardToChannelStoredEventSubscriber.php

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,42 +5,23 @@
55
namespace Gaming\Common\EventStore\Integration\ForkPool;
66

77
use Gaming\Common\EventStore\DomainEvent;
8-
use Gaming\Common\EventStore\Exception\EventStoreException;
98
use Gaming\Common\EventStore\StoredEventSubscriber;
10-
use Gaming\Common\ForkPool\Channel\Channel;
11-
use InvalidArgumentException;
9+
use Gaming\Common\ForkPool\Channel\Channels;
1210

1311
final class ForwardToChannelStoredEventSubscriber implements StoredEventSubscriber
1412
{
15-
/**
16-
* @param Channel[] $channels
17-
*
18-
* @throws InvalidArgumentException
19-
*/
2013
public function __construct(
21-
private readonly array $channels
14+
private readonly Channels $channels
2215
) {
23-
if (count($this->channels) < 1) {
24-
throw new InvalidArgumentException('no channels given');
25-
}
2616
}
2717

2818
public function handle(DomainEvent $domainEvent): void
2919
{
30-
$shardId = crc32($domainEvent->streamId) % count($this->channels);
31-
$this->channels[$shardId]->send($domainEvent);
20+
$this->channels->consistent($domainEvent->streamId)->send($domainEvent);
3221
}
3322

3423
public function commit(): void
3524
{
36-
foreach ($this->channels as $channel) {
37-
$channel->send('COMMIT');
38-
}
39-
40-
foreach ($this->channels as $channel) {
41-
if ($channel->receive() !== 'ACK') {
42-
throw new EventStoreException('No ack from channel.');
43-
}
44-
}
25+
$this->channels->synchronize();
4526
}
4627
}

src/Common/EventStore/Integration/ForkPool/Worker.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public function execute(Channel $channel): int
1919
{
2020
while ($message = $channel->receive()) {
2121
match ($message) {
22-
'COMMIT' => $this->handleCommit($channel),
22+
Channel::MESSAGE_SYNC => $this->handleCommit($channel),
2323
default => $this->storedEventSubscriber->handle($message)
2424
};
2525
}
@@ -31,6 +31,6 @@ private function handleCommit(Channel $channel): void
3131
{
3232
$this->storedEventSubscriber->commit();
3333

34-
$channel->send('ACK');
34+
$channel->send(Channel::MESSAGE_SYNC_ACK);
3535
}
3636
}

src/Common/EventStore/Integration/Symfony/FollowEventStoreCommand.php

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
use Gaming\Common\EventStore\Integration\ForkPool\Worker;
1313
use Gaming\Common\EventStore\PollableEventStore;
1414
use Gaming\Common\EventStore\StoredEventSubscriber;
15+
use Gaming\Common\ForkPool\Channel\Channel;
16+
use Gaming\Common\ForkPool\Channel\Channels;
1517
use Gaming\Common\ForkPool\Channel\StreamChannelPairFactory;
1618
use Gaming\Common\ForkPool\ForkPool;
1719
use Gaming\Common\ForkPool\Task;
@@ -112,11 +114,13 @@ protected function execute(InputInterface $input, OutputInterface $output): int
112114
$this->pollableEventStore,
113115
$this->eventStorePointerFactory->withName((string)$input->getArgument('pointer')),
114116
new ForwardToChannelStoredEventSubscriber(
115-
array_map(
116-
fn() => $forkPool->fork(
117-
new Worker($this->getStoredEventSubscriber($subscriberNames))
118-
)->channel(),
119-
range(1, $parallelism)
117+
new Channels(
118+
array_map(
119+
fn(): Channel => $forkPool->fork(
120+
new Worker($this->getStoredEventSubscriber($subscriberNames))
121+
)->channel(),
122+
range(1, $parallelism)
123+
)
120124
)
121125
),
122126
max(1, (int)$input->getOption('batch')),

src/Common/ForkPool/Channel/Channel.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
interface Channel
1010
{
11+
public const string MESSAGE_SYNC = 'SYNC';
12+
public const string MESSAGE_SYNC_ACK = 'SYNC_ACK';
13+
1114
/**
1215
* @throws ForkPoolException
1316
*/
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Gaming\Common\ForkPool\Channel;
6+
7+
use Gaming\Common\ForkPool\Exception\ForkPoolException;
8+
9+
final class Channels
10+
{
11+
private int $roundRobinIndex = -1;
12+
13+
/**
14+
* @param list<Channel> $channels
15+
*
16+
* @throws ForkPoolException
17+
*/
18+
public function __construct(
19+
private readonly array $channels
20+
) {
21+
if (count($channels) === 0) {
22+
throw new ForkPoolException('At least one channel is required.');
23+
}
24+
}
25+
26+
public function random(): Channel
27+
{
28+
return $this->channels[array_rand($this->channels)];
29+
}
30+
31+
public function roundRobin(): Channel
32+
{
33+
return $this->channels[$this->roundRobinIndex = ++$this->roundRobinIndex % count($this->channels)];
34+
}
35+
36+
public function consistent(string $key): Channel
37+
{
38+
return $this->channels[crc32($key) % count($this->channels)];
39+
}
40+
41+
public function synchronize(?int $timeout = null): void
42+
{
43+
foreach ($this->channels as $channel) {
44+
$channel->send(Channel::MESSAGE_SYNC);
45+
}
46+
47+
foreach ($this->channels as $channel) {
48+
$start = time();
49+
if ($channel->receive($timeout) !== Channel::MESSAGE_SYNC_ACK) {
50+
throw new ForkPoolException('Failed to synchronize all channels.');
51+
}
52+
$timeout !== null && $timeout = max(0, $timeout - (time() - $start));
53+
}
54+
}
55+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Gaming\Tests\Unit\Common\ForkPool\Channel;
6+
7+
use Exception;
8+
use Gaming\Common\ForkPool\Channel\Channel;
9+
use Gaming\Common\ForkPool\Channel\Channels;
10+
use Gaming\Common\ForkPool\Exception\ForkPoolException;
11+
use PHPUnit\Framework\Attributes\Test;
12+
use PHPUnit\Framework\TestCase;
13+
14+
final class ChannelsTest extends TestCase
15+
{
16+
#[Test]
17+
public function itShouldThrowWhenEmpty(): void
18+
{
19+
$this->expectException(ForkPoolException::class);
20+
21+
new Channels([]);
22+
}
23+
24+
#[Test]
25+
public function itShouldReturnRandomChannels(): void
26+
{
27+
$channels = $this->createChannels(100);
28+
29+
for ($i = 0; $i < 100; $i++) {
30+
$this->assertContains($channels->random()->receive(), range(1, 100));
31+
}
32+
}
33+
34+
#[Test]
35+
public function itShouldReturnRoundRobinChannels(): void
36+
{
37+
$channels = $this->createChannels($count = 100);
38+
39+
for ($i = 1; $i <= $count * 3; $i++) {
40+
$this->assertSame($channels->roundRobin()->receive(), $i % $count === 0 ? $count : $i % $count);
41+
}
42+
}
43+
44+
#[Test]
45+
public function itShouldReturnConsistentChannels(): void
46+
{
47+
$keys = ['key1' => 2, 'key2' => 4, 'key3' => 1, 'key4' => 4, 'key5' => 1];
48+
$channels = $this->createChannels(5);
49+
50+
for ($i = 0; $i < 100; $i++) {
51+
foreach ($keys as $key => $expectedReceive) {
52+
$this->assertSame($channels->consistent($key)->receive(), $expectedReceive);
53+
}
54+
}
55+
}
56+
57+
#[Test]
58+
public function itShouldSynchronize(): void
59+
{
60+
$this->createSynchronizeChannels(100)->synchronize();
61+
}
62+
63+
#[Test]
64+
public function itShouldThrowOnSynchronizeWithInvalidMessage(): void
65+
{
66+
$this->expectException(ForkPoolException::class);
67+
68+
$this->createSynchronizeChannels(100, receive: 'invalid')->synchronize();
69+
}
70+
71+
#[Test]
72+
public function itShouldThrowOnSynchronizeWhenTimeout(): void
73+
{
74+
$called1 = $called2 = $called3 = $called4 = $called5 = false;
75+
76+
try {
77+
// Passing a clock just to fake timeouts for this little test doesn’t feel right.
78+
// Using sleep with max 1s instead.
79+
new Channels(
80+
[
81+
$this->createSleepingChannel($called1, 1, 360), // This times out, but succeeds in the last second.
82+
$this->createSleepingChannel($called2), // This succeeds immediately.
83+
$this->createSleepingChannel($called3), // This too.
84+
$this->createSleepingChannel($called4, 0, 360, false), // This actually times out.
85+
$this->createSleepingChannel($called5) // This is never called.
86+
]
87+
)->synchronize(1);
88+
} catch (ForkPoolException) {
89+
$this->assertSame([true, true, true, true], [$called1, $called2, $called3, $called4]);
90+
$this->assertSame(false, $called5);
91+
92+
return;
93+
}
94+
95+
$this->fail(__FUNCTION__ . ' did not throw an exception.');
96+
}
97+
98+
private function createChannels(int $count): Channels
99+
{
100+
$channels = [];
101+
for ($i = 1; $i <= $count; $i++) {
102+
$channel = $this->createMock(Channel::class);
103+
$channel->method('receive')->willReturn($i);
104+
$channels[] = $channel;
105+
}
106+
107+
return new Channels($channels);
108+
}
109+
110+
private function createSynchronizeChannels(
111+
int $count,
112+
?int $timeout = null,
113+
string $receive = Channel::MESSAGE_SYNC_ACK
114+
): Channels {
115+
$channels = [];
116+
for ($i = 1; $i <= $count; $i++) {
117+
$channel = $this->createMock(Channel::class);
118+
$channel->method('send')->with(Channel::MESSAGE_SYNC);
119+
$channel->method('receive')->with($timeout)->willReturn($receive);
120+
$channels[] = $channel;
121+
}
122+
123+
return new Channels($channels);
124+
}
125+
126+
private function createSleepingChannel(
127+
bool &$called,
128+
int $withTimeout = 0,
129+
?int $sleepSeconds = null,
130+
bool $success = true
131+
): Channel {
132+
return new class ($called, $withTimeout, $sleepSeconds, $success) implements Channel {
133+
public function __construct(
134+
private bool &$called,
135+
private int $withTimeout,
136+
private readonly ?int $sleepSeconds,
137+
private readonly bool $success,
138+
) {
139+
}
140+
141+
public function send(mixed $message): void
142+
{
143+
}
144+
145+
public function receive(?int $timeout = null): ?string
146+
{
147+
$this->called = true;
148+
149+
if ($this->withTimeout !== $timeout) {
150+
throw new Exception('Unexpected timeout value.');
151+
}
152+
153+
if ($timeout !== null && $this->sleepSeconds !== null) {
154+
usleep(min($this->sleepSeconds, $timeout) * 1000000 + 100000); // Extra 0.1s to be sure.
155+
}
156+
157+
return $this->success ? Channel::MESSAGE_SYNC_ACK : null;
158+
}
159+
};
160+
}
161+
}

0 commit comments

Comments
 (0)