53 changes: 53 additions & 0 deletions lib/logstash/outputs/scalyr.rb
@@ -200,6 +200,13 @@ class LogStash::Outputs::Scalyr < LogStash::Outputs::Base
# for large batches, it may make sense to disable this option when logstash batch size is configured in a way that
# Scalyr single request limit won't be reached.
config :estimate_each_event_size, :validate => :boolean, :default => true
# The following settings tune event truncation, which will truncate the
# message field to below `max_field_size_bytes`, drop any other field that
# exceeds `max_field_size_bytes`, and drop enough fields to ensure the record
# does not exceed `max_record_size_bytes`. This feature is only active when
# `estimate_each_event_size` is enabled.
config :max_record_size_bytes, :validate => :number, :default => 200 * 1024
config :max_field_size_bytes, :validate => :number, :default => 50 * 1024
Contributor:
IIRC, the server-side limit for those accounts right now is 200k (and the general one is 100k). It would be good to double-check and decide how to proceed (e.g. change the default value here, or tell the user to bump this value to 200k in their config, or similar).
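For example, something along these lines in the user's pipeline configuration (option names as added in this PR; values illustrative only):

output {
  scalyr {
    api_write_token => "..."
    max_record_size_bytes => 204800   # 200 KiB
    max_field_size_bytes => 51200     # 50 KiB
  }
}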

Contributor Author:
I didn't know where to find that limit, so I used the defaults suggested by Steven. If you can direct me towards where that limit is defined, I'd be happy to make sure the defaults here match the server-side defaults.

Contributor Author:
actually, I just remembered something: according to our docs on how we bill on logging volume (https://app.scalyr.com/help/scalyr-agent#billing), each key in the attrs object only counts as 1 byte. Since our concern is serialization size, we should set our max record size to be larger than the actual max record size to allow some wiggle room.
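(Illustrative arithmetic: an event with 1,000 attribute keys averaging 20 characters each is billed roughly 1,000 bytes for those keys, but the serialized JSON carries the full ~20,000 bytes of key text plus quoting and punctuation, so a limit measured on serialized size needs headroom above the server-side record limit.)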


# Library to use for JSON serialization. Valid values are "stdlib" and "jrjackson". The latter may offer 2-4x performance
# improvements on serialization.
@@ -930,6 +937,12 @@ def build_multi_event_request_array(logstash_events)
end
end

if event_json.bytesize > @max_record_size_bytes
Contributor (@Kami, Oct 5, 2022):
Just thinking out loud - wonder what kind of performance impact this may have during the worst case (a lot of events which hit this limit and possibly also events with many such fields - we may want to add some quick micro benchmarks to better understand the impact).

Since previously we wouldn't perform additional processing in such a case on the client side, right (aka we would send it as-is and the server would reject / truncate it)?

May be worthwhile to add some instrumentation / periodic logging (number of such events, duration of the truncation operation). I also wonder if we may want to only periodically log the "Event size exceeds max_record_size_bytes" message (something similar to the rate-limited warn in the agent code) to avoid potential noise / flood of those messages in case we encounter many such events.

In addition to that, we may also want to add periodic DEBUG log with event content (pre-truncation) to make troubleshooting / similar easier.
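A rough sketch of the rate-limited warning idea (method and instance variable names here are illustrative, not existing plugin code):

# Emit the truncation warning at most once per minute, with a running count.
def log_truncation_warning_rate_limited
  @truncated_events_count = (@truncated_events_count || 0) + 1
  @last_truncation_warning_time ||= 0
  now = Time.now.to_i
  if now - @last_truncation_warning_time >= 60
    @logger.warn("#{@truncated_events_count} event(s) exceeded max_record_size_bytes since the last warning and were truncated")
    @last_truncation_warning_time = now
    @truncated_events_count = 0
  end
end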

Contributor Author:
> Since previously we wouldn't perform additional processing in such a case on the client side, right (aka we would send it as-is and the server would reject / truncate it)?

Right. This has actually been weighing on my mind a lot. I don't actually know how often logstash handles records that exceed the max record size (whatever that value is), or the arbitrary value we chose of 200KiB.

How does the server deal with events that exceed the max record size? Our docs don't mention a record size limit, only a post body size limit.
@Kami How does the agent deal with large events? I couldn't find an answer to that question quickly on my own, and didn't want to delay this feature in order to dig around the agent codebase to find out.

Contributor:
As mentioned on Slack, agent datasets are usually different - there is usually a single, potentially large message attribute, vs. the logstash scenario where it's common to have many other attributes (in addition to, or without, the message attribute) on the event.

Having said that, I don't know how the agent handles that, so I will dig into the code and also try to test it out.

  @logger.warn("Event size exceeds max_record_size_bytes, and will be truncated")
  truncate_event(scalyr_event)
  event_json = self.json_encode(scalyr_event)
end

# generate new request if json size of events in the array exceed maximum request buffer size
append_event = true
add_bytes = event_json.bytesize
@@ -991,6 +1004,46 @@ def add_client_timestamp_to_body(body)
body[:client_timestamp] = current_time_millis.to_s
end

# This should only be called on events that have already been determined to be too large
def truncate_event(event)
  attrs = event[:attrs]
  new_attrs = Hash.new
  total_size = 0
  priority_events = ["serverHost", "parser", "logfile", "severity", "message"]
Contributor:
We could define this as a top- / module-level constant, and perhaps we want to call it something along the lines of "special / reserved" field names or similar.
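For example (constant name illustrative):

RESERVED_FIELD_NAMES = ["serverHost", "parser", "logfile", "severity", "message"].freeze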

Contributor:
Depending on where this processing happens (either before or after the special event level serverHost field value is set), we may also need to include EVENT_LEVEL_SERVER_HOST_ATTRIBUTE_NAME constant / __origServerHost field name here.

# Rename serverHost (if exists) to __origServerHost so sources filtering works correctly

On a related note, do we also want to include thread + log here?

And IIRC, post-processing (what gets sent to the server side), the severity field value is stored in the sev field -

scalyr_event[:sev] = severity_int

Contributor:
And also worth noting that some of those field values (top level serverHost, logs, etc.) are, in some / all cases, sent once as session level parameters and not as event level attributes.

Not sure how things would behave in case one of the session level fields is very large (e.g. very large logs session field with many entries).

Probably also not a blocker and not fully relevant to this change, but also worth thinking about.

Contributor Author:
> And IIRC, post-processing (what gets sent to the server side), the severity field value is stored in the sev field

That is correct. That field is also in the parent object, so if it is present I don't touch it.

> And also worth noting that some of those field values (top level serverHost, logs, etc.) are, in some / all cases, sent once as session level parameters and not as event level attributes.

Also correct. I decided against touching anything in there since that isn't where the problem currently lies. For the sake of expediency, and to avoid introducing a regression, I felt it better to be surgical in my approach.

Contributor Author:
> Depending on where this processing happens (either before or after the special event level serverHost field value is set), we may also need to include EVENT_LEVEL_SERVER_HOST_ATTRIBUTE_NAME constant / __origServerHost field name here.

It happens after. I'll look into this. I think that data is stored outside of the attrs object, but I'll verify.

Contributor:
Yeah, correct, some of those attributes (e.g. sev) are stored outside of the event.attrs (tbh, not sure how limits apply to those attributes).

  priority_events.each do |key|
    next unless attrs.has_key? key
    total_size += key.bytesize
    value = attrs.delete key
    value_size = _get_size(value)
    if value_size > @max_field_size_bytes
      value.slice!(@max_field_size_bytes, value_size)
Contributor (@Kami, Oct 5, 2022):
Do we want to add "..." or similar to the end to signal that the field value has been truncated? (IIRC, we do the same on the server side for truncated field values.)
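e.g. something like (sketch only, assuming value is a String at this point):

value.slice!(@max_field_size_bytes - 3, value_size)
value << "..."  # marker that the value was truncated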

Contributor Author:
makes sense. I'll add that.

      total_size += @max_field_size_bytes
    else
      total_size += value_size
    end
    new_attrs[key] = value
  end
  attrs.each do |key, value|
    kv_size = key.bytesize + _get_size(value)
    # skip field if the combined size of the key and value exceeds the max field size.
Contributor:
Yeah, I think this is a good call, since doing that would be somewhat complex and possibly also add a lot of overhead in the worst-case scenario.

Having said that, I wonder if we still want to add some periodic opt-in debug logging of those offending fields to make troubleshooting easier for the end user?
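For example (the config option name here is hypothetical):

if @log_dropped_fields && @logger.debug?
  @logger.debug("Dropping field exceeding max_field_size_bytes", :field => key, :size => kv_size)
end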

Contributor Author:
That makes sense.

    # We do this so we don't have to deal with figuring out how to truncate complex types
    next if kv_size > @max_field_size_bytes
    # Stop copying fields over if we would exceed the max record size
    break if kv_size + total_size > @max_record_size_bytes
    total_size += kv_size
    new_attrs[key] = value
  end
  event[:attrs] = new_attrs
  return event
end

def _get_size(value)
  if not value.kind_of? String
    value = value.to_s
  end
  return value.bytesize
end


# A request comprises multiple Scalyr Events. This function creates a request hash for
# final upload to Scalyr (from an array of events, and an optional hash of current threads)
105 changes: 105 additions & 0 deletions spec/logstash/outputs/scalyr_spec.rb
@@ -1201,6 +1201,111 @@ def post_add_events(body, is_status, body_serialization_duration = 0)
end
end

context "when an event exceeds the max record size" do
def setup_plugin
config = {
'api_write_token' => '1234',
'perform_connectivity_check' => false,
'estimate_each_event_size' => true,
}
plugin = LogStash::Outputs::Scalyr.new(config)

allow(plugin).to receive(:send_status).and_return(nil)
plugin.register
return plugin
end

it "truncates the message field if it exceeds the max field size" do
plugin = setup_plugin()
e = LogStash::Event.new
e.set('message', 'a' * (205 * 1024))

result = plugin.build_multi_event_request_array([e])
body = JSON.parse(result[0][:body])
events = body['events']
scalyr_event = events[0]
attrs = scalyr_event['attrs']
expect(attrs['message'].bytesize).to eq(50 * 1024)
end
it "doesn't copy fields that exceed the max field size" do
plugin = setup_plugin()
e = LogStash::Event.new
e.set('message', 'a' * (205 * 1024))
e.set('honk', 'b' * (65 * 1024))
e.set('blarg', 'honk')
e.set('rawr', 'blah')

result = plugin.build_multi_event_request_array([e])
body = JSON.parse(result[0][:body])
events = body['events']
scalyr_event = events[0]
attrs = scalyr_event['attrs']
expect(attrs.has_key? 'honk').to be false
end
it "takes field key size into account" do
plugin = setup_plugin()
e = LogStash::Event.new
e.set('b' * (20 * 1024), 'blarg')
e.set('c' * (20 * 1024), 'blarg')
e.set('d' * (20 * 1024), 'blarg')
e.set('e' * (20 * 1024), 'blarg')
e.set('q' * (20 * 1024), 'blarg')
e.set('w' * (20 * 1024), 'blarg')
e.set('r' * (20 * 1024), 'blarg')
e.set('z' * (20 * 1024), 'blarg')
e.set('x' * (20 * 1024), 'blarg')
e.set('c' * (20 * 1024), 'blarg')
e.set('v' * (20 * 1024), 'blarg')
e.set('t' * (20 * 1024), 'blarg')

result = plugin.build_multi_event_request_array([e])
body = JSON.parse(result[0][:body])
events = body['events']
scalyr_event = events[0]
attrs = scalyr_event['attrs']
expect(attrs.size).to eq(10)
expect(attrs.to_json.bytesize).to be <= 200*1024
end
it "stops copying fields when the record would exceed the max record size" do
plugin = setup_plugin()
e = LogStash::Event.new
e.set('b', 'a' * (20 * 1024))
e.set('c', 'a' * (20 * 1024))
e.set('d', 'a' * (20 * 1024))
e.set('e', 'a' * (20 * 1024))
e.set('q', 'a' * (20 * 1024))
e.set('w', 'a' * (20 * 1024))
e.set('r', 'a' * (20 * 1024))
e.set('z', 'a' * (20 * 1024))
e.set('x', 'a' * (20 * 1024))
e.set('c', 'a' * (20 * 1024))
e.set('v', 'a' * (20 * 1024))
e.set('t', 'a' * (20 * 1024))

result = plugin.build_multi_event_request_array([e])
body = JSON.parse(result[0][:body])
events = body['events']
scalyr_event = events[0]
attrs = scalyr_event['attrs']
expect(attrs.size).to eq(10)
end
it "can estimate the size of complex nested objects, and throw them away" do
plugin = setup_plugin()
e = LogStash::Event.new
e.set('message', 'a' * (205 * 1024))
e.set('honk', [['b' * (65 * 1024)]])
e.set('blarg', 'honk')
e.set('rawr', 'blah')

result = plugin.build_multi_event_request_array([e])
body = JSON.parse(result[0][:body])
events = body['events']
scalyr_event = events[0]
attrs = scalyr_event['attrs']
expect(attrs.has_key? 'honk').to be false
end
end

context "scalyr_server config option handling and connectivity check" do
it "doesn't throw an error on valid url" do
config = {