Skip to content

Commit 964ec58

Browse files
authored
Speed up node boot times by parallelizing buffer acquisition (#19025)
Brokers/Historicals/Peons currently allocate buffers serially on boot. For large quantities of larger buffers (100+ buffers @ ~2GB per buffer) this can mean waiting for several minutes (in our case, upwards of 6mins for brokers, 5+ seconds for peons) just to acquire the memory needed, which isn't great. This is because it is effectively doing 100 sequential malloc/mmap calls each needing 2GB zero'd out memory. This change parallelizes the acquisition of the buffers proportional to the number of cores available on the machine. FJP threadpool is temporary and released once finished (this happens before the broker comes online and is serving queries anyways).
1 parent f17b4f4 commit 964ec58

9 files changed

Lines changed: 88 additions & 5 deletions

File tree

docs/configuration/index.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1381,6 +1381,7 @@ Processing properties set on the Middle Manager are passed through to Peons.
13811381
|`druid.processing.fifo`|Enables the processing queue to treat tasks of equal priority in a FIFO manner.|`true`|
13821382
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
13831383
|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid does not support automated cleanup for the `shuffle-data` directory. You can set up cloud storage lifecycle rules for automated cleanup of data at the `shuffle-data` prefix location.|`local`|
1384+
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing memory pools to be allocated in parallel on process launch. This may significantly speed up Peon launch times if allocating several large buffers.|`false`|
13841385

13851386
The amount of direct memory needed by Druid is at least
13861387
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
@@ -1526,6 +1527,7 @@ Druid uses Jetty to serve HTTP requests.
15261527
|`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts. Setting this value to `0` removes the ability to service per-segment timeouts, irrespective of `perSegmentTimeout` query context parameter. As these threads are just servicing timers, it's recommended to set this value to some small percent (e.g. 5%) of the total query processing cores available to the indexer.|0|
15271528
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`true`|
15281529
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
1530+
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing memory pools to be allocated in parallel on process launch. This may significantly speed up Indexer launch times if allocating several large buffers.|`false`|
15291531

15301532
The amount of direct memory needed by Druid is at least
15311533
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
@@ -1636,6 +1638,7 @@ Druid uses Jetty to serve HTTP requests.
16361638
|`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts. Setting this value to `0` removes the ability to service per-segment timeouts, irrespective of `perSegmentTimeout` query context parameter. As these threads are just servicing timers, it's recommended to set this value to some small percent (e.g. 5%) of the total query processing cores available to the historical.|0|
16371639
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`true`|
16381640
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
1641+
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing memory pools to be allocated in parallel on process launch. This may significantly speed up Historical/Broker launch times if allocating several large buffers.|`false`|
16391642

16401643
The amount of direct memory needed by Druid is at least
16411644
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ public MockProcessingConfig(final int numThreads, final int numMergeBuffers, fin
198198
null,
199199
new DruidProcessingBufferConfig(HumanReadableBytes.valueOf(bufferSize), null, null),
200200
null,
201+
null,
201202
JvmUtils.getRuntimeInfo()
202203
);
203204
}

processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@
2929
import java.util.ArrayList;
3030
import java.util.Collections;
3131
import java.util.List;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.ForkJoinPool;
3234
import java.util.concurrent.TimeUnit;
3335
import java.util.concurrent.atomic.AtomicLong;
3436
import java.util.concurrent.locks.Condition;
3537
import java.util.concurrent.locks.ReentrantLock;
3638
import java.util.stream.Collectors;
39+
import java.util.stream.IntStream;
3740

