Skip to content

Commit a5aeeaa

Browse files
mpraglowskifidel
authored andcommitted
Projections redesigned
Projection class is redesigned here. It now only suports read & reduce based on existing events, without possibility to subscribe for events. New & redesigned: * replace `from_all_streams` & `from_stream` with passing read scopes to `call` method, it allows to fully use all read specification features to define what events should be handled * instead of providing several streams (and starting points) to be processed `call` expects several read scopes * `when` method replaced with `on` method, with usage consistent with `on` handlers (as in `AggregateRoot`), the `on` methods require block to process state & event and it must return new projection state * allows to use simple values as initial state, no need to use hash to pass values, `nil` is the initial default state now * initial state is passed to projection using costructor Typical usage: ```ruby account_balance = RailsEventStore::Projection .new(0.0) .on(MoneyDeposited) { |state, event| state += event.data[:amount] } .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] } .call(client.read) ```
1 parent 7368e94 commit a5aeeaa

File tree

2 files changed

+97
-207
lines changed

2 files changed

+97
-207
lines changed

Diff for: ruby_event_store/lib/ruby_event_store/projection.rb

+20-69
Original file line numberDiff line numberDiff line change
@@ -2,94 +2,45 @@
22

33
module RubyEventStore
44
class Projection
5-
private_class_method :new
5+
ANONYMOUS_CLASS = "#<Class:".freeze
66

7-
def self.from_stream(stream_or_streams)
8-
streams = Array(stream_or_streams)
9-
raise(ArgumentError, "At least one stream must be given") if streams.empty?
10-
new(streams: streams)
11-
end
12-
13-
def self.from_all_streams
14-
new
15-
end
16-
17-
def initialize(streams: [])
18-
@streams = streams
7+
def initialize(initial_state = nil)
198
@handlers = {}
20-
@init = -> { {} }
9+
@init = -> { initial_state }
2110
end
2211

23-
attr_reader :streams, :handlers
24-
25-
def init(handler)
26-
@init = handler
27-
self
28-
end
12+
def on(*event_klasses, &block)
13+
raise(ArgumentError, 'No handler block given') unless block_given?
2914

30-
def when(events, handler)
31-
Array(events).each { |event| handlers[event.to_s] = handler }
15+
event_klasses.each do |event_klass|
16+
name = event_klass.to_s
17+
raise(ArgumentError, 'Anonymous class is missing name') if name.start_with? ANONYMOUS_CLASS
3218

19+
@handlers[name] = ->(state, event) { block.call(state, event) }
20+
end
3321
self
3422
end
3523

36-
def initial_state
37-
@init.call
38-
end
39-
40-
def current_state
41-
@current_state ||= initial_state
42-
end
43-
44-
def call(event)
45-
handlers.fetch(event.event_type).(current_state, event)
46-
end
47-
48-
def handled_events
49-
handlers.keys
50-
end
51-
52-
def run(event_store, start: nil, count: PAGE_SIZE)
24+
def call(*scopes)
5325
return initial_state if handled_events.empty?
54-
streams.any? ? reduce_from_streams(event_store, start, count) : reduce_from_all_streams(event_store, start, count)
55-
end
56-
57-
private
5826

59-
def valid_starting_point?(start)
60-
return true unless start
61-
streams.any? ? (start.instance_of?(Array) && start.size === streams.size) : start.instance_of?(String)
27+
scopes.reduce(initial_state) do |state, scope|
28+
scope.of_types(handled_events).reduce(state, &method(:transition))
29+
end
6230
end
6331

64-
def reduce_from_streams(event_store, start, count)
65-
raise ArgumentError.new("Start must be an array with event ids") unless valid_starting_point?(start)
66-
streams
67-
.zip(start_events(start))
68-
.reduce(initial_state) do |state, (stream_name, start_event_id)|
69-
read_scope(event_store, stream_name, count, start_event_id).reduce(state, &method(:transition))
70-
end
71-
end
72-
73-
def reduce_from_all_streams(event_store, start, count)
74-
raise ArgumentError.new("Start must be valid event id") unless valid_starting_point?(start)
75-
read_scope(event_store, nil, count, start).reduce(initial_state, &method(:transition))
76-
end
32+
private
7733

78-
def read_scope(event_store, stream, count, start)
79-
scope = event_store.read.in_batches(count)
80-
scope = scope.of_type(handled_events)
81-
scope = scope.stream(stream) if stream
82-
scope = scope.from(start) if start
83-
scope
34+
def initial_state
35+
@init.call
8436
end
8537

86-
def start_events(start)
87-
start ? start : Array.new
38+
def handled_events
39+
@handlers.keys
8840
end
8941

9042
def transition(state, event)
91-
handlers.fetch(event.event_type).call(state, event)
92-
state
43+
@handlers.fetch(event.event_type).call(state, event)
9344
end
9445
end
9546
end

0 commit comments

Comments
 (0)