Skip to content

Commit 5468e7b

Browse files
committed
Parameterize clock when initializing repository
Instead of passing it every time.
1 parent a0b67dc commit 5468e7b

File tree

6 files changed

+87
-76
lines changed

6 files changed

+87
-76
lines changed

Diff for: contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/consumer.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:)
2424
raise "Unknown format" if configuration.message_format != SIDEKIQ5_FORMAT
2525
@processor = SidekiqProcessor.new(Redis.new(url: configuration.redis_url))
2626

27-
@repository = Repositories::Mysql57.build_for_consumer(configuration.database_url)
27+
@repository = Repositories::Mysql57.build_for_consumer(configuration.database_url, clock: clock)
2828
@cleanup_strategy = CleanupStrategies.build(configuration, repository)
2929
end
3030

@@ -123,7 +123,7 @@ def log_error(e)
123123
end
124124

125125
def obtain_lock_for_process(fetch_specification)
126-
result = repository.obtain_lock_for_process(fetch_specification, consumer_uuid, clock: @clock)
126+
result = repository.obtain_lock_for_process(fetch_specification, consumer_uuid)
127127
case result
128128
when :deadlocked
129129
logger.warn "Obtaining lock for split_key '#{fetch_specification.split_key}' failed (deadlock)"
@@ -158,7 +158,7 @@ def release_lock_for_process(fetch_specification)
158158
end
159159

160160
def refresh_lock_for_process(lock)
161-
result = repository.refresh_lock(lock, clock: @clock)
161+
result = repository.refresh_lock(lock)
162162
case result
163163
when :ok
164164
return true

Diff for: contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories/mysql57.rb

+15-4
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,20 @@ def self.get_lock_record(fetch_specification)
115115
end
116116
end
117117

118-
def self.build_for_consumer(database_url)
118+
def self.build_for_consumer(database_url, clock:)
119119
::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected?
120120
if ::ActiveRecord::Base.connection.adapter_name == "Mysql2"
121121
::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;")
122122
end
123-
new
123+
new(clock: clock)
124+
end
125+
126+
def self.build_for_producer(clock: Time)
127+
new(clock: clock)
128+
end
129+
130+
def initialize(clock:)
131+
@clock = clock
124132
end
125133

126134
def insert_record(format, split_key, payload)
@@ -135,15 +143,15 @@ def get_remaining_count(fetch_specification)
135143
Record.remaining_for(fetch_specification).count
136144
end
137145

138-
def obtain_lock_for_process(fetch_specification, process_uuid, clock:)
146+
def obtain_lock_for_process(fetch_specification, process_uuid)
139147
Lock.obtain(fetch_specification, process_uuid, clock: clock)
140148
end
141149

142150
def release_lock_for_process(fetch_specification, process_uuid)
143151
Lock.release(fetch_specification, process_uuid)
144152
end
145153

146-
def refresh_lock(lock, clock:)
154+
def refresh_lock(lock)
147155
Lock.refresh(lock, clock: clock)
148156
end
149157

@@ -161,6 +169,9 @@ def delete_enqueued_older_than(fetch_specification, duration, limit)
161169
rescue ::ActiveRecord::LockWaitTimeout
162170
:lock_timeout
163171
end
172+
173+
private
174+
attr_reader :clock
164175
end
165176
end
166177
end

Diff for: contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_producer.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
module RubyEventStore
88
module Outbox
99
class SidekiqProducer
10-
def initialize(repository = Repositories::Mysql57.new)
10+
def initialize(repository)
1111
@repository = repository
1212
end
1313

Diff for: contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_scheduler.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
module RubyEventStore
66
module Outbox
77
class SidekiqScheduler
8-
def initialize(repository: Repositories::Mysql57.new, serializer: RubyEventStore::Serializers::YAML)
8+
def initialize(repository: Repositories::Mysql57.build_for_producer, serializer: RubyEventStore::Serializers::YAML)
99
@serializer = serializer
1010
@sidekiq_producer = SidekiqProducer.new(repository)
1111
end

