Skip to content

Commit 9fe872e

Browse files
committed
add new field updatedRows to QueryStatistics.java where it's available in EventListener callbacks. It comes from TableMutationOperator.java where update/delete queries are issued.
add method `recordUpdatedPositions` to the OperatorContext class update the updatedPositions for the queries run through MergeWriterOperator.java
1 parent aa78dcb commit 9fe872e

File tree

37 files changed

+168
-0
lines changed

37 files changed

+168
-0
lines changed

Diff for: core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java

+1
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ private static QueryStats immediateFailureQueryStats()
332332
DataSize.ofBytes(0),
333333
0,
334334
0,
335+
0,
335336
new Duration(0, MILLISECONDS),
336337
new Duration(0, MILLISECONDS),
337338
DataSize.ofBytes(0),

Diff for: core/trino-main/src/main/java/io/trino/event/QueryMonitor.java

+2
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
231231
0,
232232
0,
233233
0,
234+
0,
234235
ImmutableList.of(),
235236
0,
236237
true,
@@ -343,6 +344,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
343344
queryStats.getRawInputPositions(),
344345
queryStats.getOutputDataSize().toBytes(),
345346
queryStats.getOutputPositions(),
347+
queryStats.getUpdatedPositions(),
346348
queryStats.getLogicalWrittenDataSize().toBytes(),
347349
queryStats.getWrittenPositions(),
348350
queryStats.getSpilledDataSize().toBytes(),

Diff for: core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public static OperatorStats pruneOperatorStats(OperatorStats operatorStats)
7171
operatorStats.getGetOutputCpu(),
7272
operatorStats.getOutputDataSize(),
7373
operatorStats.getOutputPositions(),
74+
operatorStats.getUpdatedPositions(),
7475
operatorStats.getDynamicFilterSplitsProcessed(),
7576
pruneMetrics(operatorStats.getMetrics()),
7677
pruneMetrics(operatorStats.getConnectorMetrics()),

Diff for: core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java

+4
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
686686
long outputDataSize = 0;
687687
long failedOutputDataSize = 0;
688688
long outputPositions = 0;
689+
long updatedPositions = 0;
689690
long failedOutputPositions = 0;
690691

