Skip to content

Commit 7bbfa03

Browse files
authored
feat: Make segmentLoadAheadCount able to be configured at worker task level in addition to context (apache#19559)
* Make segmentLoadAheadCount able to be configured at worker task level in addition to context * fixups based on review
1 parent d06aa83 commit 7bbfa03

11 files changed

Lines changed: 184 additions & 5 deletions

File tree

docs/querying/dart.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ For Historicals, you can set the following configs:
7373
|---|---|---|
7474
| `druid.msq.dart.worker.concurrentQueries` | Maximum number of query workers that can run concurrently on a Historical. We recommend leaving this config at the default value. If need to change this value, set it to a value equal to or larger than `druid.msq.dart.controller.concurrentQueries` on your Brokers. If you don't, queries can get stuck waiting for each other. Don't set it to a value higher than the number of merge buffers. | Equal to the number of merge buffers |
7575
| `druid.msq.dart.worker.heapFraction` | Maximum amount of heap available for use across all Dart queries as a decimal. | 0.35 (35% of heap) |
76+
| `druid.msq.dart.worker.segmentLoadAheadCount` | Number of segments a worker will prefetch ahead of processing them. This lets tiers with different hardware tune prefetch independently. It acts as a worker-local default: a value supplied in the query context (`segmentLoadAheadCount`) always takes precedence when it is set. Non-positive values are treated as unset. | Unset (uses twice the number of processing threads) |
7677

7778

7879
## Run a Dart query

multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerConfig.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222
import com.fasterxml.jackson.annotation.JsonProperty;
2323
import org.apache.druid.msq.exec.MemoryIntrospector;
24+
import org.apache.druid.msq.querykit.ReadableInputQueue;
25+
26+
import javax.annotation.Nullable;
2427

2528
/**
2629
* Runtime configuration for workers (which run on Historicals).
@@ -41,6 +44,21 @@ public class DartWorkerConfig
4144
@JsonProperty("heapFraction")
4245
private double heapFraction = DEFAULT_HEAP_FRACTION;
4346

47+
/**
48+
* Worker-local value for the segment load-ahead count used to size segment prefetch in {@link ReadableInputQueue}.
49+
* <p>
50+
* Defaults to null (unset), which leaves the value to query context (client or controller-default supplied) and the
51+
* built-in {@code 2 * threadCount} fallback. When set to a positive value, it acts as a worker-local default used
52+
* only when the query context does not supply a value.
53+
* <p>
54+
* This is per-worker-process configuration, set independently on each worker. It lets workers with different
55+
* hardware (for example, separate tiers with more or less memory and storage bandwidth) tune their own prefetch
56+
* depth, rather than relying solely on a single cluster-wide query-context default.
57+
*/
58+
@JsonProperty("segmentLoadAheadCount")
59+
@Nullable
60+
private Integer segmentLoadAheadCount = null;
61+
4462
public int getConcurrentQueries()
4563
{
4664
return concurrentQueries;
@@ -50,4 +68,10 @@ public double getHeapFraction()
5068
{
5169
return heapFraction;
5270
}
71+
72+
@Nullable
73+
public Integer getSegmentLoadAheadCount()
74+
{
75+
return segmentLoadAheadCount;
76+
}
5377
}

multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.druid.messages.server.Outbox;
3131
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
3232
import org.apache.druid.msq.dart.controller.messages.PostCounters;
33+
import org.apache.druid.msq.dart.guice.DartWorkerConfig;
3334
import org.apache.druid.msq.exec.ControllerClient;
3435
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
3536
import org.apache.druid.msq.exec.FrameContext;
@@ -57,6 +58,7 @@
5758
import org.apache.druid.utils.CloseableUtils;
5859
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
5960

61+
import javax.annotation.Nullable;
6062
import java.io.File;
6163
import java.util.List;
6264

@@ -92,6 +94,13 @@ public class DartWorkerContext implements WorkerContext
9294
private final ServiceEmitter emitter;
9395
private final int threadCount;
9496

97+
/**
98+
* Worker-local segment load-ahead count from {@link DartWorkerConfig#getSegmentLoadAheadCount()}, or null if unset.
99+
* Used as the default in {@link #segmentLoadAheadCount(WorkOrder)} when the query context does not supply a value.
100+
*/
101+
@Nullable
102+
private final Integer segmentLoadAheadCountConfig;
103+
95104
/**
96105
* Lazy initialized upon call to {@link #frameContext(WorkOrder)}.
97106
*/
@@ -109,6 +118,7 @@ public class DartWorkerContext implements WorkerContext
109118
final Injector injector,
110119
final DartWorkerClient workerClient,
111120
final DruidProcessingConfig processingConfig,
121+
final DartWorkerConfig workerConfig,
112122
final SegmentWrangler segmentWrangler,
113123
final SegmentManager segmentManager,
114124
final VirtualStorageManager virtualStorageManager,
@@ -148,6 +158,9 @@ public class DartWorkerContext implements WorkerContext
148158
final int baseThreadCount = processingConfig.getNumThreads();
149159
final Integer maxThreads = MultiStageQueryContext.getMaxThreads(queryContext);
150160
this.threadCount = (maxThreads != null && maxThreads > 0) ? Math.min(baseThreadCount, maxThreads) : baseThreadCount;
161+
162+
// Worker-local segment load-ahead config from this worker's DartWorkerConfig.
163+
this.segmentLoadAheadCountConfig = workerConfig.getSegmentLoadAheadCount();
151164
}
152165

