Skip to content

RuntimeStatsMetric Reporter for table scans in Iceberg Connector #24904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3bd3c8e
Initial template for RuntimeStatsMetricsReporter
j-sund Mar 14, 2025
afe94e5
More method stubs for MetricsReporter
j-sund Mar 14, 2025
f46a0e5
Added totalDuration metric tracker
j-sund Mar 14, 2025
e01409f
Added resultDataFiles and resultDeleteFiles Metric and added template…
j-sund Mar 15, 2025
d3b78a6
Added 4 more metrics, totalManifests related
j-sund Mar 15, 2025
e416ff6
Filled out rest of metric templates
j-sund Mar 15, 2025
0831294
Added guards for possible null values
j-sund Mar 15, 2025
e5c733f
First 3 Metrics for MetricsReporter
j-sund Apr 7, 2025
b66ff2a
Added query creation
j-sund Apr 7, 2025
27fc795
Deleted Testing File, didn't work, and should be placed elsewhere
j-sund Apr 9, 2025
0a4b2b8
Added Metrics Reporter to Iceberg Connector
j-sund Apr 9, 2025
1625a0f
Added a tableString builder method and fixed naming conventions
j-sund Apr 9, 2025
2254777
Added testing for RuntimestatsMetricReporter
j-sund Apr 10, 2025
d944e2f
Finished basic test for RuntimeStatsMetricReporter
j-sund Apr 10, 2025
0bea095
Adjusted RuntimeStatsMetricsReporter to follow checkstyle practice
j-sund Apr 23, 2025
7bcdc29
Fixed checkstyle for runtimestatsmetricreporter
j-sund Apr 23, 2025
6bc1e84
Removed comments and added better assert checks
j-sund May 4, 2025
84c3cf8
Added RuntimeStatsMetricReporter to all newScans in IcebergUtil
j-sund May 5, 2025
f4eecbd
Fixed test case and passed runtimeStats besides session
j-sund May 15, 2025
9c23516
Attempt test case fix and added reporter to TableStatisticsMaker
j-sund May 16, 2025
9ef6b65
Added reporter to PartitionTable newScan
j-sund May 16, 2025
8c38914
addressed review feedback
j-sund Jun 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.iceberg;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.StandardTypes;
Expand Down Expand Up @@ -117,13 +118,14 @@ public ConnectorTableMetadata getTableMetadata()
@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId));
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId, session));
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId)
private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId, ConnectorSession session)
{
PageListBuilder pagesBuilder = forTable(tableMetadata);
TableScan tableScan = getTableScan(TupleDomain.all(), snapshotId, icebergTable).includeColumnStats();
RuntimeStats runtimeStats = session.getRuntimeStats();
TableScan tableScan = getTableScan(TupleDomain.all(), snapshotId, icebergTable, runtimeStats).includeColumnStats();
Map<Integer, Type> idToTypeMap = getIdToTypeMap(icebergTable.schema());

try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.BigintType;
Expand Down Expand Up @@ -284,12 +285,14 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
partitions = ImmutableList.of(new HivePartition(handle.getSchemaTableName()));
}
else {
RuntimeStats runtimeStats = session.getRuntimeStats();
partitions = getPartitions(
typeManager,
handle,
icebergTable,
constraint,
partitionColumns);
partitionColumns,
runtimeStats);
}

ConnectorTableLayout layout = getTableLayout(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.iceberg;

import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.iceberg.changelog.ChangelogSplitSource;
Expand Down Expand Up @@ -91,21 +92,25 @@ public ConnectorSplitSource getSplits(
long toSnapshot = table.getIcebergTableName().getChangelogEndSnapshot()
.orElseGet(icebergTable.currentSnapshot()::snapshotId);
IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
.fromSnapshotExclusive(fromSnapshot)
.toSnapshot(toSnapshot);
return new ChangelogSplitSource(session, typeManager, icebergTable, scan);
}
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
RuntimeStats runtimeStats = session.getRuntimeStats();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't comment above, so commenting here. Should we also register a metrics reporter for the CHANGELOG table scan?

CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
table.getIcebergTableName().getSnapshotId().get(),
predicate,
table.getPartitionSpecId(),
table.getEqualityFieldIds());
table.getEqualityFieldIds(),
runtimeStats);

