Commit a56ca25

re-introduce the fail fast mechanic with fixes for documented bugs.
1 parent 56f1aa1 commit a56ca25

File tree: 4 files changed, +252 −4 lines changed


sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 18 additions & 1 deletion
@@ -744,6 +744,9 @@ public abstract static class Read<K, V>
     @Pure
     public abstract long getConsumerPollingTimeout();
 
+    @Pure
+    public abstract @Nullable Boolean getLogTopicVerification();
+
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder

@@ -810,6 +813,8 @@ Builder<K, V> setCheckStopReadingFn(
 
     abstract Builder<K, V> setConsumerPollingTimeout(long consumerPollingTimeout);
 
+    abstract Builder<K, V> setLogTopicVerification(@Nullable Boolean logTopicVerification);
+
     abstract Read<K, V> build();
 
     static <K, V> void setupExternalBuilder(

@@ -1483,6 +1488,10 @@ public Read<K, V> withGCPApplicationDefaultCredentials() {
               "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"));
     }
 
+    public Read<K, V> withTopicVerificationLogging(boolean logTopicVerification) {
+      return toBuilder().setLogTopicVerification(logTopicVerification).build();
+    }
+
     /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. */
     public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
       return new TypedWithoutMetadata<>(this);

@@ -1910,7 +1919,15 @@ public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
         }
       } else {
         for (String topic : topics) {
-          for (PartitionInfo p : consumer.partitionsFor(topic)) {
+          List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+          checkState(
+              partitionInfoList != null && !partitionInfoList.isEmpty(),
+              "Could not find any partitions info for topic "
+                  + topic
+                  + ". Please check Kafka configuration and make sure "
+                  + "that provided topics exist.");
+
+          for (PartitionInfo p : partitionInfoList) {
             partitions.add(new TopicPartition(p.topic(), p.partition()));
           }
         }
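
For orientation, a hedged usage sketch of the new withTopicVerificationLogging option added above; the bootstrap address and topic name are placeholders, not values from this commit:

```java
import java.util.Arrays;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicVerificationLoggingExample {

  // Configures a Kafka read that logs a warning instead of failing fast when a
  // configured topic or partition cannot be verified against the cluster.
  static KafkaIO.Read<byte[], byte[]> loggingRead() {
    return KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("localhost:9092") // placeholder address
        .withTopics(Arrays.asList("my_topic")) // placeholder topic
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withTopicVerificationLogging(true);
  }
}
```

Passing true downgrades topic/partition verification from a hard failure to a logged warning; leaving the flag unset (or false) keeps the fail-fast behavior.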

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java

Lines changed: 50 additions & 2 deletions
@@ -23,9 +23,12 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.io.UnboundedSource;

@@ -37,6 +40,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.checkerframework.checker.nullness.qual.Nullable;

@@ -89,7 +93,7 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
       for (String topic : topics) {
         List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
         checkState(
-            partitionInfoList != null,
+            partitionInfoList != null && !partitionInfoList.isEmpty(),
             "Could not find any partitions info. Please check Kafka configuration and make sure "
                 + "that provided topics exist.");
         for (PartitionInfo p : partitionInfoList) {

@@ -100,8 +104,52 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
         }
       }
     } else {
+      final Map<String, List<Integer>> topicsAndPartitions = new HashMap<>();
       for (TopicPartition p : partitions) {
-        Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, p.topic()));
+        topicsAndPartitions.computeIfAbsent(p.topic(), k -> new ArrayList<>()).add(p.partition());
+      }
+
+      try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
+        for (Map.Entry<String, List<Integer>> e : topicsAndPartitions.entrySet()) {
+          final String providedTopic = e.getKey();
+          final List<Integer> providedPartitions = e.getValue();
+          final Set<Integer> partitionsForTopic;
+          try {
+            partitionsForTopic =
+                consumer.partitionsFor(providedTopic).stream()
+                    .map(PartitionInfo::partition)
+                    .collect(Collectors.toSet());
+            if (spec.getLogTopicVerification() == null || !spec.getLogTopicVerification()) {
+              for (Integer p : providedPartitions) {
+                checkState(
+                    partitionsForTopic.contains(p),
+                    "Partition "
+                        + p
+                        + " does not exist for topic "
+                        + providedTopic
+                        + ". Please check Kafka configuration.");
+              }
+            } else {
+              for (Integer p : providedPartitions) {
+                if (!partitionsForTopic.contains(p)) {
+                  LOG.warn(
+                      "Partition {} does not exist for topic {}. Please check Kafka configuration.",
+                      p,
+                      providedTopic);
+                }
+              }
+            }
+          } catch (KafkaException exception) {
+            LOG.warn("Unable to access cluster. Skipping fail fast checks.");
+          }
+          Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, providedTopic));
+        }
+      } catch (KafkaException exception) {
+        LOG.warn(
+            "WARN: Failed to connect to kafka for running pre-submit validation of kafka "
+                + "topic and partition configuration. This may be due to local permissions or "
+                + "connectivity to the kafka bootstrap server, or due to misconfiguration of "
+                + "KafkaIO. This validation is not required, and this warning may be ignored "
+                + "if the Beam job runs successfully.");
       }
     }
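
The branch added above chooses between failing fast and merely logging based on getLogTopicVerification(). Below is a minimal standalone sketch of that decision logic, using plain collections in place of a live Kafka consumer; the class, helper method, and topic name are hypothetical:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class VerificationModeDemo {

  // Mirrors the branch above: a null or false logTopicVerification flag means
  // fail fast with an exception; true downgrades the check to a warning.
  static void verify(
      Set<Integer> existingPartitions, List<Integer> providedPartitions, Boolean logTopicVerification) {
    boolean failFast = logTopicVerification == null || !logTopicVerification;
    for (Integer p : providedPartitions) {
      if (!existingPartitions.contains(p)) {
        String msg = "Partition " + p + " does not exist for topic demo_topic.";
        if (failFast) {
          throw new IllegalStateException(msg);
        }
        System.out.println("WARN: " + msg);
      }
    }
  }

  public static void main(String[] args) {
    Set<Integer> existing = new HashSet<>(Arrays.asList(0, 1, 2));
    verify(existing, Arrays.asList(0, 1000), true); // logs a warning and continues
    verify(existing, Arrays.asList(0, 1000), null); // throws IllegalStateException
  }
}
```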

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java

Lines changed: 9 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kafka;
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import java.util.ArrayList;
 import java.util.List;

@@ -189,7 +190,14 @@ static List<TopicPartition> getAllTopicPartitions(
         kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
       if (topics != null && !topics.isEmpty()) {
         for (String topic : topics) {
-          for (PartitionInfo partition : kafkaConsumer.partitionsFor(topic)) {
+          List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
+          checkState(
+              partitionInfoList != null && !partitionInfoList.isEmpty(),
+              "Could not find any partitions info for topic "
+                  + topic
+                  + ". Please check Kafka configuration and make sure "
+                  + "that provided topics exist.");
+          for (PartitionInfo partition : partitionInfoList) {
             current.add(new TopicPartition(topic, partition.partition()));
           }
         }
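
As in the KafkaIO.java hunk, this guard relies on Guava's checkState, which throws IllegalStateException carrying the supplied message when its condition is false (Beam imports it from its vendored Guava copy). A minimal illustration against plain Guava:

```java
import static com.google.common.base.Preconditions.checkState;

import java.util.Collections;
import java.util.List;

public class CheckStateDemo {
  public static void main(String[] args) {
    // An empty list stands in for consumer.partitionsFor() returning no
    // partition info for a missing topic.
    List<String> partitionInfoList = Collections.emptyList();
    // Throws IllegalStateException with the given message, since the
    // condition is false.
    checkState(
        partitionInfoList != null && !partitionInfoList.isEmpty(),
        "Could not find any partitions info for topic demo_topic.");
  }
}
```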

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java

Lines changed: 175 additions & 0 deletions
@@ -38,6 +38,7 @@
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.NullableCoder;

@@ -90,13 +91,15 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;

@@ -114,6 +117,7 @@
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;

@@ -171,6 +175,8 @@ public class KafkaIOIT {
 
   @Rule public TestPipeline readPipeline = TestPipeline.create();
 
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
   private static ExperimentalOptions sdfPipelineOptions;
 
   static {

@@ -215,6 +221,175 @@ public static void afterClass() {
     }
   }
 
+  @Test
+  public void testKafkaIOFailsFastWithInvalidPartitions() throws IOException {
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage(
+        "Partition 1000 does not exist for topic "
+            + options.getKafkaTopic()
+            + ". Please check Kafka configuration.");
+
+    // Use streaming pipeline to read Kafka records.
+    readPipeline.getOptions().as(Options.class).setStreaming(true);
+    TopicPartition invalidPartition = new TopicPartition(options.getKafkaTopic(), 1000);
+    readPipeline.apply(
+        "Read from unbounded Kafka",
+        readFromKafka().withTopicPartitions(ImmutableList.of(invalidPartition)));
+
+    PipelineResult readResult = readPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
+    tearDownTopic(options.getKafkaTopic());
+    cancelIfTimeouted(readResult, readState);
+  }
+
+  @Test
+  public void testKafkaIOFailsFastWithInvalidPartitionsAndFlagExplicitlySet() throws IOException {
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage(
+        "Partition 1000 does not exist for topic "
+            + options.getKafkaTopic()
+            + ". Please check Kafka configuration.");
+
+    // Use streaming pipeline to read Kafka records.
+    readPipeline.getOptions().as(Options.class).setStreaming(true);
+    TopicPartition invalidPartition = new TopicPartition(options.getKafkaTopic(), 1000);
+    readPipeline.apply(
+        "Read from unbounded Kafka",
+        readFromKafka()
+            .withTopicPartitions(ImmutableList.of(invalidPartition))
+            .withTopicVerificationLogging(false));
+
+    PipelineResult readResult = readPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
+    tearDownTopic(options.getKafkaTopic());
+    cancelIfTimeouted(readResult, readState);
+  }
+
+  @Test
+  public void testKafkaIODoesNotFailFastWithInvalidPartitionsAndFlagExplicitlyNotSet()
+      throws IOException {
+
+    // Expect a different error, thrown at runtime, because we disable the
+    // fail-fast check by setting the logging flag to true.
+    thrown.expect(java.lang.RuntimeException.class);
+
+    // Use streaming pipeline to read Kafka records.
+    readPipeline.getOptions().as(Options.class).setStreaming(true);
+    TopicPartition invalidPartition = new TopicPartition(options.getKafkaTopic(), 1000);
+    readPipeline.apply(
+        "Read from unbounded Kafka",
+        readFromKafka()
+            .withTopicPartitions(ImmutableList.of(invalidPartition))
+            .withTopicVerificationLogging(true));
+
+    PipelineResult readResult = readPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
+    tearDownTopic(options.getKafkaTopic());
+    cancelIfTimeouted(readResult, readState);
+  }
+
+  @Test
+  public void testKafkaIOFailsFastWithInvalidTopics() throws IOException {
+    // This test will fail on versions before 2.3.0 due to the non-existence of
+    // the allow.auto.create.topics flag. This can be removed when/if support
+    // for this older version is dropped.
+    String actualVer = AppInfoParser.getVersion();
+    assumeFalse(actualVer.compareTo("2.0.0") >= 0 && actualVer.compareTo("2.3.0") < 0);
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage(
+        "Could not find any partitions info for topic invalid_topic. Please check Kafka configuration"
+            + " and make sure that provided topics exist.");
+
+    // Use streaming pipeline to read Kafka records.
+    sdfReadPipeline.getOptions().as(Options.class).setStreaming(true);
+    String invalidTopic = "invalid_topic";
+    sdfReadPipeline.apply(
+        "Read from unbounded Kafka",
+        KafkaIO.<byte[], byte[]>read()
+            .withConsumerConfigUpdates(ImmutableMap.of("allow.auto.create.topics", "false"))
+            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+            .withTopics(ImmutableList.of(invalidTopic))
+            .withKeyDeserializer(ByteArrayDeserializer.class)
+            .withValueDeserializer(ByteArrayDeserializer.class));
+
+    PipelineResult readResult = sdfReadPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
+    tearDownTopic(options.getKafkaTopic());
+    cancelIfTimeouted(readResult, readState);
+  }
+
+  @Test
+  public void testKafkaIOFailsFastWithInvalidTopicsAndDynamicRead() throws IOException {
+    // This test will fail on versions before 2.3.0 due to the non-existence of
+    // the allow.auto.create.topics flag. This can be removed when/if support
+    // for this older version is dropped.
+    String actualVer = AppInfoParser.getVersion();
+    assumeFalse(actualVer.compareTo("2.0.0") >= 0 && actualVer.compareTo("2.3.0") < 0);
+
+    thrown.expect(Pipeline.PipelineExecutionException.class);
+    thrown.expectMessage(
+        "Could not find any partitions info for topic invalid_topic. Please check Kafka configuration"
+            + " and make sure that provided topics exist.");
+
+    // Use streaming pipeline to read Kafka records.
+    sdfReadPipeline.getOptions().as(Options.class).setStreaming(true);
+    String invalidTopic = "invalid_topic";
+    sdfReadPipeline.apply(
+        "Read from unbounded Kafka",
+        KafkaIO.<byte[], byte[]>read()
+            .withConsumerConfigUpdates(ImmutableMap.of("allow.auto.create.topics", "false"))
+            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+            .withTopics(ImmutableList.of(invalidTopic))
+            .withDynamicRead(Duration.standardSeconds(5))
+            .withKeyDeserializer(ByteArrayDeserializer.class)
+            .withValueDeserializer(ByteArrayDeserializer.class));
+
+    PipelineResult readResult = sdfReadPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
+    tearDownTopic(options.getKafkaTopic());
+    cancelIfTimeouted(readResult, readState);
+  }
+
+  @Test
+  public void testKafkaIODoesNotErrorAtValidationWithBadBootstrapServer() throws IOException {
+    // Expect an error during execution that the bootstrap server is bad, not
+    // during validation steps in KafkaUnboundedSource.
+    thrown.expect(KafkaException.class);
+    // Use streaming pipeline to read Kafka records.
+    readPipeline.getOptions().as(Options.class).setStreaming(true);
+    TopicPartition invalidPartition = new TopicPartition(options.getKafkaTopic(), 1000);
+    readPipeline.apply(
+        "Read from unbounded Kafka",
+        KafkaIO.readBytes()
+            .withBootstrapServers("bootstrap.invalid-name.fake-region.bad-project:invalid-port")
+            .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
+            .withTopicPartitions(ImmutableList.of(invalidPartition)));
+
+    PipelineResult readResult = readPipeline.run();
+    PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
+    tearDownTopic(options.getKafkaTopic());
+    cancelIfTimeouted(readResult, readState);
+  }
+
   @Test
   public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
     // Use batch pipeline to write records.
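
The new tests are built on JUnit 4's ExpectedException rule: expectations are registered before the pipeline runs, and the test passes only if a matching exception surfaces. A minimal self-contained sketch of the pattern, not part of the commit:

```java
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class ExpectedExceptionDemo {

  @Rule public ExpectedException thrown = ExpectedException.none();

  @Test
  public void failsWithExpectedMessage() {
    // Expectations are declared first; expectMessage matches a substring.
    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("does not exist");

    // The test passes because this throws the expected exception type
    // with a matching message.
    throw new IllegalStateException("Partition 1000 does not exist for topic demo_topic.");
  }
}
```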