Diff for: contrib/ruby_event_store-outbox/spec/consumer_spec.rb

+27-25
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ module Outbox
3636
end
3737

3838
specify "push the jobs to sidekiq" do
39-
record = create_record("default", "default")
4039
clock = TickingClock.new
40+
record = create_record("default", "default", clock: clock)
4141
consumer =
4242
Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics)
4343
result = consumer.process
@@ -53,10 +53,10 @@ module Outbox
5353
end
5454

5555
specify "push multiple jobs to different queues" do
56-
create_record("default", "default")
57-
create_record("default", "default")
58-
create_record("default2", "default2")
5956
clock = TickingClock.new
57+
create_record("default", "default", clock: clock)
58+
create_record("default", "default", clock: clock)
59+
create_record("default2", "default2", clock: clock)
6060
consumer =
6161
Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics)
6262

@@ -87,9 +87,9 @@ module Outbox
8787
end
8888

8989
specify "returns false if didnt aquire lock" do
90-
create_record("default", "default")
91-
consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics)
9290
clock = TickingClock.new
91+
create_record("default", "default", clock: clock)
92+
consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics)
9393
Repositories::Mysql57::Lock.obtain(
9494
FetchSpecification.new(SIDEKIQ5_FORMAT, "default"),
9595
"some-other-process-uuid",
@@ -168,10 +168,10 @@ module Outbox
168168
end
169169

170170
specify "incorrect payload wont cause later messages to schedule" do
171-
record1 = create_record("default", "default")
172-
record1.update!(payload: "unparsable garbage")
173-
record2 = create_record("default", "default")
174171
clock = TickingClock.new
172+
record1 = create_record("default", "default", clock: clock)
173+
record1.update!(payload: "unparsable garbage")
174+
record2 = create_record("default", "default", clock: clock)
175175
consumer =
176176
Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics)
177177

@@ -245,15 +245,15 @@ module Outbox
245245
end
246246

247247
specify "deadlock when releasing lock doesnt do anything" do
248-
create_record("default", "default")
248+
clock = TickingClock.new
249+
create_record("default", "default", clock: clock)
249250
allow(Repositories::Mysql57::Lock).to receive(:lock).and_wrap_original do |m, *args|
250251
if caller.any? { |l| l.include? "`release'" }
251252
raise ::ActiveRecord::Deadlocked
252253
else
253254
m.call(*args)
254255
end
255256
end
256-
clock = TickingClock.new
257257
consumer =
258258
Consumer.new(
259259
SecureRandom.uuid,
@@ -272,7 +272,6 @@ module Outbox
272272
end
273273

274274
specify "lock timeout when releasing lock doesnt do anything" do
275-
create_record("default", "default")
276275
allow(Repositories::Mysql57::Lock).to receive(:lock).and_wrap_original do |m, *args|
277276
if caller.any? { |l| l.include? "`release'" }
278277
raise ::ActiveRecord::LockWaitTimeout
@@ -281,6 +280,7 @@ module Outbox
281280
end
282281
end
283282
clock = TickingClock.new
283+
create_record("default", "default", clock: clock)
284284
consumer =
285285
Consumer.new(
286286
SecureRandom.uuid,
@@ -299,8 +299,8 @@ module Outbox
299299
end
300300

301301
specify "after successful loop, lock is released" do
302-
create_record("default", "default")
303302
clock = TickingClock.new
303+
create_record("default", "default", clock: clock)
304304
consumer =
305305
Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics)
306306

@@ -312,8 +312,8 @@ module Outbox
312312
end
313313

314314
specify "lock disappearing in the meantime, doesnt do anything" do
315-
create_record("default", "default")
316315
clock = TickingClock.new
316+
create_record("default", "default", clock: clock)
317317
consumer =
318318
Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: test_metrics)
319319
allow(consumer).to receive(:release_lock_for_process).and_wrap_original do |m, *args|
@@ -332,8 +332,8 @@ module Outbox
332332
end
333333

