diff --git a/ruby_event_store/lib/ruby_event_store/projection.rb b/ruby_event_store/lib/ruby_event_store/projection.rb index d06a1a62a0..3b4eb30471 100644 --- a/ruby_event_store/lib/ruby_event_store/projection.rb +++ b/ruby_event_store/lib/ruby_event_store/projection.rb @@ -76,7 +76,7 @@ def reduce_from_all_streams(event_store, start, count) end def read_scope(event_store, stream, count, start) - scope = event_store.read.in_batches(count) + scope = event_store.read.in_batches(count).as_of scope = scope.of_type(handled_events) scope = scope.stream(stream) if stream scope = scope.from(start) if start diff --git a/ruby_event_store/spec/projection_spec.rb b/ruby_event_store/spec/projection_spec.rb index b004dff155..61f17ff15b 100644 --- a/ruby_event_store/spec/projection_spec.rb +++ b/ruby_event_store/spec/projection_spec.rb @@ -209,6 +209,28 @@ module RubyEventStore expect(balance).to eq(total: 6) end + specify "all events from the stream must be read in valid order (starting from given event)" do + event_store.append( + [ + MoneyDeposited.new(data: { amount: 10 }, metadata: { valid_at: Time.new(2020, 1, 7) }), # this should be included + starting = MoneyWithdrawn.new(data: { amount: 2 }, metadata: { valid_at: Time.new(2020, 1, 4) }), + MoneyDeposited.new(data: { amount: 4 }, metadata: { valid_at: Time.new(2020, 1, 5) }), + MoneyWithdrawn.new(data: { amount: 3 }, metadata: { valid_at: Time.new(2020, 1, 6) }), + MoneyDeposited.new(data: { amount: 5 }, metadata: { valid_at: Time.new(2020, 1, 3) }) # this should be skipped + ], + stream_name: stream_name + ) + + balance = + Projection + .from_stream(stream_name) + .init(-> { { total: 0 } }) + .when([MoneyDeposited], ->(state, event) { state[:total] += event.data[:amount] }) + .when([MoneyWithdrawn], ->(state, event) { state[:total] -= event.data[:amount] }) + .run(event_store, start: [starting.event_id], count: 2) + expect(balance).to eq(total: 11) + end + specify "all events from all streams must be read (starting from begining of each stream)" do event_store.append(MoneyDeposited.new(data: { amount: 10 }), stream_name: stream_name) event_store.append(MoneyWithdrawn.new(data: { amount: 2 }), stream_name: stream_name) @@ -255,7 +277,7 @@ module RubyEventStore ) specification = Specification.new(SpecificationReader.new(repository, mapper)) - expected = specification.in_batches(100).of_type([MoneyDeposited, MoneyWithdrawn, MoneyLost]).result + expected = specification.in_batches(100).of_type([MoneyDeposited, MoneyWithdrawn, MoneyLost]).as_of.result expect(repository).to receive(:read).with(expected).and_call_original balance = @@ -270,7 +292,7 @@ module RubyEventStore specify do specification = Specification.new(SpecificationReader.new(repository, mapper)) - expected = specification.in_batches(2).of_type([MoneyDeposited, MoneyWithdrawn]).result + expected = specification.in_batches(2).of_type([MoneyDeposited, MoneyWithdrawn]).as_of.result expect(repository).to receive(:read).with(expected).and_return([]) Projection @@ -283,7 +305,7 @@ module RubyEventStore specify do specification = Specification.new(SpecificationReader.new(repository, mapper)) - expected = specification.in_batches(2).of_type([MoneyDeposited, MoneyWithdrawn]).stream("FancyStream").result + expected = specification.in_batches(2).of_type([MoneyDeposited, MoneyWithdrawn]).stream("FancyStream").as_of.result expect(repository).to receive(:read).with(expected).and_return([]) Projection