Skip to content

Commit b72de1a

Browse files
authored
MSQ: Dependency injection into StageProcessors. (#19080)
This patch updates tests to round-trip work orders using an ObjectMapper, which ensures that any JacksonInject fields present on StageProcessor are populated. In production, this happens naturally because work orders are generally sent over HTTP. To demonstrate that the approach works, GroupingEngine is removed from FrameContext. Instead it is injected directly into the relevant StageProcessors.
1 parent 911379d commit b72de1a

11 files changed

Lines changed: 38 additions & 31 deletions

File tree

multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,6 @@ public SegmentWrangler segmentWrangler()
100100
return segmentWrangler;
101101
}
102102

103-
@Override
104-
public GroupingEngine groupingEngine()
105-
{
106-
return groupingEngine;
107-
}
108-
109103
@Override
110104
public RowIngestionMeters rowIngestionMeters()
111105
{

multi-stage-query/src/main/java/org/apache/druid/msq/exec/FrameContext.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import org.apache.druid.client.coordinator.CoordinatorClient;
2424
import org.apache.druid.msq.kernel.WorkOrder;
25-
import org.apache.druid.query.groupby.GroupingEngine;
2625
import org.apache.druid.query.policy.PolicyEnforcer;
2726
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
2827
import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
@@ -49,8 +48,6 @@ public interface FrameContext extends Closeable
4948

5049
SegmentWrangler segmentWrangler();
5150

52-
GroupingEngine groupingEngine();
53-
5451
RowIngestionMeters rowIngestionMeters();
5552

5653
/**

multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.druid.msq.exec.WorkerMemoryParameters;
3131
import org.apache.druid.msq.exec.WorkerStorageParameters;
3232
import org.apache.druid.msq.kernel.StageId;
33-
import org.apache.druid.query.groupby.GroupingEngine;
3433
import org.apache.druid.query.policy.PolicyEnforcer;
3534
import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
3635
import org.apache.druid.segment.IndexIO;
@@ -94,12 +93,6 @@ public SegmentWrangler segmentWrangler()
9493
return context.injector().getInstance(SegmentWrangler.class);
9594
}
9695

97-
@Override
98-
public GroupingEngine groupingEngine()
99-
{
100-
return context.injector().getInstance(GroupingEngine.class);
101-
}
102-
10396
@Override
10497
public RowIngestionMeters rowIngestionMeters()
10598
{

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.druid.msq.querykit.groupby;
2121

22+
import com.fasterxml.jackson.annotation.JacksonInject;
2223
import com.fasterxml.jackson.annotation.JsonCreator;
2324
import com.fasterxml.jackson.annotation.JsonProperty;
2425
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -44,6 +45,7 @@
4445
import org.apache.druid.query.groupby.GroupByQuery;
4546
import org.apache.druid.query.groupby.GroupingEngine;
4647

48+
import javax.annotation.Nullable;
4749
import java.io.IOException;
4850
import java.util.List;
4951

@@ -52,6 +54,10 @@ public class GroupByPostShuffleStageProcessor extends BasicStageProcessor
5254
{
5355
private final GroupByQuery query;
5456

57+
@JacksonInject
58+
@Nullable
59+
private GroupingEngine groupingEngine;
60+
5561
@JsonCreator
5662
public GroupByPostShuffleStageProcessor(
5763
@JsonProperty("query") GroupByQuery query
@@ -74,7 +80,6 @@ public ListenableFuture<Long> execute(ExecutionContext context)
7480
// Expecting a single input slice from some prior stage.
7581
final List<InputSlice> inputSlices = context.workOrder().getInputs();
7682
final StageInputSlice slice = (StageInputSlice) Iterables.getOnlyElement(inputSlices);
77-
final GroupingEngine engine = context.frameContext().groupingEngine();
7883
final Int2ObjectSortedMap<OutputChannel> outputChannels = new Int2ObjectAVLTreeMap<>();
7984

8085
for (final ReadablePartition partition : slice.getPartitions()) {
@@ -99,7 +104,7 @@ public ListenableFuture<Long> execute(ExecutionContext context)
99104

100105
return new GroupByPostShuffleFrameProcessor(
101106
query,
102-
engine,
107+
groupingEngine,
103108
readableInput.getChannel(),
104109
outputChannel.getWritableChannel(),
105110
context.workOrder().getStageDefinition().createFrameWriterFactory(

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.druid.msq.querykit.groupby;
2121

22+
import com.fasterxml.jackson.annotation.JacksonInject;
2223
import com.fasterxml.jackson.annotation.JsonCreator;
2324
import com.fasterxml.jackson.annotation.JsonProperty;
2425
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -33,9 +34,11 @@
3334
import org.apache.druid.msq.querykit.BaseLeafStageProcessor;
3435
import org.apache.druid.msq.querykit.ReadableInput;
3536
import org.apache.druid.query.groupby.GroupByQuery;
37+
import org.apache.druid.query.groupby.GroupingEngine;
3638
import org.apache.druid.segment.SegmentMapFunction;
3739
import org.joda.time.Interval;
3840

41+
import javax.annotation.Nullable;
3942
import java.util.ArrayList;
4043
import java.util.List;
4144

@@ -44,6 +47,10 @@ public class GroupByPreShuffleStageProcessor extends BaseLeafStageProcessor
4447
{
4548
private final GroupByQuery query;
4649

50+
@JacksonInject
51+
@Nullable
52+
private GroupingEngine groupingEngine;
53+
4754
@JsonCreator
4855
public GroupByPreShuffleStageProcessor(@JsonProperty("query") GroupByQuery query)
4956
{
@@ -68,7 +75,7 @@ protected FrameProcessor<Object> makeProcessor(
6875
{
6976
return new GroupByPreShuffleFrameProcessor(
7077
query,
71-
frameContext.groupingEngine(),
78+
groupingEngine,
7279
frameContext.processingBuffers().getBufferPool(),
7380
baseInput,
7481
segmentMapFn,

multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.druid.msq.exec;
2121

22+
import com.fasterxml.jackson.databind.InjectableValues;
2223
import com.fasterxml.jackson.databind.ObjectMapper;
2324
import com.google.common.base.Preconditions;
2425
import com.google.common.collect.Iterables;
@@ -217,6 +218,7 @@ public void setUpMSQ()
217218
new GroupByQueryConfig(),
218219
TestGroupByBuffers.createDefault()
219220
).getGroupingEngine();
221+
((InjectableValues.Std) objectMapper.getInjectableValues()).addValue(GroupingEngine.class, groupingEngine);
220222

221223
Module modules = Modules.combine(
222224
new DruidGuiceExtensions(),

multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ public void setUp() throws Exception
222222
);
223223
Injector injector = Guice.createInjector(defaultModule, BoundFieldModule.of(this));
224224
DruidSecondaryModule.setupJackson(injector, objectMapper, Collections.emptyMap(), true);
225+
new MSQIndexingModule().getJacksonModules().forEach(objectMapper::registerModule);
225226

226227
// Populate loadedSegmentMetadata from walker segments so CoordinatorClient.fetchSegment() can find them
227228
List<ImmutableSegmentLoadInfo> loadedSegmentMetadata = new ArrayList<>();

multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ public void registerController(Controller controller, Closer closer)
467467
@Override
468468
public WorkerClient newWorkerClient()
469469
{
470-
return new MSQTestWorkerClient(inMemoryWorkers);
470+
return new MSQTestWorkerClient(inMemoryWorkers, mapper);
471471
}
472472

473473
@Override

multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.druid.msq.test;
2121

22+
import com.fasterxml.jackson.annotation.JacksonInject;
23+
import com.fasterxml.jackson.databind.ObjectMapper;
2224
import com.google.common.util.concurrent.Futures;
2325
import com.google.common.util.concurrent.ListenableFuture;
2426
import org.apache.druid.common.guava.FutureUtils;
@@ -27,6 +29,7 @@
2729
import org.apache.druid.java.util.common.ISE;
2830
import org.apache.druid.java.util.common.Stopwatch;
2931
import org.apache.druid.msq.counters.CounterSnapshotsTree;
32+
import org.apache.druid.msq.exec.StageProcessor;
3033
import org.apache.druid.msq.exec.Worker;
3134
import org.apache.druid.msq.exec.WorkerClient;
3235
import org.apache.druid.msq.exec.WorkerRunRef;
@@ -45,17 +48,19 @@ public class MSQTestWorkerClient implements WorkerClient
4548
private static final long WORKER_WAIT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
4649

4750
protected final Map<String, WorkerRunRef> inMemoryWorkers;
51+
private final ObjectMapper objectMapper;
4852
private final AtomicBoolean closed = new AtomicBoolean();
4953

50-
public MSQTestWorkerClient(Map<String, WorkerRunRef> inMemoryWorkers)
54+
public MSQTestWorkerClient(Map<String, WorkerRunRef> inMemoryWorkers, ObjectMapper objectMapper)
5155
{
5256
this.inMemoryWorkers = inMemoryWorkers;
57+
this.objectMapper = objectMapper;
5358
}
5459

5560
@Override
5661
public ListenableFuture<Void> postWorkOrder(String workerTaskId, WorkOrder workOrder)
5762
{
58-
getWorkerFor(workerTaskId).postWorkOrder(workOrder);
63+
getWorkerFor(workerTaskId).postWorkOrder(roundTripSerdeWorkOrder(workOrder));
5964
return Futures.immediateFuture(null);
6065
}
6166

@@ -190,4 +195,14 @@ public void close()
190195
inMemoryWorkers.forEach((k, v) -> v.cancel());
191196
}
192197
}
198+
199+
/**
200+
* Using {@link #objectMapper}, convert work order to work order. This ensures that any {@link JacksonInject} fields
201+
* present on {@link StageProcessor} are populated. In production, this happens naturally because work orders are
202+
* generally sent over HTTP.
203+
*/
204+
private WorkOrder roundTripSerdeWorkOrder(final WorkOrder workOrder)
205+
{
206+
return objectMapper.convertValue(workOrder, WorkOrder.class);
207+
}
193208
}

multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.druid.msq.exec.WorkerStorageParameters;
4444
import org.apache.druid.msq.kernel.WorkOrder;
4545
import org.apache.druid.msq.util.MultiStageQueryContext;
46-
import org.apache.druid.query.groupby.GroupingEngine;
4746
import org.apache.druid.query.policy.PolicyEnforcer;
4847
import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
4948
import org.apache.druid.segment.IndexIO;
@@ -158,7 +157,7 @@ public ControllerClient makeControllerClient()
158157
@Override
159158
public WorkerClient makeWorkerClient()
160159
{
161-
return new MSQTestWorkerClient(inMemoryWorkers);
160+
return new MSQTestWorkerClient(inMemoryWorkers, mapper);
162161
}
163162

164163
@Override
@@ -231,12 +230,6 @@ public SegmentWrangler segmentWrangler()
231230
return injector.getInstance(SegmentWrangler.class);
232231
}
233232

234-
@Override
235-
public GroupingEngine groupingEngine()
236-
{
237-
return injector.getInstance(GroupingEngine.class);
238-
}
239-
240233
@Override
241234
public RowIngestionMeters rowIngestionMeters()
242235
{

0 commit comments

Comments
 (0)