A production-ready RabbitMQ queue driver for Laravel with attribute-based topology, automatic retries, and Kubernetes-native deployment.
Build resilient, scalable queue systems using RabbitMQ's powerful routing with Laravel's familiar job syntax. No Horizon required.
- 🎯 Attribute-Based Topology - Define exchanges and queues using PHP 8 attributes on your job classes
- 🔄 Automatic Retries & DLQ - Built-in dead letter queues with configurable retry strategies
- 📊 Priority Queues - Support for message priorities (0-255) on classic queues
- ⏰ Delayed Messages - Schedule jobs with native RabbitMQ delayed message exchange
- 🚀 Laravel-Native - Works with standard dispatch() - no learning curve
- ☸️ Kubernetes-Ready - Custom consumer commands designed for containerized deployments
- 💪 Production-Proven - Built on php-amqplib with heartbeat support and publisher confirms
This package is ideal for applications that need:
- Advanced Routing - Route messages based on patterns, headers, or broadcast to multiple queues
- Guaranteed Delivery - RabbitMQ's persistence and publisher confirms ensure messages aren't lost
- Complex Workflows - Multi-tenant systems, event-driven architectures, microservices communication
- Infrastructure-Level Control - Manage queue topology, clustering, and federation through RabbitMQ itself
- Kubernetes-Native Workers - Deploy queue consumers as standard Kubernetes Deployments with HPA
- Protocol Flexibility - AMQP protocol support for cross-platform messaging (Node.js, Python, Go, etc.)
- PHP 8.2+
- Laravel 11.0+ or 12.0+
- RabbitMQ 3.12+
- php-amqplib/php-amqplib ^3.6
Optional:
- rabbitmq_delayed_message_exchange plugin for delayed messages
- rabbitmq_prometheus plugin for Prometheus metrics

composer require lettermint/laravel-rabbitmq

Publish the configuration file:
php artisan vendor:publish --tag=rabbitmq-config

Update your config/queue.php:
'connections' => [
    'rabbitmq' => [
        'driver' => 'rabbitmq',
        'queue' => env('RABBITMQ_QUEUE', 'default'),
        'exchange' => env('RABBITMQ_EXCHANGE', ''),
    ],
],
// Set as default if desired
'default' => env('QUEUE_CONNECTION', 'rabbitmq'),

Add to your .env:
QUEUE_CONNECTION=rabbitmq
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/

Here's a complete example - from defining a job to processing it:
<?php
namespace App\Jobs;
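use App\Mail\NotificationMail; // Assumed namespace for your own Mailable
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Mail;
use Lettermint\RabbitMQ\Attributes\ConsumesQueue;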
#[ConsumesQueue(
    queue: 'emails',
    bindings: ['notifications' => 'email.*'], // Listens to notifications exchange
    quorum: true, // High availability queue
    retryAttempts: 3, // Retry up to 3 times
)]
class SendEmailJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, SerializesModels;

    public function __construct(
        public string $email,
        public string $subject,
        public string $message,
    ) {}

    public function handle(): void
    {
        // Send your email
        Mail::to($this->email)->send(new NotificationMail($this->subject, $this->message));
    }
}

# Create the exchange, queue, and bindings in RabbitMQ
php artisan rabbitmq:declare

// Dispatch the job (from anywhere in your app)
SendEmailJob::dispatch('user@example.com', 'Welcome!', 'Thanks for signing up');

# Start consuming (in production, run this in a container/supervisor)
php artisan rabbitmq:consume emails

That's it! Your job will be routed through RabbitMQ and processed with automatic retries and dead letter handling.
💡 Tip: In production, run consumers as Kubernetes Deployments or supervisor processes.
Understanding these RabbitMQ concepts will help you use this package effectively:
┌─────────────────┐
│    Producer     │
└────────┬────────┘
         │ routing key: email.welcome
         ▼
┌─────────────────┐
│    Exchange     │
│ "notifications" │
└────────┬────────┘
         │ binding: email.*
         ▼
┌─────────────────┐
│      Queue      │
│    "emails"     │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│    Consumer     │
│   (Your Job)    │
└─────────────────┘
- Exchange: Routes messages based on routing keys (like a post office)
- Queue: Stores messages until consumed (like a mailbox)
- Binding: Routing rule connecting exchange to queue (e.g., email.* matches email.welcome)
- Routing Key: Label on each message determining which queue(s) receive it
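To make the binding rule concrete, here is a minimal sketch of how a topic pattern is compared with a routing key. It re-implements the standard AMQP topic rules for illustration only (`*` matches exactly one dot-separated word, `#` matches zero or more words). It is not code from this package or from RabbitMQ, and the function names `topicMatches` and `matchWords` are made up for this example.

```php
<?php

/**
 * Illustrative AMQP topic matching: '*' = exactly one word, '#' = zero or more words.
 * Hypothetical helper, not part of the package.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    // explode('.', '') yields [''], so treat empty strings as "no words".
    $patternWords = $pattern === '' ? [] : explode('.', $pattern);
    $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords($patternWords, 0, $keyWords, 0);
}

function matchWords(array $pattern, int $i, array $key, int $j): bool
{
    // Pattern exhausted: match only if the key is exhausted too.
    if ($i === count($pattern)) {
        return $j === count($key);
    }

    if ($pattern[$i] === '#') {
        // '#' may absorb zero or more words: try every possible split point.
        for ($next = $j; $next <= count($key); $next++) {
            if (matchWords($pattern, $i + 1, $key, $next)) {
                return true;
            }
        }
        return false;
    }

    // Any other pattern word needs a key word to consume.
    if ($j === count($key)) {
        return false;
    }

    if ($pattern[$i] === '*' || $pattern[$i] === $key[$j]) {
        return matchWords($pattern, $i + 1, $key, $j + 1);
    }

    return false;
}

var_dump(topicMatches('email.*', 'email.welcome'));        // bool(true)
var_dump(topicMatches('email.*', 'email.welcome.urgent')); // bool(false)
var_dump(topicMatches('email.#', 'email.welcome.urgent')); // bool(true)
```

A helper like this can be handy in unit tests to check that the routing keys you publish would reach the queue you expect before declaring anything on the broker.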
Instead of manually configuring exchanges and queues in RabbitMQ, define them with attributes:
// This attribute tells the package:
// 1. Create a queue named "emails"
// 2. Create a quorum queue (HA, durable)
// 3. Bind it to the "notifications" exchange with pattern "email.*"
// 4. Set up DLQ with 3 retry attempts
#[ConsumesQueue(
    queue: 'emails',
    bindings: ['notifications' => 'email.*'],
    quorum: true,
    retryAttempts: 3,
)]

When you run php artisan rabbitmq:declare, the package scans your job classes and creates everything automatically.
Simple job with a queue:
#[ConsumesQueue(
    queue: 'default',
    bindings: ['tasks' => '#'], // Catch all messages from 'tasks' exchange
)]
class ProcessTaskJob implements ShouldQueue
{
    public function handle(): void
    {
        // Process the task
    }
}

Creating an exchange (optional - useful for organization):
<?php
namespace App\RabbitMQ\Exchanges;
use Lettermint\RabbitMQ\Attributes\Exchange;
use Lettermint\RabbitMQ\Enums\ExchangeType;
#[Exchange(name: 'tasks', type: ExchangeType::Topic)]
class TasksExchange {}

Priority queues for time-sensitive jobs:
use Lettermint\RabbitMQ\Contracts\HasPriority;
#[ConsumesQueue(
    queue: 'urgent-tasks',
    bindings: ['tasks' => 'urgent.*'],
    quorum: false, // Priority requires classic queue
    maxPriority: 10, // 0 = lowest, 10 = highest
)]
class UrgentTaskJob implements ShouldQueue, HasPriority
{
    public function __construct(
        public string $taskId,
        public int $priority = 5,
    ) {}

    public function getPriority(): int
    {
        return $this->priority;
    }
}
// Dispatch with high priority
UrgentTaskJob::dispatch($taskId, priority: 10);

Delayed/scheduled messages:
// Requires rabbitmq_delayed_message_exchange plugin
// Enable in config/rabbitmq.php: 'delayed.enabled' => true
// Delay by seconds
SendEmailJob::dispatch($email, $subject, $message)->delay(300); // 5 minutes
// Delay with Carbon
SendEmailJob::dispatch($email, $subject, $message)->delay(now()->addHours(2));
// Schedule for specific time
SendEmailJob::dispatch($email, $subject, $message)->delay(now()->tomorrow()->setHour(9));

Multiple bindings (listen to multiple routing patterns):
#[ConsumesQueue(
    queue: 'notifications',
    bindings: [
        'events' => ['user.created', 'user.updated'],
        'alerts' => 'critical.*',
    ],
)]

Exchange-to-exchange binding (hierarchical routing):
// Parent exchange
#[Exchange(name: 'events', type: ExchangeType::Topic)]
class EventsExchange {}
// Child exchange bound to parent
#[Exchange(
    name: 'user-events',
    type: ExchangeType::Topic,
    bindTo: 'events',
    bindRoutingKey: 'user.#',
)]
class UserEventsExchange {}

Custom retry strategy:
use Lettermint\RabbitMQ\Enums\RetryStrategy;
#[ConsumesQueue(
    queue: 'api-calls',
    bindings: ['tasks' => 'api.*'],
    retryAttempts: 5,
    retryStrategy: RetryStrategy::Exponential,
    retryDelays: [30, 60, 300, 900, 3600], // 30s, 1m, 5m, 15m, 1h
)]

Repeatable attribute (one job, multiple queues):
// This job can be consumed from either queue
#[ConsumesQueue(queue: 'primary', bindings: ['tasks' => 'important.*'])]
#[ConsumesQueue(queue: 'secondary', bindings: ['tasks' => 'background.*'])]
class FlexibleJob implements ShouldQueue
{
    // ...
}

# Connection
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
# Behavior
RABBITMQ_HEARTBEAT=60
RABBITMQ_PREFETCH_COUNT=10
RABBITMQ_DELAYED_ENABLED=true
# Publisher
RABBITMQ_PUBLISHER_CONFIRM=true

#[Exchange(
    name: 'events', // Required: Exchange name
    type: ExchangeType::Topic, // topic, direct, fanout, headers, x-delayed-message
    durable: true, // Survive broker restart
    autoDelete: false, // Delete when no bindings
    internal: false, // Only accessible via e2e bindings
    bindTo: 'parent-exchange', // Parent exchange for e2e binding
    bindRoutingKey: 'events.#', // Routing pattern for parent
    arguments: [], // Custom exchange arguments
)]

Exchange Types:
- Topic: Pattern-based routing (e.g., user.*.created)
- Direct: Exact routing key match
- Fanout: Broadcast to all bound queues
- Headers: Route by message headers
- DelayedMessage: Delayed delivery (requires plugin)
#[ConsumesQueue(
    // Required
    queue: 'my-queue', // Queue name
    // Bindings
    bindings: [ // Exchange => routing key(s)
        'exchange-name' => 'routing.key',
        'other-exchange' => ['key1', 'key2'],
    ],
    // Queue type (choose one)
    quorum: true, // Quorum queue (HA, recommended)
    maxPriority: 10, // Classic with priority (quorum: false)
    // Limits
    messageTtl: 86400000, // Message TTL in ms (24h)
    maxLength: 1000000, // Max queue length
    overflow: OverflowBehavior::RejectPublishDlx, // Overflow behavior
    // Dead letter & retry
    dlqExchange: null, // Custom DLQ exchange (auto-derived if null)
    retryAttempts: 3, // Max retries before permanent DLQ
    retryStrategy: RetryStrategy::Exponential, // exponential, fixed, linear
    retryDelays: [60, 300, 900], // Delays in seconds
    // Consumer settings
    prefetch: 10, // Messages to prefetch (QoS)
    timeout: 30, // Job timeout in seconds
)]

Important Notes:
- Quorum queues provide high availability but don't support priorities
- Use classic queues (quorum: false) if you need maxPriority
- DLQ exchange is auto-created based on your first binding exchange
See config/rabbitmq.php for full options. Key settings:
// Discovery paths (where to scan for attributes)
'discovery' => [
    'paths' => [
        app_path('Jobs'),
        app_path('RabbitMQ'),
    ],
],
// Default queue settings (for jobs without attributes)
'queue' => [
    'exchange' => '', // Fallback exchange
],
// Delayed messages
'delayed' => [
    'enabled' => true,
    'max_delay' => 86400000, // 24 hours max
],

# Declare all exchanges, queues, and bindings
php artisan rabbitmq:declare
# Preview what will be created (dry run)
php artisan rabbitmq:declare --dry-run
# View topology as tree
php artisan rabbitmq:topology
# Export topology as JSON
php artisan rabbitmq:topology --format=json

# List all queues with stats
php artisan rabbitmq:queues
# Include DLQ queues in list
php artisan rabbitmq:queues --include-dlq
# Watch mode (updates every 2s)
php artisan rabbitmq:queues --watch
# Purge a queue (delete all messages)
php artisan rabbitmq:purge my-queue

# Start consuming from a queue
php artisan rabbitmq:consume my-queue
# With custom settings
php artisan rabbitmq:consume my-queue \
    --prefetch=25 \
    --timeout=120 \
    --max-jobs=500 \
    --max-memory=256
# Stop when empty (useful for testing)
php artisan rabbitmq:consume my-queue --stop-when-empty

Consumer Options:
- --prefetch: Messages to prefetch (default: 10)
- --timeout: Job timeout in seconds (default: 60)
- --max-jobs: Exit after N jobs (0 = unlimited)
- --max-time: Exit after N seconds (0 = unlimited)
- --max-memory: Exit if memory exceeds N MB (default: 128)
- --stop-when-empty: Exit when queue is empty
# Replay DLQ messages back to original queue
php artisan rabbitmq:replay-dlq my-queue
# Preview replay without moving messages
php artisan rabbitmq:replay-dlq my-queue --dry-run
# Limit number of messages to replay
php artisan rabbitmq:replay-dlq my-queue --limit=100
# Inspect DLQ messages without removing them
php artisan rabbitmq:dlq-inspect my-queue
# Inspect specific message
php artisan rabbitmq:dlq-inspect my-queue --id=message-uuid
# Limit number of messages shown
php artisan rabbitmq:dlq-inspect my-queue --limit=20
# JSON output
php artisan rabbitmq:dlq-inspect my-queue --format=json
# Purge DLQ messages (permanently delete)
php artisan rabbitmq:dlq-purge my-queue
# Purge specific message
php artisan rabbitmq:dlq-purge my-queue --id=message-uuid
# Purge old messages only
php artisan rabbitmq:dlq-purge my-queue --older-than=7d
# Preview without deleting
php artisan rabbitmq:dlq-purge my-queue --dry-run
# Skip confirmation
php artisan rabbitmq:dlq-purge my-queue --force

# Health check
php artisan rabbitmq:health
# JSON output (for monitoring tools)
php artisan rabbitmq:health --json

DLQs are automatically created for every queue to handle failed messages.
┌──────────────────────┐
│    Original Queue    │
│       "emails"       │
└──────────┬───────────┘
           │ Job fails or times out
           ▼
┌──────────────────────┐
│     DLQ Exchange     │
│ "notifications.dlq"  │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│      DLQ Queue       │
│     "dlq:emails"     │
└──────────┬───────────┘
           │ Retry after delay
           ▼
┌──────────────────────┐
│    Original Queue    │  If retries exhausted → stays in DLQ
│       "emails"       │
└──────────────────────┘
Exponential (recommended for API calls):
retryStrategy: RetryStrategy::Exponential,
retryDelays: [60, 300, 900], // 1m, 5m, 15m, then 15m for remaining

Fixed (same delay every time):
retryStrategy: RetryStrategy::Fixed,
retryDelays: [300], // Always 5 minutes

Linear (increasing delay):
retryStrategy: RetryStrategy::Linear,
retryDelays: [60], // 1m, 2m, 3m, 4m...

Messages are routed to the DLQ when:
- Job throws unhandled exception (after retries)
- Job exceeds timeout
- Consumer rejects without requeue
- Queue message TTL expires
- Queue max-length exceeded (with overflow: RejectPublishDlx)
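The three retry strategies can be read as a small function from attempt number to delay. The sketch below follows the comments in the strategy examples: Exponential walks the retryDelays list and repeats the last entry, Fixed always uses the first entry, and Linear multiplies the first entry by the attempt number. This is one reading of those comments, not the package's actual implementation; `SketchRetryStrategy` and `retryDelaySeconds` are hypothetical names.

```php
<?php

// Hypothetical stand-in for the package's RetryStrategy enum.
enum SketchRetryStrategy
{
    case Exponential;
    case Fixed;
    case Linear;
}

/**
 * Delay in seconds before retry number $attempt (1-based).
 *
 * @param int[] $delays the retryDelays list from the attribute
 */
function retryDelaySeconds(SketchRetryStrategy $strategy, array $delays, int $attempt): int
{
    if ($delays === [] || $attempt < 1) {
        throw new InvalidArgumentException('Need at least one delay and attempt >= 1');
    }

    return match ($strategy) {
        // Walk the list; once it runs out, keep using the last delay.
        SketchRetryStrategy::Exponential => $delays[min($attempt, count($delays)) - 1],
        // Same delay on every attempt.
        SketchRetryStrategy::Fixed => $delays[0],
        // Base delay grows by one multiple per attempt.
        SketchRetryStrategy::Linear => $delays[0] * $attempt,
    };
}

// Exponential with [60, 300, 900]: attempts 1..5 give 60, 300, 900, 900, 900
foreach (range(1, 5) as $attempt) {
    echo retryDelaySeconds(SketchRetryStrategy::Exponential, [60, 300, 900], $attempt), "\n";
}
```

Working the schedule out like this makes it easier to estimate how long a message can stay in the retry loop before retryAttempts is exhausted.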
Deploy workers as Kubernetes Deployments for automatic scaling and restarts.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: queue-worker-emails
spec:
  replicas: 3
  selector:
    matchLabels:
      app: queue-worker
      queue: emails
  template:
    metadata:
      labels:
        app: queue-worker
        queue: emails
    spec:
      containers:
        - name: worker
          image: your-app:latest
          command: ["php", "artisan", "rabbitmq:consume", "emails"]
          args:
            - "--prefetch=25"
            - "--max-jobs=500"
            - "--max-memory=256"
          env:
            - name: RABBITMQ_HOST
              value: "rabbitmq.default.svc.cluster.local"
            - name: RABBITMQ_USER
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-credentials
                  key: username
            - name: RABBITMQ_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-credentials
                  key: password
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"

Scale based on queue depth using KEDA or RabbitMQ metrics:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: queue-worker-emails-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: queue-worker-emails
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: rabbitmq_queue_messages_ready
          selector:
            matchLabels:
              queue: emails
        target:
          type: AverageValue
          averageValue: "100" # Target: 100 messages per pod

- Set --max-jobs to restart workers periodically (limits the impact of memory leaks)
- Set --max-memory slightly below container limits
- Use livenessProbe and readinessProbe for health checks
- Run rabbitmq:declare in an init container or CI/CD pipeline
- Use PodDisruptionBudget to maintain availability during updates
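As a rough guide to what the averageValue: "100" target in the autoscaler manifest means, the Kubernetes HPA aims for about ceil(ready messages / 100) pods, clamped between minReplicas and maxReplicas. The sketch below reproduces only that arithmetic; it ignores the HPA's tolerance band and stabilization windows, and `desiredReplicas` is a hypothetical helper name.

```php
<?php

/**
 * Approximate replica count for an AverageValue target on queue depth.
 * Defaults mirror the manifest: 100 messages per pod, 2 to 20 replicas.
 */
function desiredReplicas(
    int $readyMessages,
    int $targetPerPod = 100,
    int $minReplicas = 2,
    int $maxReplicas = 20,
): int {
    if ($targetPerPod < 1) {
        throw new InvalidArgumentException('targetPerPod must be at least 1');
    }

    // Total backlog divided by the per-pod target, rounded up.
    $raw = (int) ceil(max(0, $readyMessages) / $targetPerPod);

    // Clamp to the configured replica bounds.
    return max($minReplicas, min($maxReplicas, $raw));
}

echo desiredReplicas(0), "\n";    // 2  (never below minReplicas)
echo desiredReplicas(750), "\n";  // 8  (ceil(750 / 100))
echo desiredReplicas(5000), "\n"; // 20 (capped at maxReplicas)
```

If a backlog of a few hundred messages is normal for your workload, a higher averageValue avoids scaling up for queues that a couple of workers can drain on their own.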
Jobs without #[ConsumesQueue] (e.g., from packages) use fallback routing:
Routing: config('rabbitmq.queue.exchange') with key 'fallback.{queue_name}'
Create a catch-all queue for these:
#[ConsumesQueue(
    queue: 'fallback',
    bindings: ['your-exchange' => 'fallback.#'],
)]
class FallbackJob implements ShouldQueue {}

Use Quorum Queues (default) when:
- You need high availability (HA)
- Data durability is critical
- Running in clustered RabbitMQ
Use Classic Queues when:
- You need message priorities
- You need very low latency (single-node)
- Legacy compatibility required
Cannot combine: quorum: true and maxPriority are mutually exclusive.
Publisher confirms ensure messages reach RabbitMQ successfully. Enabled by default:
'publisher' => [
    'confirm' => true, // Wait for RabbitMQ acknowledgment
],

If confirm fails, Laravel throws an exception and the job can be retried by your queue worker.
The package sends heartbeats automatically during job execution to prevent connection timeouts.
For jobs longer than 2× heartbeat interval:
- Heartbeats work automatically with ext-pcntl
- If job has $timeout property, heartbeats are disabled during execution (both use SIGALRM)
- For long jobs needing heartbeat: set public $timeout = 0; on the job class
Problem: AMQPConnectionException: Connection refused
Solutions:
- Verify RabbitMQ is running: docker ps or systemctl status rabbitmq-server
- Check connection details in .env match your RabbitMQ instance
- Ensure firewall allows port 5672
- Test connection: telnet rabbitmq-host 5672
Problem: Messages published but not appearing in queue
Solutions:
- Run php artisan rabbitmq:topology to verify bindings
- Check routing key matches binding pattern:
  - email.* matches email.welcome but not email.welcome.urgent
  - email.# matches email.welcome.urgent
- Verify exchange and queue were declared: php artisan rabbitmq:declare
- Check RabbitMQ management UI (port 15672) for unrouted messages
Problem: rabbitmq:consume exits without error
Solutions:
- Check memory limit: --max-memory=256 (increase if needed)
- Check job limit: --max-jobs=500 (consumer exits after N jobs by design)
- Check time limit: --max-time=3600 (consumer exits after N seconds)
- Review logs for connection errors or exceptions
- Verify heartbeat settings if jobs run longer than 2× heartbeat interval
Problem: Jobs marked as processed but work not completed
Solutions:
- Check your job's handle() method for unhandled exceptions
- Enable failed job logging: check failed_jobs table
- Review RabbitMQ DLQ: php artisan rabbitmq:dlq-inspect your-queue
- Add logging to job: Log::info('Job started', ['id' => $this->job?->getJobId()]);
Problem: High priority messages not processed first
Solutions:
- Verify quorum: false (quorum queues don't support priority)
- Verify maxPriority is set on queue attribute
- Ensure job implements HasPriority interface
- Lower --prefetch if needed: priority ordering applies only to messages still waiting in the queue, not to messages a consumer has already prefetched
Problem: ->delay() doesn't delay message
Solutions:
- Install plugin: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- Verify enabled in config: 'delayed.enabled' => true
- Run php artisan rabbitmq:declare to create delayed exchange
- Check delay is within max: default 24 hours (delayed.max_delay)
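One easy mistake when checking the last point is mixing units: ->delay() takes seconds (or a date), while the max_delay value shown in the config is in milliseconds (86400000 = 24 hours). The helper below is a minimal sanity check under that assumption. It is not part of the package, `delayWithinMax` is a made-up name, and how the package itself treats an over-limit delay is not covered here.

```php
<?php

/**
 * Returns true when a delay given in seconds fits within a limit given in milliseconds.
 * The default limit mirrors the 'max_delay' => 86400000 config value (24 hours).
 */
function delayWithinMax(int $delaySeconds, int $maxDelayMs = 86_400_000): bool
{
    if ($delaySeconds < 0) {
        return false; // A negative delay is never valid.
    }

    // Convert seconds to milliseconds before comparing.
    return $delaySeconds * 1000 <= $maxDelayMs;
}

var_dump(delayWithinMax(300));        // bool(true)  - 5 minutes
var_dump(delayWithinMax(2 * 3600));   // bool(true)  - 2 hours
var_dump(delayWithinMax(86_400));     // bool(true)  - exactly 24 hours
var_dump(delayWithinMax(3 * 86_400)); // bool(false) - 3 days exceeds the default
```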
Problem: Worker memory grows over time
Solutions:
- Set --max-memory=256 to restart worker before OOM
- Set --max-jobs=500 to periodically restart workers
- Check for memory leaks in job code
- Ensure job releases large objects: unset($largeVariable);
- Use --max-time=3600 for time-based restarts
Contributions are welcome! Please see CONTRIBUTING.md for details.
MIT License. See LICENSE for details.