plugin/decision: check if event is too large after compression #7521


Open

wants to merge 13 commits into main

Conversation

sspaink
Contributor

@sspaink sspaink commented Apr 17, 2025

Why are the changes in this PR needed?

resolve: #7526

What are the changes in this PR?

This PR changes what happens when an event is written to the chunk encoder via Write and WriteBytes. Originally the incoming event's uncompressed size was compared to the compressed limit, which caused the issue. To fix this, the logic now relies on the adaptive uncompressed limit to prevent large events from sneaking into a chunk. If the uncompressed limit turns out to be wrong, the events are decoded and written recursively into chunks. The base case is reached when the incoming event is the first event written into a chunk: only then is the event compressed and, if it is still too large, the ND cache or the entire event is dropped. The benefit is that when an event is too big even after compression, only a single event has to be compressed multiple times.
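
For illustration, the recursive flow described above might look roughly like this minimal sketch. The type layout and helper names (appendEvent, writeFirstEvent, flushChunk) are assumptions standing in for the real encoder in plugins/logs, not OPA's actual code:

package logs // illustrative sketch only

import "encoding/json"

type chunkEncoder struct {
	uncompressedLimit int64 // adaptive limit on uncompressed bytes per chunk
	bytesWritten      int64 // uncompressed bytes already in the open chunk
}

// Stubs standing in for the real compression and bookkeeping logic.
func (enc *chunkEncoder) appendEvent(b []byte) error { enc.bytesWritten += int64(len(b)) + 1; return nil }
func (enc *chunkEncoder) writeFirstEvent(b []byte) ([][]byte, error) { return nil, enc.appendEvent(b) }
func (enc *chunkEncoder) flushChunk() ([][]byte, error) { enc.bytesWritten = 0; return nil, nil }

func (enc *chunkEncoder) Write(event any) ([][]byte, error) {
	eventBytes, err := json.Marshal(event)
	if err != nil {
		return nil, err
	}

	// Fast path: the event fits under the adaptive uncompressed limit.
	if int64(len(eventBytes))+enc.bytesWritten < enc.uncompressedLimit {
		return nil, enc.appendEvent(eventBytes)
	}

	if enc.bytesWritten == 0 {
		// Base case: first event in an empty chunk. Compress it and, if it is
		// still too large, drop its ND cache or drop the event entirely.
		return enc.writeFirstEvent(eventBytes)
	}

	// Otherwise close the current chunk and recurse so the incoming event is
	// checked against a fresh, empty chunk.
	chunks, err := enc.flushChunk()
	if err != nil {
		return nil, err
	}
	rest, err := enc.Write(event)
	if err != nil {
		return nil, err
	}
	return append(chunks, rest...), nil
}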

Moving the logic for when to drop the ND cache into the encoder also has the benefit that the size and event buffers can reuse it.

The soft limit variable has also been renamed to uncompressed limit throughout the code and documentation to clarify what it represents.

Notes to assist PR review:

Repeating the reproduction steps outlined in #7526 with a build that includes the changes in this PR, no error is logged.

fix: don't drop adaptive uncompressed size limit on upload and drop ND cache sparingly

Renamed the "soft" limit to "uncompressed limit" throughout the code and documentation for clarity.
In the size and event buffers the uncompressed limit was being dropped after each upload; now it is carried over, and the event buffer doesn't reset the encoder at all. The check for whether an individual event is too big compared its uncompressed size against the compressed limit, causing events to be dropped or to lose the ND cache unnecessarily. This is now fixed: if the uncompressed limit allows it, the event is compressed, and multiple attempts are made before losing the ND cache or dropping the event. The configurable upload size limit is used as the starting point for the uncompressed limit, which grows exponentially and could overflow if the limit was set too high, so a maximum was added.

Signed-off-by: sspaink <[email protected]>
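
Continuing the sketch above, the overflow guard on the exponential growth could look roughly like the following; the cap value and the growth factor are assumptions for illustration, not the PR's actual numbers:

// Assumed cap; keeps repeated doubling from overflowing int64.
const maxUncompressedLimit = int64(1) << 40

func (enc *chunkEncoder) scaleUp() {
	next := enc.uncompressedLimit * 2 // growth factor assumed for illustration
	if next <= 0 || next > maxUncompressedLimit { // next <= 0 means the doubling overflowed
		next = maxUncompressedLimit
	}
	enc.uncompressedLimit = next
}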

netlify bot commented Apr 17, 2025

Deploy Preview for openpolicyagent ready!

🔨 Latest commit: bf55f2b
🔍 Latest deploy log: https://app.netlify.com/sites/openpolicyagent/deploys/681e82d01748280008adccb7
😎 Deploy Preview: https://deploy-preview-7521--openpolicyagent.netlify.app

Contributor

@johanfylling johanfylling left a comment

Some thoughts and questions.

}
}

// if we reach this part of the code it can mean two things:
// * the uncompressed limit has grown too large and the events need to be split up into multiple chunks
// * an event has a large ND cache that could be dropped
Contributor

It's unfortunate that if the ND cache can be dropped, then it looks like we'll end up marshaling and unmarshaling the event multiple times.

  1. The event is first added through Write(): marshaled
  2. Below here, all events are enumerated: unmarshaled
  3. Each event is added back through Write(): marshaled
  4. In WriteBytes(), enc.buf.Len() > int(enc.limit): all events written to the buffer are unmarshaled again
  5. after which all events are marshaled again (maybe dropping the ND-cache of the last event)

Contributor

  1. If the last event had its ND-cache dropped, but didn't fit in the current chunk, all events are marshaled again.

if err := enc.writeClose(); err != nil {
return nil, err
}

var err error
result, err = enc.reset()
Contributor

@johanfylling johanfylling Apr 23, 2025

Given that reset might recurse back into Write(), could we end up in an infinite recursion loop here?

Contributor Author

It might have been possible! I've updated the logic to actually take advantage of the recursion, with the base case being when a single event is written: it will either be written or dropped.

// skip the incomingEventBytes but save it in case the ND cache needs to be dropped
if i == len(events)-1 {
incomingEvent = events[i]
continue
Contributor

Consider using break here, to clearly signal that the intention is to step out of the loop.

// 1. Try to fit the incoming event into the next chunk without losing ND cache
// 2. Try to drop the ND cache and see if the incoming event can fit with the current chunk without it (so we can maximize chunk size)
// 3. Try to add the incoming event without its ND cache to a chunk by itself
// 4. Drop the event, there isn't anything else to be done
Contributor

Maybe these steps could be broken out into a helper method?

// 4. Drop the event, there isn't anything else to be done

// 1. Try to fit the incoming event into the next chunk without losing ND cache
tmpEncoder := newChunkEncoder(enc.limit)
Contributor

Shouldn't we account for uncompressedLimit (and the associated scaling) here?

result = append(result, enc.update()...)

// write event to empty chunk
if err := enc.appendEvent(incomingEventBytes); err != nil {
Contributor

tmpEncoder already has the compressed event, can't we just swap buffers here?

result = append(result, enc.update()...)

// write event to empty chunk
if err := enc.appendEvent(incomingEventBytesDroppedNDCache); err != nil {
Contributor

Same question here about reusing tmpEncoder's buffer?

expectedDroppedNDCacheEvents: 50,
// the adaptive uncompressed limit increases until it stabilizes
expectedChunksLengths: []int{197, 197, 197, 197, 214, 214, 214, 214, 214},
expectedEventsInChunk: []int{1, 1, 1, 1, 9, 9, 9, 9, 9}, // 49 in total, plus one in the encoder
Contributor

What am I missing here? Why do we expect this spread of events, and why does a one-event chunk require 197 bytes while a nine-event chunk requires only slightly more (214)? Is this an artifact of how the test is set up, or of gzip?

Contributor

Should the uncompressed soft-limit be similarly traced and asserted as it changes?

expectedScaleUpEvents := uint64(8)
expectedScaleDownEvents := uint64(3)
expectedScaleUpEvents := uint64(4)
expectedScaleDownEvents := uint64(0)
expectedEquiEvents := uint64(0)
Contributor

Should we assert the expected number of chunks too? I'm guessing these aren't exactly the sum of these events.

Contributor Author

Updated the test to add more cases; it also checks the sum of the events and the expected event IDs to make sure nothing is lost.

@sspaink
Contributor Author

sspaink commented Apr 29, 2025

@johanfylling I've updated the logic for finding an event that is too big to instead make use of the recursion that splits chunks when the uncompressed limit grows too large. Now the uncompressed limit is taken into account, and the first event that is written helps adjust the uncompressed limit to a reasonable starting point, as opposed to growing from the upload size limit.

Also added a new histogram metric to track the number of events in each chunk. Not sure how useful this is for users 🤔; at the moment I am just using it in TestChunkEncoderAdaptive to find the maximum.

Thanks!

@sspaink sspaink changed the title fix: don't drop adaptive uncompressed size limit on upload and drop ND cache sparingly plugin/decision: check if event is too large after compression and don't drop adaptive uncompressed size limit on upload Apr 30, 2025
@sspaink sspaink added the monitoring Issues related to decision log and status plugins label Apr 30, 2025
Contributor

@johanfylling johanfylling left a comment

Some questions.

It's been a while since I looked at this last, so sorry if I'm rehashing old stuff 😅.

return nil, err
}

if int64(len(eventBytes)+enc.bytesWritten+1) <= enc.uncompressedLimit {
Contributor

This +1 accounts for the [ or , prefixing the event, right? If we want to be sticklers about keeping below the limit, should this be +2? A [ or , will always be written, but if this ends up being the last event added, a closing ] will eventually be written. Overkill?

Contributor Author

The +1 is accounting for the closing bracket ]; if this event is the last one, then enc.bytesWritten already includes the opening bracket. Maybe I am misunderstanding?
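
For reference, the uncompressed chunk framing being discussed is a JSON array, [event1,event2,...,eventN]. A hypothetical helper (not part of the PR) makes the bracket and comma accounting explicit:

// chunkFrameSize is illustrative only; it spells out the accounting for a chunk
// of the form [e1,e2,...,eN]: the payload bytes, one ',' between consecutive
// events, and the surrounding '[' and ']'.
func chunkFrameSize(events [][]byte) int {
	if len(events) == 0 {
		return 2 // "[]"
	}
	size := 2 + (len(events) - 1) // brackets plus separators
	for _, e := range events {
		size += len(e)
	}
	return size
}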


currentSize := enc.buf.Len()

// 2. Try to drop the ND cache and see if the incoming event can fit with the current chunk without it (so we can maximize chunk size)
Contributor

Suggested change
// 2. Try to drop the ND cache and see if the incoming event can fit with the current chunk without it (so we can maximize chunk size)
// 2. Try to drop the ND cache and see if the incoming event can fit within the current chunk without it (so we can maximize chunk size)

Contributor Author

Fixed 👍

enc.scaleUp()

result, err := enc.reset()
if err := enc.appendEvent(eventBytes); err != nil {
Contributor

All we know here is that

  1. the event could be marshalled to json,
  2. the event + bytes already written didn't fit within the uncompressed limit,
  3. there are already bytes written to the buffer.

We don't know that the event will actually fit within the new scaled-up buffer, right? Maybe it needs to have its ND-cache dropped? Should this be a recursive call to Write()?

If so, we'd need to concatenate result and any non-nil returns.

Contributor

If this call to appendEvent() leaves us with enc.bytesWritten >= enc.uncompressedLimit, then the next call to Write() will see an already too-full buffer plus a new incoming event. Worst case, we'll end up in scenario 2) Scale Down, right? And since enc.bytesWritten != 0 we'll close the writer and decode the existing chunk. Eventually, the first event (that's too large) will get its ND-cache dropped.

This is equally a request for asserting my understanding is correct, and a note to self 😄.

Contributor Author

Your understanding is correct! A recursive call to Write() makes sense and avoids the worst-case scenario. Great catch; I've updated the code for 1) Scale Up and 3) Equilibrium to both do a recursive call to avoid burying a big event.
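
A sketch of what the updated scale-up path might look like; whether the recursion goes through Write or a bytes-level variant, and the exact ordering, are assumptions here:

// After scaling up, recurse instead of appending blindly, so the incoming event
// is re-checked and can still lose its ND cache or be split into its own chunk.
enc.scaleUp()
result, err := enc.reset()
if err != nil {
	return nil, err
}
chunks, err := enc.Write(event)
if err != nil {
	return nil, err
}
return append(result, chunks...), nil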

var result [][]byte
for i := range events {
// recursive call to make sure the chunk created adheres to the uncompressed size limit
chunk, err := enc.Write(events[i])
Contributor

Write() will only drop the ND-cache on the first event written to a chunk (enc.bytesWritten == 0). This means there is a worst-case scenario where the ND-cache for any single event is always too large and must always be dropped, in which case we'll always end up in the 1) Scale Up scenario and each event will be written to an individual chunk. But since each event also has its ND-cache dropped, we probably could have fitted more events into each chunk.

Contributor Author

Nice find! That isn't ideal at all. A solution I came up with is to track, in enc.maxEventSize, the size of an event whose ND cache had to be dropped. Then, if another event gets written that exceeds that size, the encoder immediately tries to drop its cache, giving it a chance to be added to the same chunk (if the uncompressed limit has grown correctly).
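
Roughly, the idea might be expressed like this sketch; enc.maxEventSize is mentioned above, but the surrounding logic and the marshalWithoutNDCache helper are assumptions:

// If an earlier event of this size already had to lose its ND cache, drop the
// cache up front so the incoming event can still share the current chunk.
if enc.maxEventSize > 0 && int64(len(eventBytes)) > enc.maxEventSize {
	eventBytes, err = marshalWithoutNDCache(event) // hypothetical helper
	if err != nil {
		return nil, err
	}
}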

enc.incrMetric(logNDBDropCounterName)
// success! the incoming chunk lost the ND cache, but it wasn't dropped entirely
// scale up the uncompressed limit using the uncompressed event size as a base
enc.uncompressedLimit = int64(len(eventBytes))
Contributor

Since we've dropped the ND-cache here, couldn't this effectively be lowering the uncompressed limit? Should we first assert that int64(len(eventBytes)) > enc.uncompressedLimit?

Contributor Author

Good catch! I added the check.
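
Presumably the added check amounts to something like this sketch:

// Only ever raise the uncompressed limit; never lower it just because the ND cache was dropped.
if l := int64(len(eventBytes)); l > enc.uncompressedLimit {
	enc.uncompressedLimit = l
}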

var result [][]byte
for i := range events {
chunk, err := enc.Write(events[i])
chunk, err := enc.scaleDown(events)
Contributor

We need to do this because the last event we wrote to the buffer was too much? It would be nice if there was a way to make sure that we're always below the limit in Write(), i.e. whenever Write() returns, all hoops have been jumped through. Would that be too expensive?

Contributor Author

@sspaink sspaink May 7, 2025

I agree it would be a lot nicer if Write() could guarantee the buffer was always below the limit. The problem is getting an accurate compressed buffer size. You need to either Close or Flush the writer to get the accurate size, but both have significant downsides:

  • Calling Flush degrades the compression ratio. The Flush docs say In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH. and the zlib documentation says Flushing may degrade compression. It definitely does degrade: I tried adding enc.w.Flush() after calling enc.appendEvent(eventBytes) to see if we could scale down without pushing in a new event, and there was basically no compression; it caused the buffer size to reach the limit well before it should have.
  • Calling Close would require copying the buffer and writer each time to remove the closing bracket.

So allowing the buffer to potentially exceed the compressed limit lets us skip the above costs and deal with it when an event comes in that exceeds the uncompressed limit. Hopefully the uncompressed limit eventually gets adjusted correctly so this is avoided entirely. The downside is that when you call Flush on the chunkEncoder it has to fix a buffer that exceeded the limit without an incoming event.

BTW I did not know that the gzip Flush call degraded the compression ratio before trying it; definitely a hidden quirk that the Go documentation should highlight better.
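
The compression hit from Flush is easy to observe with a standalone snippet (illustrative only, not OPA code):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// compress writes the same repetitive payload 500 times and returns the
// compressed size, optionally calling Flush after every write.
func compress(flushEach bool) int {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	payload := bytes.Repeat([]byte(`{"decision_id":"abc","result":true}`), 20)
	for i := 0; i < 500; i++ {
		w.Write(payload)
		if flushEach {
			w.Flush() // Z_SYNC_FLUSH: ends the current deflate block, hurting the ratio
		}
	}
	w.Close()
	return buf.Len()
}

func main() {
	// Flushing after every write typically yields a noticeably larger output
	// than writing everything and closing once at the end.
	fmt.Println("with per-write Flush:", compress(true))
	fmt.Println("close only:", compress(false))
}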

p.logger.Error("ND builtins cache dropped from this event to fit under maximum upload size limits. Increase upload size limit or change usage of non-deterministic builtins.")
p.incrMetric(logNDBDropCounterName)
}

p.mtx.Lock()
Contributor

Before, parts of buffering an event, such as json marshalling, happened outside of this global lock. Do we have a sense of how this will affect lock contention and general performance under high load?

Contributor Author

@sspaink sspaink May 6, 2025

I think this will improve performance: previously, calling p.encodeEvent(event) acquired the lock, and if there wasn't an error it would immediately acquire the lock again; dropping the ND cache called p.encodeEvent(event) once more, so the lock could be acquired three times before the event was added. The new approach has less contention.

@sspaink sspaink changed the title plugin/decision: check if event is too large after compression and don't drop adaptive uncompressed size limit on upload plugin/decision: check if event is too large after compression May 9, 2025
* revert not dropping adaptive uncompressed limit

Signed-off-by: sspaink <[email protected]>
@sspaink
Contributor Author

sspaink commented May 9, 2025

@johanfylling I think I put too much into one pull request, so I decided to split up the changes into separate issues/PRs.

I think this should help make the reviewing a little easier. I also added better documentation describing the specific problem and how to reproduce it in each issue/PR. Sorry for not doing this to begin with; I don't think it affects any of your most recent review comments. Thank you for bearing with me 😄

sspaink added 2 commits May 9, 2025 17:33
Signed-off-by: sspaink <[email protected]>
Signed-off-by: sspaink <[email protected]>
Labels
monitoring Issues related to decision log and status plugins
Development

Successfully merging this pull request may close these issues.

Decision log plugin uses the upload_size_limit_bytes to represent both the compressed and uncompressed limit
2 participants