Skip to content

Commit 11c9550

Browse files
authored
Merge pull request #4137 from daipom/in_tcp-add-message_length_limit
`in_tcp`: Add `message_length_limit` to drop large incoming data
2 parents c989079 + 0d9c320 commit 11c9550

File tree

2 files changed

+115
-0
lines changed

2 files changed

+115
-0
lines changed

lib/fluent/plugin/in_tcp.rb

+43
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ class TcpInput < Input
3636
desc "The field name of the client's address."
3737
config_param :source_address_key, :string, default: nil
3838

39+
# Setting default to nil for backward compatibility
40+
desc "The max bytes of message."
41+
config_param :message_length_limit, :size, default: nil
42+
3943
config_param :blocking_timeout, :time, default: 0.5
4044

4145
desc 'The payload is read up to this character.'
@@ -102,6 +106,7 @@ def start
102106

103107
log.info "listening tcp socket", bind: @bind, port: @port
104108
del_size = @delimiter.length
109+
discard_till_next_delimiter = false
105110
if @_extract_enabled && @_extract_tag_key
106111
server_create(:in_tcp_server_single_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn|
107112
unless check_client(conn)
@@ -116,6 +121,16 @@ def start
116121
msg = buf[pos...i]
117122
pos = i + del_size
118123

124+
if discard_till_next_delimiter
125+
discard_till_next_delimiter = false
126+
next
127+
end
128+
129+
if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
130+
log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
131+
next
132+
end
133+
119134
@parser.parse(msg) do |time, record|
120135
unless time && record
121136
log.warn "pattern not matched", message: msg
@@ -131,6 +146,15 @@ def start
131146
end
132147
end
133148
buf.slice!(0, pos) if pos > 0
149+
# If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
150+
# So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
151+
if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
152+
log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
153+
buf.clear
154+
# We should discard the subsequent data until the next delimiter comes.
155+
discard_till_next_delimiter = true
156+
next
157+
end
134158
end
135159
else
136160
server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn|
@@ -147,6 +171,16 @@ def start
147171
msg = buf[pos...i]
148172
pos = i + del_size
149173

174+
if discard_till_next_delimiter
175+
discard_till_next_delimiter = false
176+
next
177+
end
178+
179+
if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
180+
log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
181+
next
182+
end
183+
150184
@parser.parse(msg) do |time, record|
151185
unless time && record
152186
log.warn "pattern not matched", message: msg
@@ -161,6 +195,15 @@ def start
161195
end
162196
router.emit_stream(@tag, es)
163197
buf.slice!(0, pos) if pos > 0
198+
# If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
199+
# So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
200+
if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
201+
log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
202+
buf.clear
203+
# We should discard the subsequent data until the next delimiter comes.
204+
discard_till_next_delimiter = true
205+
next
206+
end
164207
end
165208
end
166209
end

test/plugin/test_in_tcp.rb

+72
Original file line numberDiff line numberDiff line change
@@ -253,4 +253,76 @@ def create_tcp_socket(host, port, &block)
253253
assert_equal 'hello', event[2]['msg']
254254
end
255255
end
256+
257+
sub_test_case "message_length_limit" do
258+
data("batch_emit", { extract: "" }, keep: true)
259+
data("single_emit", { extract: "<extract>\ntag_key tag\n</extract>\n" }, keep: true)
260+
test "drop records exceeding limit" do |data|
261+
message_length_limit = 10
262+
d = create_driver(base_config + %!
263+
message_length_limit #{message_length_limit}
264+
<parse>
265+
@type none
266+
</parse>
267+
#{data[:extract]}
268+
!)
269+
d.run(expect_records: 2, timeout: 10) do
270+
create_tcp_socket('127.0.0.1', @port) do |sock|
271+
sock.send("a" * message_length_limit + "\n", 0)
272+
sock.send("b" * (message_length_limit + 1) + "\n", 0)
273+
sock.send("c" * (message_length_limit - 1) + "\n", 0)
274+
end
275+
end
276+
277+
expected_records = [
278+
"a" * message_length_limit,
279+
"c" * (message_length_limit - 1)
280+
]
281+
actual_records = d.events.collect do |event|
282+
event[2]["message"]
283+
end
284+
285+
assert_equal expected_records, actual_records
286+
end
287+
288+
test "clear buffer and discard the subsequent data until the next delimiter" do |data|
289+
message_length_limit = 12
290+
d = create_driver(base_config + %!
291+
message_length_limit #{message_length_limit}
292+
delimiter ";"
293+
<parse>
294+
@type json
295+
</parse>
296+
#{data[:extract]}
297+
!)
298+
d.run(expect_records: 1, timeout: 10) do
299+
create_tcp_socket('127.0.0.1', @port) do |sock|
300+
sock.send('{"message":', 0)
301+
sock.send('"hello', 0)
302+
sleep 1 # To make the server read data and clear the buffer here.
303+
sock.send('world!"};', 0) # This subsequent data must be discarded so that a parsing failure doesn't occur.
304+
sock.send('{"k":"v"};', 0) # This will succeed to parse.
305+
end
306+
end
307+
308+
logs = d.logs.collect do |log|
309+
log.gsub(/\A\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [-+]\d{4} /, "")
310+
end
311+
actual_records = d.events.collect do |event|
312+
event[2]
313+
end
314+
315+
assert_equal(
316+
{
317+
# Asserting that '[warn]: pattern not matched message="world!\"}"' warning does not occur.
318+
logs: ['[info]: The buffer size exceeds \'message_length_limit\', cleared: limit=12 size=17 head="{\"message\":\"hello"' + "\n"],
319+
records: [{"k" => "v"}],
320+
},
321+
{
322+
logs: logs[1..],
323+
records: actual_records,
324+
}
325+
)
326+
end
327+
end
256328
end

0 commit comments

Comments
 (0)