Skip to content

Commit baa4f5e

Browse files
committed
ADD updatedRows TO QueryStatistics
Come from TableMutationOperator and MergeWriteOperator's rowCount
1 parent 0f2f205 commit baa4f5e

File tree

53 files changed

+656
-285
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+656
-285
lines changed

Diff for: core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java

+1
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ private static QueryStats immediateFailureQueryStats()
332332
DataSize.ofBytes(0),
333333
0,
334334
0,
335+
0,
335336
new Duration(0, MILLISECONDS),
336337
new Duration(0, MILLISECONDS),
337338
DataSize.ofBytes(0),

Diff for: core/trino-main/src/main/java/io/trino/event/QueryMonitor.java

+2
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
231231
0,
232232
0,
233233
0,
234+
0,
234235
ImmutableList.of(),
235236
0,
236237
true,
@@ -343,6 +344,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
343344
queryStats.getRawInputPositions(),
344345
queryStats.getOutputDataSize().toBytes(),
345346
queryStats.getOutputPositions(),
347+
queryStats.getUpdatedPositions(),
346348
queryStats.getLogicalWrittenDataSize().toBytes(),
347349
queryStats.getWrittenPositions(),
348350
queryStats.getSpilledDataSize().toBytes(),

Diff for: core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java

+4
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
695695
long outputDataSize = 0;
696696
long failedOutputDataSize = 0;
697697
long outputPositions = 0;
698+
long updatedPositions = 0;
698699
long failedOutputPositions = 0;
699700