334334
specify "lock stolen in the meantime, doesnt do anything" do
335-
create_record("default", "default")
336335
clock = TickingClock.new
336+
create_record("default", "default", clock: clock)
337337
consumer =
338338
Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics)
339339
allow(consumer).to receive(:release_lock_for_process).and_wrap_original do |m, *args|
@@ -349,12 +349,13 @@ module Outbox
349349
end
350350

351351
specify "old lock can be reobtained" do
352+
clock = TickingClock.new(start: 10.minutes.ago)
352353
Repositories::Mysql57::Lock.obtain(
353354
FetchSpecification.new(SIDEKIQ5_FORMAT, "default"),
354355
"some-old-uuid",
355-
clock: TickingClock.new(start: 10.minutes.ago)
356+
clock: clock
356357
)
357-
record = create_record("default", "default")
358+
record = create_record("default", "default", clock: clock)
358359
consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics)
359360

360361
result = consumer.process
@@ -365,10 +366,11 @@ module Outbox
365366
end
366367

367368
specify "relatively fresh locks are not reobtained" do
369+
clock = TickingClock.new(start: 9.minutes.ago)
368370
Repositories::Mysql57::Lock.obtain(
369371
FetchSpecification.new(SIDEKIQ5_FORMAT, "default"),
370372
"some-old-uuid",
371-
clock: TickingClock.new(start: 9.minutes.ago)
373+
clock: clock
372374
)
373375
create_record("default", "default")
374376
consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics)
@@ -379,8 +381,8 @@ module Outbox
379381
end
380382

381383
specify "when inserting lock, other process may do same concurrently" do
382-
record = create_record("default", "default")
383384
clock = TickingClock.new
385+
record = create_record("default", "default", clock: clock)
384386
consumer =
385387
Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics)
386388
allow(Repositories::Mysql57::Lock).to receive(:create!).and_wrap_original do |m, *args|
@@ -464,8 +466,8 @@ module Outbox
464466
end
465467

466468
specify "clean old jobs" do
467-
create_record("default", "default")
468469
clock = TickingClock.new
470+
create_record("default", "default", clock: clock)
469471
consumer =
470472
Consumer.new(
471473
SecureRandom.uuid,
@@ -485,8 +487,8 @@ module Outbox
485487
end
486488

487489
specify "clean old jobs with limit" do
488-
3.times.map { create_record("default", "default") }
489490
clock = TickingClock.new
491+
3.times.map { create_record("default", "default", clock: clock) }
490492
consumer =
491493
Consumer.new(
492494
SecureRandom.uuid,
@@ -506,8 +508,8 @@ module Outbox
506508
end
507509

508510
specify "clean old jobs - lock timeout" do
509-
create_record("default", "default")
510511
clock = TickingClock.new
512+
create_record("default", "default", clock: clock)
511513
consumer =
512514
Consumer.new(
513515
SecureRandom.uuid,
@@ -530,8 +532,8 @@ module Outbox
530532
end
531533

532534
specify "clean old jobs - deadlock" do
533-
create_record("default", "default")
534535
clock = TickingClock.new
536+
create_record("default", "default", clock: clock)
535537
consumer =
536538
Consumer.new(
537539
SecureRandom.uuid,
@@ -553,7 +555,7 @@ module Outbox
553555
expect(test_metrics.operation_results).to include({ operation: "cleanup", result: "deadlocked" })
554556
end
555557

556-
def create_record(queue, split_key, format: "sidekiq5")
558+
def create_record(queue, split_key, format: "sidekiq5", clock: Time)
557559
payload = {
558560
class: "SomeAsyncHandler",
559561
queue: queue,
@@ -569,7 +571,7 @@ def create_record(queue, split_key, format: "sidekiq5")
569571
}
570572
]
571573
}
572-
Repositories::Mysql57.new.insert_record(format, split_key, payload.to_json)
574+
Repositories::Mysql57.build_for_producer(clock: clock).insert_record(format, split_key, payload.to_json)
573575
end
574576
end
575577
end

0 commit comments

Comments
 (0)