🛑Warning:
This code was generated by artificial intelligence (AI) and has not been manually reviewed or verified. Use with caution and at your own risk.
The MicroBroker library provides a message broker system that allows clients to subscribe to topics and publish messages to those topics. It uses a tree structure to organize topics, supports wildcard matching, and employs multiple message queues and worker goroutines to enhance message processing concurrency.
To use the MicroBroker library in your Go project, import it:
import "your_package_path/broker"
type Payload struct {
	Data interface{}
}
- Description: Represents the message data. The Data field can hold any type of data.
type Subscriber struct {
	UUID     string
	Name     string
	Callback func(string, Payload)
}
- Description: Represents a client that subscribes to topics.
  - UUID: A unique identifier for the subscriber.
  - Name: A human-readable name for the subscriber.
  - Callback: A function to handle received messages. It takes the topic name and the payload as parameters.
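As a minimal sketch of how a Subscriber and its Callback fit together, the example below redeclares Payload and Subscriber locally so that it compiles on its own; in a real project they would come from the broker package. The helpers describe and newLoggingSubscriber are hypothetical names introduced for illustration and are not part of the library.

```go
package main

import "fmt"

// Payload and Subscriber mirror the documented types. They are redeclared
// here only so this sketch is self-contained.
type Payload struct {
	Data interface{}
}

type Subscriber struct {
	UUID     string
	Name     string
	Callback func(string, Payload)
}

// describe (hypothetical helper) renders a payload as text. Because Data can
// hold any type, a type switch is one way to recover the concrete value.
func describe(p Payload) string {
	switch v := p.Data.(type) {
	case string:
		return "string:" + v
	case int:
		return fmt.Sprintf("int:%d", v)
	default:
		return fmt.Sprintf("other:%v", v)
	}
}

// newLoggingSubscriber (hypothetical helper) builds a Subscriber whose
// callback appends every received message to the given log slice.
func newLoggingSubscriber(uuid, name string, log *[]string) Subscriber {
	return Subscriber{
		UUID: uuid,
		Name: name,
		Callback: func(topic string, p Payload) {
			*log = append(*log, topic+" -> "+describe(p))
		},
	}
}

func main() {
	var log []string
	sub := newLoggingSubscriber("uuid-1", "logger", &log)

	// Invoke the callback directly, the way a broker worker would on delivery.
	sub.Callback("sensors/temp", Payload{Data: 21})
	sub.Callback("sensors/name", Payload{Data: "probe-a"})

	for _, line := range log {
		fmt.Println(line)
	}
}
```

The callback here is not safe for concurrent use. Since the broker delivers messages from worker goroutines, a callback that touches shared state would likely need its own synchronization.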
type topicNode struct {
	name        string
	subscribers sync.Map
	children    map[string]*topicNode
	mutex       sync.RWMutex
}
- Description: Represents a node in the topic tree.
  - name: The name of the node.
  - subscribers: A thread-safe map of subscribers by UUID.
  - children: A map of child nodes by name.
  - mutex: A mutex for thread safety.
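To illustrate how a tree of such nodes can organize topics, here is a simplified sketch. It assumes topic levels are separated by "/", which this document does not state, and it omits the subscribers map. The node type and the getOrCreate and find methods are hypothetical and are not the library's own implementation.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// node is a reduced stand-in for topicNode: a name, children by name,
// and a mutex guarding the children map.
type node struct {
	name     string
	children map[string]*node
	mutex    sync.RWMutex
}

func newNode(name string) *node {
	return &node{name: name, children: make(map[string]*node)}
}

// getOrCreate walks the tree along the topic path, creating missing nodes.
// The "/" separator is an assumption made for this sketch.
func (n *node) getOrCreate(topic string) *node {
	current := n
	for _, part := range strings.Split(topic, "/") {
		current.mutex.Lock()
		child, ok := current.children[part]
		if !ok {
			child = newNode(part)
			current.children[part] = child
		}
		current.mutex.Unlock()
		current = child
	}
	return current
}

// find returns the node for topic, or nil if any level is missing.
func (n *node) find(topic string) *node {
	current := n
	for _, part := range strings.Split(topic, "/") {
		current.mutex.RLock()
		child, ok := current.children[part]
		current.mutex.RUnlock()
		if !ok {
			return nil
		}
		current = child
	}
	return current
}

func main() {
	root := newNode("")
	root.getOrCreate("sensors/kitchen/temp")

	fmt.Println(root.find("sensors/kitchen") != nil) // true: created as an intermediate node
	fmt.Println(root.find("sensors/garage") != nil)  // false: never created
}
```

Locking each node separately keeps contention local to the part of the tree being modified, at the cost of taking one lock per level on every lookup.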
type Broker struct {
	root          *topicNode
	mutex         sync.RWMutex
	capacity      int
	messageQueues []chan messageItem
	queueCount    int
	stopCh        chan struct{}
	wg            sync.WaitGroup
	conds         []*sync.Cond
}
- Description: Represents the message broker.
  - root: The root of the topic tree.
  - mutex: A global broker mutex.
  - capacity: The maximum number of messages in each queue.
  - messageQueues: Multiple queues for message distribution.
  - queueCount: The number of message queues.
  - stopCh: A channel to signal workers to stop.
  - wg: A wait group for graceful shutdown.
  - conds: Conditions for backpressure.
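The capacity and conds fields suggest that producers wait when a queue is full. How the library combines its channels with sync.Cond is not shown in this document, so the following is only a generic sketch of condition-variable backpressure. It uses a slice-backed queue of strings instead of a channel of messageItem, and boundedQueue, push, and pop are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// boundedQueue is a hypothetical FIFO queue that blocks producers when full
// and consumers when empty. It is not the library's queue type.
type boundedQueue struct {
	mu       sync.Mutex
	notFull  *sync.Cond
	notEmpty *sync.Cond
	items    []string
	capacity int
}

func newBoundedQueue(capacity int) *boundedQueue {
	q := &boundedQueue{capacity: capacity}
	q.notFull = sync.NewCond(&q.mu)
	q.notEmpty = sync.NewCond(&q.mu)
	return q
}

// push blocks while the queue is at capacity. This is the backpressure step.
func (q *boundedQueue) push(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) >= q.capacity {
		q.notFull.Wait() // releases mu while waiting, reacquires on wake-up
	}
	q.items = append(q.items, item)
	q.notEmpty.Signal()
}

// pop blocks while the queue is empty, then removes the oldest item.
func (q *boundedQueue) pop() string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.notEmpty.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	q.notFull.Signal()
	return item
}

func main() {
	q := newBoundedQueue(2)
	done := make(chan struct{})

	go func() {
		defer close(done)
		for i := 0; i < 5; i++ {
			q.push(fmt.Sprintf("msg-%d", i)) // blocks whenever two items are pending
		}
	}()

	for i := 0; i < 5; i++ {
		fmt.Println(q.pop())
	}
	<-done
}
```

The Wait calls sit inside for loops rather than if statements because a goroutine woken by Signal must recheck the condition before proceeding.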
type messageItem struct {
	subscriber *Subscriber
	topic      string
	payload    Payload
}
- Description: Represents a message in the queue. It contains the subscriber, the topic, and the payload.
func NewBroker(size int) *Broker
- Description: Creates a new broker with the specified channel capacity. It uses the number of CPU cores as the default queue/worker count.
- Parameters:
  - size: The channel capacity.
- Returns: A pointer to a new Broker instance.
func NewBrokerWithWorkers(size int, workerCount int) *Broker
- Description: Creates a new broker with the specified channel capacity and number of worker goroutines.
- Parameters:
  - size: The channel capacity.
  - workerCount: The number of worker goroutines.
- Returns: A pointer to a new Broker instance.
func (b *Broker) worker(queueID int)
- Description: Processes messages from its designated queue until it receives a stop signal.
- Parameters:
  - queueID: The ID of the message queue to process.
func (b *Broker) getQueueForSubscriber(subscriberUUID string) int
- Description: Selects a queue for a subscriber using consistent hashing.
- Parameters:
  - subscriberUUID: The UUID of the subscriber.
- Returns: The ID of the selected queue.
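The hashing scheme the library uses is not shown in this document. As a rough sketch of the general idea, the example below maps a UUID to a queue index with an FNV-1a hash and a modulo. This is plain hash-and-modulo rather than ring-based consistent hashing, and queueFor is a hypothetical stand-in for getQueueForSubscriber.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// queueFor (hypothetical) maps a subscriber UUID to a queue index in
// [0, queueCount). queueCount must be greater than zero.
func queueFor(subscriberUUID string, queueCount int) int {
	h := fnv.New32a()
	h.Write([]byte(subscriberUUID)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(queueCount))
}

func main() {
	// The same UUID always maps to the same queue for a fixed queue count.
	for _, id := range []string{"sub-a", "sub-b", "sub-a"} {
		fmt.Printf("%s -> queue %d\n", id, queueFor(id, 4))
	}
}
```

Pinning each subscriber to one queue means a single worker handles all of that subscriber's messages, which keeps them in order for that subscriber. With a plain modulo, changing the queue count remaps most subscribers.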
func (b *Broker) Publish(topic string, payload Payload)
- Description: Broadcasts a message to all subscribers of a topic and its parent topics. If the message queue is full, it will wait until there is space in the queue.
- Parameters:
  - topic: The topic to publish the message to.
  - payload: The message payload.
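To make "a topic and its parent topics" concrete, the sketch below lists the topics whose subscribers would be notified for one published topic. It assumes "/" separates topic levels, which this document does not specify, and topicChain is a hypothetical helper rather than a library function.

```go
package main

import (
	"fmt"
	"strings"
)

// topicChain (hypothetical) returns the topic followed by each of its
// ancestors, most specific first. The "/" separator is an assumption.
func topicChain(topic string) []string {
	parts := strings.Split(topic, "/")
	chain := make([]string, 0, len(parts))
	for i := len(parts); i > 0; i-- {
		chain = append(chain, strings.Join(parts[:i], "/"))
	}
	return chain
}

func main() {
	// Prints: [sensors/kitchen/temp sensors/kitchen sensors]
	fmt.Println(topicChain("sensors/kitchen/temp"))
}
```

Under this reading, a subscriber on sensors would also receive messages published to sensors/kitchen/temp.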
func (b *Broker) Subscribe(topic string, subscriber Subscriber) error
- Description: Adds a subscriber to a topic.
- Parameters:
  - topic: The topic to subscribe to.
  - subscriber: The subscriber to add.
- Returns: An error if the subscription fails, otherwise nil.
func (b *Broker) Unsubscribe(uuid string)
- Description: Removes a subscriber from all topics.
- Parameters:
  - uuid: The UUID of the subscriber to remove.
func (b *Broker) UnsubscribeFromTopic(topic string, uuid string) error
- Description: Removes a subscriber from a specific topic.
- Parameters:
  - topic: The topic to unsubscribe from.
  - uuid: The UUID of the subscriber to remove.
- Returns: An error if the topic is not found, otherwise nil.
func (b *Broker) GetSubscribers(topic string) ([]*Subscriber, error)
- Description: Returns all subscribers for a topic.
- Parameters:
  - topic: The topic to get subscribers for.
- Returns: A slice of pointers to Subscriber instances, and an error if the topic is empty (otherwise nil).
func (b *Broker) Close()
- Description: Shuts down the broker and cleans up resources. It signals all workers to stop, waits for them to finish, closes all message queues, and resets the root node.
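The shutdown order described above (signal, wait, then close the queues) can be sketched with a stop channel and a sync.WaitGroup. This is a simplified illustration and not the library's code: pool, newPool, and close are hypothetical names, messages are plain strings, and the topic tree reset is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical, reduced version of the broker's worker machinery.
type pool struct {
	queues []chan string
	stopCh chan struct{}
	wg     sync.WaitGroup
	handle func(string)
}

// newPool starts one worker goroutine per queue.
func newPool(workers, capacity int, handle func(string)) *pool {
	p := &pool{stopCh: make(chan struct{}), handle: handle}
	for i := 0; i < workers; i++ {
		q := make(chan string, capacity)
		p.queues = append(p.queues, q)
		p.wg.Add(1)
		go p.worker(q)
	}
	return p
}

// worker processes messages from its queue until the stop signal arrives.
func (p *pool) worker(queue chan string) {
	defer p.wg.Done()
	for {
		select {
		case msg := <-queue:
			p.handle(msg)
		case <-p.stopCh:
			return
		}
	}
}

// close signals all workers, waits for them to exit, then closes the queues.
// Closing the queues last ensures no worker is still receiving from them.
func (p *pool) close() {
	close(p.stopCh)
	p.wg.Wait()
	for _, q := range p.queues {
		close(q)
	}
}

func main() {
	results := make(chan string, 2)
	p := newPool(2, 4, func(msg string) { results <- msg })

	p.queues[0] <- "hello"
	p.queues[1] <- "world"
	fmt.Println(<-results, <-results) // order depends on goroutine scheduling

	p.close()
	fmt.Println("stopped")
}
```

In this sketch, messages still queued when the stop signal arrives may be dropped, and sending to a queue after close panics. Whether the library drains pending messages before shutting down is not stated in this document.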
The library includes unit tests to ensure its correctness and stability. You can run the tests using the following command:
go test
The unit tests cover the following scenarios:
- TestBrokerSubscribeAndPublish: Tests the subscription and message publishing functionality.
- TestBrokerUnsubscribe: Tests the unsubscribe functionality.