Skip to content

Commit 0ce4d32

Browse files
committed
fix(pipes): do final flush of buffered record pipes inside the supplier thread - not back on the consumer thread
Adds one test that demonstrates the failure, and another that demonstrates the fix. A feature flag is included in case the fix causes some unexpected harm and one needs to use the old mode - but science tells us the old mode couldn't possibly have been right. Note: this became exposed by a recent change to BufferedRecordPipe that overrides the addRecords method - without that override, the buffer wasn't being used for bulk additions, so the final flush was effectively a no-op in cases where the supplier was bulk-adding records to the pipe (which is the more common scenario).
1 parent 3c9581f commit 0ce4d32

File tree

3 files changed

+213
-8
lines changed

3 files changed

+213
-8
lines changed

qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,19 @@
3636

3737

3838
/*******************************************************************************
39-
** Class that knows how to Run an asynchronous job (lambda, supplier) that writes into a
40-
** RecordPipe, with another lambda (consumer) that consumes records from the pipe.
41-
**
42-
** Takes care of the job status monitoring, blocking when the pipe is empty, etc.
39+
* Class that knows how to Run an asynchronous job (lambda, supplier) that writes into a
40+
* RecordPipe, with another lambda (consumer) that consumes records from the pipe.
41+
*
42+
* Takes care of the job status monitoring, blocking when the pipe is empty, etc.
43+
*
44+
* There is a feature flag (System property "qqq.AsyncRecordPipeLoop.doFinalFlushInSupplierThread")
45+
* to control where a finalFlush call on a {@link BufferedRecordPipe} takes place.
46+
* Originally this call happened after the supplier job was complete, but it was
47+
* (we believe) incorrectly done on the same thread as the consumer. This feature
48+
* flag (which defaults to true - the now-believed correct behavior) moves that
49+
* finalFlush call to be on the supplier thread to avoid deadlocks. But, if this
50+
* fix/change turns out to not be correct, one can set this system property to
51+
* anything other than "true" (recommended "false") to revert to the old behavior.
4352
*******************************************************************************/
4453
public class AsyncRecordPipeLoop
4554
{
@@ -53,6 +62,10 @@ public class AsyncRecordPipeLoop
5362
private Integer minRecordsToConsume = 10;
5463
private String forcedJobUUID;
5564

65+
//////////////////////////////////////////////////////////////
66+
// feature flag - see class javadoc and usages inline below //
67+
//////////////////////////////////////////////////////////////
68+
private static boolean doFinalFlushInSupplierThread = System.getProperty("qqq.AsyncRecordPipeLoop.doFinalFlushInSupplierThread", "true").equalsIgnoreCase("true");
5669

5770

5871
/*******************************************************************************
@@ -81,7 +94,27 @@ public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, Unsaf
8194
asyncJobManager.setForcedJobUUID(getForcedJobUUID());
8295
}
8396

84-
String jobUUID = asyncJobManager.startJob(jobName, supplier::apply);
97+
String jobUUID = asyncJobManager.startJob(jobName, (callback) ->
98+
{
99+
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
100+
// originally this code just ran the supplier lambda here. However, we need to do a little bit more than that: //
101+
// If the recordPipe is a BufferedRecordPipe, we must call finalFlush() on it after the supplier lambda completes //
102+
// and critically, this needs to be on the same thread as the supplier, to avoid a potential deadlock in the pipe. //
103+
// And, since this is a tricky bit to know we're 100% confident changing, we'll wrap a feature flag around this //
104+
// change (see below, after the loop, in the consumer thread, where we originally did this flush). //
105+
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
106+
Serializable output = supplier.apply(callback);
107+
108+
if(doFinalFlushInSupplierThread)
109+
{
110+
if(recordPipe instanceof BufferedRecordPipe bufferedRecordPipe)
111+
{
112+
bufferedRecordPipe.finalFlush();
113+
}
114+
}
115+
116+
return (output);
117+
});
85118
LOG.debug("Started supplier job [" + jobUUID + "] for record pipe.");
86119

87120
AsyncJobState jobState = AsyncJobState.RUNNING;
@@ -162,9 +195,16 @@ public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, Unsaf
162195
jobState = asyncJobStatus.getState();
163196
}
164197

165-
if(recordPipe instanceof BufferedRecordPipe bufferedRecordPipe)
198+
/////////////////////////////////////////////////////////////////////////////////
199+
// in case it turns out that it isn't right to move this flush to the supplier //
200+
// thread, if this feature flag is not set to true, then do this flush here. //
201+
/////////////////////////////////////////////////////////////////////////////////
202+
if(!doFinalFlushInSupplierThread)
166203
{
167-
bufferedRecordPipe.finalFlush();
204+
if(recordPipe instanceof BufferedRecordPipe bufferedRecordPipe)
205+
{
206+
bufferedRecordPipe.finalFlush();
207+
}
168208
}
169209

170210
LOG.debug("Job [" + jobUUID + "][" + jobName + "] completed with status: " + asyncJobStatus);

qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ public class RecordPipe
4343
private static final QLogger LOG = QLogger.getLogger(RecordPipe.class);
4444

4545
private static final long BLOCKING_SLEEP_MILLIS = 100;
46-
private static final long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes
4746
private static final int DEFAULT_CAPACITY = 1_000;
4847

48+
private static long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes
49+
4950
private int capacity = DEFAULT_CAPACITY;
5051
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(capacity);
5152

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* QQQ - Low-code Application Framework for Engineers.
3+
* Copyright (C) 2021-2026. Kingsrook, LLC
4+
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
5+
* contact@kingsrook.com
6+
* https://github.com/Kingsrook/
7+
*
8+
* This program is free software: you can redistribute it and/or modify
9+
* it under the terms of the GNU Affero General Public License as
10+
* published by the Free Software Foundation, either version 3 of the
11+
* License, or (at your option) any later version.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* GNU Affero General Public License for more details.
17+
*
18+
* You should have received a copy of the GNU Affero General Public License
19+
* along with this program. If not, see <https://www.gnu.org/licenses/>.
20+
*/
21+
22+
package com.kingsrook.qqq.backend.core.actions.async;
23+
24+
25+
import java.io.Serializable;
26+
import java.lang.reflect.Field;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicInteger;
31+
import com.kingsrook.qqq.backend.core.BaseTest;
32+
import com.kingsrook.qqq.backend.core.actions.reporting.BufferedRecordPipe;
33+
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
34+
import com.kingsrook.qqq.backend.core.exceptions.QException;
35+
import com.kingsrook.qqq.backend.core.model.data.QRecord;
36+
import com.kingsrook.qqq.backend.core.utils.SleepUtils;
37+
import org.junit.jupiter.api.AfterEach;
38+
import org.junit.jupiter.api.Test;
39+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
40+
import static org.junit.jupiter.api.Assertions.assertEquals;
41+
42+
43+
/*******************************************************************************
44+
** Unit test for AsyncRecordPipeLoop - covers where the finalFlush of a BufferedRecordPipe runs (supplier thread vs. consumer thread).
45+
*******************************************************************************/
46+
class AsyncRecordPipeLoopTest extends BaseTest
47+
{
48+
49+
/*******************************************************************************
50+
** Restore the static fields that the tests below overwrite via reflection.
51+
*******************************************************************************/
52+
@AfterEach
53+
void afterEach() throws Exception
54+
{
55+
////////////////////////////////////////////////
56+
// reset the private fields manipulated below //
57+
////////////////////////////////////////////////
58+
setPrivateStaticField(AsyncRecordPipeLoop.class, "doFinalFlushInSupplierThread", true);
59+
setPrivateStaticField(RecordPipe.class, "MAX_SLEEP_LOOP_MILLIS", 300_000);
60+
}
61+
62+
63+
64+
/*******************************************************************************
65+
* This test establishes a baseline: before the finalFlush call was moved
66+
* to the supplier thread, the pipe would fill up and deadlock.
67+
*
68+
* To trigger this condition, we set up some specific sizes. These are certainly
69+
* not the only way things could line up to make this condition happen, but these
70+
* do make it happen 100% of the time, so this test and testPipeDoesNotFillAndDeadlockWithFinalFlushInSupplierThread
71+
* are both set up the same way.
72+
*
73+
* The conditions are:
74+
* - Make a BufferedRecordPipe, whose buffer size is 1002. It'll internally wrap a
75+
* record pipe of size 1000.
76+
* - Have the supplier write (all in 1 call) 1001 records to the buffered pipe.
77+
* Those records will fit in its buffer, and not be sent to the underlying pipe.
78+
* - Then the supplier job will finish, and a finalFlush will be called, to send
79+
* the 1001 records to the underlying pipe, which will block after putting 1000
80+
* records in the pipe.
81+
* - But - with the bug (finalFlush called on the consumer thread), it'll block until
82+
* the MAX_SLEEP_LOOP_MILLIS timeout happens.
83+
*******************************************************************************/
84+
@Test
85+
void testPipeDidFillAndDeadlockWithFinalFlushNotInSupplierThread() throws Exception
86+
{
87+
////////////////////////////////////////////////////////////////////////////////////////////////////
88+
// make sure the feature flag is set to false - to demonstrate the deadlock situation that occurs //
89+
// also make it only block for 10 ms, to not waste CI time //
90+
////////////////////////////////////////////////////////////////////////////////////////////////////
91+
setPrivateStaticField(AsyncRecordPipeLoop.class, "doFinalFlushInSupplierThread", false);
92+
setPrivateStaticField(RecordPipe.class, "MAX_SLEEP_LOOP_MILLIS", 10);
93+
94+
Integer rowsToSupply = 1_001;
95+
BufferedRecordPipe recordPipe = new BufferedRecordPipe(rowsToSupply + 1);
96+
AtomicInteger rowsConsumed = new AtomicInteger(0);
97+
98+
assertThatThrownBy(() -> new AsyncRecordPipeLoop().run("Test", null, recordPipe, (c) -> supplier(recordPipe, rowsToSupply), () -> consumer(recordPipe, rowsConsumed)))
99+
.isInstanceOf(IllegalStateException.class)
100+
.hasMessageContaining("Giving up adding record to pipe, due to pipe staying full too long");
101+
}
102+
103+
104+
105+
/*******************************************************************************
106+
* This test, with the feature flag set to true, should successfully complete.
107+
*
108+
* It should use the same setup as testPipeDidFillAndDeadlockWithFinalFlushNotInSupplierThread,
109+
* but with finalFlush moved to the supplier thread, the deadlock is avoided.
110+
*******************************************************************************/
111+
@Test
112+
void testPipeDoesNotFillAndDeadlockWithFinalFlushInSupplierThread() throws Exception
113+
{
114+
/////////////////////////////////////////////////////////////////////////////////////////////////////////
115+
// make sure the feature flag is set to true - as this is the test that we expect to work successfully //
116+
/////////////////////////////////////////////////////////////////////////////////////////////////////////
117+
setPrivateStaticField(AsyncRecordPipeLoop.class, "doFinalFlushInSupplierThread", true);
118+
119+
Integer rowsToSupply = 1_001;
120+
BufferedRecordPipe recordPipe = new BufferedRecordPipe(rowsToSupply + 1);
121+
AtomicInteger rowsConsumed = new AtomicInteger(0);
122+
123+
new AsyncRecordPipeLoop().run("Test", null, recordPipe, (c) -> supplier(recordPipe, rowsToSupply), () -> consumer(recordPipe, rowsConsumed));
124+
125+
assertEquals(rowsToSupply, rowsConsumed.get());
126+
}
127+
128+
129+
130+
/***************************************************************************
131+
* Supplier used by the tests: adds a list of rowsToSupply entries (the same new QRecord, repeated) to the pipe in a single addRecords call, then returns true.
132+
***************************************************************************/
133+
private Serializable supplier(RecordPipe recordPipe, Integer rowsToSupply) throws QException
134+
{
135+
recordPipe.addRecords(Collections.nCopies(rowsToSupply, new QRecord()));
136+
return true;
137+
}
138+
139+
140+
141+
/***************************************************************************
142+
* Consumer used by the tests: sleeps 5 ms, takes the records currently available from the pipe, adds their count to rowsConsumed, and returns that count.
143+
***************************************************************************/
144+
private Integer consumer(RecordPipe recordPipe, AtomicInteger rowsConsumed) throws QException
145+
{
146+
SleepUtils.sleep(5, TimeUnit.MILLISECONDS);
147+
List<QRecord> records = recordPipe.consumeAvailableRecords();
148+
rowsConsumed.addAndGet(records.size());
149+
return records.size();
150+
}
151+
152+
153+
154+
/***************************************************************************
155+
* Set the static field named fieldName, declared on targetClass, to value - using reflection with access checks disabled (so private fields can be set).
156+
***************************************************************************/
157+
public static void setPrivateStaticField(Class<?> targetClass, String fieldName, Object value) throws NoSuchFieldException, IllegalAccessException
158+
{
159+
Field field = targetClass.getDeclaredField(fieldName);
160+
field.setAccessible(true);
161+
field.set(null, value);
162+
163+
164+
}

0 commit comments

Comments
 (0)