Skip to content

Commit 60316ce

Browse files
committed
Add CloudMonitoringMetricsReporter to export Iceberg metrics to Google Cloud Monitoring
1 parent 65faa6f commit 60316ce

6 files changed

Lines changed: 475 additions & 0 deletions

File tree

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -758,6 +758,7 @@ project(':iceberg-gcp') {
758758
compileOnly platform(libs.google.libraries.bom)
759759
compileOnly "com.google.cloud:google-cloud-storage"
760760
compileOnly "com.google.cloud:google-cloud-kms"
761+
compileOnly "com.google.cloud:google-cloud-monitoring"
761762
compileOnly(libs.gcs.analytics.core)
762763

763764
testImplementation "com.google.cloud:google-cloud-nio"

gcp-bundle/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ project(":iceberg-gcp-bundle") {
2929
implementation "com.google.cloud:google-cloud-bigquery"
3030
implementation "com.google.cloud:google-cloud-core"
3131
implementation "com.google.cloud:google-cloud-kms"
32+
implementation "com.google.cloud:google-cloud-monitoring"
3233
implementation libs.gcs.analytics.core
3334
}
3435

Lines changed: 109 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.gcp.metrics;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import com.google.cloud.ServiceOptions;
24+
import com.google.cloud.monitoring.v3.MetricServiceClient;
25+
import com.google.monitoring.v3.ListTimeSeriesRequest;
26+
import com.google.monitoring.v3.ProjectName;
27+
import com.google.monitoring.v3.TimeInterval;
28+
import com.google.protobuf.util.Timestamps;
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
import org.apache.iceberg.CatalogProperties;
32+
import org.apache.iceberg.catalog.TableIdentifier;
33+
import org.apache.iceberg.rest.RESTCatalog;
34+
import org.junit.jupiter.api.Assumptions;
35+
import org.junit.jupiter.api.Test;
36+
37+
public class TestCloudMonitoringMetricsReporterIntegration {
38+
39+
@Test
40+
void testMetricsReporting() throws Exception {
41+
String projectId =
42+
System.getProperty("iceberg.gcp.project", ServiceOptions.getDefaultProjectId());
43+
Assumptions.assumeTrue(projectId != null, "GCP project ID is not set in environment");
44+
45+
Map<String, String> properties = new HashMap<>();
46+
properties.put(CatalogProperties.URI, "https://biglake.googleapis.com/iceberg/v1/restcatalog");
47+
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "gs://biglake-public-nyc-taxi-iceberg");
48+
properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.gcp.gcs.GCSFileIO");
49+
properties.put("header.x-goog-user-project", projectId);
50+
properties.put("header.X-Iceberg-Access-Delegation", "vended-credentials");
51+
properties.put("rest.auth.type", "org.apache.iceberg.gcp.auth.GoogleAuthManager");
52+
properties.put("scan-planning-mode", "client");
53+
54+
properties.put(
55+
CatalogProperties.METRICS_REPORTER_IMPL,
56+
"org.apache.iceberg.gcp.metrics.CloudMonitoringMetricsReporter");
57+
properties.put("gcp.monitoring.project-id", projectId);
58+
59+
RESTCatalog catalog = new RESTCatalog();
60+
catalog.initialize("my_catalog", properties);
61+
62+
TableIdentifier tableIdent = TableIdentifier.of("public_data", "nyc_taxicab");
63+
org.apache.iceberg.Table table = catalog.loadTable(tableIdent);
64+
65+
long startTime = System.currentTimeMillis();
66+
67+
// Plan scan to trigger metrics
68+
try (org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.FileScanTask> tasks =
69+
table.newScan().planFiles()) {
70+
tasks.forEach(task -> {});
71+
}
72+
73+
// Wait for async reporting
74+
Thread.sleep(10000);
75+
76+
// Verify metrics in Cloud Monitoring
77+
try (MetricServiceClient client = MetricServiceClient.create()) {
78+
TimeInterval interval =
79+
TimeInterval.newBuilder()
80+
.setStartTime(Timestamps.fromMillis(startTime))
81+
.setEndTime(Timestamps.fromMillis(System.currentTimeMillis()))
82+
.build();
83+
84+
ListTimeSeriesRequest request =
85+
ListTimeSeriesRequest.newBuilder()
86+
.setName(ProjectName.of(projectId).toString())
87+
.setFilter("metric.type=\"custom.googleapis.com/iceberg/scan_metrics\"")
88+
.setInterval(interval)
89+
.build();
90+
91+
MetricServiceClient.ListTimeSeriesPagedResponse response = client.listTimeSeries(request);
92+
93+
java.util.Iterator<com.google.monitoring.v3.TimeSeries> iterator =
94+
response.iterateAll().iterator();
95+
96+
boolean found = false;
97+
while (iterator.hasNext()) {
98+
com.google.monitoring.v3.TimeSeries ts = iterator.next();
99+
if ("scan_operations".equals(ts.getMetric().getLabelsOrDefault("metric_name", ""))) {
100+
assertThat(ts.getMetric().getLabelsOrThrow("table"))
101+
.isEqualTo("my_catalog.public_data.nyc_taxicab");
102+
found = true;
103+
break;
104+
}
105+
}
106+
assertThat(found).isTrue();
107+
}
108+
}
109+
}

gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -73,6 +73,18 @@ public class GCPProperties implements Serializable {
7373
/** Controls whether analytics core library is enabled or not. Defaults to false. */
7474
public static final String GCS_ANALYTICS_CORE_ENABLED = "gcs.analytics-core.enabled";
7575

76+
/**
77+
* Controls the Google Cloud project ID used for Google Cloud Monitoring metrics. Defaults to the
78+
* GCS project ID.
79+
*/
80+
public static final String GCP_MONITORING_PROJECT_ID = "gcp.monitoring.project-id";
81+
82+
/**
 * Controls the metric prefix used for Google Cloud Monitoring metrics. Defaults to {@link
 * #GCP_MONITORING_METRIC_PREFIX_DEFAULT}.
 */
83+
public static final String GCP_MONITORING_METRIC_PREFIX = "gcp.monitoring.metric-prefix";
84+
85+
/** Default metric prefix for Google Cloud Monitoring metrics. */
86+
public static final String GCP_MONITORING_METRIC_PREFIX_DEFAULT = "custom.googleapis.com/iceberg";
87+
7688
/**
7789
* Max possible batch size for deletion. Currently, a max of 100 keys is advised, so we default to
7890
* a number below that. https://cloud.google.com/storage/docs/batch
@@ -99,6 +111,9 @@ public class GCPProperties implements Serializable {
99111
private boolean gcsOauth2RefreshCredentialsEnabled;
100112
private boolean gcsAnalyticsCoreEnabled;
101113

114+
private String gcpMonitoringProjectId;
115+
private String gcpMonitoringMetricPrefix;
116+
102117
private String gcsImpersonateServiceAccount;
103118
private int gcsImpersonateLifetimeSeconds;
104119
private List<String> gcsImpersonateDelegates;
@@ -197,6 +212,10 @@ public GCPProperties(Map<String, String> properties) {
197212

198213
gcsAnalyticsCoreEnabled =
199214
PropertyUtil.propertyAsBoolean(properties, GCS_ANALYTICS_CORE_ENABLED, false);
215+
216+
gcpMonitoringProjectId = properties.getOrDefault(GCP_MONITORING_PROJECT_ID, projectId);
217+
gcpMonitoringMetricPrefix =
218+
properties.getOrDefault(GCP_MONITORING_METRIC_PREFIX, GCP_MONITORING_METRIC_PREFIX_DEFAULT);
200219
}
201220

202221
public Optional<Integer> channelReadChunkSize() {
@@ -278,4 +297,12 @@ public Map<String, String> properties() {
278297
public boolean isGcsAnalyticsCoreEnabled() {
279298
return gcsAnalyticsCoreEnabled;
280299
}
300+
301+
/**
 * Returns the Google Cloud project ID used for Google Cloud Monitoring metrics.
 *
 * @return the value read from {@code gcp.monitoring.project-id}, or the {@code projectId} value
 *     the constructor falls back to when that property is absent
 */
public String gcpMonitoringProjectId() {
302+
return gcpMonitoringProjectId;
303+
}
304+
305+
/**
 * Returns the metric prefix used for Google Cloud Monitoring metrics.
 *
 * @return the value read from {@code gcp.monitoring.metric-prefix}, or {@code
 *     custom.googleapis.com/iceberg} when that property is absent
 */
public String gcpMonitoringMetricPrefix() {
306+
return gcpMonitoringMetricPrefix;
307+
}
281308
}

0 commit comments

Comments
 (0)