go-bus

Chinese version

A simple and reliable PostgreSQL-based publish/subscribe message bus system for Go applications. Built on top of tnclong/go-que.

Features

  • Flexible topic subscription patterns: Support for NATS-style topic matching with exact matching, single-level wildcards (*), and multi-level wildcards (>)
  • Persistent message queue: PostgreSQL-based storage ensures reliable message delivery
  • Multiple queue support: Multiple queues can subscribe to the same topic patterns
  • Custom retry strategies: Each subscription can configure its own message processing retry strategy
  • Message header support: Support for custom message metadata
  • Context propagation: Full integration with Go's context package

Installation

go get github.com/qor5/go-bus

Quick Start

Creating a Bus Instance

import (
    "database/sql"
    "log"

    "github.com/qor5/go-bus/pgbus"
    _ "github.com/lib/pq"
)

// Open a PostgreSQL connection pool (sql.Open validates its arguments but does not connect until first use)
db, err := sql.Open("postgres", "postgres://user:password@localhost/dbname?sslmode=disable")
if err != nil {
    log.Fatalf("Failed to open database: %v", err)
}

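// Create a new bus instance.
// Note: the snippets in this README use the name bus both for this instance and for the
// github.com/qor5/go-bus package. In real code, rename one of them (for example with an
// import alias) so the variable does not shadow the package.
bus, err := pgbus.New(db)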
if err != nil {
    log.Fatalf("Failed to create bus: %v", err)
}

Consuming Messages

// Get a queue
queue := bus.Queue("my_service_queue")

// Basic consumption
consumer, err := queue.StartConsumer(ctx, func(ctx context.Context, msg *bus.Inbound) error {
    fmt.Printf("Received message: subject=%s, payload=%s\n", msg.Subject, string(msg.Payload))

    // Reading headers
    if contentType := msg.Header.Get("Content-Type"); contentType != "" {
        fmt.Printf("Content-Type: %s\n", contentType)
    }

    // Mark message as done after processing
    return msg.Done(ctx)
})
if err != nil {
    log.Fatalf("Failed to start consumer: %v", err)
}
// Ensure consumer is stopped when done
defer consumer.Stop(context.Background())

// Consumption with custom worker configuration
workerConfig := bus.WorkerConfig{
    MaxLockPerSecond:          5,
    MaxBufferJobsCount:        10,
    MaxPerformPerSecond:       5,
    MaxConcurrentPerformCount: 2,
}

tunedConsumer, err := queue.StartConsumer(ctx, func(ctx context.Context, msg *bus.Inbound) error {
    // Process message...
    fmt.Printf("Processing message: %s\n", string(msg.Payload))

    // If you want to discard the message, use Destroy instead of Done
    return msg.Destroy(ctx)
}, bus.WithWorkerConfig(workerConfig))
if err != nil {
    log.Fatalf("Failed to start consumer with options: %v", err)
}
defer tunedConsumer.Stop(context.Background())

Creating Subscriptions

// Create subscriptions - supporting various patterns
exactSub, err := queue.Subscribe(ctx, "orders.created")                // Exact match
wildcardSub, err := queue.Subscribe(ctx, "products.*.category.*.info") // Single-level wildcard at multiple positions
multiLevelSub, err := queue.Subscribe(ctx, "notifications.>")          // Multi-level wildcard

// Subscription with custom configuration
customPlan := bus.PlanConfig{
    RunAtDelta: 200 * time.Millisecond,
    RetryPolicy: &que.RetryPolicy{
        InitialInterval:        2 * time.Second,
        MaxInterval:            20 * time.Second,
        NextIntervalMultiplier: 2.0,
        IntervalRandomPercent:  20,
        MaxRetryCount:          5,
    },
}

customSub, err := queue.Subscribe(ctx, "payments.processed", bus.WithPlanConfig(&customPlan))

// Unsubscribe from a specific subscription.
// Call this only when the subscription is no longer needed; do not call it on program exit.
// go-bus is designed to support offline messages, so a subscription should outlive a consumer restart.
err = customSub.Unsubscribe(ctx)

Publishing Messages

// Basic publish
_, err = bus.Publish(ctx, "orders.created", []byte(`{"id": "12345", "total": 99.99}`))

// Publishing with unique ID (for deduplication)
_, err = bus.Publish(ctx, "orders.created", []byte(`{"id": "12345", "total": 99.99}`), bus.WithUniqueID("order-12345"))

// Publishing with headers
_, err = bus.Publish(ctx, "orders.created", []byte(`{"id": "12345", "total": 99.99}`), bus.WithHeader(bus.Header{
    "Content-Type": []string{"application/json"},
    "X-Request-ID": []string{"req-123456"},
}))

// Publishing with an Outbound object
outbound := &bus.Outbound{
    Message: bus.Message{
        Subject: "orders.created",
        Header:  bus.Header{"Content-Type": []string{"application/json"}},
        Payload: []byte(`{"id": "12345", "total": 99.99}`),
    },
    UniqueID: bus.UniqueID("order-12345"), // Optional unique ID for message deduplication
}
_, err = bus.Dispatch(ctx, outbound)

// Publishing multiple messages at once
outbound1 := &bus.Outbound{
    Message: bus.Message{
        Subject: "orders.created",
        Payload: []byte(`{"id": "12345", "total": 99.99}`),
    },
    UniqueID: bus.UniqueID("order-12345"),
}
outbound2 := &bus.Outbound{
    Message: bus.Message{
        Subject: "notifications.sent",
        Payload: []byte(`{"user_id": "user123", "message": "Your order has been created"}`),
    },
    UniqueID: bus.UniqueID("notification-user123-order-created"),
}
// Dispatch supports publishing multiple outbound messages in a single call
_, err = bus.Dispatch(ctx, outbound1, outbound2)

Finding Matching Subscriptions

// Find subscriptions matching a specific subject
subs, err := bus.BySubject(ctx, "orders.created")
for _, sub := range subs {
    fmt.Printf("Queue %s matches with pattern %s\n", sub.Queue(), sub.Pattern())
}

// Get all subscriptions for a specific queue
queueSubs, err := queue.Subscriptions(ctx)
for _, sub := range queueSubs {
    fmt.Printf("Pattern: %s, ID: %s\n", sub.Pattern(), sub.ID())
}

Advanced Usage

Using Consumer-Specific Nonce in Queue Names for Distributed Broadcast Reception

In a distributed environment such as Kubernetes, you may need every instance in the cluster to receive the same broadcast message. To do this, give each instance its own queue by including a consumer-specific nonce in the queue name. Each instance then receives its own copy of every message, which produces the broadcast effect.

import (
    "context"
    "fmt"
    "log"

    "github.com/google/uuid"
    "github.com/qor5/go-bus"
    "github.com/qor5/go-bus/pgbus"
)

// Create a unique queue name for each service instance (like a K8s Pod)
podQueueName := fmt.Sprintf("broadcast-receiver-%s", uuid.New().String())
podQueue := bus.Queue(podQueueName)

// Start consuming messages
consumer, err := podQueue.StartConsumer(ctx, func(ctx context.Context, msg *bus.Inbound) error {
    log.Printf("Instance %s received broadcast message: %s - %s",
        podQueueName, msg.Subject, string(msg.Payload))
    return msg.Destroy(ctx)
})
if err != nil {
    log.Fatalf("Failed to start consumer: %v", err)
}
defer consumer.Stop(context.Background())

// Subscribe to broadcast topics
// WithAutoDrain(true) ensures that all pending jobs will be automatically cleaned up
// when this subscription is unsubscribed, which is essential for temporary queues
sub, err := podQueue.Subscribe(ctx, "broadcast.events.>", bus.WithAutoDrain(true))
if err != nil {
    log.Fatalf("Failed to create broadcast subscription: %v", err)
}
defer func() {
    // podQueue is single-use, so unsubscribe as soon as the program exits
    if err := sub.Unsubscribe(context.Background()); err != nil {
        log.Printf("Failed to unsubscribe: %v", err)
    }
}()

// Other service blocking logic

This pattern is particularly useful for:

  • Broadcasting configuration changes or system notifications to all service instances
  • Ensuring each instance in the cluster independently processes the same message, implementing a reliable broadcast mechanism
  • Implementing event-driven system-wide notifications in microservice architectures

Each instance creates a queue with a unique name, so each message is processed independently by each subscribed instance, achieving a true broadcast effect.

Topic Pattern Explanation

go-bus supports three types of topic matching patterns, following the NATS messaging system style:

  1. Exact Match: Matches the exact topic string
    • Example: orders.created only matches orders.created

  2. Single-Level Wildcard (*): Matches any string in a single level
    • Example: products.*.category.*.info matches products.xyz.category.abc.info and products.123.category.456.info, but not products.category.info or products.xyz.category.abc.def.info

  3. Multi-Level Wildcard (>): Matches zero or more levels
    • Example: orders.> matches orders.created, orders.updated, and orders.items.created
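
The three rules above can be sketched as a small standalone matcher. This is only an illustration of the matching semantics: matchSubject is a hypothetical helper, not a go-bus function, and go-bus performs its matching internally. The sketch assumes that `>` appears only as the last token and, following the description above, lets it match zero remaining levels. NATS itself requires `>` to match at least one token, so that edge case is worth verifying against go-bus before relying on it.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern under the exact,
// single-level (*) and multi-level (>) rules. Illustrative only; it does not
// validate empty levels or misplaced wildcards beyond what is noted below.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// Multi-level wildcard: accepts whatever remains of the subject.
			// Assumption: only valid as the final token of the pattern.
			return i == len(p)-1
		}
		if i >= len(s) {
			return false // subject has fewer levels than the pattern
		}
		if tok != "*" && tok != s[i] {
			return false // literal level differs
		}
	}
	return len(p) == len(s) // reject subjects with extra trailing levels
}

func main() {
	fmt.Println(matchSubject("orders.created", "orders.created"))                         // true
	fmt.Println(matchSubject("products.*.category.*.info", "products.category.info"))     // false
	fmt.Println(matchSubject("products.*.category.*.info", "products.1.category.2.info")) // true
	fmt.Println(matchSubject("orders.>", "orders.items.created"))                         // true
}
```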

Important Notes

Avoid Overlapping Subscription Patterns

Do not subscribe to potentially overlapping patterns in the same queue. When a message matches multiple subscriptions in a queue, the system uses only the configuration (such as the retry strategy) of the earliest-created subscription and ignores the others.

Problem Example

Suppose you create these two subscriptions in the same queue:

// First created subscription - using default configuration
sub1, err := queue.Subscribe(ctx, "orders.>")

// Later created subscription - with custom retry strategy
customPlan := bus.PlanConfig{
    RetryPolicy: &que.RetryPolicy{
        MaxRetryCount: 10,
        // Other configurations...
    },
}
sub2, err := queue.Subscribe(ctx, "orders.created", bus.WithPlanConfig(&customPlan))

When publishing a message with the subject orders.created:

  • The message matches both patterns: orders.> and orders.created
  • The system will use the configuration from sub1 (the earlier created orders.> subscription)
  • The custom retry strategy in sub2 (MaxRetryCount: 10) will be ignored

Correct Approach

To avoid this issue, use different queues for potentially overlapping patterns:

// First queue handles general orders events
queue1 := bus.Queue("orders_general_queue")
sub1, err := queue1.Subscribe(ctx, "orders.>")

// Second queue specifically handles orders.created events with custom configuration
queue2 := bus.Queue("orders_created_queue")
customPlan := bus.PlanConfig{
    RetryPolicy: &que.RetryPolicy{
        MaxRetryCount: 10,
        // Other configurations...
    },
}
sub2, err := queue2.Subscribe(ctx, "orders.created", bus.WithPlanConfig(&customPlan))

This way, the two subscriptions process messages independently in their respective queues, and each configuration takes effect.
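
One way to catch this problem early is to check a new pattern against the patterns a queue already holds (for example, those returned by queue.Subscriptions) before subscribing. The sketch below is a minimal, standalone overlap check. patternsOverlap is a hypothetical helper, not part of go-bus. It assumes `>` appears only as the last token and treats it conservatively as able to match zero or more levels, so it may flag a pair that strict NATS rules would consider disjoint.

```go
package main

import (
	"fmt"
	"strings"
)

// patternsOverlap reports whether at least one subject could match both
// patterns. Illustrative helper only; go-bus does not expose this function.
func patternsOverlap(a, b string) bool {
	ta := strings.Split(a, ".")
	tb := strings.Split(b, ".")
	for i := 0; ; i++ {
		aDone, bDone := i >= len(ta), i >= len(tb)
		// A multi-level wildcard absorbs whatever the other pattern has left.
		if (!aDone && ta[i] == ">") || (!bDone && tb[i] == ">") {
			return true
		}
		if aDone || bDone {
			// Without ">", both patterns must have the same number of levels.
			return aDone && bDone
		}
		// Two different literals at the same level can never both match.
		if ta[i] != "*" && tb[i] != "*" && ta[i] != tb[i] {
			return false
		}
	}
}

func main() {
	existing := []string{"orders.>", "payments.processed"}
	candidate := "orders.created"
	for _, p := range existing {
		if patternsOverlap(p, candidate) {
			fmt.Printf("%q overlaps existing pattern %q; consider a separate queue\n", candidate, p)
		}
	}
}
```

With the sample values, only orders.> is reported as overlapping orders.created, which matches the problem example above.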

License

This project is licensed under the MIT License.

Acknowledgments

This project is based on tnclong/go-que - a high-performance PostgreSQL backend job queue.
