From 91cbd082c3f857f8f16d0e17e85452354d35da19 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 18 May 2026 16:23:56 -0700 Subject: [PATCH 1/4] feat: Extension point for MSQ InputSpecs. This patch adds extension points for InputSpecSlicer and InputSliceReader, and uses them to implement TableInputSpec. This eliminates and generalizes the "newTableInputSpecSlicer" method on the ControllerContext, which was previously needed because the slicing logic differs for tasks and Dart. --- .../controller/DartControllerContext.java | 21 ++++-- .../DartControllerContextFactoryImpl.java | 9 +++ .../DartTableInputSpecSlicerProvider.java | 55 +++++++++++++++ .../msq/dart/guice/DartControllerModule.java | 6 ++ .../msq/dart/guice/DartWorkerModule.java | 7 ++ .../DartSegmentsInputSliceReaderProvider.java | 46 +++++++++++++ .../msq/dart/worker/DartWorkerContext.java | 17 +++-- .../worker/DartWorkerContextFactoryImpl.java | 12 +++- .../druid/msq/exec/ControllerContext.java | 11 +-- .../apache/druid/msq/exec/ControllerImpl.java | 43 ++++++++---- .../apache/druid/msq/exec/RunWorkOrder.java | 28 +++++--- .../apache/druid/msq/exec/WorkerContext.java | 12 ++++ .../apache/druid/msq/guice/MSQBinders.java | 34 +++++++++ .../druid/msq/guice/MSQIndexingModule.java | 13 ++++ .../indexing/IndexerControllerContext.java | 20 +++--- ...dexerSegmentsInputSliceReaderProvider.java | 47 +++++++++++++ .../IndexerTableInputSpecSlicerProvider.java | 69 +++++++++++++++++++ .../msq/indexing/IndexerWorkerContext.java | 21 +++++- .../msq/input/InputSliceReaderProvider.java | 40 +++++++++++ .../msq/input/InputSpecSlicerProvider.java | 46 +++++++++++++ .../controller/DartControllerContextTest.java | 13 +++- .../msq/test/MSQTestControllerContext.java | 13 ++-- .../TestDartControllerContextFactoryImpl.java | 16 ++++- 23 files changed, 534 insertions(+), 65 deletions(-) create mode 100644 multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java create mode 100644 multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java create mode 100644 multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java create mode 100644 multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java create mode 100644 multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java create mode 100644 multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java index 52936b8e0f87..ec748a74b7de 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java @@ -40,7 +40,7 @@ import org.apache.druid.msq.indexing.IndexerControllerContext; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; -import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; @@ -88,8 +88,9 @@ public class DartControllerContext implements ControllerContext private final DartWorkerClient workerClient; private final TimelineServerView serverView; private final MemoryIntrospector memoryIntrospector; - private final QueryContext context; + private final List inputSpecSlicerProviders; private final ServiceEmitter emitter; + private final QueryContext context; public DartControllerContext( final Injector injector, @@ -98,6 +99,7 @@ public DartControllerContext( final DartWorkerClient workerClient, final MemoryIntrospector memoryIntrospector, final TimelineServerView serverView, + final List inputSpecSlicerProviders, final ServiceEmitter emitter, final QueryContext context ) @@ -108,8 +110,9 @@ public DartControllerContext( this.workerClient = workerClient; this.serverView = serverView; this.memoryIntrospector = memoryIntrospector; - this.context = context; + this.inputSpecSlicerProviders = inputSpecSlicerProviders; this.emitter = emitter; + this.context = context; } @Override @@ -190,9 +193,9 @@ public DruidNode selfNode() } @Override - public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) + public List inputSpecSlicerProviders() { - return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView, context); + return inputSpecSlicerProviders; } @Override @@ -260,4 +263,12 @@ public boolean isDebug() { return context.isDebug(); } + + /** + * Getter for {@link DartTableInputSpecSlicerProvider} to retrieve the server view. + */ + TimelineServerView serverView() + { + return serverView; + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java index 87582d977aa2..260d6156c8c4 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java @@ -28,14 +28,19 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.msq.dart.Dart; import org.apache.druid.msq.dart.worker.DartWorkerClientImpl; import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.MemoryIntrospector; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryContexts; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.server.DruidNode; +import java.util.List; +import java.util.Set; + public class DartControllerContextFactoryImpl implements DartControllerContextFactory { protected final Injector injector; @@ -45,6 +50,7 @@ public class DartControllerContextFactoryImpl implements DartControllerContextFa protected final ServiceClientFactory serviceClientFactory; protected final TimelineServerView serverView; protected final MemoryIntrospector memoryIntrospector; + protected final List inputSpecSlicerProviders; protected final ServiceEmitter emitter; @Inject @@ -56,6 +62,7 @@ public DartControllerContextFactoryImpl( @EscalatedGlobal final ServiceClientFactory serviceClientFactory, final MemoryIntrospector memoryIntrospector, final TimelineServerView serverView, + @Dart final Set inputSpecSlicerProviders, final ServiceEmitter emitter ) { @@ -66,6 +73,7 @@ public DartControllerContextFactoryImpl( this.serviceClientFactory = serviceClientFactory; this.serverView = serverView; this.memoryIntrospector = memoryIntrospector; + this.inputSpecSlicerProviders = List.copyOf(inputSpecSlicerProviders); this.emitter = emitter; } @@ -80,6 +88,7 @@ public ControllerContext newContext(final QueryContext context) new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()), memoryIntrospector, serverView, + inputSpecSlicerProviders, emitter, context ); diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java new file mode 100644 index 000000000000..13dcb49e519e --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.dart.controller; + +import org.apache.druid.msq.exec.ControllerContext; +import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; +import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.query.QueryContext; + +import java.util.List; + +/** + * Controller-side provider for {@link TableInputSpec} in Dart. + */ +public class DartTableInputSpecSlicerProvider implements InputSpecSlicerProvider +{ + @Override + public Class specClass() + { + return TableInputSpec.class; + } + + @Override + public InputSpecSlicer createSlicer( + ControllerContext controllerContext, + QueryContext queryContext, + List workerIds + ) + { + return DartTableInputSpecSlicer.createFromWorkerIds( + workerIds, + ((DartControllerContext) controllerContext).serverView(), + queryContext + ); + } +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java index 8a9b1cf36073..0937888b0a9b 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java @@ -47,11 +47,13 @@ import org.apache.druid.msq.dart.controller.DartControllerRegistry; import org.apache.druid.msq.dart.controller.DartMessageRelayFactoryImpl; import org.apache.druid.msq.dart.controller.DartMessageRelays; +import org.apache.druid.msq.dart.controller.DartTableInputSpecSlicerProvider; import org.apache.druid.msq.dart.controller.http.DartQueryInfo; import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactory; import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactoryImpl; import org.apache.druid.msq.dart.controller.sql.DartSqlClients; import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; +import org.apache.druid.msq.guice.MSQBinders; import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.query.DefaultQueryConfig; import org.apache.druid.query.QueryConfigProvider; @@ -111,6 +113,10 @@ public void configure(Binder binder) .addBinding() .to(DartSqlEngine.class) .in(LazySingleton.class); + MSQBinders.inputSpecSlicerProviderBinder(binder, Dart.class) + .addBinding() + .to(DartTableInputSpecSlicerProvider.class) + .in(LazySingleton.class); } @Provides diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java index e98d0ec2195b..5ee9aa1f6ee1 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java @@ -53,11 +53,13 @@ import org.apache.druid.msq.dart.controller.messages.ControllerMessage; import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory; +import org.apache.druid.msq.dart.worker.DartSegmentsInputSliceReaderProvider; import org.apache.druid.msq.dart.worker.DartWorkerContextFactory; import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl; import org.apache.druid.msq.dart.worker.DartWorkerRunner; import org.apache.druid.msq.dart.worker.http.DartWorkerResource; import org.apache.druid.msq.exec.MemoryIntrospector; +import org.apache.druid.msq.guice.MSQBinders; import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.rpc.ServiceClientFactory; @@ -104,6 +106,11 @@ public void configure(Binder binder) binder.bind(ResourcePermissionMapper.class) .annotatedWith(Dart.class) .to(DartResourcePermissionMapper.class); + + MSQBinders.inputSliceReaderProviderBinder(binder, Dart.class) + .addBinding() + .to(DartSegmentsInputSliceReaderProvider.class) + .in(LazySingleton.class); } @Provides diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java new file mode 100644 index 000000000000..7421e87f0a30 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.dart.worker; + +import org.apache.druid.msq.exec.FrameContext; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.InputSliceReaderProvider; +import org.apache.druid.msq.input.table.SegmentsInputSlice; +import org.apache.druid.msq.input.table.SegmentsInputSliceReader; +import org.apache.druid.query.QueryContext; + +/** + * Worker-side provider for {@link SegmentsInputSlice} in Dart. + */ +public class DartSegmentsInputSliceReaderProvider implements InputSliceReaderProvider +{ + @Override + public Class sliceClass() + { + return SegmentsInputSlice.class; + } + + @Override + public InputSliceReader createReader(FrameContext frameContext, QueryContext queryContext) + { + return new SegmentsInputSliceReader(frameContext, false); + } +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java index 768801261d1b..39888b757177 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java @@ -43,12 +43,12 @@ import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.exec.WorkerStorageParameters; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryContexts; -import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.policy.PolicyEnforcer; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.server.DruidNode; @@ -57,6 +57,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import java.io.File; +import java.util.List; /** * Dart implementation of {@link WorkerContext}. @@ -79,7 +80,6 @@ public class DartWorkerContext implements WorkerContext private final Injector injector; private final DartWorkerClient workerClient; private final SegmentWrangler segmentWrangler; - private final GroupingEngine groupingEngine; private final SegmentManager segmentManager; private final CoordinatorClient coordinatorClient; private final MemoryIntrospector memoryIntrospector; @@ -96,6 +96,7 @@ public class DartWorkerContext implements WorkerContext @MonotonicNonNull private volatile ResourceHolder processingBuffersSet; private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory; + private final List inputSliceReaderProviders; DartWorkerContext( final String queryId, @@ -107,7 +108,6 @@ public class DartWorkerContext implements WorkerContext final DartWorkerClient workerClient, final DruidProcessingConfig processingConfig, final SegmentWrangler segmentWrangler, - final GroupingEngine groupingEngine, final SegmentManager segmentManager, final CoordinatorClient coordinatorClient, final MemoryIntrospector memoryIntrospector, @@ -116,7 +116,8 @@ public class DartWorkerContext implements WorkerContext final File tempDir, final QueryContext queryContext, final DataServerQueryHandlerFactory dataServerQueryHandlerFactory, - final ServiceEmitter emitter + final ServiceEmitter emitter, + final List inputSliceReaderProviders ) { this.queryId = queryId; @@ -129,7 +130,6 @@ public class DartWorkerContext implements WorkerContext this.injector = injector; this.workerClient = workerClient; this.segmentWrangler = segmentWrangler; - this.groupingEngine = groupingEngine; this.segmentManager = segmentManager; this.coordinatorClient = coordinatorClient; this.memoryIntrospector = memoryIntrospector; @@ -138,6 +138,7 @@ public class DartWorkerContext implements WorkerContext this.tempDir = tempDir; this.queryContext = Preconditions.checkNotNull(queryContext, "queryContext"); this.emitter = emitter; + this.inputSliceReaderProviders = inputSliceReaderProviders; // Compute thread count once in constructor final int baseThreadCount = processingConfig.getNumThreads(); @@ -175,6 +176,12 @@ public Injector injector() return injector; } + @Override + public List inputSliceReaderProviders() + { + return inputSliceReaderProviders; + } + @Override public void registerWorker(Worker worker, Closer closer) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java index 07117c80f19e..dc74cea3513f 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java @@ -34,6 +34,7 @@ import org.apache.druid.msq.exec.MemoryIntrospector; import org.apache.druid.msq.exec.ProcessingBuffersProvider; import org.apache.druid.msq.exec.WorkerContext; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContext; import org.apache.druid.query.groupby.GroupingEngine; @@ -44,6 +45,8 @@ import org.apache.druid.server.SegmentManager; import java.io.File; +import java.util.List; +import java.util.Set; /** * Production implementation of {@link DartWorkerContextFactory}. @@ -66,6 +69,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory private final Outbox outbox; private final DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory; private final ServiceEmitter emitter; + private final List inputSliceReaderProviders; @Inject public DartWorkerContextFactoryImpl( @@ -84,7 +88,8 @@ public DartWorkerContextFactoryImpl( @Dart ProcessingBuffersProvider processingBuffersProvider, Outbox outbox, DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory, - ServiceEmitter emitter + ServiceEmitter emitter, + @Dart Set inputSliceReaderProviders ) { this.selfNode = selfNode; @@ -103,6 +108,7 @@ public DartWorkerContextFactoryImpl( this.outbox = outbox; this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; this.emitter = emitter; + this.inputSliceReaderProviders = List.copyOf(inputSliceReaderProviders); } @Override @@ -123,7 +129,6 @@ public WorkerContext build( createWorkerClient(queryId), processingConfig, segmentWrangler, - groupingEngine, segmentManager, coordinatorClient, memoryIntrospector, @@ -132,7 +137,8 @@ public WorkerContext build( tempDir, queryContext, dataServerQueryHandlerFactory, - emitter + emitter, + inputSliceReaderProviders ); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index e12c5c3b3362..013df33fad10 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -26,14 +26,14 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.msq.indexing.MSQSpec; -import org.apache.druid.msq.input.InputSpecSlicer; -import org.apache.druid.msq.input.table.SegmentsInputSlice; -import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.ShuffleSpec; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.server.DruidNode; import java.io.File; +import java.util.List; /** * Context used by multi-stage query controllers. Useful because it allows test fixtures to provide their own @@ -82,9 +82,10 @@ public interface ControllerContext DruidNode selfNode(); /** - * Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}. + * Extension point for {@link InputSpec} beyond the builtin ones provided by + * {@link ControllerImpl#makeInputSpecSlicerFactory}. */ - InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager); + List inputSpecSlicerProviders(); /** * Provide access to segment actions in the Overlord. Only called for ingestion queries, i.e., where diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 83568629be1d..db9b3cbc9fbf 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -139,6 +139,7 @@ import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.InputSpecSlicerFactory; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.input.MapInputSpecSlicer; import org.apache.druid.msq.input.external.ExternalInputSpec; import org.apache.druid.msq.input.external.ExternalInputSpecSlicer; @@ -148,7 +149,6 @@ import org.apache.druid.msq.input.lookup.LookupInputSpecSlicer; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.input.stage.StageInputSpecSlicer; -import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.kernel.StageId; @@ -208,6 +208,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -423,8 +424,11 @@ private MSQTaskReportPayload runInternal(final QueryListener queryListener, fina closer.register(workerSketchFetcher::close); // Execution-related: run the multi-stage QueryDefinition. - final InputSpecSlicerFactory inputSpecSlicerFactory = - makeInputSpecSlicerFactory(context.newTableInputSpecSlicer(workerManager)); + final InputSpecSlicerFactory inputSpecSlicerFactory = makeInputSpecSlicerFactory( + context, + workerManager.getWorkerIds(), + getQueryContext() + ); final Pair> queryRunResult = new RunQueryUntilDone( @@ -2149,17 +2153,30 @@ private static MSQStatusReport makeStatusReport( ); } - private static InputSpecSlicerFactory makeInputSpecSlicerFactory(final InputSpecSlicer tableInputSpecSlicer) + private static InputSpecSlicerFactory makeInputSpecSlicerFactory( + final ControllerContext controllerContext, + final List workerIds, + final QueryContext queryContext + ) { - return (stagePartitionsMap, stageOutputChannelModeMap) -> new MapInputSpecSlicer( - ImmutableMap., InputSpecSlicer>builder() - .put(StageInputSpec.class, new StageInputSpecSlicer(stagePartitionsMap, stageOutputChannelModeMap)) - .put(ExternalInputSpec.class, new ExternalInputSpecSlicer()) - .put(InlineInputSpec.class, new InlineInputSpecSlicer()) - .put(LookupInputSpec.class, new LookupInputSpecSlicer()) - .put(TableInputSpec.class, tableInputSpecSlicer) - .build() - ); + return (stagePartitionsMap, stageOutputChannelModeMap) -> { + Map, InputSpecSlicer> slicers = new LinkedHashMap<>(); + + slicers.put(StageInputSpec.class, new StageInputSpecSlicer(stagePartitionsMap, stageOutputChannelModeMap)); + slicers.put(ExternalInputSpec.class, new ExternalInputSpecSlicer()); + slicers.put(InlineInputSpec.class, new InlineInputSpecSlicer()); + slicers.put(LookupInputSpec.class, new LookupInputSpecSlicer()); + + // Context-supplied providers override the default ones, so they get added last. + for (final InputSpecSlicerProvider slicerProvider : controllerContext.inputSpecSlicerProviders()) { + slicers.put( + slicerProvider.specClass(), + slicerProvider.createSlicer(controllerContext, queryContext, workerIds) + ); + } + + return new MapInputSpecSlicer(slicers); + }; } private static Map copyOfStageRuntimesEndingAtCurrentTime( diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java index ba613768781c..20593f266219 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java @@ -21,7 +21,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -48,6 +47,7 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.msq.input.MapInputSliceReader; import org.apache.druid.msq.input.NilInputSlice; import org.apache.druid.msq.input.NilInputSliceReader; @@ -71,6 +71,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -399,16 +400,21 @@ private ExecutionContext makeExecutionContext() private InputSliceReader makeInputSliceReader() { final boolean reindex = MultiStageQueryContext.isReindex(workOrder.getWorkerContext()); - return new MapInputSliceReader( - ImmutableMap., InputSliceReader>builder() - .put(NilInputSlice.class, NilInputSliceReader.INSTANCE) - .put(StageInputSlice.class, StageInputSliceReader.INSTANCE) - .put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir("external"))) - .put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler())) - .put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler())) - .put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext, reindex)) - .build() - ); + LinkedHashMap, InputSliceReader> readers = new LinkedHashMap<>(); + readers.put(NilInputSlice.class, NilInputSliceReader.INSTANCE); + readers.put(StageInputSlice.class, StageInputSliceReader.INSTANCE); + readers.put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir("external"))); + readers.put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler())); + readers.put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler())); + readers.put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext, reindex)); + + // Context-supplied providers override the default ones, so they get added last. + for (final InputSliceReaderProvider readerProvider : workerContext.inputSliceReaderProviders()) { + final InputSliceReader reader = readerProvider.createReader(frameContext, workOrder.getWorkerContext()); + readers.put(readerProvider.sliceClass(), reader); + } + + return new MapInputSliceReader(readers); } private OutputChannelFactory makeStageOutputChannelFactory() diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java index 9e696a16cce6..b225112236d8 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.msq.indexing.MSQWorkerTask; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.policy.PolicyEnforcer; @@ -31,6 +33,7 @@ import java.io.Closeable; import java.io.File; +import java.util.List; /** * Context used by multi-stage query workers. @@ -121,6 +124,15 @@ public interface WorkerContext extends Closeable */ boolean isDebug(); + /** + * Extension point for additional {@link InputSlice} beyond those provided by + * {@link RunWorkOrder#makeInputSliceReader()}. + */ + default List inputSliceReaderProviders() + { + return List.of(); + } + @Override void close(); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java index 0a6b0827324a..e3cd3e4028e2 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java @@ -20,11 +20,20 @@ package org.apache.druid.msq.guice; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; +import com.google.inject.multibindings.Multibinder; +import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.msq.dart.Dart; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReaderProvider; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.querykit.QueryKit; import org.apache.druid.query.Query; +import java.lang.annotation.Annotation; + /** * Utility class for MSQ-related Guice bindings. */ @@ -50,4 +59,29 @@ public static MapBinder, QueryKit> queryKitBinder(Binder new TypeLiteral<>() {} ); } + + /** + * Bind an {@link InputSpecSlicerProvider} for use on a controller. The annotation should be + * {@link IndexingService} for providers used by tasks, or {@link Dart} for providers used by Dart. + */ + public static Multibinder inputSpecSlicerProviderBinder( + Binder binder, + Class annotation + ) + { + return Multibinder.newSetBinder(binder, Key.get(InputSpecSlicerProvider.class, annotation)); + } + + /** + * Bind an {@link InputSliceReaderProvider} for use on a worker, to handle a particular {@link InputSlice}. + * The annotation should be {@link IndexingService} for providers used by tasks, or {@link Dart} for providers + * used by Dart. + */ + public static Multibinder inputSliceReaderProviderBinder( + Binder binder, + Class annotation + ) + { + return Multibinder.newSetBinder(binder, Key.get(InputSliceReaderProvider.class, annotation)); + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java index 629754f2a9a1..8781fa5bb0f4 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java @@ -27,6 +27,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Provides; +import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.initialization.DruidModule; @@ -40,6 +41,8 @@ import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter; import org.apache.druid.msq.counters.WarningCounters; import org.apache.druid.msq.indexing.IndexerControllerContextFactory; +import org.apache.druid.msq.indexing.IndexerSegmentsInputSliceReaderProvider; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicerProvider; import org.apache.druid.msq.indexing.MSQCompactionRunner; import org.apache.druid.msq.indexing.MSQControllerTask; import org.apache.druid.msq.indexing.MSQWorkerTask; @@ -250,6 +253,16 @@ public void configure(Binder binder) .addBinding(WindowOperatorQuery.class) .to(WindowOperatorQueryKit.class); binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class); + + MSQBinders.inputSpecSlicerProviderBinder(binder, IndexingService.class) + .addBinding() + .to(IndexerTableInputSpecSlicerProvider.class) + .in(LazySingleton.class); + + MSQBinders.inputSliceReaderProviderBinder(binder, IndexingService.class) + .addBinding() + .to(IndexerSegmentsInputSliceReaderProvider.class) + .in(LazySingleton.class); } @Provides diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 18aebe0bcd21..5ea69d8c6a0a 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.TypeLiteral; +import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.frame.FrameType; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.common.TaskLockType; @@ -48,7 +50,7 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.MSQWarnings; import org.apache.druid.msq.indexing.error.UnknownFault; -import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -66,7 +68,9 @@ import java.io.File; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -90,6 +94,7 @@ public class IndexerControllerContext implements ControllerContext private final ServiceClientFactory clientFactory; private final OverlordClient overlordClient; private final MemoryIntrospector memoryIntrospector; + private final List inputSpecSlicerProviders; public IndexerControllerContext( final MSQControllerTask task, @@ -110,6 +115,9 @@ public IndexerControllerContext( this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class); final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)); final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir()); + final Set inputSpecSlicerProviders = + injector.getInstance(Key.get(new TypeLiteral<>() {}, IndexingService.class)); + this.inputSpecSlicerProviders = List.copyOf(inputSpecSlicerProviders); this.injector = injector.createChildInjector( binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) .toInstance(storageConnector)); @@ -174,15 +182,9 @@ public DruidNode selfNode() } @Override - public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager workerManager) + public List inputSpecSlicerProviders() { - final SegmentSource includeSegmentSource = - MultiStageQueryContext.getSegmentSources(taskQuerySpecContext, DEFAULT_SEGMENT_SOURCE); - return new IndexerTableInputSpecSlicer( - toolbox.getCoordinatorClient(), - toolbox.getTaskActionClient(), - includeSegmentSource - ); + return inputSpecSlicerProviders; } @Override diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java new file mode 100644 index 000000000000..d27a7852cac2 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import org.apache.druid.msq.exec.FrameContext; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.InputSliceReaderProvider; +import org.apache.druid.msq.input.table.SegmentsInputSlice; +import org.apache.druid.msq.input.table.SegmentsInputSliceReader; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; + +/** + * Worker-side provider for {@link SegmentsInputSlice} in tasks. + */ +public class IndexerSegmentsInputSliceReaderProvider implements InputSliceReaderProvider +{ + @Override + public Class sliceClass() + { + return SegmentsInputSlice.class; + } + + @Override + public InputSliceReader createReader(FrameContext frameContext, QueryContext queryContext) + { + return new SegmentsInputSliceReader(frameContext, MultiStageQueryContext.isReindex(queryContext)); + } +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java new file mode 100644 index 000000000000..7cfc4763dac0 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import com.google.inject.Inject; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.msq.exec.ControllerContext; +import org.apache.druid.msq.exec.SegmentSource; +import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; +import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; + +import java.util.List; + +/** + * Controller-side provider for {@link TableInputSpec} in tasks. + */ +public class IndexerTableInputSpecSlicerProvider implements InputSpecSlicerProvider +{ + private final CoordinatorClient coordinatorClient; + + @Inject + public IndexerTableInputSpecSlicerProvider(CoordinatorClient coordinatorClient) + { + this.coordinatorClient = coordinatorClient; + } + + @Override + public Class specClass() + { + return TableInputSpec.class; + } + + @Override + public InputSpecSlicer createSlicer( + ControllerContext controllerContext, + QueryContext queryContext, + List workerIds + ) + { + final SegmentSource includeSegmentSource = + MultiStageQueryContext.getSegmentSources(queryContext, IndexerControllerContext.DEFAULT_SEGMENT_SOURCE); + return new IndexerTableInputSpecSlicer( + coordinatorClient, + controllerContext.taskActionClient(), + includeSegmentSource + ); + } +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 2589d0c0dab7..82d462fd096a 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.TypeLiteral; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Smile; @@ -46,6 +48,7 @@ import org.apache.druid.msq.indexing.client.IndexerControllerClient; import org.apache.druid.msq.indexing.client.IndexerWorkerClient; import org.apache.druid.msq.indexing.client.WorkerChatHandler; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; @@ -66,6 +69,8 @@ import javax.annotation.Nullable; import java.io.File; +import java.util.List; +import java.util.Set; public class IndexerWorkerContext implements WorkerContext { @@ -90,6 +95,7 @@ public class IndexerWorkerContext implements WorkerContext private final ServiceClientFactory clientFactory; private final MemoryIntrospector memoryIntrospector; private final ProcessingBuffersProvider processingBuffersProvider; + private final List inputSliceReaderProviders; private final int maxConcurrentStages; private final boolean liveReportCounters; private final boolean includeAllCounters; @@ -112,7 +118,8 @@ public IndexerWorkerContext( final ServiceClientFactory clientFactory, final MemoryIntrospector memoryIntrospector, final ProcessingBuffersProvider processingBuffersProvider, - final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory + final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory, + final List inputSliceReaderProviders ) { this.task = task; @@ -127,6 +134,7 @@ public IndexerWorkerContext( this.memoryIntrospector = memoryIntrospector; this.processingBuffersProvider = processingBuffersProvider; this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; + this.inputSliceReaderProviders = inputSliceReaderProviders; final QueryContext queryContext = QueryContext.of(task.getContext()); this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault( @@ -171,6 +179,8 @@ public static IndexerWorkerContext createProductionInstance( injector.getInstance(OverlordClient.class).withRetryPolicy(StandardRetryPolicy.unlimited()); final ProcessingBuffersProvider processingBuffersProvider = injector.getInstance(ProcessingBuffersProvider.class); final ObjectMapper smileMapper = injector.getInstance(Key.get(ObjectMapper.class, Smile.class)); + final Set inputSliceReaderProviders = + injector.getInstance(Key.get(new TypeLiteral<>() {}, IndexingService.class)); return new IndexerWorkerContext( task, @@ -189,7 +199,8 @@ public static IndexerWorkerContext createProductionInstance( toolbox.getCoordinatorClient(), serviceClientFactory, smileMapper - ) + ), + List.copyOf(inputSliceReaderProviders) ); } @@ -228,6 +239,12 @@ public Injector injector() return injector; } + @Override + public List inputSliceReaderProviders() + { + return inputSliceReaderProviders; + } + @Override public void emitMetric(MSQMetricEventBuilder metricBuilder) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java new file mode 100644 index 000000000000..8dd71efe5934 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.input; + +import org.apache.druid.msq.exec.FrameContext; +import org.apache.druid.query.QueryContext; + +/** + * Worker-side extension point for {@link InputSlice}: provides an {@link InputSliceReader} for a particular + * {@link InputSlice} class. + */ +public interface InputSliceReaderProvider +{ + /** + * The {@link InputSlice} class handled by this provider. + */ + Class sliceClass(); + + /** + * Returns an {@link InputSliceReader} that accepts {@link #sliceClass()}. + */ + InputSliceReader createReader(FrameContext frameContext, QueryContext queryContext); +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java new file mode 100644 index 000000000000..3d0f0e774284 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.input; + +import org.apache.druid.msq.exec.ControllerContext; +import org.apache.druid.query.QueryContext; + +import java.util.List; + +/** + * Controller-side extension point for {@link InputSpec}: provides an {@link InputSpecSlicer} for a particular + * {@link InputSpec} class. + */ +public interface InputSpecSlicerProvider +{ + /** + * The {@link InputSpec} class handled by this provider. + */ + Class specClass(); + + /** + * Returns an {@link InputSpecSlicer} that accepts {@link #specClass()}. + */ + InputSpecSlicer createSlicer( + ControllerContext controllerContext, + QueryContext queryContext, + List workerIds + ); +} diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java index 3888368c1b86..1ab1b55024c5 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java @@ -106,8 +106,17 @@ public void tearDown() throws Exception @Test public void test_queryKernelConfig() { - final DartControllerContext controllerContext = - new DartControllerContext(null, null, SELF_NODE, null, memoryIntrospector, serverView, null, queryContext); + final DartControllerContext controllerContext = new DartControllerContext( + null, + null, + SELF_NODE, + null, + memoryIntrospector, + serverView, + List.of(), + null, + queryContext + ); final ControllerQueryKernelConfig queryKernelConfig = controllerContext.queryKernelConfig(querySpec); Assertions.assertFalse(queryKernelConfig.isFaultTolerant()); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 6f09c43ff028..9ce1649e61ed 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -51,7 +51,6 @@ import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.ControllerMemoryParameters; import org.apache.druid.msq.exec.MSQMetricEventBuilder; -import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerClient; import org.apache.druid.msq.exec.WorkerFailureListener; @@ -61,12 +60,12 @@ import org.apache.druid.msq.exec.WorkerRunRef; import org.apache.druid.msq.exec.WorkerStorageParameters; import org.apache.druid.msq.indexing.IndexerControllerContext; -import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicerProvider; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQWorkerTask; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig; -import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; @@ -416,13 +415,9 @@ public TaskLockType taskLockType() } @Override - public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) + public List inputSpecSlicerProviders() { - return new IndexerTableInputSpecSlicer( - coordinatorClient, - taskActionClient, - MultiStageQueryContext.getSegmentSources(queryContext, SegmentSource.NONE) - ); + return List.of(new IndexerTableInputSpecSlicerProvider(coordinatorClient)); } @Override diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java index c3eaf6e9c9eb..d7f48f2dc419 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java @@ -46,6 +46,7 @@ import org.apache.druid.msq.exec.WorkerImpl; import org.apache.druid.msq.exec.WorkerRunRef; import org.apache.druid.msq.exec.WorkerStorageParameters; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.StageId; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.query.QueryContext; @@ -53,6 +54,7 @@ import org.apache.druid.server.DruidNode; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; public class TestDartControllerContextFactoryImpl extends DartControllerContextFactoryImpl @@ -74,11 +76,22 @@ public TestDartControllerContextFactoryImpl( @EscalatedGlobal final ServiceClientFactory serviceClientFactory, final MemoryIntrospector memoryIntrospector, final TimelineServerView serverView, + @Dart final Set inputSpecSlicerProviders, final ServiceEmitter emitter, @Dart Map workerMap ) { - super(injector, jsonMapper, smileMapper, selfNode, serviceClientFactory, memoryIntrospector, serverView, emitter); + super( + injector, + jsonMapper, + smileMapper, + selfNode, + serviceClientFactory, + memoryIntrospector, + serverView, + inputSpecSlicerProviders, + emitter + ); this.workerMap = workerMap; } @@ -92,6 +105,7 @@ public ControllerContext newContext(QueryContext context) new DartTestWorkerClient(), memoryIntrospector, serverView, + inputSpecSlicerProviders, emitter, context ) From e8630f79e46a2184d8720aa0c0db81c270771021 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 18 May 2026 22:48:18 -0700 Subject: [PATCH 2/4] Bind CoordinatorClient in tests. --- .../apache/druid/msq/exec/MSQCompactionTaskRunTest.java | 2 ++ .../java/org/apache/druid/msq/exec/TestMSQSqlModule.java | 9 +++++++++ .../druid/msq/test/AbstractDartComponentSupplier.java | 3 --- .../test/java/org/apache/druid/msq/test/MSQTestBase.java | 5 ++++- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java index 787b980307f4..895de0319daa 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java @@ -30,6 +30,7 @@ import com.google.inject.Module; import com.google.inject.util.Modules; import com.google.inject.util.Providers; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.error.DruidException; @@ -237,6 +238,7 @@ public void setUpMSQ() new SegmentWranglerModule(), new LookylooModule(), new MSQIndexingModule(), + binder -> binder.bind(CoordinatorClient.class).toInstance(coordinatorClient), binder -> binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()), binder -> binder.bind(WireTransferableContext.class).toInstance(new WireTransferableContext(null, null, true)), binder -> binder.bind(DataSegmentPusher.class) diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java index 7b7f4cb93b11..fc7b499eb1ad 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java @@ -21,8 +21,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Provides; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.TestDruidModule; import org.apache.druid.msq.guice.MultiStageQuery; @@ -39,6 +42,12 @@ public class TestMSQSqlModule extends TestDruidModule { + @Override + public void configure(Binder binder) + { + binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class).in(LazySingleton.class); + } + @Provides @MultiStageQuery @LazySingleton diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java index dd0370008add..43558e8d517e 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java @@ -22,8 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Binder; import com.google.inject.Provides; -import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.guice.LazySingleton; @@ -126,7 +124,6 @@ final DruidNodeDiscoveryProvider getDiscoveryProvider() @Override public void configure(Binder binder) { - binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class); } } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 5a2ecbcf2432..10c3e8cbec31 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -38,6 +38,8 @@ import com.google.inject.util.Providers; import org.apache.calcite.avatica.remote.TypedValue; import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LongDimensionSchema; @@ -576,7 +578,8 @@ public String getFormatString() new SegmentWranglerModule(), new HllSketchModule(), binder -> binder.bind(Bouncer.class).toInstance(new Bouncer(1)), - binder -> binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()) + binder -> binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()), + binder -> binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class).in(LazySingleton.class) ); // adding node role injection to the modules, since CliPeon would also do that through run method injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build(), ImmutableSet.of(NodeRole.PEON)) From df8d5d22991957bf4439db63b17f0157794b15a3 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 19 May 2026 09:36:36 -0700 Subject: [PATCH 3/4] Use aboutAnHour retry policy. --- .../msq/indexing/IndexerTableInputSpecSlicerProvider.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java index 7cfc4763dac0..052d5c7691c2 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java @@ -29,6 +29,7 @@ import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; +import org.apache.druid.rpc.StandardRetryPolicy; import java.util.List; @@ -42,7 +43,10 @@ public class IndexerTableInputSpecSlicerProvider implements InputSpecSlicerProvi @Inject public IndexerTableInputSpecSlicerProvider(CoordinatorClient coordinatorClient) { - this.coordinatorClient = coordinatorClient; + // Use the "aboutAnHour" retry policy, same as the one used in the TaskToolboxFactory. This prevents + // long-running tasks from failing if there are Coordinator/Overlord problems. Calls will still fail + // eventually if problems persist. + this.coordinatorClient = coordinatorClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour()); } @Override From a2be09d681d87f1fb90313ca12af8c59ee16d31d Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 19 May 2026 12:26:58 -0700 Subject: [PATCH 4/4] Fix tests. --- .../org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java | 7 ++++++- .../apache/druid/msq/test/MSQTestControllerContext.java | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java index 41956d556956..f7947676ed7d 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java @@ -31,6 +31,8 @@ import com.google.inject.util.Modules; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.frame.testutil.FrameTestUtil; import org.apache.druid.guice.ConfigModule; import org.apache.druid.guice.DruidGuiceExtensions; @@ -217,7 +219,10 @@ public void setUp() throws Exception new SegmentWranglerModule(), new LookylooModule(), new MSQIndexingModule(), - binder -> binder.bind(WireTransferableContext.class).toInstance(FrameTestUtil.WT_CONTEXT_LEGACY) + binder -> { + binder.bind(WireTransferableContext.class).toInstance(FrameTestUtil.WT_CONTEXT_LEGACY); + binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class); + } ); Injector injector = Guice.createInjector(defaultModule, BoundFieldModule.of(this)); DruidSecondaryModule.setupJackson(injector, objectMapper, Collections.emptyMap(), true); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 9ce1649e61ed..8cc6ed620d29 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -168,6 +168,7 @@ public MSQTestControllerContext( serviceEmitter, Mockito.mock(CoordinatorClient.class) ); + Mockito.when(coordinatorClient.withRetryPolicy(ArgumentMatchers.any())).thenReturn(coordinatorClient); Mockito.when(coordinatorClient.fetchServerViewSegments( ArgumentMatchers.anyString(), ArgumentMatchers.any()