# adding examples to drop_event processor in data prepper #11225
# Drop events processor

The `drop_events` processor drops events that match a specified condition. The following table describes when events are dropped and how exceptions that occur while evaluating an event are handled.
Option | Required | Type | Description
:--- | :--- | :--- | :---
drop_when | Yes | String | Accepts an OpenSearch Data Prepper expression string following the [expression syntax]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/expression-syntax/). Configuring `drop_events` with `drop_when: true` drops all the events received.
handle_failed_events | No | Enum | Specifies how exceptions are handled when an exception occurs while evaluating an event. The default value is `drop`, which drops the event so that it is not sent to OpenSearch. Available options are: <br> - `drop`: The event is dropped and a warning is logged.<br> - `drop_silently`: The event is dropped without a warning.<br> - `skip`: The event is not dropped and a warning is logged.<br> - `skip_silently`: The event is not dropped and no warning is logged.<br>For more information, see [handle_failed_events](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/drop-events-processor#handle_failed_events).
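
For example, a minimal processor configuration might look like the following sketch; the field name and value in the condition are illustrative:

```yaml
processor:
  - drop_events:
      # Drop any event whose "status" field equals "expired" (illustrative condition)
      drop_when: '/status == "expired"'
      # Keep events that fail evaluation, without logging a warning
      handle_failed_events: skip_silently
```

Setting `drop_when: true` instead drops every event that the processor receives.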

## Examples

The following examples show pipeline configurations that use the `drop_events` processor.

### Filter out debug logs

The following example configuration filters out `DEBUG`-level logs to reduce noise and storage costs while allowing `INFO`, `WARN`, and `ERROR` events through.

```yaml
filter-debug-logs-pipeline:
  source:
    http:
      port: 2021
      path: /events
      ssl: false

  processor:
    # Drop all DEBUG-level events
    - drop_events:
        drop_when: '/level == "DEBUG"'
        handle_failed_events: drop

  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        insecure: true
        username: admin
        password: "admin_pass"
        index_type: custom
        index: "filtered-logs-%{yyyy.MM.dd}"
```
{% include copy.html %}

> **Review comment:** Let's not include the port configuration. 2021 is the default.

You can test this pipeline using the following command:

```bash
curl -sS -X POST "http://localhost:2021/events" \
  -H "Content-Type: application/json" \
  -d '[
    {"level": "DEBUG", "message": "Database connection established", "service": "user-service", "timestamp": "2023-10-13T14:30:45Z"},
    {"level": "INFO", "message": "User login successful", "service": "user-service", "timestamp": "2023-10-13T14:31:00Z"},
    {"level": "ERROR", "message": "Database connection failed", "service": "user-service", "timestamp": "2023-10-13T14:32:00Z"},
    {"level": "WARN", "message": "Cache miss detected", "service": "user-service", "timestamp": "2023-10-13T14:33:00Z"}
  ]'
```
{% include copy.html %}

Only three documents are indexed in OpenSearch:

```json
{
  ...
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "filtered-logs-2025.10.13",
        "_id": "1zsA3pkBWoWOQ0uhY3EJ",
        "_score": 1,
        "_source": {
          "level": "INFO",
          "message": "User login successful",
          "service": "user-service",
          "timestamp": "2023-10-13T14:31:00Z"
        }
      },
      {
        "_index": "filtered-logs-2025.10.13",
        "_id": "2DsA3pkBWoWOQ0uhY3EJ",
        "_score": 1,
        "_source": {
          "level": "ERROR",
          "message": "Database connection failed",
          "service": "user-service",
          "timestamp": "2023-10-13T14:32:00Z"
        }
      },
      {
        "_index": "filtered-logs-2025.10.13",
        "_id": "2TsA3pkBWoWOQ0uhY3EJ",
        "_score": 1,
        "_source": {
          "level": "WARN",
          "message": "Cache miss detected",
          "service": "user-service",
          "timestamp": "2023-10-13T14:33:00Z"
        }
      }
    ]
  }
}
```
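
The response above comes from searching the target index. Assuming the sink credentials shown in the pipeline and an OpenSearch instance reachable on `localhost` (the index suffix depends on the ingestion date), a query such as the following returns it:

```bash
# -k matches the pipeline's "insecure: true" setting (self-signed certificates)
curl -sS -k -u admin:admin_pass "https://localhost:9200/filtered-logs-*/_search?pretty"
```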

### Multi-condition event filtering

The following example shows how to drop events based on multiple criteria, such as debug logs, error status codes, and missing user IDs, ensuring that only valid and important events reach OpenSearch.

```yaml
multi-condition-filter-pipeline:
  source:
    http:
      port: 2022
      path: /events
      ssl: false

  processor:
    # Drop debug logs, failed requests, and events without a user ID
    - drop_events:
        drop_when: '/level == "DEBUG" or /status_code >= 400 or /user_id == null'
        handle_failed_events: drop_silently

    # Stamp each remaining event with the time it was received
    - date:
        from_time_received: true
        destination: "@timestamp"

  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        insecure: true
        username: admin
        password: "admin_pass"
        index_type: custom
        index: "filtered-events-%{yyyy.MM.dd}"
```
{% include copy.html %}

> **Review comment:** Let's not include the port configuration and use the 2021 default. It is not likely that people are running these at the same time.

You can test this pipeline using the following command:

```bash
curl -sS -X POST "http://localhost:2022/events" \
  -H "Content-Type: application/json" \
  -d '[
    {"level": "DEBUG", "message": "Cache hit", "status_code": 200, "user_id": "user123", "timestamp": "2023-10-13T14:33:00Z"},
    {"level": "INFO", "message": "Request failed", "status_code": 500, "user_id": "user123", "timestamp": "2023-10-13T14:34:00Z"},
    {"level": "INFO", "message": "Anonymous request", "status_code": 200, "timestamp": "2023-10-13T14:35:00Z"},
    {"level": "INFO", "message": "User request processed", "status_code": 200, "user_id": "user123", "timestamp": "2023-10-13T14:36:00Z"},
    {"level": "INFO", "message": "Another valid request", "status_code": 201, "user_id": "user456", "timestamp": "2023-10-13T14:37:00Z"}
  ]'
```
{% include copy.html %}

Only two documents reach OpenSearch:

```json
{
  ...
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "filtered-events-2025.10.13",
        "_id": "7DsB3pkBWoWOQ0uhmHH4",
        "_score": 1,
        "_source": {
          "level": "INFO",
          "message": "User request processed",
          "status_code": 200,
          "user_id": "user123",
          "timestamp": "2023-10-13T14:36:00Z",
          "@timestamp": "2025-10-13T14:37:47.636Z"
        }
      },
      {
        "_index": "filtered-events-2025.10.13",
        "_id": "7TsB3pkBWoWOQ0uhmHH4",
        "_score": 1,
        "_source": {
          "level": "INFO",
          "message": "Another valid request",
          "status_code": 201,
          "user_id": "user456",
          "timestamp": "2023-10-13T14:37:00Z",
          "@timestamp": "2025-10-13T14:37:47.636Z"
        }
      }
    ]
  }
}
```

### Intelligent data sampling

The following example demonstrates sampling strategies that drop high-volume traffic based on request ID patterns and internal IP addresses in order to manage data volume while preserving representative samples:

```yaml
sampling-pipeline:
  source:
    http:
      port: 2023
      path: /events
      ssl: false

  processor:
    # Sample high-volume traffic: keep only request IDs from a fixed set
    - drop_events:
        drop_when: '/sampling_rate > 0.4 and /request_id not in {1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000}'
        handle_failed_events: skip

    # Drop requests that originate from internal IP addresses
    - drop_events:
        drop_when: '/source_ip =~ "^192\.168\..*"'
        handle_failed_events: skip

  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        insecure: true
        username: admin
        password: "admin_pass"
        index_type: custom
        index: "sampled-events-%{yyyy.MM.dd}"
```
{% include copy.html %}

You can test this pipeline using the following command:

```bash
curl -sS -X POST "http://localhost:2023/events" \
  -H "Content-Type: application/json" \
  -d '[
    {"request_id": 12345, "sampling_rate": 0.9, "source_ip": "10.0.0.1", "message": "High volume request - dropped", "timestamp": "2023-10-13T14:37:00Z"},
    {"request_id": 5000, "sampling_rate": 0.9, "source_ip": "10.0.0.1", "message": "High volume request - sampled", "timestamp": "2023-10-13T14:37:30Z"},
    {"request_id": 12346, "sampling_rate": 0.6, "source_ip": "192.168.1.100", "message": "Internal request - dropped", "timestamp": "2023-10-13T14:38:00Z"},
    {"request_id": 12347, "sampling_rate": 0.3, "source_ip": "203.0.113.45", "message": "External request - passed", "timestamp": "2023-10-13T14:39:00Z"},
    {"request_id": 1000, "sampling_rate": 0.9, "source_ip": "10.0.0.2", "message": "Another sampled request", "timestamp": "2023-10-13T14:40:00Z"}
  ]'
```
{% include copy.html %}

Three documents are indexed in OpenSearch:

```json
{
  ...
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "sampled-events-2025.10.13",
        "_id": "i2Ej3pkBh3nNS_N9KazV",
        "_score": 1,
        "_source": {
          "request_id": 5000,
          "sampling_rate": 0.9,
          "source_ip": "10.0.0.1",
          "message": "High volume request - sampled",
          "timestamp": "2023-10-13T14:37:30Z"
        }
      },
      {
        "_index": "sampled-events-2025.10.13",
        "_id": "jGEj3pkBh3nNS_N9KazV",
        "_score": 1,
        "_source": {
          "request_id": 12347,
          "sampling_rate": 0.3,
          "source_ip": "203.0.113.45",
          "message": "External request - passed",
          "timestamp": "2023-10-13T14:39:00Z"
        }
      },
      {
        "_index": "sampled-events-2025.10.13",
        "_id": "jWEj3pkBh3nNS_N9KazV",
        "_score": 1,
        "_source": {
          "request_id": 1000,
          "sampling_rate": 0.9,
          "source_ip": "10.0.0.2",
          "message": "Another sampled request",
          "timestamp": "2023-10-13T14:40:00Z"
        }
      }
    ]
  }
}
```

> **Review comment:** Data Prepper can send to multiple sinks. Also, it won't go to any other processors after this. It would be more accurate to say: