From c41de4674b32cec619cd7fedafb4c442f27370f5 Mon Sep 17 00:00:00 2001 From: Werner Mollentze Date: Thu, 15 Jun 2017 16:09:10 +0200 Subject: [PATCH] Updated FlatFileDriver.php Bug fixes and minor (opinionated) improvements for the flat file driver. * Changed from using an additional suffix (.proceed) to hidden (dot-prefixed) files to emulate a non-visible message state * Fixed incomplete queue removal * Fixes for inconsistent FIFO processing, message ordering and listing - changed from LIFO to FIFO throughout * Refactored some repetitive code --- src/Driver/FlatFileDriver.php | 87 ++++++++++++++++++++++------------- 1 file changed, 55 insertions(+), 32 deletions(-) diff --git a/src/Driver/FlatFileDriver.php b/src/Driver/FlatFileDriver.php index aa06b1fa..a9a4748f 100644 --- a/src/Driver/FlatFileDriver.php +++ b/src/Driver/FlatFileDriver.php @@ -10,8 +10,14 @@ */ class FlatFileDriver implements \Bernard\Driver { + /** + * @var string + */ private $baseDirectory; + /** + * @var integer + */ private $permissions; /** @@ -63,14 +69,7 @@ public function createQueue($queueName) */ public function countMessages($queueName) { - $iterator = new \RecursiveDirectoryIterator( - $this->getQueueDirectory($queueName), - \FilesystemIterator::SKIP_DOTS - ); - $iterator = new \RecursiveIteratorIterator($iterator); - $iterator = new \RegexIterator($iterator, '#\.job$#'); - - return iterator_count($iterator); + return iterator_count($this->getJobIterator($queueName)); } /** @@ -83,7 +82,7 @@ public function pushMessage($queueName, $message) $filename = $this->getJobFilename($queueName); file_put_contents($queueDir.DIRECTORY_SEPARATOR.$filename, $message); - chmod($queueDir . DIRECTORY_SEPARATOR . $filename, $this->permissions); + chmod($queueDir.DIRECTORY_SEPARATOR.$filename, $this->permissions); } /** @@ -93,18 +92,16 @@ public function popMessage($queueName, $duration = 5) { $runtime = microtime(true) + $duration; $queueDir = $this->getQueueDirectory($queueName); - - $it = new \GlobIterator($queueDir.DIRECTORY_SEPARATOR.'*.job', \FilesystemIterator::KEY_AS_FILENAME); - $files = array_keys(iterator_to_array($it)); + $files = $this->getJobFiles($queueName); natsort($files); while (microtime(true) < $runtime) { if ($files) { - $id = array_pop($files); + $id = array_shift($files); $data = array(file_get_contents($queueDir.DIRECTORY_SEPARATOR.$id), $id); - rename($queueDir.DIRECTORY_SEPARATOR.$id, $queueDir.DIRECTORY_SEPARATOR.$id.'.proceed'); - + // Set file hidden (emulating message invisibility) + rename($queueDir.DIRECTORY_SEPARATOR.$id, $queueDir.DIRECTORY_SEPARATOR.'.'.$id); return $data; } @@ -120,7 +117,8 @@ public function popMessage($queueName, $duration = 5) public function acknowledgeMessage($queueName, $receipt) { $queueDir = $this->getQueueDirectory($queueName); - $path = $queueDir.DIRECTORY_SEPARATOR.$receipt.'.proceed'; + // Set path to hidden filename + $path = $queueDir.DIRECTORY_SEPARATOR.'.'.$receipt; if (!is_file($path)) { return; @@ -136,11 +134,9 @@ public function peekQueue($queueName, $index = 0, $limit = 20) { $queueDir = $this->getQueueDirectory($queueName); - $it = new \GlobIterator($queueDir.DIRECTORY_SEPARATOR.'*.job', \FilesystemIterator::KEY_AS_FILENAME); - $files = array_keys(iterator_to_array($it)); + $files = $this->getJobFiles($queueName); natsort($files); - $files = array_reverse($files); $files = array_slice($files, $index, $limit); @@ -158,19 +154,7 @@ public function peekQueue($queueName, $index = 0, $limit = 20) */ public function removeQueue($queueName) { - $iterator = new \RecursiveDirectoryIterator( - $this->getQueueDirectory($queueName), - \FilesystemIterator::SKIP_DOTS - ); - $iterator = new \RecursiveIteratorIterator($iterator); - $iterator = new \RegexIterator($iterator, '#\.job(.proceed)?$#'); - - foreach ($iterator as $file) { - /* @var $file \DirectoryIterator */ - unlink($file->getRealPath()); - } - - rmdir($this->getQueueDirectory($queueName)); + $this->removeDirectoryRecursive($this->getQueueDirectory($queueName)); } /** @@ -226,4 +210,43 @@ private function getJobFilename($queueName) return $filename; } + + /** + * Creates an iterator of all message files in the queue + * @param string $queueName + * @return \GlobIterator + */ + private function getJobIterator($queueName) { + $queueDir = $this->getQueueDirectory($queueName); + $iterator = new \GlobIterator($queueDir.DIRECTORY_SEPARATOR.'*.job', \FilesystemIterator::KEY_AS_FILENAME); + return $iterator; + } + + /** + * Retrieves an array of all message files in the queue + * @param string $queueName + * @return array + */ + private function getJobFiles($queueName) { + $iterator = $this->getJobIterator($queueName); + $files = array_keys(iterator_to_array($iterator)); + return $files; + } + + /** + * Removes a directory recursively + * @param string $directory + */ + private function removeDirectoryRecursive($directory) + { + foreach (glob("{$directory}/{,.}[!.,!..]*", GLOB_MARK|GLOB_BRACE) as $file) + { + if (is_dir($file)) { + $this->removeDirectoryRecursive($file); + } else { + unlink($file); + } + } + rmdir($directory); + } }