Skip to content

Commit 966fd78

Browse files
committed
ENH: use default credentials for aws glue
Signed-off-by: George Chen <[email protected]>
1 parent f136f48 commit 966fd78

File tree

12 files changed

+102
-79
lines changed

12 files changed

+102
-79
lines changed

Diff for: data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerIT.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.junit.jupiter.api.BeforeEach;
1414
import org.junit.jupiter.api.Test;
1515
import org.mockito.Mock;
16+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
1617
import org.opensearch.dataprepper.metrics.PluginMetrics;
1718
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
1819
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -76,6 +77,9 @@ public class ConfluentKafkaProducerConsumerIT {
7677
@Mock
7778
private PluginConfigObservable pluginConfigObservable;
7879

80+
@Mock
81+
private AwsCredentialsSupplier awsCredentialsSupplier;
82+
7983
private KafkaSource kafkaSource;
8084
private TopicConsumerConfig topicConfig;
8185
private Counter counter;
@@ -164,7 +168,8 @@ public void KafkaProduceConsumerTest() {
164168
}
165169

166170
public void consumeRecords(String servers) {
167-
kafkaSource = new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, null, pluginConfigObservable);
171+
kafkaSource = new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
172+
null, pluginConfigObservable, awsCredentialsSupplier);
168173
kafkaSource.start(buffer);
169174
}
170175

