Skip to content
25 changes: 18 additions & 7 deletions spec/inputs/redis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def process(conf, event_count)
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
expect(command[0]).to eql :blpop
expect(command[1]).to eql ['foo', 0]
end.and_return ['foo', "{\"foo1\":\"bar\""], nil
end.and_return ['foo', "{\"foo1\":\"bar\"}"], nil

tt = Thread.new do
sleep 0.25
Expand Down Expand Up @@ -250,7 +250,7 @@ def process(conf, event_count)
allow_any_instance_of( Redis ).to receive(:script)
allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
expect(command[0]).to eql :evalsha
end.and_return ['{"a": 1}', '{"b":'], []
end.and_return ['{"a": 1}', '{"b": 2}'], []

tt = Thread.new do
sleep 0.25
Expand Down Expand Up @@ -314,6 +314,10 @@ def process(conf, event_count)

before { subject.register }

let(:channel_name) { 'foo' }

let(:redis_client) { subject.send(:new_redis_instance) }

def run_it_thread(inst)
Thread.new(inst) do |subj|
subj.run(queue)
Expand All @@ -324,7 +328,7 @@ def publish_thread(new_redis, prefix)
Thread.new(new_redis, prefix) do |r, p|
sleep 0.1
2.times do |i|
r.publish('foo', "#{p}#{i.next}")
r.publish(channel_name, { data: "#{p}#{i.next}" }.to_json)
end
end
end
Expand Down Expand Up @@ -365,27 +369,31 @@ def close_thread(inst, rt)
end

context 'real redis', :redis => true do

it 'calling the run method, adds events to the queue' do
#simulate the input thread
rt = run_it_thread(subject)
try(10) { expect(redis_client.pubsub('channels')).to include(channel_name) }
#simulate the other system thread
publish_thread(subject.send(:new_redis_instance), 'c').join
#simulate the pipeline thread
close_thread(subject, rt).join

expect(queue.size).to eq(2)
end

it 'events had redis_channel' do
#simulate the input thread
rt = run_it_thread(subject)
try(10) { expect(redis_client.pubsub('channels')).to include(channel_name) }
#simulate the other system thread
publish_thread(subject.send(:new_redis_instance), 'c').join
#simulate the pipeline thread
close_thread(subject, rt).join
e1 = queue.pop
e2 = queue.pop
expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
expect(e2.get('[@metadata][redis_channel]')).to eq('foo')
expect(e1.get('[@metadata][redis_channel]')).to eq(channel_name)
expect(e2.get('[@metadata][redis_channel]')).to eq(channel_name)
end
end
end
Expand All @@ -403,9 +411,11 @@ def close_thread(inst, rt)
end

context 'real redis', :redis => true do

it 'calling the run method, adds events to the queue' do
#simulate the input thread
rt = run_it_thread(subject)
try(10) { expect(redis_client.pubsub('numpat') > 0) }
#simulate the other system thread
publish_thread(subject.send(:new_redis_instance), 'pc').join
#simulate the pipeline thread
Expand All @@ -417,14 +427,15 @@ def close_thread(inst, rt)
it 'events had redis_channel' do
#simulate the input thread
rt = run_it_thread(subject)
try(10) { expect(redis_client.pubsub('numpat') > 0) }
#simulate the other system thread
publish_thread(subject.send(:new_redis_instance), 'pc').join
#simulate the pipeline thread
close_thread(subject, rt).join
e1 = queue.pop
e2 = queue.pop
expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
expect(e2.get('[@metadata][redis_channel]')).to eq('foo')
expect(e1.get('[@metadata][redis_channel]')).to eq(channel_name)
expect(e2.get('[@metadata][redis_channel]')).to eq(channel_name)
end
end
end
Expand Down