Skip to content

Commit 6a9d3f7

Browse files
authored
feat: Add initial Hadoop catalog with listing tables (#13)
* feat: Add initial Hadoop catalog with listing tables * chore: Add todos for namespace HadoopCatalog checks * chore: Bump Cargo lock
1 parent edb521d commit 6a9d3f7

38 files changed

Lines changed: 433 additions & 1 deletion

Cargo.lock

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ Apache Iceberg is an active open-source project, governed under the Apache Softw
9797
at [Slack #rust channel](https://join.slack.com/t/apache-iceberg/shared_invite/zt-1zbov3k6e-KtJfoaxp97YfX6dPz1Bk7A).
9898

9999
The Apache Iceberg community is built on the principles described in the [Apache Way](https://www.apache.org/theapacheway/index.html) and all who engage with the community are expected to be respectful, open, come with the best interests of the community in mind, and abide by the Apache Foundation [Code of Conduct](https://www.apache.org/foundation/policies/conduct.html).
100+
100101
## Users
101102

102103
- [Databend](https://github.com/datafuselabs/databend/): An open-source cloud data warehouse that serves as a cost-effective alternative to Snowflake.

crates/catalog/hadoop/Cargo.toml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
edition = { workspace = true }
20+
homepage = { workspace = true }
21+
name = "iceberg-catalog-hadoop"
22+
rust-version = { workspace = true }
23+
version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Hadoop Catalog Support"
27+
keywords = ["iceberg", "hadoop", "catalog"]
28+
license = { workspace = true }
29+
repository = { workspace = true }
30+
31+
[dependencies]
32+
anyhow = { workspace = true }
33+
async-trait = { workspace = true }
34+
futures = { workspace = true }
35+
iceberg = { workspace = true }
36+
opendal = { workspace = true }
37+
serde_json = { workspace = true }
38+
tokio = { workspace = true }
39+
tracing = { workspace = true }
40+
typed-builder = { workspace = true }
41+
uuid = { workspace = true }
42+
43+
[dev-dependencies]
44+
ctor = { workspace = true }
45+
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
46+
port_scanner = { workspace = true }

crates/catalog/hadoop/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing,
13+
~ software distributed under the License is distributed on an
14+
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
~ KIND, either express or implied. See the License for the
16+
~ specific language governing permissions and limitations
17+
~ under the License.
18+
-->
19+
20+
# Apache Iceberg Hadoop Catalog Official Native Rust Implementation
21+
22+
[![crates.io](https://img.shields.io/crates/v/iceberg.svg)](https://crates.io/crates/iceberg-catalog-hadoop)
23+
[![docs.rs](https://img.shields.io/docsrs/iceberg.svg)](https://docs.rs/iceberg/latest/iceberg-catalog-hadoop/)
24+
25+
This crate contains the official Native Rust implementation of Apache Iceberg Hadoop Catalog.
26+
27+
See the [API documentation](https://docs.rs/iceberg-catalog-hadoop/latest) for examples and the full API.
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg Hadoop Catalog implementation.
19+
20+
use std::collections::HashMap;
21+
22+
use async_trait::async_trait;
23+
use futures::TryStreamExt;
24+
use iceberg::io::FileIO;
25+
use iceberg::table::Table;
26+
use iceberg::{
27+
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
28+
TableIdent,
29+
};
30+
use opendal::EntryMode;
31+
32+
/// Represents a hadoop catalog backed by storage from a `FileIO`
33+
#[derive(Debug)]
34+
pub struct HadoopCatalog {
35+
file_io: FileIO,
36+
warehouse_root: String,
37+
}
38+
39+
impl HadoopCatalog {
40+
/// Creates a new instance of a `HadoopCatalog`
41+
/// The `warehouse_root` should be the absolute path to the warehouse directory, including the scheme prefix for the FileIO
42+
pub fn new(warehouse_root: String, file_io: FileIO) -> Self {
43+
// TODO: validate the warehouse_root starts with the same scheme as the FileIO
44+
Self {
45+
file_io,
46+
warehouse_root,
47+
}
48+
}
49+
}
50+
51+
#[async_trait]
52+
impl Catalog for HadoopCatalog {
53+
// Unsupported operations in Hadoop Catalog
54+
async fn create_namespace(
55+
&self,
56+
_namespace: &NamespaceIdent,
57+
_properties: HashMap<String, String>,
58+
) -> Result<Namespace> {
59+
Err(Error::new(
60+
ErrorKind::FeatureUnsupported,
61+
"Creating namespaces is not supported in hadoop catalog",
62+
))
63+
}
64+
65+
async fn update_namespace(
66+
&self,
67+
_namespace: &NamespaceIdent,
68+
_properties: HashMap<String, String>,
69+
) -> Result<()> {
70+
Err(Error::new(
71+
ErrorKind::FeatureUnsupported,
72+
"Updating namespaces is not supported in hadoop catalog",
73+
))
74+
}
75+
76+
async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
77+
Err(Error::new(
78+
ErrorKind::FeatureUnsupported,
79+
"Dropping namespaces is not supported in hadoop catalog",
80+
))
81+
}
82+
83+
async fn create_table(
84+
&self,
85+
_namespace: &NamespaceIdent,
86+
_creation: TableCreation,
87+
) -> Result<Table> {
88+
Err(Error::new(
89+
ErrorKind::FeatureUnsupported,
90+
"Creating tables is not supported in hadoop catalog",
91+
))
92+
}
93+
94+
async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
95+
Err(Error::new(
96+
ErrorKind::FeatureUnsupported,
97+
"Dropping tables is not supported in hadoop catalog",
98+
))
99+
}
100+
101+
async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
102+
Err(Error::new(
103+
ErrorKind::FeatureUnsupported,
104+
"Renaming tables is not supported in hadoop catalog",
105+
))
106+
}
107+
108+
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
109+
Err(Error::new(
110+
ErrorKind::FeatureUnsupported,
111+
"Updating tables is not supported in hadoop catalog",
112+
))
113+
}
114+
115+
// Supported operations in Hadoop Catalog
116+
async fn list_namespaces(
117+
&self,
118+
_parent: Option<&NamespaceIdent>,
119+
) -> Result<Vec<NamespaceIdent>> {
120+
Err(Error::new(
121+
ErrorKind::FeatureUnsupported,
122+
"Not implemented yet",
123+
))
124+
}
125+
126+
async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
127+
Err(Error::new(
128+
ErrorKind::FeatureUnsupported,
129+
"Not implemented yet",
130+
))
131+
}
132+
133+
async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
134+
Err(Error::new(
135+
ErrorKind::FeatureUnsupported,
136+
"Not implemented yet",
137+
))
138+
}
139+
140+
async fn load_table(&self, _table_identifier: &TableIdent) -> Result<Table> {
141+
Err(Error::new(
142+
ErrorKind::FeatureUnsupported,
143+
"Not implemented yet",
144+
))
145+
}
146+
147+
async fn table_exists(&self, _table: &TableIdent) -> Result<bool> {
148+
Err(Error::new(
149+
ErrorKind::FeatureUnsupported,
150+
"Not implemented yet",
151+
))
152+
}
153+
154+
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
155+
// List the tables in the specified namespace
156+
let path = format!("{}/{}/", self.warehouse_root, namespace.to_string());
157+
let mut tables = Vec::new();
158+
159+
let mut lister = self.file_io.lister(&path).await?;
160+
while let Some(entry) = lister.try_next().await? {
161+
if matches!(entry.metadata().mode(), EntryMode::DIR) {
162+
if path.ends_with(entry.path()) {
163+
// Skip the root directory itself
164+
continue;
165+
}
166+
167+
let table_name = entry
168+
.name()
169+
.strip_suffix("/")
170+
.unwrap_or(entry.name())
171+
.to_string();
172+
173+
let table_ident = TableIdent {
174+
namespace: namespace.clone(),
175+
name: table_name,
176+
};
177+
178+
// TODO: validate the directory contains metadata files
179+
tables.push(table_ident);
180+
}
181+
}
182+
183+
Ok(tables)
184+
}
185+
}

crates/catalog/hadoop/src/lib.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg Glue Catalog implementation.
19+
20+
#![deny(missing_docs)]
21+
22+
mod catalog;
23+
pub use catalog::*;
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Hadoop Test Data
2+
3+
* `./hadoop_warehouse` contains a single namespace `test` with a 2 tables: `my_table_1` and `my_table_2`. Each table contains 2 rows.
4+
* `my_table_1`:
5+
```console
6+
+---+----+
7+
| id|name|
8+
+---+----+
9+
| 1| foo|
10+
| 2| bar|
11+
+---+----+
12+
```
13+
* `my_table_2`:
14+
```console
15+
+---+----+
16+
| id|name|
17+
+---+----+
18+
| 3| foo|
19+
| 4| bar|
20+
+---+----+
21+
```
22+
* `./hadoop_warehouse` was generated with `spark-shell`:
23+
```bash
24+
./spark-shell \
25+
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.9.2
26+
27+
spark.conf.set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
28+
spark.conf.set("spark.sql.catalog.hadoop_prod.type", "hadoop")
29+
spark.conf.set("spark.sql.catalog.hadoop_prod.warehouse", "file:///tmp/multi_table_warehouse")
30+
31+
spark.sql("CREATE NAMESPACE hadoop_prod.test")
32+
spark.sql("CREATE TABLE hadoop_prod.test.my_table_1 (id INT, name STRING) USING iceberg")
33+
spark.sql("INSERT INTO hadoop_prod.test.my_table_1 VALUES (1, 'foo'), (2, 'bar')")
34+
spark.sql("SELECT * FROM hadoop_prod.test.my_table_1").show()
35+
spark.sql("CREATE TABLE hadoop_prod.test.my_table_2 (id INT, name STRING) USING iceberg")
36+
spark.sql("INSERT INTO hadoop_prod.test.my_table_2 VALUES (3, 'foo'), (4, 'bar')")
37+
spark.sql("SELECT * FROM hadoop_prod.test.my_table_2").show()
38+
```

0 commit comments

Comments
 (0)