Skip to content

Implement conditional processing at the processor level for logs #10127

Closed
@niedbalski

Description

@niedbalski

Description

Add support for conditional execution of processors in Fluent Bit based on log record content. This will allow processors to be selectively applied only when specified conditions are met, leveraging the existing flb_conditionals.c implementation.

Implementation details

The implementation would extend the processor architecture to:

  1. Allow processors to have conditions defined in YAML configuration
  2. Check if conditions are met for each log record before applying the processor
  3. Skip processing for log records that don't match the conditions
  4. Use the existing conditional operators from flb_conditionals.c

Examples

1. Conditionally modify logs based on log level with content_modifier

pipeline:
  processors:
    logs:
      - name: content_modifier
        match: '*'
        condition:
          operator: AND
          rules:
            - field: "$log[\"level\"]"
              operator: eq
              value: "error"
        action: insert
        context: log_body
        key: priority
        value: high

2. Apply SQL processor only to database logs

pipeline:
  processors:
    logs:
      - name: sql
        match: '*'
        condition:
          operator: AND
          rules:
            - field: "$service"
              operator: eq
              value: "database"
        script: |
          SELECT 
            record.timestamp,
            CASE 
              WHEN record.query_time > 1.0 THEN 'slow'
              ELSE 'normal'
            END as query_speed,
            record.query
          FROM STREAM

3. Process only HTTP 500 errors with content_modifier

pipeline:
  processors:
    logs:
      - name: content_modifier
        match: '*'
        condition:
          operator: OR
          rules:
            - field: "$http[\"status_code\"]"
              operator: eq
              value: "500"
            - field: "$http[\"status_code\"]"
              operator: eq
              value: "503"
        action: insert
        context: log_body
        key: needs_investigation
        value: true

4. Filter sensitive data with content_modifier for specific paths

pipeline:
  processors:
    logs:
      - name: content_modifier
        match: '*'
        condition:
          operator: AND
          rules:
            - field: "$http[\"url\"]"
              operator: regex
              value: "^/api/(auth|payment|user).*$"
        action: delete
        context: log_body
        key: user.credentials

5. Add OpenTelemetry envelope only for production logs

pipeline:
  processors:
    logs:
      - name: opentelemetry_envelope
        match: '*'
        condition:
          operator: AND
          rules:
            - field: "$env"
              operator: eq
              value: "production"

6. Apply template processor only for specific services

pipeline:
  processors:
    logs:
      - name: template
        match: '*'
        condition:
          operator: AND
          rules:
            - field: "$app[\"type\"]"
              operator: eq
              value: "mobile"
            - field: "$app[\"version\"]"
              operator: regex
              value: "^1\\.[0-8].*$"

7. Conditionally add context data for debugging with content_modifier

pipeline:
  processors:
    logs:
      - name: content_modifier
        match: '*'
        condition:
          operator: AND
          rules:
            - field: "$log[\"level\"]"
              operator: eq
              value: "debug"
        action: upsert
        context: log_body
        key: debug_data
        value: true

Metadata

Metadata

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions