Skip to content

Commit b9dc742

Browse files
committed
Add telemetry logging and observability enhancements
1 parent b0a7e0a commit b9dc742

12 files changed

Lines changed: 1003 additions & 217 deletions

File tree

src/data_lakehouse_ingest/config_loader.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,16 @@ def get_tenant(self) -> str:
325325
def get_dataset(self) -> str:
326326
return self.config.get("dataset", "")
327327

328+
def get_pipeline_name(self) -> str:
329+
explicit_pipeline_name = self.config.get("pipeline_name")
330+
if explicit_pipeline_name:
331+
return explicit_pipeline_name
332+
333+
tenant = self.config.get("tenant") or "personal"
334+
dataset = self.get_dataset() or "unknown_dataset"
335+
336+
return f"{tenant}_{dataset}_ingest"
337+
328338
def is_tenant(self) -> bool:
329339
return bool(self.config.get("is_tenant", False))
330340

src/data_lakehouse_ingest/core.py

Lines changed: 148 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from pyspark.sql import SparkSession, DataFrame
1313

1414
from .utils.report_utils import generate_report
15-
from .logger import safe_log_json
15+
from .logger import safe_log_json, finalize_logger
1616
from .config_loader import ConfigLoader
1717
from .orchestrator.models import ProcessStatus
1818

