MohammadMehrabani/consumer-generator

Laravel - Consumer Generator

Installation

You can install the package via Composer:

composer require mohammadmehrabani/consumer-generator:dev-master

Next, register the service provider in config/app.php:

'providers' => [
    // for Laravel 5.4 and below
    MohammadMehrabani\ConsumerGenerator\ConsumerGeneratorServiceProvider::class,
];

Then, if you want to customize folder names, namespaces, and so on, publish the config file with this command:

php artisan vendor:publish --provider="MohammadMehrabani\ConsumerGenerator\ConsumerGeneratorServiceProvider" --tag="config"

Now you can edit config/consumer-generator.php
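
As a rough guide to what the file may contain, the generated consumer shown later in this README reads five connection keys through config('consumer-generator.*'): host, port, user, password and vhost. The sketch below lists only those keys. The values are RabbitMQ's stock defaults and are assumptions, not the package's shipped values, and the keys for folder names and namespaces are left out because this README does not show them.

```php
<?php

// config/consumer-generator.php (partial sketch, connection keys only)
return [
    // Assumed values: RabbitMQ's default local setup, replace with your own.
    'host'     => 'localhost',
    'port'     => 5672,
    'user'     => 'guest',
    'password' => 'guest',
    'vhost'    => '/',

    // The published file also holds folder and namespace settings;
    // keep those entries as published.
];
```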

Usage Consumer

Before using the generate command, customize config/consumer-generator.php for your own setup. Then run the consumer:generate command from the terminal:

php artisan consumer:generate SMS --queue=sms

By default, the command creates a Consumers folder in the app directory and generates SMSConsumer.php inside it:

<?php

namespace App\Consumers;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class SMSConsumer implements \MohammadMehrabani\ConsumerGenerator\ConsumerInterface
{
    protected $queue = 'sms';

    /**
     * Process incoming request
     */
    public function listen()
    {
        $connection = new AMQPStreamConnection(
            config('consumer-generator.host'),
            config('consumer-generator.port'),
            config('consumer-generator.user'),
            config('consumer-generator.password'),
            config('consumer-generator.vhost')
        );

        $channel = $connection->channel();

        $channel->queue_declare(
            $this->queue,          #queue
            false,                 #passive
            true,                  #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
            false,                 #exclusive - queues may only be accessed by the current connection
            false                  #auto delete - the queue is deleted when all consumers have finished using it
        );

        /**
         * don't dispatch a new message to a worker until it has processed and
         * acknowledged the previous one. Instead, it will dispatch it to the
         * next worker that is not still busy.
         */
        $channel->basic_qos(
            null,        #prefetch size - prefetch window size in octets, null meaning "no specific limit"
            1,           #prefetch count - prefetch window in terms of whole messages
            null         #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
        );

        /**
         * indicate interest in consuming messages from a particular queue. When they do
         * so, we say that they register a consumer or, simply put, subscribe to a queue.
         * Each consumer (subscription) has an identifier called a consumer tag
         */
        $channel->basic_consume(
            $this->queue,           #queue
            '',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
            false,                  #no local - TRUE: the server will not send messages to the connection that published them
            false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
            false,                  #exclusive - queues may only be accessed by the current connection
            false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
            [$this, 'process']      #callback
        );

        while(count($channel->callbacks)) {
            echo 'Waiting for incoming messages'.PHP_EOL;
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    /**
     * process received request
     *
     * @param AMQPMessage $msg
     */
    public function process(AMQPMessage $msg)
    {
        // handle code

            // $msg->getBody();

        // end handle code

        /**
         * If a consumer dies without sending an acknowledgement the AMQP broker
         * will redeliver it to another consumer or, if none are available at the
         * time, the broker will wait until at least one consumer is registered
         * for the same queue before attempting redelivery
         */

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
}

Put your message-handling code in the process() method:

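public function process(AMQPMessage $msg)
{
    // handle code

        // decode the JSON body into an associative array
        $data = json_decode($msg->getBody(), true);
        $mobile = $data['mobile'];
        // send SMS ...

    // end handle code

    // acknowledge the message so the broker does not redeliver it
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}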
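
The handler above trusts the message body. A minimal sketch of a more defensive decode step follows, assuming the body is the JSON object sent by the producer in this README. The function name extractMobile is hypothetical and not part of the package.

```php
<?php

/**
 * Decode a message body and return its "mobile" field.
 * Hypothetical helper for illustration, not provided by the package.
 *
 * @param string $body raw message body, e.g. the result of $msg->getBody()
 * @return string
 * @throws InvalidArgumentException when the body cannot be used
 */
function extractMobile($body)
{
    // true => decode JSON objects as associative arrays
    $data = json_decode($body, true);

    // json_decode() returns null for invalid JSON and a scalar for bare values
    if (!is_array($data)) {
        throw new InvalidArgumentException('Message body did not decode to an array.');
    }

    if (!isset($data['mobile']) || !is_string($data['mobile']) || $data['mobile'] === '') {
        throw new InvalidArgumentException('Message body has no usable "mobile" field.');
    }

    return $data['mobile'];
}
```

Inside process() this could be called as extractMobile($msg->getBody()) within a try/catch. Whether a rejected message is acknowledged and logged or handled some other way is left to the application.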

Usage Producer

$user = ['user_id' => 12, 'mobile' => '09xxxxxxxxx'];
$producer = new \MohammadMehrabani\ConsumerGenerator\Producer();
$producer->setQueue('sms')->send(json_encode($user));
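
To make the contract between the two sides concrete, the sketch below shows the string that json_encode() produces for the array above and what the consumer gets back from json_decode(). It uses only PHP's built-in JSON functions and does not touch RabbitMQ or the package's Producer class.

```php
<?php

// Producer side: the same array that is passed to send() above.
$user = ['user_id' => 12, 'mobile' => '09xxxxxxxxx'];
$body = json_encode($user);

echo $body . PHP_EOL; // {"user_id":12,"mobile":"09xxxxxxxxx"}

// Consumer side: process() receives this string from $msg->getBody().
$data = json_decode($body, true);

// The decoded array has the same keys, values and types as the original.
var_dump($data === $user); // bool(true)
```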

Contributing

Thank you for considering contributing to the Consumer Generator! The contribution guide can be found in CONTRIBUTING.md.
