plugin/decision: check if event is too large after compression and don't drop adaptive uncompressed size limit on upload #7521
base: main
Conversation
…D cache sparingly

Renamed the "soft" limit to "uncompressed limit" throughout the code and documentation for clarity. In the size and event buffer the uncompressed limit was being dropped after each upload; now it is carried over. The event buffer doesn't reset the encoder at all. Checking if an individual event was too big compared the uncompressed limit to the compressed limit, causing events to be dropped or to lose the ND cache unnecessarily. This is now fixed: if the uncompressed limit allows it, the event is compressed, and then multiple attempts are made before losing the ND cache or dropping the event. The configurable upload size limit is used to calculate the uncompressed size by growing it exponentially, which could cause an overflow if it was set too high. Added a maximum.

Signed-off-by: sspaink <[email protected]>
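The overflow guard mentioned above can be sketched as follows. This is illustrative, not OPA's actual code: `scaleUp` and `maxUncompressedLimit` are assumed names, and the cap value is arbitrary. The point is that doubling the adaptive limit is clamped so repeated doubling can never overflow an `int64`.

```go
package main

import "fmt"

// maxUncompressedLimit is a hypothetical cap preventing int64 overflow
// when the adaptive uncompressed limit grows exponentially.
const maxUncompressedLimit = int64(1) << 40

// scaleUp doubles the current uncompressed limit, clamping it at the
// cap so the exponential growth can never overflow.
func scaleUp(limit int64) int64 {
	if limit > maxUncompressedLimit/2 {
		return maxUncompressedLimit
	}
	return limit * 2
}

func main() {
	limit := int64(32768)
	for i := 0; i < 5; i++ {
		limit = scaleUp(limit)
	}
	fmt.Println(limit) // 32768 doubled five times: 1048576
}
```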
Some thoughts and questions.
v1/plugins/logs/encoder.go
Outdated
```go
	}
}

// if we reach this part of the code it can mean two things:
// * the uncompressed limit has grown too large and the events need to be split up into multiple chunks
// * an event has a large ND cache that could be dropped
```
It's unfortunate that if the ND cache can be dropped, then it looks like we'll end up marshaling and unmarshaling the event multiple times.
- The event is first added through `Write()`: marshaled
- Below here, all events are enumerated: unmarshaled
- Each event is added back through `Write()`: marshaled
- In `WriteBytes()`, `enc.buf.Len() > int(enc.limit)`: all events written to the buffer are unmarshaled again
- After which all events are marshalled again (maybe dropping ND-cache of last event)
- If the last event had its ND-cache dropped, but didn't fit in the current chunk, all events are marshaled again.
v1/plugins/logs/encoder.go
Outdated
```go
	if err := enc.writeClose(); err != nil {
		return nil, err
	}

	var err error
	result, err = enc.reset()
```
Given that reset might recurse back into `Write()`, could we end up in an infinite recursion loop here?
It might have been possible! I've updated the logic to actually take advantage of the recursion, with the base case being that when a single event is written it will either be dropped or written.
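The recursion-with-base-case idea can be sketched roughly like this. This is a toy model, not OPA's encoder: a batch is split until a single event remains, which is either written or dropped, so the recursion always terminates.

```go
package main

import "fmt"

// writeAll sketches recursion with a guaranteed base case: a batch is
// split in half until a single event remains, which is either written
// (if it fits) or dropped, so the recursion cannot loop forever.
func writeAll(events [][]byte, fits func([]byte) bool) (written, dropped int) {
	if len(events) == 0 {
		return 0, 0
	}
	if len(events) == 1 {
		// base case: a single event is either written or dropped
		if fits(events[0]) {
			return 1, 0
		}
		return 0, 1
	}
	mid := len(events) / 2
	w1, d1 := writeAll(events[:mid], fits)
	w2, d2 := writeAll(events[mid:], fits)
	return w1 + w2, d1 + d2
}

func main() {
	events := [][]byte{[]byte("small"), []byte("a much larger event payload")}
	w, d := writeAll(events, func(b []byte) bool { return len(b) <= 10 })
	fmt.Println(w, d) // 1 1
}
```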
v1/plugins/logs/encoder.go
Outdated
```go
	// skip the incomingEventBytes but save it in case the ND cache needs to be dropped
	if i == len(events)-1 {
		incomingEvent = events[i]
		continue
```
Consider using `break` here, to clearly signal that the intention is to step out of the loop.
v1/plugins/logs/encoder.go
Outdated
```go
	// 1. Try to fit the incoming event into the next chunk without losing ND cache
	// 2. Try to drop the ND cache and see if the incoming event can fit with the current chunk without it (so we can maximize chunk size)
	// 3. Try to add the incoming event without its ND cache to a chunk by itself
	// 4. Drop the event, there isn't anything else to be done
```
Maybe these steps could be broken out into a helper method?
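A hedged sketch of what such a helper might look like, with the four fallback steps collapsed into one function. All names and the size arithmetic here are hypothetical; OPA's real encoder works on compressed buffers, not plain byte counts.

```go
package main

import "fmt"

// placeEvent sketches the four fallback steps as a single helper:
//  1. fit the event, ND cache intact, into the next chunk
//  2. drop the ND cache so it fits in the current chunk's remaining room
//  3. put the event without its ND cache into a chunk by itself
//  4. drop the event entirely
func placeEvent(withCache, withoutCache, chunkRoom, chunkLimit int) string {
	switch {
	case withCache <= chunkLimit:
		return "next chunk with ND cache"
	case withoutCache <= chunkRoom:
		return "current chunk, ND cache dropped"
	case withoutCache <= chunkLimit:
		return "own chunk, ND cache dropped"
	default:
		return "dropped"
	}
}

func main() {
	// chunkRoom: free bytes left in the current chunk; chunkLimit: max chunk size
	fmt.Println(placeEvent(20, 3, 4, 10)) // current chunk, ND cache dropped
}
```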
v1/plugins/logs/encoder.go
Outdated
```go
	// 4. Drop the event, there isn't anything else to be done

	// 1. Try to fit the incoming event into the next chunk without losing ND cache
	tmpEncoder := newChunkEncoder(enc.limit)
```
Shouldn't we account for `uncompressedLimit` (and associated scaling) here?
v1/plugins/logs/encoder.go
Outdated
```go
	result = append(result, enc.update()...)

	// write event to empty chunk
	if err := enc.appendEvent(incomingEventBytes); err != nil {
```
`tmpEncoder` already has the compressed event, can't we just swap buffers here?
v1/plugins/logs/encoder.go
Outdated
```go
	result = append(result, enc.update()...)

	// write event to empty chunk
	if err := enc.appendEvent(incomingEventBytesDroppedNDCache); err != nil {
```
Same question here about reusing `tmpEncoder`'s buffer?
v1/plugins/logs/plugin_test.go
Outdated
```go
	expectedDroppedNDCacheEvents: 50,
	// the adaptive uncompressed limit increases until it stabilizes
	expectedChunksLengths: []int{197, 197, 197, 197, 214, 214, 214, 214, 214},
	expectedEventsInChunk: []int{1, 1, 1, 1, 9, 9, 9, 9, 9}, // 49 in total, plus one in the encoder
```
What am I missing here? Why do we expect this spread of events, and why does a one-event chunk require 197 bytes while a nine-event chunk requires only slightly more (214)? Is this an artifact of how the test is set up, or of gzip?
Should the uncompressed soft-limit be similarly traced and asserted as it changes?
v1/plugins/logs/encoder_test.go
Outdated
```diff
-	expectedScaleUpEvents := uint64(8)
-	expectedScaleDownEvents := uint64(3)
+	expectedScaleUpEvents := uint64(4)
+	expectedScaleDownEvents := uint64(0)
 	expectedEquiEvents := uint64(0)
```
Should we assert the expected number of chunks too? I'm guessing these aren't exactly the sum of these events.
Updated the test to add more cases, it also checks the sum of the events and the expected IDs of the events to make sure nothing is lost.
@johanfylling I've updated the logic for finding an event that is too big to instead make use of the recursion that splits chunks when the uncompressed limit grows too large. Now the uncompressed limit is taken into account, and the first event that is written helps adjust the uncompressed limit to a reasonable starting point, as opposed to growing from the upload size limit. Also added a new histogram metric to track the number of events in each chunk. Not sure how useful this is for users 🤔, at the moment I am just using it in … Thanks!
Some questions.
It's been a while since I looked at this last, so sorry if I'm rehashing old stuff 😅.
```go
		return nil, err
	}

	if int64(len(eventBytes)+enc.bytesWritten+1) <= enc.uncompressedLimit {
```
This `+1` accounts for the `[`/`,` prefixing the event, right? If we want to be sticklers about keeping below the limit, should this be `+2`? A `[` or `,` will always be written, but if this ends up being the last event added, a closing `]` will eventually be written. Overkill?
The `+1` is accounting for the closing bracket `]`; if this event is the last one then `enc.bytesWritten` already has the opening bracket. Maybe I am misunderstanding?
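For illustration, the bracket accounting under discussion: a chunk is a JSON array, so every appended event costs its own length plus one byte for the leading `[` or `,`, and the whole array costs exactly one more byte for the closing `]`. A toy calculation (not OPA's code):

```go
package main

import "fmt"

// chunkSize computes the uncompressed size of a JSON array chunk
// holding the given serialized events: one '[' or ',' before each
// event, plus a single closing ']' for the whole array.
func chunkSize(events []string) int {
	n := 1 // the closing ']'
	for _, e := range events {
		n += 1 + len(e) // '[' for the first event, ',' for the rest
	}
	return n
}

func main() {
	events := []string{`{"a":1}`, `{"b":2}`}
	fmt.Println(chunkSize(events))        // 17
	fmt.Println(len(`[{"a":1},{"b":2}]`)) // 17, matches the serialized form
}
```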
v1/plugins/logs/encoder.go
Outdated
```go
	currentSize := enc.buf.Len()

	// 2. Try to drop the ND cache and see if the incoming event can fit with the current chunk without it (so we can maximize chunk size)
```
```diff
-// 2. Try to drop the ND cache and see if the incoming event can fit with the current chunk without it (so we can maximize chunk size)
+// 2. Try to drop the ND cache and see if the incoming event can fit within the current chunk without it (so we can maximize chunk size)
```
Fixed 👍
v1/plugins/logs/encoder.go
Outdated
```go
	enc.scaleUp()

	result, err := enc.reset()
	if err := enc.appendEvent(eventBytes); err != nil {
```
All we know here is that:

- the event could be marshalled to JSON,
- the event + bytes already written didn't fit within the uncompressed limit,
- there are already bytes written to the buffer.

We don't know that the event will actually fit within the new scaled-up buffer, right? Maybe it needs to have its ND-cache dropped? Should this be a recursive call to `Write()`? If so, we'd need to concatenate `result` and any non-nil returns.
If this call to `appendEvent()` ends us up with `enc.bytesWritten >= enc.uncompressedLimit`, then the next call to `Write()` will see an already too-full buffer plus a new incoming event. Worst case, we'll end up in scenario `2) Scale Down`, right? And since `enc.bytesWritten != 0` we'll close the writer and decode the existing chunk. Eventually, the first event (that's too large) will get its ND-cache dropped.

This is equally a request for asserting my understanding is correct, and a note to self 😄.
Your understanding is correct! A recursive call to `Write()` makes sense and avoids the worst-case scenario. Great catch, I've updated the code for `1) Scale Up` and `3) Equilibrium` to both do a recursive call to avoid burying a big event.
```go
	var result [][]byte
	for i := range events {
		// recursive call to make sure the chunk created adheres to the uncompressed size limit
		chunk, err := enc.Write(events[i])
```
`Write()` will only drop the ND-cache on the first event written to a chunk (`enc.bytesWritten == 0`). This means that there is a worst-case scenario where the ND-cache for any single event is always too large and must always be dropped, in which case we'll always end up in the `1) Scale Up` scenario, and each event will be written to an individual chunk. But since each event also has its ND-cache dropped, we probably could have fitted more events in each chunk.
Nice find! That isn't ideal at all. A solution I came up with is tracking the size of an event whose ND cache had to be dropped with `enc.maxEventSize`. Then if another event gets written that exceeds that size, it will immediately try to drop the cache, giving it a chance to be added into the same chunk (if the uncompressed size has grown correctly).
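A minimal sketch of the `maxEventSize` idea. The structure and the size threshold below are invented for illustration; the point is only that once an event's ND cache had to be dropped, later events at least that large skip the doomed attempt to keep their cache.

```go
package main

import "fmt"

// eventEncoder tracks the smallest event size known to require an ND
// cache drop, so later events at that size or above drop it right away.
type eventEncoder struct {
	maxEventSize int // size of an event that needed its ND cache dropped
	ndDrops      int
}

func (e *eventEncoder) write(size int) {
	if e.maxEventSize > 0 && size >= e.maxEventSize {
		// at least as large as a known-too-big event:
		// drop the ND cache immediately, no wasted encode attempt
		e.ndDrops++
		return
	}
	// ...otherwise try with the ND cache first; on failure remember the size.
	// Hypothetical rule: events above 100 bytes never fit with their cache.
	if size > 100 {
		e.maxEventSize = size
		e.ndDrops++
	}
}

func main() {
	enc := &eventEncoder{}
	enc.write(150) // first oversized event: learns maxEventSize = 150
	enc.write(200) // >= 150: drops the ND cache immediately
	fmt.Println(enc.maxEventSize, enc.ndDrops) // 150 2
}
```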
v1/plugins/logs/encoder.go
Outdated
```go
	enc.incrMetric(logNDBDropCounterName)
	// success! the incoming chunk lost the ND cache, but it wasn't dropped entirely
	// scale up the uncompressed limit using the uncompressed event size as a base
	enc.uncompressedLimit = int64(len(eventBytes))
```
Since we've dropped the ND-cache here, couldn't this effectively be lowering the uncompressed limit? Should we first assert that `int64(len(eventBytes)) > enc.uncompressedLimit`?
Good catch! I added the check.
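The added check amounts to a raise-only update of the adaptive limit. A tiny sketch with assumed names (`raiseLimit` is not OPA's identifier):

```go
package main

import "fmt"

// raiseLimit applies the guard from the review: the adaptive
// uncompressed limit is only ever raised to the event size, never
// lowered by a smaller, cache-stripped event.
func raiseLimit(limit, eventSize int64) int64 {
	if eventSize > limit {
		return eventSize
	}
	return limit
}

func main() {
	fmt.Println(raiseLimit(1000, 500))  // 1000: a smaller event leaves the limit alone
	fmt.Println(raiseLimit(1000, 2000)) // 2000: a larger event raises it
}
```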
```diff
 	var result [][]byte
 	for i := range events {
-		chunk, err := enc.Write(events[i])
+		chunk, err := enc.scaleDown(events)
```
We need to do this because the last event we wrote to the buffer was too much? Would be nice if there was a way to make sure that we're always below the limit in `Write()`, i.e. whenever `Write()` returns, all hoops have been jumped through. Would that be too expensive?
I agree it would be a lot nicer if `Write()` could guarantee the buffer was always below the limit. The problem is getting the accurate compressed buffer size. You need to either `Close` or `Flush` the buffer to get the accurate size, but both have significant downsides:

- Calling `Flush` degrades the compression ratio. The Flush docs say `In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.`, and the zlib documentation says `Flushing may degrade compression`. It definitely does degrade, because I tried adding `enc.w.Flush()` after calling `enc.appendEvent(eventBytes)` to see if we could scale down without pushing in a new event. There was basically no compression; it caused the buffer size to reach the limit way before it should.
- Calling `Close` would require copying the buffer and writer each time to remove the closing bracket.

So allowing the buffer to potentially exceed the compressed limit lets us skip the above costs and deal with it when an event comes in that exceeds the uncompressed limit. And hopefully the uncompressed limit gets adjusted correctly eventually to avoid this at all. The downside is that when you call `Flush` on the chunkEncoder, it has to fix a buffer that exceeded the limit without an incoming event.

BTW, I did not know that the gzip Flush call degraded the compression rate before trying it; definitely a hidden quirk that the Go documentation should do better at highlighting.
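The Flush-degrades-compression observation is easy to demonstrate with the standard library alone: the same repeated payload compresses to more bytes when `Flush` (a zlib `Z_SYNC_FLUSH`) is called after every write, because each flush emits a sync marker and ends the current deflate block.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// compress writes the same payload n times, optionally calling Flush
// after each write, and returns the resulting compressed size.
func compress(payload []byte, n int, flushEach bool) int {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	for i := 0; i < n; i++ {
		w.Write(payload)
		if flushEach {
			w.Flush() // Z_SYNC_FLUSH: emits a sync marker, hurts the ratio
		}
	}
	w.Close()
	return buf.Len()
}

func main() {
	event := bytes.Repeat([]byte(`{"decision":"allow"}`), 10)
	withFlush := compress(event, 50, true)
	withoutFlush := compress(event, 50, false)
	fmt.Println(withFlush > withoutFlush) // true: per-write Flush costs bytes
}
```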
```go
	p.logger.Error("ND builtins cache dropped from this event to fit under maximum upload size limits. Increase upload size limit or change usage of non-deterministic builtins.")
	p.incrMetric(logNDBDropCounterName)
}

p.mtx.Lock()
```
Before, parts of buffering an event, such as json marshalling, happened outside of this global lock. Do we have a sense of how this will affect lock contention and general performance under high load?
I think this will improve performance, because calling `p.encodeEvent(event)` acquires the lock, and if there wasn't an error, it would immediately try to get the lock again. Dropping the ND cache also called `p.encodeEvent(event)` again, so it would need to acquire the lock three times before it could add the event. So this new way should have less contention.
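The contention argument can be illustrated with a lock-acquisition counter. The three-acquisition "old path" below is reconstructed from the description in this thread, not from OPA's code:

```go
package main

import (
	"fmt"
	"sync"
)

// countingMutex counts Lock calls so the two buffering strategies
// discussed above can be compared.
type countingMutex struct {
	mu    sync.Mutex
	locks int
}

func (m *countingMutex) Lock()   { m.mu.Lock(); m.locks++ }
func (m *countingMutex) Unlock() { m.mu.Unlock() }

// oldPath: encode under the lock, retry without the ND cache under the
// lock again, then push under the lock: three acquisitions per event.
func oldPath(m *countingMutex) {
	m.Lock() // encodeEvent(event)
	m.Unlock()
	m.Lock() // encodeEvent(event) again after dropping the ND cache
	m.Unlock()
	m.Lock() // push into the buffer
	m.Unlock()
}

// newPath: one acquisition covering encode, ND-cache drop, and push.
func newPath(m *countingMutex) {
	m.Lock()
	m.Unlock()
}

func main() {
	var a, b countingMutex
	oldPath(&a)
	newPath(&b)
	fmt.Println(a.locks, b.locks) // 3 1
}
```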
* track maxEventSize to drop ND cache quicker Signed-off-by: sspaink <[email protected]>
Why are the changes in this PR needed?
resolve: #7526
What are the changes in this PR?
- Added the test `TestChunkMaxUploadSizeLimitNDBCacheDropping` to see how ND caches are less likely to be dropped. The changes are mostly in `WriteBytes()`, which still uses the uncompressed limit to make an educated guess whether an event can fit; it also checks if after compression an event still fits, or if the ND cache could be dropped. Now that dropping the ND cache has moved into the chunk encoder, the event and size buffers can reuse the logic.
- Renamed the term `soft limit` to `uncompressed limit` throughout the documentation and code. Also renamed the term `hard limit` to `compressed limit`. The goal is clarity, so that it is clear that the "uncompressed limit" is dynamically updated as decision events are logged, to make an educated guess at what it could be without having to compress the events first.
- Added a maximum to `decision_logs.reporting.upload_size_limit_bytes`, with a warning message if it is exceeded.