
Commit 75e8873

Merge branch 'main' into docs/remove-outdated-teradata-mention

2 parents b8027cf + b9ccebe

6 files changed: +120 −35 lines


docs/lakebridge/docs/installation.mdx

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ databricks configure --host <host> --profile <profile_name>
 
 **Version Required:** Python 3.10 or newer; verify that the installed version is in the supported versions (3.10–3.13).
 
-- **Windows** - Install python from [here](https://www.python.org/downloads/). Your Windows computer will need a shell environment ([GitBash](https://www.git-scm.com/downloads) or [WSL](https://learn.microsoft.com/en-us/windows/wsl/about))
+- **Windows** - Install python from [here](https://www.python.org/downloads/).
 - **MacOS/Unix** - Use [brew](https://formulae.brew.sh/formula/[email protected]) to install python in macOS/Unix machines
 
 **Check Python version on Windows, macOS, and Unix:**
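For reference, the supported-range check these docs describe can also be expressed directly in Python (a minimal sketch; the shell commands on the docs page are not part of this hunk):

# Sketch: verify the interpreter falls inside the supported 3.10-3.13 range.
import sys

if not ((3, 10) <= sys.version_info[:2] <= (3, 13)):
    raise SystemExit(f"Unsupported Python {sys.version.split()[0]}; supported versions are 3.10-3.13.")
print(f"Python {sys.version.split()[0]} is supported.")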

src/databricks/labs/lakebridge/assessments/pipeline.py

Lines changed: 30 additions & 23 deletions
@@ -1,20 +1,19 @@
-from pathlib import Path
-from subprocess import run, CalledProcessError, Popen, PIPE, STDOUT, DEVNULL
-from dataclasses import dataclass
-from enum import Enum
-
-import sys
+import json
+import logging
 import os
+import sys
 import venv
 import tempfile
-import json
-import logging
-import yaml
-import duckdb
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+from subprocess import CalledProcessError, DEVNULL, PIPE, Popen, STDOUT, run
 
-from databricks.labs.lakebridge.connections.credential_manager import cred_file
+import duckdb
+import yaml
 
 from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig, Step
+from databricks.labs.lakebridge.connections.credential_manager import cred_file
 from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, FetchResult
 
 logger = logging.getLogger(__name__)
@@ -235,21 +234,29 @@ def _run_python_script(venv_exec_cmd, script_path, db_path, credential_config):
     def _save_to_db(self, result: FetchResult, step_name: str, mode: str):
         db_path = str(self.db_path_prefix / DB_NAME)
 
-        with duckdb.connect(db_path) as conn:
-            # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
-            schema = ' STRING, '.join(result.columns) + ' STRING'
+        # Check row count and log appropriately and skip data insertion if 0 rows
+        if not result.rows:
+            logging.warning(
+                f"Query for step '{step_name}' returned 0 rows. Skipping table creation and data insertion."
+            )
+            return
+
+        row_count = len(result.rows)
+        logging.info(f"Query for step '{step_name}' returned {row_count} rows.")
+        # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
+        _result_frame = result.to_df().astype(str)
 
-            # Handle write modes
+        with duckdb.connect(db_path) as conn:
+            # DuckDB can access _result_frame from the local scope automatically.
             if mode == 'overwrite':
-                conn.execute(f"CREATE OR REPLACE TABLE {step_name} ({schema})")
+                statement = f"CREATE OR REPLACE TABLE {step_name} AS SELECT * FROM _result_frame"
             elif mode == 'append' and step_name not in conn.get_table_names(""):
-                conn.execute(f"CREATE TABLE {step_name} ({schema})")
-
-            # Batch insert using prepared statements
-            placeholders = ', '.join(['?' for _ in result.columns])
-            insert_query = f"INSERT INTO {step_name} VALUES ({placeholders})"
-
-            conn.executemany(insert_query, result.rows)
+                statement = f"CREATE TABLE {step_name} AS SELECT * FROM _result_frame"
+            else:
+                statement = f"INSERT INTO {step_name} SELECT * FROM _result_frame"
+            logging.debug(f"Inserting {row_count} rows: {statement}")
+            conn.execute(statement)
+            logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.")
 
     @staticmethod
     def _create_dir(dir_path: Path):
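The rewritten `_save_to_db` leans on DuckDB's replacement scans: a pandas DataFrame bound to a local Python variable (here `_result_frame`) can be referenced by name inside SQL, so `CREATE TABLE ... AS SELECT * FROM _result_frame` ingests the frame without prepared statements. A minimal standalone sketch of that mechanism (the table name `demo_step` and the sample data are illustrative, not from the PR):

# Sketch: DuckDB replacement scan over a local pandas DataFrame.
import duckdb
import pandas as pd

_result_frame = pd.DataFrame({"col1": ["a", "b"], "col2": ["1", "2"]})  # stand-in result set

with duckdb.connect() as conn:  # in-memory database, for illustration
    # DuckDB resolves _result_frame by scanning the enclosing Python scope.
    conn.execute("CREATE OR REPLACE TABLE demo_step AS SELECT * FROM _result_frame")
    print(conn.execute("SELECT COUNT(*) FROM demo_step").fetchone())  # -> (2,)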

src/databricks/labs/lakebridge/connections/database_manager.py

Lines changed: 8 additions & 0 deletions
@@ -4,6 +4,8 @@
 from typing import Any
 from collections.abc import Sequence, Set
 
+import pandas as pd
+
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine, URL
 from sqlalchemy.engine.row import Row
@@ -19,6 +21,12 @@ class FetchResult:
     columns: Set[str]
     rows: Sequence[Row[Any]]
 
+    def to_df(self) -> pd.DataFrame:
+        """Create a pandas dataframe based on these results."""
+        # Row emulates a named tuple, which Pandas understands natively. So the columns are safely inferred unless
+        # we have an empty result-set.
+        return pd.DataFrame(data=self.rows) if self.rows else pd.DataFrame(columns=list(self.columns))
+
 
 class DatabaseConnector(ABC):
     @abstractmethod
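The new `to_df` relies on pandas inferring column names from named-tuple-like rows; SQLAlchemy's `Row` presents itself that way, which is why only the empty case needs the explicit `columns=` fallback. A small sketch of that inference, using `collections.namedtuple` as a stand-in for `Row` (field names are illustrative):

# Sketch: pandas infers column names from named-tuple rows.
from collections import namedtuple

import pandas as pd

Row = namedtuple("Row", ["name", "qty"])  # stand-in for sqlalchemy.engine.row.Row
rows = [Row("widget", 3), Row("gadget", 5)]

df = pd.DataFrame(data=rows)                    # columns inferred from the tuple fields
empty = pd.DataFrame(columns=["name", "qty"])   # no rows: columns must be given explicitly
print(list(df.columns), df.shape, empty.shape)  # -> ['name', 'qty'] (2, 2) (0, 2)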

tests/integration/assessments/test_pipeline.py

Lines changed: 65 additions & 11 deletions
@@ -1,10 +1,16 @@
 from collections.abc import Callable
 from pathlib import Path
+from logging import Logger
 from typing import TypeAlias
 import duckdb
 import pytest
 
-from databricks.labs.lakebridge.assessments.pipeline import PipelineClass, DB_NAME, StepExecutionStatus
+from databricks.labs.lakebridge.assessments.pipeline import (
+    PipelineClass,
+    DB_NAME,
+    StepExecutionStatus,
+    StepExecutionResult,
+)
 from databricks.labs.lakebridge.assessments.profiler import Profiler
 
 from databricks.labs.lakebridge.assessments.profiler_config import Step, PipelineConfig
@@ -45,7 +51,20 @@ def python_failure_config(pipeline_configuration_loader: _Loader) -> PipelineCon
     return pipeline_configuration_loader(Path("pipeline_config_python_failure.yml"))
 
 
-def test_run_pipeline(sandbox_sqlserver, pipeline_config, get_logger):
+@pytest.fixture(scope="module")
+def empty_result_config() -> PipelineConfig:
+    prefix = Path(__file__).parent
+    config_path = f"{prefix}/../../resources/assessments/pipeline_config_empty_result.yml"
+    config: PipelineConfig = PipelineClass.load_config_from_yaml(config_path)
+    updated_steps = [step.copy(extract_source=f"{prefix}/../../{step.extract_source}") for step in config.steps]
+    return config.copy(steps=updated_steps)
+
+
+def test_run_pipeline(
+    sandbox_sqlserver: DatabaseManager,
+    pipeline_config: PipelineConfig,
+    get_logger: Logger,
+) -> None:
     pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver)
     results = pipeline.execute()
 
@@ -56,10 +75,14 @@ def test_run_pipeline(sandbox_sqlserver, pipeline_config, get_logger):
         StepExecutionStatus.SKIPPED,
     ), f"Step {result.step_name} failed with status {result.status}"
 
-    assert verify_output(get_logger, pipeline_config.extract_folder)
+    assert verify_output(get_logger, Path(pipeline_config.extract_folder))
 
 
-def test_run_sql_failure_pipeline(sandbox_sqlserver, sql_failure_config, get_logger):
+def test_run_sql_failure_pipeline(
+    sandbox_sqlserver: DatabaseManager,
+    sql_failure_config: PipelineConfig,
+    get_logger: Logger,
+) -> None:
     pipeline = PipelineClass(config=sql_failure_config, executor=sandbox_sqlserver)
     with pytest.raises(RuntimeError) as e:
         pipeline.execute()
@@ -68,7 +91,11 @@ def test_run_sql_failure_pipeline(sandbox_sqlserver, sql_failure_config, get_log
     assert "Pipeline execution failed due to errors in steps: invalid_sql_step" in str(e.value)
 
 
-def test_run_python_failure_pipeline(sandbox_sqlserver, python_failure_config, get_logger):
+def test_run_python_failure_pipeline(
+    sandbox_sqlserver: DatabaseManager,
+    python_failure_config: PipelineConfig,
+    get_logger: Logger,
+) -> None:
     pipeline = PipelineClass(config=python_failure_config, executor=sandbox_sqlserver)
     with pytest.raises(RuntimeError) as e:
         pipeline.execute()
@@ -77,7 +104,11 @@ def test_run_python_failure_pipeline(sandbox_sqlserver, python_failure_config, g
     assert "Pipeline execution failed due to errors in steps: invalid_python_step" in str(e.value)
 
 
-def test_run_python_dep_failure_pipeline(sandbox_sqlserver, pipeline_dep_failure_config, get_logger):
+def test_run_python_dep_failure_pipeline(
+    sandbox_sqlserver: DatabaseManager,
+    pipeline_dep_failure_config: PipelineConfig,
+    get_logger: Logger,
+):
     pipeline = PipelineClass(config=pipeline_dep_failure_config, executor=sandbox_sqlserver)
     with pytest.raises(RuntimeError) as e:
         pipeline.execute()
@@ -101,16 +132,16 @@ def test_skipped_steps(sandbox_sqlserver: DatabaseManager, pipeline_config: Pipe
         assert result.error_message is None, "Skipped steps should not have error messages"
 
 
-def verify_output(get_logger, path):
+def verify_output(get_logger: Logger, path: Path):
     conn = duckdb.connect(str(Path(path)) + "/" + DB_NAME)
 
     expected_tables = ["usage", "inventory", "random_data"]
     logger = get_logger
     for table in expected_tables:
         try:
             result = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
-            logger.info(f"Count for {table}: {result[0]}")
-            if result[0] == 0:
+            logger.info(f"Count for {table}: {result}")
+            if result is None or result[0] == 0:
                 logger.debug(f"Table {table} is empty")
                 return False
         except duckdb.CatalogException:
@@ -122,7 +153,7 @@ def verify_output(get_logger, path):
     return True
 
 
-def test_pipeline_config_comments():
+def test_pipeline_config_comments() -> None:
     pipeline_w_comments = PipelineConfig(
         name="warehouse_profiler",
         version="1.0",
@@ -136,7 +167,7 @@ def test_pipeline_config_comments():
     assert pipeline_wo_comments.comment is None
 
 
-def test_pipeline_step_comments():
+def test_pipeline_step_comments() -> None:
     step_w_comment = Step(
         name="step_w_comment",
         type="sql",
@@ -156,3 +187,26 @@ def test_pipeline_step_comments():
     )
     assert step_w_comment.comment == "This is a step comment."
     assert step_wo_comment.comment is None
+
+
+def test_run_empty_result_pipeline(
+    sandbox_sqlserver: DatabaseManager,
+    empty_result_config: PipelineConfig,
+    get_logger: Logger,
+) -> None:
+    pipeline = PipelineClass(config=empty_result_config, executor=sandbox_sqlserver)
+    results = pipeline.execute()
+
+    # Verify step completed successfully despite empty results
+    assert len(results) == 1
+    assert results == [
+        StepExecutionResult(step_name="empty_result_step", status=StepExecutionStatus.COMPLETE, error_message=None)
+    ]
+
+    # Verify that no table was created (processing was skipped for empty resultset)
+    with duckdb.connect(str(Path(empty_result_config.extract_folder)) + "/" + DB_NAME) as conn:
+        tables = conn.execute("SHOW TABLES").fetchall()
+        table_names = [table[0] for table in tables]
+
+        # Table should NOT be created when resultset is empty
+        assert "empty_result_step" not in table_names, "Empty resultset should skip table creation"
tests/resources/assessments/empty_resultset.sql

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+-- Query that returns valid schema but 0 rows
+SELECT
+    'test' as col1,
+    'test' as col2,
+    'test' as col3
+WHERE 1 = 0
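The `WHERE 1 = 0` predicate yields zero rows while still defining column names, which is exactly the case the new `_save_to_db` guard and the `to_df` columns fallback handle. A small sketch of the behavior (run against DuckDB for convenience; the pipeline step itself executes against the SQL Server sandbox):

# Sketch: a WHERE 1 = 0 query returns zero rows but a fully defined schema.
import duckdb

with duckdb.connect() as conn:
    cur = conn.execute("SELECT 'test' AS col1, 'test' AS col2 WHERE 1 = 0")
    print([col[0] for col in cur.description])  # -> ['col1', 'col2']
    print(cur.fetchall())                       # -> []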
tests/resources/assessments/pipeline_config_empty_result.yml

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+name: test_empty_result_pipeline
+version: 1.0
+extract_folder: /tmp/lakebridge_test_empty_result
+steps:
+  - name: empty_result_step
+    type: sql
+    extract_source: resources/assessments/empty_resultset.sql
+    mode: overwrite
+    frequency: once
+    flag: active
