Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ jobs:
ports:
- 5672:5672
steps:
- name: Start PubSub emulator
run: |
docker run -d -p 8085:8085 gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators \
gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
timeout 30 bash -c 'until curl -s http://localhost:8085 > /dev/null 2>&1; do sleep 1; done'
- uses: actions/checkout@v6
- uses: shivammathur/setup-php@v2
with:
Expand All @@ -30,4 +35,6 @@ jobs:
tools: composer, phpunit
- run: composer install -n --prefer-dist --no-security-blocking
- run: php vendor/phpunit/phpunit/phpunit -c phpunit.xml --coverage-clover=coverage.xml
env:
PUBSUB_EMULATOR_HOST: localhost:8085
- run: php vendor/bin/coverage-check coverage.xml 10
40 changes: 40 additions & 0 deletions src/Provider/Amqp/AmqpQueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -892,4 +892,44 @@ public function unbindQueue()
);
return $this;
}

/**
* Send a heartbeat on the given connection mode, or all active connections
*
* @param string|null $connectionMode One of CONN_PUSH, CONN_CONSUME, CONN_OTHER, or null for all
*
* @return $this
*/
public function heartbeat($connectionMode = null)
{
if($connectionMode !== null)
{
$this->_heartbeat($connectionMode);
}
else
{
foreach(array_keys($this->_connections) as $mode)
{
$this->_heartbeat($mode);
}
}
return $this;
}

protected function _heartbeat($connectionMode)
{
if(!empty($this->_connections[$connectionMode])
&& $this->_connections[$connectionMode]->isConnected()
)
{
try
{
$this->_connections[$connectionMode]->checkHeartBeat();
}
catch(AMQPHeartbeatMissedException $e)
{
$this->disconnect($connectionMode);
}
}
}
}
10 changes: 7 additions & 3 deletions src/Provider/Google/GooglePubSubProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ private function _getClient()
{
$options['keyFile'] = $this->_loadCredentials($rawCreds);
}
elseif(getenv('PUBSUB_EMULATOR_HOST'))
{
$options['credentials'] = new \Google\Auth\Credentials\InsecureCredentials();
}
$this->_client = new PubSubClient($options);
}
return $this->_client;
Expand Down Expand Up @@ -157,7 +161,7 @@ private function _createTopicAndSub()
}
catch(ConflictException $e)
{
if($e->getCode() != 409)
if($e->getCode() != 409 && $e->getCode() != 6)
{
throw $e;
}
Expand All @@ -169,7 +173,7 @@ private function _createTopicAndSub()
}
catch(ConflictException $e)
{
if($e->getCode() != 409)
if($e->getCode() != 409 && $e->getCode() != 6)
{
throw $e;
}
Expand Down Expand Up @@ -216,7 +220,7 @@ public function pushBatch(array $batch)
}
catch(NotFoundException $e)
{
if($this->_getAutoCreate() && ($e->getCode() == 404))
if($this->_getAutoCreate() && ($e->getCode() == 404 || $e->getCode() == 5))
{
$this->_createTopicAndSub();
return $topic->publishBatch($messages);
Expand Down
45 changes: 45 additions & 0 deletions tests/Provider/AmqpTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,51 @@ function ($msg, $tag) use ($q) {
);
}

public function testHeartbeat()
{
$q = $this->_getProvider('test_heartbeat_fn');
$q->declareExchange()
->declareQueue()
->bindQueue();

// heartbeat on a specific connection should not disconnect
$q->push('heartbeat_test');
$checksBefore = $q->getHeartbeatCheckCount();
$disconnectsBefore = $q->getDisconnectCount();
$q->heartbeat(AmqpMockProvider::CONN_PUSH);
self::assertGreaterThan($checksBefore, $q->getHeartbeatCheckCount());
self::assertEquals($disconnectsBefore, $q->getDisconnectCount());

// heartbeat on all connections should not disconnect
$checksBefore = $q->getHeartbeatCheckCount();
$q->heartbeat();
self::assertGreaterThan($checksBefore, $q->getHeartbeatCheckCount());
self::assertEquals($disconnectsBefore, $q->getDisconnectCount());
}

public function testHeartbeatAfterMissed()
{
$q = $this->_getProvider('test_heartbeat_missed')->unregisterHeartbeat();
$q->declareExchange()
->declareQueue()
->bindQueue();
$q->push('trigger');

// sleep past the heartbeat interval to cause a missed heartbeat
$timeLeft = (int)$q->config()->getItem('heartbeat') * 3;
while($timeLeft > 0)
{
$timeLeft = sleep($timeLeft);
}

$checksBefore = $q->getHeartbeatCheckCount();
$disconnectsBefore = $q->getDisconnectCount();
$q->heartbeat(AmqpMockProvider::CONN_PUSH);
// should have checked and disconnected due to missed heartbeat
self::assertGreaterThan($checksBefore, $q->getHeartbeatCheckCount());
self::assertGreaterThan($disconnectsBefore, $q->getDisconnectCount());
}

public function testAmqp()
{
$q = $this->_getProvider('test', 'testexchange');
Expand Down
Loading