Skip to content

Commit 2de325d

Browse files
xeounxzxufmbenhassine
authored andcommitted
Fix Update job information only when the transaction in ChunkOrientedStep is successful
Signed-off-by: xeounxzxu <[email protected]>
1 parent d5fbb54 commit 2de325d

File tree

2 files changed

+56
-11
lines changed

2 files changed

+56
-11
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.commons.logging.LogFactory;
2525
import org.jspecify.annotations.Nullable;
2626

27-
import org.springframework.batch.core.ExitStatus;
2827
import org.springframework.batch.core.job.JobInterruptedException;
2928
import org.springframework.batch.core.listener.ChunkListener;
3029
import org.springframework.batch.core.listener.CompositeChunkListener;
@@ -89,6 +88,7 @@
8988
* @param <O> type of output items
9089
* @author Mahmoud Ben Hassine
9190
* @author Andrey Litvitski
91+
* @author xeounxzxu
9292
* @since 6.0
9393
*/
9494
public class ChunkOrientedStep<I, O> extends AbstractStep {
@@ -376,6 +376,8 @@ protected void doExecute(StepExecution stepExecution) throws Exception {
376376
chunkTransactionEvent.commit();
377377
});
378378

379+
this.compositeItemStream.update(stepExecution.getExecutionContext());
380+
getJobRepository().updateExecutionContext(stepExecution);
379381
getJobRepository().update(stepExecution);
380382
}
381383
}
@@ -427,10 +429,8 @@ private void processChunkConcurrently(TransactionStatus status, StepContribution
427429
throw new FatalStepExecutionException("Unable to process chunk", e);
428430
}
429431
finally {
430-
// apply contribution and update streams
432+
// apply contribution
431433
stepExecution.apply(contribution);
432-
this.compositeItemStream.update(stepExecution.getExecutionContext());
433-
getJobRepository().updateExecutionContext(stepExecution);
434434
}
435435

436436
}
@@ -458,10 +458,8 @@ private void processChunkSequentially(TransactionStatus status, StepContribution
458458
throw new FatalStepExecutionException("Unable to process chunk", e);
459459
}
460460
finally {
461-
// apply contribution and update streams
461+
// apply contribution
462462
stepExecution.apply(contribution);
463-
compositeItemStream.update(stepExecution.getExecutionContext());
464-
getJobRepository().updateExecutionContext(stepExecution);
465463
}
466464
}
467465

@@ -779,4 +777,4 @@ boolean moreItems() {
779777

780778
}
781779

782-
}
780+
}

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@
3333
import org.springframework.batch.core.step.builder.StepBuilder;
3434
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
3535
import org.springframework.batch.core.step.skip.NonSkippableProcessException;
36-
import org.springframework.batch.infrastructure.item.ItemProcessor;
37-
import org.springframework.batch.infrastructure.item.ItemReader;
38-
import org.springframework.batch.infrastructure.item.ItemWriter;
36+
import org.springframework.batch.infrastructure.item.*;
3937
import org.springframework.batch.infrastructure.item.support.ListItemReader;
4038
import org.springframework.batch.infrastructure.item.support.ListItemWriter;
4139
import org.springframework.batch.infrastructure.support.transaction.ResourcelessTransactionManager;
@@ -52,6 +50,7 @@
5250
/**
5351
* @author Mahmoud Ben Hassine
5452
* @author Andrey Litvitski
53+
* @author xeounxzxu
5554
*/
5655
public class ChunkOrientedStepTests {
5756

@@ -319,4 +318,52 @@ class SkippableException extends RuntimeException {
319318
assertEquals(1, stepExecution.getSkipCount());
320319
}
321320

321+
@Test
322+
void testItemStreamUpdateStillOccursWhenChunkRollsBack_bugReproduction() throws Exception {
323+
// given: tracking stream to capture update invocations
324+
TrackingItemStream trackingItemStream = new TrackingItemStream();
325+
ItemReader<String> reader = new ListItemReader<>(List.of("item1"));
326+
ItemWriter<String> writer = chunk -> {
327+
throw new RuntimeException("Simulated failure");
328+
};
329+
JobRepository jobRepository = new ResourcelessJobRepository();
330+
ChunkOrientedStep<String, String> step = new ChunkOrientedStep<>("step", 1, reader, writer, jobRepository);
331+
step.registerItemStream(trackingItemStream);
332+
step.afterPropertiesSet();
333+
JobInstance jobInstance = new JobInstance(1L, "job");
334+
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
335+
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);
336+
337+
// when: execute step (writer causes chunk rollback)
338+
step.execute(stepExecution);
339+
340+
// then: due to current bug the stream update count becomes 1 although chunk
341+
// rolled back
342+
assertEquals(0, trackingItemStream.getUpdateCount(),
343+
"ItemStream should not be updated when chunk transaction fails (bug reproduction)");
344+
}
345+
346+
private static final class TrackingItemStream implements ItemStream {
347+
348+
private int updateCount;
349+
350+
@Override
351+
public void open(ExecutionContext executionContext) {
352+
}
353+
354+
@Override
355+
public void update(ExecutionContext executionContext) {
356+
this.updateCount++;
357+
}
358+
359+
@Override
360+
public void close() {
361+
}
362+
363+
int getUpdateCount() {
364+
return this.updateCount;
365+
}
366+
367+
}
368+
322369
}

0 commit comments

Comments
 (0)