Skip to content

Commit 68ec6a5

Browse files
committed
Merge branch 'main' of github.com:elastic/beats into 47164-add-failure-store-count
2 parents e13f928 + 5d8d824 commit 68ec6a5

File tree

17 files changed

+902
-96
lines changed

17 files changed

+902
-96
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; an 80ish-character description of the change.
14+
summary: Improve input error reporting to Elastic Agent, especially when pipeline configurations are incorrect.
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodates a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: filebeat
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/beats/pull/47905
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/elastic/beats/issues/45649
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; an 80ish-character description of the change.
14+
summary: Make beats receivers emit status for their subcomponents
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodates a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: all
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
#pr: https://github.com/owner/repo/1234
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: feature
13+
14+
# REQUIRED for all kinds
15+
# Change summary; an 80ish-character description of the change.
16+
summary: Upgrade osquery to 5.19.0
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodates a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: osquerybeat
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link to the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
# pr: https://github.com/owner/repo/1234
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
45+
# issue: https://github.com/owner/repo/1234

filebeat/input/filestream/internal/input-logfile/harvester.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
2929
inputv2 "github.com/elastic/beats/v7/filebeat/input/v2"
3030
"github.com/elastic/beats/v7/libbeat/beat"
31+
"github.com/elastic/beats/v7/libbeat/management/status"
3132
"github.com/elastic/elastic-agent-libs/logp"
3233
"github.com/elastic/go-concert/ctxtool"
3334
)
@@ -129,6 +130,7 @@ type defaultHarvesterGroup struct {
129130
tg *task.Group
130131
metrics *Metrics
131132
notifyChan chan HarvesterStatus
133+
inputID string
132134
}
133135

134136
// HarvesterStatus is used to notify an observer that the harvester for the ID
@@ -160,7 +162,7 @@ func (hg *defaultHarvesterGroup) Start(ctx inputv2.Context, src Source) {
160162
sourceName := hg.identifier.ID(src)
161163
ctx.Logger = ctx.Logger.With("source_file", sourceName)
162164

163-
if err := hg.tg.Go(startHarvester(ctx, hg, src, false, hg.metrics)); err != nil {
165+
if err := hg.tg.Go(startHarvester(ctx, hg, src, false, hg.metrics, hg.inputID)); err != nil {
164166
ctx.Logger.Warnf(
165167
"tried to start harvester for %s with task group already closed",
166168
ctx.ID)
@@ -177,7 +179,7 @@ func (hg *defaultHarvesterGroup) Restart(ctx inputv2.Context, src Source) {
177179
ctx.Logger = ctx.Logger.With("source_file", sourceName)
178180
ctx.Logger.Debug("Restarting harvester for file")
179181

180-
if err := hg.tg.Go(startHarvester(ctx, hg, src, true, hg.metrics)); err != nil {
182+
if err := hg.tg.Go(startHarvester(ctx, hg, src, true, hg.metrics, hg.inputID)); err != nil {
181183
ctx.Logger.Warnf(
182184
"input %s tried to restart harvester with task group already closed",
183185
ctx.ID)
@@ -194,6 +196,7 @@ func startHarvester(
194196
src Source,
195197
restart bool,
196198
metrics *Metrics,
199+
inputID string,
197200
) func(context.Context) error {
198201
srcID := hg.identifier.ID(src)
199202

@@ -204,6 +207,14 @@ func startHarvester(
204207
ctx.Logger.Errorf("Harvester crashed with: %+v", err)
205208
hg.readers.remove(srcID)
206209
}
210+
211+
// Report any harvester error as a degraded state for the input
212+
if err != nil {
213+
ctx.StatusReporter.UpdateStatus(
214+
status.Degraded,
215+
fmt.Sprintf("Harvester for Filestream input %q failed: %s", inputID, err),
216+
)
217+
}
207218
}()
208219

209220
if restart {

filebeat/input/filestream/internal/input-logfile/harvester_test.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
3535
input "github.com/elastic/beats/v7/filebeat/input/v2"
3636
"github.com/elastic/beats/v7/libbeat/beat"
37+
"github.com/elastic/beats/v7/libbeat/management/status"
3738
"github.com/elastic/beats/v7/libbeat/tests/resources"
3839
"github.com/elastic/elastic-agent-libs/logp"
3940
"github.com/elastic/elastic-agent-libs/logp/logptest"
@@ -115,7 +116,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
115116
defer goroutinesChecker.WaitUntilOriginalCount()
116117

117118
wg.Add(1)
118-
hg.Start(input.Context{Logger: logp.L(), Cancelation: context.Background()}, source)
119+
hg.Start(input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}}, source)
119120

120121
// wait until harvester.Run is done
121122
wg.Wait()
@@ -167,10 +168,10 @@ func TestDefaultHarvesterGroup(t *testing.T) {
167168
source2 := &testSource{name: "/path/to/test/2"}
168169
wg.Add(2)
169170
hg.Start(
170-
input.Context{Logger: logp.L(), Cancelation: context.Background()},
171+
input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}},
171172
source1)
172173
hg.Start(
173-
input.Context{Logger: logp.L(), Cancelation: context.Background()},
174+
input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}},
174175
source2)
175176

176177
assert.Eventually(t,
@@ -221,7 +222,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
221222

222223
goroutinesChecker := resources.NewGoroutinesChecker()
223224

224-
hg.Start(input.Context{Logger: logp.L(), Cancelation: context.Background()}, source)
225+
hg.Start(input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}}, source)
225226

226227
goroutinesChecker.WaitUntilIncreased(1)
227228
// wait until harvester is started
@@ -241,7 +242,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
241242
t.Run("assert a harvester for same source cannot be started", func(t *testing.T) {
242243
mockHarvester := &mockHarvester{onRun: blockUntilCancelOnRun}
243244
hg := testDefaultHarvesterGroup(t, mockHarvester)
244-
inputCtx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
245+
inputCtx := input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}}
245246

246247
goroutinesChecker := resources.NewGoroutinesChecker()
247248
defer goroutinesChecker.WaitUntilOriginalCount()
@@ -274,7 +275,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
274275

275276
goroutinesChecker := resources.NewGoroutinesChecker()
276277

277-
hg.Start(input.Context{Logger: logp.L(), Cancelation: context.Background()}, source)
278+
hg.Start(input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}}, source)
278279

279280
// wait until harvester is stopped
280281
goroutinesChecker.WaitUntilOriginalCount()
@@ -295,7 +296,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
295296
goroutinesChecker := resources.NewGoroutinesChecker()
296297
defer goroutinesChecker.WaitUntilOriginalCount()
297298

298-
hg.Start(input.Context{Logger: logp.L(), Cancelation: context.Background()}, source)
299+
hg.Start(input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}}, source)
299300

300301
goroutinesChecker.WaitUntilOriginalCount()
301302

@@ -311,7 +312,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
311312
var wg sync.WaitGroup
312313
mockHarvester := &mockHarvester{onRun: correctOnRun, wg: &wg}
313314
hg := testDefaultHarvesterGroup(t, mockHarvester)
314-
inputCtx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
315+
inputCtx := input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}}
315316

316317
r, err := lock(inputCtx, hg.store, hg.identifier.ID(source))
317318
if err != nil {
@@ -346,7 +347,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
346347
mockHarvester := &mockHarvester{onRun: correctOnRun}
347348
hg := testDefaultHarvesterGroup(t, mockHarvester)
348349
hg.tg = task.NewGroup(0, 50*time.Millisecond, testLog, "")
349-
inputCtx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
350+
inputCtx := input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}}
350351

351352
goroutinesChecker := resources.NewGoroutinesChecker()
352353
defer goroutinesChecker.WaitUntilOriginalCount()
@@ -369,7 +370,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
369370
var wg sync.WaitGroup
370371
mockHarvester := &mockHarvester{onRun: blockUntilCancelOnRun, wg: &wg}
371372
hg := testDefaultHarvesterGroup(t, mockHarvester)
372-
inputCtx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
373+
inputCtx := input.Context{Logger: logp.L(), Cancelation: t.Context(), StatusReporter: mockStatusReporter{}}
373374

374375
goroutinesChecker := resources.NewGoroutinesChecker()
375376
defer goroutinesChecker.WaitUntilOriginalCount()
@@ -588,10 +589,10 @@ func panicOnRun(_ input.Context, _ Source, _ Cursor, _ Publisher) error {
588589

589590
type testLogger strings.Builder
590591

591-
func (tl *testLogger) Errorf(format string, args ...interface{}) {
592+
func (tl *testLogger) Errorf(format string, args ...any) {
592593
sb := (*strings.Builder)(tl)
593-
sb.WriteString(fmt.Sprintf(format, args...))
594-
sb.WriteString("\n")
594+
fmt.Fprintf(sb, format, args...)
595+
fmt.Fprint(sb, "\n")
595596
}
596597

597598
func (tl *testLogger) String() string {
@@ -681,3 +682,9 @@ func (mp *MockPipeline) ConnectWith(config beat.ClientConfig) (beat.Client, erro
681682
func (mp *MockPipeline) Connect() (beat.Client, error) {
682683
return mp.ConnectWith(beat.ClientConfig{})
683684
}
685+
686+
type mockStatusReporter struct{}
687+
688+
// UpdateStatus is a no-op
689+
func (m mockStatusReporter) UpdateStatus(status status.Status, msg string) {
690+
}

filebeat/input/filestream/internal/input-logfile/input.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func (inp *managedInput) Run(
8181
ctx.Logger,
8282
"harvester:"),
8383
metrics: metrics,
84+
inputID: inp.id,
8485
}
8586

8687
prospectorStore := inp.manager.getRetainedStore()

filebeat/input/v2/compat/compat.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,9 @@ func (r *runner) Start() {
161161

162162
err := r.input.Run(ctx, pc)
163163
if err != nil && !errors.Is(err, context.Canceled) {
164-
log.Errorf("Input '%s' failed with: %+v", name, err)
164+
errMsg := fmt.Sprintf("Input '%s' failed with: %+v", name, err)
165+
log.Error(errMsg)
166+
ctx.StatusReporter.UpdateStatus(status.Failed, errMsg)
165167
} else {
166168
log.Infof("Input '%s' stopped (goroutine)", name)
167169
}

filebeat/input/v2/input-cursor/input.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ func newInputACKHandler(log *logp.Logger) beat.EventListener {
224224
if n == 0 {
225225
return
226226
}
227+
//nolint:errcheck // We know it will always work
227228
private[last].(*updateOp).Execute(n)
228229
})
229230
}

libbeat/otelbeat/oteltest/oteltest.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ type CheckReceiversParams struct {
8787
// The function is called periodically until the assertions are met or the timeout is reached.
8888
AssertFunc func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs)
8989

90-
Status ExpectedStatus
90+
Status *componentstatus.Event
9191
NumRestarts int
9292
}
9393

@@ -200,12 +200,11 @@ func CheckReceivers(params CheckReceiversParams) {
200200
}
201201
require.NotNil(ct, host.getEvent(), "expected not nil, got nil")
202202

203-
if params.Status.Error == "" {
204-
require.Equalf(ct, host.Evt.Status(), componentstatus.StatusOK, "expected %v, got %v", params.Status.Status, host.Evt.Status())
205-
require.Nilf(ct, host.Evt.Err(), "expected nil, got %v", host.Evt.Err())
206-
} else {
207-
require.Equalf(ct, host.Evt.Status(), params.Status.Status, "expected %v, got %v", params.Status.Status, host.Evt.Status())
208-
require.ErrorContainsf(ct, host.Evt.Err(), params.Status.Error, "expected error to contain '%v': %v", params.Status.Error, host.Evt.Err())
203+
if params.Status != nil {
204+
assert.Equal(t, params.Status.Status(), params.Status.Status(), host.Evt.Status(),
205+
"expected status to be %v, got %v", params.Status.Status(), host.Evt.Status())
206+
assert.Equal(t, params.Status.Err(), host.Evt.Err())
207+
assert.Equal(t, params.Status.Attributes().AsRaw(), host.Evt.Attributes().AsRaw())
209208
}
210209

211210
if params.AssertFunc != nil {

testing/environments/snapshot.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
services:
44
elasticsearch:
5-
image: docker.elastic.co/elasticsearch/elasticsearch:9.3.0-a866d8e8-SNAPSHOT
5+
image: docker.elastic.co/elasticsearch/elasticsearch:9.3.0-70731819-SNAPSHOT
66
# When extend is used it merges healthcheck.tests, see:
77
# https://github.com/docker/compose/issues/8962
88
# healthcheck:
@@ -37,7 +37,7 @@ services:
3737
- "./docker/elasticsearch/users_roles:/usr/share/elasticsearch/config/users_roles"
3838

3939
logstash:
40-
image: docker.elastic.co/logstash/logstash:9.3.0-a866d8e8-SNAPSHOT
40+
image: docker.elastic.co/logstash/logstash:9.3.0-70731819-SNAPSHOT
4141
healthcheck:
4242
test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"]
4343
retries: 600
@@ -50,7 +50,7 @@ services:
5050
- 5055:5055
5151

5252
kibana:
53-
image: docker.elastic.co/kibana/kibana:9.3.0-a866d8e8-SNAPSHOT
53+
image: docker.elastic.co/kibana/kibana:9.3.0-70731819-SNAPSHOT
5454
environment:
5555
- "ELASTICSEARCH_USERNAME=kibana_system_user"
5656
- "ELASTICSEARCH_PASSWORD=testing"

0 commit comments

Comments
 (0)