Diff for: data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerWithSchemaRegistryIT.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.junit.jupiter.api.BeforeEach;
2020
import org.junit.jupiter.api.Test;
2121
import org.mockito.Mock;
22+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2223
import org.opensearch.dataprepper.metrics.PluginMetrics;
2324
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2425
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -130,6 +131,9 @@ public UserRecord(String name, Integer id, Number value) {
130131
@Mock
131132
private PluginConfigObservable pluginConfigObservable;
132133

134+
@Mock
135+
private AwsCredentialsSupplier awsCredentialsSupplier;
136+
133137
private KafkaSource kafkaSource;
134138
private TopicConsumerConfig jsonTopicConfig;
135139
private TopicConsumerConfig avroTopicConfig;
@@ -271,7 +275,8 @@ public void KafkaJsonProducerConsumerTestWithLatestSchemaVersion() {
271275

272276
public void consumeRecords(String servers, KafkaSourceConfig sourceConfig) {
273277
kafkaSource = new KafkaSource(
274-
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, null, pluginConfigObservable);
278+
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
279+
null, pluginConfigObservable, awsCredentialsSupplier);
275280
kafkaSource.start(buffer);
276281
}
277282

Diff for: data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.api.BeforeEach;
2121
import org.junit.jupiter.api.Test;
2222
import org.mockito.Mock;
23+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2324
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
2425
import org.opensearch.dataprepper.metrics.PluginMetrics;
2526
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -95,6 +96,9 @@ public class KafkaSourceJsonTypeIT {
9596
@Mock
9697
private PluginConfigObservable pluginConfigObservable;
9798

99+
@Mock
100+
private AwsCredentialsSupplier awsCredentialsSupplier;
101+
98102
private KafkaSource kafkaSource;
99103

100104
private Counter counter;
@@ -111,7 +115,8 @@ public class KafkaSourceJsonTypeIT {
111115
private byte[] headerValue2;
112116

113117
public KafkaSource createObjectUnderTest() {
114-
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
118+
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
119+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
115120
}
116121

117122
@BeforeEach

Diff for: data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.junit.jupiter.api.BeforeEach;
1717
import org.junit.jupiter.api.Test;
1818
import org.mockito.Mock;
19+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
1920
import org.opensearch.dataprepper.metrics.PluginMetrics;
2021
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2122
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -88,6 +89,9 @@ public class KafkaSourceMultipleAuthTypeIT {
8889
@Mock
8990
private PluginConfigObservable pluginConfigObservable;
9091

92+
@Mock
93+
private AwsCredentialsSupplier awsCredentialsSupplier;
94+
9195
private TopicConfig jsonTopic;
9296
private TopicConfig avroTopic;
9397

@@ -106,7 +110,8 @@ public class KafkaSourceMultipleAuthTypeIT {
106110

107111
public KafkaSource createObjectUnderTest() {
108112
return new KafkaSource(
109-
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
113+
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
114+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
110115
}
111116

112117
@BeforeEach

Diff for: data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceSaslPlainTextIT.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.api.extension.ExtendWith;
2121
import org.mockito.Mock;
2222
import org.mockito.junit.jupiter.MockitoExtension;
23+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2324
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
2425
import org.opensearch.dataprepper.metrics.PluginMetrics;
2526
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -103,6 +104,9 @@ public class KafkaSourceSaslPlainTextIT {
103104
@Mock
104105
private PluginConfigObservable pluginConfigObservable;
105106

107+
@Mock
108+
private AwsCredentialsSupplier awsCredentialsSupplier;
109+
106110
private KafkaSource kafkaSource;
107111

108112
private Counter counter;
@@ -115,7 +119,8 @@ public class KafkaSourceSaslPlainTextIT {
115119
private String testGroup;
116120

117121
public KafkaSource createObjectUnderTest() {
118-
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
122+
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
123+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
119124
}
120125

121126
@BeforeEach

Diff for: data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceSaslScramIT.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.api.extension.ExtendWith;
2121
import org.mockito.Mock;
2222
import org.mockito.junit.jupiter.MockitoExtension;
23+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2324
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
2425
import org.opensearch.dataprepper.metrics.PluginMetrics;
2526
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -103,6 +104,9 @@ public class KafkaSourceSaslScramIT {
103104
@Mock
104105
private PluginConfigObservable pluginConfigObservable;
105106

107+
@Mock
108+
private AwsCredentialsSupplier awsCredentialsSupplier;
109+
106110
private KafkaSource kafkaSource;
107111

108112
private Counter counter;
@@ -115,7 +119,8 @@ public class KafkaSourceSaslScramIT {
115119
private String testGroup;
116120

117121
public KafkaSource createObjectUnderTest() {
118-
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
122+
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
123+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
119124
}
120125

121126
@BeforeEach

Diff for: data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.junit.jupiter.api.BeforeEach;
2424
import org.junit.jupiter.api.Test;
2525
import org.mockito.Mock;
26+
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
27+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2628
import org.opensearch.dataprepper.metrics.PluginMetrics;
2729
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2830
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -41,6 +43,7 @@
4143
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
4244
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
4345
import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
46+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
4447

4548
import java.io.File;
4649
import java.io.IOException;
@@ -106,6 +109,12 @@ public class MskGlueRegistryMultiTypeIT {
106109
@Mock
107110
private PluginConfigObservable pluginConfigObservable;
108111

112+
@Mock
113+
private AwsCredentialsSupplier awsCredentialsSupplier;
114+
115+
@Mock
116+
private AwsCredentialsOptions awsCredentialsOptions;
117+
109118
private KafkaSource kafkaSource;
110119
private SourceTopicConfig jsonTopic;
111120
private SourceTopicConfig avroTopic;
@@ -132,7 +141,8 @@ public class MskGlueRegistryMultiTypeIT {
132141

133142
public KafkaSource createObjectUnderTest() {
134143
return new KafkaSource(
135-
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
144+
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
145+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
136146
}
137147

138148
@BeforeEach
@@ -197,6 +207,8 @@ public void setup() {
197207
encryptionConfig = mock(EncryptionConfig.class);
198208
when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
199209
System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
210+
when(awsConfig.toCredentialsOptions()).thenReturn(awsCredentialsOptions);
211+
when(awsCredentialsSupplier.getProvider(awsCredentialsOptions)).thenReturn(DefaultCredentialsProvider.create());
200212
}
201213

202214
@Test

Diff for: data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public List<KafkaCustomConsumer> createConsumersForTopic(final KafkaConsumerConf
9898
Deserializer<Object> keyDeserializer = (Deserializer<Object>) serializationFactory.getDeserializer(PlaintextKafkaDataConfig.plaintextDataConfig(dataConfig));
9999
Deserializer<Object> valueDeserializer = null;
100100
if(schema == MessageFormat.PLAINTEXT) {
101-
valueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(kafkaConsumerConfig);
101+
valueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(kafkaConsumerConfig, awsContext);
102102
}
103103
if(valueDeserializer == null) {
104104
valueDeserializer = (Deserializer<Object>) serializationFactory.getDeserializer(dataConfig);

Diff for: data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.common.errors.BrokerNotAvailableException;
2020
import org.apache.kafka.common.serialization.StringDeserializer;
2121
import org.apache.kafka.connect.json.JsonDeserializer;
22+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2223
import org.opensearch.dataprepper.metrics.PluginMetrics;
2324
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2425
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
@@ -30,6 +31,7 @@
3031
import org.opensearch.dataprepper.model.record.Record;
3132
import org.opensearch.dataprepper.model.source.Source;
3233
import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
34+
import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext;
3335
import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
3436
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
3537
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
@@ -93,14 +95,16 @@ public class KafkaSource implements Source<Record<Event>> {
9395
private final List<ExecutorService> allTopicExecutorServices;
9496
private final List<KafkaCustomConsumer> allTopicConsumers;
9597
private final PluginConfigObservable pluginConfigObservable;
98+
private final AwsCredentialsSupplier awsCredentialsSupplier;
9699

97100
@DataPrepperPluginConstructor
98101
public KafkaSource(final KafkaSourceConfig sourceConfig,
99102
final PluginMetrics pluginMetrics,
100103
final AcknowledgementSetManager acknowledgementSetManager,
101104
final PipelineDescription pipelineDescription,
102105
final KafkaClusterConfigSupplier kafkaClusterConfigSupplier,
103-
final PluginConfigObservable pluginConfigObservable) {
106+
final PluginConfigObservable pluginConfigObservable,
107+
final AwsCredentialsSupplier awsCredentialsSupplier) {
104108
this.sourceConfig = sourceConfig;
105109
this.pluginMetrics = pluginMetrics;
106110
this.acknowledgementSetManager = acknowledgementSetManager;
@@ -110,6 +114,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
110114
this.allTopicExecutorServices = new ArrayList<>();
111115
this.allTopicConsumers = new ArrayList<>();
112116
this.pluginConfigObservable = pluginConfigObservable;
117+
this.awsCredentialsSupplier = awsCredentialsSupplier;
113118
this.updateConfig(kafkaClusterConfigSupplier);
114119
}
115120

@@ -186,7 +191,8 @@ public void start(Buffer<Record<Event>> buffer) {
186191
return new KafkaConsumer<String, GenericRecord>(consumerProperties);
187192
case PLAINTEXT:
188193
default:
189-
glueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(sourceConfig);
194+
final AwsContext awsContext = new AwsContext(sourceConfig, awsCredentialsSupplier);
195+
glueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(sourceConfig, awsContext);
190196
if (Objects.nonNull(glueDeserializer)) {
191197
return new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
192198
} else {

Diff for: data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java

+8-28
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
88
import org.opensearch.dataprepper.plugins.kafka.authenticator.DynamicSaslClientCallbackHandler;
99
import org.opensearch.dataprepper.plugins.kafka.authenticator.DynamicBasicCredentialsProvider;
10+
import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext;
1011
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
1112
import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig;
1213
import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig;
@@ -384,16 +385,18 @@ private static boolean checkEncryptionType(final EncryptionConfig encryptionConf
384385
return Objects.nonNull(encryptionConfig) && encryptionConfig.getType() == encryptionType;
385386
}
386387

387-
public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaConsumerConfig kafkaConsumerConfig) {
388-
configureAwsGlueCredentialsProvider(kafkaConsumerConfig.getAwsConfig());
388+
public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(
389+
final KafkaConsumerConfig kafkaConsumerConfig, final AwsContext awsContext) {
390+
final AwsConfig awsConfig = kafkaConsumerConfig.getAwsConfig();
391+
awsGlueCredentialsProvider = awsContext.getOrDefault(awsConfig);
389392
SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig();
390393
if (Objects.isNull(schemaConfig) || schemaConfig.getType() != SchemaRegistryType.AWS_GLUE) {
391394
return null;
392395
}
393396
Map<String, Object> configs = new HashMap<>();
394-
final AwsConfig awsConfig = kafkaConsumerConfig.getAwsConfig();
395-
if (Objects.nonNull(awsConfig) && Objects.nonNull(awsConfig.getRegion())) {
396-
configs.put(AWSSchemaRegistryConstants.AWS_REGION, kafkaConsumerConfig.getAwsConfig().getRegion());
397+
final Region region = awsContext.getRegionOrDefault(awsConfig);
398+
if (Objects.nonNull(region)) {
399+
configs.put(AWSSchemaRegistryConstants.AWS_REGION, region.id());
397400
}
398401
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
399402
configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000");
@@ -406,28 +409,5 @@ public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaC
406409
glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(awsGlueCredentialsProvider, configs);
407410
return glueDeserializer;
408411
}
409-
410-
private static void configureAwsGlueCredentialsProvider(final AwsConfig awsConfig) {
411-
awsGlueCredentialsProvider = DefaultCredentialsProvider.create();
412-
if (Objects.nonNull(awsConfig) &&
413-
Objects.nonNull(awsConfig.getRegion()) && Objects.nonNull(awsConfig.getStsRoleArn())) {
414-
String sessionName = "data-prepper-kafka-session" + UUID.randomUUID();
415-
StsClient stsClient = StsClient.builder()
416-
.region(Region.of(awsConfig.getRegion()))
417-
.credentialsProvider(awsGlueCredentialsProvider)
418-
.build();
419-
awsGlueCredentialsProvider = StsAssumeRoleCredentialsProvider
420-
.builder()
421-
.stsClient(stsClient)
422-
.refreshRequest(
423-
AssumeRoleRequest
424-
.builder()
425-
.roleArn(awsConfig.getStsRoleArn())
426-
.roleSessionName(sessionName)
427-
.build()
428-
).build();
429-
}
430-
}
431-
432412
}
433413

Diff for: data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.mockito.junit.jupiter.MockitoExtension;
1818
import org.mockito.junit.jupiter.MockitoSettings;
1919
import org.mockito.quality.Strictness;
20+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2021
import org.opensearch.dataprepper.metrics.PluginMetrics;
2122
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2223
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -88,12 +89,16 @@ class KafkaSourceTest {
8889
@Mock
8990
private PluginConfigObservable pluginConfigObservable;
9091

92+
@Mock
93+
private AwsCredentialsSupplier awsCredentialsSupplier;
94+
9195
private static final String TEST_GROUP_ID = "testGroupId";
9296
private static final String TEST_CLIENT_ID = "testClientId";
9397

9498
public KafkaSource createObjectUnderTest() {
9599
return new KafkaSource(
96-
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
100+
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
101+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
97102
}
98103

99104
@BeforeEach

0 commit comments

Comments
 (0)