return new EqualityDeletesSplitSource(session, icebergTable, deleteFiles);
}
else {
TableScan tableScan = icebergTable.newScan()
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
.filter(toIcebergExpression(predicate))
.useSnapshot(table.getIcebergTableName().getSnapshotId().get())
.planWith(executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.GenericInternalException;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
Expand Down Expand Up @@ -410,10 +411,10 @@ public static Optional<String> getViewComment(View view)
return Optional.ofNullable(view.properties().get(TABLE_COMMENT));
}

public static TableScan getTableScan(TupleDomain<IcebergColumnHandle> predicates, Optional<Long> snapshotId, Table icebergTable)
public static TableScan getTableScan(TupleDomain<IcebergColumnHandle> predicates, Optional<Long> snapshotId, Table icebergTable, RuntimeStats runtimeStats)
{
Expression expression = ExpressionConverter.toIcebergExpression(predicates);
TableScan tableScan = icebergTable.newScan().filter(expression);
TableScan tableScan = icebergTable.newScan().metricsReporter(new RuntimeStatsMetricsReporter(runtimeStats)).filter(expression);
return snapshotId
.map(id -> isSnapshot(icebergTable, id) ? tableScan.useSnapshot(id) : tableScan.asOfTime(id))
.orElse(tableScan);
Expand All @@ -434,9 +435,9 @@ public static LocationProvider getLocationProvider(SchemaTableName schemaTableNa
return locationsFor(tableLocation, storageProperties);
}

public static TableScan buildTableScan(Table icebergTable, MetadataTableType metadataTableType)
public static TableScan buildTableScan(Table icebergTable, MetadataTableType metadataTableType, RuntimeStats runtimeStats)
{
return createMetadataTableInstance(icebergTable, metadataTableType).newScan();
return createMetadataTableInstance(icebergTable, metadataTableType).newScan().metricsReporter(new RuntimeStatsMetricsReporter(runtimeStats));
}

public static Map<String, Integer> columnNameToPositionInSchema(Schema schema)
Expand Down Expand Up @@ -593,7 +594,8 @@ public static List<HivePartition> getPartitions(
ConnectorTableHandle tableHandle,
Table icebergTable,
Constraint<ColumnHandle> constraint,
List<IcebergColumnHandle> partitionColumns)
List<IcebergColumnHandle> partitionColumns,
RuntimeStats runtimeStats)
{
IcebergTableName name = ((IcebergTableHandle) tableHandle).getIcebergTableName();
FileFormat fileFormat = getFileFormat(icebergTable);
Expand All @@ -604,6 +606,7 @@ public static List<HivePartition> getPartitions(
}

TableScan tableScan = icebergTable.newScan()
.metricsReporter(new RuntimeStatsMetricsReporter(runtimeStats))
.filter(toIcebergExpression(getNonMetadataColumnConstraints(constraint
.getSummary()
.simplify())))
Expand Down Expand Up @@ -878,10 +881,11 @@ public static CloseableIterable<DeleteFile> getDeleteFiles(Table table,
long snapshot,
TupleDomain<IcebergColumnHandle> filter,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
Optional<Set<Integer>> requestedSchema,
RuntimeStats runtimeStats)
{
Expression filterExpression = toIcebergExpression(filter);
CloseableIterable<FileScanTask> fileTasks = table.newScan().useSnapshot(snapshot).filter(filterExpression).planFiles();
CloseableIterable<FileScanTask> fileTasks = table.newScan().metricsReporter(new RuntimeStatsMetricsReporter(runtimeStats)).useSnapshot(snapshot).filter(filterExpression).planFiles();

return new CloseableIterable<DeleteFile>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
return new InMemoryRecordSet(resultTypes, ImmutableList.of()).cursor();
}
TableScan tableScan = icebergTable.newScan()
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
.useSnapshot(snapshotId.get())
.includeColumnStats();
return buildRecordCursor(getPartitions(tableScan), icebergTable.spec().fields());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.RuntimeUnit;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.ScanReport;

import java.util.function.ToLongFunction;

/**
 * A {@link MetricsReporter} that copies the scan metrics carried by an Iceberg
 * {@link ScanReport} into a Presto {@link RuntimeStats} instance.
 *
 * <p>Each metric is recorded under the name
 * {@code <tableName>.scan.<metricName>}. Reports that are not
 * {@link ScanReport}s are ignored, and any individual metric whose result is
 * {@code null} in the report is skipped.
 */
public final class RuntimeStatsMetricsReporter
        implements MetricsReporter
{
    /**
     * Destination for the scan metrics extracted from Iceberg reports.
     */
    private final RuntimeStats runtimeStats;

    /**
     * Constructs a RuntimeStatsMetricsReporter.
     *
     * @param runtimeStat the RuntimeStats instance to report metrics to;
     *         it is stored as-is and not null-checked here
     */
    public RuntimeStatsMetricsReporter(final RuntimeStats runtimeStat)
    {
        this.runtimeStats = runtimeStat;
    }

    /**
     * Records every available scan metric of the given report.
     *
     * <p>The planning duration is recorded in nanoseconds, the two file-size
     * metrics in bytes, and all remaining metrics as unit-less counts.
     *
     * @param report the Iceberg metrics report; anything other than a
     *         {@link ScanReport} is ignored
     */
    @Override
    public void report(final MetricsReport report)
    {
        if (!(report instanceof ScanReport)) {
            return;
        }

        ScanReport scanReport = (ScanReport) report;
        String tableName = scanReport.tableName();

        // The only timer metric: converted to nanoseconds before recording.
        addIfPresent(tableName, "totalPlanningDuration", RuntimeUnit.NANO,
                scanReport.scanMetrics().totalPlanningDuration(),
                timer -> timer.totalDuration().toNanos());

        // Counter metrics, kept in their original reporting order.
        addIfPresent(tableName, "resultDataFiles", RuntimeUnit.NONE,
                scanReport.scanMetrics().resultDataFiles(),
                counter -> counter.value());
        addIfPresent(tableName, "resultDeleteFiles", RuntimeUnit.NONE,
                scanReport.scanMetrics().resultDeleteFiles(),
                counter -> counter.value());
        addIfPresent(tableName, "totalDataManifests", RuntimeUnit.NONE,
                scanReport.scanMetrics().totalDataManifests(),
                counter -> counter.value());
        addIfPresent(tableName, "totalDeleteManifests", RuntimeUnit.NONE,
                scanReport.scanMetrics().totalDeleteManifests(),
                counter -> counter.value());
        addIfPresent(tableName, "scannedDataManifests", RuntimeUnit.NONE,
                scanReport.scanMetrics().scannedDataManifests(),
                counter -> counter.value());
        addIfPresent(tableName, "skippedDataManifests", RuntimeUnit.NONE,
                scanReport.scanMetrics().skippedDataManifests(),
                counter -> counter.value());
        addIfPresent(tableName, "totalFileSizeInBytes", RuntimeUnit.BYTE,
                scanReport.scanMetrics().totalFileSizeInBytes(),
                counter -> counter.value());
        addIfPresent(tableName, "totalDeleteFileSizeInBytes", RuntimeUnit.BYTE,
                scanReport.scanMetrics().totalDeleteFileSizeInBytes(),
                counter -> counter.value());
        addIfPresent(tableName, "skippedDataFiles", RuntimeUnit.NONE,
                scanReport.scanMetrics().skippedDataFiles(),
                counter -> counter.value());
        addIfPresent(tableName, "skippedDeleteFiles", RuntimeUnit.NONE,
                scanReport.scanMetrics().skippedDeleteFiles(),
                counter -> counter.value());
        addIfPresent(tableName, "scannedDeleteManifests", RuntimeUnit.NONE,
                scanReport.scanMetrics().scannedDeleteManifests(),
                counter -> counter.value());
        addIfPresent(tableName, "skippedDeleteManifests", RuntimeUnit.NONE,
                scanReport.scanMetrics().skippedDeleteManifests(),
                counter -> counter.value());
        addIfPresent(tableName, "indexedDeleteFiles", RuntimeUnit.NONE,
                scanReport.scanMetrics().indexedDeleteFiles(),
                counter -> counter.value());
        addIfPresent(tableName, "equalityDeleteFiles", RuntimeUnit.NONE,
                scanReport.scanMetrics().equalityDeleteFiles(),
                counter -> counter.value());
        addIfPresent(tableName, "positionalDeleteFiles", RuntimeUnit.NONE,
                scanReport.scanMetrics().positionalDeleteFiles(),
                counter -> counter.value());
    }

    /**
     * Records a single metric when its result object is present.
     *
     * <p>The result's type is left generic so that timer and counter results
     * share this one null guard; the caller supplies how to turn the result
     * into the {@code long} that is recorded.
     *
     * @param tableName the name of the scanned table
     * @param metricName the name of the metric
     * @param unit the unit the value is recorded with
     * @param result the metric result taken from the report, possibly null
     * @param toValue extracts the numeric value from a non-null result
     * @param <T> the type of the metric result
     */
    private <T> void addIfPresent(
            final String tableName,
            final String metricName,
            final RuntimeUnit unit,
            final T result,
            final ToLongFunction<T> toValue)
    {
        if (result == null) {
            return;
        }
        runtimeStats.addMetricValue(
                tableScanString(tableName, metricName),
                unit,
                toValue.applyAsLong(result));
    }

    /**
     * Helper method to construct the full metric name for a table scan.
     *
     * @param tableName the name of the table
     * @param metricName the name of the metric
     * @return the composed metric name in the format: table.scan.metric
     */
    private static String tableScanString(final String tableName, final String metricName)
    {
        return tableName + ".scan." + metricName;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.iceberg;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.TimeZoneKey;
Expand Down Expand Up @@ -101,7 +102,8 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, ConnectorSession session, Table icebergTable)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
TableScan tableScan = buildTableScan(icebergTable, SNAPSHOTS);
RuntimeStats runtimeStats = session.getRuntimeStats();
TableScan tableScan = buildTableScan(icebergTable, SNAPSHOTS, runtimeStats);
TimeZoneKey timeZoneKey = session.getTimeZoneKey();

Map<String, Integer> columnNameToPosition = columnNameToPositionInSchema(tableScan.schema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ private Partition getDataTableSummary(IcebergTableHandle tableHandle,
List<PartitionField> partitionFields)
{
TableScan tableScan = icebergTable.newScan()
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
.filter(toIcebergExpression(intersection))
.select(selectedColumns.stream().map(IcebergColumnHandle::getName).collect(Collectors.toList()))
.useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get())
Expand All @@ -301,7 +302,8 @@ private Partition getEqualityDeleteTableSummary(IcebergTableHandle tableHandle,
tableHandle.getIcebergTableName().getSnapshotId().get(),
intersection,
tableHandle.getPartitionSpecId(),
tableHandle.getEqualityFieldIds());
tableHandle.getEqualityFieldIds(),
session.getRuntimeStats());
CloseableIterable<ContentFile<?>> files = CloseableIterable.transform(deleteFiles, deleteFile -> deleteFile);
return getSummaryFromFiles(files, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
}
Expand Down
Loading
Loading