1313
1414Usage (via SparkApplication YAML — all args come from SparkApplication.spec.arguments):
1515 spark-submit convert_parquet_to_iceberg.py \\
16- --src-bucket <S3_BUCKET> \\
17- --src-prefix TPCDS-TEST-3TB \\
18- --glue-db tpcds_3tb \\
19- --warehouse s3a://<S3_BUCKET>/ TPCDS-TEST-3TB-ICEBERG \\
20- --region us-west-2 \\
21- [--table <single_table>] # optional: convert one table only
22-
23- S3 layout (three dedicated prefixes — no cross-contamination between benchmark runs ):
24- Parquet-only data : s3://<bucket>/TPCDS-TEST-{N}TB/{table}/ (--src-prefix)
25- Iceberg data files: s3://<bucket>/TPCDS-TEST-{N}TB-ICEBERG-DATA/ {table}/ (--data-path )
26- Iceberg metadata : s3://<bucket>/TPCDS-TEST-{N}TB-ICEBERG-WH/ (--warehouse )
16+ --src-bucket <S3_BUCKET> \\
17+ --src-prefix TPCDS-TEST-3TB \\
18+ --glue-db tpcds_3tb \\
19+ --iceberg-prefix TPCDS-TEST-3TB-ICEBERG \\
20+ --region us-west-2 \\
21+ [--table <single_table>] # optional: convert one table only
22+
23+ S3 layout (single Iceberg prefix with data/ and metadata/ subdirectories):
24+ Parquet-only data : s3://<bucket>/TPCDS-TEST-{N}TB/{table}/ (--src-prefix)
25+ Iceberg data files: s3://<bucket>/TPCDS-TEST-{N}TB-ICEBERG/data/{table}/ (derived)
26+ Iceberg metadata : s3://<bucket>/TPCDS-TEST-{N}TB-ICEBERG/metadata/ (derived)
2727
2828 To run at a different scale (3TB, 10TB), change --src-prefix, --glue-db,
29- --warehouse, and --data-path in the SparkApplication YAML — no image rebuild needed.
29+ and --iceberg-prefix in the SparkApplication YAML — no image rebuild needed.
3030
3131The job is idempotent — it checks existing Glue tables and skips any already converted.
3232
8181def parse_args ():
8282 parser = argparse .ArgumentParser (description = "Convert TPC-DS Parquet → Iceberg (Glue catalog)" )
8383 # ── Scale-specific args — only these change per benchmark scale in the YAML ─
84- # Scale | --src-prefix | --glue-db | --warehouse | --data-path
85- # 1TB | TPCDS-TEST-1TB | tpcds_1tb | s3a://<b>/TPCDS-TEST-1TB-ICEBERG-WH | s3a://<b>/TPCDS-TEST-1TB-ICEBERG-DATA
86- # 3TB | TPCDS-TEST-3TB | tpcds_3tb | s3a://<b>/TPCDS-TEST-3TB-ICEBERG-WH | s3a://<b>/TPCDS-TEST-3TB-ICEBERG-DATA
87- # 10TB | TPCDS-TEST-10TB | tpcds_10tb | s3a://<b>/TPCDS-TEST-10TB-ICEBERG-WH | s3a://<b>/TPCDS-TEST-10TB-ICEBERG-DATA
88- parser .add_argument ("--src-bucket" , required = True , help = "Source S3 bucket name (no s3a:// prefix)" )
89- parser .add_argument ("--src-prefix" , required = True , help = "S3 prefix for source Parquet, e.g. TPCDS-TEST-3TB" )
90- parser .add_argument ("--glue-db" , required = True , help = "Target Glue database name, e.g. tpcds_3tb" )
91- parser .add_argument ("--warehouse" , required = True , help = "Iceberg metadata warehouse, e.g. s3a://<bucket>/TPCDS-TEST-3TB-ICEBERG-WH" )
92- parser .add_argument ("--data-path" , required = True , help = "Iceberg data file root, e.g. s3a://<bucket>/TPCDS-TEST-3TB-ICEBERG-DATA" )
84+ # Scale | --src-prefix | --glue-db | --iceberg-prefix
85+ # 1TB | TPCDS-TEST-1TB | tpcds_1tb | TPCDS-TEST-1TB-ICEBERG
86+ # 3TB | TPCDS-TEST-3TB | tpcds_3tb | TPCDS-TEST-3TB-ICEBERG
87+ # 10TB | TPCDS-TEST-10TB | tpcds_10tb | TPCDS-TEST-10TB-ICEBERG
88+ parser .add_argument ("--src-bucket" , required = True , help = "Source S3 bucket name (no s3a:// prefix)" )
89+ parser .add_argument ("--src-prefix" , required = True , help = "S3 prefix for source Parquet, e.g. TPCDS-TEST-3TB" )
90+ parser .add_argument ("--glue-db" , required = True , help = "Target Glue database name, e.g. tpcds_3tb" )
91+ parser .add_argument ("--iceberg-prefix" , required = True , help = "S3 prefix for Iceberg output, e.g. TPCDS-TEST-3TB-ICEBERG" )
9392 # ── Fixed args ───────────────────────────────────────────────────────────────
9493 parser .add_argument ("--region" , default = "us-west-2" , help = "AWS region for Glue" )
9594 parser .add_argument ("--table" , default = None , help = "Convert a single table only (optional)" )
@@ -115,10 +114,10 @@ def convert_table(spark, table_name, partition_col, src_path, glue_db, data_path
115114 CTAS: read Parquet from src_path, write as partitioned Iceberg table in Glue.
116115 Uses CREATE OR REPLACE so re-running is safe (drops old Iceberg snapshot).
117116
118- Three fully separate S3 prefixes (set via YAML args — no image rebuild needed ):
119- src_path → --src-prefix (raw Parquet, read-only source)
120- data_path → -- data-path (Iceberg data files, e.g. TPCDS-TEST-3TB-ICEBERG-DATA )
121- warehouse → --warehouse (Iceberg metadata/snapshots, e.g. TPCDS-TEST-3TB-ICEBERG-WH )
117+ S3 layout (derived from --iceberg-prefix):
118+ src_path → --src-prefix (raw Parquet, read-only source)
119+ data_path → <iceberg-prefix>/data (Iceberg data files)
120+ warehouse → <iceberg-prefix>/metadata (Iceberg metadata/snapshots)
122121 """
123122 full_table = f"glue_catalog.{ glue_db } .{ table_name } "
124123
@@ -152,6 +151,8 @@ def main():
152151 args = parse_args ()
153152
154153 src_base = f"s3a://{ args .src_bucket } /{ args .src_prefix } "
154+ warehouse = f"s3a://{ args .src_bucket } /{ args .iceberg_prefix } /metadata"
155+ data_path = f"s3a://{ args .src_bucket } /{ args .iceberg_prefix } /data"
155156
156157 spark = (
157158 SparkSession .builder
@@ -163,7 +164,7 @@ def main():
163164 "org.apache.iceberg.spark.SparkCatalog" )
164165 .config ("spark.sql.catalog.glue_catalog.catalog-impl" ,
165166 "org.apache.iceberg.aws.glue.GlueCatalog" )
166- .config ("spark.sql.catalog.glue_catalog.warehouse" , args . warehouse )
167+ .config ("spark.sql.catalog.glue_catalog.warehouse" , warehouse )
167168 .config ("spark.sql.catalog.glue_catalog.io-impl" ,
168169 "org.apache.iceberg.aws.s3.S3FileIO" )
169170 .config ("spark.sql.catalog.glue_catalog.glue.region" , args .region )
@@ -204,7 +205,7 @@ def main():
204205
205206 src_path = f"{ src_base } /{ table_name } "
206207 try :
207- convert_table (spark , table_name , partition_col , src_path , args .glue_db , args . data_path )
208+ convert_table (spark , table_name , partition_col , src_path , args .glue_db , data_path )
208209 converted += 1
209210 except Exception as exc :
210211 print (f"[fail] { table_name } : { exc } " )
0 commit comments