@@ -72,164 +72,179 @@ def ingest(
7272
"""
7373
logger = init_logger(logger)
7474

75-
# ----------------------------------------------------------------------
76-
# Spark Session Initialization
77-
# ----------------------------------------------------------------------
78-
"""
79-
Initialize a SparkSession if not provided by the caller.
75+
try:
76+
# ----------------------------------------------------------------------
77+
# Spark Session Initialization
78+
# ----------------------------------------------------------------------
79+
"""
80+
Initialize a SparkSession if not provided by the caller.
81+
82+
Since `berdl_notebook_utils` is an explicit project dependency, this function
83+
uses `get_spark_session()` from `berdl_notebook_utils.setup_spark_session`
84+
to construct a properly configured Spark session. If Spark initialization fails,
85+
the error is logged and a structured failure report is returned.
86+
"""
87+
if spark is None:
88+
logger.info("No SparkSession provided — initializing via get_spark_session()")
89+
try:
90+
spark = get_spark_session()
91+
except Exception as e:
92+
error_msg = f"Failed to initialize Spark session via get_spark_session(): {e}"
93+
return log_error(
94+
logger=logger,
95+
error_msg=error_msg,
96+
phase="spark_initialization",
97+
started_at=started_at,
98+
exc=e,
99+
)
80100

81-
Since `berdl_notebook_utils` is an explicit project dependency, this function
82-
uses `get_spark_session()` from `berdl_notebook_utils.setup_spark_session`
83-
to construct a properly configured Spark session. If Spark initialization fails,
84-
the error is logged and a structured failure report is returned.
85-
"""
86-
if spark is None:
87-
logger.info("No SparkSession provided — initializing via get_spark_session()")
88-
try:
89-
spark = get_spark_session()
90-
except Exception as e:
91-
error_msg = f"Failed to initialize Spark session via get_spark_session(): {e}"
92-
return log_error(
93-
logger=logger,
94-
error_msg=error_msg,
95-
phase="spark_initialization",
96-
started_at=started_at,
97-
exc=e,
101+
# ----------------------------------------------------------------------
102+
# MinIO Client Initialization
103+
# ----------------------------------------------------------------------
104+
if minio_client is None:
105+
logger.info(
106+
"No MinIO client provided — attempting auto-initialization via get_s3_client()"
98107
)
108+
try:
109+
minio_client = get_s3_client()
110+
logger.info("MinIO client successfully initialized via get_s3_client()")
111+
except Exception as e:
112+
error_msg = (
113+
"MinIO client is required for ingestion but could not be initialized. "
114+
"Call get_s3_client() and pass it explicitly into ingest(...)."
115+
)
116+
return log_error(
117+
logger=logger,
118+
error_msg=error_msg,
119+
phase="minio_initialization",
120+
started_at=started_at,
121+
exc=e,
122+
)
99123

100-
# ----------------------------------------------------------------------
101-
# MinIO Client Initialization
102-
# ----------------------------------------------------------------------
103-
if minio_client is None:
104-
logger.info("No MinIO client provided — attempting auto-initialization via get_s3_client()")
105-
try:
106-
minio_client = get_s3_client()
107-
logger.info("MinIO client successfully initialized via get_s3_client()")
108-
except Exception as e:
124+
# Defensive check in case get_s3_client() returned None without raising
125+
if minio_client is None:
109126
error_msg = (
110-
"MinIO client is required for ingestion but could not be initialized. "
111-
"Call get_s3_client() and pass it explicitly into ingest(...)."
127+
"MinIO client is required for ingestion but was not provided or initialized."
112128
)
113129
return log_error(
114130
logger=logger,
115131
error_msg=error_msg,
116132
phase="minio_initialization",
117133
started_at=started_at,
118-
exc=e,
134+
exc=None,
119135
)
120136

121-
# Defensive check in case get_s3_client() returned None without raising
122-
if minio_client is None:
123-
error_msg = "MinIO client is required for ingestion but was not provided or initialized."
124-
return log_error(
125-
logger=logger,
126-
error_msg=error_msg,
127-
phase="minio_initialization",
128-
started_at=started_at,
129-
exc=None,
130-
)
131-
132-
# --- Config Loader ---
133-
try:
134-
loader = ConfigLoader(config, logger=logger, minio_client=minio_client)
135-
except Exception as e:
136-
error_msg = f"Failed to load or validate configuration: {e}"
137-
logger.info("Ingestion terminated during config validation")
138-
return log_error(
139-
logger=logger,
140-
error_msg=error_msg,
141-
phase="config_validation",
142-
started_at=started_at,
143-
exc=e,
144-
)
145-
146-
# --- Init run context (tenant, defaults, tables, DB) ---
147-
ctx = init_run_context(spark, logger, loader)
148-
149-
# ----------------------------------------------------------------------
150-
# Validate DataFrame overrides (if provided)
151-
# ----------------------------------------------------------------------
152-
"""
153-
Validate optional DataFrame overrides.
154-
155-
Ensures `dataframes` is a dict[str, DataFrame] and that all provided
156-
table keys exist in the ingestion configuration before processing begins.
157-
"""
158-
if dataframes is not None:
159-
if not isinstance(dataframes, dict):
137+
# --- Config Loader ---
138+
try:
139+
loader = ConfigLoader(config, logger=logger, minio_client=minio_client)
140+
except Exception as e:
141+
error_msg = f"Failed to load or validate configuration: {e}"
142+
logger.info("Ingestion terminated during config validation")
160143
return log_error(
161144
logger=logger,
162-
error_msg=(
163-
"Invalid 'dataframes' argument. "
164-
"Expected dict[str, pyspark.sql.DataFrame], "
165-
f"got {type(dataframes).__name__}."
166-
),
167-
phase="dataframe_validation",
145+
error_msg=error_msg,
146+
phase="config_validation",
168147
started_at=started_at,
148+
exc=e,
169149
)
170150

171-
# Validate keys and values (accumulate errors)
172-
df_errors: list[str] = []
173-
174-
for key, value in dataframes.items():
175-
if not isinstance(key, str):
176-
df_errors.append(
177-
f"Invalid key in 'dataframes': expected str table name, got {type(key).__name__} ({key!r})."
151+
# --- Init run context (tenant, defaults, tables, DB) ---
152+
ctx = init_run_context(spark, logger, loader)
153+
154+
# ----------------------------------------------------------------------
155+
# Validate DataFrame overrides (if provided)
156+
# ----------------------------------------------------------------------
157+
"""
158+
Validate optional DataFrame overrides.
159+
160+
Ensures `dataframes` is a dict[str, DataFrame] and that all provided
161+
table keys exist in the ingestion configuration before processing begins.
162+
"""
163+
if dataframes is not None:
164+
if not isinstance(dataframes, dict):
165+
return log_error(
166+
logger=logger,
167+
error_msg=(
168+
"Invalid 'dataframes' argument. "
169+
"Expected dict[str, pyspark.sql.DataFrame], "
170+
f"got {type(dataframes).__name__}."
171+
),
172+
phase="dataframe_validation",
173+
started_at=started_at,
178174
)
179-
# If key isn't str, avoid using it in other messages safely
180-
continue
181175

182-
if not isinstance(value, DataFrame):
183-
df_errors.append(
184-
f"Invalid value for table '{key}' in 'dataframes': expected pyspark.sql.DataFrame, got {type(value).__name__}."
176+
# Validate keys and values (accumulate errors)
177+
df_errors: list[str] = []
178+
179+
for key, value in dataframes.items():
180+
if not isinstance(key, str):
181+
df_errors.append(
182+
f"Invalid key in 'dataframes': expected str table name, got {type(key).__name__} ({key!r})."
183+
)
184+
# If key isn't str, avoid using it in other messages safely
185+
continue
186+
187+
if not isinstance(value, DataFrame):
188+
df_errors.append(
189+
f"Invalid value for table '{key}' in 'dataframes': expected pyspark.sql.DataFrame, got {type(value).__name__}."
190+
)
191+
192+
if df_errors:
193+
return log_error(
194+
logger=logger,
195+
error_msg="DataFrame override validation failed:\n- " + "\n- ".join(df_errors),
196+
phase="dataframe_validation",
197+
started_at=started_at,
185198
)
186199

187-
if df_errors:
188-
return log_error(
189-
logger=logger,
190-
error_msg="DataFrame override validation failed:\n- " + "\n- ".join(df_errors),
191-
phase="dataframe_validation",
192-
started_at=started_at,
193-
)
200+
# validate that provided table names exist in config
201+
config_table_names = {t["name"] for t in ctx["tables"]}
202+
invalid_keys = {k for k in dataframes.keys() if isinstance(k, str)} - config_table_names
203+
204+
if invalid_keys:
205+
return log_error(
206+
logger=logger,
207+
error_msg=(
208+
"DataFrame override provided for unknown table(s): "
209+
f"{sorted(invalid_keys)}. "
210+
f"Valid tables: {sorted(config_table_names)}."
211+
),
212+
phase="dataframe_validation",
213+
started_at=started_at,
214+
)
194215

195-
# validate that provided table names exist in config
196-
config_table_names = {t["name"] for t in ctx["tables"]}
197-
invalid_keys = {k for k in dataframes.keys() if isinstance(k, str)} - config_table_names
216+
# --- Table-level processing ---
217+
table_reports, error_list = process_tables(
218+
spark=spark,
219+
logger=logger,
220+
loader=loader,
221+
ctx=ctx,
222+
started_at=started_at,
223+
minio_client=minio_client,
224+
dataframes=dataframes,
225+
)
198226

199-
if invalid_keys:
200-
return log_error(
201-
logger=logger,
202-
error_msg=(
203-
"DataFrame override provided for unknown table(s): "
204-
f"{sorted(invalid_keys)}. "
205-
f"Valid tables: {sorted(config_table_names)}."
206-
),
207-
phase="dataframe_validation",
208-
started_at=started_at,
209-
)
227+
# --- Final report ---
228+
report = generate_report(
229+
success=all(t.get("status") == ProcessStatus.SUCCESS for t in table_reports),
230+
started_at=started_at,
231+
tables=table_reports,
232+
errors=error_list,
233+
)
210234

211-
# --- Table-level processing ---
212-
table_reports, error_list = process_tables(
213-
spark=spark,
214-
logger=logger,
215-
loader=loader,
216-
ctx=ctx,
217-
started_at=started_at,
218-
minio_client=minio_client,
219-
dataframes=dataframes,
220-
)
235+
logger.info("Ingestion complete")
236+
safe_log_json(logger, report)
237+
return report
221238

222-
# --- Final report ---
223-
report = generate_report(
224-
success=all(t.get("status") == ProcessStatus.SUCCESS for t in table_reports),
225-
started_at=started_at,
226-
tables=table_reports,
227-
errors=error_list,
228-
)
239+
finally:
240+
# ------------------------------------------------------------------
241+
# Upload telemetry log to MinIO
242+
# ------------------------------------------------------------------
243+
try:
244+
finalize_logger(logger)
229245

230-
logger.info("Ingestion complete")
231-
safe_log_json(logger, report)
232-
return report
246+
except Exception:
247+
logger.exception("Failed to finalize ingest telemetry logger")
233248

234249

235250
def log_error(

0 commit comments

Comments
 (0)