153166
@Override
@@ -277,6 +290,32 @@ public int threadCount()
277290
return threadCount;
278291
}
279292

293+
@Override
294+
public int segmentLoadAheadCount(final WorkOrder workOrder)
295+
{
296+
final Integer fromContext = MultiStageQueryContext.getSegmentLoadAheadCount(workOrder.getWorkerContext());
297+
return resolveSegmentLoadAheadCount(fromContext, segmentLoadAheadCountConfig, threadCount);
298+
}
299+
300+
/**
301+
* Determine which of the three potential sources of segment load ahead count to use.
302+
* <p>
303+
* Precedence is: a value supplied in the query context wins when set; otherwise the worker-local config is used
304+
* when set to a positive value; lastly we fall back to {@code 2 * threadCount}.
305+
*/
306+
static int resolveSegmentLoadAheadCount(
307+
@Nullable final Integer fromContext,
308+
@Nullable final Integer workerConfig,
309+
final int threadCount
310+
)
311+
{
312+
if (fromContext != null) {
313+
return fromContext;
314+
}
315+
final boolean hasWorkerConfig = workerConfig != null && workerConfig > 0;
316+
return hasWorkerConfig ? workerConfig : threadCount * 2;
317+
}
318+
280319
@Override
281320
public boolean includeAllCounters()
282321
{

multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.druid.messages.server.Outbox;
3232
import org.apache.druid.msq.dart.Dart;
3333
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
34+
import org.apache.druid.msq.dart.guice.DartWorkerConfig;
3435
import org.apache.druid.msq.exec.MemoryIntrospector;
3536
import org.apache.druid.msq.exec.ProcessingBuffersProvider;
3637
import org.apache.druid.msq.exec.WorkerContext;
@@ -60,6 +61,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory
6061
private final Injector injector;
6162
private final ServiceClientFactory serviceClientFactory;
6263
private final DruidProcessingConfig processingConfig;
64+
private final DartWorkerConfig workerConfig;
6365
private final SegmentWrangler segmentWrangler;
6466
private final SegmentManager segmentManager;
6567
private final VirtualStorageManager virtualStorageManager;
@@ -80,6 +82,7 @@ public DartWorkerContextFactoryImpl(
8082
Injector injector,
8183
@EscalatedGlobal ServiceClientFactory serviceClientFactory,
8284
DruidProcessingConfig processingConfig,
85+
DartWorkerConfig workerConfig,
8386
SegmentWrangler segmentWrangler,
8487
SegmentManager segmentManager,
8588
VirtualStorageManager virtualStorageManager,
@@ -99,6 +102,7 @@ public DartWorkerContextFactoryImpl(
99102
this.injector = injector;
100103
this.serviceClientFactory = serviceClientFactory;
101104
this.processingConfig = processingConfig;
105+
this.workerConfig = workerConfig;
102106
this.segmentWrangler = segmentWrangler;
103107
this.coordinatorClient = coordinatorClient;
104108
this.segmentManager = segmentManager;
@@ -128,6 +132,7 @@ public WorkerContext build(
128132
injector,
129133
createWorkerClient(queryId),
130134
processingConfig,
135+
workerConfig,
131136
segmentWrangler,
132137
segmentManager,
133138
virtualStorageManager,

multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ public interface ExecutionContext
9191
*/
9292
int threadCount();
9393

94+
/**
95+
* Effective segment load-ahead count for {@link #workOrder()}, resolved by
96+
* {@link WorkerContext#segmentLoadAheadCount(WorkOrder)}. Used to size the segment prefetch in
97+
* {@link org.apache.druid.msq.querykit.ReadableInputQueue}.
98+
*/
99+
int segmentLoadAheadCount();
100+
94101
/**
95102
* Cancellation ID that must be provided to {@link FrameProcessorExecutor} when running work.
96103
*/

multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class ExecutionContextImpl implements ExecutionContext
5050
private final FrameContext frameContext;
5151
private final CounterTracker counters;
5252
private final int maxOutstandingProcessors;
53+
private final int segmentLoadAheadCount;
5354
private final String cancellationId;
5455
private final RunWorkOrderListener listener;
5556
private final Set<String> intermediateOutputChannelFactoryNames = Sets.newConcurrentHashSet();
@@ -65,6 +66,7 @@ public class ExecutionContextImpl implements ExecutionContext
6566
final FrameContext frameContext,
6667
final CounterTracker counters,
6768
final int maxOutstandingProcessors,
69+
final int segmentLoadAheadCount,
6870
final String cancellationId,
6971
final RunWorkOrderListener listener
7072
)
@@ -79,6 +81,7 @@ public class ExecutionContextImpl implements ExecutionContext
7981
this.frameContext = frameContext;
8082
this.counters = counters;
8183
this.maxOutstandingProcessors = maxOutstandingProcessors;
84+
this.segmentLoadAheadCount = segmentLoadAheadCount;
8285
this.cancellationId = cancellationId;
8386
this.listener = listener;
8487
}
@@ -147,6 +150,12 @@ public int threadCount()
147150
return maxOutstandingProcessors;
148151
}
149152

153+
@Override
154+
public int segmentLoadAheadCount()
155+
{
156+
return segmentLoadAheadCount;
157+
}
158+
150159
@Override
151160
public String cancellationId()
152161
{

multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ private ExecutionContext makeExecutionContext()
392392
frameContext,
393393
counterTracker,
394394
workerContext.threadCount(),
395+
workerContext.segmentLoadAheadCount(workOrder),
395396
cancellationId,
396397
listener
397398
);

multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,20 @@ public interface WorkerContext extends Closeable
109109
*/
110110
int threadCount();
111111

112+
/**
113+
* Effective number of segments to load ahead of when they are needed while processing the given {@code workOrder},
114+
* used to size the segment prefetch in {@link org.apache.druid.msq.querykit.ReadableInputQueue}.
115+
*
116+
* The default honors {@link MultiStageQueryContext#CTX_SEGMENT_LOAD_AHEAD_COUNT} from the work order's context
117+
* (set by the controller from client and broker-default context), and otherwise falls back to
118+
* {@code 2 * threadCount()}. Implementations may override to layer in worker-local configuration.
119+
*/
120+
default int segmentLoadAheadCount(WorkOrder workOrder)
121+
{
122+
final Integer fromContext = MultiStageQueryContext.getSegmentLoadAheadCount(workOrder.getWorkerContext());
123+
return fromContext != null ? fromContext : threadCount() * 2;
124+
}
125+
112126
/**
113127
* Fetch node info about self.
114128
*/

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.apache.druid.msq.input.stage.StageInputSlice;
5252
import org.apache.druid.msq.input.table.SegmentsInputSlice;
5353
import org.apache.druid.msq.kernel.StageDefinition;
54-
import org.apache.druid.msq.util.MultiStageQueryContext;
5554
import org.apache.druid.query.Query;
5655
import org.apache.druid.query.filter.SegmentPruner;
5756
import org.apache.druid.query.planning.ExecutionVertex;
@@ -294,12 +293,10 @@ private ReadableInputQueue makeBaseInputQueue(
294293
}
295294

296295
final List<PhysicalInputSlice> filteredSlices = filterBaseInput(physicalInputSlices);
297-
final Integer segmentLoadAheadCount =
298-
MultiStageQueryContext.getSegmentLoadAheadCount(context.workOrder().getWorkerContext());
299296
return new ReadableInputQueue(
300297
new StandardPartitionReader(context),
301298
filteredSlices,
302-
segmentLoadAheadCount != null ? segmentLoadAheadCount : context.threadCount() * 2
299+
context.segmentLoadAheadCount()
303300
);
304301
}
305302

multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,10 @@ public class MultiStageQueryContext
278278
public static final String CTX_MAX_THREADS = "maxThreads";
279279

280280
/**
281-
* Maximum number of segments to load ahead of them being needed. Used when setting up {@link ReadableInputQueue}.
281+
* Number of segments to load ahead of them being needed. Used when setting up {@link ReadableInputQueue}.
282+
* <p>
283+
* A worker may be configured with a local default for this value. When this context value is set, it always wins;
284+
* the worker-local default applies only when this context value is absent.
282285
*/
283286
public static final String CTX_SEGMENT_LOAD_AHEAD_COUNT = "segmentLoadAheadCount";
284287

0 commit comments

Comments
 (0)