
Commit f4a244a

Testing Edge Case
1 parent 9635be3 commit f4a244a

2 files changed: 33 additions, 0 deletions

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java

Lines changed: 7 additions & 0 deletions
@@ -132,6 +132,13 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
         }
         Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, providedTopic));
       }
+    } catch (KafkaException exception) {
+      LOG.warn(
+          "WARN: Failed to connect to kafka for running pre-submit validation of kafka "
+              + "topic and partition configuration. This may be due to local permissions or "
+              + "connectivity to the kafka bootstrap server, or due to misconfiguration of "
+              + "KafkaIO. This validation is not required, and this warning may be ignored "
+              + "if the Beam job runs successfully.");
     }
   }

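The hunk above turns the pre-submit topic and partition validation in KafkaUnboundedSource.split() into a best-effort check: a KafkaException raised while contacting the bootstrap server is caught and logged at WARN level instead of failing job submission. The following is a minimal, self-contained sketch of that catch-and-warn pattern; the class and method names are hypothetical and only the shape of the added catch block mirrors the diff.

import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Minimal sketch (not the actual Beam source): BestEffortKafkaValidation and
// validateQuietly are hypothetical names used only for illustration.
public class BestEffortKafkaValidation {
  private static final Logger LOG = LoggerFactory.getLogger(BestEffortKafkaValidation.class);

  // Runs a pre-submit validation step that is allowed to fail without failing the job.
  static void validateQuietly(Runnable validation) {
    try {
      validation.run();
    } catch (KafkaException exception) {
      // Connectivity or permission problems at pipeline-construction time are reported
      // but do not abort submission, matching the intent of the added catch block.
      LOG.warn("Pre-submit Kafka validation failed; continuing without it.", exception);
    }
  }
}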
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java

Lines changed: 26 additions & 0 deletions
@@ -97,6 +97,7 @@
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;

@@ -274,6 +275,31 @@ public void testKafkaIOFailsFastWithInvalidTopics() throws IOException {
     cancelIfTimeouted(readResult, readState);
   }

+  @Test
+  public void testKafkaIODoesNotErrorAtValidationWithBadBootstrapServer() throws IOException {
+    // Expect an error during execution because the bootstrap server is bad, not during the
+    // validation steps in KafkaUnboundedSource.
+    thrown.expect(KafkaException.class);
+    // Use streaming pipeline to read Kafka records.
+    readPipeline.getOptions().as(Options.class).setStreaming(true);
+    TopicPartition invalidPartition = new TopicPartition(options.getKafkaTopic(), 1000);
+    readPipeline.apply(
+        "Read from unbounded Kafka",
+        KafkaIO.readBytes()
+            .withBootstrapServers(
+                "bootstrap.invalid-name.fake-region.bad-project:invalid-port")
+            .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
+            .withTopicPartitions(ImmutableList.of(invalidPartition)));
+
+    PipelineResult readResult = readPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    // Call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
+    tearDownTopic(options.getKafkaTopic());
+    cancelIfTimeouted(readResult, readState);
+  }
+
   @Test
   public void testKafkaIOFailsFastWithInvalidTopicsAndDynamicRead() throws IOException {
     // This test will fail on versions before 2.3.0 due to the non-existence of the

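The new integration test relies on JUnit 4's ExpectedException rule (the thrown field, presumably a @Rule already present in KafkaIOIT): the KafkaException is expected to surface when the pipeline actually runs against the unreachable bootstrap server, not while KafkaUnboundedSource validates the topic and partition configuration. Below is a minimal sketch of that expected-exception pattern; the class name and the throwing test body are hypothetical stand-ins, not the real pipeline run.

import org.apache.kafka.common.KafkaException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

// Minimal sketch of the JUnit 4 expected-exception pattern used by the new test.
public class ExpectedExceptionPatternTest {
  @Rule public ExpectedException thrown = ExpectedException.none();

  @Test
  public void failureSurfacesAtExecutionTime() {
    // Declare the expectation before the action that should fail.
    thrown.expect(KafkaException.class);
    // Hypothetical stand-in for readPipeline.run() followed by waitUntilFinish(...).
    throw new KafkaException("Failed to connect to bootstrap server");
  }
}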