Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,23 @@
namespace Gaming\Common\EventStore\Integration\ForkPool;

use Gaming\Common\EventStore\DomainEvent;
use Gaming\Common\EventStore\Exception\EventStoreException;
use Gaming\Common\EventStore\StoredEventSubscriber;
use Gaming\Common\ForkPool\Channel\Channel;
use InvalidArgumentException;
use Gaming\Common\ForkPool\Channel\Channels;

final class ForwardToChannelStoredEventSubscriber implements StoredEventSubscriber
{
/**
* @param Channel[] $channels
*
* @throws InvalidArgumentException
*/
public function __construct(
private readonly array $channels
private readonly Channels $channels
) {
if (count($this->channels) < 1) {
throw new InvalidArgumentException('no channels given');
}
}

public function handle(DomainEvent $domainEvent): void
{
$shardId = crc32($domainEvent->streamId) % count($this->channels);
$this->channels[$shardId]->send($domainEvent);
$this->channels->consistent($domainEvent->streamId)->send($domainEvent);
}

public function commit(): void
{
foreach ($this->channels as $channel) {
$channel->send('COMMIT');
}

foreach ($this->channels as $channel) {
if ($channel->receive() !== 'ACK') {
throw new EventStoreException('No ack from channel.');
}
}
$this->channels->synchronize();
}
}
4 changes: 2 additions & 2 deletions src/Common/EventStore/Integration/ForkPool/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function execute(Channel $channel): int
{
while ($message = $channel->receive()) {
match ($message) {
'COMMIT' => $this->handleCommit($channel),
Channel::MESSAGE_SYNC => $this->handleCommit($channel),
default => $this->storedEventSubscriber->handle($message)
};
}
Expand All @@ -31,6 +31,6 @@ private function handleCommit(Channel $channel): void
{
$this->storedEventSubscriber->commit();

$channel->send('ACK');
$channel->send(Channel::MESSAGE_SYNC_ACK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
use Gaming\Common\EventStore\Integration\ForkPool\Worker;
use Gaming\Common\EventStore\PollableEventStore;
use Gaming\Common\EventStore\StoredEventSubscriber;
use Gaming\Common\ForkPool\Channel\Channel;
use Gaming\Common\ForkPool\Channel\Channels;
use Gaming\Common\ForkPool\Channel\StreamChannelPairFactory;
use Gaming\Common\ForkPool\ForkPool;
use Gaming\Common\ForkPool\Task;
Expand Down Expand Up @@ -112,11 +114,13 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$this->pollableEventStore,
$this->eventStorePointerFactory->withName((string)$input->getArgument('pointer')),
new ForwardToChannelStoredEventSubscriber(
array_map(
fn() => $forkPool->fork(
new Worker($this->getStoredEventSubscriber($subscriberNames))
)->channel(),
range(1, $parallelism)
new Channels(
array_map(
fn(): Channel => $forkPool->fork(
new Worker($this->getStoredEventSubscriber($subscriberNames))
)->channel(),
range(1, $parallelism)
)
)
),
max(1, (int)$input->getOption('batch')),
Expand Down
3 changes: 3 additions & 0 deletions src/Common/ForkPool/Channel/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

interface Channel
{
public const string MESSAGE_SYNC = 'SYNC';
public const string MESSAGE_SYNC_ACK = 'SYNC_ACK';

/**
* @throws ForkPoolException
*/
Expand Down
55 changes: 55 additions & 0 deletions src/Common/ForkPool/Channel/Channels.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

declare(strict_types=1);

namespace Gaming\Common\ForkPool\Channel;

use Gaming\Common\ForkPool\Exception\ForkPoolException;

final class Channels
{
private int $roundRobinIndex = -1;

/**
* @param list<Channel> $channels
*
* @throws ForkPoolException
*/
public function __construct(
private readonly array $channels
) {
if (count($channels) === 0) {
throw new ForkPoolException('At least one channel is required.');
}
}

public function random(): Channel
{
return $this->channels[array_rand($this->channels)];
}

public function roundRobin(): Channel
{
return $this->channels[$this->roundRobinIndex = ++$this->roundRobinIndex % count($this->channels)];
}

public function consistent(string $key): Channel
{
return $this->channels[crc32($key) % count($this->channels)];
}

public function synchronize(?int $timeout = null): void
{
foreach ($this->channels as $channel) {
$channel->send(Channel::MESSAGE_SYNC);
}

foreach ($this->channels as $channel) {
$start = time();
if ($channel->receive($timeout) !== Channel::MESSAGE_SYNC_ACK) {
throw new ForkPoolException('Failed to synchronize all channels.');
}
$timeout !== null && $timeout = max(0, $timeout - (time() - $start));
}
}
}
161 changes: 161 additions & 0 deletions tests/unit/Common/ForkPool/Channel/ChannelsTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
<?php

declare(strict_types=1);

namespace Gaming\Tests\Unit\Common\ForkPool\Channel;

use Exception;
use Gaming\Common\ForkPool\Channel\Channel;
use Gaming\Common\ForkPool\Channel\Channels;
use Gaming\Common\ForkPool\Exception\ForkPoolException;
use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\TestCase;

final class ChannelsTest extends TestCase
{
#[Test]
public function itShouldThrowWhenEmpty(): void
{
$this->expectException(ForkPoolException::class);

new Channels([]);
}

#[Test]
public function itShouldReturnRandomChannels(): void
{
$channels = $this->createChannels(100);

for ($i = 0; $i < 100; $i++) {
$this->assertContains($channels->random()->receive(), range(1, 100));
}
}

#[Test]
public function itShouldReturnRoundRobinChannels(): void
{
$channels = $this->createChannels($count = 100);

for ($i = 1; $i <= $count * 3; $i++) {
$this->assertSame($channels->roundRobin()->receive(), $i % $count === 0 ? $count : $i % $count);
}
}

#[Test]
public function itShouldReturnConsistentChannels(): void
{
$keys = ['key1' => 2, 'key2' => 4, 'key3' => 1, 'key4' => 4, 'key5' => 1];
$channels = $this->createChannels(5);

for ($i = 0; $i < 100; $i++) {
foreach ($keys as $key => $expectedReceive) {
$this->assertSame($channels->consistent($key)->receive(), $expectedReceive);
}
}
}

#[Test]
public function itShouldSynchronize(): void
{
$this->createSynchronizeChannels(100)->synchronize();
}

#[Test]
public function itShouldThrowOnSynchronizeWithInvalidMessage(): void
{
$this->expectException(ForkPoolException::class);

$this->createSynchronizeChannels(100, receive: 'invalid')->synchronize();
}

#[Test]
public function itShouldThrowOnSynchronizeWhenTimeout(): void
{
$called1 = $called2 = $called3 = $called4 = $called5 = false;

try {
// Passing a clock just to fake timeouts for this little test doesn’t feel right.
// Using sleep with max 1s instead.
new Channels(
[
$this->createSleepingChannel($called1, 1, 360), // This times out, but succeeds in the last second.
$this->createSleepingChannel($called2), // This succeeds immediately.
$this->createSleepingChannel($called3), // This too.
$this->createSleepingChannel($called4, 0, 360, false), // This actually times out.
$this->createSleepingChannel($called5) // This is never called.
]
)->synchronize(1);
} catch (ForkPoolException) {
$this->assertSame([true, true, true, true], [$called1, $called2, $called3, $called4]);
$this->assertSame(false, $called5);

return;
}

$this->fail(__FUNCTION__ . ' did not throw an exception.');
}

private function createChannels(int $count): Channels
{
$channels = [];
for ($i = 1; $i <= $count; $i++) {
$channel = $this->createMock(Channel::class);
$channel->method('receive')->willReturn($i);
$channels[] = $channel;
}

return new Channels($channels);
}

private function createSynchronizeChannels(
int $count,
?int $timeout = null,
string $receive = Channel::MESSAGE_SYNC_ACK
): Channels {
$channels = [];
for ($i = 1; $i <= $count; $i++) {
$channel = $this->createMock(Channel::class);
$channel->method('send')->with(Channel::MESSAGE_SYNC);
$channel->method('receive')->with($timeout)->willReturn($receive);
$channels[] = $channel;
}

return new Channels($channels);
}

private function createSleepingChannel(
bool &$called,
int $withTimeout = 0,
?int $sleepSeconds = null,
bool $success = true
): Channel {
return new class ($called, $withTimeout, $sleepSeconds, $success) implements Channel {
public function __construct(
private bool &$called,
private int $withTimeout,
private readonly ?int $sleepSeconds,
private readonly bool $success,
) {
}

public function send(mixed $message): void
{
}

public function receive(?int $timeout = null): ?string
{
$this->called = true;

if ($this->withTimeout !== $timeout) {
throw new Exception('Unexpected timeout value.');
}

if ($timeout !== null && $this->sleepSeconds !== null) {
usleep(min($this->sleepSeconds, $timeout) * 1000000 + 100000); // Extra 0.1s to be sure.
}

return $this->success ? Channel::MESSAGE_SYNC_ACK : null;
}
};
}
}