@@ -53,6 +53,7 @@ public class AsyncRecordPipeLoop
5353 private Integer minRecordsToConsume = 10 ;
5454 private String forcedJobUUID ;
5555
56+ private static boolean doFinalFlushInSupplierThread = System .getProperty ("qqq.AsyncRecordPipeLoop.doFinalFlushInSupplierThread" , "true" ).equalsIgnoreCase ("true" );
5657
5758
5859 /*******************************************************************************
@@ -81,7 +82,20 @@ public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, Unsaf
8182 asyncJobManager .setForcedJobUUID (getForcedJobUUID ());
8283 }
8384
84- String jobUUID = asyncJobManager .startJob (jobName , supplier ::apply );
85+ String jobUUID = asyncJobManager .startJob (jobName , (callback ) ->
86+ {
87+ Serializable output = supplier .apply (callback );
88+
89+ if (doFinalFlushInSupplierThread )
90+ {
91+ if (recordPipe instanceof BufferedRecordPipe bufferedRecordPipe )
92+ {
93+ bufferedRecordPipe .finalFlush ();
94+ }
95+ }
96+
97+ return (output );
98+ });
8599 LOG .debug ("Started supplier job [" + jobUUID + "] for record pipe." );
86100
87101 AsyncJobState jobState = AsyncJobState .RUNNING ;
@@ -162,9 +176,12 @@ public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, Unsaf
162176 jobState = asyncJobStatus .getState ();
163177 }
164178
165- if (recordPipe instanceof BufferedRecordPipe bufferedRecordPipe )
179+ if (! doFinalFlushInSupplierThread )
166180 {
167- bufferedRecordPipe .finalFlush ();
181+ if (recordPipe instanceof BufferedRecordPipe bufferedRecordPipe )
182+ {
183+ bufferedRecordPipe .finalFlush ();
184+ }
168185 }
169186
170187 LOG .debug ("Job [" + jobUUID + "][" + jobName + "] completed with status: " + asyncJobStatus );
0 commit comments