Skip to content
25 changes: 21 additions & 4 deletions src/Driver/FlatFile/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ public function popMessage($queueName, $duration = 5)
$runtime = microtime(true) + $duration;
$queueDir = $this->getQueueDirectory($queueName);

$it = new \GlobIterator($queueDir.DIRECTORY_SEPARATOR.'*.job', \FilesystemIterator::KEY_AS_FILENAME);
$files = array_keys(iterator_to_array($it));

natsort($files);
$files = $this->getJobFiles($queueName);

while (microtime(true) < $runtime) {
if ($files) {
Expand All @@ -107,6 +104,9 @@ public function popMessage($queueName, $duration = 5)
}

return $this->processFileOrFail($queueDir, $id);
} else {
// In order to notice that a new message received, update the list.
$files = $this->getJobFiles($queueName);
}

usleep(1000);
Expand Down Expand Up @@ -244,4 +244,21 @@ private function getJobFilename($queueName)

return $filename;
}

/**
* @param string $queueName
*
* @return string[]
*/
private function getJobFiles($queueName)
{
$it = new \GlobIterator(
$this->getQueueDirectory($queueName) . DIRECTORY_SEPARATOR . '*.job',
\FilesystemIterator::KEY_AS_FILENAME
);
$files = array_keys(iterator_to_array($it));
natsort($files);

return $files;
}
}
21 changes: 21 additions & 0 deletions tests/Driver/FlatFile/DriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,27 @@ public function testPopMessage()
}
}

public function testPopMessageWhichPushedAfterTheInitialCollect()
{
$this->driver->createQueue('send-newsletter');

$pid = pcntl_fork();

if ($pid === -1) {
$this->fail('Failed to fork the currently running process: ' . pcntl_strerror(pcntl_get_last_error()));
} elseif ($pid === 0) {
// Child process pushes a message after the initial collect
sleep(5);
$this->driver->pushMessage('send-newsletter', 'test');
exit;
}

list($message, ) = $this->driver->popMessage('send-newsletter', 10);
$this->assertSame('test', $message);

pcntl_waitpid($pid, $status);
}

public function testAcknowledgeMessage()
{
$this->driver->createQueue('send-newsletter');
Expand Down