11"""
22Input/output utilities for the Data Lakehouse Ingest framework.
33Handles file format detection, data loading from Bronze sources,
4- and writing curated data to Silver Delta tables.
4+ and writing curated data to Silver tables via Iceberg catalogs.
55
66Provides a unified interface for reading CSV, TSV, JSON, and XML formats,
7- and ensures consistent creation and registration of Delta tables in Spark .
7+ and writes tables using catalog-driven APIs (no explicit path management).
88"""
99
1010import logging
@@ -23,19 +23,18 @@ def detect_format(bronze_path: str, explicit_fmt: str | None) -> str:
2323 Determines the file format based on either an explicit configuration
2424 value (`explicit_fmt`) or by inspecting the file extension.
2525
26- Supported extensions: `.csv`, `.tsv`, `.json`, `.xml`.
26+ Supported extensions: `.csv`, `.tsv`, `.json`, `.xml`, `.parquet `.
2727
2828 Args:
2929 bronze_path (str): Full S3/local path to the input data file.
30- explicit_fmt (str | None): Optional explicit format (csv, tsv, json, xml).
30+ explicit_fmt (str | None): Optional explicit format (csv, tsv, json, xml, parquet ).
3131
3232 Returns:
33- str: The detected format name ("csv", "tsv", "json", or "xml") .
33+ str: The detected format name.
3434
3535 Notes:
3636 - Explicit format overrides file extension detection.
3737 - Defaults to "csv" when no recognizable extension is found.
38- - Ensures consistent downstream loader selection in ingestion pipelines.
3938 """
4039
4140 # TODO: Explore using python-magic or content-based format detection.
@@ -51,7 +50,6 @@ def detect_format(bronze_path: str, explicit_fmt: str | None) -> str:
5150 if explicit_fmt :
5251 return explicit_fmt .lower ()
5352
54- # Map file extensions to formats
5553 extension_map = {
5654 "csv" : "csv" ,
5755 "tsv" : "tsv" ,
@@ -61,7 +59,7 @@ def detect_format(bronze_path: str, explicit_fmt: str | None) -> str:
6159 }
6260
6361 ext = bronze_path .split ("." )[- 1 ].lower ()
64- return extension_map .get (ext , "csv" ) # default fallback
62+ return extension_map .get (ext , "csv" )
6563
6664
6765def load_table_data (
@@ -70,9 +68,12 @@ def load_table_data(
7068 fmt : str ,
7169 opts : dict ,
7270 logger : logging .Logger ,
73- ) -> tuple [object , int ]:
71+ ) -> tuple [DataFrame , int ]:
7472 """
75- Loads a DataFrame and returns (df, rows_in).
73+ Load source data into a DataFrame and return the DataFrame with its input row count.
74+
75+ Returns:
76+ tuple[DataFrame, int]: The loaded DataFrame and number of rows read from the source.
7677 """
7778 fmt_to_loader = {
7879 "json" : load_json_data ,
@@ -91,49 +92,88 @@ def load_table_data(
9192 return df , rows_in
9293
9394
94- def write_to_delta (
def table_exists(
    spark: SparkSession,
    full_table: str,
    logger: logging.Logger | None = None,
) -> bool:
    """
    Check whether a catalog table exists.

    Probes the table by resolving it with ``spark.table`` and running a
    one-row count. Uses Spark table access so it works with fully qualified
    catalog table names, including Iceberg tables.

    Args:
        spark: Active SparkSession used to resolve the table.
        full_table: Table identifier passed to ``spark.table``
            (e.g. ``namespace.name``).
        logger: Optional logger for the diagnostic message emitted when the
            probe fails. Falls back to this module's logger when omitted.

    Returns:
        True if the probe succeeded, False if it raised any exception.

    Notes:
        This is a best-effort check: every failure (not only a missing
        table) is reported as ``False``. The swallowed exception is logged
        at DEBUG level so that causes such as permission or connectivity
        errors remain diagnosable.
    """
    try:
        # The count is an action, so the lookup is actually executed rather
        # than only building a lazy DataFrame.
        spark.table(full_table).limit(1).count()
    except Exception as exc:
        # Deliberately broad: callers only need a yes/no answer. Record the
        # cause instead of discarding it silently.
        log = logger if logger is not None else logging.getLogger(__name__)
        log.debug(
            "Table probe failed for %s; treating it as missing (%s: %s)",
            full_table,
            type(exc).__name__,
            exc,
        )
        return False
    return True
107+
108+
def write_table(
    df: DataFrame,
    spark: SparkSession,
    namespace: str,
    name: str,
    partition_by: str | list[str] | None,
    mode: str,
    rows_in: int,
    logger: logging.Logger,
) -> int:
    """
    Write a DataFrame to ``{namespace}.{name}`` through the catalog writer API.

    No storage path is built here and no LOCATION clause is issued; the
    target is addressed purely by its catalog identifier. In overwrite mode
    the table is created or replaced. In append mode the table must already
    exist, otherwise the call is rejected before anything is written.

    Args:
        df: DataFrame to write.
        spark: Active SparkSession, used to probe whether the target exists.
        namespace: Namespace portion of the table identifier
            (e.g. ``my.dataset`` or ``kbase.dataset``).
        name: Table name.
        partition_by: Optional partition column, or list of columns. A falsy
            value (``None``, ``""``, ``[]``) means no partitioning is applied.
        mode: Write mode, compared case-insensitively. A falsy value is
            treated as ``"overwrite"``. Supported values are ``"overwrite"``
            and ``"append"``.
        rows_in: Number of rows read from the source. It is logged and
            returned as the rows-written figure; the target table is not
            re-counted after the write.
        logger: Logger receiving the before/after write messages.

    Returns:
        ``rows_in``, unchanged.

    Raises:
        ValueError: If the mode is not supported, or if mode is ``"append"``
            and the target table does not exist.
    """
    target = f"{namespace}.{name}"

    # Normalise the mode: falsy -> "overwrite", then lowercase.
    write_mode = "overwrite" if not mode else mode
    write_mode = write_mode.lower()

    if write_mode != "overwrite" and write_mode != "append":
        raise ValueError(
            f"Unsupported write mode '{write_mode}' for {target}. "
            "Supported modes are 'overwrite' and 'append'."
        )
    appending = write_mode == "append"

    # The existence probe always runs: it feeds the log line below as well
    # as the append guard.
    already_there = table_exists(spark, target)
    logger.info(f"Writing table: {target} (mode={write_mode}, exists={already_there})")

    if appending and not already_there:
        raise ValueError(
            f"Cannot append to {target} because the table does not exist. "
            "Use mode='overwrite' or omit mode to create the table."
        )

    table_writer = df.writeTo(target)

    if partition_by:
        # Accept either a single column name or a collection of names.
        if isinstance(partition_by, str):
            partition_cols = [partition_by]
        else:
            partition_cols = list(partition_by)
        table_writer = table_writer.partitionedBy(*partition_cols)

    if appending:
        table_writer.append()
    else:
        table_writer.createOrReplace()

    logger.info(f"Wrote {rows_in} rows → {target}")

    return rows_in
0 commit comments