Skip to content

Event type resolver in message broker #1855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions ruby_event_store/lib/ruby_event_store/broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@

module RubyEventStore
class Broker
def initialize(subscriptions:, dispatcher:)
def initialize(subscriptions:, dispatcher:, event_type_resolver: EventTypeResolver.new)
@subscriptions = subscriptions
@dispatcher = dispatcher
@event_type_resolver = event_type_resolver
end

def call(event, record)
subscribers = subscriptions.all_for(event.event_type)
subscribers = all_subscriptions_for(event.event_type)
subscribers.each { |subscriber| dispatcher.call(subscriber, event, record) }
end

def add_subscription(subscriber, event_types)
verify_subscription(subscriber)
subscriptions.add_subscription(subscriber, event_types)
subscriptions.add_subscription(subscriber, event_types.map { |type| event_type_resolver.call(type) })
end

def add_global_subscription(subscriber)
Expand All @@ -24,17 +25,21 @@ def add_global_subscription(subscriber)

def add_thread_subscription(subscriber, event_types)
verify_subscription(subscriber)
subscriptions.add_thread_subscription(subscriber, event_types)
subscriptions.add_thread_subscription(subscriber, event_types.map { |type| event_type_resolver.call(type) })
end

def add_thread_global_subscription(subscriber)
verify_subscription(subscriber)
subscriptions.add_thread_global_subscription(subscriber)
end

def all_subscriptions_for(event_type)
subscriptions.all_for(event_type_resolver.call(event_type))
end

private

attr_reader :dispatcher, :subscriptions
attr_reader :dispatcher, :subscriptions, :event_type_resolver

def verify_subscription(subscriber)
raise SubscriberNotExist, "subscriber must be first argument or block" unless subscriber
Expand Down
17 changes: 7 additions & 10 deletions ruby_event_store/lib/ruby_event_store/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ def initialize(
@repository = repository
@mapper = mapper
@subscriptions = subscriptions
@broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher)
@broker = Broker.new(subscriptions: subscriptions, dispatcher: dispatcher, event_type_resolver: event_type_resolver)
@clock = clock
@metadata = Concurrent::ThreadLocalVar.new
@correlation_id_generator = correlation_id_generator
@event_type_resolver = event_type_resolver
end

# Persists events and notifies subscribed handlers about them
Expand Down Expand Up @@ -144,7 +143,7 @@ def event_in_stream?(event_id, stream_name)
def subscribe(subscriber = nil, to:, &proc)
raise ArgumentError, "subscriber must be first argument or block, cannot be both" if subscriber && proc
subscriber ||= proc
broker.add_subscription(subscriber, to.map { |event_klass| event_type_resolver.call(event_klass) })
broker.add_subscription(subscriber, to)
end

# Subscribes a handler (subscriber) that will be invoked for all published events
Expand All @@ -167,19 +166,18 @@ def subscribe_to_all_events(subscriber = nil, &proc)
# @param to [Class, String] type of events to get list of sybscribed handlers
# @return [Array<Object, Class>]
def subscribers_for(event_class)
subscriptions.all_for(event_type_resolver.call(event_class))
broker.all_subscriptions_for(event_class)
end

# Builder object for collecting temporary handlers (subscribers)
# which are active only during the invocation of the provided
# block of code.
class Within
def initialize(block, broker, resolver)
def initialize(block, broker)
@block = block
@broker = broker
@global_subscribers = []
@subscribers = Hash.new { [] }
@resolver = resolver
end

# Subscribes temporary handlers that
Expand Down Expand Up @@ -213,7 +211,7 @@ def subscribe_to_all_events(*handlers, &handler2)
# @return [self]
def subscribe(handler = nil, to:, &handler2)
raise ArgumentError if handler && handler2
@subscribers[handler || handler2] += Array(to).map { |event_klass| resolver.call(event_klass) }
@subscribers[handler || handler2] += Array(to)
self
end

Expand All @@ -231,7 +229,6 @@ def call
end

private
attr_reader :resolver

def add_thread_subscribers
@subscribers.map { |subscriber, types| @broker.add_thread_subscription(subscriber, types) }
Expand All @@ -249,7 +246,7 @@ def add_thread_global_subscribers
# @return [Within] builder object which collects temporary subscriptions
def within(&block)
raise ArgumentError if block.nil?
Within.new(block, broker, event_type_resolver)
Within.new(block, broker)
end

# Set additional metadata for all events published within the provided block
Expand Down Expand Up @@ -373,6 +370,6 @@ def default_correlation_id_generator
-> { SecureRandom.uuid }
end

attr_reader :repository, :mapper, :subscriptions, :broker, :clock, :correlation_id_generator, :event_type_resolver
attr_reader :repository, :mapper, :subscriptions, :broker, :clock, :correlation_id_generator
end
end
31 changes: 31 additions & 0 deletions ruby_event_store/lib/ruby_event_store/spec/broker_lint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,37 @@
broker.add_thread_global_subscription(handler)
end

specify "all_subscriptions_for" do
handler = Subscribers::ValidHandler.new
broker.add_subscription(handler, ["ProductAdded"])
broker.add_thread_subscription(handler, [TestEvent])
block = Proc.new { "Event published!" }
broker.add_subscription(block, ["OrderCreated"])

expect(broker.all_subscriptions_for(ProductAdded)).to eq [handler]
expect(broker.all_subscriptions_for(TestEvent)).to eq [handler]
expect(broker.all_subscriptions_for("OrderCreated")).to eq [block]
expect(broker.all_subscriptions_for("NotExistingOne")).to eq []
end

specify "all_subscriptions_for with event type resolver" do
event_klass =
Class.new do
def self.event_type
"non-derived-from-class"
end
end

broker = broker_klass.new(
subscriptions: subscriptions,
dispatcher: dispatcher,
event_type_resolver: ->(klass) { klass.event_type }
)
broker.add_subscription(handler = Proc.new {}, [event_klass])

expect(broker.all_subscriptions_for(event_klass)).to eq [handler]
end

private

class HandlerClass
Expand Down
2 changes: 1 addition & 1 deletion ruby_event_store/spec/within_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

module RubyEventStore
::RSpec.describe Client do
subject(:within) { Client::Within.new(nil, nil, -> (value) {value.to_s}) }
subject(:within) { Client::Within.new(nil, nil) }

specify "subscribe with handler as object and block" do
expect { within.subscribe(:handler, to: []) {} }.to raise_error(ArgumentError)
Expand Down
Loading