700701
long outputBlockedTime = 0;
@@ -711,6 +712,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
711712
ImmutableList.Builder<OperatorStats> operatorStatsSummary = ImmutableList.builder();
712713
for (StageInfo stageInfo : allStages) {
713714
StageStats stageStats = stageInfo.getStageStats();
715+
updatedPositions += stageStats.getUpdatedPositions();
714716
totalTasks += stageStats.getTotalTasks();
715717
runningTasks += stageStats.getRunningTasks();
716718
completedTasks += stageStats.getCompletedTasks();
@@ -907,6 +909,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
907909
succinctBytes(outputDataSize),
908910
succinctBytes(failedOutputDataSize),
909911
outputPositions,
912+
updatedPositions,
910913
failedOutputPositions,
911914

912915
new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
@@ -1511,6 +1514,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
15111514
queryStats.getOutputDataSize(),
15121515
queryStats.getFailedOutputDataSize(),
15131516
queryStats.getOutputPositions(),
1517+
queryStats.getUpdatedPositions(),
15141518
queryStats.getFailedOutputPositions(),
15151519
queryStats.getOutputBlockedTime(),
15161520
queryStats.getFailedOutputBlockedTime(),

Diff for: core/trino-main/src/main/java/io/trino/execution/QueryStats.java

+10
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ public class QueryStats
118118
private final DataSize outputDataSize;
119119
private final DataSize failedOutputDataSize;
120120
private final long outputPositions;
121+
private final long updatedPositions;
121122
private final long failedOutputPositions;
122123

123124
private final Duration outputBlockedTime;
@@ -213,6 +214,7 @@ public QueryStats(
213214
@JsonProperty("outputDataSize") DataSize outputDataSize,
214215
@JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize,
215216
@JsonProperty("outputPositions") long outputPositions,
217+
@JsonProperty("updatedPositions") long updatedPositions,
216218
@JsonProperty("failedOutputPositions") long failedOutputPositions,
217219

218220
@JsonProperty("outputBlockedTime") Duration outputBlockedTime,
@@ -323,6 +325,8 @@ public QueryStats(
323325
this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null");
324326
checkArgument(outputPositions >= 0, "outputPositions is negative");
325327
this.outputPositions = outputPositions;
328+
checkArgument(updatedPositions >= 0, "updatedPositions is negative");
329+
this.updatedPositions = updatedPositions;
326330
checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative");
327331
this.failedOutputPositions = failedOutputPositions;
328332

@@ -745,6 +749,12 @@ public long getOutputPositions()
745749
return outputPositions;
746750
}
747751

752+
@JsonProperty
753+
public long getUpdatedPositions()
754+
{
755+
return updatedPositions;
756+
}
757+
748758
@JsonProperty
749759
public long getFailedOutputPositions()
750760
{

Diff for: core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java

+3
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
473473
long outputDataSize = 0;
474474
long failedOutputDataSize = 0;
475475
long outputPositions = 0;
476+
long updatedPositions = 0;
476477
long failedOutputPositions = 0;
477478
Metrics.Accumulator outputBufferMetrics = Metrics.accumulator();
478479

@@ -553,6 +554,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
553554
taskInfo.outputBuffers().getUtilization().ifPresent(bufferUtilizationHistograms::add);
554555
outputDataSize += taskStats.getOutputDataSize().toBytes();
555556
outputPositions += taskStats.getOutputPositions();
557+
updatedPositions += taskStats.getUpdatedPositions();
556558
bufferMetrics.ifPresent(outputBufferMetrics::add);
557559

558560
outputBlockedTime += taskStats.getOutputBlockedTime().roundTo(NANOSECONDS);
@@ -660,6 +662,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
660662
succinctBytes(outputDataSize),
661663
succinctBytes(failedOutputDataSize),
662664
outputPositions,
665+
updatedPositions,
663666
failedOutputPositions,
664667
outputBufferMetrics.get(),
665668
succinctDuration(outputBlockedTime, NANOSECONDS),

Diff for: core/trino-main/src/main/java/io/trino/execution/StageStats.java

+12
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public class StageStats
107107
private final DataSize outputDataSize;
108108
private final DataSize failedOutputDataSize;
109109
private final long outputPositions;
110+
private final long updatedPositions;
110111
private final long failedOutputPositions;
111112
private final Metrics outputBufferMetrics;
112113

@@ -183,6 +184,7 @@ public StageStats(
183184
@JsonProperty("outputDataSize") DataSize outputDataSize,
184185
@JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize,
185186
@JsonProperty("outputPositions") long outputPositions,
187+
@JsonProperty("updatedPositions") long updatedPositions,
186188
@JsonProperty("failedOutputPositions") long failedOutputPositions,
187189
@JsonProperty("outputBufferMetrics") Metrics outputBufferMetrics,
188190

@@ -274,6 +276,8 @@ public StageStats(
274276
this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null");
275277
checkArgument(outputPositions >= 0, "outputPositions is negative");
276278
this.outputPositions = outputPositions;
279+
checkArgument(updatedPositions >= 0, "updatedPositions is negative");
280+
this.updatedPositions = updatedPositions;
277281
checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative");
278282
this.failedOutputPositions = failedOutputPositions;
279283
this.outputBufferMetrics = requireNonNull(outputBufferMetrics, "outputBufferMetrics is null");
@@ -590,6 +594,12 @@ public long getOutputPositions()
590594
return outputPositions;
591595
}
592596

597+
@JsonProperty
598+
public long getUpdatedPositions()
599+
{
600+
return updatedPositions;
601+
}
602+
593603
@JsonProperty
594604
public long getFailedOutputPositions()
595605
{
@@ -735,6 +745,7 @@ public StageStats pruneDigests()
735745
outputDataSize,
736746
failedOutputDataSize,
737747
outputPositions,
748+
updatedPositions,
738749
failedOutputPositions,
739750
pruneMetrics(outputBufferMetrics),
740751
outputBlockedTime,
@@ -803,6 +814,7 @@ public static StageStats createInitial()
803814
zeroBytes,
804815
0,
805816
0,
817+
0,
806818
Metrics.EMPTY,
807819
zeroSeconds,
808820
zeroSeconds,

Diff for: core/trino-main/src/main/java/io/trino/operator/DriverContext.java

+11
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,14 @@ public List<OperatorStats> getOperatorStats()
308308
.collect(toImmutableList());
309309
}
310310

311+
public Long getUpdatedPositions()
312+
{
313+
return operatorContexts.stream()
314+
.map(OperatorContext::getOperatorStats)
315+
.mapToLong(OperatorStats::getUpdatedPositions)
316+
.sum();
317+
}
318+
311319
public DriverStats getDriverStats()
312320
{
313321
long totalScheduledTime = overallTiming.getWallNanos();
@@ -346,6 +354,7 @@ public DriverStats getDriverStats()
346354
Duration inputBlockedTime;
347355
DataSize outputDataSize;
348356
long outputPositions;
357+
long updatedPositions = getUpdatedPositions();
349358
Duration outputBlockedTime;
350359
if (inputOperator != null) {
351360
physicalInputDataSize = inputOperator.getPhysicalInputDataSize();
@@ -389,6 +398,7 @@ public DriverStats getDriverStats()
389398

390399
outputDataSize = DataSize.ofBytes(0);
391400
outputPositions = 0;
401+
updatedPositions = 0;
392402

393403
outputBlockedTime = new Duration(0, MILLISECONDS);
394404
}
@@ -428,6 +438,7 @@ public DriverStats getDriverStats()
428438
inputBlockedTime,
429439
outputDataSize.succinct(),
430440
outputPositions,
441+
updatedPositions,
431442
outputBlockedTime,
432443
succinctBytes(physicalWrittenDataSize),
433444
operators);

Diff for: core/trino-main/src/main/java/io/trino/operator/DriverStats.java

+11
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class DriverStats
6767

6868
private final DataSize outputDataSize;
6969
private final long outputPositions;
70+
private final long updatedPositions;
7071

7172
private final Duration outputBlockedTime;
7273

@@ -109,6 +110,7 @@ public DriverStats()
109110

110111
this.outputDataSize = DataSize.ofBytes(0);
111112
this.outputPositions = 0;
113+
this.updatedPositions = 0;
112114

113115
this.outputBlockedTime = new Duration(0, MILLISECONDS);
114116

@@ -152,6 +154,7 @@ public DriverStats(
152154

153155
@JsonProperty("outputDataSize") DataSize outputDataSize,
154156
@JsonProperty("outputPositions") long outputPositions,
157+
@JsonProperty("updatedPositions") long updatedPositions,
155158

156159
@JsonProperty("outputBlockedTime") Duration outputBlockedTime,
157160

@@ -197,6 +200,8 @@ public DriverStats(
197200
this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");
198201
checkArgument(outputPositions >= 0, "outputPositions is negative");
199202
this.outputPositions = outputPositions;
203+
checkArgument(updatedPositions >= 0, "updatedPositions is negative");
204+
this.updatedPositions = updatedPositions;
200205

201206
this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null");
202207

@@ -357,6 +362,12 @@ public long getOutputPositions()
357362
return outputPositions;
358363
}
359364

365+
@JsonProperty
366+
public long getUpdatedPositions()
367+
{
368+
return updatedPositions;
369+
}
370+
360371
@JsonProperty
361372
public Duration getOutputBlockedTime()
362373
{

Diff for: core/trino-main/src/main/java/io/trino/operator/MergeWriterOperator.java

+2
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ public Page getOutput()
185185
VARBINARY.writeSlice(fragmentBuilder, fragment);
186186
}
187187

188+
this.operatorContext.recordUpdatedPositions(rowCount);
189+
188190
return page.build();
189191
}
190192

Diff for: core/trino-main/src/main/java/io/trino/operator/OperatorContext.java

+13
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public class OperatorContext
8282
private final OperationTiming getOutputTiming = new OperationTiming();
8383
private final CounterStat outputDataSize = new CounterStat();
8484
private final CounterStat outputPositions = new CounterStat();
85+
private final CounterStat updatedPositions = new CounterStat();
8586

8687
private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();
8788
private final AtomicReference<Metrics> metrics = new AtomicReference<>(Metrics.EMPTY); // this is not incremental, but gets overwritten by the latest value.
@@ -226,6 +227,12 @@ public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits)
226227
dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits);
227228
}
228229

230+
public void recordUpdatedPositions(long updatedPositions)
231+
{
232+
checkArgument(updatedPositions >= 0, "updatedPositions is negative (%s)", updatedPositions);
233+
this.updatedPositions.update(updatedPositions);
234+
}
235+
229236
/**
230237
* Overwrites the metrics with the latest one.
231238
*
@@ -490,6 +497,11 @@ public CounterStat getOutputPositions()
490497
return outputPositions;
491498
}
492499

500+
public CounterStat getUpdatedPositions()
501+
{
502+
return updatedPositions;
503+
}
504+
493505
public long getWriterInputDataSize()
494506
{
495507
return writerInputDataSize.get();
@@ -554,6 +566,7 @@ public OperatorStats getOperatorStats()
554566
new Duration(getOutputTiming.getCpuNanos(), NANOSECONDS).convertToMostSuccinctTimeUnit(),
555567
DataSize.ofBytes(outputDataSize.getTotalCount()),
556568
outputPositions.getTotalCount(),
569+
updatedPositions.getTotalCount(),
557570

558571
dynamicFilterSplitsProcessed.get(),
559572
getOperatorMetrics(

Diff for: core/trino-main/src/main/java/io/trino/operator/OperatorStats.java

+16
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class OperatorStats
6363
private final Duration getOutputCpu;
6464
private final DataSize outputDataSize;
6565
private final long outputPositions;
66+
private final long updatedPositions;
6667

6768
private final long dynamicFilterSplitsProcessed;
6869
private final Metrics metrics;
@@ -118,6 +119,7 @@ public OperatorStats(
118119
@JsonProperty("getOutputCpu") Duration getOutputCpu,
119120
@JsonProperty("outputDataSize") DataSize outputDataSize,
120121
@JsonProperty("outputPositions") long outputPositions,
122+
@JsonProperty("updatedPositions") long updatedPositions,
121123

122124
@JsonProperty("dynamicFilterSplitsProcessed") long dynamicFilterSplitsProcessed,
123125
@JsonProperty("metrics") Metrics metrics,
@@ -175,6 +177,8 @@ public OperatorStats(
175177
this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");
176178
checkArgument(outputPositions >= 0, "outputPositions is negative");
177179
this.outputPositions = outputPositions;
180+
checkArgument(updatedPositions >= 0, "updatedPositions is negative");
181+
this.updatedPositions = updatedPositions;
178182

179183
this.dynamicFilterSplitsProcessed = dynamicFilterSplitsProcessed;
180184
this.metrics = requireNonNull(metrics, "metrics is null");
@@ -341,6 +345,12 @@ public long getOutputPositions()
341345
return outputPositions;
342346
}
343347

348+
@JsonProperty
349+
public long getUpdatedPositions()
350+
{
351+
return updatedPositions;
352+
}
353+
344354
@JsonProperty
345355
public long getDynamicFilterSplitsProcessed()
346356
{
@@ -486,6 +496,7 @@ private OperatorStats add(Iterable<OperatorStats> operators, Optional<Metrics> p
486496
long getOutputCpu = this.getOutputCpu.roundTo(NANOSECONDS);
487497
long outputDataSize = this.outputDataSize.toBytes();
488498
long outputPositions = this.outputPositions;
499+
long updatedPositions = this.updatedPositions;
489500

490501
long dynamicFilterSplitsProcessed = this.dynamicFilterSplitsProcessed;
491502
Metrics.Accumulator metricsAccumulator = Metrics.accumulator().add(this.getMetrics());
@@ -536,6 +547,7 @@ private OperatorStats add(Iterable<OperatorStats> operators, Optional<Metrics> p
536547
getOutputCpu += operator.getGetOutputCpu().roundTo(NANOSECONDS);
537548
outputDataSize += operator.getOutputDataSize().toBytes();
538549
outputPositions += operator.getOutputPositions();
550+
updatedPositions += operator.getUpdatedPositions();
539551

540552
dynamicFilterSplitsProcessed += operator.getDynamicFilterSplitsProcessed();
541553
metricsAccumulator.add(operator.getMetrics());
@@ -597,6 +609,7 @@ private OperatorStats add(Iterable<OperatorStats> operators, Optional<Metrics> p
597609
new Duration(getOutputCpu, NANOSECONDS).convertToMostSuccinctTimeUnit(),
598610
DataSize.ofBytes(outputDataSize),
599611
outputPositions,
612+
updatedPositions,
600613

601614
dynamicFilterSplitsProcessed,
602615
metricsAccumulator.get(),
@@ -669,6 +682,7 @@ public OperatorStats pruneDigests()
669682
getOutputCpu,
670683
outputDataSize,
671684
outputPositions,
685+
updatedPositions,
672686
dynamicFilterSplitsProcessed,
673687
pruneMetrics(metrics),
674688
pruneMetrics(connectorMetrics),
@@ -718,6 +732,7 @@ public OperatorStats summarize()
718732
getOutputCpu,
719733
outputDataSize,
720734
outputPositions,
735+
updatedPositions,
721736
dynamicFilterSplitsProcessed,
722737
metrics,
723738
connectorMetrics,
@@ -763,6 +778,7 @@ public OperatorStats withPipelineMetrics(Metrics pipelineMetrics)
763778
getOutputCpu,
764779
outputDataSize,
765780
outputPositions,
781+
updatedPositions,
766782
dynamicFilterSplitsProcessed,
767783
metrics,
768784
connectorMetrics,

0 commit comments

Comments
 (0)