Skip to content

Commit dfd00e3

Browse files
authored
[receiver/awss3] use object tags to checkpoint processed objects (#47397)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description introduce a `skip_ingesting_tagged_objects` to the s3 receiver. This builds on `tag_object_after_ingestion` to checkpoint the progress of the receiver and avoid reprocessing objects <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #47396 <!--Describe what testing was performed and which tests were added.--> #### Testing unit tests <!--Describe the documentation added.--> #### Documentation updated README <!--Please delete paragraphs that you did not use before submitting.-->
1 parent d38d1f9 commit dfd00e3

11 files changed

Lines changed: 649 additions & 19 deletions

File tree

.chloggen/s3-skip-reingestion.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: receiver/awss3
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: add a flag to the s3 receiver to skip objects previously tagged as ingested.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [47396]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/awss3receiver/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ The following exporter configuration parameters are supported.
5252
| `notifications:` | | | |
5353
| `opampextension` | Name of the OpAMP Extension to use to send ingest progress notifications. | | |
5454
| `tag_object_after_ingestion` | If enabled the receiver will attempt to tag the object after successfully ingesting it. | false | Optional |
55+
| `skip_ingesting_tagged_objects` | If enabled the receiver will skip objects tagged by `tag_object_after_ingestion`. This can be used as a checkpointing mechanism, and requires an additional `s3:GetObjectTagging` permission | false | Optional |
5556

5657
There are two modes of operation:
5758

receiver/awss3receiver/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type S3DownloaderConfig struct {
2727
EndpointPartitionID string `mapstructure:"endpoint_partition_id"`
2828
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
2929
TagObjectAfterIngestion bool `mapstructure:"tag_object_after_ingestion"`
30+
SkipIngestingTaggedObjects bool `mapstructure:"skip_ingesting_tagged_objects"`
3031
}
3132

3233
// SQSConfig holds SQS queue configuration for receiving object change notifications.

receiver/awss3receiver/config.schema.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ $defs:
4040
type: string
4141
s3_prefix:
4242
type: string
43+
skip_ingesting_tagged_objects:
44+
type: boolean
4345
tag_object_after_ingestion:
4446
type: boolean
4547
sqs_config:

receiver/awss3receiver/config_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func TestConfig_Validate_Valid(t *testing.T) {
3737
EndpointPartitionID: "aws",
3838
S3ForcePathStyle: false,
3939
TagObjectAfterIngestion: true,
40+
SkipIngestingTaggedObjects: false,
4041
},
4142
StartTime: "2024-01-01",
4243
EndTime: "2024-01-01",
@@ -59,6 +60,7 @@ func TestConfig_Validate_Valid(t *testing.T) {
5960
EndpointPartitionID: "aws",
6061
S3ForcePathStyle: false,
6162
TagObjectAfterIngestion: true,
63+
SkipIngestingTaggedObjects: false,
6264
},
6365
SQS: &SQSConfig{
6466
QueueURL: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue",
@@ -96,6 +98,7 @@ func TestLoadConfig(t *testing.T) {
9698
FilePrefixIncludeTelemetryType: true,
9799
EndpointPartitionID: "aws",
98100
TagObjectAfterIngestion: false,
101+
SkipIngestingTaggedObjects: false,
99102
},
100103
StartTime: "2024-01-31 15:00",
101104
EndTime: "2024-02-03",
@@ -113,6 +116,7 @@ func TestLoadConfig(t *testing.T) {
113116
FilePrefixIncludeTelemetryType: false,
114117
EndpointPartitionID: "aws",
115118
TagObjectAfterIngestion: false,
119+
SkipIngestingTaggedObjects: false,
116120
},
117121
StartTime: "2024-01-31 15:00",
118122
EndTime: "2024-02-03",
@@ -141,6 +145,7 @@ func TestLoadConfig(t *testing.T) {
141145
FilePrefixIncludeTelemetryType: true,
142146
EndpointPartitionID: "aws",
143147
TagObjectAfterIngestion: true,
148+
SkipIngestingTaggedObjects: true,
144149
},
145150
StartTime: "2024-01-31T15:00:00Z",
146151
EndTime: "2024-02-03T00:00:00Z",
@@ -156,6 +161,7 @@ func TestLoadConfig(t *testing.T) {
156161
FilePrefixIncludeTelemetryType: true,
157162
EndpointPartitionID: "aws",
158163
TagObjectAfterIngestion: false,
164+
SkipIngestingTaggedObjects: false,
159165
},
160166
SQS: &SQSConfig{
161167
QueueURL: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue",

receiver/awss3receiver/s3intf.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type ListObjectsAPI interface {
3030

3131
type SingleObjectAPI interface {
3232
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
33+
GetObjectTagging(ctx context.Context, params *s3.GetObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.GetObjectTaggingOutput, error)
3334
PutObjectTagging(ctx context.Context, params *s3.PutObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error)
3435
}
3536

@@ -101,3 +102,23 @@ func tagS3Object(ctx context.Context, client SingleObjectAPI, bucket, key string
101102
_, err := client.PutObjectTagging(ctx, &params)
102103
return err
103104
}
105+
106+
// hasIngestedTag checks if an S3 object has the ingested tag set
107+
func hasIngestedTag(ctx context.Context, client SingleObjectAPI, bucket, key string) (bool, error) {
108+
params := s3.GetObjectTaggingInput{
109+
Bucket: &bucket,
110+
Key: &key,
111+
}
112+
output, err := client.GetObjectTagging(ctx, &params)
113+
if err != nil {
114+
return false, err
115+
}
116+
117+
for _, tag := range output.TagSet {
118+
if tag.Key != nil && *tag.Key == ingestedTag &&
119+
tag.Value != nil && *tag.Value == ingestedStatus {
120+
return true, nil
121+
}
122+
}
123+
return false, nil
124+
}

receiver/awss3receiver/s3reader.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type s3TimeBasedReader struct {
2929
endTime time.Time
3030
notifier statusNotifier
3131
tagObjectAfterIngestion bool
32+
skipIngestingTaggedObjects bool
3233
}
3334

3435
func newS3TimeBasedReader(ctx context.Context, notifier statusNotifier, logger *zap.Logger, cfg *Config) (*s3TimeBasedReader, error) {
@@ -69,6 +70,7 @@ func newS3TimeBasedReader(ctx context.Context, notifier statusNotifier, logger *
6970
endTime: endTime,
7071
notifier: notifier,
7172
tagObjectAfterIngestion: cfg.S3Downloader.TagObjectAfterIngestion,
73+
skipIngestingTaggedObjects: cfg.S3Downloader.SkipIngestingTaggedObjects,
7274
}, nil
7375
}
7476

@@ -146,6 +148,21 @@ func (s3Reader *s3TimeBasedReader) readTelemetryForTime(ctx context.Context, t t
146148
s3Reader.logger.Info("No telemetry found for time", zap.String("prefix", prefix), zap.Time("time", t))
147149
} else {
148150
for _, obj := range page.Contents {
151+
if s3Reader.skipIngestingTaggedObjects {
152+
var hasTag bool
153+
hasTag, err = hasIngestedTag(ctx, s3Reader.singleObjectClient, s3Reader.s3Bucket, *obj.Key)
154+
if err != nil {
155+
s3Reader.logger.Error("Failed to check object tags",
156+
zap.String("key", *obj.Key),
157+
zap.Error(err))
158+
return err
159+
} else if hasTag {
160+
s3Reader.logger.Info("Skipping already ingested object",
161+
zap.String("key", *obj.Key))
162+
continue
163+
}
164+
}
165+
149166
data, err := retrieveS3Object(ctx, s3Reader.singleObjectClient, s3Reader.s3Bucket, *obj.Key)
150167
if err != nil {
151168
return err

receiver/awss3receiver/s3reader_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"testing"
1313
"time"
1414

15+
"github.com/aws/aws-sdk-go-v2/aws"
1516
"github.com/aws/aws-sdk-go-v2/service/s3"
1617
"github.com/aws/aws-sdk-go-v2/service/s3/types"
1718
"github.com/stretchr/testify/require"
@@ -185,6 +186,7 @@ func Test_s3Reader_getObjectPrefixForTime(t *testing.T) {
185186

186187
type mockSingleObjectAPI struct {
187188
getObjectFunc func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
189+
getObjectTaggingFunc func(ctx context.Context, params *s3.GetObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.GetObjectTaggingOutput, error)
188190
putObjectTaggingFunc func(ctx context.Context, params *s3.PutObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error)
189191
}
190192

@@ -195,6 +197,14 @@ func (m *mockSingleObjectAPI) GetObject(ctx context.Context, params *s3.GetObjec
195197
return nil, errors.New("GetObject not mocked")
196198
}
197199

200+
func (m *mockSingleObjectAPI) GetObjectTagging(ctx context.Context, params *s3.GetObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.GetObjectTaggingOutput, error) {
201+
if m.getObjectTaggingFunc != nil {
202+
return m.getObjectTaggingFunc(ctx, params, optFns...)
203+
}
204+
// Default to empty tag set if no mock function provided
205+
return &s3.GetObjectTaggingOutput{TagSet: []types.Tag{}}, nil
206+
}
207+
198208
func (m *mockSingleObjectAPI) PutObjectTagging(ctx context.Context, params *s3.PutObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error) {
199209
if m.putObjectTaggingFunc != nil {
200210
return m.putObjectTaggingFunc(ctx, params, optFns...)
@@ -294,6 +304,93 @@ func Test_readTelemetryForTime(t *testing.T) {
294304
require.NoError(t, err)
295305
}
296306

307+
func Test_readTelemetryForTime_skipTaggedObjects(t *testing.T) {
308+
testKey1 := "year=2021/month=02/day=01/hour=17/minute=32/traces_1"
309+
testKey2 := "year=2021/month=02/day=01/hour=17/minute=32/traces_2"
310+
testKey3 := "year=2021/month=02/day=01/hour=17/minute=32/traces_3"
311+
reader := s3TimeBasedReader{
312+
listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager {
313+
t.Helper()
314+
require.Equal(t, "bucket", *params.Bucket)
315+
require.Equal(t, "year=2021/month=02/day=01/hour=17/minute=32/traces_", *params.Prefix)
316+
317+
return &mockListObjectsV2Pager{
318+
Pages: []*s3.ListObjectsV2Output{
319+
{
320+
Contents: []types.Object{
321+
{
322+
// Not tagged
323+
Key: &testKey1,
324+
},
325+
},
326+
},
327+
{
328+
Contents: []types.Object{
329+
{
330+
// Already tagged by the receiver as ingested
331+
Key: &testKey2,
332+
},
333+
},
334+
},
335+
{
336+
Contents: []types.Object{
337+
{
338+
// Has a tag, but not one set by the receiver
339+
Key: &testKey3,
340+
},
341+
},
342+
},
343+
},
344+
}
345+
}),
346+
singleObjectClient: &mockSingleObjectAPI{
347+
getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
348+
t.Helper()
349+
require.Equal(t, "bucket", *params.Bucket)
350+
// testKey2 should not be fetched because it has the ingested tag
351+
require.Contains(t, []string{testKey1, testKey3}, *params.Key)
352+
return &s3.GetObjectOutput{
353+
Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))),
354+
}, nil
355+
},
356+
getObjectTaggingFunc: func(_ context.Context, params *s3.GetObjectTaggingInput, _ ...func(*s3.Options)) (*s3.GetObjectTaggingOutput, error) {
357+
t.Helper()
358+
require.Equal(t, "bucket", *params.Bucket)
359+
require.Contains(t, []string{testKey1, testKey2, testKey3}, *params.Key)
360+
var tagSet []types.Tag
361+
switch *params.Key {
362+
case testKey2:
363+
tagSet = []types.Tag{{Key: aws.String(ingestedTag), Value: aws.String(ingestedStatus)}}
364+
case testKey3:
365+
tagSet = []types.Tag{{Key: aws.String("env"), Value: aws.String("dev")}}
366+
}
367+
return &s3.GetObjectTaggingOutput{TagSet: tagSet}, nil
368+
},
369+
},
370+
logger: zap.NewNop(),
371+
s3Bucket: "bucket",
372+
s3PartitionFormat: s3PartitionFormatDefault,
373+
S3PartitionTimeLocation: time.UTC,
374+
s3Prefix: "",
375+
filePrefix: "",
376+
filePrefixIncludeTelemetryType: true,
377+
startTime: testTime,
378+
endTime: testTime.Add(time.Minute),
379+
skipIngestingTaggedObjects: true,
380+
}
381+
382+
dataCallbackKeys := make([]string, 0)
383+
384+
err := reader.readTelemetryForTime(t.Context(), testTime, "traces", func(_ context.Context, key string, data []byte) error {
385+
t.Helper()
386+
require.Equal(t, "this is the body of the object", string(data))
387+
dataCallbackKeys = append(dataCallbackKeys, key)
388+
return nil
389+
})
390+
require.Equal(t, []string{testKey1, testKey3}, dataCallbackKeys)
391+
require.NoError(t, err)
392+
}
393+
297394
func Test_readTelemetryForTime_GetObjectError(t *testing.T) {
298395
testKey := "year=2021/month=02/day=01/hour=17/minute=32/traces_1"
299396
testError := errors.New("test error")
@@ -757,6 +854,65 @@ func Test_readTelemetryForTime_TagFailure(t *testing.T) {
757854
require.Equal(t, []string{testKey}, dataCallbackKeys, "Data should still be processed")
758855
}
759856

857+
func Test_readTelemetryForTime_GetTagError(t *testing.T) {
858+
testKey := "year=2023/month=01/day=02/hour=03/minute=04/traces_test"
859+
860+
reader := &s3TimeBasedReader{
861+
listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager {
862+
require.Equal(t, "bucket", *params.Bucket)
863+
require.Equal(t, "year=2023/month=01/day=02/hour=03/minute=04/traces_", *params.Prefix)
864+
865+
return &mockListObjectsV2Pager{
866+
Pages: []*s3.ListObjectsV2Output{
867+
{
868+
Contents: []types.Object{
869+
{Key: &testKey},
870+
},
871+
},
872+
},
873+
}
874+
}),
875+
singleObjectClient: &mockSingleObjectAPI{
876+
getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
877+
t.Helper()
878+
require.Equal(t, "bucket", *params.Bucket)
879+
require.Equal(t, testKey, *params.Key)
880+
return &s3.GetObjectOutput{
881+
Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))),
882+
}, nil
883+
},
884+
getObjectTaggingFunc: func(_ context.Context, params *s3.GetObjectTaggingInput, _ ...func(*s3.Options)) (*s3.GetObjectTaggingOutput, error) {
885+
t.Helper()
886+
require.Equal(t, "bucket", *params.Bucket)
887+
require.Equal(t, testKey, *params.Key)
888+
return nil, errors.New("failed to get object tags")
889+
},
890+
},
891+
logger: zap.NewNop(),
892+
s3Bucket: "bucket",
893+
s3PartitionFormat: s3PartitionFormatDefault,
894+
S3PartitionTimeLocation: time.UTC,
895+
filePrefix: "",
896+
filePrefixIncludeTelemetryType: true,
897+
tagObjectAfterIngestion: true,
898+
skipIngestingTaggedObjects: true,
899+
}
900+
901+
testTime, err := time.Parse(time.RFC3339, "2023-01-02T03:04:05Z")
902+
require.NoError(t, err)
903+
904+
dataCallbackKeys := make([]string, 0)
905+
err = reader.readTelemetryForTime(t.Context(), testTime, "traces", func(_ context.Context, key string, _ []byte) error {
906+
t.Helper()
907+
dataCallbackKeys = append(dataCallbackKeys, key)
908+
return nil
909+
})
910+
911+
require.Error(t, err)
912+
require.Contains(t, err.Error(), "failed to get object tags")
913+
require.Empty(t, dataCallbackKeys, "No data should be processed when GetObjectTagging fails")
914+
}
915+
760916
func Test_determineTimestep(t *testing.T) {
761917
tests := []struct {
762918
name string

0 commit comments

Comments
 (0)