-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsnapshot_repository.rb
96 lines (80 loc) · 3 KB
/
snapshot_repository.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# frozen_string_literal: true
require 'base64'
require 'ruby_event_store/event'
module AggregateRoot
class SnapshotRepository
DEFAULT_SNAPSHOT_INTERVAL = 2
SNAPSHOT_STREAM_PATTERN = ->(base_stream_name) { "#{base_stream_name}_snapshots" }
NotRestorableSnapshot = Class.new(StandardError)
NotDumpableAggregateRoot = Class.new(StandardError)
def initialize(event_store, interval = DEFAULT_SNAPSHOT_INTERVAL)
raise ArgumentError, 'interval must be an Integer' unless interval.instance_of?(Integer)
raise ArgumentError, 'interval must be greater than 0' unless interval > 0
@event_store = event_store
@interval = interval
end
Snapshot = Class.new(RubyEventStore::Event)
def load(aggregate, stream_name)
last_snapshot = load_snapshot_event(stream_name)
query = event_store.read.stream(stream_name)
if last_snapshot
begin
aggregate = load_marshal(last_snapshot)
rescue NotRestorableSnapshot
else
aggregate.version = last_snapshot.data.fetch(:version)
query = query.from(last_snapshot.data.fetch(:last_event_id))
end
end
query.reduce { |_, ev| aggregate.apply(ev) }
aggregate.version = aggregate.version + aggregate.unpublished_events.count
aggregate
end
def store(aggregate, stream_name)
events = aggregate.unpublished_events.to_a
event_store.publish(events,
stream_name: stream_name,
expected_version: aggregate.version)
aggregate.version = aggregate.version + events.count
if time_for_snapshot?(aggregate.version, events.size)
begin
publish_snapshot_event(aggregate, stream_name, events.last.event_id)
rescue NotDumpableAggregateRoot
end
end
end
def with_aggregate(aggregate, stream_name, &block)
aggregate = load(aggregate, stream_name)
block.call(aggregate)
store(aggregate, stream_name)
end
private
attr_reader :event_store, :interval
def publish_snapshot_event(aggregate, stream_name, last_event_id)
event_store.publish(
Snapshot.new(
data: { marshal: build_marshal(aggregate), last_event_id: last_event_id, version: aggregate.version }
),
stream_name: SNAPSHOT_STREAM_PATTERN.(stream_name)
)
end
def build_marshal(aggregate)
Marshal.dump(aggregate)
rescue TypeError
raise NotDumpableAggregateRoot
end
def load_snapshot_event(stream_name)
event_store.read.stream(SNAPSHOT_STREAM_PATTERN.(stream_name)).last
end
def load_marshal(snpashot_event)
Marshal.load(snpashot_event.data.fetch(:marshal))
rescue TypeError, ArgumentError
raise NotRestorableSnapshot
end
def time_for_snapshot?(aggregate_version, just_published_events)
events_in_stream = aggregate_version + 1
events_since_time_for_snapshot = events_in_stream % interval
just_published_events > events_since_time_for_snapshot
end
end
end