diff --git a/docs/content/configuration.md b/docs/content/configuration.md
index 507afc134b..79c150057e 100644
--- a/docs/content/configuration.md
+++ b/docs/content/configuration.md
@@ -788,7 +788,7 @@ included in the actual bundle gzipped tarball.
| `decision_logs.reporting.buffer_size_limit_events` | `int64` | No (default: `10000`) | Decision log buffer size limit by events. OPA will drop old events from the log if this limit is exceeded. By default, 10000 events are held. This number has to be greater than zero. Only works with "event" buffer type. |
| `decision_logs.reporting.buffer_size_limit_bytes` | `int64` | No (default: `unlimited`) | Decision log buffer size limit in bytes. OPA will drop old events from the log if this limit is exceeded. By default, no limit is set. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. Only works with "size" buffer type. |
| `decision_logs.reporting.max_decisions_per_second` | `float64` | No | Maximum number of decision log events to buffer per second. OPA will drop events if the rate limit is exceeded. Only one of `buffer_size_limit_bytes`, `max_decisions_per_second` may be set. |
-| `decision_logs.reporting.upload_size_limit_bytes` | `int64` | No (default: `32768`) | Decision log upload size limit in bytes. OPA will chunk uploads to cap message body to this limit. |
+| `decision_logs.reporting.upload_size_limit_bytes` | `int64` | No (default: `32768`) | Decision log upload size limit in bytes. This limit enforces the maximum size of a gzip compressed payload of events within the message body. The maximum value allowed is 4294967296 bytes. |
| `decision_logs.reporting.min_delay_seconds` | `int64` | No (default: `300`) | Minimum amount of time to wait between uploads. |
| `decision_logs.reporting.max_delay_seconds` | `int64` | No (default: `600`) | Maximum amount of time to wait between uploads. |
| `decision_logs.reporting.trigger` | `string` | No (default: `periodic`) | Controls how decision logs are reported to the remote server. Allowed values are `periodic` and `manual` (`manual` triggers are only possible when using OPA as a Go package). |
diff --git a/docs/content/management-decision-logs.md b/docs/content/management-decision-logs.md
index efedb21184..f4e5454427 100644
--- a/docs/content/management-decision-logs.md
+++ b/docs/content/management-decision-logs.md
@@ -87,18 +87,22 @@ during the next upload event. OPA also performs an exponential backoff to calcul
when the remote service responds with a non-2xx status.

OPA periodically uploads decision logs to the remote service. In order to conserve network and memory resources, OPA
-attempts to fill up each upload chunk with as many events as possible while respecting the user-specified
-`upload_size_limit_bytes` config option. OPA defines an adaptive (`soft`) limit that acts as a measure for encoding
-as many decisions into each chunk as possible. It uses the below algorithm to optimize the number of log events to
-include in a chunk. The algorithm features three phases namely:
-
-`Scale Up`: If the current chunk size is within 90% of the user-configured (`hard`) limit, exponentially increase the
-soft limit. The exponential function is 2^x where x has a minimum value of 1
-
-`Scale Down`: If the current chunk size exceeds the hard limit, decrease the soft limit and re-encode the decisions in
-the last chunk.
-
-`Equilibrium`: If the chunk size is between 90% and 100% of the user-configured limit, maintain soft limit value.
+attempts to fill up each message body with as many events as possible while respecting the user-specified
+`upload_size_limit_bytes` config option. Each message body is a gzip compressed JSON array, and `upload_size_limit_bytes`
+caps the gzip compressed size, so it is referred to here as the compressed limit. To avoid compressing each incoming
+event individually just to check whether the compressed limit has been reached, OPA makes an educated guess at the
+corresponding uncompressed limit. It does so using an adaptive limit, referred to as the uncompressed limit, that is
+adjusted by measuring incoming events. This means that the initial chunks will most likely be smaller than the
+compressed limit, but as OPA consumes more decision events it adjusts the adaptive uncompressed limit to better fill
+each message. The algorithm that adjusts the uncompressed limit uses the following criteria:
+
+`Scale Up`: If the current chunk size is below 90% of the user-configured compressed limit, exponentially increase the
+uncompressed limit. The exponential function is 2^x where x has a minimum value of 1.
+
+`Scale Down`: If the current chunk size exceeds the compressed limit, decrease the uncompressed limit and re-encode the
+decisions in the last chunk.
+
+`Equilibrium`: If the chunk size is between 90% and 100% of the user-configured limit, maintain the uncompressed limit value.

When an event containing `nd_builtin_cache` cannot fit into a chunk smaller than `upload_size_limit_bytes`, OPA will
drop the `nd_builtin_cache` key from the event, and will retry encoding the chunk without the non-deterministic
diff --git a/v1/plugins/discovery/discovery.go b/v1/plugins/discovery/discovery.go
index a061191567..76e6b198ff 100644
--- a/v1/plugins/discovery/discovery.go
+++ b/v1/plugins/discovery/discovery.go
@@ -105,13 +105,13 @@ func New(manager *plugins.Manager, opts ...func(*Discovery)) (*Discovery, error)
 		f(result)
 	}
 
-	config, err := NewConfigBuilder().WithBytes(manager.Config.Discovery).WithServices(manager.Services()).
- WithKeyConfigs(manager.PublicKeys()).Parse() + result.logger = manager.Logger().WithFields(map[string]interface{}{"plugin": Name}) + config, err := NewConfigBuilder().WithBytes(manager.Config.Discovery).WithServices(manager.Services()).WithKeyConfigs(manager.PublicKeys()).Parse() if err != nil { return nil, err } else if config == nil { - if _, err := getPluginSet(result.factories, manager, manager.Config, result.metrics, nil); err != nil { + if _, err := getPluginSet(result.factories, manager, manager.Config, result.metrics, result.logger, nil); err != nil { return nil, err } return result, nil @@ -141,8 +141,6 @@ func New(manager *plugins.Manager, opts ...func(*Discovery)) (*Discovery, error) Name: Name, } - result.logger = manager.Logger().WithFields(map[string]interface{}{"plugin": Name}) - manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) return result, nil } @@ -509,7 +507,7 @@ func (c *Discovery) processBundle(ctx context.Context, b *bundleApi.Bundle) (*pl return nil, err } - ps, err := getPluginSet(c.factories, c.manager, overriddenConfig, c.metrics, c.config.Trigger) + ps, err := getPluginSet(c.factories, c.manager, overriddenConfig, c.metrics, c.logger, c.config.Trigger) if err != nil { return nil, err } @@ -584,7 +582,7 @@ type pluginfactory struct { config interface{} } -func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager, config *config.Config, m metrics.Metrics, trigger *plugins.TriggerMode) (*pluginSet, error) { +func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager, config *config.Config, m metrics.Metrics, l logging.Logger, trigger *plugins.TriggerMode) (*pluginSet, error) { // Parse and validate plugin configurations. pluginNames := []string{} @@ -628,7 +626,7 @@ func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager } decisionLogsConfig, err := logs.NewConfigBuilder().WithBytes(config.DecisionLogs).WithServices(manager.Services()). 
- WithPlugins(pluginNames).WithTriggerMode(trigger).Parse() + WithPlugins(pluginNames).WithTriggerMode(trigger).WithLogger(l).Parse() if err != nil { return nil, err } diff --git a/v1/plugins/discovery/discovery_test.go b/v1/plugins/discovery/discovery_test.go index 132ba34dbb..8983c0441d 100644 --- a/v1/plugins/discovery/discovery_test.go +++ b/v1/plugins/discovery/discovery_test.go @@ -3231,7 +3231,7 @@ bundle: ` manager := getTestManager(t, conf) trigger := plugins.TriggerManual - _, err := getPluginSet(nil, manager, manager.Config, nil, &trigger) + _, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger) if err != nil { t.Fatalf("Unexpected error: %s", err) } @@ -3268,7 +3268,7 @@ bundles: ` manager := getTestManager(t, conf) trigger := plugins.TriggerManual - _, err := getPluginSet(nil, manager, manager.Config, nil, &trigger) + _, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger) if err != nil { t.Fatalf("Unexpected error: %s", err) } @@ -3328,7 +3328,7 @@ bundles: manager := getTestManager(t, tc.conf) trigger := plugins.TriggerManual - _, err := getPluginSet(nil, manager, manager.Config, nil, &trigger) + _, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger) if tc.wantErr { if err == nil { @@ -3393,7 +3393,7 @@ decision_logs: manager := getTestManager(t, tc.conf) trigger := plugins.TriggerManual - _, err := getPluginSet(nil, manager, manager.Config, nil, &trigger) + _, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger) if tc.wantErr { if err == nil { @@ -3464,7 +3464,7 @@ status: manager := getTestManager(t, tc.conf) trigger := plugins.TriggerManual - _, err := getPluginSet(nil, manager, manager.Config, nil, &trigger) + _, err := getPluginSet(nil, manager, manager.Config, nil, nil, &trigger) if tc.wantErr { if err == nil { diff --git a/v1/plugins/logs/encoder.go b/v1/plugins/logs/encoder.go index ce2a8ede8a..96cb3cda30 100644 --- a/v1/plugins/logs/encoder.go +++ b/v1/plugins/logs/encoder.go @@ -8,42 +8,53 @@ import ( "bytes" "compress/gzip" "encoding/json" - "fmt" "math" + "github.com/open-policy-agent/opa/v1/logging" "github.com/open-policy-agent/opa/v1/metrics" ) const ( - encHardLimitThreshold = 0.9 - softLimitBaseFactor = 2 - softLimitExponentScaleFactor = 0.2 - encLogExUploadSizeLimitCounterName = "enc_log_exceeded_upload_size_limit_bytes" - encSoftLimitScaleUpCounterName = "enc_soft_limit_scale_up" - encSoftLimitScaleDownCounterName = "enc_soft_limit_scale_down" - encSoftLimitStableCounterName = "enc_soft_limit_stable" + encCompressedLimitThreshold = 0.9 + uncompressedLimitBaseFactor = 2 + uncompressedLimitExponentScaleFactor = 0.2 + encLogExUploadSizeLimitCounterName = "enc_log_exceeded_upload_size_limit_bytes" + encUncompressedLimitScaleUpCounterName = "enc_uncompressed_limit_scale_up" + encUncompressedLimitScaleDownCounterName = "enc_uncompressed_limit_scale_down" + encUncompressedLimitStableCounterName = "enc_uncompressed_limit_stable" + encSoftLimitScaleUpCounterName = "enc_soft_limit_scale_up" // deprecated, use uncompressed version instead + encSoftLimitScaleDownCounterName = "enc_soft_limit_scale_down" // deprecated, use uncompressed version instead + encSoftLimitStableCounterName = "enc_soft_limit_stable" // deprecated, use uncompressed version instead ) -// chunkEncoder implements log buffer chunking and compression. Log events are -// written to the encoder and the encoder outputs chunks that are fit to the -// configured limit. +// chunkEncoder implements log buffer chunking and compression. 
+// Decision events are written to the encoder and the encoder outputs chunks that fit within the configured limit.
 type chunkEncoder struct {
-	limit                      int64
-	softLimit                  int64
-	softLimitScaleUpExponent   float64
-	softLimitScaleDownExponent float64
-	bytesWritten               int
-	buf                        *bytes.Buffer
-	w                          *gzip.Writer
-	metrics                    metrics.Metrics
+	// limit is the maximum compressed payload size (configured by upload_size_limit_bytes)
+	limit int64
+	// bytesWritten tracks whether anything has been written to the buffer;
+	// tracking it directly avoids having to account for the header bytes the gzip writer adds to the buffer
+	bytesWritten int
+	buf          *bytes.Buffer
+	w            *gzip.Writer
+	metrics      metrics.Metrics
+	logger       logging.Logger
+
+	// uncompressedLimit is an adaptive limit that guesses how many uncompressed bytes fit within the compressed limit, based on the utilization of the buffer on upload.
+	// This minimizes having to decompress all the events when the limit is reached, which is only necessary if the guess turns out to be too large.
+	// Otherwise, each incoming event would have to be compressed by itself to get an accurate size for comparison, causing two compressions per write.
+	// This means that at first the chunks will contain fewer events until the uncompressedLimit can grow to a stable state.
+	uncompressedLimit                  int64
+	uncompressedLimitScaleUpExponent   float64
+	uncompressedLimitScaleDownExponent float64
 }
 
 func newChunkEncoder(limit int64) *chunkEncoder {
 	enc := &chunkEncoder{
-		limit:                      limit,
-		softLimit:                  limit,
-		softLimitScaleUpExponent:   0,
-		softLimitScaleDownExponent: 0,
+		limit:                              limit,
+		uncompressedLimit:                  limit,
+		uncompressedLimitScaleUpExponent:   0,
+		uncompressedLimitScaleDownExponent: 0,
 	}
 
 	enc.update()
@@ -56,60 +67,66 @@ func (enc *chunkEncoder) WithMetrics(m metrics.Metrics) *chunkEncoder {
 }
 
 func (enc *chunkEncoder) Write(event EventV1) (result [][]byte, err error) {
-	var buf bytes.Buffer
-	if err := json.NewEncoder(&buf).Encode(event); err != nil {
+	b, err := json.Marshal(&event)
+	if err != nil {
 		return nil, err
 	}
-	return enc.WriteBytes(buf.Bytes())
+	return enc.WriteBytes(b)
 }
 
 // WriteBytes attempts to write a serialized event to the current chunk.
 // If the upload limit is reached the chunk is closed and a result is returned.
 // The incoming event that didn't fit is added to the next chunk.
-func (enc *chunkEncoder) WriteBytes(bs []byte) (result [][]byte, err error) { - if len(bs) == 0 { - return nil, nil - } else if int64(len(bs)+2) > enc.limit { - if enc.metrics != nil { - enc.metrics.Counter(encLogExUploadSizeLimitCounterName).Incr() - } - return nil, fmt.Errorf("upload chunk size (%d) exceeds upload_size_limit_bytes (%d)", - int64(len(bs)+2), enc.limit) +func (enc *chunkEncoder) WriteBytes(event []byte) ([][]byte, error) { + if err := enc.appendEvent(event); err != nil { + return nil, err } - if int64(len(bs)+enc.bytesWritten+1) > enc.softLimit { - if err := enc.writeClose(); err != nil { - return nil, err - } - - result, err = enc.reset() + if int64(enc.bytesWritten) > enc.uncompressedLimit { + result, err := enc.reset() if err != nil { return nil, err } + + return result, nil + } + + return nil, nil +} + +func (enc *chunkEncoder) appendEvent(event []byte) error { + if len(event) == 0 { + return nil } if enc.bytesWritten == 0 { n, err := enc.w.Write([]byte(`[`)) if err != nil { - return nil, err + return err } enc.bytesWritten += n } else { n, err := enc.w.Write([]byte(`,`)) if err != nil { - return nil, err + return err } enc.bytesWritten += n } - n, err := enc.w.Write(bs) + n, err := enc.w.Write(event) if err != nil { - return nil, err + return err } - enc.bytesWritten += n - return + + if int64(enc.bytesWritten+1) > enc.uncompressedLimit { + if err := enc.writeClose(); err != nil { + return err + } + } + + return nil } func (enc *chunkEncoder) writeClose() error { @@ -126,58 +143,67 @@ func (enc *chunkEncoder) Flush() ([][]byte, error) { if err := enc.writeClose(); err != nil { return nil, err } - return enc.reset() + // don't call enc.reset() because the uncompressed limit shouldn't be updated when forcing the buffer to be emptied + // the buffer could most likely be underutilized (<90%) and won't be an accurate data point + return enc.update(), nil } -//nolint:unconvert func (enc *chunkEncoder) reset() ([][]byte, error) { + // make sure there aren't any pending writes to get an accurate size + if err := enc.w.Flush(); err != nil { + return nil, err + } - // Adjust the encoder's soft limit based on the current amount of - // data written to the underlying buffer. The soft limit decides when to flush a chunk. - // The soft limit is modified based on the below algorithm: + // Adjust the encoder's uncompressed limit based on the current amount of + // data written to the underlying buffer. The uncompressed limit decides when to flush a chunk. + // The uncompressed limit is modified based on the below algorithm: // 1) Scale Up: If the current chunk size is within 90% of the user-configured limit, exponentially increase - // the soft limit. The exponential function is 2^x where x has a minimum value of 1 - // 2) Scale Down: If the current chunk size exceeds the hard limit, decrease the soft limit and re-encode the + // the uncompressed limit. The exponential function is 2^x where x has a minimum value of 1 + // 2) Scale Down: If the current chunk size exceeds the compressed limit, decrease the uncompressed limit and re-encode the // decisions in the last chunk. - // 3) Equilibrium: If the chunk size is between 90% and 100% of the user-configured limit, maintain soft limit value. + // 3) Equilibrium: If the chunk size is between 90% and 100% of the user-configured limit, maintain uncompressed limit value. 
- if enc.buf.Len() < int(float64(enc.limit)*encHardLimitThreshold) { - if enc.metrics != nil { - enc.metrics.Counter(encSoftLimitScaleUpCounterName).Incr() - } + // 1) Scale Up + if enc.buf.Len() < int(float64(enc.limit)*encCompressedLimitThreshold) { + enc.incrMetric(encUncompressedLimitScaleUpCounterName) + enc.incrMetric(encSoftLimitScaleUpCounterName) + + mul := int64(math.Pow(float64(uncompressedLimitBaseFactor), float64(enc.uncompressedLimitScaleUpExponent+1))) + enc.uncompressedLimit *= mul + enc.uncompressedLimitScaleUpExponent += uncompressedLimitExponentScaleFactor - mul := int64(math.Pow(float64(softLimitBaseFactor), float64(enc.softLimitScaleUpExponent+1))) - enc.softLimit *= mul - enc.softLimitScaleUpExponent += softLimitExponentScaleFactor return enc.update(), nil } - if int(enc.limit) > enc.buf.Len() && enc.buf.Len() >= int(float64(enc.limit)*encHardLimitThreshold) { - if enc.metrics != nil { - enc.metrics.Counter(encSoftLimitStableCounterName).Incr() - } + // 3) Equilibrium + if int(enc.limit) > enc.buf.Len() && enc.buf.Len() >= int(float64(enc.limit)*encCompressedLimitThreshold) { + enc.incrMetric(encUncompressedLimitStableCounterName) + enc.incrMetric(encSoftLimitStableCounterName) - enc.softLimitScaleDownExponent = enc.softLimitScaleUpExponent + enc.uncompressedLimitScaleDownExponent = enc.uncompressedLimitScaleUpExponent return enc.update(), nil } - if enc.softLimit > enc.limit { - if enc.metrics != nil { - enc.metrics.Counter(encSoftLimitScaleDownCounterName).Incr() - } + // 2) Scale Down + if enc.uncompressedLimit > enc.limit { + enc.incrMetric(encUncompressedLimitScaleDownCounterName) + enc.incrMetric(encSoftLimitScaleDownCounterName) - if enc.softLimitScaleDownExponent < enc.softLimitScaleUpExponent { - enc.softLimitScaleDownExponent = enc.softLimitScaleUpExponent + if enc.uncompressedLimitScaleDownExponent < enc.uncompressedLimitScaleUpExponent { + enc.uncompressedLimitScaleDownExponent = enc.uncompressedLimitScaleUpExponent } - den := int64(math.Pow(float64(softLimitBaseFactor), float64(enc.softLimitScaleDownExponent-enc.softLimitScaleUpExponent+1))) - enc.softLimit /= den + den := int64(math.Pow(float64(uncompressedLimitBaseFactor), float64(enc.uncompressedLimitScaleDownExponent-enc.uncompressedLimitScaleUpExponent+1))) + enc.uncompressedLimit /= den - if enc.softLimitScaleUpExponent > 0 { - enc.softLimitScaleUpExponent -= softLimitExponentScaleFactor + if enc.uncompressedLimitScaleUpExponent > 0 { + enc.uncompressedLimitScaleUpExponent -= uncompressedLimitExponentScaleFactor } } + // if we reach this part of the code it can mean two things: + // * the uncompressed limit has grown too large and the events need to be split up into multiple chunks + // * an event has a large ND cache that could be dropped events, decErr := newChunkDecoder(enc.buf.Bytes()).decode() if decErr != nil { return nil, decErr @@ -187,6 +213,58 @@ func (enc *chunkEncoder) reset() ([][]byte, error) { var result [][]byte for i := range events { + tmpEncoder := newChunkEncoder(enc.limit) + b, err := json.Marshal(&events[i]) + if err != nil { + return nil, err + } + err = tmpEncoder.appendEvent(b) + if err != nil { + return nil, err + } + if err := tmpEncoder.w.Flush(); err != nil { + return nil, err + } + + if int64(tmpEncoder.buf.Len()) > tmpEncoder.limit { + enc.incrMetric(encLogExUploadSizeLimitCounterName) + // Try to drop ND cache. If there's no ND builtins cache in the event, then we don't need to retry encoding anything. 
+			if events[i].NDBuiltinCache == nil {
+				enc.incrMetric(logEncodingFailureCounterName)
+				enc.logError("Log encoding failed: received a decision event size (%d) that exceeds upload_size_limit_bytes (%d).",
+					tmpEncoder.buf.Len(), tmpEncoder.limit)
+				continue
+			}
+
+			// Attempt to encode the event again, dropping the ND builtins cache.
+			events[i].NDBuiltinCache = nil
+
+			tmpEncoder.initialize()
+			b, err := json.Marshal(&events[i])
+			if err != nil {
+				return nil, err
+			}
+			err = tmpEncoder.appendEvent(b)
+			if err != nil {
+				return nil, err
+			}
+
+			if err := tmpEncoder.w.Flush(); err != nil {
+				return nil, err
+			}
+			if int64(tmpEncoder.buf.Len()) > tmpEncoder.limit {
+				enc.incrMetric(logEncodingFailureCounterName)
+				enc.logError("Log encoding failed: received a decision event size (%d) that exceeds upload_size_limit_bytes (%d).",
+					tmpEncoder.buf.Len(), tmpEncoder.limit)
+
+				continue
+			}
+
+			// Re-encoding was successful, but we still need to alert users.
+			enc.logError("ND builtins cache dropped from this event to fit under maximum upload size limits. Increase upload size limit or change usage of non-deterministic builtins.")
+			enc.incrMetric(logNDBDropCounterName)
+		}
+
 		chunk, err := enc.Write(events[i])
 		if err != nil {
 			return nil, err
@@ -214,6 +292,18 @@ func (enc *chunkEncoder) initialize() {
 	enc.w = gzip.NewWriter(enc.buf)
 }
 
+func (enc *chunkEncoder) logError(fmt string, a ...interface{}) {
+	if enc.logger != nil {
+		enc.logger.Error(fmt, a...)
+	}
+}
+
+func (enc *chunkEncoder) incrMetric(name string) {
+	if enc.metrics != nil {
+		enc.metrics.Counter(name).Incr()
+	}
+}
+
 // chunkDecoder decodes the encoded chunks and outputs the log events
 type chunkDecoder struct {
 	raw []byte
diff --git a/v1/plugins/logs/encoder_test.go b/v1/plugins/logs/encoder_test.go
index 445d9cabbb..618501514e 100644
--- a/v1/plugins/logs/encoder_test.go
+++ b/v1/plugins/logs/encoder_test.go
@@ -13,7 +13,6 @@ import (
 )
 
 func TestChunkEncoder(t *testing.T) {
-
 	enc := newChunkEncoder(1000)
 	var result interface{} = false
 	var expInput interface{} = map[string]interface{}{"method": "GET"}
@@ -73,18 +72,26 @@ func TestChunkEncoderSizeLimit(t *testing.T) {
 		RequestedBy: "test",
 		Timestamp:   ts,
 	}
-	_, err = enc.Write(event)
-	if err == nil {
-		t.Error("Expected error as upload chunk size exceeds configured limit")
+	chunks, err := enc.Write(event)
+	if err != nil {
+		t.Error(err)
+	}
+	if len(chunks) != 0 {
+		t.Errorf("Unexpected result: %v", result)
 	}
-	expected := "upload chunk size (200) exceeds upload_size_limit_bytes (1)"
-	if err.Error() != expected {
-		t.Errorf("expected: '%s', got: '%s'", expected, err.Error())
+	if err := enc.w.Flush(); err != nil {
+		t.Fatal(err)
+	}
+	// only the gzip stream framing (header plus Z_SYNC_FLUSH block) is expected, without the event
+	if enc.buf.Len() != 15 {
+		t.Errorf("Unexpected buffer size: %v", enc.buf.Len())
+	}
+	if enc.bytesWritten != 0 {
+		t.Errorf("Unexpected bytes written: %v", enc.bytesWritten)
 	}
 }
 
 func TestChunkEncoderAdaptive(t *testing.T) {
-
 	enc := newChunkEncoder(1000).WithMetrics(metrics.New())
 	var result interface{} = false
 	var expInput interface{} = map[string]interface{}{"method": "GET"}
@@ -145,12 +152,12 @@ func TestChunkEncoderAdaptive(t *testing.T) {
 		t.Fatalf("Expected %v events but got %v", numEvents, numEventsActual)
 	}
 
-	actualScaleUpEvents := enc.metrics.Counter(encSoftLimitScaleUpCounterName).Value().(uint64)
-	actualScaleDownEvents := enc.metrics.Counter(encSoftLimitScaleDownCounterName).Value().(uint64)
-	actualEquiEvents := 
enc.metrics.Counter(encSoftLimitStableCounterName).Value().(uint64) + actualScaleUpEvents := enc.metrics.Counter(encUncompressedLimitScaleUpCounterName).Value().(uint64) + actualScaleDownEvents := enc.metrics.Counter(encUncompressedLimitScaleDownCounterName).Value().(uint64) + actualEquiEvents := enc.metrics.Counter(encUncompressedLimitStableCounterName).Value().(uint64) - expectedScaleUpEvents := uint64(8) - expectedScaleDownEvents := uint64(3) + expectedScaleUpEvents := uint64(5) + expectedScaleDownEvents := uint64(0) expectedEquiEvents := uint64(0) if actualScaleUpEvents != expectedScaleUpEvents { diff --git a/v1/plugins/logs/eventBuffer.go b/v1/plugins/logs/eventBuffer.go index 464d511d0f..02944655e9 100644 --- a/v1/plugins/logs/eventBuffer.go +++ b/v1/plugins/logs/eventBuffer.go @@ -6,8 +6,6 @@ package logs import ( "context" - "encoding/json" - "fmt" "sync" "github.com/open-policy-agent/opa/v1/logging" @@ -27,6 +25,7 @@ type eventBuffer struct { client rest.Client // client is used to upload the data to the configured service uploadPath string // uploadPath is the configured HTTP resource path for upload uploadSizeLimitBytes int64 // uploadSizeLimitBytes will enforce a maximum payload size to be uploaded + enc *chunkEncoder // encoder appends events into the gzip compressed JSON array metrics metrics.Metrics logger logging.Logger } @@ -37,11 +36,13 @@ func newEventBuffer(bufferSizeLimitEvents int64, client rest.Client, uploadPath client: client, uploadPath: uploadPath, uploadSizeLimitBytes: uploadSizeLimitBytes, + enc: newChunkEncoder(uploadSizeLimitBytes), } } func (b *eventBuffer) WithMetrics(m metrics.Metrics) *eventBuffer { b.metrics = m + b.enc.metrics = m return b } @@ -69,6 +70,7 @@ func (b *eventBuffer) Reconfigure(bufferSizeLimitEvents int64, client rest.Clien b.client = client b.uploadPath = uploadPath b.uploadSizeLimitBytes = uploadSizeLimitBytes + b.enc.limit = uploadSizeLimitBytes if int64(cap(b.buffer)) == bufferSizeLimitEvents { return @@ -128,8 +130,6 @@ func (b *eventBuffer) Upload(ctx context.Context) error { return &bufferEmpty{} } - encoder := newChunkEncoder(b.uploadSizeLimitBytes) - for range eventLen { event := b.readEvent() if event == nil { @@ -140,13 +140,8 @@ func (b *eventBuffer) Upload(ctx context.Context) error { if event.chunk != nil { result = [][]byte{event.chunk} } else { - serialized, err := b.processEvent(event.EventV1) - if err != nil { - b.logError("%v", err) - continue - } - - result, err = encoder.WriteBytes(serialized) + var err error + result, err = b.enc.Write(*event.EventV1) if err != nil { b.incrMetric(logEncodingFailureCounterName) b.logError("encoding failure: %v, dropping event with decision ID: %v", err, event.DecisionID) @@ -160,7 +155,7 @@ func (b *eventBuffer) Upload(ctx context.Context) error { } // flush any chunks that didn't hit the upload limit - result, err := encoder.Flush() + result, err := b.enc.Flush() if err != nil { b.incrMetric(logEncodingFailureCounterName) b.logError("encoding failure: %v", err) @@ -199,34 +194,3 @@ func (b *eventBuffer) readEvent() *bufferItem { return nil } } - -// processEvent serializes the event and determines if the ND cache needs to be dropped -func (b *eventBuffer) processEvent(event *EventV1) ([]byte, error) { - serialized, err := json.Marshal(event) - - // The non-deterministic cache (NDBuiltinCache) could cause issues, if it is too big or can't be encoded try to drop it. 
- if err != nil || int64(len(serialized)) >= b.uploadSizeLimitBytes { - if event.NDBuiltinCache == nil { - return nil, fmt.Errorf("upload event size (%d) exceeds upload_size_limit_bytes (%d), dropping event with decision ID: %v", - int64(len(serialized)), b.uploadSizeLimitBytes, event.DecisionID) - } - - // Attempt to drop the ND cache to reduce size. If it is still too large, drop the event. - event.NDBuiltinCache = nil - var err error - serialized, err = json.Marshal(event) - if err != nil { - b.incrMetric(logEncodingFailureCounterName) - return nil, fmt.Errorf("encoding failure: %v, dropping event with decision ID: %v", err, event.DecisionID) - } - if int64(len(serialized)) > b.uploadSizeLimitBytes { - return nil, fmt.Errorf("upload event size (%d) exceeds upload_size_limit_bytes (%d), dropping event with decision ID: %v", - int64(len(serialized)), b.uploadSizeLimitBytes, event.DecisionID) - } - - b.incrMetric(logNDBDropCounterName) - b.logError("ND builtins cache dropped from this event to fit under maximum upload size limits. Increase upload size limit or change usage of non-deterministic builtins.") - } - - return serialized, nil -} diff --git a/v1/plugins/logs/plugin.go b/v1/plugins/logs/plugin.go index 5893afee1c..d9118fde31 100644 --- a/v1/plugins/logs/plugin.go +++ b/v1/plugins/logs/plugin.go @@ -6,7 +6,6 @@ package logs import ( - "bytes" "context" "encoding/json" "errors" @@ -254,7 +253,8 @@ const ( defaultMaxDelaySeconds = int64(600) defaultBufferSizeLimitEvents = int64(10000) defaultUploadSizeLimitBytes = int64(32768) // 32KB limit - defaultBufferSizeLimitBytes = int64(0) // unlimited + maxUploadSizeLimitBytes = int64(4294967296) + defaultBufferSizeLimitBytes = int64(0) // unlimited defaultMaskDecisionPath = "/system/log/mask" defaultDropDecisionPath = "/system/log/drop" logRateLimitExDropCounterName = "decision_logs_dropped_rate_limit_exceeded" @@ -303,7 +303,7 @@ type Config struct { dropDecisionRef ast.Ref } -func (c *Config) validateAndInjectDefaults(services []string, pluginsList []string, trigger *plugins.TriggerMode) error { +func (c *Config) validateAndInjectDefaults(services []string, pluginsList []string, trigger *plugins.TriggerMode, l logging.Logger) error { if c.Plugin != nil { var found bool @@ -372,7 +372,18 @@ func (c *Config) validateAndInjectDefaults(services []string, pluginsList []stri uploadLimit = *c.Reporting.UploadSizeLimitBytes } - c.Reporting.UploadSizeLimitBytes = &uploadLimit + maxUploadLimit := maxUploadSizeLimitBytes + switch { + case uploadLimit > maxUploadLimit: + c.Reporting.UploadSizeLimitBytes = &maxUploadLimit + if l != nil { + l.Warn("the configured `upload_size_limit_bytes` (%d) has been set to the maximum limit (%d)", uploadLimit, maxUploadLimit) + } + case uploadLimit <= 0: + return fmt.Errorf("the configured `upload_size_limit_bytes` (%d) must be greater than 0", uploadLimit) + default: + c.Reporting.UploadSizeLimitBytes = &uploadLimit + } if c.Reporting.BufferType == "" { c.Reporting.BufferType = sizeBufferType @@ -511,6 +522,7 @@ type ConfigBuilder struct { services []string plugins []string trigger *plugins.TriggerMode + logger logging.Logger } // NewConfigBuilder returns a new ConfigBuilder to build and parse the plugin config. @@ -518,6 +530,11 @@ func NewConfigBuilder() *ConfigBuilder { return &ConfigBuilder{} } +func (b *ConfigBuilder) WithLogger(l logging.Logger) *ConfigBuilder { + b.logger = l + return b +} + // WithBytes sets the raw plugin config. 
func (b *ConfigBuilder) WithBytes(config []byte) *ConfigBuilder { b.raw = config @@ -559,7 +576,7 @@ func (b *ConfigBuilder) Parse() (*Config, error) { return nil, nil } - if err := parsedConfig.validateAndInjectDefaults(b.services, b.plugins, b.trigger); err != nil { + if err := parsedConfig.validateAndInjectDefaults(b.services, b.plugins, b.trigger, b.logger); err != nil { return nil, err } @@ -912,6 +929,11 @@ func (p *Plugin) oneShot(ctx context.Context) error { oldBuffer := p.buffer p.buffer = newLogBuffer(*p.config.Reporting.BufferSizeLimitBytes) p.enc = newChunkEncoder(*p.config.Reporting.UploadSizeLimitBytes).WithMetrics(p.metrics) + // keep the adaptive uncompressed limit throughout the lifecycle of the size buffer + // this ensures that the uncompressed limit can grow/shrink appropriately as new data comes in + p.enc.uncompressedLimit = oldChunkEnc.uncompressedLimit + p.enc.uncompressedLimitScaleDownExponent = oldChunkEnc.uncompressedLimitScaleDownExponent + p.enc.uncompressedLimitScaleUpExponent = oldChunkEnc.uncompressedLimitScaleUpExponent p.mtx.Unlock() // Along with uploading the compressed events in the buffer @@ -1027,54 +1049,17 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) { return } - result, err := p.encodeEvent(event) - if err != nil { - // If there's no ND builtins cache in the event, then we don't - // need to retry encoding anything. - if event.NDBuiltinCache == nil { - // TODO(tsandall): revisit this now that we have an API that - // can return an error. Should the default behaviour be to - // fail-closed as we do for plugins? - - p.incrMetric(logEncodingFailureCounterName) - p.logger.Error("Log encoding failed: %v.", err) - return - } - - // Attempt to encode the event again, dropping the ND builtins cache. - newEvent := event - newEvent.NDBuiltinCache = nil - - result, err = p.encodeEvent(newEvent) - if err != nil { - p.incrMetric(logEncodingFailureCounterName) - p.logger.Error("Log encoding failed: %v.", err) - return - } - - // Re-encoding was successful, but we still need to alert users. - p.logger.Error("ND builtins cache dropped from this event to fit under maximum upload size limits. 
Increase upload size limit or change usage of non-deterministic builtins.") - p.incrMetric(logNDBDropCounterName) - } - p.mtx.Lock() defer p.mtx.Unlock() + result, err := p.enc.Write(event) + if err != nil { + return + } for _, chunk := range result { p.bufferChunk(p.buffer, chunk) } } -func (p *Plugin) encodeEvent(event EventV1) ([][]byte, error) { - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(event); err != nil { - return nil, err - } - - p.mtx.Lock() - defer p.mtx.Unlock() - return p.enc.WriteBytes(buf.Bytes()) -} - func (p *Plugin) bufferChunk(buffer *logBuffer, bs []byte) { dropped := buffer.Push(bs) if dropped > 0 { diff --git a/v1/plugins/logs/plugin_benchmark_test.go b/v1/plugins/logs/plugin_benchmark_test.go index 1dfb08e8a9..20dce0e609 100644 --- a/v1/plugins/logs/plugin_benchmark_test.go +++ b/v1/plugins/logs/plugin_benchmark_test.go @@ -149,7 +149,7 @@ func BenchmarkMaskingNop(b *testing.B) { cfg := &Config{Service: "svc"} t := plugins.DefaultTriggerMode - if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &t); err != nil { + if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &t, nil); err != nil { b.Fatal(err) } plugin := New(cfg, manager) @@ -187,7 +187,7 @@ func BenchmarkMaskingRuleCountsNop(b *testing.B) { cfg := &Config{Service: "svc"} t := plugins.DefaultTriggerMode - if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &t); err != nil { + if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &t, nil); err != nil { b.Fatal(err) } plugin := New(cfg, manager) @@ -240,7 +240,7 @@ func BenchmarkMaskingErase(b *testing.B) { cfg := &Config{Service: "svc"} t := plugins.DefaultTriggerMode - if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &t); err != nil { + if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &t, nil); err != nil { b.Fatal(err) } plugin := New(cfg, manager) diff --git a/v1/plugins/logs/plugin_test.go b/v1/plugins/logs/plugin_test.go index dd3820e9e2..32d6bac81b 100644 --- a/v1/plugins/logs/plugin_test.go +++ b/v1/plugins/logs/plugin_test.go @@ -420,8 +420,8 @@ func TestPluginStartSameInput(t *testing.T) { var input interface{} = map[string]interface{}{"method": "GET"} - for i := 0; i < 400; i++ { - fixture.plugin.Log(ctx, &server.Info{ + for i := range 800 { + if err := fixture.plugin.Log(ctx, &server.Info{ Revision: fmt.Sprint(i), DecisionID: fmt.Sprint(i), Path: "tda/bar", @@ -430,7 +430,9 @@ func TestPluginStartSameInput(t *testing.T) { RemoteAddr: "test", Timestamp: ts, Metrics: testMetrics, - }) + }); err != nil { + t.Fatal(err) + } } err = fixture.plugin.oneShot(ctx) @@ -441,9 +443,10 @@ func TestPluginStartSameInput(t *testing.T) { chunk1 := <-fixture.server.ch chunk2 := <-fixture.server.ch chunk3 := <-fixture.server.ch - expLen1 := 122 - expLen2 := 242 - expLen3 := 36 + // first size is smallest as the adaptive uncompressed limit increases more events can be added + expLen1 := 123 + expLen2 := 244 + expLen3 := 433 if len(chunk1) != expLen1 || len(chunk2) != expLen2 || len(chunk3) != expLen3 { t.Fatalf("Expected chunk lens %v, %v, and %v but got: %v, %v, and %v", expLen1, expLen2, expLen3, len(chunk1), len(chunk2), len(chunk3)) @@ -462,8 +465,8 @@ func TestPluginStartSameInput(t *testing.T) { "app": "example-app", "version": version.Version, }, - Revision: "399", - DecisionID: "399", + Revision: "799", + DecisionID: "799", Path: "tda/bar", Input: &expInput, Result: &result, @@ -489,7 +492,7 @@ func TestPluginStartChangingInputValues(t *testing.T) { fixture := newTestFixture(t) 
defer fixture.server.stop() - fixture.server.ch = make(chan []EventV1, 3) + fixture.server.ch = make(chan []EventV1, 2) var result interface{} = false ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z") @@ -499,10 +502,10 @@ func TestPluginStartChangingInputValues(t *testing.T) { var input interface{} - for i := 0; i < 400; i++ { + for i := 0; i < 350; i++ { input = map[string]interface{}{"method": getValueForMethod(i), "path": getValueForPath(i), "user": getValueForUser(i)} - fixture.plugin.Log(ctx, &server.Info{ + if err := fixture.plugin.Log(ctx, &server.Info{ Revision: fmt.Sprint(i), DecisionID: fmt.Sprint(i), Path: "foo/bar", @@ -510,7 +513,9 @@ func TestPluginStartChangingInputValues(t *testing.T) { Results: &result, RemoteAddr: "test", Timestamp: ts, - }) + }); err != nil { + t.Fatal(err) + } } err = fixture.plugin.oneShot(ctx) @@ -520,13 +525,11 @@ func TestPluginStartChangingInputValues(t *testing.T) { chunk1 := <-fixture.server.ch chunk2 := <-fixture.server.ch - chunk3 := <-fixture.server.ch - expLen1 := 124 - expLen2 := 247 - expLen3 := 29 + expLen1 := 126 + expLen2 := 224 - if len(chunk1) != expLen1 || len(chunk2) != expLen2 || len((chunk3)) != expLen3 { - t.Fatalf("Expected chunk lens %v, %v and %v but got: %v, %v and %v", expLen1, expLen2, expLen3, len(chunk1), len(chunk2), len(chunk3)) + if len(chunk1) != expLen1 || len(chunk2) != expLen2 { + t.Fatalf("Expected chunk lens %v and %v but got: %v and %v", expLen1, expLen2, len(chunk1), len(chunk2)) } exp := EventV1{ @@ -535,8 +538,8 @@ func TestPluginStartChangingInputValues(t *testing.T) { "app": "example-app", "version": version.Version, }, - Revision: "399", - DecisionID: "399", + Revision: "349", + DecisionID: "349", Path: "foo/bar", Input: &input, Result: &result, @@ -544,8 +547,8 @@ func TestPluginStartChangingInputValues(t *testing.T) { Timestamp: ts, } - if !reflect.DeepEqual(chunk3[expLen3-1], exp) { - t.Fatalf("Expected %+v but got %+v", exp, chunk3[expLen3-1]) + if !reflect.DeepEqual(chunk2[expLen2-1], exp) { + t.Fatalf("Expected %+v but got %+v", exp, chunk2[expLen2-1]) } } @@ -557,7 +560,7 @@ func TestPluginStartChangingInputKeysAndValues(t *testing.T) { fixture := newTestFixture(t) defer fixture.server.stop() - fixture.server.ch = make(chan []EventV1, 5) + fixture.server.ch = make(chan []EventV1, 1) var result interface{} = false ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z") @@ -567,10 +570,10 @@ func TestPluginStartChangingInputKeysAndValues(t *testing.T) { var input interface{} - for i := 0; i < 250; i++ { + for i := 0; i < 146; i++ { input = generateInputMap(i) - fixture.plugin.Log(ctx, &server.Info{ + if err := fixture.plugin.Log(ctx, &server.Info{ Revision: fmt.Sprint(i), DecisionID: fmt.Sprint(i), Path: "foo/bar", @@ -578,7 +581,9 @@ func TestPluginStartChangingInputKeysAndValues(t *testing.T) { Results: &result, RemoteAddr: "test", Timestamp: ts, - }) + }); err != nil { + t.Fatal(err) + } } err = fixture.plugin.oneShot(ctx) @@ -586,8 +591,7 @@ func TestPluginStartChangingInputKeysAndValues(t *testing.T) { t.Fatal(err) } - <-fixture.server.ch - chunk2 := <-fixture.server.ch + chunk := <-fixture.server.ch exp := EventV1{ Labels: map[string]string{ @@ -595,8 +599,8 @@ func TestPluginStartChangingInputKeysAndValues(t *testing.T) { "app": "example-app", "version": version.Version, }, - Revision: "249", - DecisionID: "249", + Revision: "145", + DecisionID: "145", Path: "foo/bar", Input: &input, Result: &result, @@ -604,8 +608,8 @@ func 
TestPluginStartChangingInputKeysAndValues(t *testing.T) { Timestamp: ts, } - if !reflect.DeepEqual(chunk2[len(chunk2)-1], exp) { - t.Fatalf("Expected %+v but got %+v", exp, chunk2[len(chunk2)-1]) + if !reflect.DeepEqual(chunk[len(chunk)-1], exp) { + t.Fatalf("Expected %+v but got %+v", exp, chunk[len(chunk)-1]) } } @@ -761,7 +765,7 @@ func TestPluginRateLimitInt(t *testing.T) { var input interface{} = map[string]interface{}{"method": "GET"} var result interface{} = false - eventSize := 218 + eventSize := 217 event1 := &server.Info{ DecisionID: "abc", Path: "foo/bar", @@ -917,7 +921,7 @@ func TestPluginRateLimitFloat(t *testing.T) { var input interface{} = map[string]interface{}{"method": "GET"} var result interface{} = false - eventSize := 218 + eventSize := 217 event1 := &server.Info{ DecisionID: "abc", Path: "foo/bar", @@ -1210,8 +1214,8 @@ func TestPluginStatusUpdateBufferSizeExceeded(t *testing.T) { fixture := newTestFixture(t, testFixtureOptions{ ConsoleLogger: testLogger, - ReportingBufferSizeLimitBytes: 200, - ReportingUploadSizeLimitBytes: 300, + ReportingBufferSizeLimitBytes: 400, + ReportingUploadSizeLimitBytes: 218, }) defer fixture.server.stop() @@ -1250,14 +1254,19 @@ func TestPluginStatusUpdateBufferSizeExceeded(t *testing.T) { } // write event 1 and 2 into the encoder and check the chunk is inserted into the buffer - _ = fixture.plugin.Log(ctx, event1) - _ = fixture.plugin.Log(ctx, event2) - - fixture.plugin.mtx.Lock() + if err := fixture.plugin.Log(ctx, event1); err != nil { + t.Error(err) + } if fixture.plugin.enc.bytesWritten == 0 { t.Fatal("Expected event to be written into the encoder") } + if err := fixture.plugin.Log(ctx, event2); err != nil { + t.Error(err) + } + + fixture.plugin.mtx.Lock() + if fixture.plugin.buffer.Len() == 0 { t.Fatal("Expected one chunk to be written into the buffer") } @@ -1610,7 +1619,7 @@ func TestPluginRateLimitDropCountStatus(t *testing.T) { t.Fatal(err) } - eventSize := 218 + eventSize := 217 expectedLen := 1 currentLen := getBufferLen(t, fixture, eventSize) if currentLen != expectedLen { @@ -1670,53 +1679,178 @@ func TestPluginRateLimitDropCountStatus(t *testing.T) { func TestChunkMaxUploadSizeLimitNDBCacheDropping(t *testing.T) { t.Parallel() - ctx := context.Background() - testLogger := test.New() - - ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z") - if err != nil { - panic(err) + // note this only tests the size buffer type so that the encoder is used immediately on log + tests := []struct { + name string + uploadSizeLimitBytes int64 + numOfEventsWithNDCache int // also the number of expected ND caches to be dropped + numOfEventsWithoutNDCache int + expectedUncompressedBytesWritten int + expectedEventsInChunk []int + expectedChunksLengths []int // this will show the adaptive uncompressed limit adjusting + expectedUncompressedLimit int64 + }{ + { + name: "logging one event with ND Cache", + uploadSizeLimitBytes: 220, + numOfEventsWithNDCache: 1, + // opening bracket (1 byte) + event(214 bytes) + // written after dropping the ND cache, otherwise the size would have been 3472 + expectedUncompressedBytesWritten: 215, + expectedUncompressedLimit: 220, + }, + { + name: "logging one event without ND Cache", + uploadSizeLimitBytes: 220, + numOfEventsWithoutNDCache: 1, + // opening bracket (1 byte) + event(214 bytes) + expectedUncompressedBytesWritten: 215, + expectedUncompressedLimit: 220, + }, + { + name: "logging a combination of events with ND Cache and without", + uploadSizeLimitBytes: 220, + 
numOfEventsWithoutNDCache: 1,
+			numOfEventsWithNDCache:    1,
+			// both events get flushed into a single chunk, so nothing is left in the encoder
+			expectedUncompressedBytesWritten: 0,
+			// the adaptive uncompressed limit won't have increased enough
+			// so it will close the chunk early thinking the two events exceed it
+			expectedChunksLengths: []int{196},
+			expectedEventsInChunk: []int{2},
+			expectedUncompressedLimit: 440,
+		},
+		{
+			name:                      "logging multiple with ND Cache and without to show uncompressed limit improving",
+			uploadSizeLimitBytes:      220,
+			numOfEventsWithoutNDCache: 11,
+			// the adaptive uncompressed limit increases enough to fit in one more event before going above the 90% threshold
+			expectedChunksLengths: []int{196, 199, 199, 199},
+			expectedEventsInChunk: []int{2, 3, 3, 3},
+			expectedUncompressedLimit: 440,
+		},
+		{
+			name:                 "logging multiple with ND Cache showing the uncompressed limit gets reset",
+			uploadSizeLimitBytes: 220,
+			// the eleventh event is still in the encoder waiting to be written into a chunk
+			expectedUncompressedBytesWritten: 215,
+			numOfEventsWithNDCache:           11,
+			// no way of telling if the ND cache is being dropped or the uncompressed limit went too high
+			// so the uncompressed limit gets reset
+			expectedChunksLengths: []int{196, 196, 196, 196, 196},
+			expectedEventsInChunk: []int{2, 2, 2, 2, 2},
+			expectedUncompressedLimit: 220,
+		},
+		{
+			name:                      "dropping the ND cache doesn't help, drop the event",
+			uploadSizeLimitBytes:      200,
+			expectedUncompressedLimit: 200,
+		},
 	}
 
-	fixture := newTestFixture(t, testFixtureOptions{
-		ConsoleLogger:                  testLogger,
-		ReportingMaxDecisionsPerSecond: float64(1), // 1 decision per second
-		ReportingUploadSizeLimitBytes:  400,
-	})
-	defer fixture.server.stop()
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx := context.Background()
+			testLogger := test.New()
 
-	fixture.plugin.metrics = metrics.New()
+			ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z")
+			if err != nil {
+				panic(err)
+			}
 
-	var input interface{} = map[string]interface{}{"method": "GET"}
-	var result interface{} = false
+			fixture := newTestFixture(t, testFixtureOptions{
+				ConsoleLogger: testLogger,
+				// With the ND cache the compressed size is 259 bytes
+				// Without the ND cache the compressed size is 215 bytes
+				ReportingUploadSizeLimitBytes: tc.uploadSizeLimitBytes,
+			})
+			defer fixture.server.stop()
 
-	// Purposely oversized NDBCache entry will force dropping during Log().
-	var ndbCacheExample interface{} = ast.MustJSON(builtins.NDBCache{
-		"test.custom_space_waster": ast.NewObject([2]*ast.Term{
-			ast.ArrayTerm(),
-			ast.StringTerm(strings.Repeat("Wasted space... ", 200)),
-		}),
-	}.AsValue())
+			fixture.plugin.enc.metrics = metrics.New()
 
-	event := &server.Info{
-		DecisionID: "abc",
-		Path:       "foo/bar",
-		Input:      &input,
-		Results:    &result,
-		RemoteAddr: "test",
-		Timestamp:  ts,
-		NDBuiltinCache: &ndbCacheExample,
-	}
+			var input interface{} = map[string]interface{}{"method": "GET"}
+			var result interface{} = false
 
-	beforeNDBDropCount := fixture.plugin.metrics.Counter(logNDBDropCounterName).Value().(uint64)
-	err = fixture.plugin.Log(ctx, event) // event should be written into the encoder
-	if err != nil {
-		t.Fatal(err)
-	}
-	afterNDBDropCount := fixture.plugin.metrics.Counter(logNDBDropCounterName).Value().(uint64)
+			for range tc.numOfEventsWithoutNDCache {
+				event := &server.Info{
+					DecisionID: "abc",
+					Path:       "foo/bar",
+					Input:      &input,
+					Results:    &result,
+					RemoteAddr: "test",
+					Timestamp:  ts,
+				}
+
+				err := fixture.plugin.Log(ctx, event) // event should be written into the encoder
+				if err != nil {
+					t.Fatal(err)
+				}
+			}
+
+			// A purposely oversized NDBCache entry will force dropping during Log().
+			ndbCacheExample := ast.MustJSON(builtins.NDBCache{
+				"test.custom_space_waster": ast.NewObject([2]*ast.Term{
+					ast.ArrayTerm(),
+					ast.StringTerm(strings.Repeat("Wasted space... ", 200)),
+				}),
+			}.AsValue())
+
+			for range tc.numOfEventsWithNDCache {
+				event := &server.Info{
+					DecisionID:     "abc",
+					Path:           "foo/bar",
+					Input:          &input,
+					Results:        &result,
+					RemoteAddr:     "test",
+					Timestamp:      ts,
+					NDBuiltinCache: &ndbCacheExample,
+				}
+
+				err := fixture.plugin.Log(ctx, event)
+				if err != nil {
+					t.Fatal(err)
+				}
+			}
 
-	if afterNDBDropCount != beforeNDBDropCount+1 {
-		t.Fatalf("Expected %v NDBCache drop events, saw %v events instead.", beforeNDBDropCount+1, afterNDBDropCount)
+			if tc.expectedUncompressedLimit != fixture.plugin.enc.uncompressedLimit {
+				t.Errorf("Expected %d uncompressed limit but got %d", tc.expectedUncompressedLimit, fixture.plugin.enc.uncompressedLimit)
+			}
+
+			if len(tc.expectedChunksLengths) != fixture.plugin.buffer.Len() {
+				currentBufferLen := fixture.plugin.buffer.Len()
+				var receivedChunkLens []int
+				for chunk := fixture.plugin.buffer.Pop(); chunk != nil; chunk = fixture.plugin.buffer.Pop() {
+					receivedChunkLens = append(receivedChunkLens, len(chunk))
+				}
+				t.Errorf("Expected %d chunks (%v) but got %d (%v)",
+					len(tc.expectedChunksLengths), tc.expectedChunksLengths, currentBufferLen, receivedChunkLens)
+			}
+
+			for i, c := range tc.expectedChunksLengths {
+				chunk := fixture.plugin.buffer.Pop()
+				if c != len(chunk) {
+					t.Errorf("Expected %d chunk length but got %d", c, len(chunk))
+				}
+
+				events, err := newChunkDecoder(chunk).decode()
+				if err != nil {
+					t.Fatal(err)
+				}
+				if tc.expectedEventsInChunk[i] != len(events) {
+					t.Errorf("Expected %d events but got %d", tc.expectedEventsInChunk[i], len(events))
+				}
+			}
+
+			m := fixture.plugin.enc.metrics.Counter(logNDBDropCounterName).Value().(uint64)
+			if m != uint64(tc.numOfEventsWithNDCache) {
+				t.Errorf("Expected %d NDBCache drop events, saw %d events instead.", tc.numOfEventsWithNDCache, m)
+			}
+
+			if fixture.plugin.enc.bytesWritten != tc.expectedUncompressedBytesWritten {
+				t.Errorf("Expected %d bytes to have been written to the encoder but got %d", tc.expectedUncompressedBytesWritten, fixture.plugin.enc.bytesWritten)
+			}
+
+		})
 	}
 }
 
@@ -2587,7 +2721,9 @@ func TestPluginMasking(t *testing.T) {
 	// Instantiate the plugin.
cfg := &Config{Service: "svc"}
 	trigger := plugins.DefaultTriggerMode
-	cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger)
+	if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger, nil); err != nil {
+		t.Fatal(err)
+	}
 
 	plugin := New(cfg, manager)
 
@@ -2637,7 +2773,7 @@ func TestPluginMasking(t *testing.T) {
 	// Reconfigure and ensure that mask is invalidated.
 	maskDecision := "dead/beef"
 	newConfig := &Config{Service: "svc", MaskDecision: &maskDecision}
-	if err := newConfig.validateAndInjectDefaults([]string{"svc"}, nil, &trigger); err != nil {
+	if err := newConfig.validateAndInjectDefaults([]string{"svc"}, nil, &trigger, nil); err != nil {
 		t.Fatal(err)
 	}
 
@@ -2737,7 +2873,9 @@ func TestPluginDrop(t *testing.T) {
 	// Instantiate the plugin.
 	cfg := &Config{Service: "svc"}
 	trigger := plugins.DefaultTriggerMode
-	cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger)
+	if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger, nil); err != nil {
+		t.Fatal(err)
+	}
 
 	plugin := New(cfg, manager)
 
@@ -2808,7 +2946,9 @@ func TestPluginMaskErrorHandling(t *testing.T) {
 	// Instantiate the plugin.
 	cfg := &Config{Service: "svc"}
 	trigger := plugins.DefaultTriggerMode
-	cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger)
+	if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger, nil); err != nil {
+		t.Fatal(err)
+	}
 
 	plugin := New(cfg, manager)
 
@@ -2884,7 +3024,9 @@ func TestPluginDropErrorHandling(t *testing.T) {
 	// Instantiate the plugin.
 	cfg := &Config{Service: "svc"}
 	trigger := plugins.DefaultTriggerMode
-	cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger)
+	if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger, nil); err != nil {
+		t.Fatal(err)
+	}
 
 	plugin := New(cfg, manager)
 
@@ -3564,6 +3706,8 @@ type testServer struct {
 }
 
 func (t *testServer) handle(w http.ResponseWriter, r *http.Request) {
+	t.t.Helper()
+
 	gr, err := gzip.NewReader(r.Body)
 	if err != nil {
 		t.t.Fatal(err)
@@ -3654,3 +3798,67 @@ func testStatus() *bundle.Status {
 		LastSuccessfulActivation: tActivate,
 	}
 }
+
+func TestConfigUploadLimit(t *testing.T) {
+	tests := []struct {
+		name          string
+		limit         int64
+		expectedLimit int64
+		expectedLog   string
+		expectedErr   string
+	}{
+		{
+			name:          "exceed maximum limit",
+			limit:         int64(8589934592),
+			expectedLimit: maxUploadSizeLimitBytes,
+			expectedLog:   "the configured `upload_size_limit_bytes` (8589934592) has been set to the maximum limit (4294967296)",
+		},
+		{
+			name:          "nothing changes",
+			limit:         1000,
+			expectedLimit: 1000,
+		},
+		{
+			name:        "negative limit",
+			limit:       -1,
+			expectedErr: "the configured `upload_size_limit_bytes` (-1) must be greater than 0",
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+
+			testLogger := test.New()
+
+			cfg := &Config{
+				Service: "svc",
+				Reporting: ReportingConfig{
+					UploadSizeLimitBytes: &tc.limit,
+				},
+			}
+			trigger := plugins.DefaultTriggerMode
+			if err := cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger, testLogger); err != nil {
+				if tc.expectedErr != "" {
+					if tc.expectedErr != err.Error() {
+						t.Fatalf("Expected error to be `%s` but got `%s`", tc.expectedErr, err.Error())
+					} else {
+						return
+					}
+				} else {
+					t.Fatal(err)
+				}
+			}
+
+			if *cfg.Reporting.UploadSizeLimitBytes != tc.expectedLimit {
+				t.Fatalf("Expected upload limit to be %d but got %d", tc.expectedLimit, *cfg.Reporting.UploadSizeLimitBytes)
+			}
+
+			if tc.expectedLog != "" {
+				e := testLogger.Entries()
+				if e[0].Message != tc.expectedLog {
+					t.Fatalf("Expected log to 
be %s but got %s", tc.expectedLog, e[0].Message) + } + } + }) + } +}
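
To make the adaptive sizing in the docs hunk above concrete, here is a minimal, self-contained Go sketch of the Scale Up / Equilibrium / Scale Down adjustment. The names (`adaptiveLimit`, `adjust`) are illustrative rather than the actual `chunkEncoder` internals, and the scale-down trigger is simplified to "the chunk overshot the compressed limit"; the real logic lives in `chunkEncoder.reset()` in `v1/plugins/logs/encoder.go`.

```go
package main

import (
	"fmt"
	"math"
)

// adaptiveLimit models the uncompressed-limit adjustment described in the
// docs above. All names here are illustrative.
type adaptiveLimit struct {
	compressedLimit   int64   // user-configured upload_size_limit_bytes
	uncompressedLimit int64   // adaptive guess at how many raw bytes fit
	scaleUpExp        float64 // grows by 0.2 on every scale-up
	scaleDownExp      float64
}

// adjust updates the uncompressed limit after producing a chunk of
// compressedSize bytes.
func (a *adaptiveLimit) adjust(compressedSize int64) {
	threshold := int64(float64(a.compressedLimit) * 0.9)
	switch {
	case compressedSize < threshold:
		// Scale Up: the chunk was underutilized (below 90% of the limit).
		mul := int64(math.Pow(2, a.scaleUpExp+1))
		a.uncompressedLimit *= mul
		a.scaleUpExp += 0.2
	case compressedSize < a.compressedLimit:
		// Equilibrium: between 90% and 100%, keep the limit as is.
		a.scaleDownExp = a.scaleUpExp
	default:
		// Scale Down: the chunk overshot the compressed limit.
		if a.scaleDownExp < a.scaleUpExp {
			a.scaleDownExp = a.scaleUpExp
		}
		a.uncompressedLimit /= int64(math.Pow(2, a.scaleDownExp-a.scaleUpExp+1))
		if a.scaleUpExp > 0 {
			a.scaleUpExp -= 0.2
		}
	}
}

func main() {
	a := &adaptiveLimit{compressedLimit: 32768, uncompressedLimit: 32768}
	for _, chunkSize := range []int64{10000, 20000, 33000, 31000} {
		a.adjust(chunkSize)
		fmt.Println("uncompressed limit now:", a.uncompressedLimit)
	}
}
```

The fractional exponent is why repeated underutilized chunks grow the limit exponentially (x2, then x2 again once the exponent crosses the next integer), while a single overshoot only halves it.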
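For context on what a "chunk" is on the wire: each upload is a single gzip stream containing one JSON array of events, framed with `[`, `,`, and `]` the same way `appendEvent`/`writeClose` do. A rough standalone sketch using only the standard library (`buildChunk` is a hypothetical helper, not part of OPA):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
)

// buildChunk frames events as a gzip-compressed JSON array, mirroring how
// the encoder writes "[", ",", and "]" around each serialized event.
func buildChunk(events []map[string]any) ([]byte, error) {
	if len(events) == 0 {
		return nil, nil
	}
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	for i, e := range events {
		sep := []byte(`,`)
		if i == 0 {
			sep = []byte(`[`)
		}
		if _, err := w.Write(sep); err != nil {
			return nil, err
		}
		b, err := json.Marshal(e)
		if err != nil {
			return nil, err
		}
		if _, err := w.Write(b); err != nil {
			return nil, err
		}
	}
	if _, err := w.Write([]byte(`]`)); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	chunk, err := buildChunk([]map[string]any{
		{"decision_id": "1"}, {"decision_id": "2"},
	})
	if err != nil {
		panic(err)
	}
	// The compressed length is what upload_size_limit_bytes constrains;
	// the sum of the raw JSON lengths is what bytesWritten tracks.
	fmt.Println("compressed chunk size:", len(chunk))
}
```

This also shows why the encoder tracks `bytesWritten` separately: the gzip writer emits header bytes into the buffer before any event data, so the buffer length alone cannot tell whether an event has been written yet.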
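The ND-builtins-cache fallback in `reset()` follows a simple shape: if a lone event cannot fit under the compressed limit, drop `nd_builtin_cache` and re-measure; if it still does not fit, drop the event and count an encoding failure. A simplified sketch, with a hypothetical `fitEvent` helper and uncompressed JSON length standing in for the real gzip-compressed measurement:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a pared-down stand-in for EventV1; only the field relevant to
// the fallback is modeled here.
type event struct {
	DecisionID     string `json:"decision_id"`
	NDBuiltinCache any    `json:"nd_builtin_cache,omitempty"`
}

// fitEvent mirrors the fallback shape: if a lone event exceeds the limit,
// drop the ND builtins cache and re-measure; if it still does not fit,
// drop the event entirely.
func fitEvent(e event, limit int64, size func(event) (int64, error)) (*event, error) {
	n, err := size(e)
	if err != nil {
		return nil, err
	}
	if n <= limit {
		return &e, nil
	}
	if e.NDBuiltinCache == nil {
		// Nothing left to drop: the event itself is too large.
		return nil, fmt.Errorf("event %s exceeds upload_size_limit_bytes", e.DecisionID)
	}
	e.NDBuiltinCache = nil
	n, err = size(e)
	if err != nil {
		return nil, err
	}
	if n > limit {
		return nil, fmt.Errorf("event %s exceeds upload_size_limit_bytes even without nd_builtin_cache", e.DecisionID)
	}
	// The caller should still log that the cache was dropped and bump the
	// ND-drop counter, as the real encoder does.
	return &e, nil
}

func main() {
	// Approximate size with uncompressed JSON length; the real encoder
	// measures the gzip-compressed chunk instead.
	size := func(e event) (int64, error) {
		b, err := json.Marshal(e)
		return int64(len(b)), err
	}
	e := event{DecisionID: "abc", NDBuiltinCache: map[string]string{"big": "..."}}
	kept, err := fitEvent(e, 64, size)
	fmt.Println(kept, err)
}
```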
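Finally, the new bounds on `upload_size_limit_bytes` in `validateAndInjectDefaults` can be summarized as: clamp anything above 4294967296 bytes to that ceiling (the real plugin also logs a warning), reject non-positive values, and fall back to the 32KB default when unset. A small sketch of that rule (`clampUploadLimit` is a hypothetical name):

```go
package main

import "fmt"

const (
	defaultUploadSizeLimitBytes = int64(32768)      // 32KB default
	maxUploadSizeLimitBytes     = int64(4294967296) // 4GB ceiling
)

// clampUploadLimit applies the same bounds validateAndInjectDefaults now
// enforces on upload_size_limit_bytes.
func clampUploadLimit(configured *int64) (int64, error) {
	limit := defaultUploadSizeLimitBytes
	if configured != nil {
		limit = *configured
	}
	switch {
	case limit > maxUploadSizeLimitBytes:
		return maxUploadSizeLimitBytes, nil
	case limit <= 0:
		return 0, fmt.Errorf("the configured `upload_size_limit_bytes` (%d) must be greater than 0", limit)
	default:
		return limit, nil
	}
}

func main() {
	tooBig := int64(8589934592)
	got, _ := clampUploadLimit(&tooBig)
	fmt.Println(got) // prints 4294967296
}
```

The ceiling matters because the adaptive uncompressed limit is multiplied on scale-up; bounding the configured limit keeps that arithmetic well away from int64 overflow.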