
When a writer throws a non-skippable exception (e.g. StringIndexOutOfBoundsException), the processor goes into an infinite loop #4536

Open
@punitsingh2

Description


When a writer throws a non-skippable exception (e.g. StringIndexOutOfBoundsException), the processor goes into an infinite loop.

@Bean
@JobScope
public Step getKeysFromDB(@Value("#{jobParameters[name]}") String name) {

    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("life-ins-core-thread-" + "data-");
    simpleAsyncTaskExecutor.setTaskDecorator(new Slf4JTaskDecorator());

    return new StepBuilder("", jobRepository)
            .partitioner(STEP_GET_KEYS_FROM_SOURCE_DB, rangePartitioner)
            .partitionHandler(null)
            .step(processData())
            .gridSize(1)
            .taskExecutor(simpleAsyncTaskExecutor)
            .build();
}

private Step processData() {

    SimpleStepBuilder simpleStepBuilder = new StepBuilder("slaveStep", jobRepository)
            .chunk(1, transactionManager);

    simpleStepBuilder.listener(preProcessingWriter);
    simpleStepBuilder.listener(postProcessingWriter);
    simpleStepBuilder.listener(batchDetailItemWriter);
    simpleStepBuilder.listener(postProcessingRWriter);

    simpleStepBuilder
            .reader(sourceDbKeysReader)
            .processor(stagingToBatchDetailProcessor)
            .writer(sequenceWriterNeeded())
            .listener(coreItemProcessorListener)
            .listener(coreItemWriterListener)
            .listener(new CoreChunkListener())
            .listener(coreStepExecutionListener);

    return simpleStepBuilder.faultTolerant()
            .retryLimit(2)
            .retry(BatchSystemException.class)
            .skipLimit(2)
            .skip(SkippableException.class)
            .build();
}

private CompositeItemWriter sequenceWriterNeeded() {

    CompositeItemWriter compositeWriter = new CompositeItemWriter();

    List<ItemWriter> listWriter = new ArrayList<>();
    listWriter.add(preProcessingWriter);
    listWriter.add(batchDetailItemWriter);
    listWriter.add(postProcessingWriter);
    listWriter.add(postProcessingRWriter);
    compositeWriter.setDelegates(listWriter);
    return compositeWriter;
}

This is the processor:

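import java.util.HashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

@Service
@StepScope
public class StagingToBatchDetailProcessor implements ItemProcessor<HashMap<String, Object>, HashMap<String, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(StagingToBatchDetailProcessor.class);

    private ApplicationContext applicationContext;

    public StagingToBatchDetailProcessor(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    public HashMap<String, Object> process(HashMap<String, Object> item) throws Exception {
        logger.info("inside StagingToBatchDetailProcessor::process");
        // Mutates the input item in place and returns the same instance
        item.put("last_output", "item2");
        return item;
    }
}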

Looking at the code of FaultTolerantChunkProcessor, only the retryCallback is executed; it never reaches the recoveryCallback.

try {
    batchRetryTemplate.execute(retryCallback, recoveryCallback,
            new DefaultRetryState(inputs, rollbackClassifier));
}
catch (Exception e) {
    RetryContext context = contextHolder.get();
    if (!batchRetryTemplate.canRetry(context)) {
        /*
         * BATCH-1761: we need advance warning of the scan about to start in
         * the next transaction, so we can change the processing behaviour.
         */
        data.scanning(true);
    }
    throw e;
}
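For context, reaching the recoveryCallback depends on stateful retry bookkeeping: the retry context for a failed chunk is cached under the retry-state key (inputs above) and looked up again on the next attempt, so the attempt count can reach the limit. The sketch below is a heavily simplified, hypothetical model of that idea in plain Java. The class StatefulRetryModel and its onFailure method are made up for illustration and are not Spring Retry's RetryTemplate or RetryContextCache; maxAttempts is assumed to loosely correspond to retryLimit(2).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of stateful retry bookkeeping.
// Not Spring Retry code: it only mimics "cache attempt counts per state key".
public class StatefulRetryModel {

    // Attempt counts keyed by the retry-state key (stand-in for a retry context cache).
    private final Map<Object, Integer> attemptCache = new HashMap<>();
    private final int maxAttempts;

    public StatefulRetryModel(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Records one failed attempt for stateKey and decides what should happen next.
    public String onFailure(Object stateKey) {
        int attempts = attemptCache.getOrDefault(stateKey, 0) + 1;
        attemptCache.put(stateKey, attempts);
        return attempts >= maxAttempts ? "recover" : "retry";
    }

    public static void main(String[] args) {
        StatefulRetryModel model = new StatefulRetryModel(2);

        // Stable key: the second failure finds the cached count and switches to recovery.
        List<String> chunk = List.of("item1");
        System.out.println(model.onFailure(chunk)); // retry
        System.out.println(model.onFailure(chunk)); // recover

        // Key that never matches a cached entry: every failure looks like the first one.
        System.out.println(model.onFailure(List.of(new Object()))); // retry
        System.out.println(model.onFailure(List.of(new Object()))); // retry
    }
}
```

In this model, if the lookup keeps missing, the count never grows and the decision is always "retry", which matches the shape of the loop described in this issue.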

Spring Boot 3.1.6 and Java 17.

Steps to reproduce
It started happening after migrating from Spring Boot 2.7.13 to 3.1.6.

Expected behavior
If a non-skippable exception is thrown from the item writer, execution should go to the recoveryCallback (i.e. retries exhausted) instead of the retryCallback. From StepContextRepeatCallback, execution should go to FaultTolerantChunkProcessor, but instead it goes to StagingToBatchDetailProcessor, and then a scan starts, which causes the infinite loop.

During the migration, the only change we made was using @JobScope for late binding.
While debugging, I found that retryContextCache.get(key) always returns null, even though I can see the value is in the cache, so something seems to be wrong with the hashCode. As a result, this condition is always satisfied: while (canRetry(retryPolicy, context) && !context.isExhaustedOnly())
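A plain-Java illustration of that symptom (the entry is in the cache, yet get(key) returns null): when a hash key's hashCode is computed from mutable items, mutating an item after the entry was stored makes later lookups miss. Whether the retry-state key used here (inputs) hashes by item content in this Spring Batch version is an assumption, not something confirmed in this issue. java.util.HashMap stands in for the retry context cache, and the class MutatedKeyLookup is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MutatedKeyLookup {

    // Outcome of the experiment: did the key's hash change, what did get() return, how big is the cache.
    record Result(boolean hashChanged, String found, int size) {}

    static Result demo() {
        // Hypothetical item, shaped like the HashMap items in this issue.
        HashMap<String, Object> item = new HashMap<>();
        item.put("id", 1);
        // List.of(...) hashes by content, so the key's hash depends on the item's entries.
        List<HashMap<String, Object>> key = List.of(item);

        // Stand-in for a HashMap-backed retry context cache keyed by the chunk's items.
        Map<Object, String> cache = new HashMap<>();
        cache.put(key, "retry context");

        int hashBefore = key.hashCode();
        item.put("last_output", "item2"); // same in-place mutation as the processor
        int hashAfter = key.hashCode();

        return new Result(hashBefore != hashAfter, cache.get(key), cache.size());
    }

    public static void main(String[] args) {
        Result r = demo();
        System.out.println("hashCode changed: " + r.hashChanged()); // true
        System.out.println("cache.get(key):   " + r.found());       // null
        System.out.println("cache.size():     " + r.size());        // 1, the entry is still there
    }
}
```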

The infinite loop is resolved only when using faultTolerant().processorNonTransactional(), or when returning a new object from the processor, as below:

@Override
public HashMap<String, Object> process(HashMap<String, Object> item) throws Exception {
    logger.info("inside StagingToBatchDetailProcessor::process");
    // Returns a new map instead of the input instance
    HashMap<String, Object> obj = new HashMap<String, Object>();
    obj.put("final_output", item.get("final_output"));

    item.put("last_output", "item2");
    return obj;
}

I have checked everything but don't see any issue. I'm wondering what difference either of these two solutions makes: processorNonTransactional() or returning a new object from the processor. I don't want to go with either, because I'm unsure how running without a transaction would impact processing, and creating new objects may lead to out-of-memory issues. The data is payment-related and must be processed correctly.
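If the aim is to leave input items untouched without dropping fields, one option (not confirmed as the fix for this issue) is to build the output from a shallow copy of the input. The sketch uses java.util.function.UnaryOperator as a stand-in for Spring Batch's ItemProcessor so it runs without Spring on the classpath; the class name CopyingProcessorSketch is hypothetical. A shallow copy duplicates only the map's entry table, not the values it references, and with chunk(1) only one copy per chunk is live at a time, so the extra memory should be modest, although that depends on item size.

```java
import java.util.HashMap;
import java.util.function.UnaryOperator;

public class CopyingProcessorSketch {

    // Stand-in for an ItemProcessor<HashMap<String, Object>, HashMap<String, Object>>:
    // the output is built from a shallow copy, so the input item is never mutated.
    static final UnaryOperator<HashMap<String, Object>> PROCESSOR = item -> {
        HashMap<String, Object> output = new HashMap<>(item); // copies entry references only
        output.put("last_output", "item2");
        return output;
    };

    public static void main(String[] args) {
        HashMap<String, Object> input = new HashMap<>();
        input.put("id", 1);
        int inputHashBefore = input.hashCode();

        HashMap<String, Object> output = PROCESSOR.apply(input);

        System.out.println(input.hashCode() == inputHashBefore); // true: input untouched
        System.out.println(input.containsKey("last_output"));   // false
        System.out.println(output.get("last_output"));          // item2
    }
}
```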
