Skip to content

Commit dc7e4b2

Browse files
authored
Merge pull request #4178 from daipom/make-sure-to-reset-local-unpacker
MessagePackFactory: Make sure to reset local unpacker to prevent received broken data from affecting other receiving data
2 parents 0a6d706 + 3c4fbf0 commit dc7e4b2

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

lib/fluent/msgpack_factory.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,12 @@ def self.thread_local_msgpack_packer
100100
end
101101

102102
def self.thread_local_msgpack_unpacker
103-
Thread.current[:local_msgpack_unpacker] ||= MessagePackFactory.engine_factory.unpacker
103+
unpacker = Thread.current[:local_msgpack_unpacker]
104+
if unpacker.nil?
105+
return Thread.current[:local_msgpack_unpacker] = MessagePackFactory.engine_factory.unpacker
106+
end
107+
unpacker.reset
108+
unpacker
104109
end
105110
end
106111
end

test/test_msgpack_factory.rb

+32
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,36 @@ class MessagePackFactoryTest < Test::Unit::TestCase
1515
assert mp.msgpack_factory
1616
assert mp.msgpack_factory
1717
end
18+
19+
sub_test_case 'thread_local_msgpack_packer' do
20+
test 'packer is cached' do
21+
packer1 = Fluent::MessagePackFactory.thread_local_msgpack_packer
22+
packer2 = Fluent::MessagePackFactory.thread_local_msgpack_packer
23+
assert_equal packer1, packer2
24+
end
25+
end
26+
27+
sub_test_case 'thread_local_msgpack_unpacker' do
28+
test 'unpacker is cached' do
29+
unpacker1 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
30+
unpacker2 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
31+
assert_equal unpacker1, unpacker2
32+
end
33+
34+
# We need to reset the buffer every time so that received incomplete data
35+
# must not affect data from other senders.
36+
test 'reset the internal buffer of unpacker every time' do
37+
unpacker1 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
38+
unpacker1.feed_each("\xA6foo") do |result|
39+
flunk("This callback must not be called since the data is uncomplete.")
40+
end
41+
42+
records = []
43+
unpacker2 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker
44+
unpacker2.feed_each("\xA3foo") do |result|
45+
records.append(result)
46+
end
47+
assert_equal ["foo"], records
48+
end
49+
end
1850
end

0 commit comments

Comments
 (0)