Skip to content

Commit 5001f13

Browse files
committed
Populate snapshot summary in iceberg ConnectorOutputMetadata
1 parent 85bfae5 commit 5001f13

File tree

3 files changed

+103
-7
lines changed

3 files changed

+103
-7
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg;
15+
16+
import com.fasterxml.jackson.annotation.JsonCreator;
17+
import com.fasterxml.jackson.annotation.JsonProperty;
18+
import com.google.common.collect.ImmutableMap;
19+
import io.trino.spi.connector.ConnectorOutputMetadata;
20+
21+
import java.util.Map;
22+
23+
final class IcebergCommitMetadata
24+
implements ConnectorOutputMetadata
25+
{
26+
private final Map<String, String> commitMetrics;
27+
28+
@JsonCreator
29+
public IcebergCommitMetadata(@JsonProperty("commitMetrics") Map<String, String> commitMetrics)
30+
{
31+
this.commitMetrics = ImmutableMap.copyOf(commitMetrics);
32+
}
33+
34+
@Override
35+
@JsonProperty
36+
public Map<String, String> getInfo()
37+
{
38+
return commitMetrics;
39+
}
40+
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation;
4747
import io.trino.plugin.hive.HiveCompressionCodec;
4848
import io.trino.plugin.hive.HiveStorageFormat;
49-
import io.trino.plugin.hive.HiveWrittenPartitions;
5049
import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
5150
import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats;
5251
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
@@ -1612,9 +1611,12 @@ else if (!computedStatistics.isEmpty()) {
16121611
commitTransaction(transaction, "insert");
16131612
transaction = null;
16141613

1615-
return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
1616-
.map(CommitTaskData::path)
1617-
.collect(toImmutableList())));
1614+
long insertSnapshotId = icebergTable.currentSnapshot().snapshotId();
1615+
Map<String, String> summary = icebergTable.snapshot(insertSnapshotId).summary();
1616+
if (summary == null) {
1617+
return Optional.empty();
1618+
}
1619+
return Optional.of(new IcebergCommitMetadata(summary));
16181620
}
16191621

16201622
@Override
@@ -3992,9 +3994,12 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
39923994
commitUpdateAndTransaction(appendFiles, session, transaction, "refresh materialized view");
39933995
transaction = null;
39943996
fromSnapshotForRefresh = Optional.empty();
3995-
return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
3996-
.map(CommitTaskData::path)
3997-
.collect(toImmutableList())));
3997+
long refreshedSnapshotId = icebergTable.currentSnapshot().snapshotId();
3998+
Map<String, String> summary = icebergTable.snapshot(refreshedSnapshotId).summary();
3999+
if (summary == null) {
4000+
return Optional.empty();
4001+
}
4002+
return Optional.of(new IcebergCommitMetadata(summary));
39984003
}
39994004

40004005
@Override

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.trino.Session;
2626
import io.trino.connector.MockConnectorFactory;
2727
import io.trino.connector.MockConnectorPlugin;
28+
import io.trino.execution.QueryStats;
2829
import io.trino.execution.StageId;
2930
import io.trino.execution.StageInfo;
3031
import io.trino.execution.StagesInfo;
@@ -36,6 +37,7 @@
3637
import io.trino.metadata.QualifiedObjectName;
3738
import io.trino.metadata.TableHandle;
3839
import io.trino.operator.OperatorStats;
40+
import io.trino.operator.TableFinishInfo;
3941
import io.trino.plugin.hive.HiveCompressionCodec;
4042
import io.trino.server.DynamicFilterService;
4143
import io.trino.spi.QueryId;
@@ -9253,6 +9255,55 @@ void testExplainAnalyzeSplitSourceMetrics()
92539255
"splits generation metrics");
92549256
}
92559257

9258+
@Test
9259+
void testCommitMetrics()
9260+
{
9261+
try (TestTable table = newTrinoTable(
9262+
"test_commit_metrics",
9263+
"AS SELECT * FROM nation WITH NO DATA")) {
9264+
assertQueryStats(
9265+
getSession(),
9266+
"INSERT INTO " + table.getName() + " SELECT * FROM nation",
9267+
queryStats -> {
9268+
TableFinishInfo info = getTableFinishInfo(queryStats);
9269+
assertThat(info.getConnectorOutputMetadata()).contains("\"added-records\" : \"25\"");
9270+
},
9271+
_ -> {});
9272+
}
9273+
9274+
String tableName = "test_commit_metrics_ctas_" + randomNameSuffix();
9275+
assertQueryStats(
9276+
getSession(),
9277+
"CREATE TABLE " + tableName + " AS SELECT * FROM nation",
9278+
queryStats -> {
9279+
TableFinishInfo info = getTableFinishInfo(queryStats);
9280+
assertThat(info.getConnectorOutputMetadata()).contains("\"added-records\" : \"25\"");
9281+
},
9282+
_ -> {});
9283+
assertUpdate("DROP TABLE " + tableName);
9284+
9285+
String materializedViewName = "test_commit_metrics_mv_" + randomNameSuffix();
9286+
assertUpdate("CREATE MATERIALIZED VIEW " + materializedViewName + " AS SELECT * FROM nation");
9287+
assertQueryStats(
9288+
getSession(),
9289+
"REFRESH MATERIALIZED VIEW " + materializedViewName,
9290+
queryStats -> {
9291+
TableFinishInfo info = getTableFinishInfo(queryStats);
9292+
assertThat(info.getConnectorOutputMetadata()).contains("\"added-records\" : \"25\"");
9293+
},
9294+
_ -> {});
9295+
assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName);
9296+
}
9297+
9298+
private static TableFinishInfo getTableFinishInfo(QueryStats queryStats)
9299+
{
9300+
OperatorStats finishOperatorStats = queryStats.getOperatorSummaries()
9301+
.stream()
9302+
.filter(summary -> summary.getOperatorType().startsWith("TableFinish"))
9303+
.collect(onlyElement());
9304+
return (TableFinishInfo) finishOperatorStats.getInfo();
9305+
}
9306+
92569307
// regression test for https://github.com/trinodb/trino/issues/22922
92579308
@Test
92589309
void testArrayElementChange()

0 commit comments

Comments
 (0)