Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions _data-prepper/pipelines/configuration/processors/add-entries.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,146 @@ The processed event will contain the following data:
```json
{"message": ["hello", "world"]}
```

## Complete example

The following pipeline is used to:

1. Add `app_id` using a format string ${app}-${env}.
2. Add `message_len` with `length(/message)`.
3. Add metadata key `msg_len_meta` with `length(/message)`.
4. If `/metric/name` and `/metric/value` exist, create a new field named after `/metric/name` and set its value to `/metric/value`.
5. If `/level == "error"`, add `severity: "high"`.
6. Add `"ingested"` to `tags`.
7. Set `env_normalized: "prod"` and overwrite it if it already exists.

```yaml
example-pipeline:
source:
http:
port: 2021
path: /events
ssl: false

processor:
- add_entries:
entries:
- key: "app_id"
format: "${app}-${env}"

- key: "message_len"
value_expression: "length(/message)"

- metadata_key: "msg_len_meta"
value_expression: "length(/message)"

# dynamic key from the event, only when both metric fields exist
- key: "${/metric/name}"
value_expression: "/metric/value"
add_when: "/metric/name != null and /metric/value != null"

# set severity ONLY on error level
- key: "severity"
value: "high"
add_when: '/level == "error"'

# append behavior: if tags already exists, it becomes/extends an array
- key: "tags"
value: "ingested"
append_if_key_exists: true

# overwrite behavior
- key: "env_normalized"
value: "prod"
overwrite_if_key_exists: true

sink:
- opensearch:
hosts: ["https://opensearch:9200"]
insecure: true
username: admin
password: "admin_pass"
index_type: custom
index: "example-%{yyyy.MM.dd}"
```
{% include copy.html %}

You can test this pipeline by executing the following command:

```bash
curl -sS -X POST "http://localhost:2021/events" \
-H "Content-Type: application/json" \
-d '[
{"app":"shop","env":"dev","message":"hello","level":"info","metric":{"name":"cpu","value":42}},
{"app":"shop","env":"prod","message":"boom","level":"error"},
{"app":"api","env":"stage","message":"hi","level":"warn","metric":{"name":"mem","value":2048},"tags":"pretag"}
]'
```
{% include copy.html %}

Once these documents are processed, the index `example-2025.10.10` will contain the following documents:

```json
"hits": [
{
"_index": "example-2025.10.10",
"_id": "BnvWzpkBTMZ443JmHuHI",
"_score": 1,
"_source": {
"app": "shop",
"env": "dev",
"message": "hello",
"level": "info",
"metric": {
"name": "cpu",
"value": 42
},
"app_id": "shop-dev",
"message_len": 5,
"cpu": 42,
"tags": "ingested",
"env_normalized": "prod"
}
},
{
"_index": "example-2025.10.10",
"_id": "B3vWzpkBTMZ443JmHuHI",
"_score": 1,
"_source": {
"app": "shop",
"env": "prod",
"message": "boom",
"level": "error",
"app_id": "shop-prod",
"message_len": 4,
"severity": "high",
"tags": "ingested",
"env_normalized": "prod"
}
},
{
"_index": "example-2025.10.10",
"_id": "CHvWzpkBTMZ443JmHuHI",
"_score": 1,
"_source": {
"app": "api",
"env": "stage",
"message": "hi",
"level": "warn",
"metric": {
"name": "mem",
"value": 2048
},
"tags": [
"pretag",
"ingested"
],
"app_id": "api-stage",
"message_len": 2,
"mem": 2048,
"env_normalized": "prod"
}
}
]

```