Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.druid.msq.indexing.IndexerControllerContext;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
Expand Down Expand Up @@ -88,8 +88,9 @@ public class DartControllerContext implements ControllerContext
private final DartWorkerClient workerClient;
private final TimelineServerView serverView;
private final MemoryIntrospector memoryIntrospector;
private final QueryContext context;
private final List<InputSpecSlicerProvider> inputSpecSlicerProviders;
private final ServiceEmitter emitter;
private final QueryContext context;

public DartControllerContext(
final Injector injector,
Expand All @@ -98,6 +99,7 @@ public DartControllerContext(
final DartWorkerClient workerClient,
final MemoryIntrospector memoryIntrospector,
final TimelineServerView serverView,
final List<InputSpecSlicerProvider> inputSpecSlicerProviders,
final ServiceEmitter emitter,
final QueryContext context
)
Expand All @@ -108,8 +110,9 @@ public DartControllerContext(
this.workerClient = workerClient;
this.serverView = serverView;
this.memoryIntrospector = memoryIntrospector;
this.context = context;
this.inputSpecSlicerProviders = inputSpecSlicerProviders;
this.emitter = emitter;
this.context = context;
}

@Override
Expand Down Expand Up @@ -190,9 +193,9 @@ public DruidNode selfNode()
}

@Override
public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager)
public List<InputSpecSlicerProvider> inputSpecSlicerProviders()
{
return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView, context);
return inputSpecSlicerProviders;
}

@Override
Expand Down Expand Up @@ -260,4 +263,12 @@ public boolean isDebug()
{
return context.isDebug();
}

