Open
Description
I've experienced a few issue while setting up a persistent subscription to update my readl model.
I will try to list them here in a concise way, let's discuss about it afterward.
- The
UserEventAppeared
example class in the read model section should implementEventAppearedOnPersistentSubscription
. - The documentation doesn't tell much about persistent subscription creation. That is especially troublesome because the suggested blueprint uses
PersistentSubscriptionSettings::create() ->build();
which will NOT resolve links and setstartFrom
to -1.- Not resolving links will prevent to retrieve recorded event data (unless I have not understood something). I've also had to manually decode this data
- Using
startFrom: -1
will ignore all the previous events of the stream. As it is suggested to enable the$by_category
projection juste before setting up the persistent subscription, no events will be handled by the mysql subscriber. That is quite tricky to understand IMO.
Here's my read model subscriber:
<?php
declare(strict_types=1);
namespace App\Contribution\Infrastructure\ReadModel;
use Amp\Promise;
use App\Contribution\Application\ReadModel\Contributions;
use Prooph\EventStore\Async\EventAppearedOnPersistentSubscription;
use Prooph\EventStore\Async\EventStorePersistentSubscription;
use Prooph\EventStore\ResolvedEvent;
use Doctrine\DBAL\Connection;
use Amp\Success;
use Prooph\EventStore\Util\Json;
final class PostgreSQLContributions implements Contributions, EventAppearedOnPersistentSubscription
{
private const TABLE_NAME = 'contributions';
private $connection;
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
public function __invoke(
EventStorePersistentSubscription $subscription,
ResolvedEvent $resolvedEvent,
?int $retryCount = null
): Promise {
$event = $resolvedEvent->event();
$data = Json::decode($event->data());
switch ($event->eventType()) {
case 'contribution-opened':
$sql = 'INSERT INTO %s (id, title, url, state, created_at, updated_at) VALUES (:id, :title, :url, \'opened\', :created_at, :updated_at)';
$this->connection->executeUpdate(sprintf($sql, self::TABLE_NAME), [
'id' => $data['id'],
'title' => $data['title'],
'url' => $data['url'],
'created_at' => $data['createdAt'],
'updated_at' => $data['updatedAt'],
]);
break;
case 'contribution-closed':
$sql = 'UPDATE %s SET state = \'closed\', closed_at = :closed_at WHERE id = :id';
$this->connection->executeUpdate(sprintf($sql, self::TABLE_NAME), [
'id' => $data['id'],
'closed_at' => $data['closedAt'],
]);
break;
case 'contribution-merged':
$sql = 'UPDATE %s SET state = \'merged\' WHERE id = :id';
$this->connection->executeUpdate(sprintf($sql, self::TABLE_NAME), [
'id' => $data['id'],
]);
break;
}
return new Success();
}
public function all(): array
{
}
}
Metadata
Assignees
Labels
No labels
Activity