Skip to content

Commit 6c88599

Browse files
authored
ENH: use default credentials for aws glue registry in kafka source (#5537)
* ENH: use default credentials for aws glue Signed-off-by: George Chen <[email protected]>
1 parent 7c72188 commit 6c88599

File tree

12 files changed

+114
-79
lines changed

12 files changed

+114
-79
lines changed

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerIT.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.junit.jupiter.api.BeforeEach;
1414
import org.junit.jupiter.api.Test;
1515
import org.mockito.Mock;
16+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
1617
import org.opensearch.dataprepper.metrics.PluginMetrics;
1718
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
1819
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -76,6 +77,9 @@ public class ConfluentKafkaProducerConsumerIT {
7677
@Mock
7778
private PluginConfigObservable pluginConfigObservable;
7879

80+
@Mock
81+
private AwsCredentialsSupplier awsCredentialsSupplier;
82+
7983
private KafkaSource kafkaSource;
8084
private TopicConsumerConfig topicConfig;
8185
private Counter counter;
@@ -103,6 +107,7 @@ public void setup() {
103107
receivedRecords = new ArrayList<>();
104108
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
105109
pipelineDescription = mock(PipelineDescription.class);
110+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
106111
when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);
107112
when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
108113
when(sourceConfig.getAuthConfig()).thenReturn(authConfig);
@@ -164,7 +169,8 @@ public void KafkaProduceConsumerTest() {
164169
}
165170

166171
public void consumeRecords(String servers) {
167-
kafkaSource = new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, null, pluginConfigObservable);
172+
kafkaSource = new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
173+
null, pluginConfigObservable, awsCredentialsSupplier);
168174
kafkaSource.start(buffer);
169175
}
170176

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerWithSchemaRegistryIT.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.junit.jupiter.api.BeforeEach;
2020
import org.junit.jupiter.api.Test;
2121
import org.mockito.Mock;
22+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2223
import org.opensearch.dataprepper.metrics.PluginMetrics;
2324
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2425
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -130,6 +131,9 @@ public UserRecord(String name, Integer id, Number value) {
130131
@Mock
131132
private PluginConfigObservable pluginConfigObservable;
132133

134+
@Mock
135+
private AwsCredentialsSupplier awsCredentialsSupplier;
136+
133137
private KafkaSource kafkaSource;
134138
private TopicConsumerConfig jsonTopicConfig;
135139
private TopicConsumerConfig avroTopicConfig;
@@ -167,6 +171,7 @@ public void setup() {
167171
receivedRecords = new ArrayList<>();
168172
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
169173
pipelineDescription = mock(PipelineDescription.class);
174+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
170175
when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);
171176
when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
172177
when(schemaConfig.getType()).thenReturn(SchemaRegistryType.CONFLUENT);
@@ -271,7 +276,8 @@ public void KafkaJsonProducerConsumerTestWithLatestSchemaVersion() {
271276

272277
public void consumeRecords(String servers, KafkaSourceConfig sourceConfig) {
273278
kafkaSource = new KafkaSource(
274-
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, null, pluginConfigObservable);
279+
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
280+
null, pluginConfigObservable, awsCredentialsSupplier);
275281
kafkaSource.start(buffer);
276282
}
277283

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.api.BeforeEach;
2121
import org.junit.jupiter.api.Test;
2222
import org.mockito.Mock;
23+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2324
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
2425
import org.opensearch.dataprepper.metrics.PluginMetrics;
2526
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -95,6 +96,9 @@ public class KafkaSourceJsonTypeIT {
9596
@Mock
9697
private PluginConfigObservable pluginConfigObservable;
9798

99+
@Mock
100+
private AwsCredentialsSupplier awsCredentialsSupplier;
101+
98102
private KafkaSource kafkaSource;
99103

100104
private Counter counter;
@@ -111,7 +115,8 @@ public class KafkaSourceJsonTypeIT {
111115
private byte[] headerValue2;
112116

113117
public KafkaSource createObjectUnderTest() {
114-
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
118+
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
119+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
115120
}
116121

117122
@BeforeEach
@@ -125,6 +130,7 @@ public void setup() throws Throwable {
125130
counter = mock(Counter.class);
126131
buffer = mock(Buffer.class);
127132
encryptionConfig = mock(EncryptionConfig.class);
133+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
128134
receivedRecords = new ArrayList<>();
129135
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
130136
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.junit.jupiter.api.BeforeEach;
1717
import org.junit.jupiter.api.Test;
1818
import org.mockito.Mock;
19+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
1920
import org.opensearch.dataprepper.metrics.PluginMetrics;
2021
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2122
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -88,6 +89,9 @@ public class KafkaSourceMultipleAuthTypeIT {
8889
@Mock
8990
private PluginConfigObservable pluginConfigObservable;
9091

92+
@Mock
93+
private AwsCredentialsSupplier awsCredentialsSupplier;
94+
9195
private TopicConfig jsonTopic;
9296
private TopicConfig avroTopic;
9397

@@ -106,7 +110,8 @@ public class KafkaSourceMultipleAuthTypeIT {
106110

107111
public KafkaSource createObjectUnderTest() {
108112
return new KafkaSource(
109-
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
113+
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
114+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
110115
}
111116

112117
@BeforeEach
@@ -119,6 +124,7 @@ public void setup() {
119124
receivedRecords = new ArrayList<>();
120125
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
121126
pipelineDescription = mock(PipelineDescription.class);
127+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
122128
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
123129
when(sourceConfig.getSchemaConfig()).thenReturn(null);
124130
when(pluginMetrics.counter(anyString())).thenReturn(counter);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceSaslPlainTextIT.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.api.extension.ExtendWith;
2121
import org.mockito.Mock;
2222
import org.mockito.junit.jupiter.MockitoExtension;
23+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2324
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
2425
import org.opensearch.dataprepper.metrics.PluginMetrics;
2526
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -103,6 +104,9 @@ public class KafkaSourceSaslPlainTextIT {
103104
@Mock
104105
private PluginConfigObservable pluginConfigObservable;
105106

107+
@Mock
108+
private AwsCredentialsSupplier awsCredentialsSupplier;
109+
106110
private KafkaSource kafkaSource;
107111

108112
private Counter counter;
@@ -115,7 +119,8 @@ public class KafkaSourceSaslPlainTextIT {
115119
private String testGroup;
116120

117121
public KafkaSource createObjectUnderTest() {
118-
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
122+
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
123+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
119124
}
120125

121126
@BeforeEach
@@ -136,6 +141,7 @@ public void setup() throws Throwable {
136141
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
137142
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
138143
pipelineDescription = mock(PipelineDescription.class);
144+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
139145
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
140146
when(sourceConfig.getSchemaConfig()).thenReturn(null);
141147
when(pluginMetrics.counter(anyString())).thenReturn(counter);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceSaslScramIT.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.api.extension.ExtendWith;
2121
import org.mockito.Mock;
2222
import org.mockito.junit.jupiter.MockitoExtension;
23+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2324
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
2425
import org.opensearch.dataprepper.metrics.PluginMetrics;
2526
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -103,6 +104,9 @@ public class KafkaSourceSaslScramIT {
103104
@Mock
104105
private PluginConfigObservable pluginConfigObservable;
105106

107+
@Mock
108+
private AwsCredentialsSupplier awsCredentialsSupplier;
109+
106110
private KafkaSource kafkaSource;
107111

108112
private Counter counter;
@@ -115,7 +119,8 @@ public class KafkaSourceSaslScramIT {
115119
private String testGroup;
116120

117121
public KafkaSource createObjectUnderTest() {
118-
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
122+
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
123+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
119124
}
120125

121126
@BeforeEach
@@ -138,6 +143,7 @@ public void setup() throws Throwable {
138143
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
139144
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
140145
pipelineDescription = mock(PipelineDescription.class);
146+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
141147
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
142148
when(sourceConfig.getSchemaConfig()).thenReturn(null);
143149
when(pluginMetrics.counter(anyString())).thenReturn(counter);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.junit.jupiter.api.BeforeEach;
2424
import org.junit.jupiter.api.Test;
2525
import org.mockito.Mock;
26+
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
27+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2628
import org.opensearch.dataprepper.metrics.PluginMetrics;
2729
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2830
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -41,6 +43,7 @@
4143
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
4244
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
4345
import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
46+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
4447

4548
import java.io.File;
4649
import java.io.IOException;
@@ -106,6 +109,12 @@ public class MskGlueRegistryMultiTypeIT {
106109
@Mock
107110
private PluginConfigObservable pluginConfigObservable;
108111

112+
@Mock
113+
private AwsCredentialsSupplier awsCredentialsSupplier;
114+
115+
@Mock
116+
private AwsCredentialsOptions awsCredentialsOptions;
117+
109118
private KafkaSource kafkaSource;
110119
private SourceTopicConfig jsonTopic;
111120
private SourceTopicConfig avroTopic;
@@ -132,7 +141,8 @@ public class MskGlueRegistryMultiTypeIT {
132141

133142
public KafkaSource createObjectUnderTest() {
134143
return new KafkaSource(
135-
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
144+
sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
145+
kafkaClusterConfigSupplier, pluginConfigObservable, awsCredentialsSupplier);
136146
}
137147

138148
@BeforeEach
@@ -149,6 +159,8 @@ public void setup() {
149159
receivedRecords = new ArrayList<>();
150160
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
151161
pipelineDescription = mock(PipelineDescription.class);
162+
awsCredentialsOptions = mock(AwsCredentialsOptions.class);
163+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
152164
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
153165
when(sourceConfig.getSchemaConfig()).thenReturn(schemaConfig);
154166
when(schemaConfig.getType()).thenReturn(SchemaRegistryType.AWS_GLUE);
@@ -197,6 +209,8 @@ public void setup() {
197209
encryptionConfig = mock(EncryptionConfig.class);
198210
when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
199211
System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
212+
when(awsConfig.toCredentialsOptions()).thenReturn(awsCredentialsOptions);
213+
when(awsCredentialsSupplier.getProvider(awsCredentialsOptions)).thenReturn(DefaultCredentialsProvider.create());
200214
}
201215

202216
@Test
@@ -400,6 +414,7 @@ public void produceAvroRecords(String servers, String topic, int numRecords) thr
400414
properties.put(AWSSchemaRegistryConstants.AWS_REGION, awsConfig.getRegion());
401415
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, testRegistryName);
402416
properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, testAvroSchemaName);
417+
properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
403418

404419
Schema testSchema = null;
405420
Schema.Parser parser = new Schema.Parser();

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public List<KafkaCustomConsumer> createConsumersForTopic(final KafkaConsumerConf
9898
Deserializer<Object> keyDeserializer = (Deserializer<Object>) serializationFactory.getDeserializer(PlaintextKafkaDataConfig.plaintextDataConfig(dataConfig));
9999
Deserializer<Object> valueDeserializer = null;
100100
if(schema == MessageFormat.PLAINTEXT) {
101-
valueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(kafkaConsumerConfig);
101+
valueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(kafkaConsumerConfig, awsContext);
102102
}
103103
if(valueDeserializer == null) {
104104
valueDeserializer = (Deserializer<Object>) serializationFactory.getDeserializer(dataConfig);

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.common.errors.BrokerNotAvailableException;
2020
import org.apache.kafka.common.serialization.StringDeserializer;
2121
import org.apache.kafka.connect.json.JsonDeserializer;
22+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2223
import org.opensearch.dataprepper.metrics.PluginMetrics;
2324
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2425
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
@@ -30,6 +31,7 @@
3031
import org.opensearch.dataprepper.model.record.Record;
3132
import org.opensearch.dataprepper.model.source.Source;
3233
import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
34+
import org.opensearch.dataprepper.plugins.kafka.common.aws.AwsContext;
3335
import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
3436
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
3537
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
@@ -93,14 +95,16 @@ public class KafkaSource implements Source<Record<Event>> {
9395
private final List<ExecutorService> allTopicExecutorServices;
9496
private final List<KafkaCustomConsumer> allTopicConsumers;
9597
private final PluginConfigObservable pluginConfigObservable;
98+
private final AwsCredentialsSupplier awsCredentialsSupplier;
9699

97100
@DataPrepperPluginConstructor
98101
public KafkaSource(final KafkaSourceConfig sourceConfig,
99102
final PluginMetrics pluginMetrics,
100103
final AcknowledgementSetManager acknowledgementSetManager,
101104
final PipelineDescription pipelineDescription,
102105
final KafkaClusterConfigSupplier kafkaClusterConfigSupplier,
103-
final PluginConfigObservable pluginConfigObservable) {
106+
final PluginConfigObservable pluginConfigObservable,
107+
final AwsCredentialsSupplier awsCredentialsSupplier) {
104108
this.sourceConfig = sourceConfig;
105109
this.pluginMetrics = pluginMetrics;
106110
this.acknowledgementSetManager = acknowledgementSetManager;
@@ -110,6 +114,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
110114
this.allTopicExecutorServices = new ArrayList<>();
111115
this.allTopicConsumers = new ArrayList<>();
112116
this.pluginConfigObservable = pluginConfigObservable;
117+
this.awsCredentialsSupplier = awsCredentialsSupplier;
113118
this.updateConfig(kafkaClusterConfigSupplier);
114119
}
115120

@@ -186,7 +191,8 @@ public void start(Buffer<Record<Event>> buffer) {
186191
return new KafkaConsumer<String, GenericRecord>(consumerProperties);
187192
case PLAINTEXT:
188193
default:
189-
glueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(sourceConfig);
194+
final AwsContext awsContext = new AwsContext(sourceConfig, awsCredentialsSupplier);
195+
glueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(sourceConfig, awsContext);
190196
if (Objects.nonNull(glueDeserializer)) {
191197
return new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
192198
} else {

0 commit comments

Comments
 (0)