Bug description
It seems that when using the KafkaItemReader, the point at which a job completes can be nondeterministic. We have seen instances where the job spins up, processes records off the topic as expected, and then shuts down without issue. In other instances, once all available records have been processed, the job appears to wait for new messages to arrive on the topic for an unpredictable amount of time, sometimes even exceeding the job's completion deadline. It would be nice if there were a way to force job completion once all of the messages present on the topic at the start of the job have been processed.
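To make the desired stopping condition concrete (this is purely an illustration, not Spring Batch or Kafka client API): treat the reader as exhausted once, for every partition, the consumer's position has reached the end offset that was recorded when the job started. The helper below is hypothetical; in a real consumer the two maps would come from `KafkaConsumer#endOffsets` and `KafkaConsumer#position`.

```java
import java.util.Map;

// Hypothetical helper: given the end offsets captured at job start and the
// consumer's current positions per partition, decide whether every
// partition has been fully read up to the start-of-job end offset.
class EndOffsetCheck {
    static boolean caughtUp(Map<Integer, Long> endOffsetsAtStart,
                            Map<Integer, Long> currentPositions) {
        for (Map.Entry<Integer, Long> e : endOffsetsAtStart.entrySet()) {
            long position = currentPositions.getOrDefault(e.getKey(), 0L);
            if (position < e.getValue()) {
                return false; // this partition still has records to read
            }
        }
        return true; // all partitions reached their start-of-job end offset
    }
}
```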
Environment
Spring Batch version: 3.0.5 (spring-boot-starter-batch version)
Java version: 17.0.6
Steps to reproduce
Use the configuration below and run the job back-to-back on an active Kafka topic.
Expected behavior
The job would process all records available at the time it starts and then shut down.
Minimal Complete Reproducible example
My job and step config:
```java
@Configuration
public class BatchJobConfig {

    @Bean
    KafkaItemReader<String, MyEvent> kafkaItemReader(@Qualifier("kafkaProperties") Properties props) {
        KafkaItemReader<String, MyEvent> reader = new KafkaItemReaderBuilder<String, MyEvent>()
                .name("my-event-reader")
                .consumerProperties(props)
                .partitions(1)
                .saveState(true)
                .topic("my.topic")
                .build();
        reader.setPartitionOffsets(new HashMap<>());
        return reader;
    }

    @Bean
    Step step1(JobRepository jobRepository,
               PlatformTransactionManager transactionManager,
               KafkaItemReader<String, MyEvent> itemReader,
               ItemProcessor<MyEvent, List<MyFileItem>> processor,
               ItemWriter<List<MyFileItem>> writer) {
        return new StepBuilder("step1", jobRepository)
                .<MyEvent, List<MyFileItem>>chunk(100, transactionManager)
                .reader(itemReader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job processMysJob(JobRepository jobRepository,
                             Step step1,
                             MyJobExecutionListener listener) {
        return new JobBuilder("MyJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }
}
```
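For what it's worth, if I understand the reader's behavior correctly, `KafkaItemReader.read()` returns null (ending the step) only when a poll comes back empty within the reader's poll timeout, which defaults to 30 seconds. If the unpredictable waiting is related to that timeout, the builder exposes a `pollTimeout` setting; a sketch of the same reader bean with an explicit timeout (the 5-second value is just an example, not a recommendation):

```java
// Sketch only: the reader bean from above with an explicit poll timeout.
// Once pollTimeout elapses with no new records, read() returns null and
// the step ends. Shortening it from the 30s default makes completion
// faster on a quiet topic, at the risk of stopping during a producer lull.
KafkaItemReader<String, MyEvent> reader = new KafkaItemReaderBuilder<String, MyEvent>()
        .name("my-event-reader")
        .consumerProperties(props)
        .partitions(1)
        .saveState(true)
        .topic("my.topic")
        .pollTimeout(Duration.ofSeconds(5)) // java.time.Duration
        .build();
```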
My Kafka properties config:
```java
@Configuration
public class KafkaListenerConfigBuilder {

    @Value("${spring.kafka.consumer.bootstrap.servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.properties.group.id}")
    private String groupId;

    @Value("${spring.kafka.properties.ssl.keystore.location}")
    private String keystoreLocation;

    @Value("${spring.kafka.properties.ssl.keystore.password}")
    private String keystorePassword;

    @Value("${spring.kafka.properties.ssl.truststore.location}")
    private String truststoreLocation;

    @Value("${spring.kafka.properties.ssl.truststore.password}")
    private String truststorePassword;

    @Bean("kafkaProperties")
    public Properties createConsumerConfig() {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        prop.put("security.protocol", "SSL");
        prop.put("ssl.keystore.location", this.keystoreLocation);
        prop.put("ssl.keystore.password", this.keystorePassword);
        prop.put("ssl.truststore.location", this.truststoreLocation);
        prop.put("ssl.truststore.password", this.truststorePassword);
        prop.put("spring.json.trusted.packages", "*");
        return prop;
    }
}
```