Add tests for REST Catalog to the Flink connector for Iceberg. #6368
sunxiaojian committed Mar 7, 2025
1 parent 61cfb52 commit 015cba6
Showing 6 changed files with 120 additions and 8 deletions.
2 changes: 2 additions & 0 deletions flink-connector/flink/build.gradle.kts
@@ -98,6 +98,8 @@ dependencies {
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.metrics.core)

testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
testImplementation("org.apache.flink:flink-table-common:$flinkVersion")

@@ -77,8 +77,6 @@ protected boolean supportGetSchemaWithoutCommentAndOption() {
return true;
}

protected abstract String getProvider();

protected abstract boolean supportDropCascade();

protected boolean supportsPrimaryKey() {

@@ -24,6 +24,7 @@
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@@ -37,11 +38,13 @@
import org.apache.gravitino.Catalog;
import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
import org.apache.gravitino.integration.test.container.ContainerSuite;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.gravitino.integration.test.util.BaseIT;
import org.apache.gravitino.server.web.JettyServerConfig;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -51,6 +54,9 @@
public abstract class FlinkEnvIT extends BaseIT {
private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvIT.class);
private static final ContainerSuite CONTAINER_SUITE = ContainerSuite.getInstance();

protected static final String icebergRestServiceName = "iceberg-rest";

protected static final String GRAVITINO_METALAKE = "flink";
protected static final String DEFAULT_CATALOG = "default_catalog";

@@ -65,32 +71,68 @@ public abstract class FlinkEnvIT extends BaseIT {

private static String gravitinoUri = "http://127.0.0.1:8090";

private final String lakeHouseIcebergProvider = "lakehouse-iceberg";

protected String icebergRestServiceUri;

@BeforeAll
void startUp() {
void startUp() throws Exception {
initHiveEnv();
if (lakeHouseIcebergProvider.equalsIgnoreCase(getProvider())) {
initIcebergRestServiceEnv();
}
// Start Gravitino server
super.startIntegrationTest();
initGravitinoEnv();
initMetalake();
initHiveEnv();
initHdfsEnv();
initFlinkEnv();
LOG.info("Startup Flink env successfully, Gravitino uri: {}.", gravitinoUri);
}

@AfterAll
static void stop() {
void stop() throws IOException, InterruptedException {
stopFlinkEnv();
stopHdfsEnv();
super.stopIntegrationTest();
LOG.info("Stop Flink env successfully.");
}

protected String flinkByPass(String key) {
return PropertiesConverter.FLINK_PROPERTY_PREFIX + key;
}

private void initIcebergRestServiceEnv() {
ignoreIcebergRestService = false;
Map<String, String> icebergRestServiceConfigs = new HashMap<>();
icebergRestServiceConfigs.put(
"gravitino."
+ icebergRestServiceName
+ "."
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
icebergRestServiceConfigs.put(
"gravitino."
+ icebergRestServiceName
+ "."
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
hiveMetastoreUri);
icebergRestServiceConfigs.put(
"gravitino."
+ icebergRestServiceName
+ "."
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
warehouse);
registerCustomConfigs(icebergRestServiceConfigs);
}

private void initGravitinoEnv() {
// Gravitino server is already started by AbstractIT, just construct gravitinoUrl
int gravitinoPort = getGravitinoServerPort();
gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
if (lakeHouseIcebergProvider.equalsIgnoreCase(getProvider())) {
this.icebergRestServiceUri = getIcebergRestServiceUri();
}
}

private void initMetalake() {
@@ -212,4 +254,14 @@ protected static void clearTableInSchema() {
TestUtils.assertTableResult(deleteResult, ResultKind.SUCCESS);
}
}

private String getIcebergRestServiceUri() {
JettyServerConfig jettyServerConfig =
JettyServerConfig.fromConfig(
serverConfig, String.format("gravitino.%s.", icebergRestServiceName));
return String.format(
"http://%s:%d/iceberg/", jettyServerConfig.getHost(), jettyServerConfig.getHttpPort());
}

protected abstract String getProvider();
}
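
FlinkEnvIT above derives the Iceberg REST endpoint as http://&lt;host&gt;:&lt;port&gt;/iceberg/ from the auxiliary service's Jetty config. For orientation only, here is a minimal standalone sketch of talking to such an endpoint directly with Iceberg's REST client, which is on the test classpath via the iceberg-core dependency added in this commit; the endpoint and warehouse values are hypothetical placeholders rather than values taken from the tests:

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.RESTCatalog;

public class IcebergRestSmokeCheck {
  public static void main(String[] args) {
    // Hypothetical values; in the tests they come from getIcebergRestServiceUri()
    // and the warehouse path exposed by the Hive test container.
    Map<String, String> properties = new HashMap<>();
    properties.put(CatalogProperties.URI, "http://127.0.0.1:9001/iceberg/");
    properties.put(CatalogProperties.WAREHOUSE_LOCATION, "hdfs://127.0.0.1:9000/user/hive/warehouse");

    // iceberg-core ships the REST client; initialize() wires it to the endpoint above.
    try (RESTCatalog catalog = new RESTCatalog()) {
      catalog.initialize("rest_smoke_check", properties);
      // List the top-level namespaces the REST service exposes.
      for (Namespace namespace : catalog.listNamespaces()) {
        System.out.println(namespace);
      }
    } catch (Exception e) {
      throw new RuntimeException("REST catalog check failed", e);
    }
  }
}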

@@ -100,7 +100,7 @@ public void testCreateGravitinoIcebergCatalog() {
// Check the catalog properties.
org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
Map<String, String> properties = gravitinoCatalog.properties();
Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI));
Assertions.assertEquals(getUri(), properties.get(IcebergConstants.URI));

// Get the created catalog.
Optional<org.apache.flink.table.catalog.Catalog> catalog = tableEnv.getCatalog(catalogName);
@@ -153,14 +153,14 @@ public void testCreateGravitinoIcebergUsingSQL() {
catalogName,
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
getCatalogBackend(),
hiveMetastoreUri,
getUri(),
warehouse));
Assertions.assertTrue(metalake.catalogExists(catalogName));

// Check the properties of the created catalog.
org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
Map<String, String> properties = gravitinoCatalog.properties();
Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI));
Assertions.assertEquals(getUri(), properties.get(IcebergConstants.URI));

// Get the created catalog.
Optional<org.apache.flink.table.catalog.Catalog> catalog = tableEnv.getCatalog(catalogName);
@@ -499,4 +499,6 @@ protected boolean supportDropCascade() {
}

protected abstract String getCatalogBackend();

protected abstract String getUri();
}

@@ -40,7 +40,13 @@ protected Map<String, String> getCatalogConfigs() {
return catalogProperties;
}

@Override
protected String getCatalogBackend() {
return "hive";
}

@Override
protected String getUri() {
return hiveMetastoreUri;
}
}

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.integration.test.iceberg;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
import org.junit.jupiter.api.Tag;

@Tag("gravitino-docker-test")
public class FlinkIcebergRestCatalogIT extends FlinkIcebergCatalogIT {

@Override
protected Map<String, String> getCatalogConfigs() {
Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, icebergRestServiceUri);
return catalogProperties;
}

@Override
protected String getCatalogBackend() {
return "rest";
}

@Override
protected String getUri() {
return icebergRestServiceUri;
}
}
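
The new FlinkIcebergRestCatalogIT reuses the DDL-based registration exercised by FlinkIcebergCatalogIT, only swapping the backend to "rest" and pointing the URI at the Gravitino Iceberg REST service. As a rough illustration of that path, here is a sketch of an equivalent hand-written registration; the factory identifier and option keys are assumptions (the tests build the real DDL from a format string not shown in this diff), and the URIs are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateRestBackedCatalogExample {
  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // Identifier and option keys below are assumed for illustration only.
    tableEnv.executeSql(
        "CREATE CATALOG iceberg_rest_catalog WITH ("
            + "'type'='gravitino-iceberg',"             // assumed factory identifier
            + "'catalog-backend'='rest',"                // value returned by getCatalogBackend()
            + "'uri'='http://127.0.0.1:9001/iceberg/',"  // hypothetical REST endpoint
            + "'warehouse'='hdfs://127.0.0.1:9000/user/hive/warehouse')"); // hypothetical path

    // Confirm the catalog is visible to the session.
    tableEnv.executeSql("SHOW CATALOGS").print();
  }
}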