3841
/**
3942
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
@@ -55,12 +58,41 @@ public DefaultBlockingPool(
5558
Supplier<T> generator,
5659
int limit
5760
)
61+
{
62+
this(generator, limit, false);
63+
}
64+
65+
public DefaultBlockingPool(
66+
Supplier<T> generator,
67+
int limit,
68+
boolean parallelInit
69+
)
5870
{
5971
this.objects = new ArrayDeque<>(limit);
6072
this.maxSize = limit;
6173

62-
for (int i = 0; i < limit; i++) {
63-
objects.add(generator.get());
74+
// Parallize allocations can significantly speed up node boot times
75+
if (parallelInit) {
76+
int parallelism = Runtime.getRuntime().availableProcessors();
77+
ForkJoinPool fjp = new ForkJoinPool(parallelism);
78+
try {
79+
objects.addAll(fjp.submit(
80+
() -> IntStream.range(0, limit)
81+
.parallel()
82+
.mapToObj(i -> generator.get())
83+
.collect(Collectors.toCollection(ArrayList::new))
84+
).get());
85+
}
86+
catch (ExecutionException | InterruptedException e) {
87+
throw new RuntimeException(e);
88+
}
89+
finally {
90+
fjp.shutdown();
91+
}
92+
} else {
93+
for (int i = 0; i < limit; i++) {
94+
objects.add(generator.get());
95+
}
6496
}
6597

6698
this.lock = new ReentrantLock();

processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class DruidProcessingConfig implements ColumnConfig
5454
private final DruidProcessingIndexesConfig indexes;
5555
@JsonProperty
5656
private final int numTimeoutThreads;
57+
@JsonProperty
58+
private final boolean parallelPoolInit;
5759

5860
private final AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();
5961
private final boolean numThreadsConfigured;
@@ -69,6 +71,7 @@ public DruidProcessingConfig(
6971
@JsonProperty("tmpDir") @Nullable String tmpDir,
7072
@JsonProperty("buffer") DruidProcessingBufferConfig buffer,
7173
@JsonProperty("indexes") DruidProcessingIndexesConfig indexes,
74+
@JsonProperty("parallelPoolInit") @Nullable Boolean parallelPoolInit,
7275
@JacksonInject RuntimeInfo runtimeInfo
7376
)
7477
{
@@ -86,6 +89,7 @@ public DruidProcessingConfig(
8689
this.tmpDir = Configs.valueOrDefault(tmpDir, System.getProperty("java.io.tmpdir"));
8790
this.buffer = Configs.valueOrDefault(buffer, new DruidProcessingBufferConfig());
8891
this.indexes = Configs.valueOrDefault(indexes, new DruidProcessingIndexesConfig());
92+
this.parallelPoolInit = Configs.valueOrDefault(parallelPoolInit, false);
8993

9094
this.numThreadsConfigured = numThreads != null;
9195
this.numMergeBuffersConfigured = numMergeBuffers != null;
@@ -95,7 +99,7 @@ public DruidProcessingConfig(
9599
@VisibleForTesting
96100
public DruidProcessingConfig()
97101
{
98-
this(null, null, null, null, null, null, null, null, JvmUtils.getRuntimeInfo());
102+
this(null, null, null, null, null, null, null, null, null, JvmUtils.getRuntimeInfo());
99103
}
100104

101105
private void initializeBufferSize(RuntimeInfo runtimeInfo)
@@ -202,5 +206,14 @@ public boolean isNumMergeBuffersConfigured()
202206
{
203207
return numMergeBuffersConfigured;
204208
}
209+
210+
/**
211+
* Whether buffers in this pool are allocated in parallel.
212+
* See the configuration property `druid.processing.parallelPoolInit` for more information.
213+
*/
214+
public boolean isParallelMemoryPoolInit()
215+
{
216+
return parallelPoolInit;
217+
}
205218
}
206219

processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,19 @@ public void teardown()
6262
service.shutdownNow();
6363
}
6464

65+
@Test(timeout = 60_000L)
66+
public void testParallelInit()
67+
{
68+
DefaultBlockingPool<Integer> parallelPool = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 10, true);
69+
Assert.assertEquals(10, parallelPool.getPoolSize());
70+
final ReferenceCountingResourceHolder<Integer> holder =
71+
Iterables.getOnlyElement(parallelPool.takeBatch(1, 100), null);
72+
Assert.assertNotNull(holder);
73+
Assert.assertEquals(9, parallelPool.getPoolSize());
74+
holder.close();
75+
Assert.assertEquals(10, parallelPool.getPoolSize());
76+
}
77+
6578
@Test
6679
public void testTakeFromEmptyPool()
6780
{

processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,24 @@ public void testReplacements()
117117
Assert.assertEquals(0, config.getNumInitalBuffersForIntermediatePool());
118118
}
119119

120+
@Test
121+
public void testParallelPoolInitDefaultIsFalse()
122+
{
123+
Injector injector = makeInjector(NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE);
124+
DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class);
125+
Assert.assertFalse(config.isParallelMemoryPoolInit());
126+
}
127+
128+
@Test
129+
public void testParallelPoolInitEnabled()
130+
{
131+
Properties props = new Properties();
132+
props.setProperty("druid.processing.parallelPoolInit", "true");
133+
Injector injector = makeInjector(NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE, props);
134+
DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class);
135+
Assert.assertTrue(config.isParallelMemoryPoolInit());
136+
}
137+
120138
@Test
121139
public void testInvalidSizeBytes()
122140
{

server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig config,
109109
verifyDirectMemory(config, runtimeInfo);
110110
return new DefaultBlockingPool<>(
111111
new OffheapBufferGenerator("result merging", config.intermediateComputeSizeBytes()),
112-
config.getNumMergeBuffers()
112+
config.getNumMergeBuffers(),
113+
config.isParallelMemoryPoolInit()
113114
);
114115
}
115116

server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ public static BlockingPool<ByteBuffer> createMergeBufferPool(
185185
verifyDirectMemory(config, runtimeInfo);
186186
return new DefaultBlockingPool<>(
187187
new OffheapBufferGenerator("result merging", config.intermediateComputeSizeBytes()),
188-
config.getNumMergeBuffers()
188+
config.getNumMergeBuffers(),
189+
config.isParallelMemoryPoolInit()
189190
);
190191
}
191192

website/.spelling

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1883,6 +1883,7 @@ minTopNThreshold
18831883
parallelMergeInitialYieldRows
18841884
parallelMergeParallelism
18851885
parallelMergeSmallBatchRows
1886+
parallelPoolInit
18861887
populateCache
18871888
populateResultLevelCache
18881889
queryId

0 commit comments

Comments
 (0)