691692
long outputBlockedTime = 0;
@@ -702,6 +703,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
702703
ImmutableList.Builder<OperatorStats> operatorStatsSummary = ImmutableList.builder();
703704
for (StageInfo stageInfo : allStages) {
704705
StageStats stageStats = stageInfo.getStageStats();
706+
updatedPositions += stageStats.getUpdatedPositions();
705707
totalTasks += stageStats.getTotalTasks();
706708
runningTasks += stageStats.getRunningTasks();
707709
completedTasks += stageStats.getCompletedTasks();
@@ -898,6 +900,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo>
898900
succinctBytes(outputDataSize),
899901
succinctBytes(failedOutputDataSize),
900902
outputPositions,
903+
updatedPositions,
901904
failedOutputPositions,
902905

903906
new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
@@ -1502,6 +1505,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
15021505
queryStats.getOutputDataSize(),
15031506
queryStats.getFailedOutputDataSize(),
15041507
queryStats.getOutputPositions(),
1508+
queryStats.getUpdatedPositions(),
15051509
queryStats.getFailedOutputPositions(),
15061510
queryStats.getOutputBlockedTime(),
15071511
queryStats.getFailedOutputBlockedTime(),

Diff for: core/trino-main/src/main/java/io/trino/execution/QueryStats.java

+10
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ public class QueryStats
118118
private final DataSize outputDataSize;
119119
private final DataSize failedOutputDataSize;
120120
private final long outputPositions;
121+
private final long updatedPositions;
121122
private final long failedOutputPositions;
122123

123124
private final Duration outputBlockedTime;
@@ -213,6 +214,7 @@ public QueryStats(
213214
@JsonProperty("outputDataSize") DataSize outputDataSize,
214215
@JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize,
215216
@JsonProperty("outputPositions") long outputPositions,
217+
@JsonProperty("updatedPositions") long updatedPositions,
216218
@JsonProperty("failedOutputPositions") long failedOutputPositions,
217219

218220
@JsonProperty("outputBlockedTime") Duration outputBlockedTime,
@@ -323,6 +325,8 @@ public QueryStats(
323325
this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null");
324326
checkArgument(outputPositions >= 0, "outputPositions is negative");
325327
this.outputPositions = outputPositions;
328+
checkArgument(updatedPositions >= 0, "updatedPositions is negative");
329+
this.updatedPositions = updatedPositions;
326330
checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative");
327331
this.failedOutputPositions = failedOutputPositions;
328332

@@ -743,6 +747,12 @@ public long getOutputPositions()
743747
return outputPositions;
744748
}
745749

750+
@JsonProperty
751+
public long getUpdatedPositions()
752+
{
753+
return updatedPositions;
754+
}
755+
746756
@JsonProperty
747757
public long getFailedOutputPositions()
748758
{

Diff for: core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java

+3
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
473473
long outputDataSize = 0;
474474
long failedOutputDataSize = 0;
475475
long outputPositions = 0;
476+
long updatedPositions = 0;
476477
long failedOutputPositions = 0;
477478
Metrics.Accumulator outputBufferMetrics = Metrics.accumulator();
478479

@@ -553,6 +554,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
553554
taskInfo.outputBuffers().getUtilization().ifPresent(bufferUtilizationHistograms::add);
554555
outputDataSize += taskStats.getOutputDataSize().toBytes();
555556
outputPositions += taskStats.getOutputPositions();
557+
updatedPositions += taskStats.getUpdatedPositions();
556558
bufferMetrics.ifPresent(outputBufferMetrics::add);
557559

558560
outputBlockedTime += taskStats.getOutputBlockedTime().roundTo(NANOSECONDS);
@@ -660,6 +662,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
660662
succinctBytes(outputDataSize),
661663
succinctBytes(failedOutputDataSize),
662664
outputPositions,
665+
updatedPositions,
663666
failedOutputPositions,
664667
outputBufferMetrics.get(),
665668
succinctDuration(outputBlockedTime, NANOSECONDS),

Diff for: core/trino-main/src/main/java/io/trino/execution/StageStats.java

+11
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public class StageStats
106106
private final DataSize outputDataSize;
107107
private final DataSize failedOutputDataSize;
108108
private final long outputPositions;
109+
private final long updatedPositions;
109110
private final long failedOutputPositions;
110111
private final Metrics outputBufferMetrics;
111112

@@ -182,6 +183,7 @@ public StageStats(
182183
@JsonProperty("outputDataSize") DataSize outputDataSize,
183184
@JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize,
184185
@JsonProperty("outputPositions") long outputPositions,
186+
@JsonProperty("updatedPositions") long updatedPositions,
185187
@JsonProperty("failedOutputPositions") long failedOutputPositions,
186188
@JsonProperty("outputBufferMetrics") Metrics outputBufferMetrics,
187189

@@ -273,6 +275,8 @@ public StageStats(
273275
this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null");
274276
checkArgument(outputPositions >= 0, "outputPositions is negative");
275277
this.outputPositions = outputPositions;
278+
checkArgument(updatedPositions >= 0, "updatedPositions is negative");
279+
this.updatedPositions = updatedPositions;
276280
checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative");
277281
this.failedOutputPositions = failedOutputPositions;
278282
this.outputBufferMetrics = requireNonNull(outputBufferMetrics, "outputBufferMetrics is null");
@@ -588,6 +592,12 @@ public long getOutputPositions()
588592
return outputPositions;
589593
}
590594

595+
@JsonProperty
596+
public long getUpdatedPositions()
597+
{
598+
return updatedPositions;
599+
}
600+
591601
@JsonProperty
592602
public long getFailedOutputPositions()
593603
{
@@ -736,6 +746,7 @@ public static StageStats createInitial()
736746
zeroBytes,
737747
0,
738748
0,
749+
0,
739750
Metrics.EMPTY,
740751
zeroSeconds,
741752
zeroSeconds,

Diff for: core/trino-main/src/main/java/io/trino/operator/DriverContext.java

+11
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,14 @@ public List<OperatorStats> getOperatorStats()
308308
.collect(toImmutableList());
309309
}
310310

311+
public Long getUpdatedPositions()
312+
{
313+
return operatorContexts.stream()
314+
.map(OperatorContext::getOperatorStats)
315+
.mapToLong(OperatorStats::getUpdatedPositions)
316+
.sum();
317+
}
318+
311319
public DriverStats getDriverStats()
312320
{
313321
long totalScheduledTime = overallTiming.getWallNanos();
@@ -346,6 +354,7 @@ public DriverStats getDriverStats()
346354
Duration inputBlockedTime;
347355
DataSize outputDataSize;
348356
long outputPositions;
357+
long updatedPositions = getUpdatedPositions();
349358
Duration outputBlockedTime;
350359
if (inputOperator != null) {
351360
physicalInputDataSize = inputOperator.getPhysicalInputDataSize();
@@ -389,6 +398,7 @@ public DriverStats getDriverStats()
389398

390399
outputDataSize = DataSize.ofBytes(0);
391400
outputPositions = 0;
401+
updatedPositions = 0;
392402

393403
outputBlockedTime = new Duration(0, MILLISECONDS);
394404
}
@@ -428,6 +438,7 @@ public DriverStats getDriverStats()
428438
inputBlockedTime,
429439
outputDataSize.succinct(),
430440
outputPositions,
441+
updatedPositions,
431442
outputBlockedTime,
432443
succinctBytes(physicalWrittenDataSize),
433444
operators);

Diff for: core/trino-main/src/main/java/io/trino/operator/DriverStats.java

+11
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class DriverStats
6767

6868
private final DataSize outputDataSize;
6969
private final long outputPositions;
70+
private final long updatedPositions;
7071

7172
private final Duration outputBlockedTime;
7273

@@ -109,6 +110,7 @@ public DriverStats()
109110

110111
this.outputDataSize = DataSize.ofBytes(0);
111112
this.outputPositions = 0;
113+
this.updatedPositions = 0;
112114

113115
this.outputBlockedTime = new Duration(0, MILLISECONDS);
114116

@@ -152,6 +154,7 @@ public DriverStats(
152154

153155
@JsonProperty("outputDataSize") DataSize outputDataSize,
154156
@JsonProperty("outputPositions") long outputPositions,
157+
@JsonProperty("updatedPositions") long updatedPositions,
155158

156159
@JsonProperty("outputBlockedTime") Duration outputBlockedTime,
157160

@@ -197,6 +200,8 @@ public DriverStats(
197200
this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");
198201
checkArgument(outputPositions >= 0, "outputPositions is negative");
199202
this.outputPositions = outputPositions;
203+
checkArgument(updatedPositions >= 0, "updatedPositions is negative");
204+
this.updatedPositions = updatedPositions;
200205

201206
this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null");
202207

@@ -357,6 +362,12 @@ public long getOutputPositions()
357362
return outputPositions;
358363
}
359364

365+
@JsonProperty
366+
public long getUpdatedPositions()
367+
{
368+
return updatedPositions;
369+
}
370+
360371
@JsonProperty
361372
public Duration getOutputBlockedTime()
362373
{

Diff for: core/trino-main/src/main/java/io/trino/operator/MergeWriterOperator.java

+2
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ public Page getOutput()
185185
VARBINARY.writeSlice(fragmentBuilder, fragment);
186186
}
187187

188+
this.operatorContext.recordUpdatedPositions(rowCount);
189+
188190
return page.build();
189191
}
190192

Diff for: core/trino-main/src/main/java/io/trino/operator/OperatorContext.java

+13
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public class OperatorContext
8282
private final OperationTiming getOutputTiming = new OperationTiming();
8383
private final CounterStat outputDataSize = new CounterStat();
8484
private final CounterStat outputPositions = new CounterStat();
85+
private final CounterStat updatedPositions = new CounterStat();
8586

8687
private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();
8788
private final AtomicReference<Metrics> metrics = new AtomicReference<>(Metrics.EMPTY); // this is not incremental, but gets overwritten by the latest value.
@@ -226,6 +227,12 @@ public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits)
226227
dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits);
227228
}
228229

230+
public void recordUpdatedPositions(long updatedPositions)
231+
{
232+
checkArgument(updatedPositions >= 0, "updatedPositions is negative (%s)", updatedPositions);
233+
this.updatedPositions.update(updatedPositions);
234+
}
235+
229236
/**
230237
* Overwrites the metrics with the latest one.
231238
*
@@ -490,6 +497,11 @@ public CounterStat getOutputPositions()
490497
return outputPositions;
491498
}
492499

500+
public CounterStat getUpdatedPositions()
501+
{
502+
return updatedPositions;
503+
}
504+
493505
public long getWriterInputDataSize()
494506
{
495507
return writerInputDataSize.get();
@@ -554,6 +566,7 @@ public OperatorStats getOperatorStats()
554566
new Duration(getOutputTiming.getCpuNanos(), NANOSECONDS).convertToMostSuccinctTimeUnit(),
555567
DataSize.ofBytes(outputDataSize.getTotalCount()),
556568
outputPositions.getTotalCount(),
569+
updatedPositions.getTotalCount(),
557570

558571
dynamicFilterSplitsProcessed.get(),
559572
getOperatorMetrics(

Diff for: core/trino-main/src/main/java/io/trino/operator/OperatorStats.java

+15
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class OperatorStats
6262
private final Duration getOutputCpu;
6363
private final DataSize outputDataSize;
6464
private final long outputPositions;
65+
private final long updatedPositions;
6566

6667
private final long dynamicFilterSplitsProcessed;
6768
private final Metrics metrics;
@@ -117,6 +118,7 @@ public OperatorStats(
117118
@JsonProperty("getOutputCpu") Duration getOutputCpu,
118119
@JsonProperty("outputDataSize") DataSize outputDataSize,
119120
@JsonProperty("outputPositions") long outputPositions,
121+
@JsonProperty("updatedPositions") long updatedPositions,
120122

121123
@JsonProperty("dynamicFilterSplitsProcessed") long dynamicFilterSplitsProcessed,
122124
@JsonProperty("metrics") Metrics metrics,
@@ -174,6 +176,8 @@ public OperatorStats(
174176
this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");
175177
checkArgument(outputPositions >= 0, "outputPositions is negative");
176178
this.outputPositions = outputPositions;
179+
checkArgument(updatedPositions >= 0, "updatedPositions is negative");
180+
this.updatedPositions = updatedPositions;
177181

178182
this.dynamicFilterSplitsProcessed = dynamicFilterSplitsProcessed;
179183
this.metrics = requireNonNull(metrics, "metrics is null");
@@ -340,6 +344,12 @@ public long getOutputPositions()
340344
return outputPositions;
341345
}
342346

347+
@JsonProperty
348+
public long getUpdatedPositions()
349+
{
350+
return updatedPositions;
351+
}
352+
343353
@JsonProperty
344354
public long getDynamicFilterSplitsProcessed()
345355
{
@@ -485,6 +495,7 @@ private OperatorStats add(Iterable<OperatorStats> operators, Optional<Metrics> p
485495
long getOutputCpu = this.getOutputCpu.roundTo(NANOSECONDS);
486496
long outputDataSize = this.outputDataSize.toBytes();
487497
long outputPositions = this.outputPositions;
498+
long updatedPositions = this.updatedPositions;
488499

489500
long dynamicFilterSplitsProcessed = this.dynamicFilterSplitsProcessed;
490501
Metrics.Accumulator metricsAccumulator = Metrics.accumulator().add(this.getMetrics());
@@ -535,6 +546,7 @@ private OperatorStats add(Iterable<OperatorStats> operators, Optional<Metrics> p
535546
getOutputCpu += operator.getGetOutputCpu().roundTo(NANOSECONDS);
536547
outputDataSize += operator.getOutputDataSize().toBytes();
537548
outputPositions += operator.getOutputPositions();
549+
updatedPositions += operator.getUpdatedPositions();
538550

539551
dynamicFilterSplitsProcessed += operator.getDynamicFilterSplitsProcessed();
540552
metricsAccumulator.add(operator.getMetrics());
@@ -596,6 +608,7 @@ private OperatorStats add(Iterable<OperatorStats> operators, Optional<Metrics> p
596608
new Duration(getOutputCpu, NANOSECONDS).convertToMostSuccinctTimeUnit(),
597609
DataSize.ofBytes(outputDataSize),
598610
outputPositions,
611+
updatedPositions,
599612

600613
dynamicFilterSplitsProcessed,
601614
metricsAccumulator.get(),
@@ -672,6 +685,7 @@ public OperatorStats summarize()
672685
getOutputCpu,
673686
outputDataSize,
674687
outputPositions,
688+
updatedPositions,
675689
dynamicFilterSplitsProcessed,
676690
metrics,
677691
connectorMetrics,
@@ -717,6 +731,7 @@ public OperatorStats withPipelineMetrics(Metrics pipelineMetrics)
717731
getOutputCpu,
718732
outputDataSize,
719733
outputPositions,
734+
updatedPositions,
720735
dynamicFilterSplitsProcessed,
721736
metrics,
722737
connectorMetrics,

0 commit comments

Comments
 (0)