32 changes: 32 additions & 0 deletions changelog/fragments/1765562940-add-failure-store-metric.yaml
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user's deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; an ~80-character description of the change.
summary: Add `events.failure_store` metric to track events sent to Elasticsearch failure store

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: all

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled by the tooling by finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/beats/pull/48068

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/beats/issues/47164
1 change: 1 addition & 0 deletions docs/reference/auditbeat/understand-auditbeat-logs.md
@@ -186,6 +186,7 @@ The following tables explain the meaning of the most important fields under `.mo
| `.output.events.failed` | Integer | Number of events that Auditbeat tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it’s useful to check Auditbeat’s logs right before this log entry’s `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later. |
| `.output.events.dropped` | Integer | Number of events that Auditbeat gave up sending to the output destination because of a permanent (non-retryable) error. |
| `.output.events.dead_letter` | Integer | Number of events that Auditbeat successfully sent to a configured dead letter index after they failed to ingest in the primary index. |
| `.output.events.failure_store` | Integer | Number of events that were sent to the failure store. The failure store is a feature in Elasticsearch data streams that stores events that fail mapping or ingestion. Events sent to the failure store are still counted as acknowledged. | This metric indicates how many events encountered mapping or ingestion errors but were successfully stored in the failure store. A non-zero value suggests there may be mapping issues or data type mismatches that need to be addressed. |
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs. |
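
To make the new `.output.events.failure_store` row above concrete, here is a minimal Go sketch of reading that counter from a single monitoring log entry. The struct nesting follows the documented field path (`.monitoring.metrics.libbeat.output.events.failure_store`); the exact shape of the log line and the threshold check are illustrative assumptions, not something defined by this PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// monitoringLog is a minimal view of the fields discussed in the table above.
// The nesting mirrors the documented path .monitoring.metrics.libbeat.output.events.*;
// confirm the real log shape against an actual Beat log line.
type monitoringLog struct {
	Monitoring struct {
		Metrics struct {
			Libbeat struct {
				Output struct {
					Events struct {
						Acked        int `json:"acked"`
						FailureStore int `json:"failure_store"`
					} `json:"events"`
				} `json:"output"`
			} `json:"libbeat"`
		} `json:"metrics"`
	} `json:"monitoring"`
}

func main() {
	// Hypothetical excerpt: 100 events acknowledged, 3 of them routed to the failure store.
	line := []byte(`{"monitoring":{"metrics":{"libbeat":{"output":{"events":{"acked":100,"failure_store":3}}}}}}`)

	var entry monitoringLog
	if err := json.Unmarshal(line, &entry); err != nil {
		panic(err)
	}

	if n := entry.Monitoring.Metrics.Libbeat.Output.Events.FailureStore; n > 0 {
		// A non-zero value means some events hit mapping/ingestion errors and
		// were captured by the data stream's failure store.
		fmt.Printf("%d events were redirected to the failure store; check mappings\n", n)
	}
}
```

Anything above zero is worth correlating with the contents of the data stream's failure store to find the offending mappings or data type mismatches.
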

| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints |
1 change: 1 addition & 0 deletions docs/reference/filebeat/understand-filebeat-logs.md
@@ -130,6 +130,7 @@ The following tables explain the meaning of the most important fields under `.mo
| `.output.events.failed` | Integer | Number of events that Filebeat tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it’s useful to check Filebeat’s logs right before this log entry’s `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later. |
| `.output.events.dropped` | Integer | Number of events that Filebeat gave up sending to the output destination because of a permanent (non-retryable) error. |
| `.output.events.dead_letter` | Integer | Number of events that Filebeat successfully sent to a configured dead letter index after they failed to ingest in the primary index. |
| `.output.events.failure_store` | Integer | Number of events that were sent to the failure store. The failure store is a feature in Elasticsearch data streams that stores events that fail mapping or ingestion. Events sent to the failure store are still counted as acknowledged. | This metric indicates how many events encountered mapping or ingestion errors but were successfully stored in the failure store. A non-zero value suggests there may be mapping issues or data type mismatches that need to be addressed. |
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, Redis, and Logstash outputs. | These latency statistics are calculated over the lifetime of the connection. For long-lived connections, the average value will stabilize, making it less sensitive to short-term disruptions. |

| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints |
1 change: 1 addition & 0 deletions docs/reference/heartbeat/understand-heartbeat-logs.md
@@ -186,6 +186,7 @@ The following tables explain the meaning of the most important fields under `.mo
| `.output.events.failed` | Integer | Number of events that Heartbeat tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it’s useful to check Heartbeat’s logs right before this log entry’s `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later. |
| `.output.events.dropped` | Integer | Number of events that Heartbeat gave up sending to the output destination because of a permanent (non-retryable) error. |
| `.output.events.dead_letter` | Integer | Number of events that Heartbeat successfully sent to a configured dead letter index after they failed to ingest in the primary index. |
| `.output.events.failure_store` | Integer | Number of events that were sent to the failure store. The failure store is a feature in Elasticsearch data streams that stores events that fail mapping or ingestion. Events sent to the failure store are still counted as acknowledged. | This metric indicates how many events encountered mapping or ingestion errors but were successfully stored in the failure store. A non-zero value suggests there may be mapping issues or data type mismatches that need to be addressed. |
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs. |

| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints |
1 change: 1 addition & 0 deletions docs/reference/metricbeat/understand-metricbeat-logs.md
@@ -186,6 +186,7 @@ The following tables explain the meaning of the most important fields under `.mo
| `.output.events.failed` | Integer | Number of events that Metricbeat tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it’s useful to check Metricbeat’s logs right before this log entry’s `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later. |
| `.output.events.dropped` | Integer | Number of events that Metricbeat gave up sending to the output destination because of a permanent (non-retryable) error. |
| `.output.events.dead_letter` | Integer | Number of events that Metricbeat successfully sent to a configured dead letter index after they failed to ingest in the primary index. |
| `.output.events.failure_store` | Integer | Number of events that were sent to the failure store. The failure store is a feature in Elasticsearch data streams that stores events that fail mapping or ingestion. Events sent to the failure store are still counted as acknowledged. | This metric indicates how many events encountered mapping or ingestion errors but were successfully stored in the failure store. A non-zero value suggests there may be mapping issues or data type mismatches that need to be addressed. |
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs. |

| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints |
1 change: 1 addition & 0 deletions docs/reference/packetbeat/understand-packetbeat-logs.md
@@ -186,6 +186,7 @@ The following tables explain the meaning of the most important fields under `.mo
| `.output.events.failed` | Integer | Number of events that Packetbeat tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it’s useful to check Packetbeat’s logs right before this log entry’s `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later. |
| `.output.events.dropped` | Integer | Number of events that Packetbeat gave up sending to the output destination because of a permanent (non-retryable) error. |
| `.output.events.dead_letter` | Integer | Number of events that Packetbeat successfully sent to a configured dead letter index after they failed to ingest in the primary index. |
| `.output.events.failure_store` | Integer | Number of events that were sent to the failure store. The failure store is a feature in Elasticsearch data streams that stores events that fail mapping or ingestion. Events sent to the failure store are still counted as acknowledged. | This metric indicates how many events encountered mapping or ingestion errors but were successfully stored in the failure store. A non-zero value suggests there may be mapping issues or data type mismatches that need to be addressed. |
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs. |

| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints |
1 change: 1 addition & 0 deletions docs/reference/winlogbeat/understand-winlogbeat-logs.md
@@ -186,6 +186,7 @@ The following tables explain the meaning of the most important fields under `.mo
| `.output.events.failed` | Integer | Number of events that Winlogbeat tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it’s useful to check Winlogbeat’s logs right before this log entry’s `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later. |
| `.output.events.dropped` | Integer | Number of events that Winlogbeat gave up sending to the output destination because of a permanent (non-retryable) error. |
| `.output.events.dead_letter` | Integer | Number of events that Winlogbeat successfully sent to a configured dead letter index after they failed to ingest in the primary index. |
| `.output.events.failure_store` | Integer | Number of events that were sent to the failure store. The failure store is a feature in Elasticsearch data streams that stores events that fail mapping or ingestion. Events sent to the failure store are still counted as acknowledged. | This metric indicates how many events encountered mapping or ingestion errors but were successfully stored in the failure store. A non-zero value suggests there may be mapping issues or data type mismatches that need to be addressed. |
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs. |

| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints |
51 changes: 32 additions & 19 deletions libbeat/outputs/elasticsearch/bulk.go
@@ -31,9 +31,10 @@
errUnexpectedEmptyObject = errors.New("empty object")
errExpectedObjectEnd = errors.New("expected end of object")

nameItems = []byte("items")
nameStatus = []byte("status")
nameError = []byte("error")
nameItems = []byte("items")
nameStatus = []byte("status")
nameError = []byte("error")
nameFailureStore = []byte("failure_store")
)

// bulkReadToItems reads the bulk response up to (but not including) items
@@ -58,7 +59,7 @@
break
}

reader.ignoreNext()

Check failure on line 62 in libbeat/outputs/elasticsearch/bulk.go (GitHub Actions / lint, ubuntu-latest): Error return value of `reader.ignoreNext` is not checked (errcheck)
}

// check items field is an array
@@ -70,52 +71,53 @@
}

// bulkReadItemStatus reads the status and error fields from the bulk item
func bulkReadItemStatus(logger *logp.Logger, reader *jsonReader) (int, []byte, error) {
func bulkReadItemStatus(logger *logp.Logger, reader *jsonReader) (int, []byte, bool, error) {
// skip outer dictionary
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
return 0, nil, false, errExpectedItemObject
}

// find first field in outer dictionary (e.g. 'create')
kind, _, err := reader.nextFieldName()
if err != nil {
logger.Errorf("Failed to parse bulk response item: %s", err)
return 0, nil, err
return 0, nil, false, err
}
if kind == dictEnd {
err = errUnexpectedEmptyObject
logger.Errorf("Failed to parse bulk response item: %s", err)
return 0, nil, err
return 0, nil, false, err
}

// parse actual item response code and error message
status, msg, err := itemStatusInner(reader, logger)
status, msg, failureStoreUsed, err := itemStatusInner(reader, logger)
if err != nil {
logger.Errorf("Failed to parse bulk response item: %s", err)
return 0, nil, err
return 0, nil, false, err
}

// close dictionary. Expect outer dictionary to have only one element
kind, _, err = reader.step()
if err != nil {
logger.Errorf("Failed to parse bulk response item: %s", err)
return 0, nil, err
return 0, nil, false, err
}
if kind != dictEnd {
err = errExpectedObjectEnd
logger.Errorf("Failed to parse bulk response item: %s", err)
return 0, nil, err
return 0, nil, false, err
}

return status, msg, nil
return status, msg, failureStoreUsed, nil
}

func itemStatusInner(reader *jsonReader, logger *logp.Logger) (int, []byte, error) {
func itemStatusInner(reader *jsonReader, logger *logp.Logger) (int, []byte, bool, error) {
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
return 0, nil, false, errExpectedItemObject
}

status := -1
failureStoreUsed := false
var msg []byte
for {
kind, name, err := reader.nextFieldName()
@@ -131,25 +133,36 @@
status, err = reader.nextInt()
if err != nil {
logger.Errorf("Failed to parse bulk response item: %s", err)
return 0, nil, err
return 0, nil, false, err
}

case bytes.Equal(name, nameError): // name == "error"
msg, err = reader.ignoreNext() // collect raw string for "error" field
if err != nil {
return 0, nil, err
return 0, nil, false, err
}

case bytes.Equal(name, nameFailureStore):
msg, err := reader.ignoreNext()
if err != nil {
return 0, nil, false, err
}

if bytes.Equal(msg, []byte(`"used"`)) {
failureStoreUsed = true
}

default: // ignore unknown fields
_, err = reader.ignoreNext()
if err != nil {
return 0, nil, err
return 0, nil, false, err
}
}
}

if status < 0 {
return 0, nil, errExpectedStatusCode
return 0, nil, false, errExpectedStatusCode
}
return status, msg, nil

return status, msg, failureStoreUsed, nil
}
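
The hunk above only changes response parsing: `bulkReadItemStatus` and `itemStatusInner` now also report whether Elasticsearch marked an item with `"failure_store": "used"`. The code that turns that boolean into the `events.failure_store` metric lives in the caller and is not part of this hunk, so the following is only a rough sketch of that accounting, with hypothetical names (`bulkResultStats`, `countItem`) rather than the actual client code:

```go
package main

import "fmt"

// bulkResultStats is a hypothetical per-batch tally. The real Elasticsearch
// output keeps similar counters and forwards them to its metrics observer;
// the failure-store handling is the only point of this sketch.
type bulkResultStats struct {
	acked        int // items the cluster accepted
	failureStore int // accepted items that were routed to the failure store
	failed       int // items that were not accepted (retry or drop)
}

// countItem folds one parsed bulk-response item into the tally, using the
// values that bulkReadItemStatus now returns: the item status code and the
// new failure-store flag.
func (s *bulkResultStats) countItem(status int, failureStoreUsed bool) {
	if status < 300 {
		s.acked++
		if failureStoreUsed {
			// The event is still acknowledged, but Elasticsearch stored it in
			// the data stream's failure store rather than the target index,
			// so it is also counted under events.failure_store.
			s.failureStore++
		}
		return
	}
	s.failed++
}

func main() {
	var stats bulkResultStats
	stats.countItem(201, false)
	stats.countItem(201, true) // e.g. a mapping conflict captured by the failure store
	stats.countItem(429, false)
	fmt.Printf("%+v\n", stats) // {acked:2 failureStore:1 failed:1}
}
```
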
60 changes: 58 additions & 2 deletions libbeat/outputs/elasticsearch/bulk_test.go
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

// This file was contributed to by generative AI

//go:build !integration

package elasticsearch
@@ -76,7 +78,7 @@ func TestBulkReadItemStatus(t *testing.T) {
logger := logptest.NewTestingLogger(t, "")

reader := newJSONReader(response)
code, _, err := bulkReadItemStatus(logger, reader)
code, _, _, err := bulkReadItemStatus(logger, reader)
assert.NoError(t, err)
assert.Equal(t, 200, code)
}
@@ -135,6 +137,60 @@ func TestES2StyleExtendedErrorStatus(t *testing.T) {

func readStatusItem(in []byte, logger *logp.Logger) (int, string, error) {
reader := newJSONReader(in)
code, msg, err := bulkReadItemStatus(logger, reader)
code, msg, _, err := bulkReadItemStatus(logger, reader)
return code, string(msg), err
}

func TestBulkReadItemStatusWithFailureStore(t *testing.T) {
tests := []struct {
name string
response []byte
expectedStatus int
expectedFailure bool
}{
{
name: "failure_store used",
response: []byte(`{"create": {"status": 200, "failure_store": "used"}}`),
expectedStatus: 200,
expectedFailure: true,
},
{
name: "no failure_store field",
response: []byte(`{"create": {"status": 200}}`),
expectedStatus: 200,
expectedFailure: false,
},
{
name: "failure_store not used",
response: []byte(`{"create": {"status": 200, "failure_store": "not_used"}}`),
expectedStatus: 200,
expectedFailure: false,
},
{
name: "failure_store used with error",
response: []byte(`{"create": {"status": 400, "error": "mapping error", "failure_store": "used"}}`),
expectedStatus: 400,
expectedFailure: true,
},
{
name: "failure_store used with index operation",
response: []byte(`{"index": {"status": 201, "failure_store": "used"}}`),
expectedStatus: 201,
expectedFailure: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := logptest.NewTestingLogger(t, "")
reader := newJSONReader(tt.response)
status, msg, failureStoreUsed, err := bulkReadItemStatus(logger, reader)
assert.NoError(t, err)
assert.Equal(t, tt.expectedStatus, status)
assert.Equal(t, tt.expectedFailure, failureStoreUsed)
if tt.expectedStatus >= 400 {
assert.NotEmpty(t, msg)
}
})
}
}
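
To exercise only the new table-driven test locally, an invocation along the lines of `go test ./libbeat/outputs/elasticsearch -run TestBulkReadItemStatusWithFailureStore -v` should work from the repository root; the file's `!integration` build constraint means no extra build tags are required. The exact command may differ depending on the module layout.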