Skip to content

Commit c97813c

Browse files
authored
[lake/iceberg] Support pass hadoop configuration (apache#1541)
1 parent 1a48571 commit c97813c

File tree

8 files changed

+393
-14
lines changed

8 files changed

+393
-14
lines changed

fluss-lake/fluss-lake-iceberg/pom.xml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@
3333
<packaging>jar</packaging>
3434

3535
<dependencies>
36+
<dependency>
37+
<groupId>org.apache.iceberg</groupId>
38+
<artifactId>iceberg-api</artifactId>
39+
<version>${iceberg.version}</version>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.apache.iceberg</groupId>
43+
<artifactId>iceberg-common</artifactId>
44+
<version>${iceberg.version}</version>
45+
</dependency>
3646
<dependency>
3747
<groupId>org.apache.iceberg</groupId>
3848
<artifactId>iceberg-core</artifactId>
@@ -48,10 +58,21 @@
4858
<artifactId>iceberg-parquet</artifactId>
4959
<version>${iceberg.version}</version>
5060
</dependency>
61+
<dependency>
62+
<groupId>org.apache.iceberg</groupId>
63+
<artifactId>iceberg-orc</artifactId>
64+
<version>${iceberg.version}</version>
65+
</dependency>
66+
<dependency>
67+
<groupId>org.apache.iceberg</groupId>
68+
<artifactId>iceberg-bundled-guava</artifactId>
69+
<version>${iceberg.version}</version>
70+
</dependency>
5171
<dependency>
5272
<groupId>com.alibaba.fluss</groupId>
5373
<artifactId>fluss-common</artifactId>
5474
<version>${project.version}</version>
75+
<scope>provided</scope>
5576
</dependency>
5677
<dependency>
5778
<groupId>com.alibaba.fluss</groupId>
@@ -136,7 +157,13 @@
136157
<configuration>
137158
<artifactSet>
138159
<includes>
160+
<include>org.apache.iceberg:iceberg-api</include>
139161
<include>org.apache.iceberg:iceberg-core</include>
162+
<include>org.apache.iceberg:iceberg-data</include>
163+
<include>org.apache.iceberg:iceberg-common</include>
164+
<include>org.apache.iceberg:iceberg-parquet</include>
165+
<include>org.apache.iceberg:iceberg-orc</include>
166+
<include>org.apache.iceberg:iceberg-bundled-guava</include>
140167
</includes>
141168
</artifactSet>
142169
<filters>

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.annotation.VisibleForTesting;
2121
import com.alibaba.fluss.config.Configuration;
2222
import com.alibaba.fluss.exception.TableAlreadyExistException;
23+
import com.alibaba.fluss.lake.iceberg.conf.IcebergConfiguration;
2324
import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
2425
import com.alibaba.fluss.metadata.TableDescriptor;
2526
import com.alibaba.fluss.metadata.TablePath;
@@ -53,6 +54,8 @@
5354
/** An Iceberg implementation of {@link LakeCatalog}. */
5455
public class IcebergLakeCatalog implements LakeCatalog {
5556

57+
public static final String ICEBERG_CATALOG_DEFAULT_NAME = "fluss-iceberg-catalog";
58+
5659
private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new LinkedHashMap<>();
5760

5861
static {
@@ -81,15 +84,9 @@ protected Catalog getIcebergCatalog() {
8184

8285
private Catalog createIcebergCatalog(Configuration configuration) {
8386
Map<String, String> icebergProps = configuration.toMap();
84-
85-
String catalogName = icebergProps.getOrDefault("name", "fluss-iceberg-catalog");
86-
87+
String catalogName = icebergProps.getOrDefault("name", ICEBERG_CATALOG_DEFAULT_NAME);
8788
return buildIcebergCatalog(
88-
catalogName,
89-
icebergProps, // todo: current is an empty configuration, need to init from env or
90-
// fluss
91-
// configurations
92-
new org.apache.hadoop.conf.Configuration());
89+
catalogName, icebergProps, IcebergConfiguration.from(configuration).get());
9390
}
9491

9592
@Override
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.lake.iceberg.conf;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
22+
import java.io.IOException;
23+
import java.io.ObjectInputStream;
24+
import java.io.ObjectOutputStream;
25+
26+
/** Serde of {@link Configuration} . */
27+
public class HadoopConfSerde {
28+
29+
public static void writeObject(ObjectOutputStream out, Object hadoopConf) throws IOException {
30+
try {
31+
Configuration conf = (Configuration) hadoopConf;
32+
conf.write(out);
33+
} catch (IOException e) {
34+
throw new IOException("Failed to serialize Hadoop Configuration: " + e.getMessage(), e);
35+
}
36+
}
37+
38+
public static Configuration readObject(ObjectInputStream in) throws IOException {
39+
try {
40+
Configuration hadoopConf = new Configuration();
41+
hadoopConf.readFields(in);
42+
return hadoopConf;
43+
} catch (IOException e) {
44+
throw new IOException(
45+
"Failed to deserialize Hadoop Configuration: " + e.getMessage(), e);
46+
}
47+
}
48+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.lake.iceberg.conf;
19+
20+
import com.alibaba.fluss.config.ConfigBuilder;
21+
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hdfs.HdfsConfiguration;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import java.io.File;
28+
29+
/**
30+
* Utility class for working with Hadoop-related classes. This should only be used if Hadoop is on
31+
* the classpath.
32+
*
33+
* <p>Note: Copied from HadoopUtils of fluss-fs-hadoop module
34+
*/
35+
public class HadoopUtils {
36+
37+
private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
38+
39+
/** The prefixes that Fluss adds to the Hadoop config for iceberg. */
40+
private static final String[] FLUSS_CONFIG_PREFIXES = {"iceberg.hadoop."};
41+
42+
public static Configuration getHadoopConfiguration(
43+
com.alibaba.fluss.config.Configuration flussConfiguration) {
44+
45+
// Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
46+
// from the classpath
47+
48+
Configuration result = new HdfsConfiguration();
49+
boolean foundHadoopConfiguration = false;
50+
51+
// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
52+
// the hdfs configuration.
53+
// The properties of a newly added resource will override the ones in previous resources, so
54+
// a configuration
55+
// file with higher priority should be added later.
56+
57+
// Approach 1: HADOOP_HOME environment variables
58+
String[] possibleHadoopConfPaths = new String[2];
59+
60+
final String hadoopHome = System.getenv("HADOOP_HOME");
61+
if (hadoopHome != null) {
62+
LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
63+
possibleHadoopConfPaths[0] = hadoopHome + "/conf";
64+
possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
65+
}
66+
67+
for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
68+
if (possibleHadoopConfPath != null) {
69+
foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
70+
}
71+
}
72+
73+
// Approach 2: HADOOP_CONF_DIR environment variable
74+
String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
75+
if (hadoopConfDir != null) {
76+
LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
77+
foundHadoopConfiguration =
78+
addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
79+
}
80+
81+
// Approach 3: Fluss configuration
82+
// add all configuration key with prefix 'iceberg.hadoop.' in fluss conf to hadoop conf
83+
for (String key : flussConfiguration.keySet()) {
84+
for (String prefix : FLUSS_CONFIG_PREFIXES) {
85+
if (key.startsWith(prefix)) {
86+
String newKey = key.substring(prefix.length());
87+
String value =
88+
flussConfiguration.getString(
89+
ConfigBuilder.key(key).stringType().noDefaultValue(), null);
90+
result.set(newKey, value);
91+
LOG.debug(
92+
"Adding Fluss config entry for {} as {}={} to Hadoop config",
93+
key,
94+
newKey,
95+
value);
96+
foundHadoopConfiguration = true;
97+
}
98+
}
99+
}
100+
101+
if (!foundHadoopConfiguration) {
102+
LOG.warn(
103+
"Could not find Hadoop configuration via any of the supported methods "
104+
+ "(Fluss configuration, environment variables).");
105+
}
106+
107+
return result;
108+
}
109+
110+
/**
111+
* Search Hadoop configuration files in the given path, and add them to the configuration if
112+
* found.
113+
*/
114+
private static boolean addHadoopConfIfFound(
115+
Configuration configuration, String possibleHadoopConfPath) {
116+
boolean foundHadoopConfiguration = false;
117+
if (new File(possibleHadoopConfPath).exists()) {
118+
if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
119+
configuration.addResource(
120+
new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
121+
LOG.debug(
122+
"Adding {}/core-site.xml to hadoop configuration", possibleHadoopConfPath);
123+
foundHadoopConfiguration = true;
124+
}
125+
if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
126+
configuration.addResource(
127+
new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
128+
LOG.debug(
129+
"Adding {}/hdfs-site.xml to hadoop configuration", possibleHadoopConfPath);
130+
foundHadoopConfiguration = true;
131+
}
132+
}
133+
return foundHadoopConfiguration;
134+
}
135+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.lake.iceberg.conf;
19+
20+
import com.alibaba.fluss.annotation.VisibleForTesting;
21+
import com.alibaba.fluss.config.Configuration;
22+
23+
import org.apache.iceberg.common.DynClasses;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import java.io.IOException;
28+
import java.io.ObjectInputStream;
29+
import java.io.ObjectOutputStream;
30+
import java.io.Serializable;
31+
32+
/**
33+
* Wraps the hadoop configuration used to configure {@link org.apache.iceberg.catalog.Catalog} if
34+
* hadoop related classes is available.
35+
*
36+
* <p>It don't declare Hadoop configuration explicitly for some catalogs won't need hadoop
37+
* configuration. For these catalogs, it won't throw class not found exception. It set the conf to
38+
* null if no hadoop dependencies are found. It's fine to use null for the catalogs don't require
39+
* Hadoop configuration.
40+
*
41+
* <p>For the catalogs require Hadoop configuration, hadoop related class not found exception will
42+
* be thrown which guides users to add hadoop related classes.
43+
*/
44+
public class IcebergConfiguration implements Serializable {
45+
46+
private static final Logger LOG = LoggerFactory.getLogger(IcebergConfiguration.class);
47+
48+
private transient Object conf;
49+
50+
@VisibleForTesting
51+
protected IcebergConfiguration(Object conf) {
52+
this.conf = conf;
53+
}
54+
55+
public static IcebergConfiguration from(Configuration flussConfig) {
56+
return new IcebergConfiguration(loadHadoopConfig(flussConfig));
57+
}
58+
59+
private void writeObject(ObjectOutputStream out) throws IOException {
60+
out.defaultWriteObject();
61+
if (conf == null) {
62+
out.writeBoolean(false);
63+
} else {
64+
out.writeBoolean(true);
65+
HadoopConfSerde.writeObject(out, conf);
66+
}
67+
}
68+
69+
private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
70+
in.defaultReadObject();
71+
boolean configIsNotNull = in.readBoolean();
72+
if (configIsNotNull) {
73+
conf = HadoopConfSerde.readObject(in);
74+
} else {
75+
conf = null;
76+
}
77+
}
78+
79+
private static Object loadHadoopConfig(Configuration flussConfig) {
80+
Class<?> configClass =
81+
DynClasses.builder()
82+
.impl("org.apache.hadoop.hdfs.HdfsConfiguration")
83+
.impl("org.apache.hadoop.conf.Configuration")
84+
.orNull()
85+
.build();
86+
87+
if (configClass == null) {
88+
LOG.info("Hadoop not found on classpath, not creating Hadoop config");
89+
return null;
90+
}
91+
92+
return HadoopUtils.getHadoopConfiguration(flussConfig);
93+
}
94+
95+
public Object get() {
96+
return conf;
97+
}
98+
}

0 commit comments

Comments
 (0)