Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/UiTPAS/UiTPASIncomingEventServicesProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ function () use ($container) {
$uitpasDeserializerLocator,
$container->get(EventBus::class),
$container->get('config')['amqp']['consumer_tag'],
new GeneratedUuidFactory()
new GeneratedUuidFactory(),
$container->get('dbal_connection')
);

$consumerConfig = $container->get('config')['amqp']['consumers']['uitpas'];
Expand Down
26 changes: 26 additions & 0 deletions src/Broadway/AMQP/EventBusForwardingConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use Broadway\Domain\Metadata;
use Broadway\EventHandling\EventBus;
use CultuurNet\UDB3\Deserializer\DeserializerLocatorInterface;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use CultuurNet\UDB3\Model\ValueObject\Identity\UuidFactory\UuidFactory;

Expand All @@ -19,6 +21,8 @@ final class EventBusForwardingConsumer extends AbstractConsumer

private UuidFactory $uuidFactory;

private Connection $dbalConnection;

public function __construct(
AMQPStreamConnection $connection,
EventBus $eventBus,
Expand All @@ -27,10 +31,12 @@ public function __construct(
string $exchangeName,
String $queueName,
UuidFactory $uuidFactory,
Connection $dbalConnection,
int $delay = 0
) {
$this->eventBus = $eventBus;
$this->uuidFactory = $uuidFactory;
$this->dbalConnection = $dbalConnection;

parent::__construct(
$connection,
Expand All @@ -48,6 +54,8 @@ public function __construct(
*/
protected function handle($deserializedMessage, array $context): void
{
$this->ensureDatabaseConnection();

// If the deserializer did not return a DomainMessage yet, then
// consider the returned value as the payload, and wrap it in a
// DomainMessage.
Expand All @@ -65,4 +73,22 @@ protected function handle($deserializedMessage, array $context): void
new DomainEventStream([$deserializedMessage])
);
}

private function ensureDatabaseConnection(): void
{
try {
if (!$this->dbalConnection->isConnected()) {
$connected = $this->dbalConnection->connect();
if (!$connected) {
$this->logger->critical('Reconnection to database failed');
} else {
$this->logger->debug('Connection to database restored successfully');
}
} else {
$this->logger->debug('Connection to database successfully verified');
}
} catch (Exception $exception) {
$this->logger->critical('Connection checks to database failed with exception:' . $exception->getMessage());
}
}
}
8 changes: 7 additions & 1 deletion src/Broadway/AMQP/EventBusForwardingConsumerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Broadway\EventHandling\EventBus;
use CultuurNet\UDB3\Deserializer\DeserializerLocatorInterface;
use Doctrine\DBAL\Connection;
use InvalidArgumentException;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -33,14 +34,17 @@ final class EventBusForwardingConsumerFactory

private UuidFactory $uuidFactory;

private Connection $dbalConnection;

public function __construct(
int $executionDelay,
AMQPStreamConnection $connection,
LoggerInterface $logger,
DeserializerLocatorInterface $deserializerLocator,
EventBus $eventBus,
string $consumerTag,
UuidFactory $uuidFactory
UuidFactory $uuidFactory,
Connection $dbalConnection
) {
if ($executionDelay < 0) {
throw new InvalidArgumentException('Execution delay should be zero or higher.');
Expand All @@ -53,6 +57,7 @@ public function __construct(
$this->eventBus = $eventBus;
$this->consumerTag = $consumerTag;
$this->uuidFactory = $uuidFactory;
$this->dbalConnection = $dbalConnection;
}

public function create(
Expand All @@ -67,6 +72,7 @@ public function create(
$exchange,
$queue,
$this->uuidFactory,
$this->dbalConnection,
$this->executionDelay
);

Expand Down
88 changes: 87 additions & 1 deletion tests/Broadway/AMQP/EventBusForwardingConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
use CultuurNet\UDB3\Deserializer\DeserializerLocatorInterface;
use CultuurNet\UDB3\Deserializer\DeserializerNotFoundException;
use CultuurNet\UDB3\Model\ValueObject\Identity\UuidFactory\GeneratedUuidFactory;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
Expand Down Expand Up @@ -40,6 +42,7 @@ final class EventBusForwardingConsumerTest extends TestCase

private DeserializerInterface&MockObject $deserializer;

private Connection&MockObject $dbalConnection;

public function setUp(): void
{
Expand All @@ -61,6 +64,8 @@ public function setUp(): void
->method('channel')
->willReturn($this->channel);

$this->dbalConnection = $this->createMock(Connection::class);

$this->eventBusForwardingConsumer = new EventBusForwardingConsumer(
$this->connection,
$this->eventBus,
Expand All @@ -69,6 +74,7 @@ public function setUp(): void
$this->exchangeName,
$this->queueName,
new GeneratedUuidFactory(),
$this->dbalConnection,
$delay
);

Expand All @@ -94,7 +100,8 @@ public function it_can_get_the_connection(): void
$this->consumerTag,
$this->exchangeName,
$this->queueName,
new GeneratedUuidFactory()
new GeneratedUuidFactory(),
$this->dbalConnection,
);

$expectedConnection = $this->connection;
Expand Down Expand Up @@ -310,4 +317,83 @@ public function it_automatically_acknowledges_when_no_deserializer_was_found():

$this->eventBusForwardingConsumer->consume($message);
}

/**
* @test
* @dataProvider databaseConnectionScenarios
*/
public function it_logs_database_connection_status(array $scenario): void
{
if (isset($scenario['exception'])) {
$this->dbalConnection->expects($this->once())
->method('isConnected')
->willThrowException(new Exception($scenario['exception']));
} else {
$this->dbalConnection->expects($this->once())
->method('isConnected')
->willReturn($scenario['isConnected']);

if (!$scenario['isConnected']) {
$this->dbalConnection->expects($this->once())
->method('connect')
->willReturn($scenario['connectResult']);
}
}

$this->logger->expects($this->once())
->method($scenario['logLevel'])
->with($scenario['logMessage']);

$this->deserializerLocator->method('getDeserializerForContentType')
->willReturn($this->deserializer);
$this->deserializer->method('deserialize')->willReturn('');

$message = new AMQPMessage(
'',
[
'content_type' => 'application/vnd.cultuurnet.udb3-events.dummy-event+json',
'correlation_id' => 'my-correlation-id-123',
]
);
$message->delivery_info['channel'] = $this->channel;
$message->delivery_info['delivery_tag'] = 'my-delivery-tag';

$this->eventBusForwardingConsumer->consume($message);
}

public function databaseConnectionScenarios(): array
{
return [
'connection already active' => [
[
'isConnected' => true,
'logLevel' => 'debug',
'logMessage' => 'Connection to database successfully verified',
],
],
'reconnection successful' => [
[
'isConnected' => false,
'connectResult' => true,
'logLevel' => 'debug',
'logMessage' => 'Connection to database restored successfully',
],
],
'reconnection failed' => [
[
'isConnected' => false,
'connectResult' => false,
'logLevel' => 'critical',
'logMessage' => 'Reconnection to database failed',
],
],
'connection check throws exception' => [
[
'exception' => 'Database connection error',
'logLevel' => 'critical',
'logMessage' => 'Connection checks to database failed with exception:Database connection error',
],
],
];
}
}