Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DLQ support to CloudWatchLogs sink #5542

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

kkondaka
Copy link
Collaborator

Description

Add DLQ support to CloudWatchLogs sink

  • Added common DLQ push handler to failure common package
  • Added DLQ support to cloudwatch using the common DLQ

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • [X ] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X ] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

chenqi0805
chenqi0805 previously approved these changes Mar 24, 2025
private final CloudWatchLogsDispatcher cloudWatchLogsDispatcher;
private final Buffer buffer;
private final CloudWatchLogsLimits cloudWatchLogsLimits;
private List<EventHandle> bufferedEventHandles;
//private List<EventHandle> bufferedEventHandles;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments that can be removed in this file

Comment on lines +65 to +75
if (cloudWatchLogsSinkConfig.getDlq() != null) {
String region = awsConfig.getAwsRegion().toString();
String role = awsConfig.getAwsStsRoleArn();
dlqPushHandler = new DlqPushHandler(pluginFactory, pluginSetting, pluginMetrics, cloudWatchLogsSinkConfig.getDlq(), region, role, "cloudWatchLogs");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use AwsCredentialsSupplier default region and role when this is not specified in the DLQ config? We shold also make sure this DLQ supports the sts header overrides

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just following what we do for opensearch sink dlq. There is no point in doing much different because pipeline DLQ will make replace this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between using awsCredentialsSupplier.getDefaultRegion() vs awsConfig.getAwsRegion()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed comment


public DlqPushHandler(final PluginFactory pluginFactory, final PluginSetting pluginSetting, final PluginMetrics pluginMetrics,
final PluginModel dlqConfig, final String region, final String role, final String metricsPrefix) {
Map<String, Object> dlqSettings = dlqConfig.getPluginSettings();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the sts_header_overrides

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just following what we do for opensearch sink dlq. There is no point in doing much different because pipeline DLQ will make replace this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you aren't doing it exactly the same. In OpenSearch sink we pass the full dlq config here (

), which includes sts header overrides

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@graytaylor0 Yes. That's there in data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/ The config is under s3: not under dlq: It is already supported

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if users pass

dlq:
  s3:
    sts_header_overrides:
        ....

to cloudwatch sink those will get applied?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

@@ -27,11 +28,14 @@ private CloudWatchLogsClientFactory() {
* @return CloudWatchLogsClient used to interact with CloudWatch Logs services.
*/
public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialOptions(awsConfig);
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);
final AwsCredentialsProvider awsCredentialsProvider = awsConfig != null ? awsCredentialsSupplier.getProvider(convertToCredentialOptions(awsConfig)) : awsCredentialsSupplier.getProvider(AwsCredentialsOptions.defaultOptionsWithDefaultCredentialsProvider());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not quite write. All you need to do here is to pass an empty AwsCredentialsOptions to this method here. Using defaultOptionsWithDefaultCredentialProvider() will use the default credentials of the data prepper node, not the default role configured

kkondaka added 3 commits April 1, 2025 00:03
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants