Skip to content

Commit ada0204

Browse files
committed
FIX: kafka IT
Signed-off-by: George Chen <[email protected]>
1 parent 966fd78 commit ada0204

File tree

7 files changed

+9
-0
lines changed

7 files changed

+9
-0
lines changed

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void setup() {
107107
receivedRecords = new ArrayList<>();
108108
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
109109
pipelineDescription = mock(PipelineDescription.class);
110+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
110111
when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);
111112
when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
112113
when(sourceConfig.getAuthConfig()).thenReturn(authConfig);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/ConfluentKafkaProducerConsumerWithSchemaRegistryIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ public void setup() {
171171
receivedRecords = new ArrayList<>();
172172
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
173173
pipelineDescription = mock(PipelineDescription.class);
174+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
174175
when(authConfig.getSaslAuthConfig()).thenReturn(saslAuthConfig);
175176
when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(plainTextAuthConfig);
176177
when(schemaConfig.getType()).thenReturn(SchemaRegistryType.CONFLUENT);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void setup() throws Throwable {
130130
counter = mock(Counter.class);
131131
buffer = mock(Buffer.class);
132132
encryptionConfig = mock(EncryptionConfig.class);
133+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
133134
receivedRecords = new ArrayList<>();
134135
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
135136
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public void setup() {
124124
receivedRecords = new ArrayList<>();
125125
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
126126
pipelineDescription = mock(PipelineDescription.class);
127+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
127128
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
128129
when(sourceConfig.getSchemaConfig()).thenReturn(null);
129130
when(pluginMetrics.counter(anyString())).thenReturn(counter);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceSaslPlainTextIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public void setup() throws Throwable {
141141
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
142142
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
143143
pipelineDescription = mock(PipelineDescription.class);
144+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
144145
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
145146
when(sourceConfig.getSchemaConfig()).thenReturn(null);
146147
when(pluginMetrics.counter(anyString())).thenReturn(counter);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceSaslScramIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public void setup() throws Throwable {
143143
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
144144
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
145145
pipelineDescription = mock(PipelineDescription.class);
146+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
146147
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
147148
when(sourceConfig.getSchemaConfig()).thenReturn(null);
148149
when(pluginMetrics.counter(anyString())).thenReturn(counter);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java

+3
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ public void setup() {
159159
receivedRecords = new ArrayList<>();
160160
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
161161
pipelineDescription = mock(PipelineDescription.class);
162+
awsCredentialsOptions = mock(AwsCredentialsOptions.class);
163+
awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
162164
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
163165
when(sourceConfig.getSchemaConfig()).thenReturn(schemaConfig);
164166
when(schemaConfig.getType()).thenReturn(SchemaRegistryType.AWS_GLUE);
@@ -412,6 +414,7 @@ public void produceAvroRecords(String servers, String topic, int numRecords) thr
412414
properties.put(AWSSchemaRegistryConstants.AWS_REGION, awsConfig.getRegion());
413415
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, testRegistryName);
414416
properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, testAvroSchemaName);
417+
properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
415418

416419
Schema testSchema = null;
417420
Schema.Parser parser = new Schema.Parser();

0 commit comments

Comments
 (0)