[pkg/stanza] Container parser still uses BatchingLogEmitter when synchronousLogEmitter feature gate is enabled #47828

@jeffreylimnardy

Description

Component(s)

pkg/stanza

Is your feature request related to a problem? Please describe.

The stanza.synchronousLogEmitter feature gate replaces the asynchronous BatchingLogEmitter with a synchronous passthrough emitter to prevent data loss. However, the container parser creates its own internal BatchingLogEmitter that doesn't check the feature gate, making the feature gate ineffective on the CRI log path.

The problem

pkg/stanza/operator/parser/container/config.go:84 hardcodes:

```go
cLogEmitter := helper.NewBatchingLogEmitter(set, p.consumeEntries)
```

This emitter sits in an internal sub-pipeline used for CRI partial-line reassembly:

```text
recombine → criLogEmitter (BatchingLogEmitter) → consumeEntries → WriteBatch
```

The feature gate is only checked at the adapter level, via `metadata.StanzaSynchronousLogEmitterFeatureGate.IsEnabled()`.

On the CRI/containerd path, the internal emitter is instantiated lazily via criConsumerStartOnce, in both Process and ProcessBatch. Docker JSON logs bypass it entirely and are unaffected.
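
To illustrate why only the CRI path is affected, here is a minimal Go sketch of the routing described above. It is a hypothetical reduction, not the real parser: `containerParser`, `route`, `startCRI`, `criStarts` and the string return values are invented for illustration; only `criConsumerStartOnce`, `Process` and `ProcessBatch` are names taken from this issue. A `sync.Once` shared by both entry points means the CRI setup happens at most once, and never for Docker JSON input.

```go
package main

import (
	"fmt"
	"sync"
)

// containerParser is a HYPOTHETICAL reduction of the container parser that
// models only format routing and the lazy, once-only CRI setup.
type containerParser struct {
	format               string // "docker", "containerd" or "crio"
	criConsumerStartOnce sync.Once
	criStarts            int // how many times the CRI sub-pipeline was set up
}

func (p *containerParser) startCRI() { p.criStarts++ }

// route is shared by Process and ProcessBatch.
func (p *containerParser) route() string {
	if p.format == "docker" {
		return "docker" // parsed in place; the internal emitter is never touched
	}
	// The first CRI line from either entry point triggers the setup; later calls are no-ops.
	p.criConsumerStartOnce.Do(p.startCRI)
	return "cri"
}

func (p *containerParser) Process(_ string) string { return p.route() }

func (p *containerParser) ProcessBatch(lines []string) []string {
	paths := make([]string, 0, len(lines))
	for range lines {
		paths = append(paths, p.route())
	}
	return paths
}

func main() {
	for _, format := range []string{"docker", "containerd"} {
		p := &containerParser{format: format}
		p.Process("first")
		paths := p.ProcessBatch([]string{"a", "b", "c"})
		fmt.Printf("%-10s paths=%v criStarts=%d\n", format, paths, p.criStarts)
	}
}
```

Running it reports `criStarts=0` for the docker format and `criStarts=1` for containerd, even though the containerd parser receives four lines across both entry points.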

Observed behavior

Setup: collector v0.149.0 with the feature gate enabled; a generator writes 1 log/s to each of 10 files for 30 s (300 lines total). The metric is the number of export RPCs received by the backend.

| Log format | With container parser | Without container parser |
|---|---|---|
| CRI/containerd | 37 | 300 |
| Docker JSON | 300 | 300 |

On the CRI path, the internal emitter pre-coalesces entries within its 100 ms flush window, so the adapter-level synchronous emitter receives already-batched input and the gate has no observable effect (300 lines arrive in only 37 export RPCs). Docker JSON matches the baseline in both configurations, confirming that the issue is confined to the CRI path, where the BatchingLogEmitter is used.

Collector configs used

```yaml
service:
  pipelines:
    logs/test:
      receivers:
        - filelog/test
      exporters:
        - otlp_grpc/test
  telemetry:
    metrics:
      level: detailed
      readers:
        - pull:
            exporter:
              prometheus:
                host: 0.0.0.0
                port: 8888
    logs:
      level: info
      encoding: json
receivers:
  filelog/test:
    exclude:
      - /var/log/pods/kube-system_*/*/*.log
    include:
      - /var/log/pods/*_*/*/*.log
    include_file_name: false
    include_file_path: true
    start_at: beginning
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
      max_elapsed_time: 300s
    # Remove the operators section for the config without the container parser.
    operators:
      - id: containerd-parser
        type: container
        add_metadata_from_file_path: true
        format: containerd
exporters:
  otlp_grpc/test:
    endpoint: http://otelcol-backend:4317
    tls:
      insecure: true
    compression: gzip
    retry_on_failure:
      enabled: false
```

Describe the solution you'd like

Have the container parser check the feature gate when creating its internal emitter:

```go
// config.go line 84, currently:
cLogEmitter := helper.NewBatchingLogEmitter(set, p.consumeEntries)

// proposed: honor the same feature gate the adapter already checks
var cLogEmitter helper.LogEmitter
if metadata.StanzaSynchronousLogEmitterFeatureGate.IsEnabled() {
    cLogEmitter = helper.NewSynchronousLogEmitter(set, p.consumeEntries)
} else {
    cLogEmitter = helper.NewBatchingLogEmitter(set, p.consumeEntries)
}
```

The criLogEmitter field type in the Parser struct would change from *helper.BatchingLogEmitter to helper.LogEmitter (the interface that both types satisfy). The SynchronousLogEmitter already handles both Process (used by recombine's background flush timer for stray partial lines) and ProcessBatch (used by recombine's normal output path).
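
The proposed change relies on holding the emitter behind an interface type. The Go sketch below shows that pattern in isolation; `logEmitter`, `syncEmitter`, `batchEmitter`, `newParser` and the string-based signatures are hypothetical simplifications and do not match the actual methods of `helper.LogEmitter`. The `gateEnabled` flag stands in for `metadata.StanzaSynchronousLogEmitterFeatureGate.IsEnabled()`.

```go
package main

import "fmt"

// logEmitter is a HYPOTHETICAL, simplified stand-in for helper.LogEmitter;
// the real method signatures differ (context, *entry.Entry, error returns).
type logEmitter interface {
	Process(line string)
	ProcessBatch(lines []string)
}

type consumeFunc func(lines []string)

// syncEmitter forwards each call to the consumer immediately.
type syncEmitter struct{ consume consumeFunc }

func (s *syncEmitter) Process(line string)         { s.consume([]string{line}) }
func (s *syncEmitter) ProcessBatch(lines []string) { s.consume(lines) }

// batchEmitter buffers input until Flush, which a background timer would call.
type batchEmitter struct {
	consume consumeFunc
	buf     []string
}

func (b *batchEmitter) Process(line string)         { b.buf = append(b.buf, line) }
func (b *batchEmitter) ProcessBatch(lines []string) { b.buf = append(b.buf, lines...) }
func (b *batchEmitter) Flush() {
	if len(b.buf) == 0 {
		return
	}
	b.consume(b.buf)
	b.buf = nil
}

// Compile-time checks that both types satisfy the interface.
var (
	_ logEmitter = (*syncEmitter)(nil)
	_ logEmitter = (*batchEmitter)(nil)
)

// parser holds the emitter by interface type, as the issue proposes.
type parser struct{ criLogEmitter logEmitter }

// newParser mirrors the proposed gate check (gateEnabled is a stand-in).
func newParser(gateEnabled bool, consume consumeFunc) *parser {
	var e logEmitter
	if gateEnabled {
		e = &syncEmitter{consume: consume}
	} else {
		e = &batchEmitter{consume: consume}
	}
	return &parser{criLogEmitter: e}
}

func main() {
	for _, gate := range []bool{true, false} {
		calls := 0
		p := newParser(gate, func(lines []string) { calls++ })
		p.criLogEmitter.ProcessBatch([]string{"a", "b"}) // recombine's normal output path
		p.criLogEmitter.Process("stray partial")         // recombine's flush-timer path
		before := calls
		if b, ok := p.criLogEmitter.(*batchEmitter); ok {
			b.Flush()
		}
		fmt.Printf("gate=%v: calls before flush=%d, after=%d\n", gate, before, calls)
	}
}
```

With the gate enabled, both the batch and the stray partial line reach the consumer immediately (2 calls before any flush); with it disabled, nothing arrives until the timer-driven `Flush` runs, which then delivers a single call.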

Describe alternatives you've considered

No response

Additional context

Feature gate issue: #35456
