Skip to content

Commit 887b07d

Browse files
authored
feat: Deltalake to iceberg transfer (#1058)
* feat: Deltalake to iceberg transfer * fix: add gcs deltalake to iceberg test case * fix: syntax error * fix: format and typo mistakes * fix: change input location to align with other templates * fix: improve code based on suggestions * fix: SaveMode based on enum value provided by spark * fix: SaveMode * fix: using already available metastore * fix: merge schema setting for iceberg table
1 parent a74872e commit 887b07d

File tree

13 files changed

+527
-11
lines changed

13 files changed

+527
-11
lines changed

java/.ci/dataproc-serverless-integration-tests.yaml

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ steps:
746746
script: |
747747
#!/usr/bin/env bash
748748
749-
echo "Running PubSub TO GCS avro"
749+
echo "Running PubSub TO BigQuery avro"
750750
gcloud pubsub topics publish pubsub-to-bq --message='{"Template": "PUBSUBTOBQ", "Branch": "$BRANCH_NAME", "Commit": "$SHORT_SHA"}' --project=$GCP_PROJECT
751751
cd java
752752
bin/start.sh \
@@ -758,6 +758,30 @@ steps:
758758
--templateProperty pubsub.bq.output.table=pubsubtobq
759759
waitFor: ['build-and-upload']
760760

761+
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
762+
id: pubsub-to-bigtable
763+
env:
764+
- 'GCP_PROJECT=${_GCP_PROJECT}'
765+
- 'GCS_STAGING_LOCATION=${_GCS_STAGING_LOCATION_BASE}'
766+
- 'REGION=${_REGION}'
767+
- 'SKIP_BUILD=true'
768+
- 'SERVICE_ACCOUNT_NAME=${_SERVICE_ACCOUNT}'
769+
script: |
770+
#!/usr/bin/env bash
771+
772+
echo "Running PubSub TO BigTable"
773+
gcloud pubsub topics publish pubsub-to-bt --message='{"key": "1000", "name": "dataproc_pubsub_bt_test", "address": "GCP Cloud", "emp_no": "10"}' --project=$GCP_PROJECT
774+
cd java
775+
bin/start.sh \
776+
-- --template PUBSUBTOBIGTABLE \
777+
--templateProperty pubsub.input.subscription=subs-pubsub-to-bt \
778+
--templateProperty pubsub.input.project.id=$GCP_PROJECT \
779+
--templateProperty pubsub.bigtable.output.project.id=$GCP_PROJECT \
780+
--templateProperty pubsub.bigtable.output.instance.id=bt-int-test \
781+
--templateProperty pubsub.bigtable.output.table=employee \
782+
--templateProperty pubsub.bigtable.catalog.location="gs://dataproc-templates_cloudbuild/integration-testing/bigtable/employeecatalog.json"
783+
waitFor: ['build-and-upload']
784+
761785
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
762786
id: kafka-to-bigquery
763787
env:
@@ -883,6 +907,31 @@ steps:
883907
--templateProperty kafka.gcs.await.termination.timeout.ms=60000
884908
waitFor: ['build-and-upload']
885909

910+
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
911+
id: gcs-deltalake-to-iceberg
912+
env:
913+
- 'GCP_PROJECT=${_GCP_PROJECT}'
914+
- 'GCS_STAGING_LOCATION=${_GCS_STAGING_LOCATION_BASE}'
915+
- 'REGION=${_REGION}'
916+
- 'SKIP_BUILD=true'
917+
- 'SERVICE_ACCOUNT_NAME=${_SERVICE_ACCOUNT}'
918+
- 'DATAPROC_METASTORE_SERVICE=${_ENV_DATAPROC_METASTORE_SERVICE}'
919+
script: |
920+
#!/usr/bin/env bash
921+
922+
echo "Running GCS Deltalake To Iceberg"
923+
cd java
924+
bin/start.sh \
925+
--metastore-service=$DATAPROC_METASTORE_SERVICE \
926+
-- --template GCSDELTALAKETOICEBERG \
927+
--templateProperty project.id=$GCP_PROJECT \
928+
--templateProperty gcsdeltalaketoiceberg.input.path="gs://dataproc-templates_cloudbuild/integration-testing/GCSDELTALAKETOICEBERG/hive_warehouse/delta-table-trn-data" \
929+
--templateProperty deltalake.version.as_of=0 \
930+
--templateProperty iceberg.table.name="spark_catalog.default.iceberg_tbl_trn_data" \
931+
--templateProperty iceberg.table.partition.columns="dt" \
932+
--templateProperty iceberg.gcs.output.mode="overwrite"
933+
waitFor: ['build-and-upload']
934+
886935
options:
887936
logging: CLOUD_LOGGING_ONLY
888937
pubsubTopic: projects/dataproc-templates/topics/dpt-build

java/bin/start.sh

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,18 @@ OPT_SPARK_VERSION="--version=1.2"
6868
OPT_PROJECT="--project=${GCP_PROJECT}"
6969
OPT_REGION="--region=${REGION}"
7070
OPT_JARS="--jars=file:///usr/lib/spark/connector/spark-avro.jar,${GCS_STAGING_LOCATION}/${JAR_FILE}"
71-
if [[ $OPT_SPARK_VERSION == *"=1.1"* ]]; then
72-
echo "Dataproc Serverless Runtime 1.1 or CLUSTER Job Type Detected"
73-
OPT_JARS="--jars=file:///usr/lib/spark/external/spark-avro.jar,${GCS_STAGING_LOCATION}/${JAR_FILE}"
71+
if [[ $JOB_TYPE == "SERVERLESS" ]]; then
72+
if [[ $OPT_SPARK_VERSION == *"=1.1"* ]]; then
73+
echo "Dataproc Serverless Runtime 1.1 Job Type Detected"
74+
OPT_JARS="--jars=file:///usr/lib/spark/external/spark-avro.jar,${GCS_STAGING_LOCATION}/${JAR_FILE}"
75+
else
76+
# We added Deltalake and Iceberg jars to align with serverless dataproc version compatibility
77+
# Please check the internal document below to make sure you are aligned with what the Dataproc team provides, i.e. jar version, path, etc.
78+
# https://docs.google.com/document/d/1VCv9sewWdFVu_2gzmZHFmikKBGxX7_zefhk0yJ-i0OA/edit?tab=t.0#heading=h.6ogrcxwzv7et
79+
# Our Java templates use "provided" scope for these dependencies to avoid version conflicts with Dataproc
80+
# We are doing this only for serverless Dataproc, because cluster Dataproc provides optional components which make sure the dependencies are available within the cluster
81+
OPT_JARS="--jars=file:///usr/lib/spark/connector/spark-avro.jar,gs://spark-lib/delta/delta-spark_2.12-3.2.0.jar,gs://spark-lib/delta/delta-storage-3.2.0.jar,gs://spark-lib/iceberg/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,${GCS_STAGING_LOCATION}/${JAR_FILE}"
82+
fi
7483
fi
7584
if [[ $JOB_TYPE == "CLUSTER" ]]; then
7685
if [[ -n "${CLUSTER}" ]]; then

java/pom.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
<maven.compiler.target>${java.version}</maven.compiler.target>
4545

4646
<!-- Spark Version Aligned with Dataproc Serverless Spark Runtime 1.2 -->
47+
<spark.major.version>3.5</spark.major.version>
4748
<spark.version>3.5.1</spark.version>
4849

4950
<!-- GCP Dependencies. Only change when it is aligned with Spark -->
@@ -99,6 +100,12 @@
99100
<hbase.spark.connector.version>1.0.1</hbase.spark.connector.version>
100101
<hbase.client.version>2.6.0-hadoop3</hbase.client.version>
101102

103+
<!-- Iceberg Dependencies. Compatible with dataproc and spark major version. -->
104+
<iceberg.version>1.6.1</iceberg.version>
105+
106+
<!-- Deltalake Dependencies. Compatible with spark major version. -->
107+
<deltalake.version>3.2.0</deltalake.version>
108+
102109
<!-- jakarta.validation -->
103110
<hibernate.validator.version>7.0.1.Final</hibernate.validator.version>
104111
<jakarta.el.api.version>4.0.0</jakarta.el.api.version>
@@ -405,6 +412,22 @@
405412
<scope>provided</scope>
406413
</dependency>
407414

415+
<!-- Iceberg Dependencies -->
416+
<dependency>
417+
<groupId>org.apache.iceberg</groupId>
418+
<artifactId>iceberg-spark-runtime-${spark.major.version}_${scala.binary.version}</artifactId>
419+
<version>${iceberg.version}</version>
420+
<scope>provided</scope>
421+
</dependency>
422+
423+
<!-- Deltalake Dependencies -->
424+
<dependency>
425+
<groupId>io.delta</groupId>
426+
<artifactId>delta-spark_${scala.binary.version}</artifactId>
427+
<version>${deltalake.version}</version>
428+
<scope>provided</scope>
429+
</dependency>
430+
408431
<!-- jakarta.validation -->
409432
<dependency>
410433
<groupId>org.hibernate.validator</groupId>

java/src/main/java/com/google/cloud/dataproc/templates/BaseTemplate.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ enum TemplateName {
3737
GCSTOBIGQUERY,
3838
GCSTOGCS,
3939
GCSTOMONGO,
40+
GCSDELTALAKETOICEBERG,
4041
JDBCTOBIGQUERY,
4142
JDBCTOGCS,
4243
BIGQUERYTOGCS,
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.dataproc.templates.gcs;
17+
18+
import static com.google.cloud.dataproc.templates.util.TemplateConstants.*;
19+
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
import com.fasterxml.jackson.databind.DeserializationFeature;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import jakarta.validation.constraints.AssertTrue;
24+
import jakarta.validation.constraints.Min;
25+
import jakarta.validation.constraints.NotEmpty;
26+
import jakarta.validation.constraints.Pattern;
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.List;
30+
import java.util.Properties;
31+
import org.apache.commons.lang3.StringUtils;
32+
33+
public class GCSDLtoIBConfig {
34+
35+
static final ObjectMapper mapper =
36+
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
37+
38+
@JsonProperty(value = PROJECT_ID_PROP)
39+
@NotEmpty
40+
private String projectId;
41+
42+
@JsonProperty(value = DELTALAKE_INPUT_LOCATION)
43+
@NotEmpty
44+
private String inputFileLocation;
45+
46+
@JsonProperty(value = DELTALAKE_VERSION_AS_OF)
47+
@Min(value = 0)
48+
private int versionAsOf;
49+
50+
@JsonProperty(value = DELTALAKE_TIMESTAMP_AS_OF)
51+
private String timestampAsOf;
52+
53+
@JsonProperty(value = ICEBERG_TABLE_NAME)
54+
@NotEmpty
55+
private String icebergTableName;
56+
57+
@JsonProperty(value = ICEBERG_TABLE_PARTITION_COLUMNS)
58+
private String icebergTablePartitionColumns;
59+
60+
@JsonProperty(value = ICEBERG_GCS_OUTPUT_MODE)
61+
@Pattern(regexp = "(?i)(Append|Overwrite)")
62+
private String icebergTableWriteMode;
63+
64+
@JsonProperty(value = ICEBERG_TABLE_MERGE_SCHEMA)
65+
private String icebergTableMergeSchema;
66+
67+
@JsonProperty(value = SPARK_LOG_LEVEL)
68+
@Pattern(regexp = "ALL|DEBUG|ERROR|FATAL|INFO|OFF|TRACE|WARN")
69+
private String sparkLogLevel;
70+
71+
public @NotEmpty String getProjectId() {
72+
return projectId;
73+
}
74+
75+
public @NotEmpty String getInputFileLocation() {
76+
return inputFileLocation;
77+
}
78+
79+
public int getVersionAsOf() {
80+
return versionAsOf;
81+
}
82+
83+
public String getTimestampAsOf() {
84+
return timestampAsOf;
85+
}
86+
87+
public @NotEmpty String getIcebergTableName() {
88+
return icebergTableName;
89+
}
90+
91+
public List<String> getIcebergTablePartitionColumns() {
92+
93+
if (icebergTablePartitionColumns != null && !icebergTablePartitionColumns.isEmpty()) {
94+
return new ArrayList<>(Arrays.asList(icebergTablePartitionColumns.split(",")));
95+
}
96+
return new ArrayList<>();
97+
}
98+
99+
public String getIcebergTableWriteMode() {
100+
return icebergTableWriteMode;
101+
}
102+
103+
public String getIcebergTableMergeSchema() {
104+
return icebergTableMergeSchema;
105+
}
106+
107+
public @NotEmpty String getSparkLogLevel() {
108+
return sparkLogLevel;
109+
}
110+
111+
@Override
112+
public String toString() {
113+
return "GCSDLtoIBConfig{"
114+
+ "projectId='"
115+
+ projectId
116+
+ '\''
117+
+ ", inputFileLocation='"
118+
+ inputFileLocation
119+
+ '\''
120+
+ ", versionAsOf="
121+
+ versionAsOf
122+
+ ", timestampAsOf='"
123+
+ timestampAsOf
124+
+ '\''
125+
+ ", icebergTableName='"
126+
+ icebergTableName
127+
+ '\''
128+
+ ", icebergTablePartitionColumns='"
129+
+ icebergTablePartitionColumns
130+
+ '\''
131+
+ ", icebergTableWriteMode='"
132+
+ icebergTableWriteMode
133+
+ '\''
134+
+ ", icebergTableMergeSchema='"
135+
+ icebergTableMergeSchema
136+
+ '\''
137+
+ ", sparkLogLevel='"
138+
+ sparkLogLevel
139+
+ '\''
140+
+ '}';
141+
}
142+
143+
public static GCSDLtoIBConfig fromProperties(Properties properties) {
144+
return mapper.convertValue(properties, GCSDLtoIBConfig.class);
145+
}
146+
147+
@AssertTrue(
148+
message =
149+
"Required parameters for GCSDeltalakeToIceberg not passed. Refer to gcs/README.md for more instructions.")
150+
private boolean isInputValid() {
151+
return StringUtils.isNotBlank(projectId)
152+
&& StringUtils.isNotBlank(inputFileLocation)
153+
&& StringUtils.isNotBlank(icebergTableName);
154+
}
155+
}

0 commit comments

Comments
 (0)