/**
* Getter for {@link DartTableInputSpecSlicerProvider} to retrieve the server view.
*/
TimelineServerView serverView()
{
return serverView;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,19 @@
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.worker.DartWorkerClientImpl;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;

import java.util.List;
import java.util.Set;

public class DartControllerContextFactoryImpl implements DartControllerContextFactory
{
protected final Injector injector;
Expand All @@ -45,6 +50,7 @@ public class DartControllerContextFactoryImpl implements DartControllerContextFa
protected final ServiceClientFactory serviceClientFactory;
protected final TimelineServerView serverView;
protected final MemoryIntrospector memoryIntrospector;
protected final List<InputSpecSlicerProvider> inputSpecSlicerProviders;
protected final ServiceEmitter emitter;

@Inject
Expand All @@ -56,6 +62,7 @@ public DartControllerContextFactoryImpl(
@EscalatedGlobal final ServiceClientFactory serviceClientFactory,
final MemoryIntrospector memoryIntrospector,
final TimelineServerView serverView,
@Dart final Set<InputSpecSlicerProvider> inputSpecSlicerProviders,
final ServiceEmitter emitter
)
{
Expand All @@ -66,6 +73,7 @@ public DartControllerContextFactoryImpl(
this.serviceClientFactory = serviceClientFactory;
this.serverView = serverView;
this.memoryIntrospector = memoryIntrospector;
this.inputSpecSlicerProviders = List.copyOf(inputSpecSlicerProviders);
this.emitter = emitter;
}

Expand All @@ -80,6 +88,7 @@ public ControllerContext newContext(final QueryContext context)
new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()),
memoryIntrospector,
serverView,
inputSpecSlicerProviders,
emitter,
context
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.dart.controller;

import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.query.QueryContext;

import java.util.List;

/**
* Controller-side provider for {@link TableInputSpec} in Dart.
*/
public class DartTableInputSpecSlicerProvider implements InputSpecSlicerProvider
{
@Override
public Class<? extends InputSpec> specClass()
{
return TableInputSpec.class;
}

@Override
public InputSpecSlicer createSlicer(
ControllerContext controllerContext,
QueryContext queryContext,
List<String> workerIds
)
{
return DartTableInputSpecSlicer.createFromWorkerIds(
workerIds,
((DartControllerContext) controllerContext).serverView(),
queryContext
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.controller.DartMessageRelayFactoryImpl;
import org.apache.druid.msq.dart.controller.DartMessageRelays;
import org.apache.druid.msq.dart.controller.DartTableInputSpecSlicerProvider;
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactory;
import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactoryImpl;
import org.apache.druid.msq.dart.controller.sql.DartSqlClients;
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.guice.MSQBinders;
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryConfigProvider;
Expand Down Expand Up @@ -111,6 +113,10 @@ public void configure(Binder binder)
.addBinding()
.to(DartSqlEngine.class)
.in(LazySingleton.class);
MSQBinders.inputSpecSlicerProviderBinder(binder, Dart.class)
.addBinding()
.to(DartTableInputSpecSlicerProvider.class)
.in(LazySingleton.class);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory;
import org.apache.druid.msq.dart.worker.DartSegmentsInputSliceReaderProvider;
import org.apache.druid.msq.dart.worker.DartWorkerContextFactory;
import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl;
import org.apache.druid.msq.dart.worker.DartWorkerRunner;
import org.apache.druid.msq.dart.worker.http.DartWorkerResource;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.guice.MSQBinders;
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.rpc.ServiceClientFactory;
Expand Down Expand Up @@ -104,6 +106,11 @@ public void configure(Binder binder)
binder.bind(ResourcePermissionMapper.class)
.annotatedWith(Dart.class)
.to(DartResourcePermissionMapper.class);

MSQBinders.inputSliceReaderProviderBinder(binder, Dart.class)
.addBinding()
.to(DartSegmentsInputSliceReaderProvider.class)
.in(LazySingleton.class);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.dart.worker;

import org.apache.druid.msq.exec.FrameContext;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.InputSliceReaderProvider;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
import org.apache.druid.query.QueryContext;

/**
* Worker-side provider for {@link SegmentsInputSlice} in Dart.
*/
public class DartSegmentsInputSliceReaderProvider implements InputSliceReaderProvider
{
@Override
public Class<? extends InputSlice> sliceClass()
{
return SegmentsInputSlice.class;
}

@Override
public InputSliceReader createReader(FrameContext frameContext, QueryContext queryContext)
{
return new SegmentsInputSliceReader(frameContext, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@
import org.apache.druid.msq.exec.WorkerContext;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.input.InputSliceReaderProvider;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.server.DruidNode;
Expand All @@ -57,6 +57,7 @@
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

import java.io.File;
import java.util.List;

/**
* Dart implementation of {@link WorkerContext}.
Expand All @@ -79,7 +80,6 @@ public class DartWorkerContext implements WorkerContext
private final Injector injector;
private final DartWorkerClient workerClient;
private final SegmentWrangler segmentWrangler;
private final GroupingEngine groupingEngine;
private final SegmentManager segmentManager;
private final CoordinatorClient coordinatorClient;
private final MemoryIntrospector memoryIntrospector;
Expand All @@ -96,6 +96,7 @@ public class DartWorkerContext implements WorkerContext
@MonotonicNonNull
private volatile ResourceHolder<ProcessingBuffersSet> processingBuffersSet;
private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
private final List<InputSliceReaderProvider> inputSliceReaderProviders;

DartWorkerContext(
final String queryId,
Expand All @@ -107,7 +108,6 @@ public class DartWorkerContext implements WorkerContext
final DartWorkerClient workerClient,
final DruidProcessingConfig processingConfig,
final SegmentWrangler segmentWrangler,
final GroupingEngine groupingEngine,
final SegmentManager segmentManager,
final CoordinatorClient coordinatorClient,
final MemoryIntrospector memoryIntrospector,
Expand All @@ -116,7 +116,8 @@ public class DartWorkerContext implements WorkerContext
final File tempDir,
final QueryContext queryContext,
final DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
final ServiceEmitter emitter
final ServiceEmitter emitter,
final List<InputSliceReaderProvider> inputSliceReaderProviders
)
{
this.queryId = queryId;
Expand All @@ -129,7 +130,6 @@ public class DartWorkerContext implements WorkerContext
this.injector = injector;
this.workerClient = workerClient;
this.segmentWrangler = segmentWrangler;
this.groupingEngine = groupingEngine;
this.segmentManager = segmentManager;
this.coordinatorClient = coordinatorClient;
this.memoryIntrospector = memoryIntrospector;
Expand All @@ -138,6 +138,7 @@ public class DartWorkerContext implements WorkerContext
this.tempDir = tempDir;
this.queryContext = Preconditions.checkNotNull(queryContext, "queryContext");
this.emitter = emitter;
this.inputSliceReaderProviders = inputSliceReaderProviders;

// Compute thread count once in constructor
final int baseThreadCount = processingConfig.getNumThreads();
Expand Down Expand Up @@ -175,6 +176,12 @@ public Injector injector()
return injector;
}

@Override
public List<InputSliceReaderProvider> inputSliceReaderProviders()
{
return inputSliceReaderProviders;
}

@Override
public void registerWorker(Worker worker, Closer closer)
{
Expand Down
Loading
Loading