diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java index 52936b8e0f87..ec748a74b7de 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java @@ -40,7 +40,7 @@ import org.apache.druid.msq.indexing.IndexerControllerContext; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; -import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; @@ -88,8 +88,9 @@ public class DartControllerContext implements ControllerContext private final DartWorkerClient workerClient; private final TimelineServerView serverView; private final MemoryIntrospector memoryIntrospector; - private final QueryContext context; + private final List inputSpecSlicerProviders; private final ServiceEmitter emitter; + private final QueryContext context; public DartControllerContext( final Injector injector, @@ -98,6 +99,7 @@ public DartControllerContext( final DartWorkerClient workerClient, final MemoryIntrospector memoryIntrospector, final TimelineServerView serverView, + final List inputSpecSlicerProviders, final ServiceEmitter emitter, final QueryContext context ) @@ -108,8 +110,9 @@ public DartControllerContext( this.workerClient = workerClient; this.serverView = serverView; this.memoryIntrospector = memoryIntrospector; - this.context = context; + this.inputSpecSlicerProviders = inputSpecSlicerProviders; this.emitter = emitter; + this.context = context; } @Override @@ -190,9 +193,9 @@ public DruidNode selfNode() } @Override - public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) + public List inputSpecSlicerProviders() { - return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView, context); + return inputSpecSlicerProviders; } @Override @@ -260,4 +263,12 @@ public boolean isDebug() { return context.isDebug(); } + + /** + * Getter for {@link DartTableInputSpecSlicerProvider} to retrieve the server view. + */ + TimelineServerView serverView() + { + return serverView; + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java index 87582d977aa2..260d6156c8c4 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java @@ -28,14 +28,19 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.msq.dart.Dart; import org.apache.druid.msq.dart.worker.DartWorkerClientImpl; import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.MemoryIntrospector; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryContexts; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.server.DruidNode; +import java.util.List; +import java.util.Set; + public class DartControllerContextFactoryImpl implements DartControllerContextFactory { protected final Injector injector; @@ -45,6 +50,7 @@ public class DartControllerContextFactoryImpl implements DartControllerContextFa protected final ServiceClientFactory serviceClientFactory; protected final TimelineServerView serverView; protected final MemoryIntrospector memoryIntrospector; + protected final List inputSpecSlicerProviders; protected final ServiceEmitter emitter; @Inject @@ -56,6 +62,7 @@ public DartControllerContextFactoryImpl( @EscalatedGlobal final ServiceClientFactory serviceClientFactory, final MemoryIntrospector memoryIntrospector, final TimelineServerView serverView, + @Dart final Set inputSpecSlicerProviders, final ServiceEmitter emitter ) { @@ -66,6 +73,7 @@ public DartControllerContextFactoryImpl( this.serviceClientFactory = serviceClientFactory; this.serverView = serverView; this.memoryIntrospector = memoryIntrospector; + this.inputSpecSlicerProviders = List.copyOf(inputSpecSlicerProviders); this.emitter = emitter; } @@ -80,6 +88,7 @@ public ControllerContext newContext(final QueryContext context) new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()), memoryIntrospector, serverView, + inputSpecSlicerProviders, emitter, context ); diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java new file mode 100644 index 000000000000..13dcb49e519e --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.dart.controller; + +import org.apache.druid.msq.exec.ControllerContext; +import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; +import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.query.QueryContext; + +import java.util.List; + +/** + * Controller-side provider for {@link TableInputSpec} in Dart. + */ +public class DartTableInputSpecSlicerProvider implements InputSpecSlicerProvider +{ + @Override + public Class specClass() + { + return TableInputSpec.class; + } + + @Override + public InputSpecSlicer createSlicer( + ControllerContext controllerContext, + QueryContext queryContext, + List workerIds + ) + { + return DartTableInputSpecSlicer.createFromWorkerIds( + workerIds, + ((DartControllerContext) controllerContext).serverView(), + queryContext + ); + } +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java index 8a9b1cf36073..0937888b0a9b 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java @@ -47,11 +47,13 @@ import org.apache.druid.msq.dart.controller.DartControllerRegistry; import org.apache.druid.msq.dart.controller.DartMessageRelayFactoryImpl; import org.apache.druid.msq.dart.controller.DartMessageRelays; +import org.apache.druid.msq.dart.controller.DartTableInputSpecSlicerProvider; import org.apache.druid.msq.dart.controller.http.DartQueryInfo; import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactory; import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactoryImpl; import org.apache.druid.msq.dart.controller.sql.DartSqlClients; import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; +import org.apache.druid.msq.guice.MSQBinders; import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.query.DefaultQueryConfig; import org.apache.druid.query.QueryConfigProvider; @@ -111,6 +113,10 @@ public void configure(Binder binder) .addBinding() .to(DartSqlEngine.class) .in(LazySingleton.class); + MSQBinders.inputSpecSlicerProviderBinder(binder, Dart.class) + .addBinding() + .to(DartTableInputSpecSlicerProvider.class) + .in(LazySingleton.class); } @Provides diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java index e98d0ec2195b..5ee9aa1f6ee1 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java @@ -53,11 +53,13 @@ import org.apache.druid.msq.dart.controller.messages.ControllerMessage; import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory; +import org.apache.druid.msq.dart.worker.DartSegmentsInputSliceReaderProvider; import org.apache.druid.msq.dart.worker.DartWorkerContextFactory; import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl; import org.apache.druid.msq.dart.worker.DartWorkerRunner; import org.apache.druid.msq.dart.worker.http.DartWorkerResource; import org.apache.druid.msq.exec.MemoryIntrospector; +import org.apache.druid.msq.guice.MSQBinders; import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.rpc.ServiceClientFactory; @@ -104,6 +106,11 @@ public void configure(Binder binder) binder.bind(ResourcePermissionMapper.class) .annotatedWith(Dart.class) .to(DartResourcePermissionMapper.class); + + MSQBinders.inputSliceReaderProviderBinder(binder, Dart.class) + .addBinding() + .to(DartSegmentsInputSliceReaderProvider.class) + .in(LazySingleton.class); } @Provides diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java new file mode 100644 index 000000000000..7421e87f0a30 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.dart.worker; + +import org.apache.druid.msq.exec.FrameContext; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.InputSliceReaderProvider; +import org.apache.druid.msq.input.table.SegmentsInputSlice; +import org.apache.druid.msq.input.table.SegmentsInputSliceReader; +import org.apache.druid.query.QueryContext; + +/** + * Worker-side provider for {@link SegmentsInputSlice} in Dart. + */ +public class DartSegmentsInputSliceReaderProvider implements InputSliceReaderProvider +{ + @Override + public Class sliceClass() + { + return SegmentsInputSlice.class; + } + + @Override + public InputSliceReader createReader(FrameContext frameContext, QueryContext queryContext) + { + return new SegmentsInputSliceReader(frameContext, false); + } +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java index 768801261d1b..39888b757177 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java @@ -43,12 +43,12 @@ import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.exec.WorkerStorageParameters; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryContexts; -import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.policy.PolicyEnforcer; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.server.DruidNode; @@ -57,6 +57,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import java.io.File; +import java.util.List; /** * Dart implementation of {@link WorkerContext}. @@ -79,7 +80,6 @@ public class DartWorkerContext implements WorkerContext private final Injector injector; private final DartWorkerClient workerClient; private final SegmentWrangler segmentWrangler; - private final GroupingEngine groupingEngine; private final SegmentManager segmentManager; private final CoordinatorClient coordinatorClient; private final MemoryIntrospector memoryIntrospector; @@ -96,6 +96,7 @@ public class DartWorkerContext implements WorkerContext @MonotonicNonNull private volatile ResourceHolder processingBuffersSet; private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory; + private final List inputSliceReaderProviders; DartWorkerContext( final String queryId, @@ -107,7 +108,6 @@ public class DartWorkerContext implements WorkerContext final DartWorkerClient workerClient, final DruidProcessingConfig processingConfig, final SegmentWrangler segmentWrangler, - final GroupingEngine groupingEngine, final SegmentManager segmentManager, final CoordinatorClient coordinatorClient, final MemoryIntrospector memoryIntrospector, @@ -116,7 +116,8 @@ public class DartWorkerContext implements WorkerContext final File tempDir, final QueryContext queryContext, final DataServerQueryHandlerFactory dataServerQueryHandlerFactory, - final ServiceEmitter emitter + final ServiceEmitter emitter, + final List inputSliceReaderProviders ) { this.queryId = queryId; @@ -129,7 +130,6 @@ public class DartWorkerContext implements WorkerContext this.injector = injector; this.workerClient = workerClient; this.segmentWrangler = segmentWrangler; - this.groupingEngine = groupingEngine; this.segmentManager = segmentManager; this.coordinatorClient = coordinatorClient; this.memoryIntrospector = memoryIntrospector; @@ -138,6 +138,7 @@ public class DartWorkerContext implements WorkerContext this.tempDir = tempDir; this.queryContext = Preconditions.checkNotNull(queryContext, "queryContext"); this.emitter = emitter; + this.inputSliceReaderProviders = inputSliceReaderProviders; // Compute thread count once in constructor final int baseThreadCount = processingConfig.getNumThreads(); @@ -175,6 +176,12 @@ public Injector injector() return injector; } + @Override + public List inputSliceReaderProviders() + { + return inputSliceReaderProviders; + } + @Override public void registerWorker(Worker worker, Closer closer) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java index 07117c80f19e..dc74cea3513f 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java @@ -34,6 +34,7 @@ import org.apache.druid.msq.exec.MemoryIntrospector; import org.apache.druid.msq.exec.ProcessingBuffersProvider; import org.apache.druid.msq.exec.WorkerContext; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContext; import org.apache.druid.query.groupby.GroupingEngine; @@ -44,6 +45,8 @@ import org.apache.druid.server.SegmentManager; import java.io.File; +import java.util.List; +import java.util.Set; /** * Production implementation of {@link DartWorkerContextFactory}. @@ -66,6 +69,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory private final Outbox outbox; private final DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory; private final ServiceEmitter emitter; + private final List inputSliceReaderProviders; @Inject public DartWorkerContextFactoryImpl( @@ -84,7 +88,8 @@ public DartWorkerContextFactoryImpl( @Dart ProcessingBuffersProvider processingBuffersProvider, Outbox outbox, DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory, - ServiceEmitter emitter + ServiceEmitter emitter, + @Dart Set inputSliceReaderProviders ) { this.selfNode = selfNode; @@ -103,6 +108,7 @@ public DartWorkerContextFactoryImpl( this.outbox = outbox; this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; this.emitter = emitter; + this.inputSliceReaderProviders = List.copyOf(inputSliceReaderProviders); } @Override @@ -123,7 +129,6 @@ public WorkerContext build( createWorkerClient(queryId), processingConfig, segmentWrangler, - groupingEngine, segmentManager, coordinatorClient, memoryIntrospector, @@ -132,7 +137,8 @@ public WorkerContext build( tempDir, queryContext, dataServerQueryHandlerFactory, - emitter + emitter, + inputSliceReaderProviders ); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index e12c5c3b3362..013df33fad10 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -26,14 +26,14 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.msq.indexing.MSQSpec; -import org.apache.druid.msq.input.InputSpecSlicer; -import org.apache.druid.msq.input.table.SegmentsInputSlice; -import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.ShuffleSpec; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.server.DruidNode; import java.io.File; +import java.util.List; /** * Context used by multi-stage query controllers. Useful because it allows test fixtures to provide their own @@ -82,9 +82,10 @@ public interface ControllerContext DruidNode selfNode(); /** - * Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}. + * Extension point for {@link InputSpec} beyond the builtin ones provided by + * {@link ControllerImpl#makeInputSpecSlicerFactory}. */ - InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager); + List inputSpecSlicerProviders(); /** * Provide access to segment actions in the Overlord. Only called for ingestion queries, i.e., where diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 83568629be1d..db9b3cbc9fbf 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -139,6 +139,7 @@ import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.InputSpecSlicerFactory; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.input.MapInputSpecSlicer; import org.apache.druid.msq.input.external.ExternalInputSpec; import org.apache.druid.msq.input.external.ExternalInputSpecSlicer; @@ -148,7 +149,6 @@ import org.apache.druid.msq.input.lookup.LookupInputSpecSlicer; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.input.stage.StageInputSpecSlicer; -import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.kernel.StageId; @@ -208,6 +208,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -423,8 +424,11 @@ private MSQTaskReportPayload runInternal(final QueryListener queryListener, fina closer.register(workerSketchFetcher::close); // Execution-related: run the multi-stage QueryDefinition. - final InputSpecSlicerFactory inputSpecSlicerFactory = - makeInputSpecSlicerFactory(context.newTableInputSpecSlicer(workerManager)); + final InputSpecSlicerFactory inputSpecSlicerFactory = makeInputSpecSlicerFactory( + context, + workerManager.getWorkerIds(), + getQueryContext() + ); final Pair> queryRunResult = new RunQueryUntilDone( @@ -2149,17 +2153,30 @@ private static MSQStatusReport makeStatusReport( ); } - private static InputSpecSlicerFactory makeInputSpecSlicerFactory(final InputSpecSlicer tableInputSpecSlicer) + private static InputSpecSlicerFactory makeInputSpecSlicerFactory( + final ControllerContext controllerContext, + final List workerIds, + final QueryContext queryContext + ) { - return (stagePartitionsMap, stageOutputChannelModeMap) -> new MapInputSpecSlicer( - ImmutableMap., InputSpecSlicer>builder() - .put(StageInputSpec.class, new StageInputSpecSlicer(stagePartitionsMap, stageOutputChannelModeMap)) - .put(ExternalInputSpec.class, new ExternalInputSpecSlicer()) - .put(InlineInputSpec.class, new InlineInputSpecSlicer()) - .put(LookupInputSpec.class, new LookupInputSpecSlicer()) - .put(TableInputSpec.class, tableInputSpecSlicer) - .build() - ); + return (stagePartitionsMap, stageOutputChannelModeMap) -> { + Map, InputSpecSlicer> slicers = new LinkedHashMap<>(); + + slicers.put(StageInputSpec.class, new StageInputSpecSlicer(stagePartitionsMap, stageOutputChannelModeMap)); + slicers.put(ExternalInputSpec.class, new ExternalInputSpecSlicer()); + slicers.put(InlineInputSpec.class, new InlineInputSpecSlicer()); + slicers.put(LookupInputSpec.class, new LookupInputSpecSlicer()); + + // Context-supplied providers override the default ones, so they get added last. + for (final InputSpecSlicerProvider slicerProvider : controllerContext.inputSpecSlicerProviders()) { + slicers.put( + slicerProvider.specClass(), + slicerProvider.createSlicer(controllerContext, queryContext, workerIds) + ); + } + + return new MapInputSpecSlicer(slicers); + }; } private static Map copyOfStageRuntimesEndingAtCurrentTime( diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java index ba613768781c..20593f266219 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java @@ -21,7 +21,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -48,6 +47,7 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.msq.input.MapInputSliceReader; import org.apache.druid.msq.input.NilInputSlice; import org.apache.druid.msq.input.NilInputSliceReader; @@ -71,6 +71,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -399,16 +400,21 @@ private ExecutionContext makeExecutionContext() private InputSliceReader makeInputSliceReader() { final boolean reindex = MultiStageQueryContext.isReindex(workOrder.getWorkerContext()); - return new MapInputSliceReader( - ImmutableMap., InputSliceReader>builder() - .put(NilInputSlice.class, NilInputSliceReader.INSTANCE) - .put(StageInputSlice.class, StageInputSliceReader.INSTANCE) - .put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir("external"))) - .put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler())) - .put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler())) - .put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext, reindex)) - .build() - ); + LinkedHashMap, InputSliceReader> readers = new LinkedHashMap<>(); + readers.put(NilInputSlice.class, NilInputSliceReader.INSTANCE); + readers.put(StageInputSlice.class, StageInputSliceReader.INSTANCE); + readers.put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir("external"))); + readers.put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler())); + readers.put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler())); + readers.put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext, reindex)); + + // Context-supplied providers override the default ones, so they get added last. + for (final InputSliceReaderProvider readerProvider : workerContext.inputSliceReaderProviders()) { + final InputSliceReader reader = readerProvider.createReader(frameContext, workOrder.getWorkerContext()); + readers.put(readerProvider.sliceClass(), reader); + } + + return new MapInputSliceReader(readers); } private OutputChannelFactory makeStageOutputChannelFactory() diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java index 9e696a16cce6..b225112236d8 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.msq.indexing.MSQWorkerTask; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.policy.PolicyEnforcer; @@ -31,6 +33,7 @@ import java.io.Closeable; import java.io.File; +import java.util.List; /** * Context used by multi-stage query workers. @@ -121,6 +124,15 @@ public interface WorkerContext extends Closeable */ boolean isDebug(); + /** + * Extension point for additional {@link InputSlice} beyond those provided by + * {@link RunWorkOrder#makeInputSliceReader()}. + */ + default List inputSliceReaderProviders() + { + return List.of(); + } + @Override void close(); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java index 0a6b0827324a..e3cd3e4028e2 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java @@ -20,11 +20,20 @@ package org.apache.druid.msq.guice; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; +import com.google.inject.multibindings.Multibinder; +import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.msq.dart.Dart; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReaderProvider; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.querykit.QueryKit; import org.apache.druid.query.Query; +import java.lang.annotation.Annotation; + /** * Utility class for MSQ-related Guice bindings. */ @@ -50,4 +59,29 @@ public static MapBinder, QueryKit> queryKitBinder(Binder new TypeLiteral<>() {} ); } + + /** + * Bind an {@link InputSpecSlicerProvider} for use on a controller. The annotation should be + * {@link IndexingService} for providers used by tasks, or {@link Dart} for providers used by Dart. + */ + public static Multibinder inputSpecSlicerProviderBinder( + Binder binder, + Class annotation + ) + { + return Multibinder.newSetBinder(binder, Key.get(InputSpecSlicerProvider.class, annotation)); + } + + /** + * Bind an {@link InputSliceReaderProvider} for use on a worker, to handle a particular {@link InputSlice}. + * The annotation should be {@link IndexingService} for providers used by tasks, or {@link Dart} for providers + * used by Dart. + */ + public static Multibinder inputSliceReaderProviderBinder( + Binder binder, + Class annotation + ) + { + return Multibinder.newSetBinder(binder, Key.get(InputSliceReaderProvider.class, annotation)); + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java index 629754f2a9a1..8781fa5bb0f4 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java @@ -27,6 +27,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Provides; +import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.initialization.DruidModule; @@ -40,6 +41,8 @@ import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter; import org.apache.druid.msq.counters.WarningCounters; import org.apache.druid.msq.indexing.IndexerControllerContextFactory; +import org.apache.druid.msq.indexing.IndexerSegmentsInputSliceReaderProvider; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicerProvider; import org.apache.druid.msq.indexing.MSQCompactionRunner; import org.apache.druid.msq.indexing.MSQControllerTask; import org.apache.druid.msq.indexing.MSQWorkerTask; @@ -250,6 +253,16 @@ public void configure(Binder binder) .addBinding(WindowOperatorQuery.class) .to(WindowOperatorQueryKit.class); binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class); + + MSQBinders.inputSpecSlicerProviderBinder(binder, IndexingService.class) + .addBinding() + .to(IndexerTableInputSpecSlicerProvider.class) + .in(LazySingleton.class); + + MSQBinders.inputSliceReaderProviderBinder(binder, IndexingService.class) + .addBinding() + .to(IndexerSegmentsInputSliceReaderProvider.class) + .in(LazySingleton.class); } @Provides diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 18aebe0bcd21..5ea69d8c6a0a 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.TypeLiteral; +import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.frame.FrameType; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.common.TaskLockType; @@ -48,7 +50,7 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.MSQWarnings; import org.apache.druid.msq.indexing.error.UnknownFault; -import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -66,7 +68,9 @@ import java.io.File; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -90,6 +94,7 @@ public class IndexerControllerContext implements ControllerContext private final ServiceClientFactory clientFactory; private final OverlordClient overlordClient; private final MemoryIntrospector memoryIntrospector; + private final List inputSpecSlicerProviders; public IndexerControllerContext( final MSQControllerTask task, @@ -110,6 +115,9 @@ public IndexerControllerContext( this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class); final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)); final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir()); + final Set inputSpecSlicerProviders = + injector.getInstance(Key.get(new TypeLiteral<>() {}, IndexingService.class)); + this.inputSpecSlicerProviders = List.copyOf(inputSpecSlicerProviders); this.injector = injector.createChildInjector( binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) .toInstance(storageConnector)); @@ -174,15 +182,9 @@ public DruidNode selfNode() } @Override - public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager workerManager) + public List inputSpecSlicerProviders() { - final SegmentSource includeSegmentSource = - MultiStageQueryContext.getSegmentSources(taskQuerySpecContext, DEFAULT_SEGMENT_SOURCE); - return new IndexerTableInputSpecSlicer( - toolbox.getCoordinatorClient(), - toolbox.getTaskActionClient(), - includeSegmentSource - ); + return inputSpecSlicerProviders; } @Override diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java new file mode 100644 index 000000000000..d27a7852cac2 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import org.apache.druid.msq.exec.FrameContext; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.InputSliceReaderProvider; +import org.apache.druid.msq.input.table.SegmentsInputSlice; +import org.apache.druid.msq.input.table.SegmentsInputSliceReader; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; + +/** + * Worker-side provider for {@link SegmentsInputSlice} in tasks. + */ +public class IndexerSegmentsInputSliceReaderProvider implements InputSliceReaderProvider +{ + @Override + public Class sliceClass() + { + return SegmentsInputSlice.class; + } + + @Override + public InputSliceReader createReader(FrameContext frameContext, QueryContext queryContext) + { + return new SegmentsInputSliceReader(frameContext, MultiStageQueryContext.isReindex(queryContext)); + } +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java new file mode 100644 index 000000000000..052d5c7691c2 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import com.google.inject.Inject; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.msq.exec.ControllerContext; +import org.apache.druid.msq.exec.SegmentSource; +import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; +import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; +import org.apache.druid.rpc.StandardRetryPolicy; + +import java.util.List; + +/** + * Controller-side provider for {@link TableInputSpec} in tasks. + */ +public class IndexerTableInputSpecSlicerProvider implements InputSpecSlicerProvider +{ + private final CoordinatorClient coordinatorClient; + + @Inject + public IndexerTableInputSpecSlicerProvider(CoordinatorClient coordinatorClient) + { + // Use the "aboutAnHour" retry policy, same as the one used in the TaskToolboxFactory. This prevents + // long-running tasks from failing if there are Coordinator/Overlord problems. Calls will still fail + // eventually if problems persist. + this.coordinatorClient = coordinatorClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour()); + } + + @Override + public Class specClass() + { + return TableInputSpec.class; + } + + @Override + public InputSpecSlicer createSlicer( + ControllerContext controllerContext, + QueryContext queryContext, + List workerIds + ) + { + final SegmentSource includeSegmentSource = + MultiStageQueryContext.getSegmentSources(queryContext, IndexerControllerContext.DEFAULT_SEGMENT_SOURCE); + return new IndexerTableInputSpecSlicer( + coordinatorClient, + controllerContext.taskActionClient(), + includeSegmentSource + ); + } +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 2589d0c0dab7..82d462fd096a 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.TypeLiteral; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Smile; @@ -46,6 +48,7 @@ import org.apache.druid.msq.indexing.client.IndexerControllerClient; import org.apache.druid.msq.indexing.client.IndexerWorkerClient; import org.apache.druid.msq.indexing.client.WorkerChatHandler; +import org.apache.druid.msq.input.InputSliceReaderProvider; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; @@ -66,6 +69,8 @@ import javax.annotation.Nullable; import java.io.File; +import java.util.List; +import java.util.Set; public class IndexerWorkerContext implements WorkerContext { @@ -90,6 +95,7 @@ public class IndexerWorkerContext implements WorkerContext private final ServiceClientFactory clientFactory; private final MemoryIntrospector memoryIntrospector; private final ProcessingBuffersProvider processingBuffersProvider; + private final List inputSliceReaderProviders; private final int maxConcurrentStages; private final boolean liveReportCounters; private final boolean includeAllCounters; @@ -112,7 +118,8 @@ public IndexerWorkerContext( final ServiceClientFactory clientFactory, final MemoryIntrospector memoryIntrospector, final ProcessingBuffersProvider processingBuffersProvider, - final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory + final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory, + final List inputSliceReaderProviders ) { this.task = task; @@ -127,6 +134,7 @@ public IndexerWorkerContext( this.memoryIntrospector = memoryIntrospector; this.processingBuffersProvider = processingBuffersProvider; this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; + this.inputSliceReaderProviders = inputSliceReaderProviders; final QueryContext queryContext = QueryContext.of(task.getContext()); this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault( @@ -171,6 +179,8 @@ public static IndexerWorkerContext createProductionInstance( injector.getInstance(OverlordClient.class).withRetryPolicy(StandardRetryPolicy.unlimited()); final ProcessingBuffersProvider processingBuffersProvider = injector.getInstance(ProcessingBuffersProvider.class); final ObjectMapper smileMapper = injector.getInstance(Key.get(ObjectMapper.class, Smile.class)); + final Set inputSliceReaderProviders = + injector.getInstance(Key.get(new TypeLiteral<>() {}, IndexingService.class)); return new IndexerWorkerContext( task, @@ -189,7 +199,8 @@ public static IndexerWorkerContext createProductionInstance( toolbox.getCoordinatorClient(), serviceClientFactory, smileMapper - ) + ), + List.copyOf(inputSliceReaderProviders) ); } @@ -228,6 +239,12 @@ public Injector injector() return injector; } + @Override + public List inputSliceReaderProviders() + { + return inputSliceReaderProviders; + } + @Override public void emitMetric(MSQMetricEventBuilder metricBuilder) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java new file mode 100644 index 000000000000..8dd71efe5934 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.input; + +import org.apache.druid.msq.exec.FrameContext; +import org.apache.druid.query.QueryContext; + +/** + * Worker-side extension point for {@link InputSlice}: provides an {@link InputSliceReader} for a particular + * {@link InputSlice} class. + */ +public interface InputSliceReaderProvider +{ + /** + * The {@link InputSlice} class handled by this provider. + */ + Class sliceClass(); + + /** + * Returns an {@link InputSliceReader} that accepts {@link #sliceClass()}. + */ + InputSliceReader createReader(FrameContext frameContext, QueryContext queryContext); +} diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java new file mode 100644 index 000000000000..3d0f0e774284 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.input; + +import org.apache.druid.msq.exec.ControllerContext; +import org.apache.druid.query.QueryContext; + +import java.util.List; + +/** + * Controller-side extension point for {@link InputSpec}: provides an {@link InputSpecSlicer} for a particular + * {@link InputSpec} class. + */ +public interface InputSpecSlicerProvider +{ + /** + * The {@link InputSpec} class handled by this provider. + */ + Class specClass(); + + /** + * Returns an {@link InputSpecSlicer} that accepts {@link #specClass()}. + */ + InputSpecSlicer createSlicer( + ControllerContext controllerContext, + QueryContext queryContext, + List workerIds + ); +} diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java index 3888368c1b86..1ab1b55024c5 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java @@ -106,8 +106,17 @@ public void tearDown() throws Exception @Test public void test_queryKernelConfig() { - final DartControllerContext controllerContext = - new DartControllerContext(null, null, SELF_NODE, null, memoryIntrospector, serverView, null, queryContext); + final DartControllerContext controllerContext = new DartControllerContext( + null, + null, + SELF_NODE, + null, + memoryIntrospector, + serverView, + List.of(), + null, + queryContext + ); final ControllerQueryKernelConfig queryKernelConfig = controllerContext.queryKernelConfig(querySpec); Assertions.assertFalse(queryKernelConfig.isFaultTolerant()); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java index 787b980307f4..895de0319daa 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java @@ -30,6 +30,7 @@ import com.google.inject.Module; import com.google.inject.util.Modules; import com.google.inject.util.Providers; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.error.DruidException; @@ -237,6 +238,7 @@ public void setUpMSQ() new SegmentWranglerModule(), new LookylooModule(), new MSQIndexingModule(), + binder -> binder.bind(CoordinatorClient.class).toInstance(coordinatorClient), binder -> binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()), binder -> binder.bind(WireTransferableContext.class).toInstance(new WireTransferableContext(null, null, true)), binder -> binder.bind(DataSegmentPusher.class) diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java index 7b7f4cb93b11..fc7b499eb1ad 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java @@ -21,8 +21,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Provides; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.TestDruidModule; import org.apache.druid.msq.guice.MultiStageQuery; @@ -39,6 +42,12 @@ public class TestMSQSqlModule extends TestDruidModule { + @Override + public void configure(Binder binder) + { + binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class).in(LazySingleton.class); + } + @Provides @MultiStageQuery @LazySingleton diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java index 41956d556956..f7947676ed7d 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java @@ -31,6 +31,8 @@ import com.google.inject.util.Modules; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.frame.testutil.FrameTestUtil; import org.apache.druid.guice.ConfigModule; import org.apache.druid.guice.DruidGuiceExtensions; @@ -217,7 +219,10 @@ public void setUp() throws Exception new SegmentWranglerModule(), new LookylooModule(), new MSQIndexingModule(), - binder -> binder.bind(WireTransferableContext.class).toInstance(FrameTestUtil.WT_CONTEXT_LEGACY) + binder -> { + binder.bind(WireTransferableContext.class).toInstance(FrameTestUtil.WT_CONTEXT_LEGACY); + binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class); + } ); Injector injector = Guice.createInjector(defaultModule, BoundFieldModule.of(this)); DruidSecondaryModule.setupJackson(injector, objectMapper, Collections.emptyMap(), true); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java index dd0370008add..43558e8d517e 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java @@ -22,8 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Binder; import com.google.inject.Provides; -import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.guice.LazySingleton; @@ -126,7 +124,6 @@ final DruidNodeDiscoveryProvider getDiscoveryProvider() @Override public void configure(Binder binder) { - binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class); } } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 5a2ecbcf2432..10c3e8cbec31 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -38,6 +38,8 @@ import com.google.inject.util.Providers; import org.apache.calcite.avatica.remote.TypedValue; import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LongDimensionSchema; @@ -576,7 +578,8 @@ public String getFormatString() new SegmentWranglerModule(), new HllSketchModule(), binder -> binder.bind(Bouncer.class).toInstance(new Bouncer(1)), - binder -> binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()) + binder -> binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()), + binder -> binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class).in(LazySingleton.class) ); // adding node role injection to the modules, since CliPeon would also do that through run method injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build(), ImmutableSet.of(NodeRole.PEON)) diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 6f09c43ff028..8cc6ed620d29 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -51,7 +51,6 @@ import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.ControllerMemoryParameters; import org.apache.druid.msq.exec.MSQMetricEventBuilder; -import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerClient; import org.apache.druid.msq.exec.WorkerFailureListener; @@ -61,12 +60,12 @@ import org.apache.druid.msq.exec.WorkerRunRef; import org.apache.druid.msq.exec.WorkerStorageParameters; import org.apache.druid.msq.indexing.IndexerControllerContext; -import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicerProvider; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQWorkerTask; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig; -import org.apache.druid.msq.input.InputSpecSlicer; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; @@ -169,6 +168,7 @@ public MSQTestControllerContext( serviceEmitter, Mockito.mock(CoordinatorClient.class) ); + Mockito.when(coordinatorClient.withRetryPolicy(ArgumentMatchers.any())).thenReturn(coordinatorClient); Mockito.when(coordinatorClient.fetchServerViewSegments( ArgumentMatchers.anyString(), ArgumentMatchers.any() @@ -416,13 +416,9 @@ public TaskLockType taskLockType() } @Override - public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) + public List inputSpecSlicerProviders() { - return new IndexerTableInputSpecSlicer( - coordinatorClient, - taskActionClient, - MultiStageQueryContext.getSegmentSources(queryContext, SegmentSource.NONE) - ); + return List.of(new IndexerTableInputSpecSlicerProvider(coordinatorClient)); } @Override diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java index c3eaf6e9c9eb..d7f48f2dc419 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java @@ -46,6 +46,7 @@ import org.apache.druid.msq.exec.WorkerImpl; import org.apache.druid.msq.exec.WorkerRunRef; import org.apache.druid.msq.exec.WorkerStorageParameters; +import org.apache.druid.msq.input.InputSpecSlicerProvider; import org.apache.druid.msq.kernel.StageId; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.query.QueryContext; @@ -53,6 +54,7 @@ import org.apache.druid.server.DruidNode; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; public class TestDartControllerContextFactoryImpl extends DartControllerContextFactoryImpl @@ -74,11 +76,22 @@ public TestDartControllerContextFactoryImpl( @EscalatedGlobal final ServiceClientFactory serviceClientFactory, final MemoryIntrospector memoryIntrospector, final TimelineServerView serverView, + @Dart final Set inputSpecSlicerProviders, final ServiceEmitter emitter, @Dart Map workerMap ) { - super(injector, jsonMapper, smileMapper, selfNode, serviceClientFactory, memoryIntrospector, serverView, emitter); + super( + injector, + jsonMapper, + smileMapper, + selfNode, + serviceClientFactory, + memoryIntrospector, + serverView, + inputSpecSlicerProviders, + emitter + ); this.workerMap = workerMap; } @@ -92,6 +105,7 @@ public ControllerContext newContext(QueryContext context) new DartTestWorkerClient(), memoryIntrospector, serverView, + inputSpecSlicerProviders, emitter, context )