Commit 6cf5aad

add drain processor for log template annotation [#47235]
Adds processor/drain, which applies the Drain log clustering algorithm to annotate log records with a derived template string and numeric cluster ID:

- log.record.template (e.g. "user <*> logged in from <*>")
- log.record.template.id (numeric cluster ID)

Key features:

- Configurable parse tree depth, similarity threshold, max clusters (LRU)
- Optional body_field extraction for structured map bodies
- Seeding via seed_templates or seed_logs for stable IDs across restarts
- passthrough warmup mode (default) and buffer warmup mode

Assisted-by: Claude Sonnet 4.6
1 parent 3783e9b commit 6cf5aad

22 files changed: +1716 additions, 0 deletions

.chloggen/config.yaml

Lines changed: 1 addition & 0 deletions

```diff
@@ -188,6 +188,7 @@ components:
 - processor/cumulativetodelta
 - processor/deltatocumulative
 - processor/deltatorate
+- processor/drain
 - processor/digitaloceandetector
 - processor/dnslookup
 - processor/dynatracedetector
```
Lines changed: 36 additions & 0 deletions

```yaml
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: processor/drain

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add drain processor that applies the Drain log clustering algorithm to annotate log records with a derived template string and cluster ID.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [47235]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  The processor sets `log.record.template` (e.g. `"user <*> logged in from <*>"`) and
  `log.record.template.id` on each log record. Downstream processors such as the filter
  processor can then act on those attributes to, for example, drop entire classes of
  noisy logs by template string.

  Key features:
  - Configurable Drain parse tree parameters (depth, similarity threshold, max clusters with LRU eviction)
  - Optional seeding via known template strings or example log lines for stable IDs across restarts
  - `passthrough` warmup mode (default) and `buffer` warmup mode that holds records until the tree has stabilised

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
```

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions

```diff
@@ -199,6 +199,7 @@ processor/cumulativetodeltaprocessor/ @open-telemetry
 processor/deltatocumulativeprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams
 processor/deltatorateprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
 processor/dnslookupprocessor/ @open-telemetry/collector-contrib-approvers @andrzej-stencel @kaisecheng @edmocosta
+processor/drainprocessor/ @open-telemetry/collector-contrib-approvers @MikeGoldsmith
 processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @evan-bradley @edmocosta @bogdandrutu
 processor/geoipprocessor/ @open-telemetry/collector-contrib-approvers @andrzej-stencel @michalpristas @rogercoll
 processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo @amdprophet
```

cmd/otelcontribcol/builder-config.yaml

Lines changed: 1 addition & 0 deletions

```diff
@@ -119,6 +119,7 @@ processors:
 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/coralogixprocessor v0.148.0
 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.148.0
 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor v0.148.0
+- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor v0.148.0
 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.148.0
 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor v0.148.0
 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.148.0
```

internal/tidylist/tidylist.txt

Lines changed: 1 addition & 0 deletions

```diff
@@ -215,6 +215,7 @@ processor/coralogixprocessor
 processor/cumulativetodeltaprocessor
 processor/deltatorateprocessor
 processor/dnslookupprocessor
+processor/drainprocessor
 processor/filterprocessor
 processor/geoipprocessor
 processor/groupbyattrsprocessor
```

processor/drainprocessor/Makefile

Lines changed: 1 addition & 0 deletions

```makefile
include ../../Makefile.Common
```

processor/drainprocessor/README.md

Lines changed: 201 additions & 0 deletions

# Drain Processor

| Status | |
| ------------- |-----------|
| Stability | [development]: logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fdrain%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fdrain) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fdrain%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fdrain) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib

The drain processor applies the [Drain log clustering algorithm](https://jiemingzhu.github.io/pub/pjhe_icws2017.pdf) to log records as they pass through the pipeline. For each record it derives a template string (e.g. `"user <*> logged in from <*>"`) and a numeric cluster ID, then attaches both as attributes on the record.

This processor **annotates**; it does not filter. Use the [filter processor](../filterprocessor/README.md) downstream to act on the `log.record.template` attribute, for example to drop entire classes of noisy logs by pattern.

## How it works

Drain builds a parse tree from the token structure of log lines. Lines with similar structure are grouped into a **cluster**, and a **template** is derived by replacing variable tokens with `<*>` wildcards. As more logs arrive, the templates become more accurate and stable.

Template IDs are numeric and local to each collector instance. They are not stable across restarts unless the tree is pre-seeded with known templates (see [Seeding](#seeding)). Use the template **string** (not the ID) for persistent filtering rules.
## Configuration

```yaml
processors:
  drain:
    # Drain parse tree parameters
    log_cluster_depth: 4      # default: 4 (minimum: 3)
    sim_threshold: 0.4        # default: 0.4, range [0.0, 1.0]
    max_children: 100         # default: 100
    max_clusters: 0           # default: 0 (unlimited, LRU eviction when > 0)
    extra_delimiters: []      # default: [] (extra token delimiters beyond whitespace)

    # Body extraction
    body_field: ""            # default: "" (use full body string)

    # Output attribute names
    template_attribute: "log.record.template"        # default
    template_id_attribute: "log.record.template.id"  # default

    # Seeding (optional)
    seed_templates: []
    seed_logs: []

    # Warmup mode
    warmup_mode: passthrough       # default: "passthrough" | "buffer"
    warmup_min_clusters: 10        # default: 10 (only used when warmup_mode: buffer)
    warmup_buffer_max_logs: 10000  # default: 10000 (only used when warmup_mode: buffer)
```
### Parameters

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `log_cluster_depth` | int | `4` | Max depth of the Drain parse tree. Higher values produce more specific templates. Minimum: 3. |
| `sim_threshold` | float | `0.4` | Similarity threshold in [0.0, 1.0]. Lines below this threshold create a new cluster rather than merging with an existing one. |
| `max_children` | int | `100` | Maximum children per parse tree node. |
| `max_clusters` | int | `0` | Maximum clusters tracked. When exceeded, the least-recently-used cluster is evicted. `0` means unlimited. |
| `extra_delimiters` | []string | `[]` | Additional token delimiters beyond whitespace (e.g. `[",", ":"]`). |
| `body_field` | string | `""` | If set, and the log body is a structured map, the value of this top-level key is used as the text to template instead of the full body. |
| `template_attribute` | string | `"log.record.template"` | Attribute key written with the derived template string. |
| `template_id_attribute` | string | `"log.record.template.id"` | Attribute key written with the numeric cluster ID. |
| `seed_templates` | []string | `[]` | Template strings to pre-load at startup (see [Seeding](#seeding)). |
| `seed_logs` | []string | `[]` | Raw example log lines to train on at startup (see [Seeding](#seeding)). |
| `warmup_mode` | string | `"passthrough"` | Controls behavior during the warmup period. `"passthrough"` (default) or `"buffer"` (see [Warmup mode](#warmup-mode)). |
| `warmup_min_clusters` | int | `10` | Minimum distinct clusters before warmup ends. Only used when `warmup_mode: buffer`. |
| `warmup_buffer_max_logs` | int | `10000` | Maximum records to buffer before flushing regardless of cluster count. Only used when `warmup_mode: buffer`. Must be > 0. |
## Seeding

Seeding pre-populates the Drain tree before any live logs arrive. This is the primary mechanism for stable template IDs across restarts.

### `seed_templates`

Provide known template strings directly. The processor trains on each entry at startup, establishing clusters for those patterns immediately.

```yaml
processors:
  drain:
    seed_templates:
      - "user <*> logged in from <*>"
      - "connected to <*>"
      - "heartbeat ping <*>"
```

### `seed_logs`

Provide raw example log lines. The processor trains on them at startup, letting Drain derive the templates itself. Useful when exact template strings are not known in advance.

```yaml
processors:
  drain:
    seed_logs:
      - "user alice logged in from 10.0.0.1"
      - "user bob logged in from 192.168.1.1"
      - "connected to 10.0.0.1"
```

Empty and whitespace-only entries in both lists are silently skipped.

> **Note on multi-instance deployments**: Each collector instance maintains its own independent Drain tree. Template IDs will differ between instances. Providing identical `seed_templates` across all instances produces consistent template **strings** (though IDs may still differ). Filtering rules should always match on the template string, not the ID.
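The skip rule for blank seed entries can be illustrated with a small Go sketch. The `train` callback here is a hypothetical stand-in for feeding one line into the Drain tree, not the processor's real API:

```go
package main

import (
	"fmt"
	"strings"
)

// trainSeeds feeds non-blank entries to a training callback, mirroring
// the documented rule that empty and whitespace-only entries are
// silently skipped. It returns how many entries were actually trained.
func trainSeeds(entries []string, train func(string)) int {
	n := 0
	for _, e := range entries {
		if strings.TrimSpace(e) == "" {
			continue // empty and whitespace-only entries are skipped
		}
		train(e)
		n++
	}
	return n
}

func main() {
	seeds := []string{"user <*> logged in from <*>", "   ", "", "connected to <*>"}
	trained := trainSeeds(seeds, func(s string) { fmt.Println("seeded:", s) })
	fmt.Println("trained", trained, "of", len(seeds)) // trained 2 of 4
}
```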
## Warmup mode

### `passthrough` (default)

Records are annotated and forwarded immediately from the first record. Early templates may be unstable (exact log lines rather than abstracted patterns) until enough similar lines have been observed.

### `buffer`

Records are held in memory until `warmup_min_clusters` distinct templates have been observed, at which point the buffer is flushed with annotations applied using the now-stable templates. If `warmup_buffer_max_logs` is reached before the cluster threshold, the buffer is flushed anyway.

Use buffer mode when downstream consumers (e.g. a filter processor) must act on stable, wildcard-abstracted templates from the very first record.

```yaml
processors:
  drain:
    warmup_mode: buffer
    warmup_min_clusters: 20
    warmup_buffer_max_logs: 5000
```

> **Memory note**: in buffer mode, all records are held in memory until flush. Size the buffer with `warmup_buffer_max_logs` according to your available memory and expected log volume during startup.
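The buffer-mode flush condition described above boils down to a single predicate: warmup ends once enough distinct clusters have been seen, or the buffer hits its size cap, whichever comes first. A sketch (the function name is illustrative, not the processor's internal API):

```go
package main

import "fmt"

// shouldFlush reports whether buffer-mode warmup should end: either the
// cluster threshold (warmup_min_clusters) has been met, or the buffer
// has reached its cap (warmup_buffer_max_logs).
func shouldFlush(distinctClusters, buffered, minClusters, maxLogs int) bool {
	return distinctClusters >= minClusters || buffered >= maxLogs
}

func main() {
	fmt.Println(shouldFlush(5, 100, 20, 5000))  // false: still warming up
	fmt.Println(shouldFlush(20, 100, 20, 5000)) // true: cluster threshold met
	fmt.Println(shouldFlush(5, 5000, 20, 5000)) // true: buffer cap reached
}
```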
## Output attributes

By default the processor sets two attributes on each log record:

| Attribute | Type | Example | Description |
|-----------|------|---------|-------------|
| `log.record.template` | string | `"user <*> logged in from <*>"` | The Drain-derived template string. Stable within an instance once the tree has warmed up. Use this for filtering rules. |
| `log.record.template.id` | int | `3` | Numeric cluster ID. Unstable across restarts unless seeding is used. |

Both attribute names are configurable via `template_attribute` and `template_id_attribute`.

> **Semantic conventions**: `log.record.template` aligns with the proposed OTel attribute in [open-telemetry/semantic-conventions#1283](https://github.com/open-telemetry/semantic-conventions/issues/1283) and [#2064](https://github.com/open-telemetry/semantic-conventions/issues/2064). These names may be updated if a convention is formally adopted.

## Example pipeline

The following pipeline annotates logs with Drain templates and then drops known noisy patterns using the filter processor:

```yaml
processors:
  drain:
    log_cluster_depth: 4
    sim_threshold: 0.4
    max_clusters: 500
    seed_templates:
      - "user <*> logged in from <*>"
      - "connected to <*>"
      - "heartbeat ping <*>"
    warmup_mode: buffer
    warmup_min_clusters: 20
    warmup_buffer_max_logs: 5000

  filter/drop_noisy:
    error_mode: ignore
    logs:
      log_record:
        - attributes["log.record.template"] == "heartbeat ping <*>"
        - attributes["log.record.template"] == "connected to <*>"

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [drain, filter/drop_noisy]
      exporters: [otlp]
```
## `body_field`

`body_field` is a convenience for pipelines where the log body is a structured map and you do not have full control over how upstream processors shape it.

If you **do** control the pipeline, the preferred approach is a `move` operator in the filelog receiver (or equivalent) to promote the message field back to a plain string body before the drain processor sees the record:

```yaml
operators:
  - type: json_parser
  - type: move
    from: body.message
    to: body
```

If you **cannot** do that, for example because logs arrive via OTLP already structured, set `body_field` to the map key whose value should be fed to Drain:

```yaml
processors:
  drain:
    body_field: "message"
```

Given a log body `{"level": "info", "message": "user alice logged in from 10.0.0.1"}`, only the `message` value is fed to Drain. The full body is used unchanged if the field is absent or the body is not a map.

> **Note**: `body_field` only supports a single top-level key. Full OTTL path expressions (e.g. `body["event"]["message"]`) are not supported and are noted as a future extension.
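The extraction rule above (use the named top-level field when present, otherwise fall back to the full body) can be sketched in plain Go. This uses a `map[string]any` as a stand-in for the collector's structured body type, so it illustrates the documented behavior rather than the processor's real pdata handling:

```go
package main

import "fmt"

// extractBody returns the text that would be fed to Drain: the value of
// the configured top-level field when the body is a map containing it,
// otherwise the string representation of the full body.
func extractBody(body any, field string) string {
	if field != "" {
		if m, ok := body.(map[string]any); ok {
			if v, ok := m[field]; ok {
				return fmt.Sprint(v)
			}
		}
	}
	return fmt.Sprint(body) // field unset, absent, or body not a map
}

func main() {
	body := map[string]any{"level": "info", "message": "user alice logged in from 10.0.0.1"}
	fmt.Println(extractBody(body, "message"))        // user alice logged in from 10.0.0.1
	fmt.Println(extractBody("plain body", "message")) // plain body
}
```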
## Future extensions

- **Snapshot persistence**: save and restore the Drain tree state across restarts, eliminating the need for seeding. This requires serialization support and is tracked as a future improvement.
- **OTTL body extraction**: support full OTTL path expressions for `body_field` instead of a single top-level key name.
- **Multi-instance synchronisation**: optional shared snapshot file or gossip-based tree merging for consistent templates across horizontally scaled deployments.

processor/drainprocessor/config.go

Lines changed: 98 additions & 0 deletions

```go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package drainprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/drainprocessor"

import (
	"errors"
	"fmt"
)

// Config defines configuration for the drain processor.
type Config struct {
	// LogClusterDepth is the max depth of the Drain parse tree.
	// Higher values produce more specific templates. Default: 4. Minimum: 3.
	LogClusterDepth int `mapstructure:"log_cluster_depth"`

	// SimThreshold is the similarity threshold (0.0–1.0) below which a new
	// cluster is created rather than merged with an existing one. Default: 0.4.
	SimThreshold float64 `mapstructure:"sim_threshold"`

	// MaxChildren is the maximum number of children per parse tree node.
	// Default: 100.
	MaxChildren int `mapstructure:"max_children"`

	// MaxClusters is the maximum number of clusters tracked. When the limit is
	// reached, the least recently used cluster is evicted. 0 means unlimited.
	// Default: 0.
	MaxClusters int `mapstructure:"max_clusters"`

	// ExtraDelimiters are additional token delimiters beyond whitespace.
	ExtraDelimiters []string `mapstructure:"extra_delimiters"`

	// BodyField optionally specifies a top-level key to extract from a
	// structured (map) log body before feeding the value to Drain. If empty,
	// the full body string representation is used. This is a convenience for
	// pipelines where the body is a parsed map (e.g. after json_parser) and
	// the user does not have a move operator to promote the message field back
	// to a plain string body. Pipelines that do have that control should use a
	// move operator instead and leave this unset.
	BodyField string `mapstructure:"body_field"`

	// TemplateAttribute is the log record attribute key to write the derived
	// template string to. Default: "log.record.template".
	TemplateAttribute string `mapstructure:"template_attribute"`

	// TemplateIDAttribute is the log record attribute key to write the numeric
	// cluster ID to. Default: "log.record.template.id".
	TemplateIDAttribute string `mapstructure:"template_id_attribute"`

	// SeedTemplates is a list of pre-known template strings to train on at
	// startup before any live logs arrive. Improves template stability across
	// restarts for known log patterns.
	SeedTemplates []string `mapstructure:"seed_templates"`

	// SeedLogs is a list of raw example log lines to train on at startup.
	// Drain derives templates from these lines itself.
	SeedLogs []string `mapstructure:"seed_logs"`

	// WarmupMode controls processor behavior during the initial period before
	// the Drain tree has stabilized. Valid values: "passthrough" (default),
	// "buffer".
	WarmupMode string `mapstructure:"warmup_mode"`

	// WarmupMinClusters is the number of distinct clusters that must be
	// observed before warmup ends. Only used when WarmupMode is "buffer".
	// Default: 10.
	WarmupMinClusters int `mapstructure:"warmup_min_clusters"`

	// WarmupBufferMaxLogs is the maximum number of log records to buffer
	// during warmup before flushing regardless of cluster count. Only used
	// when WarmupMode is "buffer". Must be > 0. Default: 10000.
	WarmupBufferMaxLogs int `mapstructure:"warmup_buffer_max_logs"`
}

const (
	warmupModePassthrough = "passthrough"
	warmupModeBuffer      = "buffer"
)

// Validate checks the Config for invalid values.
func (cfg *Config) Validate() error {
	if cfg.LogClusterDepth < 3 {
		return fmt.Errorf("log_cluster_depth must be >= 3, got %d", cfg.LogClusterDepth)
	}
	if cfg.SimThreshold < 0.0 || cfg.SimThreshold > 1.0 {
		return fmt.Errorf("sim_threshold must be in [0.0, 1.0], got %f", cfg.SimThreshold)
	}
	if cfg.WarmupMode != warmupModePassthrough && cfg.WarmupMode != warmupModeBuffer {
		return fmt.Errorf("warmup_mode must be %q or %q, got %q", warmupModePassthrough, warmupModeBuffer, cfg.WarmupMode)
	}
	if cfg.WarmupMode == warmupModeBuffer && cfg.WarmupMinClusters <= 0 {
		return errors.New("warmup_min_clusters must be > 0 when warmup_mode is \"buffer\"")
	}
	if cfg.WarmupMode == warmupModeBuffer && cfg.WarmupBufferMaxLogs <= 0 {
		return errors.New("warmup_buffer_max_logs must be > 0 when warmup_mode is \"buffer\"")
	}
	return nil
}
```
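To see the validation rules in action, here is a small self-contained sketch. It uses a trimmed stand-in struct with the same three core rules shown above (depth, threshold, warmup mode) rather than importing the processor package, so it is illustrative only:

```go
package main

import (
	"errors"
	"fmt"
)

// config is a minimal stand-in for the processor's Config, carrying just
// the fields needed to exercise the validation rules shown above.
type config struct {
	LogClusterDepth int
	SimThreshold    float64
	WarmupMode      string
}

// validate mirrors the corresponding checks from Config.Validate.
func (c config) validate() error {
	if c.LogClusterDepth < 3 {
		return fmt.Errorf("log_cluster_depth must be >= 3, got %d", c.LogClusterDepth)
	}
	if c.SimThreshold < 0.0 || c.SimThreshold > 1.0 {
		return fmt.Errorf("sim_threshold must be in [0.0, 1.0], got %f", c.SimThreshold)
	}
	if c.WarmupMode != "passthrough" && c.WarmupMode != "buffer" {
		return errors.New(`warmup_mode must be "passthrough" or "buffer"`)
	}
	return nil
}

func main() {
	good := config{LogClusterDepth: 4, SimThreshold: 0.4, WarmupMode: "passthrough"}
	bad := config{LogClusterDepth: 2, SimThreshold: 0.4, WarmupMode: "passthrough"}
	fmt.Println(good.validate()) // <nil>
	fmt.Println(bad.validate())  // log_cluster_depth must be >= 3, got 2
}
```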
