diff --git a/.chloggen/config.yaml b/.chloggen/config.yaml index 0ae927cd439eb..4e1b44a68b2c9 100644 --- a/.chloggen/config.yaml +++ b/.chloggen/config.yaml @@ -190,6 +190,7 @@ components: - processor/deltatorate - processor/digitaloceandetector - processor/dnslookup + - processor/drain - processor/dynatracedetector - processor/filter - processor/geoip diff --git a/.chloggen/processor-drainprocessor.yaml b/.chloggen/processor-drainprocessor.yaml new file mode 100644 index 0000000000000..8c77ba223f4f9 --- /dev/null +++ b/.chloggen/processor-drainprocessor.yaml @@ -0,0 +1,36 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: processor/drain + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add drain processor that applies the Drain log clustering algorithm to annotate log records with a derived template string. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [47235] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The processor sets `log.record.template` (e.g. `"user <*> logged in from <*>"`) on each log record. + Downstream processors such as the filter processor can act on this attribute to, for example, drop + entire classes of noisy logs by template string. 
+ + Key features: + - Configurable Drain parse tree parameters (depth, similarity threshold, max clusters with LRU eviction) + - Optional seeding via known template strings or example log lines for stable templates across restarts + - `passthrough` warmup mode (default) and `buffer` warmup mode that holds records until the tree has stabilized + - Internal telemetry metrics: active cluster count gauge, annotated and unannotated record counters + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.codecov.yml b/.codecov.yml index 08a6b1f68fdd9..219b6827bf2ff 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -457,6 +457,10 @@ component_management: name: processor_dnslookup paths: - processor/dnslookupprocessor/** + - component_id: processor_drain + name: processor_drain + paths: + - processor/drainprocessor/** - component_id: processor_filter name: processor_filter paths: diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 57d3e1ed3e68b..0736bce8e0b25 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -199,6 +199,7 @@ processor/cumulativetodeltaprocessor/ @open-telemetry processor/deltatocumulativeprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams processor/deltatorateprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9 processor/dnslookupprocessor/ @open-telemetry/collector-contrib-approvers @andrzej-stencel @kaisecheng @edmocosta +processor/drainprocessor/ @open-telemetry/collector-contrib-approvers @MikeGoldsmith processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @evan-bradley 
@edmocosta @bogdandrutu processor/geoipprocessor/ @open-telemetry/collector-contrib-approvers @andrzej-stencel @michalpristas @rogercoll processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo @amdprophet diff --git a/.github/ISSUE_TEMPLATE/beta_stability.yaml b/.github/ISSUE_TEMPLATE/beta_stability.yaml index 8f777d4c9ea5e..dcb38cf4def33 100644 --- a/.github/ISSUE_TEMPLATE/beta_stability.yaml +++ b/.github/ISSUE_TEMPLATE/beta_stability.yaml @@ -197,6 +197,7 @@ body: - processor/deltatocumulative - processor/deltatorate - processor/dnslookup + - processor/drain - processor/filter - processor/geoip - processor/groupbyattrs diff --git a/.github/ISSUE_TEMPLATE/bug_report.yaml b/.github/ISSUE_TEMPLATE/bug_report.yaml index af5c1d48ff430..ade93ab0417d0 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yaml +++ b/.github/ISSUE_TEMPLATE/bug_report.yaml @@ -200,6 +200,7 @@ body: - processor/deltatocumulative - processor/deltatorate - processor/dnslookup + - processor/drain - processor/filter - processor/geoip - processor/groupbyattrs diff --git a/.github/ISSUE_TEMPLATE/feature_request.yaml b/.github/ISSUE_TEMPLATE/feature_request.yaml index 7203a65d469ce..d2016f509a1da 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yaml +++ b/.github/ISSUE_TEMPLATE/feature_request.yaml @@ -194,6 +194,7 @@ body: - processor/deltatocumulative - processor/deltatorate - processor/dnslookup + - processor/drain - processor/filter - processor/geoip - processor/groupbyattrs diff --git a/.github/ISSUE_TEMPLATE/other.yaml b/.github/ISSUE_TEMPLATE/other.yaml index eaefa4f05a68e..f52fbd98f0c2c 100644 --- a/.github/ISSUE_TEMPLATE/other.yaml +++ b/.github/ISSUE_TEMPLATE/other.yaml @@ -194,6 +194,7 @@ body: - processor/deltatocumulative - processor/deltatorate - processor/dnslookup + - processor/drain - processor/filter - processor/geoip - processor/groupbyattrs diff --git a/.github/ISSUE_TEMPLATE/unmaintained.yaml b/.github/ISSUE_TEMPLATE/unmaintained.yaml index 
0a0298b3c5e04..f40e0456e5b23 100644 --- a/.github/ISSUE_TEMPLATE/unmaintained.yaml +++ b/.github/ISSUE_TEMPLATE/unmaintained.yaml @@ -199,6 +199,7 @@ body: - processor/deltatocumulative - processor/deltatorate - processor/dnslookup + - processor/drain - processor/filter - processor/geoip - processor/groupbyattrs diff --git a/.github/component_labels.txt b/.github/component_labels.txt index 8b327a6a9ff81..a5610fc1ff69b 100644 --- a/.github/component_labels.txt +++ b/.github/component_labels.txt @@ -180,6 +180,7 @@ processor/cumulativetodeltaprocessor processor/cumulativetodelta processor/deltatocumulativeprocessor processor/deltatocumulative processor/deltatorateprocessor processor/deltatorate processor/dnslookupprocessor processor/dnslookup +processor/drainprocessor processor/drain processor/filterprocessor processor/filter processor/geoipprocessor processor/geoip processor/groupbyattrsprocessor processor/groupbyattrs diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index 88e36f0320956..fdd00e811d43d 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -119,6 +119,7 @@ processors: - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/coralogixprocessor v0.148.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.148.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor v0.148.0 + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor v0.148.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.148.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor v0.148.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.148.0 diff --git a/internal/tidylist/tidylist.txt 
b/internal/tidylist/tidylist.txt index 4b13724200ca6..859b300c47075 100644 --- a/internal/tidylist/tidylist.txt +++ b/internal/tidylist/tidylist.txt @@ -215,6 +215,7 @@ processor/coralogixprocessor processor/cumulativetodeltaprocessor processor/deltatorateprocessor processor/dnslookupprocessor +processor/drainprocessor processor/filterprocessor processor/geoipprocessor processor/groupbyattrsprocessor diff --git a/processor/drainprocessor/Makefile b/processor/drainprocessor/Makefile new file mode 100644 index 0000000000000..ded7a36092dc3 --- /dev/null +++ b/processor/drainprocessor/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/processor/drainprocessor/README.md b/processor/drainprocessor/README.md new file mode 100644 index 0000000000000..335b29eedd50e --- /dev/null +++ b/processor/drainprocessor/README.md @@ -0,0 +1,223 @@ +# Drain Processor + +| Status | | +| ------------- |-----------| +| Stability | [development]: logs | +| Distributions | [contrib] | +| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fdrain%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fdrain) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fdrain%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fdrain) | + +[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development +[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib + +The drain processor applies the [Drain log clustering 
algorithm](https://jiemingzhu.github.io/pub/pjhe_icws2017.pdf) to log records as they pass through the pipeline. For each record it derives a template string (e.g. `"user <*> logged in from <*>"`) and attaches it as an attribute on the record. + +This processor **annotates**; it does not filter. Use the [filter processor](../filterprocessor/README.md) downstream to act on the `log.record.template` attribute — for example, to drop entire classes of noisy logs by pattern. + +## How it works + +Drain builds a parse tree from the token structure of log lines. Lines with similar structure are grouped into a **cluster**, and a **template** is derived by replacing variable tokens with `<*>` wildcards. As more logs arrive the templates become more accurate and stable. + +Templates are derived from the token structure of each log line and become more stable as more logs are observed. Use the template **string** for filtering rules; it converges to the same value across instances given the same configuration and log patterns (see [Deployment considerations](#deployment-considerations)). 
+ +## Configuration + +```yaml +processors: + drain: + # Drain parse tree parameters + log_cluster_depth: 4 # default: 4 (minimum: 3) + sim_threshold: 0.4 # default: 0.4, range [0.0, 1.0] + max_children: 100 # default: 100 + max_clusters: 0 # default: 0 (unlimited, LRU eviction when > 0) + extra_delimiters: [] # default: [] (extra token delimiters beyond whitespace) + + # Body extraction + body_field: "" # default: "" (use full body string) + + # Output attribute name + template_attribute: "log.record.template" # default + + # Seeding (optional) + seed_templates: [] + seed_logs: [] + + # Warmup mode + warmup_mode: passthrough # default: "passthrough" | "buffer" + warmup_min_clusters: 10 # default: 10 (only used when warmup_mode: buffer) + warmup_buffer_max_logs: 10000 # default: 10000 (only used when warmup_mode: buffer) +``` + +### Parameters + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `log_cluster_depth` | int | `4` | Max depth of the Drain parse tree. Higher values produce more specific templates. Minimum: 3. | +| `sim_threshold` | float | `0.4` | Similarity threshold in [0.0, 1.0]. Lines below this threshold create a new cluster rather than merging with an existing one. | +| `max_children` | int | `100` | Maximum children per parse tree node. | +| `max_clusters` | int | `0` | Maximum clusters tracked. When exceeded, the least-recently-used cluster is evicted. `0` means unlimited. | +| `extra_delimiters` | []string | `[]` | Additional token delimiters beyond whitespace (e.g. `[",", ":"]`). | +| `body_field` | string | `""` | If set, and the log body is a structured map, the value of this top-level key is used as the text to template instead of the full body. | +| `template_attribute` | string | `"log.record.template"` | Attribute key written with the derived template string. | +| `seed_templates` | []string | `[]` | Template strings to pre-load at startup (see [Seeding](#seeding)). 
| +| `seed_logs` | []string | `[]` | Raw example log lines to train on at startup (see [Seeding](#seeding)). | +| `warmup_mode` | string | `"passthrough"` | Controls behavior during the warmup period. `"passthrough"` (default) or `"buffer"` (see [Warmup mode](#warmup-mode)). | +| `warmup_min_clusters` | int | `10` | Minimum distinct clusters before warmup ends. Only used when `warmup_mode: buffer`. | +| `warmup_buffer_max_logs` | int | `10000` | Maximum records to buffer before flushing regardless of cluster count. Only used when `warmup_mode: buffer`. Must be > 0. | + +## Seeding + +Seeding pre-populates the Drain tree before any live logs arrive. This is the primary mechanism for stable templates across restarts. + +### `seed_templates` + +Provide known template strings directly. The processor trains on each entry at startup, establishing clusters for those patterns immediately. + +```yaml +processors: + drain: + seed_templates: + - "user <*> logged in from <*>" + - "connected to <*>" + - "heartbeat ping <*>" +``` + +### `seed_logs` + +Provide raw example log lines. The processor trains on them at startup, letting Drain derive the templates itself. Useful when exact template strings are not known in advance. + +```yaml +processors: + drain: + seed_logs: + - "user alice logged in from 10.0.0.1" + - "user bob logged in from 192.168.1.1" + - "connected to 10.0.0.1" +``` + +Empty and whitespace-only entries in both lists are silently skipped. + +## Deployment considerations + +### Multiple collector instances + +Each collector instance builds its Drain parse tree independently in memory. Two instances processing the same log patterns will converge on identical templates because the Drain algorithm is deterministic: given the same configuration and a representative sample of log forms, the same token structure produces the same template string. + +The main caveat is the **early training phase**. Before an instance has seen enough lines to abstract a wildcard (e.g. 
before `"user alice logged in"` and `"user bob logged in"` have both been observed), different instances may temporarily produce different templates for the same logical pattern. This is most noticeable at startup with low-volume or highly variable log streams. + +**Mitigations:** +- Use `seed_templates` or `seed_logs` to pre-load known patterns at startup. With a comprehensive seed set, instances start in an already-converged state and live training only fills in the gaps. +- Use `buffer` warmup mode if downstream consumers require stable templates from the first record they receive. + +### Warmup modes + +The `warmup_mode` setting controls what happens before the parse tree has stabilized — i.e. before it has observed enough distinct log forms to produce reliable, abstracted templates. + +| Mode | Behavior | Trade-off | +|------|----------|-----------| +| `passthrough` (default) | Annotates every record immediately. Early records may receive less-abstracted templates (e.g. a raw line rather than a wildcarded form) that change as more data arrives. | No latency or memory overhead. Downstream consumers must tolerate template churn at startup. | +| `buffer` | Holds records in memory until `warmup_min_clusters` distinct templates have been observed, or `warmup_buffer_max_logs` is reached. Flushes all buffered records at once, fully annotated. | Templates are stable from the first record downstream sees. Adds startup latency and memory pressure proportional to buffer size. | + +Choose `passthrough` when: +- Downstream consumers are tolerant of occasional template changes (e.g. they use templates for volume aggregation where a brief inconsistency is acceptable). +- You are using `seed_templates` or `seed_logs` to pre-stabilize the tree. + +Choose `buffer` when: +- A downstream `filter` processor must reliably match templates from the very first record — emitting an unstabilized template could cause records that should have been dropped to pass through the filter. 
+- You have strict ordering or completeness requirements and cannot tolerate records being annotated with different templates for the same log pattern. + +```yaml +processors: + drain: + warmup_mode: buffer + warmup_min_clusters: 20 + warmup_buffer_max_logs: 5000 +``` + +> **Memory note**: in buffer mode, all records are held in memory until flush. Size the buffer with `warmup_buffer_max_logs` according to your available memory and expected log volume during startup. + +## Metrics + +The processor emits the following internal telemetry metrics: + +| Metric | Type | Description | +|--------|------|-------------| +| `otelcol_processor_drain_clusters_active` | gauge | Current number of active clusters in the Drain parse tree. Useful for tracking tree growth and stability over time. | +| `otelcol_processor_drain_log_records_annotated` | counter | Number of log records successfully annotated with a template. | +| `otelcol_processor_drain_log_records_unannotated` | counter | Number of log records not annotated — empty body, Train error, or no cluster returned by Drain. | + +## Output attributes + +The processor sets the following attribute on each log record: + +| Attribute | Type | Example | Description | +|-----------|------|---------|-------------| +| `log.record.template` | string | `"user <*> logged in from <*>"` | The Drain-derived template string. Stable within an instance once the tree has warmed up. Use this for filtering rules. | + +The attribute name is configurable via `template_attribute`. + +> **Semantic conventions**: `log.record.template` aligns with the proposed OTel attribute in [open-telemetry/semantic-conventions#1283](https://github.com/open-telemetry/semantic-conventions/issues/1283) and [#2064](https://github.com/open-telemetry/semantic-conventions/issues/2064). These names may be updated if a convention is formally adopted. 
+ +## Example pipeline + +The following pipeline annotates logs with Drain templates and then drops known noisy patterns using the filter processor: + +```yaml +processors: + drain: + log_cluster_depth: 4 + sim_threshold: 0.4 + max_clusters: 500 + seed_templates: + - "user <*> logged in from <*>" + - "connected to <*>" + - "heartbeat ping <*>" + warmup_mode: buffer + warmup_min_clusters: 20 + warmup_buffer_max_logs: 5000 + + filter/drop_noisy: + error_mode: ignore + logs: + log_record: + - attributes["log.record.template"] == "heartbeat ping <*>" + - attributes["log.record.template"] == "connected to <*>" + +service: + pipelines: + logs: + receivers: [otlp] + processors: [drain, filter/drop_noisy] + exporters: [otlp] +``` + +## `body_field` + +`body_field` is a convenience for pipelines where the log body is a structured map and you do not have full control over how upstream processors shape it. + +If you **do** control the pipeline, the preferred approach is a `move` operator in the filelog receiver (or equivalent) to promote the message field back to a plain string body before the drain processor sees the record: + +```yaml +operators: + - type: json_parser + - type: move + from: body.message + to: body +``` + +If you **cannot** do that — for example, logs arrive via OTLP already structured — set `body_field` to the map key whose value should be fed to Drain: + +```yaml +processors: + drain: + body_field: "message" +``` + +Given a log body `{"level": "info", "message": "user alice logged in from 10.0.0.1"}`, only the `message` value is fed to Drain. The full body is used unchanged if the field is absent or the body is not a map. + +> **Note**: `body_field` only supports a single top-level key. Full OTTL path expressions (e.g. `body["event"]["message"]`) are not supported and are noted as a future extension. + +## Future extensions + +- **Snapshot persistence**: save and restore the Drain tree state across restarts, eliminating the need for seeding. 
This requires serialization support and is tracked as a future improvement. +- **OTTL body extraction**: support full OTTL path expressions for `body_field` instead of a single top-level key name. +- **Multi-instance synchronization**: optional shared snapshot file or gossip-based tree merging for consistent templates across horizontally scaled deployments. diff --git a/processor/drainprocessor/config.go b/processor/drainprocessor/config.go new file mode 100644 index 0000000000000..4c89bf737098f --- /dev/null +++ b/processor/drainprocessor/config.go @@ -0,0 +1,94 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package drainprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor" + +import ( + "errors" + "fmt" +) + +// Config defines configuration for the drain processor. +type Config struct { + // LogClusterDepth is the max depth of the Drain parse tree. + // Higher values produce more specific templates. Default: 4. Minimum: 3. + LogClusterDepth int `mapstructure:"log_cluster_depth"` + + // SimThreshold is the similarity threshold (0.0–1.0) below which a new + // cluster is created rather than merged with an existing one. Default: 0.4. + SimThreshold float64 `mapstructure:"sim_threshold"` + + // MaxChildren is the maximum number of children per parse tree node. + // Default: 100. + MaxChildren int `mapstructure:"max_children"` + + // MaxClusters is the maximum number of clusters tracked. When the limit is + // reached, the least recently used cluster is evicted. 0 means unlimited. + // Default: 0. + MaxClusters int `mapstructure:"max_clusters"` + + // ExtraDelimiters are additional token delimiters beyond whitespace. + ExtraDelimiters []string `mapstructure:"extra_delimiters"` + + // BodyField optionally specifies a top-level key to extract from a + // structured (map) log body before feeding the value to Drain. If empty, + // the full body string representation is used. 
This is a convenience for + // pipelines where the body is a parsed map (e.g. after json_parser) and + // the user does not have a move operator to promote the message field back + // to a plain string body. Pipelines that do have that control should use a + // move operator instead and leave this unset. + BodyField string `mapstructure:"body_field"` + + // TemplateAttribute is the log record attribute key to write the derived + // template string to. Default: "log.record.template". + TemplateAttribute string `mapstructure:"template_attribute"` + + // SeedTemplates is a list of pre-known template strings to train on at + // startup before any live logs arrive. Improves template stability across + // restarts for known log patterns. + SeedTemplates []string `mapstructure:"seed_templates"` + + // SeedLogs is a list of raw example log lines to train on at startup. + // Drain derives templates from these lines itself. + SeedLogs []string `mapstructure:"seed_logs"` + + // WarmupMode controls processor behavior during the initial period before + // the Drain tree has stabilized. Valid values: "passthrough" (default), + // "buffer". + WarmupMode string `mapstructure:"warmup_mode"` + + // WarmupMinClusters is the number of distinct clusters that must be + // observed before warmup ends. Only used when WarmupMode is "buffer". + // Default: 10. + WarmupMinClusters int `mapstructure:"warmup_min_clusters"` + + // WarmupBufferMaxLogs is the maximum number of log records to buffer + // during warmup before flushing regardless of cluster count. Only used + // when WarmupMode is "buffer". Must be > 0. Default: 10000. + WarmupBufferMaxLogs int `mapstructure:"warmup_buffer_max_logs"` +} + +const ( + warmupModePassthrough = "passthrough" + warmupModeBuffer = "buffer" +) + +// Validate checks the Config for invalid values. 
+func (cfg *Config) Validate() error { + if cfg.LogClusterDepth < 3 { + return fmt.Errorf("log_cluster_depth must be >= 3, got %d", cfg.LogClusterDepth) + } + if cfg.SimThreshold < 0.0 || cfg.SimThreshold > 1.0 { + return fmt.Errorf("sim_threshold must be in [0.0, 1.0], got %f", cfg.SimThreshold) + } + if cfg.WarmupMode != warmupModePassthrough && cfg.WarmupMode != warmupModeBuffer { + return fmt.Errorf("warmup_mode must be %q or %q, got %q", warmupModePassthrough, warmupModeBuffer, cfg.WarmupMode) + } + if cfg.WarmupMode == warmupModeBuffer && cfg.WarmupMinClusters <= 0 { + return errors.New("warmup_min_clusters must be > 0 when warmup_mode is \"buffer\"") + } + if cfg.WarmupMode == warmupModeBuffer && cfg.WarmupBufferMaxLogs <= 0 { + return errors.New("warmup_buffer_max_logs must be > 0 when warmup_mode is \"buffer\"") + } + return nil +} diff --git a/processor/drainprocessor/config_test.go b/processor/drainprocessor/config_test.go new file mode 100644 index 0000000000000..41adae20a66e3 --- /dev/null +++ b/processor/drainprocessor/config_test.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package drainprocessor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConfigValidate(t *testing.T) { + valid := func() *Config { + cfg := createDefaultConfig().(*Config) + return cfg + } + + t.Run("valid default config", func(t *testing.T) { + require.NoError(t, valid().Validate()) + }) + + t.Run("log_cluster_depth < 3", func(t *testing.T) { + cfg := valid() + cfg.LogClusterDepth = 2 + assert.Error(t, cfg.Validate()) + }) + + t.Run("log_cluster_depth == 3 is valid", func(t *testing.T) { + cfg := valid() + cfg.LogClusterDepth = 3 + require.NoError(t, cfg.Validate()) + }) + + t.Run("sim_threshold below range", func(t *testing.T) { + cfg := valid() + cfg.SimThreshold = -0.1 + assert.Error(t, cfg.Validate()) + }) + + t.Run("sim_threshold above range", 
func(t *testing.T) { + cfg := valid() + cfg.SimThreshold = 1.1 + assert.Error(t, cfg.Validate()) + }) + + t.Run("sim_threshold at boundaries", func(t *testing.T) { + cfg := valid() + cfg.SimThreshold = 0.0 + require.NoError(t, cfg.Validate()) + cfg.SimThreshold = 1.0 + require.NoError(t, cfg.Validate()) + }) + + t.Run("invalid warmup_mode", func(t *testing.T) { + cfg := valid() + cfg.WarmupMode = "invalid" + assert.Error(t, cfg.Validate()) + }) + + t.Run("buffer mode requires warmup_min_clusters > 0", func(t *testing.T) { + cfg := valid() + cfg.WarmupMode = warmupModeBuffer + cfg.WarmupMinClusters = 0 + cfg.WarmupBufferMaxLogs = 100 + assert.Error(t, cfg.Validate()) + }) + + t.Run("buffer mode requires warmup_buffer_max_logs > 0", func(t *testing.T) { + cfg := valid() + cfg.WarmupMode = warmupModeBuffer + cfg.WarmupBufferMaxLogs = 0 + assert.Error(t, cfg.Validate()) + }) + + t.Run("buffer mode valid", func(t *testing.T) { + cfg := valid() + cfg.WarmupMode = warmupModeBuffer + cfg.WarmupBufferMaxLogs = 100 + require.NoError(t, cfg.Validate()) + }) +} diff --git a/processor/drainprocessor/doc.go b/processor/drainprocessor/doc.go new file mode 100644 index 0000000000000..9b6f2ed879728 --- /dev/null +++ b/processor/drainprocessor/doc.go @@ -0,0 +1,9 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:generate make mdatagen + +// Package drainprocessor implements a processor that applies the Drain log +// clustering algorithm to log records, annotating each record with a derived +// template string. +package drainprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor" diff --git a/processor/drainprocessor/documentation.md b/processor/drainprocessor/documentation.md new file mode 100644 index 0000000000000..103d6d6f29129 --- /dev/null +++ b/processor/drainprocessor/documentation.md @@ -0,0 +1,31 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) 
+ +# drain + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### otelcol_processor_drain_clusters_active + +Current number of active clusters in the Drain parse tree. + +| Unit | Metric Type | Value Type | Stability | +| ---- | ----------- | ---------- | --------- | +| {clusters} | Gauge | Int | Development | + +### otelcol_processor_drain_log_records_annotated + +Number of log records successfully annotated with a template. + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {records} | Sum | Int | true | Development | + +### otelcol_processor_drain_log_records_unannotated + +Number of log records not annotated (empty body, Train error, or no cluster returned by Drain). + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {records} | Sum | Int | true | Development | diff --git a/processor/drainprocessor/factory.go b/processor/drainprocessor/factory.go new file mode 100644 index 0000000000000..59ae3cc8affd0 --- /dev/null +++ b/processor/drainprocessor/factory.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package drainprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor/internal/metadata" +) + +var processorCapabilities = consumer.Capabilities{MutatesData: true} + +// NewFactory returns a new factory for the drain processor. 
+func NewFactory() processor.Factory { + return processor.NewFactory( + metadata.Type, + createDefaultConfig, + processor.WithLogs(createLogsProcessor, metadata.LogsStability), + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + LogClusterDepth: 4, + SimThreshold: 0.4, + MaxChildren: 100, + MaxClusters: 0, + TemplateAttribute: "log.record.template", + WarmupMode: warmupModePassthrough, + WarmupMinClusters: 10, + WarmupBufferMaxLogs: 10000, + } +} + +func createLogsProcessor( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (processor.Logs, error) { + proc, err := newDrainProcessor(set, cfg.(*Config)) + if err != nil { + return nil, err + } + return processorhelper.NewLogs( + ctx, + set, + cfg, + nextConsumer, + proc.processLogs, + processorhelper.WithCapabilities(processorCapabilities), + ) +} diff --git a/processor/drainprocessor/factory_test.go b/processor/drainprocessor/factory_test.go new file mode 100644 index 0000000000000..99c25d7573c5e --- /dev/null +++ b/processor/drainprocessor/factory_test.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package drainprocessor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor/processortest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor/internal/metadata" +) + +func TestNewFactory(t *testing.T) { + factory := NewFactory() + assert.Equal(t, metadata.Type, factory.Type()) +} + +func TestCreateDefaultConfig(t *testing.T) { + cfg := NewFactory().CreateDefaultConfig() + require.NoError(t, componenttest.CheckConfigStruct(cfg)) + + dc := cfg.(*Config) + assert.Equal(t, 4, dc.LogClusterDepth) + assert.InDelta(t, 0.4, dc.SimThreshold, 1e-9) + 
assert.Equal(t, 100, dc.MaxChildren) + assert.Equal(t, 0, dc.MaxClusters) + assert.Equal(t, "log.record.template", dc.TemplateAttribute) + assert.Equal(t, warmupModePassthrough, dc.WarmupMode) +} + +func TestCreateLogsProcessor(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + set := processortest.NewNopSettings(metadata.Type) + + lp, err := factory.CreateLogs(t.Context(), set, cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NotNil(t, lp) + + require.NoError(t, lp.Start(t.Context(), componenttest.NewNopHost())) + require.NoError(t, lp.Shutdown(t.Context())) +} diff --git a/processor/drainprocessor/generated_component_test.go b/processor/drainprocessor/generated_component_test.go new file mode 100644 index 0000000000000..11f64abacce8f --- /dev/null +++ b/processor/drainprocessor/generated_component_test.go @@ -0,0 +1,153 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package drainprocessor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" +) + +var typ = component.MustNewType("drain") + +func TestComponentFactoryType(t *testing.T) { + require.Equal(t, typ, NewFactory().Type()) +} + +func TestComponentConfigStruct(t *testing.T) { + require.NoError(t, componenttest.CheckConfigStruct(NewFactory().CreateDefaultConfig())) +} + +func TestComponentLifecycle(t *testing.T) { + factory := NewFactory() + + tests := []struct { + createFn func(ctx context.Context, set processor.Settings, cfg 
component.Config) (component.Component, error) + name string + }{ + + { + name: "logs", + createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateLogs(ctx, set, cfg, consumertest.NewNop()) + }, + }, + } + + cm, err := confmaptest.LoadConf("metadata.yaml") + require.NoError(t, err) + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub("tests::config") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(&cfg)) + + for _, tt := range tests { + t.Run(tt.name+"-shutdown", func(t *testing.T) { + c, err := tt.createFn(context.Background(), processortest.NewNopSettings(typ), cfg) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + t.Run(tt.name+"-lifecycle", func(t *testing.T) { + c, err := tt.createFn(context.Background(), processortest.NewNopSettings(typ), cfg) + require.NoError(t, err) + host := newMdatagenNopHost() + err = c.Start(context.Background(), host) + require.NoError(t, err) + require.NotPanics(t, func() { + switch tt.name { + case "logs": + e, ok := c.(processor.Logs) + require.True(t, ok) + logs := generateLifecycleTestLogs() + if !e.Capabilities().MutatesData { + logs.MarkReadOnly() + } + err = e.ConsumeLogs(context.Background(), logs) + case "metrics": + e, ok := c.(processor.Metrics) + require.True(t, ok) + metrics := generateLifecycleTestMetrics() + if !e.Capabilities().MutatesData { + metrics.MarkReadOnly() + } + err = e.ConsumeMetrics(context.Background(), metrics) + case "traces": + e, ok := c.(processor.Traces) + require.True(t, ok) + traces := generateLifecycleTestTraces() + if !e.Capabilities().MutatesData { + traces.MarkReadOnly() + } + err = e.ConsumeTraces(context.Background(), traces) + } + }) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + } +} + +func generateLifecycleTestLogs() plog.Logs { + logs := plog.NewLogs() + rl := 
logs.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("resource", "R1") + l := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + l.Body().SetStr("test log message") + l.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return logs +} + +func generateLifecycleTestMetrics() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("resource", "R1") + m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + m.SetName("test_metric") + dp := m.SetEmptyGauge().DataPoints().AppendEmpty() + dp.Attributes().PutStr("test_attr", "value_1") + dp.SetIntValue(123) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return metrics +} + +func generateLifecycleTestTraces() ptrace.Traces { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("resource", "R1") + span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.Attributes().PutStr("test_attr", "value_1") + span.SetName("test_span") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second))) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return traces +} + +var _ component.Host = (*mdatagenNopHost)(nil) + +type mdatagenNopHost struct{} + +func newMdatagenNopHost() component.Host { + return &mdatagenNopHost{} +} + +func (mnh *mdatagenNopHost) GetExtensions() map[component.ID]component.Component { + return nil +} + +func (mnh *mdatagenNopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { + return nil +} diff --git a/processor/drainprocessor/generated_package_test.go b/processor/drainprocessor/generated_package_test.go new file mode 100644 index 0000000000000..b9ff3af3938c0 --- /dev/null +++ b/processor/drainprocessor/generated_package_test.go @@ -0,0 +1,13 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package drainprocessor + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/processor/drainprocessor/go.mod b/processor/drainprocessor/go.mod new file mode 100644 index 0000000000000..0b30dd0e55df3 --- /dev/null +++ b/processor/drainprocessor/go.mod @@ -0,0 +1,58 @@ +module github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor + +go 1.25.0 + +require ( + github.com/jaeyo/go-drain3 v0.1.2 + github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/collector/component v1.54.1-0.20260330144813-4d17eb8959de + go.opentelemetry.io/collector/component/componenttest v0.148.1-0.20260330144813-4d17eb8959de + go.opentelemetry.io/collector/confmap v1.54.1-0.20260330144813-4d17eb8959de + go.opentelemetry.io/collector/consumer v1.54.1-0.20260330144813-4d17eb8959de + go.opentelemetry.io/collector/consumer/consumertest v0.148.1-0.20260330144813-4d17eb8959de + go.opentelemetry.io/collector/pdata v1.54.1-0.20260330144813-4d17eb8959de + go.opentelemetry.io/collector/processor v1.54.1-0.20260330144813-4d17eb8959de + go.opentelemetry.io/collector/processor/processorhelper v0.148.1-0.20260330144813-4d17eb8959de + go.opentelemetry.io/collector/processor/processortest v0.148.1-0.20260330144813-4d17eb8959de + go.opentelemetry.io/otel/metric v1.42.0 + go.opentelemetry.io/otel/sdk/metric v1.42.0 + go.opentelemetry.io/otel/trace v1.42.0 + go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.1 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.5.0 // indirect + github.com/gobwas/glob v0.2.3 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.8.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/json-iterator/go v1.1.12 // indirect 
+ github.com/knadh/koanf/maps v0.1.2 // indirect + github.com/knadh/koanf/providers/confmap v1.0.0 // indirect + github.com/knadh/koanf/v2 v2.3.4 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/collector/component/componentstatus v0.148.1-0.20260330144813-4d17eb8959de // indirect + go.opentelemetry.io/collector/consumer/xconsumer v0.148.1-0.20260330144813-4d17eb8959de // indirect + go.opentelemetry.io/collector/featuregate v1.54.1-0.20260330144813-4d17eb8959de // indirect + go.opentelemetry.io/collector/internal/componentalias v0.148.1-0.20260330144813-4d17eb8959de // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.148.1-0.20260330144813-4d17eb8959de // indirect + go.opentelemetry.io/collector/pdata/testdata v0.148.1-0.20260330144813-4d17eb8959de // indirect + go.opentelemetry.io/collector/pipeline v1.54.1-0.20260330144813-4d17eb8959de // indirect + go.opentelemetry.io/collector/processor/xprocessor v0.148.1-0.20260330144813-4d17eb8959de // indirect + go.opentelemetry.io/otel v1.42.0 // indirect + go.opentelemetry.io/otel/sdk v1.42.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/sys v0.41.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/processor/drainprocessor/go.sum b/processor/drainprocessor/go.sum new file mode 100644 index 0000000000000..69cb06f56940b --- /dev/null +++ b/processor/drainprocessor/go.sum @@ -0,0 +1,126 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro= +github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= +github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= +github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/jaeyo/go-drain3 v0.1.2 h1:fY21wgbwhzzaoRNSQ+6HVbpYw4KkAYjCFCoERYozIJ8= +github.com/jaeyo/go-drain3 v0.1.2/go.mod 
h1:6xr/0Dmq3BglAIZ5tDKiQiZvXevU1rE+qpfYZic9h9Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo= +github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v1.0.0 h1:mHKLJTE7iXEys6deO5p6olAiZdG5zwp8Aebir+/EaRE= +github.com/knadh/koanf/providers/confmap v1.0.0/go.mod h1:txHYHiI2hAtF0/0sCmcuol4IDcuQbKTybiB1nOcUo1A= +github.com/knadh/koanf/v2 v2.3.4 h1:fnynNSDlujWE+v83hAp8wKr/cdoxHLO0629SN+U8Urc= +github.com/knadh/koanf/v2 v2.3.4/go.mod h1:gRb40VRAbd4iJMYYD5IxZ6hfuopFcXBpc9bbQpZwo28= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee 
h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/collector/component v1.54.1-0.20260330144813-4d17eb8959de h1:/Fes1k74vYLkbgLwm74bwD9pR5bg/wMZdLpM5WVdESI= +go.opentelemetry.io/collector/component v1.54.1-0.20260330144813-4d17eb8959de/go.mod h1:yUMBYsySY/sDcXm8kOzEoZxt+JLdala6hxzSW0npOxY= +go.opentelemetry.io/collector/component/componentstatus v0.148.1-0.20260330144813-4d17eb8959de h1:Vp9/YVRYTYI+byqj2RQdSvEkhlszb/RGWVNRO+qCtZA= +go.opentelemetry.io/collector/component/componentstatus v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:yqg3SpGQc22W3wGICdnb+2kZVW9daBr3+LrGUCHkKfc= +go.opentelemetry.io/collector/component/componenttest v0.148.1-0.20260330144813-4d17eb8959de h1:8I5s76A+l6yF1kdgrN4vucOgxy9HIGaenRWyBKsA3s8= +go.opentelemetry.io/collector/component/componenttest v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:1c1+6mZOmI0raoya5vA/X0F+fawEjNS6tCEs5xLATtA= +go.opentelemetry.io/collector/confmap v1.54.1-0.20260330144813-4d17eb8959de 
h1:6BzQVVQ74/WhzfVS5AzpkoY1xzvH/9u6HVNmRyfG3KI= +go.opentelemetry.io/collector/confmap v1.54.1-0.20260330144813-4d17eb8959de/go.mod h1:l7J/5TJMZTzG0rp1jUR7VgWvL3O5GrhazjFsrT7L/cY= +go.opentelemetry.io/collector/consumer v1.54.1-0.20260330144813-4d17eb8959de h1:87CR+dj7TLtYmsajIoKJaCA7ENB2tNyZUGxn5PNAFZw= +go.opentelemetry.io/collector/consumer v1.54.1-0.20260330144813-4d17eb8959de/go.mod h1:1PC6XINTL9DdT1bwvfMdHE72EB4RWU/WcPemUrhqKN8= +go.opentelemetry.io/collector/consumer/consumertest v0.148.1-0.20260330144813-4d17eb8959de h1:xtcrdVElSz05dmn/D1xUHvb5y4MvWGizP1oMfRBoNZE= +go.opentelemetry.io/collector/consumer/consumertest v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:wScw/OzKkf/ZzJn4ToI30OoI1kJiY16WNrcFToXSzK0= +go.opentelemetry.io/collector/consumer/xconsumer v0.148.1-0.20260330144813-4d17eb8959de h1:K2Fbu31+ee/unM1TD1KzaxIasvMd7BwpRnG5vFKAyrE= +go.opentelemetry.io/collector/consumer/xconsumer v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:bG+Wz6xmIBl/gHzq1sqvksWXqTLuTX17Wo//zIsdZpw= +go.opentelemetry.io/collector/featuregate v1.54.1-0.20260330144813-4d17eb8959de h1:vsMwjijVKs1nmNVy7zkKSh94nhWu2zs+3RqanaGcvkM= +go.opentelemetry.io/collector/featuregate v1.54.1-0.20260330144813-4d17eb8959de/go.mod h1:PS7zY/zaCb28EqciePVwRHVhc3oKortTFXsi3I6ee4g= +go.opentelemetry.io/collector/internal/componentalias v0.148.1-0.20260330144813-4d17eb8959de h1:d9G+91bRVNGpj8nrLE2UJNypm7OLHhvrnGEnziqzlDA= +go.opentelemetry.io/collector/internal/componentalias v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:uwKzfehzwRgHxdHgFXYSBHNBeWSSqsqQYGWr5fk08G0= +go.opentelemetry.io/collector/internal/testutil v0.148.0 h1:3Z9hperte3vSmbBTYeNndoEUICICrNz8hzx+v0FYXBQ= +go.opentelemetry.io/collector/internal/testutil v0.148.0/go.mod h1:Jkjs6rkqs973LqgZ0Fe3zrokQRKULYXPIf4HuqStiEE= +go.opentelemetry.io/collector/pdata v1.54.1-0.20260330144813-4d17eb8959de h1:mz5uFmTlG6VUt9he5Hhk8q5SDjebOMHb5+bKKUXTC+0= +go.opentelemetry.io/collector/pdata v1.54.1-0.20260330144813-4d17eb8959de/go.mod 
h1:VNRXTsvs5vOKe7ve+OYkqPI6XNmBafY6G3g9EgSPvvg= +go.opentelemetry.io/collector/pdata/pprofile v0.148.1-0.20260330144813-4d17eb8959de h1:TckluEI8y3l6/jQHMGvaE8xIlhJWbPaOA1vzgVN2PK8= +go.opentelemetry.io/collector/pdata/pprofile v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:XDWXTmjNQVCGwzbKdFpcV6brcdpzuuqRLHgL0fcyibk= +go.opentelemetry.io/collector/pdata/testdata v0.148.1-0.20260330144813-4d17eb8959de h1:E37hGp+oGF+YrBCTMIR7F9YyHFN7sojIH2HfgOsA4+c= +go.opentelemetry.io/collector/pdata/testdata v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:2rFvxm8qwd3nlO90FtJw6ZGAjt+bLndxmQuJaMO9kfQ= +go.opentelemetry.io/collector/pipeline v1.54.1-0.20260330144813-4d17eb8959de h1:trATKleajYp8DR6YHAu2XmfU7Ik2xwY1wBW156bqcYY= +go.opentelemetry.io/collector/pipeline v1.54.1-0.20260330144813-4d17eb8959de/go.mod h1:RD90NG3Jbk965Xaqym3JyHkuol4uZJjQVUkD9ddXJIs= +go.opentelemetry.io/collector/processor v1.54.1-0.20260330144813-4d17eb8959de h1:MaW7CE2zDNDp4sb8U/rZSeHx4Uy4MBwhnftWwnoTLJ4= +go.opentelemetry.io/collector/processor v1.54.1-0.20260330144813-4d17eb8959de/go.mod h1:L0lA6DZ0VbrtQBg44cmYfSpRlgm4zxW1I6QfBnRizPw= +go.opentelemetry.io/collector/processor/processorhelper v0.148.1-0.20260330144813-4d17eb8959de h1:76u0hHyboGmu/V41UJoojOzaLp8T2JCNASKzYryYWfY= +go.opentelemetry.io/collector/processor/processorhelper v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:ZuNKoLZ3jMZQ+hsA6XFz3GmH6l23eLsYSgHB0qdAf/U= +go.opentelemetry.io/collector/processor/processortest v0.148.1-0.20260330144813-4d17eb8959de h1:GmREz21zDUf74LI3u8xwUYmVVd7zNMSh9t8o4IC9gB0= +go.opentelemetry.io/collector/processor/processortest v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:E2Li2gnkUXgvApvGyEtn3Eq5KyzV05ljfbFRsZ7sTC4= +go.opentelemetry.io/collector/processor/xprocessor v0.148.1-0.20260330144813-4d17eb8959de h1:ez8oGYyBliInfDi0Xg/dkq2N34clPey7IVDBDq1U7PA= +go.opentelemetry.io/collector/processor/xprocessor v0.148.1-0.20260330144813-4d17eb8959de/go.mod h1:r7ADpSX2nf0rZR9STxh956Qw1740QOWMXLnEM/ZiaF8= 
+go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= +go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= +go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= +go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= +go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo= +go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts= +go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA= +go.opentelemetry.io/otel/sdk/metric v1.42.0/go.mod h1:Ua6AAlDKdZ7tdvaQKfSmnFTdHx37+J4ba8MwVCYM5hc= +go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= +go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= +go.opentelemetry.io/proto/slim/otlp v1.10.0 h1:iR97Vs/ZDR+y9TfuP9b1XBtdPWeC+OMslIBmhcLU7jM= +go.opentelemetry.io/proto/slim/otlp v1.10.0/go.mod h1:lV9250stpjYLPNA5viFabIgP2QlUGRT1GdTgAf8SIUk= +go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0 h1:RUF5rO0hAlgiJt1fzQVzcVs3vZVNHIcMLgOgG4rWNcQ= +go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0/go.mod h1:I89cynRj8y+383o7tEQVg2SVA6SRgDVIouWPUVXjx0U= +go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0 h1:CQvJSldHRUN6Z8jsUeYv8J0lXRvygALXIzsmAeCcZE0= +go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0/go.mod h1:xSQ+mEfJe/GjK1LXEyVOoSI1N9JV9ZI923X5kup43W4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= 
+go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/processor/drainprocessor/internal/drain/drain.go b/processor/drainprocessor/internal/drain/drain.go new file mode 100644 index 0000000000000..a3ffee7f15d4d --- /dev/null +++ b/processor/drainprocessor/internal/drain/drain.go @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package drain wraps the go-drain3 library behind a minimal, stable API used +// by the drain processor. All callers go through this package; the underlying +// library can be swapped without touching the processor. +// +// Thread safety: Drain is NOT goroutine-safe. Callers must serialize access +// (e.g. with a sync.Mutex). +package drain // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor/internal/drain" + +import ( + "encoding/json" + "math" + + drain3 "github.com/jaeyo/go-drain3/pkg/drain3" +) + +// Config holds parameters for the Drain parse tree. 
+type Config struct { + // Depth is the maximum depth of the parse tree. Minimum 3 (go-drain3 requirement). + Depth int + // SimThreshold is the similarity threshold in [0.0, 1.0]. + SimThreshold float64 + // MaxChildren is the maximum number of children per tree node. + MaxChildren int + // MaxClusters limits the number of tracked clusters via LRU eviction. + // 0 means effectively unlimited (internally mapped to math.MaxInt32). + MaxClusters int + // ExtraDelimiters are additional token delimiters beyond whitespace. + ExtraDelimiters []string +} + +// Drain wraps the go-drain3 log clustering engine. +type Drain struct { + inner *drain3.Drain +} + +// New constructs a Drain instance from the provided Config. +func New(cfg Config) (*Drain, error) { + maxClusters := cfg.MaxClusters + if maxClusters <= 0 { + // go-drain3 uses an LRU which requires a positive size; use MaxInt32 as + // "effectively unlimited" — the LRU doesn't pre-allocate so this is safe. + maxClusters = math.MaxInt32 + } + + inner, err := drain3.NewDrain( + drain3.WithDepth(int64(cfg.Depth)), + drain3.WithSimTh(cfg.SimThreshold), + drain3.WithMaxChildren(int64(cfg.MaxChildren)), + drain3.WithMaxCluster(maxClusters), + drain3.WithExtraDelimiter(cfg.ExtraDelimiters), + ) + if err != nil { + return nil, err + } + return &Drain{inner: inner}, nil +} + +// Train feeds line to the Drain tree, updating or creating a cluster. +// Returns the derived template string. +// An error is returned only on internal go-drain3 failures; callers should +// log a warning and skip annotation rather than failing the pipeline. +func (d *Drain) Train(line string) (templateStr string, err error) { + cluster, _, err := d.inner.AddLogMessage(line) + if err != nil { + return "", err + } + if cluster == nil { + // go-drain3 returned no cluster without an error; treat as unannotatable. 
+ return "", nil + } + return cluster.GetTemplate(), nil +} + +// Match searches the existing tree for a cluster matching line without +// creating new clusters. Returns ok=false if no cluster matches. +func (d *Drain) Match(line string) (templateStr string, ok bool) { + cluster, err := d.inner.Match(line, drain3.SearchStrategyFallback) + if err != nil || cluster == nil { + return "", false + } + return cluster.GetTemplate(), true +} + +// ClusterCount returns the number of clusters currently tracked in the tree. +// Must be called with the caller's mutex held if concurrent access is possible. +func (d *Drain) ClusterCount() int { + return len(d.inner.GetClusters()) +} + +// Snapshot serializes the current tree state to JSON. +func (d *Drain) Snapshot() ([]byte, error) { + return json.Marshal(d.inner) +} + +// Load restores tree state from a previously captured Snapshot. +func (d *Drain) Load(data []byte) error { + return json.Unmarshal(data, d.inner) +} diff --git a/processor/drainprocessor/internal/drain/drain_test.go b/processor/drainprocessor/internal/drain/drain_test.go new file mode 100644 index 0000000000000..bb6fb9b7d6d4d --- /dev/null +++ b/processor/drainprocessor/internal/drain/drain_test.go @@ -0,0 +1,143 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package drain + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func defaultCfg() Config { + return Config{ + Depth: 5, + SimThreshold: 0.4, + MaxChildren: 100, + MaxClusters: 0, // unlimited + } +} + +func TestNew(t *testing.T) { + d, err := New(defaultCfg()) + require.NoError(t, err) + require.NotNil(t, d) +} + +// TestTrainSimilarLinesShareTemplate verifies that structurally similar log +// lines are merged into the same cluster and produce a template with wildcards. 
+
+// The first three tokens must be identical for go-drain3's prefix tree to route
+// all lines to the same leaf node where similarity scoring can merge them.
+func TestTrainSimilarLinesShareTemplate(t *testing.T) {
+	d, err := New(defaultCfg())
+	require.NoError(t, err)
+
+	// "connected to host <*> on port <*>" — first 3 tokens identical
+	lines := []string{
+		"connected to host 10.0.0.1 on port 443",
+		"connected to host 192.168.1.1 on port 8080",
+		"connected to host 172.16.0.1 on port 80",
+	}
+
+	var templates []string
+	for _, line := range lines {
+		tmpl, err := d.Train(line)
+		require.NoError(t, err)
+		templates = append(templates, tmpl)
+	}
+
+	// The first line creates a new cluster with itself as the template; abstraction
+	// kicks in once a second similar line is seen. Lines 1 and 2 should share the
+	// same abstracted template.
+	assert.Equal(t, templates[1], templates[2], "lines should converge on the same template")
+	assert.Contains(t, templates[2], "<*>", "merged template should contain wildcard tokens")
+}
+
+// TestTrainDistinctLinesGetDifferentClusters confirms that structurally
+// unrelated lines produce separate clusters.
+func TestTrainDistinctLinesGetDifferentClusters(t *testing.T) {
+	d, err := New(defaultCfg())
+	require.NoError(t, err)
+
+	tmpl1, err1 := d.Train("connected to host 10.0.0.1 on port 443")
+	require.NoError(t, err1)
+	tmpl2, err2 := d.Train("disk write error on device sda")
+	require.NoError(t, err2)
+
+	assert.NotEqual(t, tmpl1, tmpl2, "structurally different lines should get different templates")
+}
+
+// TestMatchAfterTemplateAbstracts verifies that Match finds an existing cluster
+// once its template has been abstracted (i.e. after multiple similar lines have
+// been trained).
+func TestMatchAfterTemplateAbstracts(t *testing.T) {
+	d, err := New(defaultCfg())
+	require.NoError(t, err)
+
+	// Build the cluster with enough examples to abstract the template.
+ var trainTmpl string + for _, line := range []string{ + "connected to host 10.0.0.1 on port 443", + "connected to host 192.168.1.1 on port 8080", + "connected to host 172.16.0.1 on port 80", + } { + var err error + trainTmpl, err = d.Train(line) + require.NoError(t, err) + } + + // Match a new, unseen line with the same structure. + matchTmpl, ok := d.Match("connected to host 10.10.10.10 on port 9000") + require.True(t, ok, "line matching the abstracted template should be found") + assert.Equal(t, trainTmpl, matchTmpl) + assert.Contains(t, matchTmpl, "<*>") +} + +// TestMatchDoesNotCreateClusters confirms that Match on an empty tree always +// returns ok=false. +func TestMatchDoesNotCreateClusters(t *testing.T) { + d, err := New(defaultCfg()) + require.NoError(t, err) + + _, ok := d.Match("some log line with no prior training") + assert.False(t, ok) +} + +// TestSnapshotRoundtrip verifies that tree state can be serialized and restored. +func TestSnapshotRoundtrip(t *testing.T) { + d, err := New(defaultCfg()) + require.NoError(t, err) + + var trainTmpl string + for _, line := range []string{ + "connected to host 10.0.0.1 on port 443", + "connected to host 192.168.1.1 on port 8080", + "connected to host 172.16.0.1 on port 80", + } { + var trainErr error + trainTmpl, trainErr = d.Train(line) + require.NoError(t, trainErr) + } + + snap, err := d.Snapshot() + require.NoError(t, err) + require.NotEmpty(t, snap) + + d2, err := New(defaultCfg()) + require.NoError(t, err) + require.NoError(t, d2.Load(snap)) + + matchTmpl, ok := d2.Match("connected to host 10.10.10.10 on port 9000") + require.True(t, ok, "restored drain should match lines fitting the trained template") + assert.Equal(t, trainTmpl, matchTmpl) +} + +func TestUnlimitedMaxClusters(t *testing.T) { + cfg := defaultCfg() + cfg.MaxClusters = 0 + d, err := New(cfg) + require.NoError(t, err) + require.NotNil(t, d) +} diff --git a/processor/drainprocessor/internal/metadata/generated_status.go 
b/processor/drainprocessor/internal/metadata/generated_status.go new file mode 100644 index 0000000000000..4253de6efbd8c --- /dev/null +++ b/processor/drainprocessor/internal/metadata/generated_status.go @@ -0,0 +1,16 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "go.opentelemetry.io/collector/component" +) + +var ( + Type = component.MustNewType("drain") + ScopeName = "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor" +) + +const ( + LogsStability = component.StabilityLevelDevelopment +) diff --git a/processor/drainprocessor/internal/metadata/generated_telemetry.go b/processor/drainprocessor/internal/metadata/generated_telemetry.go new file mode 100644 index 0000000000000..3bf39e077ea30 --- /dev/null +++ b/processor/drainprocessor/internal/metadata/generated_telemetry.go @@ -0,0 +1,81 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + ProcessorDrainClustersActive metric.Int64Gauge + ProcessorDrainLogRecordsAnnotated metric.Int64Counter + ProcessorDrainLogRecordsUnannotated metric.Int64Counter +} + +// TelemetryBuilderOption applies changes to default builder. 
+type TelemetryBuilderOption interface { + apply(*TelemetryBuilder) +} + +type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) + +func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { + tbof(mb) +} + +// Shutdown unregister all registered callbacks for async instruments. +func (builder *TelemetryBuilder) Shutdown() { + builder.mu.Lock() + defer builder.mu.Unlock() + for _, reg := range builder.registrations { + reg.Unregister() + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{} + for _, op := range options { + op.apply(&builder) + } + builder.meter = Meter(settings) + var err, errs error + builder.ProcessorDrainClustersActive, err = builder.meter.Int64Gauge( + "otelcol_processor_drain_clusters_active", + metric.WithDescription("Current number of active clusters in the Drain parse tree. [Development]"), + metric.WithUnit("{clusters}"), + ) + errs = errors.Join(errs, err) + builder.ProcessorDrainLogRecordsAnnotated, err = builder.meter.Int64Counter( + "otelcol_processor_drain_log_records_annotated", + metric.WithDescription("Number of log records successfully annotated with a template. [Development]"), + metric.WithUnit("{records}"), + ) + errs = errors.Join(errs, err) + builder.ProcessorDrainLogRecordsUnannotated, err = builder.meter.Int64Counter( + "otelcol_processor_drain_log_records_unannotated", + metric.WithDescription("Number of log records not annotated (empty body, Train error, or no cluster returned by Drain). 
[Development]"), + metric.WithUnit("{records}"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/processor/drainprocessor/internal/metadata/generated_telemetry_test.go b/processor/drainprocessor/internal/metadata/generated_telemetry_test.go new file mode 100644 index 0000000000000..0421fa7cfdd15 --- /dev/null +++ b/processor/drainprocessor/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,73 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := componenttest.NewNopTelemetrySettings() + applied := false + _, err := NewTelemetryBuilder(set, telemetryBuilderOptionFunc(func(b *TelemetryBuilder) { + applied = true + })) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/processor/drainprocessor/internal/metadatatest/generated_telemetrytest.go b/processor/drainprocessor/internal/metadatatest/generated_telemetrytest.go new file mode 100644 index 0000000000000..aa030f7b9f44f --- /dev/null +++ b/processor/drainprocessor/internal/metadatatest/generated_telemetrytest.go @@ -0,0 +1,68 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +func NewSettings(tt *componenttest.Telemetry) processor.Settings { + set := processortest.NewNopSettings(processortest.NopType) + set.ID = component.NewID(component.MustNewType("drain")) + set.TelemetrySettings = tt.NewTelemetrySettings() + return set +} + +func AssertEqualProcessorDrainClustersActive(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_processor_drain_clusters_active", + Description: "Current number of active clusters in the Drain parse tree. 
[Development]", + Unit: "{clusters}", + Data: metricdata.Gauge[int64]{ + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_processor_drain_clusters_active") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualProcessorDrainLogRecordsAnnotated(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_processor_drain_log_records_annotated", + Description: "Number of log records successfully annotated with a template. [Development]", + Unit: "{records}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_processor_drain_log_records_annotated") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualProcessorDrainLogRecordsUnannotated(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_processor_drain_log_records_unannotated", + Description: "Number of log records not annotated (empty body, Train error, or no cluster returned by Drain). [Development]", + Unit: "{records}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_processor_drain_log_records_unannotated") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} diff --git a/processor/drainprocessor/internal/metadatatest/generated_telemetrytest_test.go b/processor/drainprocessor/internal/metadatatest/generated_telemetrytest_test.go new file mode 100644 index 0000000000000..f1b5d9cb3e851 --- /dev/null +++ b/processor/drainprocessor/internal/metadatatest/generated_telemetrytest_test.go @@ -0,0 +1,36 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+
+package metadatatest
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor/internal/metadata"
+)
+
+func TestSetupTelemetry(t *testing.T) {
+	testTel := componenttest.NewTelemetry()
+	tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings())
+	require.NoError(t, err)
+	defer tb.Shutdown()
+	tb.ProcessorDrainClustersActive.Record(context.Background(), 1)
+	tb.ProcessorDrainLogRecordsAnnotated.Add(context.Background(), 1)
+	tb.ProcessorDrainLogRecordsUnannotated.Add(context.Background(), 1)
+	AssertEqualProcessorDrainClustersActive(t, testTel,
+		[]metricdata.DataPoint[int64]{{Value: 1}},
+		metricdatatest.IgnoreTimestamp())
+	AssertEqualProcessorDrainLogRecordsAnnotated(t, testTel,
+		[]metricdata.DataPoint[int64]{{Value: 1}},
+		metricdatatest.IgnoreTimestamp())
+	AssertEqualProcessorDrainLogRecordsUnannotated(t, testTel,
+		[]metricdata.DataPoint[int64]{{Value: 1}},
+		metricdatatest.IgnoreTimestamp())
+
+	require.NoError(t, testTel.Shutdown(context.Background()))
+}
diff --git a/processor/drainprocessor/metadata.yaml b/processor/drainprocessor/metadata.yaml
new file mode 100644
index 0000000000000..83043f379a296
--- /dev/null
+++ b/processor/drainprocessor/metadata.yaml
@@ -0,0 +1,39 @@
+type: drain
+display_name: Drain Processor
+
+status:
+  class: processor
+  stability:
+    development: [logs]
+  distributions: [contrib]
+  codeowners:
+    active: [MikeGoldsmith]
+
+tests:
+  config:
+
+telemetry:
+  metrics:
+    processor_drain_clusters_active:
+      enabled: true
+      description: Current number of active clusters in the Drain parse tree.
+ unit: "{clusters}" + gauge: + value_type: int + stability: development + processor_drain_log_records_annotated: + enabled: true + description: Number of log records successfully annotated with a template. + unit: "{records}" + sum: + value_type: int + monotonic: true + stability: development + processor_drain_log_records_unannotated: + enabled: true + description: Number of log records not annotated (empty body, Train error, or no cluster returned by Drain). + unit: "{records}" + sum: + value_type: int + monotonic: true + stability: development diff --git a/processor/drainprocessor/processor.go b/processor/drainprocessor/processor.go new file mode 100644 index 0000000000000..e199e89afc60b --- /dev/null +++ b/processor/drainprocessor/processor.go @@ -0,0 +1,233 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package drainprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor" + +import ( + "context" + "strings" + "sync" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/processor" + "go.uber.org/zap" + + internaldrain "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor/internal/drain" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor/internal/metadata" +) + +type drainProcessor struct { + config *Config + logger *zap.Logger + telemetry *metadata.TelemetryBuilder + + mu sync.Mutex + drain *internaldrain.Drain + + // warmup state — only used when WarmupMode is "buffer" + warmedUp bool + buffer []plog.Logs + bufferedCount int +} + +func newDrainProcessor(set processor.Settings, cfg *Config) (*drainProcessor, error) { + d, err := internaldrain.New(internaldrain.Config{ + Depth: cfg.LogClusterDepth, + SimThreshold: cfg.SimThreshold, + MaxChildren: cfg.MaxChildren, + MaxClusters: cfg.MaxClusters, + ExtraDelimiters: cfg.ExtraDelimiters, + }) + 
if err != nil { + return nil, err + } + + tel, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + + p := &drainProcessor{ + config: cfg, + logger: set.Logger, + telemetry: tel, + drain: d, + } + p.seed() + return p, nil +} + +// seed pre-populates the Drain tree from SeedTemplates and SeedLogs before any +// live log records arrive. Empty entries are skipped. Train failures are logged +// as warnings and skipped rather than aborting startup. +func (p *drainProcessor) seed() { + for _, tmpl := range p.config.SeedTemplates { + if strings.TrimSpace(tmpl) == "" { + continue + } + if _, err := p.drain.Train(tmpl); err != nil { + p.logger.Warn("failed to seed template, skipping", zap.String("template", tmpl), zap.Error(err)) + } + } + for _, line := range p.config.SeedLogs { + if strings.TrimSpace(line) == "" { + continue + } + if _, err := p.drain.Train(line); err != nil { + p.logger.Warn("failed to seed log line, skipping", zap.String("line", line), zap.Error(err)) + } + } +} + +// processLogs is the ConsumeLogs handler passed to processorhelper.NewLogs. +func (p *drainProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { + var out plog.Logs + if p.config.WarmupMode == warmupModePassthrough { + out = p.annotateAll(ctx, ld) + } else { + out = p.processBuffered(ctx, ld) + } + p.mu.Lock() + count := p.drain.ClusterCount() + p.mu.Unlock() + p.telemetry.ProcessorDrainClustersActive.Record(ctx, int64(count)) + return out, nil +} + +// processBuffered implements the "buffer" warmup mode. During warmup, batches +// are trained (tree updated) but held in memory without annotation. Once the +// flush condition is met, all buffered batches are merged, annotated with the +// now-stable templates, and returned as a single combined batch. +func (p *drainProcessor) processBuffered(ctx context.Context, ld plog.Logs) plog.Logs { + // Fast-path: warmup already complete. 
+ p.mu.Lock() + if p.warmedUp { + p.mu.Unlock() + return p.annotateAll(ctx, ld) + } + p.mu.Unlock() + + // Train all records in this batch to update the tree and grow cluster count. + // (attributes are not set yet — we wait for the tree to stabilize) + p.trainBatch(ld) + + // Update buffer state under lock, then decide whether to flush. + p.mu.Lock() + + // Re-check warmedUp: another goroutine may have flushed between the + // fast-path check above and this lock acquisition. If so, annotateAll + // calls Train again on these records (they were already trained in + // trainBatch above). Training the same line twice is harmless — it + // reinforces the existing cluster without creating a duplicate. + if p.warmedUp { + p.mu.Unlock() + return p.annotateAll(ctx, ld) + } + + p.buffer = append(p.buffer, ld) + p.bufferedCount += ld.LogRecordCount() + + shouldFlush := p.drain.ClusterCount() >= p.config.WarmupMinClusters || + p.bufferedCount >= p.config.WarmupBufferMaxLogs + + if !shouldFlush { + p.mu.Unlock() + return plog.NewLogs() + } + + // Warmup complete — take ownership of the buffer and flip the flag. + toFlush := p.buffer + p.buffer = nil + p.bufferedCount = 0 + p.warmedUp = true + p.mu.Unlock() + + // Merge all buffered batches into one, annotating as we go. + // Re-annotating (rather than reusing the training pass) ensures records + // get the final abstracted templates from the fully-warmed tree. + merged := plog.NewLogs() + for _, batch := range toFlush { + p.annotateAll(ctx, batch) // annotate in-place + batch.ResourceLogs().MoveAndAppendTo(merged.ResourceLogs()) + } + return merged +} + +// trainBatch calls Train on every record in ld without setting any attributes. +// Used during buffer warmup to grow the Drain tree without committing templates. 
+func (p *drainProcessor) trainBatch(ld plog.Logs) { + rls := ld.ResourceLogs() + for i := 0; i < rls.Len(); i++ { + sls := rls.At(i).ScopeLogs() + for j := 0; j < sls.Len(); j++ { + lrs := sls.At(j).LogRecords() + for k := 0; k < lrs.Len(); k++ { + text := extractBody(lrs.At(k), p.config.BodyField) + if text == "" { + continue + } + p.mu.Lock() + _, err := p.drain.Train(text) + p.mu.Unlock() + if err != nil { + p.logger.Warn("drain Train failed during warmup buffering, skipping", zap.Error(err)) + } + } + } + } +} + +// annotateAll annotates every record in ld in-place and returns ld. +func (p *drainProcessor) annotateAll(ctx context.Context, ld plog.Logs) plog.Logs { + rls := ld.ResourceLogs() + for i := 0; i < rls.Len(); i++ { + sls := rls.At(i).ScopeLogs() + for j := 0; j < sls.Len(); j++ { + lrs := sls.At(j).LogRecords() + for k := 0; k < lrs.Len(); k++ { + p.annotate(ctx, lrs.At(k)) + } + } + } + return ld +} + +func (p *drainProcessor) annotate(ctx context.Context, lr plog.LogRecord) { + text := extractBody(lr, p.config.BodyField) + if text == "" { + p.telemetry.ProcessorDrainLogRecordsUnannotated.Add(ctx, 1) + return + } + + p.mu.Lock() + tmpl, err := p.drain.Train(text) + p.mu.Unlock() + + if err != nil { + p.logger.Warn("drain Train failed, skipping annotation", zap.Error(err)) + p.telemetry.ProcessorDrainLogRecordsUnannotated.Add(ctx, 1) + return + } + if tmpl == "" { + // go-drain3 returned no cluster; skip annotation. + p.telemetry.ProcessorDrainLogRecordsUnannotated.Add(ctx, 1) + return + } + + lr.Attributes().PutStr(p.config.TemplateAttribute, tmpl) + p.telemetry.ProcessorDrainLogRecordsAnnotated.Add(ctx, 1) +} + +// extractBody returns the text to feed to Drain for the given log record. +// If bodyField is non-empty and the body is a map, the named field is extracted. +// Falls back to the full body string representation in all other cases. 
+func extractBody(lr plog.LogRecord, bodyField string) string { + body := lr.Body() + if bodyField != "" && body.Type() == pcommon.ValueTypeMap { + if v, ok := body.Map().Get(bodyField); ok { + return v.AsString() + } + } + return body.AsString() +} diff --git a/processor/drainprocessor/processor_test.go b/processor/drainprocessor/processor_test.go new file mode 100644 index 0000000000000..85cd552944f73 --- /dev/null +++ b/processor/drainprocessor/processor_test.go @@ -0,0 +1,324 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package drainprocessor + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/processor/processortest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor/internal/metadata" +) + +func newTestProcessor(t *testing.T, cfg *Config) *drainProcessor { + t.Helper() + set := processortest.NewNopSettings(metadata.Type) + p, err := newDrainProcessor(set, cfg) + require.NoError(t, err) + return p +} + +func makeLogRecord(body string) plog.Logs { + ld := plog.NewLogs() + lr := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + lr.Body().SetStr(body) + lr.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return ld +} + +func makeMapBodyLogRecord(msgField, msgValue string) plog.Logs { + ld := plog.NewLogs() + lr := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + m := lr.Body().SetEmptyMap() + m.PutStr(msgField, msgValue) + m.PutStr("level", "info") + lr.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return ld +} + +func getFirstRecord(ld plog.Logs) plog.LogRecord { + return ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) +} + +// templateAttr returns the log.record.template attribute value for the 
first +// log record in ld, failing the test if it is absent. +func templateAttr(t *testing.T, ld plog.Logs) string { + t.Helper() + v, ok := getFirstRecord(ld).Attributes().Get("log.record.template") + require.True(t, ok, "log.record.template attribute must be set") + return v.Str() +} + +// TestAnnotatesTemplate verifies that the template attribute is set after a +// single log record is processed. +func TestAnnotatesTemplate(t *testing.T) { + p := newTestProcessor(t, createDefaultConfig().(*Config)) + + out, err := p.processLogs(t.Context(), makeLogRecord("connected to host 10.0.0.1 on port 443")) + require.NoError(t, err) + + assert.NotEmpty(t, templateAttr(t, out)) +} + +// TestSimilarLinesSameTemplate verifies that after enough similar lines have +// been processed, they all share the same abstracted template. +// +// The first 3 tokens must be identical for go-drain3's prefix tree to route +// all lines to the same leaf node. +func TestSimilarLinesSameTemplate(t *testing.T) { + p := newTestProcessor(t, createDefaultConfig().(*Config)) + + lines := []string{ + "connected to host 10.0.0.1 on port 443", + "connected to host 192.168.1.1 on port 8080", + "connected to host 172.16.0.1 on port 80", + } + + var outs []plog.Logs + for _, line := range lines { + out, err := p.processLogs(t.Context(), makeLogRecord(line)) + require.NoError(t, err) + outs = append(outs, out) + } + + // The first line creates a new cluster with itself as the template; abstraction + // kicks in once a second similar line is seen. Lines 1 and 2 should share the + // same abstracted template. + tmpl1 := templateAttr(t, outs[1]) + assert.Equal(t, tmpl1, templateAttr(t, outs[2]), "lines 1 and 2 should converge on the same template") + assert.Contains(t, tmpl1, "<*>") +} + +// TestCustomAttributeName verifies that the configured attribute key is used +// instead of the default. 
+func TestCustomAttributeName(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.TemplateAttribute = "my.template" + p := newTestProcessor(t, cfg) + + out, err := p.processLogs(t.Context(), makeLogRecord("connected to host 10.0.0.1")) + require.NoError(t, err) + + _, ok := getFirstRecord(out).Attributes().Get("my.template") + assert.True(t, ok, "custom template_attribute key must be used") +} + +// TestBodyFieldExtraction verifies that BodyField pulls the named field from a +// structured map body rather than using the full body string. +func TestBodyFieldExtraction(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.BodyField = "message" + p := newTestProcessor(t, cfg) + + msgs := []string{ + "connected to host 10.0.0.1 on port 443", + "connected to host 192.168.1.1 on port 8080", + "connected to host 172.16.0.1 on port 80", + } + var lastOut plog.Logs + for _, msg := range msgs { + var err error + lastOut, err = p.processLogs(t.Context(), makeMapBodyLogRecord("message", msg)) + require.NoError(t, err) + } + + tmpl := templateAttr(t, lastOut) + assert.NotContains(t, tmpl, "level", "template should be derived from the message field, not the full map") + assert.Contains(t, tmpl, "<*>", "template should be abstracted after similar lines") +} + +// TestEmptyBodySkipped verifies that empty log bodies do not receive template +// attributes. +func TestEmptyBodySkipped(t *testing.T) { + p := newTestProcessor(t, createDefaultConfig().(*Config)) + + out, err := p.processLogs(t.Context(), makeLogRecord("")) + require.NoError(t, err) + + _, ok := getFirstRecord(out).Attributes().Get("log.record.template") + assert.False(t, ok, "empty body should not produce template attribute") +} + +// TestMultipleResourceLogs verifies that records across multiple resource log +// groups are all annotated. 
+func TestMultipleResourceLogs(t *testing.T) { + p := newTestProcessor(t, createDefaultConfig().(*Config)) + + ld := plog.NewLogs() + for range 3 { + lr := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + lr.Body().SetStr("heartbeat ping from server") + } + + out, err := p.processLogs(t.Context(), ld) + require.NoError(t, err) + + for i := 0; i < out.ResourceLogs().Len(); i++ { + lr := out.ResourceLogs().At(i).ScopeLogs().At(0).LogRecords().At(0) + _, ok := lr.Attributes().Get("log.record.template") + assert.True(t, ok, "resource log group %d: record should have template attribute", i) + } +} + +// TestSeedTemplatesPrePopulateTree verifies that seed_templates establishes +// clusters before any live logs arrive, so the first matching live record gets +// a stable cluster ID. +func TestSeedTemplatesPrePopulateTree(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SeedTemplates = []string{ + "connected to host <*> on port <*>", + } + p := newTestProcessor(t, cfg) + + out, err := p.processLogs(t.Context(), makeLogRecord("connected to host 10.0.0.1 on port 443")) + require.NoError(t, err) + + tmpl := templateAttr(t, out) + assert.Contains(t, tmpl, "<*>", "seeded template should match the live record") +} + +// TestSeedLogsPrePopulateTree verifies that seed_logs trains the tree before +// any live logs arrive. 
+func TestSeedLogsPrePopulateTree(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SeedLogs = []string{ + "connected to host 10.0.0.1 on port 443", + "connected to host 192.168.1.1 on port 8080", + "connected to host 172.16.0.1 on port 80", + } + p := newTestProcessor(t, cfg) + + out, err := p.processLogs(t.Context(), makeLogRecord("connected to host 10.10.10.10 on port 9000")) + require.NoError(t, err) + + tmpl := templateAttr(t, out) + assert.Contains(t, tmpl, "<*>", "template should already be abstracted from seed logs") +} + +// TestEmptySeedEntriesSkipped verifies that blank entries in seed lists do not +// cause errors or get added to the tree. +func TestEmptySeedEntriesSkipped(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SeedTemplates = []string{"", " ", "connected to host <*> on port <*>"} + cfg.SeedLogs = []string{"", " "} + + p := newTestProcessor(t, cfg) + + out, err := p.processLogs(t.Context(), makeLogRecord("connected to host 10.0.0.1 on port 443")) + require.NoError(t, err) + assert.NotEmpty(t, templateAttr(t, out)) +} + +// --- buffer warmup tests --- + +func bufferCfg() *Config { + cfg := createDefaultConfig().(*Config) + cfg.WarmupMode = warmupModeBuffer + cfg.WarmupMinClusters = 2 + cfg.WarmupBufferMaxLogs = 100 + return cfg +} + +// TestBufferWarmupHoldsRecords verifies that records are not forwarded while +// cluster count is below WarmupMinClusters. +func TestBufferWarmupHoldsRecords(t *testing.T) { + p := newTestProcessor(t, bufferCfg()) + + out, err := p.processLogs(t.Context(), makeLogRecord("connected to host 10.0.0.1 on port 443")) + require.NoError(t, err) + assert.Equal(t, 0, out.LogRecordCount(), "records should be held in buffer during warmup") +} + +// TestBufferWarmupFlushOnMinClusters verifies that once WarmupMinClusters +// distinct templates have been observed the buffer is flushed with all records +// annotated. 
+func TestBufferWarmupFlushOnMinClusters(t *testing.T) { + p := newTestProcessor(t, bufferCfg()) + + batches := []string{ + "connected to host 10.0.0.1 on port 443", // cluster 1 + "disk write error on device sda", // cluster 2 — triggers flush + } + + var allOut []plog.Logs + for _, line := range batches { + out, err := p.processLogs(t.Context(), makeLogRecord(line)) + require.NoError(t, err) + allOut = append(allOut, out) + } + + assert.Equal(t, 0, allOut[0].LogRecordCount(), "first batch should be held") + assert.Equal(t, 2, allOut[1].LogRecordCount(), "flushed output should contain all buffered records") + + for i := 0; i < allOut[1].ResourceLogs().Len(); i++ { + lr := allOut[1].ResourceLogs().At(i).ScopeLogs().At(0).LogRecords().At(0) + _, ok := lr.Attributes().Get("log.record.template") + assert.True(t, ok, "record %d should have template attribute after flush", i) + } +} + +// TestBufferWarmupFlushOnCap verifies that the buffer is flushed when +// WarmupBufferMaxLogs is reached even if WarmupMinClusters has not been met. +func TestBufferWarmupFlushOnCap(t *testing.T) { + cfg := bufferCfg() + cfg.WarmupMinClusters = 1000 // unreachably high + cfg.WarmupBufferMaxLogs = 2 + p := newTestProcessor(t, cfg) + + out1, err := p.processLogs(t.Context(), makeLogRecord("connected to host 10.0.0.1 on port 443")) + require.NoError(t, err) + assert.Equal(t, 0, out1.LogRecordCount(), "first record should be buffered") + + out2, err := p.processLogs(t.Context(), makeLogRecord("disk write error on device sda")) + require.NoError(t, err) + assert.Equal(t, 2, out2.LogRecordCount(), "buffer cap reached: both records should be flushed") +} + +// TestBufferWarmupPostFlushPassthrough verifies that records processed after +// warmup ends are passed through immediately without buffering. +func TestBufferWarmupPostFlushPassthrough(t *testing.T) { + p := newTestProcessor(t, bufferCfg()) + + // Trigger warmup flush (two distinct clusters). 
+ _, err := p.processLogs(t.Context(), makeLogRecord("connected to host 10.0.0.1 on port 443")) + require.NoError(t, err) + _, err = p.processLogs(t.Context(), makeLogRecord("disk write error on device sda")) + require.NoError(t, err) + + out, err := p.processLogs(t.Context(), makeLogRecord("user alice logged in")) + require.NoError(t, err) + assert.Equal(t, 1, out.LogRecordCount(), "post-warmup records should pass through immediately") + _, ok := getFirstRecord(out).Attributes().Get("log.record.template") + assert.True(t, ok, "post-warmup records should be annotated") +} + +// TestBufferWarmupRecordOrder verifies that flushed records maintain their +// original arrival order. +func TestBufferWarmupRecordOrder(t *testing.T) { + p := newTestProcessor(t, bufferCfg()) + + lines := []string{ + "connected to host 10.0.0.1 on port 443", + "disk write error on device sda", + } + + var lastOut plog.Logs + for _, line := range lines { + out, err := p.processLogs(t.Context(), makeLogRecord(line)) + require.NoError(t, err) + lastOut = out + } + + require.Equal(t, 2, lastOut.LogRecordCount()) + + r0 := lastOut.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + r1 := lastOut.ResourceLogs().At(1).ScopeLogs().At(0).LogRecords().At(0) + assert.Equal(t, "connected to host 10.0.0.1 on port 443", r0.Body().Str()) + assert.Equal(t, "disk write error on device sda", r1.Body().Str()) +} diff --git a/reports/distributions/contrib.yaml b/reports/distributions/contrib.yaml index c1135d64852fe..ec37887d52e97 100644 --- a/reports/distributions/contrib.yaml +++ b/reports/distributions/contrib.yaml @@ -104,6 +104,7 @@ components: - cumulativetodelta - deltatocumulative - deltatorate + - drain - filter - geoip - groupbyattrs diff --git a/versions.yaml b/versions.yaml index 9880661d39a73..52cf2d0236802 100644 --- a/versions.yaml +++ b/versions.yaml @@ -196,6 +196,7 @@ module-sets: - github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor - 
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor - github.com/open-telemetry/opentelemetry-collector-contrib/processor/dnslookupprocessor + - github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor - github.com/open-telemetry/opentelemetry-collector-contrib/processor/lookupprocessor - github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor - github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor