Skip to content

Commit 71f36fd

Browse files
authored
[receiver/awscloudwatch]: fix issue with polling of deleted log group while polling (#40571)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Resolves #38940 If a log group was deleted while a poll was being executed we were silently failing after the first log message. While examining the code I don't believe we were failing on future polls but can imagine that this change is an improvement as it now radiates a tad bit more on the `pollForLogs` function as it now actually returns up errors. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #38940 <!--Describe what testing was performed and which tests were added.--> #### Testing Unit testing the situation described in the issue #### Documentation changelog entry
1 parent 48498e5 commit 71f36fd

File tree

3 files changed

+258
-8
lines changed

3 files changed

+258
-8
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awscloudwatchreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fixes issue with autodiscovered groups that were deleted preventing logs during that poll
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38940]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/awscloudwatchreceiver/logs.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/aws/aws-sdk-go-v2/aws"
1414
"github.com/aws/aws-sdk-go-v2/config"
1515
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
16+
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
1617
"go.opentelemetry.io/collector/component"
1718
"go.opentelemetry.io/collector/consumer"
1819
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -268,7 +269,6 @@ func (l *logsReceiver) pollForLogs(ctx context.Context, pc groupRequest, startTi
268269

269270
for nextToken != nil {
270271
select {
271-
// if done, we want to stop processing paginated stream of events
272272
case _, ok := <-l.doneChan:
273273
if !ok {
274274
return nil
@@ -277,17 +277,24 @@ func (l *logsReceiver) pollForLogs(ctx context.Context, pc groupRequest, startTi
277277
input := pc.request(l.maxEventsPerRequest, *nextToken, &startTime, &endTime)
278278
resp, err := l.client.FilterLogEvents(ctx, input)
279279
if err != nil {
280-
l.settings.Logger.Error("unable to retrieve logs from cloudatch",
281-
zap.String("logGroup", logGroup),
282-
zap.Error(err))
283-
break
280+
var resourceNotFoundException *types.ResourceNotFoundException
281+
if errors.As(err, &resourceNotFoundException) {
282+
l.settings.Logger.Warn("log group no longer exists, skipping",
283+
zap.String("logGroup", logGroup),
284+
zap.Error(err))
285+
return fmt.Errorf("log group %s no longer exists: %w", logGroup, err)
286+
}
287+
return fmt.Errorf("failed to retrieve logs from log group %s: %w", logGroup, err)
284288
}
289+
285290
observedTime := pcommon.NewTimestampFromTime(time.Now())
286291
logs := l.processEvents(observedTime, logGroup, resp)
287292
if logs.LogRecordCount() > 0 {
288293
if err = l.consumer.ConsumeLogs(ctx, logs); err != nil {
289-
l.settings.Logger.Error("unable to consume logs", zap.Error(err))
290-
break
294+
l.settings.Logger.Error("unable to consume logs",
295+
zap.String("logGroup", logGroup),
296+
zap.Error(err))
297+
return fmt.Errorf("failed to consume logs from log group %s: %w", logGroup, err)
291298
}
292299
}
293300
nextToken = resp.NextToken
@@ -340,7 +347,6 @@ func (l *logsReceiver) processEvents(now pcommon.Timestamp, logGroupName string,
340347
}
341348
group[logStreamName] = resourceLogs
342349

343-
// Ensure one scopeLogs is initialized so we can handle in standardized way going forward.
344350
_ = resourceLogs.ScopeLogs().AppendEmpty()
345351
}
346352

receiver/awscloudwatchreceiver/logs_test.go

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,223 @@ func TestAutodiscoverLimit(t *testing.T) {
274274
require.Len(t, grs, cfg.Logs.Groups.AutodiscoverConfig.Limit)
275275
}
276276

277+
func TestDeletedLogGroupContinuesPolling(t *testing.T) {
278+
cfg := createDefaultConfig().(*Config)
279+
cfg.Region = "us-west-1"
280+
cfg.Logs.PollInterval = 1 * time.Second
281+
cfg.Logs.Groups = GroupConfig{
282+
NamedConfigs: map[string]StreamConfig{
283+
"existing-group": {
284+
Names: []*string{aws.String("stream1")},
285+
},
286+
"deleted-group": {
287+
Names: []*string{aws.String("stream2")},
288+
},
289+
},
290+
}
291+
292+
sink := &consumertest.LogsSink{}
293+
logsRcvr := newLogsReceiver(cfg, receiver.Settings{
294+
TelemetrySettings: component.TelemetrySettings{
295+
Logger: zap.NewNop(),
296+
},
297+
}, sink)
298+
mc := &mockClient{}
299+
300+
mc.On("FilterLogEvents", mock.Anything, mock.MatchedBy(func(input *cloudwatchlogs.FilterLogEventsInput) bool {
301+
return *input.LogGroupName == "existing-group"
302+
}), mock.Anything).Return(&cloudwatchlogs.FilterLogEventsOutput{
303+
Events: []types.FilteredLogEvent{
304+
{
305+
EventId: aws.String("event1"),
306+
LogStreamName: aws.String("stream1"),
307+
Message: aws.String("test message"),
308+
Timestamp: aws.Int64(time.Now().UnixMilli()),
309+
},
310+
},
311+
NextToken: nil,
312+
}, nil)
313+
314+
mc.On("FilterLogEvents", mock.Anything, mock.MatchedBy(func(input *cloudwatchlogs.FilterLogEventsInput) bool {
315+
return *input.LogGroupName == "deleted-group"
316+
}), mock.Anything).Return((*cloudwatchlogs.FilterLogEventsOutput)(nil), &types.ResourceNotFoundException{
317+
Message: aws.String("The specified log group does not exist"),
318+
})
319+
320+
logsRcvr.client = mc
321+
322+
err := logsRcvr.Start(context.Background(), componenttest.NewNopHost())
323+
require.NoError(t, err)
324+
325+
require.Eventually(t, func() bool {
326+
return sink.LogRecordCount() > 0
327+
}, 2*time.Second, 10*time.Millisecond)
328+
logs := sink.AllLogs()
329+
require.Len(t, logs, 1)
330+
require.Equal(t, 1, logs[0].LogRecordCount())
331+
332+
logRecord := logs[0].ResourceLogs().At(0)
333+
require.Equal(t, "existing-group", logRecord.Resource().Attributes().AsRaw()["cloudwatch.log.group.name"])
334+
335+
err = logsRcvr.Shutdown(context.Background())
336+
require.NoError(t, err)
337+
338+
// Verify all mock expectations were met
339+
mc.AssertExpectations(t)
340+
}
341+
342+
func TestDeletedLogGroupDuringAutodiscover(t *testing.T) {
343+
cfg := createDefaultConfig().(*Config)
344+
cfg.Region = "us-west-1"
345+
cfg.Logs.PollInterval = 1 * time.Second
346+
cfg.Logs.Groups = GroupConfig{
347+
AutodiscoverConfig: &AutodiscoverConfig{
348+
Limit: 3,
349+
Prefix: "/aws/",
350+
},
351+
}
352+
353+
firstPollDone := make(chan struct{}, 1)
354+
355+
sink := &consumertest.LogsSink{}
356+
logsRcvr := newLogsReceiver(cfg, receiver.Settings{
357+
TelemetrySettings: component.TelemetrySettings{
358+
Logger: zap.NewNop(),
359+
},
360+
}, sink)
361+
362+
mc := &mockClient{}
363+
364+
// First DescribeLogGroups call returns two groups
365+
mc.On("DescribeLogGroups", mock.Anything, mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
366+
return input.LogGroupNamePrefix != nil && *input.LogGroupNamePrefix == "/aws/"
367+
}), mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
368+
LogGroups: []types.LogGroup{
369+
{
370+
LogGroupName: aws.String("/aws/working-group"),
371+
},
372+
{
373+
LogGroupName: aws.String("/aws/delete-group"),
374+
},
375+
{
376+
LogGroupName: aws.String("/aws/third-group"),
377+
},
378+
},
379+
NextToken: nil,
380+
}, nil).Once()
381+
382+
// Second DescribeLogGroups call returns working group plus a new group
383+
mc.On("DescribeLogGroups", mock.Anything, mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
384+
return input.LogGroupNamePrefix != nil && *input.LogGroupNamePrefix == "/aws/"
385+
}), mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
386+
LogGroups: []types.LogGroup{
387+
{
388+
LogGroupName: aws.String("/aws/working-group"),
389+
},
390+
{
391+
LogGroupName: aws.String("/aws/third-group"),
392+
},
393+
},
394+
NextToken: nil,
395+
}, nil).Run(func(_ mock.Arguments) {
396+
close(firstPollDone)
397+
})
398+
399+
// Setup mock for working-group to return normal logs
400+
mc.On("FilterLogEvents", mock.Anything, mock.MatchedBy(func(input *cloudwatchlogs.FilterLogEventsInput) bool {
401+
return input.LogGroupName != nil && *input.LogGroupName == "/aws/working-group"
402+
}), mock.Anything).Return(&cloudwatchlogs.FilterLogEventsOutput{
403+
Events: []types.FilteredLogEvent{
404+
{
405+
EventId: aws.String("event1"),
406+
LogStreamName: aws.String("stream1"),
407+
Message: aws.String("test message"),
408+
Timestamp: aws.Int64(time.Now().UnixMilli()),
409+
},
410+
},
411+
NextToken: nil,
412+
}, nil)
413+
414+
// Setup mock for /aws/delete-group to return ResourceNotFoundException
415+
mc.On("FilterLogEvents", mock.Anything, mock.MatchedBy(func(input *cloudwatchlogs.FilterLogEventsInput) bool {
416+
return input.LogGroupName != nil && *input.LogGroupName == "/aws/delete-group"
417+
}), mock.Anything).Return(&cloudwatchlogs.FilterLogEventsOutput{
418+
Events: []types.FilteredLogEvent{
419+
{
420+
EventId: aws.String("event1"),
421+
LogStreamName: aws.String("stream1"),
422+
Message: aws.String("test message"),
423+
Timestamp: aws.Int64(time.Now().UnixMilli()),
424+
},
425+
},
426+
NextToken: aws.String("next"),
427+
}, nil).Once()
428+
429+
mc.On("FilterLogEvents", mock.Anything, mock.MatchedBy(func(input *cloudwatchlogs.FilterLogEventsInput) bool {
430+
return input.LogGroupName != nil && *input.LogGroupName == "/aws/delete-group" && input.NextToken != nil && *input.NextToken == "next"
431+
}), mock.Anything).Return(&cloudwatchlogs.FilterLogEventsOutput{}, &types.ResourceNotFoundException{
432+
Message: aws.String("The specified log group does not exist"),
433+
}).Once()
434+
435+
// Setup mock for /aws/working-group or /aws/third-group to return normal logs
436+
mc.On("FilterLogEvents",
437+
mock.Anything,
438+
mock.MatchedBy(func(input *cloudwatchlogs.FilterLogEventsInput) bool {
439+
return input.LogGroupName != nil && (*input.LogGroupName == "/aws/working-group" || *input.LogGroupName == "/aws/third-group")
440+
}), mock.Anything).Return(&cloudwatchlogs.FilterLogEventsOutput{
441+
Events: []types.FilteredLogEvent{
442+
{
443+
EventId: aws.String("event2"),
444+
LogStreamName: aws.String("stream2"),
445+
Message: aws.String("test message from group"),
446+
Timestamp: aws.Int64(time.Now().UnixMilli()),
447+
},
448+
},
449+
NextToken: nil,
450+
}, nil)
451+
452+
logsRcvr.client = mc
453+
454+
err := logsRcvr.Start(context.Background(), componenttest.NewNopHost())
455+
require.NoError(t, err)
456+
457+
// Wait for first poll to complete
458+
require.Eventually(t, func() bool {
459+
select {
460+
case <-firstPollDone:
461+
return true
462+
default:
463+
return false
464+
}
465+
}, 15*time.Second, 10*time.Millisecond)
466+
467+
err = logsRcvr.Shutdown(context.Background())
468+
require.NoError(t, err)
469+
470+
groupNames := make([]string, 0, len(logsRcvr.groupRequests))
471+
for _, gr := range logsRcvr.groupRequests {
472+
groupNames = append(groupNames, gr.groupName())
473+
}
474+
// validate deleted group is not in the groupRequests
475+
require.ElementsMatch(t, []string{"/aws/working-group", "/aws/third-group"}, groupNames)
476+
logs := sink.AllLogs()
477+
require.GreaterOrEqual(t, len(logs), 3)
478+
479+
firstWorkingGroupLog := logs[0].ResourceLogs().At(0)
480+
require.Equal(t, "/aws/working-group", firstWorkingGroupLog.Resource().Attributes().AsRaw()["cloudwatch.log.group.name"])
481+
require.Equal(t, 1, firstWorkingGroupLog.ScopeLogs().Len())
482+
483+
secondWorkingGroupLog := logs[1].ResourceLogs().At(0)
484+
require.Equal(t, "/aws/delete-group", secondWorkingGroupLog.Resource().Attributes().AsRaw()["cloudwatch.log.group.name"])
485+
require.Equal(t, 1, secondWorkingGroupLog.ScopeLogs().Len())
486+
487+
thirdWorkingGroupLog := logs[2].ResourceLogs().At(0)
488+
require.Equal(t, "/aws/third-group", thirdWorkingGroupLog.Resource().Attributes().AsRaw()["cloudwatch.log.group.name"])
489+
require.Equal(t, 1, thirdWorkingGroupLog.ScopeLogs().Len())
490+
491+
mc.AssertExpectations(t)
492+
}
493+
277494
func defaultMockClient() client {
278495
mc := &mockClient{}
279496
mc.On("DescribeLogGroups", mock.Anything, mock.Anything, mock.Anything).Return(

0 commit comments

Comments
 (0)