import "go.openfort.xyz/pubsub"
- type Event
- type Handler
- type Listener
- type Metadata
- type Middleware
- type Publisher
- type Subscriber
- type Subscription
- type Topic
- type Transport
- type TransportFunc
- type TransportWrapper
Event represents a message published to a topic.
type Event struct {
Topic Topic `json:"event"`
Metadata Metadata `json:"metadata,omitempty"`
Payload []byte `json:"payload"`
}
func NewEvent(topic Topic, payload []byte) *Event
NewEvent creates a new event.
Handler is the function that processes the event.
type Handler func(ctx context.Context, event *Event) error
Listener is the interface that the subscriber uses to connect to the message broker, agnostic of the underlying implementation.
type Listener interface {
// Connect connects to the message broker.
Connect(ctx context.Context) error
// Subscribe subscribes to a topic and returns messages to the subscription channel.
Subscribe(ctx context.Context, subscription *Subscription) error
// Close closes the connection to the message broker.
Close() error
}
Metadata represents the key-value pairs of an event.
type Metadata map[string]interface{}
func NewMetadata() Metadata
NewMetadata creates a new metadata.
func (m Metadata) Del(key string)
Del deletes the key-value pair from the metadata.
func (m Metadata) Get(key string) (interface{}, bool)
Get returns the value associated with the key.
func (m Metadata) Keys() []string
Keys returns the keys of the metadata.
func (m Metadata) Set(key string, value interface{})
Set sets the key-value pair in the metadata.
Middleware is the function that wraps a Handler to add functionality.
type Middleware func(next Handler) Handler
Publisher is the struct that sends messages to the message broker using the Transport interface, agnostic of the underlying implementation.
type Publisher struct {
Transport Transport
}
func NewPublisher(transport Transport) *Publisher
NewPublisher creates a new publisher with the given transport.
func (p *Publisher) Publish(ctx context.Context, event *Event) error
Publish sends the event to the message broker.
func (p *Publisher) Use(transport TransportWrapper)
Use adds a transport wrapper to the publisher.
Subscriber is the struct that receives events from the listener and calls the appropriate Handler using the Listener interface agnostic of the underlying implementation.
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber(listener Listener) *Subscriber
NewSubscriber creates a new Subscriber with the given Listener.
func (s *Subscriber) HandleFunc(topic Topic, handler Handler)
HandleFunc adds a Handler to the Subscriber for the given Topic.
func (s *Subscriber) Start(ctx context.Context) error
Start starts the Subscriber and listens for events from the Listener.
func (s *Subscriber) Stop(_ context.Context) error
Stop gracefully stops the Subscriber and closes the Listener.
func (s *Subscriber) Use(middleware Middleware)
Use adds a Middleware to the Subscriber.
Subscription is the struct that represents a subscription to a Topic with a channel to receive Event.
type Subscription struct {
Topic Topic
Consumer string
Channel chan *Event
}
func NewSubscription(topic Topic) *Subscription
NewSubscription creates a new Subscription for the given Topic.
Topic represents a message topic.
type Topic string
const (
// SendNotificationTopic is the topic for sending notifications.
SendNotificationTopic Topic = "send.notification"
// UserOperationTopic is the topic for user operations.
UserOperationTopic Topic = "user.operation"
// TransactionUpdatedTopic is the topic for transaction updates.
TransactionUpdatedTopic Topic = "transaction.updated"
// WriteMetricTopic is the topic for writing metrics.
WriteMetricTopic Topic = "write.metric"
)
func (t Topic) String() string
String returns the string representation of the topic.
Transport is the interface that publisher uses to send messages to the message broker agnostic of the underlying implementation.
type Transport interface {
Send(ctx context.Context, event *Event) error
}
TransportFunc is a function type that implements the Transport interface.
type TransportFunc func(ctx context.Context, event *Event) error
func (f TransportFunc) Send(ctx context.Context, event *Event) error
Send calls f(ctx, event).
TransportWrapper is a function that wraps a transport with additional functionality.
type TransportWrapper func(Transport) Transport
Generated by gomarkdoc