branch-3.0: [opt](vault) Check hdfs connectivity when creating hdfs storage vault #48369 #48816

Open · wants to merge 1 commit into base: branch-3.0
File: HdfsStorageVault.java
@@ -17,10 +17,13 @@

package org.apache.doris.catalog;

import org.apache.doris.backup.Status;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -31,6 +34,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
@@ -72,9 +76,10 @@ public class HdfsStorageVault extends StorageVault {
* Property keys used by Doris, and should not be put in HDFS client configs,
* such as `type`, `path_prefix`, etc.
*/
private static final Set<String> nonHdfsConfPropertyKeys = ImmutableSet.of(VAULT_TYPE, VAULT_PATH_PREFIX)
.stream().map(String::toLowerCase)
.collect(ImmutableSet.toImmutableSet());
private static final Set<String> nonHdfsConfPropertyKeys =
ImmutableSet.of(VAULT_TYPE, VAULT_PATH_PREFIX, S3Properties.VALIDITY_CHECK)
.stream().map(String::toLowerCase)
.collect(ImmutableSet.toImmutableSet());

@SerializedName(value = "properties")
private Map<String, String> properties;
@@ -85,17 +90,68 @@ public HdfsStorageVault(String name, boolean ifNotExists, boolean setAsDefault)
}

@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
for (Map.Entry<String, String> kv : properties.entrySet()) {
public void modifyProperties(Map<String, String> newProperties) throws DdlException {
for (Map.Entry<String, String> kv : newProperties.entrySet()) {
replaceIfEffectiveValue(this.properties, kv.getKey(), kv.getValue());
}
checkConnectivity(this.properties);
}

@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(properties);
}

public static void checkConnectivity(Map<String, String> newProperties) throws DdlException {
if (newProperties.containsKey(S3Properties.VALIDITY_CHECK)
&& newProperties.get(S3Properties.VALIDITY_CHECK).equalsIgnoreCase("false")) {
return;
}

String hadoopFsName = null;
String pathPrefix = null;
for (Map.Entry<String, String> property : newProperties.entrySet()) {
if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
hadoopFsName = property.getValue();
} else if (property.getKey().equalsIgnoreCase(VAULT_PATH_PREFIX)) {
pathPrefix = property.getValue();
}
}
Preconditions.checkArgument(!Strings.isNullOrEmpty(hadoopFsName), "%s is null or empty", HADOOP_FS_NAME);
Preconditions.checkArgument(!Strings.isNullOrEmpty(pathPrefix), "%s is null or empty", VAULT_PATH_PREFIX);

try (DFSFileSystem dfsFileSystem = new DFSFileSystem(newProperties)) {
Long timestamp = System.currentTimeMillis();
String remotePath = hadoopFsName + "/" + pathPrefix + "/doris-check-connectivity" + timestamp.toString();

Status st = dfsFileSystem.makeDir(remotePath);
if (st != Status.OK) {
throw new DdlException(
"checkConnectivity(makeDir) failed, status: " + st + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false));
}

st = dfsFileSystem.exists(remotePath);
if (st != Status.OK) {
throw new DdlException(
"checkConnectivity(exist) failed, status: " + st + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false));
}

st = dfsFileSystem.delete(remotePath);
if (st != Status.OK) {
throw new DdlException(
"checkConnectivity(exist) failed, status: " + st + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false));
}
} catch (IOException e) {
LOG.warn("checkConnectivity failed, properties:{}", new PrintableMap<>(
newProperties, "=", true, false, true, false), e);
throw new DdlException("checkConnectivity failed, properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false), e);
}
}

public static Cloud.HdfsVaultInfo generateHdfsParam(Map<String, String> properties) {
Cloud.HdfsVaultInfo.Builder hdfsVaultInfoBuilder =
Cloud.HdfsVaultInfo.newBuilder();
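For reference, a minimal caller-side sketch of the new probe (not part of this PR; the fs name, user, and prefix values below are assumptions). checkConnectivity creates, checks, and deletes a doris-check-connectivity<timestamp> directory under fs.defaultFS/path_prefix and throws DdlException on failure; setting the validity-check property (spelled "s3_validity_check" in the regression tests below) to "false" skips the probe entirely.

import org.apache.doris.catalog.HdfsStorageVault;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.datasource.property.constants.S3Properties;

import java.util.HashMap;
import java.util.Map;

public class ConnectivityProbeSketch {
    public static void main(String[] args) throws DdlException {
        Map<String, String> props = new HashMap<>();
        props.put(HdfsStorageVault.HADOOP_FS_NAME, "hdfs://namenode-host:8020"); // assumed endpoint
        props.put(AuthenticationConfig.HADOOP_USER_NAME, "hadoop");              // assumed user
        props.put(HdfsStorageVault.VAULT_PATH_PREFIX, "doris_vault_prefix");     // assumed prefix

        // Probes the cluster with makeDir/exists/delete; throws DdlException if it is unreachable
        // or the user lacks permission on the prefix.
        HdfsStorageVault.checkConnectivity(props);

        // Opt out: with the validity check disabled, checkConnectivity returns immediately.
        props.put(S3Properties.VALIDITY_CHECK, "false");
        HdfsStorageVault.checkConnectivity(props);
    }
}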
File: StorageVault.java
@@ -41,7 +41,6 @@ public abstract class StorageVault {
public static final String EXCLUDE_DATABASE_LIST = "exclude_database_list";
public static final String LOWER_CASE_META_NAMES = "lower_case_meta_names";
public static final String META_NAMES_MAPPING = "meta_names_mapping";

public static final String VAULT_NAME = "VAULT_NAME";

public enum StorageVaultType {
File: DFSFileSystem.java
@@ -484,7 +484,7 @@ public Status makeDir(String remotePath) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to make dir for " + remotePath);
}
} catch (Exception e) {
LOG.warn("failed to make dir for " + remotePath);
LOG.warn("failed to make dir for {}, exception:", remotePath, e);
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;
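As an aside, a minimal sketch (my own illustration, assuming standard Log4j2 parameter substitution) of why the rewritten warning is preferable: when the trailing argument is a Throwable with no matching "{}" placeholder, Log4j2 records it as the logged exception with its stack trace, whereas the old string concatenation dropped the exception entirely.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class MakeDirLoggingSketch {
    private static final Logger LOG = LogManager.getLogger(MakeDirLoggingSketch.class);

    static void report(String remotePath, Exception e) {
        // remotePath is substituted into {}, and e is appended as the exception with stack trace.
        LOG.warn("failed to make dir for {}, exception:", remotePath, e);
    }
}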
File: HdfsStorageVaultTest.java
@@ -30,25 +30,33 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.SystemInfoService;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import mockit.Mock;
import mockit.MockUp;
import org.junit.Before;
import org.junit.Test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class HdfsStorageVaultTest {
private static final Logger LOG = LogManager.getLogger(HdfsStorageVaultTest.class);
private StorageVaultMgr mgr = new StorageVaultMgr(new SystemInfoService());

@Before
public void setUp() throws Exception {
@BeforeAll
public static void setUp() throws Exception {
Config.cloud_unique_id = "cloud_unique_id";
Config.meta_service_endpoint = "127.0.0.1:20121";
}
@@ -75,6 +83,7 @@ public void testAlterMetaServiceNormal() throws Exception {
StorageVault vault = createHdfsVault("hdfs", ImmutableMap.of(
"type", "hdfs",
"path", "abs/",
S3Properties.VALIDITY_CHECK, "false",
HdfsStorageVault.HADOOP_FS_NAME, "default"));
Map<String, String> properties = vault.getCopiedProperties();
// To check if the properties is carried correctly
@@ -104,7 +113,8 @@ public void testAlterMetaServiceWithDuplicateName() throws Exception {
};
StorageVault vault = createHdfsVault("hdfs", ImmutableMap.of(
"type", "hdfs",
"path", "abs/"));
"path", "abs/",
S3Properties.VALIDITY_CHECK, "false"));
mgr.createHdfsVault(vault);
Assertions.assertThrows(DdlException.class,
() -> {
@@ -131,7 +141,8 @@ public void testAlterMetaServiceWithMissingFiels() throws Exception {
};
StorageVault vault = createHdfsVault("", ImmutableMap.of(
"type", "hdfs",
"path", "abs/"));
"path", "abs/",
S3Properties.VALIDITY_CHECK, "false"));
Assertions.assertThrows(DdlException.class,
() -> {
mgr.createHdfsVault(vault);
@@ -161,7 +172,8 @@ public void testAlterMetaServiceIfNotExists() throws Exception {
StorageVault vault = new HdfsStorageVault("name", true, false);
vault.modifyProperties(ImmutableMap.of(
"type", "hdfs",
"path", "abs/"));
"path", "abs/",
S3Properties.VALIDITY_CHECK, "false"));
mgr.createHdfsVault(vault);
}

@@ -208,10 +220,58 @@ public Pair getDefaultStorageVault() {
});
vault.modifyProperties(ImmutableMap.of(
"type", "hdfs",
"path", "abs/"));
"path", "abs/",
S3Properties.VALIDITY_CHECK, "false"));
mgr.createHdfsVault(vault);
Assertions.assertTrue(mgr.getDefaultStorageVault() == null);
mgr.setDefaultStorageVault(new SetDefaultStorageVaultStmt(vault.getName()));
Assertions.assertTrue(mgr.getDefaultStorageVault().first.equals(vault.getName()));
}

@Test
public void testCheckConnectivity() {
try {
String hadoopFsName = System.getenv("HADOOP_FS_NAME");
String hadoopUser = System.getenv("HADOOP_USER");

Assumptions.assumeTrue(!Strings.isNullOrEmpty(hadoopFsName), "HADOOP_FS_NAME isNullOrEmpty.");
Assumptions.assumeTrue(!Strings.isNullOrEmpty(hadoopUser), "HADOOP_USER isNullOrEmpty.");

Map<String, String> properties = new HashMap<>();
properties.put(HdfsStorageVault.HADOOP_FS_NAME, hadoopFsName);
properties.put(AuthenticationConfig.HADOOP_USER_NAME, hadoopUser);
properties.put(HdfsStorageVault.VAULT_PATH_PREFIX, "testCheckConnectivityUtPrefix");

HdfsStorageVault vault = new HdfsStorageVault("testHdfsVault", false, false);
vault.modifyProperties(properties);
} catch (DdlException e) {
LOG.warn("testCheckConnectivity:", e);
Assertions.assertTrue(false, e.getMessage());
}
}

@Test
public void testCheckConnectivityException() {
Map<String, String> properties = new HashMap<>();
properties.put(HdfsStorageVault.HADOOP_FS_NAME, "hdfs://localhost:10000");
properties.put(AuthenticationConfig.HADOOP_USER_NAME, "notExistUser");
properties.put(HdfsStorageVault.VAULT_PATH_PREFIX, "testCheckConnectivityUtPrefix");

HdfsStorageVault vault = new HdfsStorageVault("testHdfsVault", false, false);
Assertions.assertThrows(DdlException.class, () -> {
vault.modifyProperties(properties);
});
}

@Test
public void testIgnoreCheckConnectivity() throws DdlException {
Map<String, String> properties = new HashMap<>();
properties.put(HdfsStorageVault.HADOOP_FS_NAME, "hdfs://localhost:10000");
properties.put(AuthenticationConfig.HADOOP_USER_NAME, "notExistUser");
properties.put(HdfsStorageVault.VAULT_PATH_PREFIX, "testCheckConnectivityUtPrefix");
properties.put(S3Properties.VALIDITY_CHECK, "false");

HdfsStorageVault vault = new HdfsStorageVault("testHdfsVault", false, false);
vault.modifyProperties(properties);
}
}
File: test_create_vault.groovy (regression suite)
@@ -215,6 +215,7 @@ suite("test_create_vault", "nonConcurrent") {
"type"="hdfs",
"s3.bucket"="${getHmsHdfsFs()}",
"path_prefix" = "${hdfsVaultName}",
"fs.defaultFS"="${getHmsHdfsFs()}",
"hadoop.username" = "${getHmsUser()}"
);
"""
@@ -226,7 +227,8 @@
PROPERTIES (
"type"="hdfs",
"path_prefix" = "${hdfsVaultName}",
"hadoop.username" = "${getHmsUser()}"
"hadoop.username" = "${getHmsUser()}",
"s3_validity_check" = "false"
);
"""
}, "invalid fs_name")
File: test_create_vault_with_case_sensitive.groovy (regression suite)
@@ -84,7 +84,7 @@ suite("test_create_vault_with_case_sensitive", "nonConcurrent") {
"TYPE" = "HDFS",
"FS.DEFAULTFS"="${getHmsHdfsFs()}",
"PATH_PREFIX" = "${hdfsVaultName.toUpperCase()}",
"HADOOP.USERNAME" = "${getHmsUser()}"
"hadoop.username" = "${getHmsUser()}"
);
"""

@@ -113,6 +113,8 @@ suite("test_create_vault_with_case_sensitive", "nonConcurrent") {
PROPERTIES (
"type" = "hdfs",
"FS.DEFAULTFS"="${getHmsHdfsFs()}",
"path_prefix" = "${hdfsVaultName}",
"hadoop.username" = "${getHmsUser()}",
"s3.endpoint"="${getS3Endpoint()}",
"s3.region" = "${getS3Region()}",
"s3.access_key" = "${getS3AK()}",
@@ -131,6 +133,9 @@ suite("test_create_vault_with_case_sensitive", "nonConcurrent") {
CREATE STORAGE VAULT ${s3VaultName}
PROPERTIES (
"type" = "HDFS",
"FS.DEFAULTFS"="${getHmsHdfsFs()}",
"path_prefix" = "${hdfsVaultName}",
"hadoop.username" = "${getHmsUser()}",
"s3.endpoint"="${getS3Endpoint()}",
"s3.region" = "${getS3Region()}",
"s3.access_key" = "${getS3AK()}",
File: test_create_vault_with_kerberos.groovy (regression suite)
@@ -33,13 +33,26 @@ suite("test_create_vault_with_kerberos", "nonConcurrent") {
def tableName = "tbl_" + randomStr
def tableName2 = "tbl2_" + randomStr

expectExceptionLike({
sql """
CREATE STORAGE VAULT ${hdfsVaultName}
PROPERTIES (
"type" = "hdfs",
"fs.defaultFS"="${getHmsHdfsFs()}",
"path_prefix" = "${hdfsVaultName}",
"hadoop.username" = "not_exist_user"
);
"""
}, "Permission denied")

sql """
CREATE STORAGE VAULT ${hdfsVaultName}
PROPERTIES (
"type" = "hdfs",
"fs.defaultFS"="${getHmsHdfsFs()}",
"path_prefix" = "${hdfsVaultName}",
"hadoop.username" = "not_exist_user"
"hadoop.username" = "not_exist_user",
"s3_validity_check" = "false"
);
"""

@@ -83,7 +96,8 @@ suite("test_create_vault_with_kerberos", "nonConcurrent") {
"hadoop.username" = "${getHmsUser()}",
"hadoop.security.authentication" = "kerberos",
"hadoop.kerberos.principal" = "hadoop/127.0.0.1@XXX",
"hadoop.kerberos.keytab" = "/etc/emr.keytab"
"hadoop.kerberos.keytab" = "/etc/not_exist/emr.keytab",
"s3_validity_check" = "false"
);
"""

File: test_vault_privilege_restart.groovy (regression suite)
@@ -53,7 +53,8 @@ suite("test_vault_privilege_restart", "nonConcurrent") {
PROPERTIES (
"type"="hdfs",
"fs.defaultFS"="${dummyHdfsEndpoint}",
"path_prefix" = "test_vault_privilege_restart"
"path_prefix" = "test_vault_privilege_restart",
"s3_validity_check" = "false"
);
"""

Expand Down