
Commit 18b0796

Merge branch 'main' into refactor/improve-recon-persistence

2 parents: ade062d + a678bfe


54 files changed: +528, −455 lines

labs.yml

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 name: lakebridge
 description: Code Transpiler and Data Reconciliation tool for Accelerating Data onboarding to Databricks from EDW, CDW and other ETL sources.
 install:
-  script: src/databricks/labs/lakebridge/base_install.py
+  script: src/databricks/labs/lakebridge/install.py
 uninstall:
   script: src/databricks/labs/lakebridge/uninstall.py
 entrypoint: src/databricks/labs/lakebridge/cli.py

src/databricks/labs/lakebridge/__init__.py

Lines changed: 17 additions & 0 deletions
@@ -1,9 +1,26 @@
+import logging
+
 from databricks.sdk.core import with_user_agent_extra, with_product
+from databricks.labs.blueprint.entrypoint import is_in_debug
 from databricks.labs.blueprint.logger import install_logger
 from databricks.labs.lakebridge.__about__ import __version__
 
+# Ensure that anything that imports this (or lower) submodules triggers setup of the blueprint logging.
 install_logger()
 
+
+def initialize_logging() -> None:
+    """Common logging initialisation for non-CLI entry-points."""
+    # This is intended to be used by all the non-CLI entry-points, such as install/uninstall hooks and pipeline tasks.
+    # It emulates the behaviour of the blueprint App() initialisation, except that we don't have handoff from the
+    # Databricks CLI. As such the policy is:
+    # - The root (and logging system in general) is left alone.
+    # - If running in the IDE debugger, databricks.* will be set to DEBUG.
+    # - Otherwise, databricks.* will be set to INFO.
+    databricks_log_level = logging.DEBUG if is_in_debug() else logging.INFO
+    logging.getLogger("databricks").setLevel(databricks_log_level)
+
+
 # Add lakebridge/<version> for projects depending on lakebridge as a library
 with_user_agent_extra("lakebridge", __version__)
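
For orientation, the new helper is meant to be called once at the top of each non-CLI entry point. A minimal sketch of such a caller (the task module and its body are hypothetical, not part of this commit):

import logging

from databricks.labs.lakebridge import initialize_logging

logger = logging.getLogger(__name__)


def main() -> None:
    # Raises databricks.* to DEBUG under an IDE debugger, INFO otherwise.
    initialize_logging()
    logger.info("Task starting.")


if __name__ == "__main__":
    main()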

src/databricks/labs/lakebridge/assessments/configure_assessment.py

Lines changed: 0 additions & 1 deletion
@@ -16,7 +16,6 @@
 from databricks.labs.lakebridge.assessments import CONNECTOR_REQUIRED
 
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
 
 
 def _save_to_disk(credential: dict, cred_file: Path) -> None:

src/databricks/labs/lakebridge/assessments/dashboards/execute.py

Lines changed: 20 additions & 11 deletions
@@ -1,25 +1,33 @@
 import logging
 import os
 import sys
+from collections.abc import Sequence
+from importlib import resources
+from importlib.abc import Traversable
 from pathlib import Path
-import yaml
-from yaml.parser import ParserError
-from yaml.scanner import ScannerError
 
 import duckdb
+import yaml
 from pyspark.sql import SparkSession
+from yaml.parser import ParserError
+from yaml.scanner import ScannerError
 
+import databricks.labs.lakebridge.resources.assessments as assessment_resources
 from databricks.labs.lakebridge.assessments.profiler_validator import (
     EmptyTableValidationCheck,
     build_validation_report,
     ExtractSchemaValidationCheck,
     build_validation_report_dataframe,
 )
+from databricks.labs.lakebridge import initialize_logging
 
 logger = logging.getLogger(__name__)
 
 
-def main(*argv) -> None:
+def main(*argv: str) -> None:
+    """Lakeview Jobs task entry point: profiler_dashboards"""
+    initialize_logging()
+
     logger.debug(f"Arguments received: {argv}")
     assert len(sys.argv) == 4, f"Invalid number of arguments: {len(sys.argv)}"
     catalog_name = sys.argv[0]
@@ -34,22 +42,22 @@ def main(*argv) -> None:
         raise ValueError("Corrupt or invalid profiler extract.")
 
 
-def _get_extract_tables(schema_def_path: str) -> list:
+def _get_extract_tables(schema_def_path: Path | Traversable) -> Sequence[tuple[str, str, str]]:
     """
     Given a schema definition file for a source technology, returns a list of table info tuples:
     (schema_name, table_name, fully_qualified_name)
     """
     # First, load the schema definition file
     try:
-        with open(schema_def_path, 'r', encoding="UTF-8") as f:
+        with schema_def_path.open(mode="r", encoding="utf-8") as f:
             data = yaml.safe_load(f)
     except (ParserError, ScannerError) as e:
         raise ValueError(f"Could not read extract schema definition '{schema_def_path}': {e}") from e
     except FileNotFoundError as e:
         raise FileNotFoundError(f"Schema definition not found: {schema_def_path}") from e
     # Iterate through the defined schemas and build a list of
     # table info tuples: (schema_name, table_name, fully_qualified_name)
-    extracted_tables = []
+    extracted_tables: list[tuple[str, str, str]] = []
     for schema_name, schema_def in data.get("schemas", {}).items():
         tables = schema_def.get("tables", {})
         for table_name in tables.keys():
@@ -64,10 +72,11 @@ def _validate_profiler_extract(
 ) -> bool:
     logger.info("Validating the profiler extract file.")
     validation_checks: list[EmptyTableValidationCheck | ExtractSchemaValidationCheck] = []
-    schema_def_path = f"{Path(__file__).parent}/../../resources/assessments/{source_tech}_schema_def.yml"
-    tables = _get_extract_tables(schema_def_path)
+    # TODO: Verify this, I don't think it works? (These files are part of the test resources.)
+    schema_def = resources.files(assessment_resources).joinpath(f"{source_tech}_schema_def.yml")
+    tables = _get_extract_tables(schema_def)
     try:
-        with duckdb.connect(database=extract_location) as duck_conn:
+        with duckdb.connect(database=extract_location) as duck_conn, resources.as_file(schema_def) as schema_def_path:
            for table_info in tables:
                # Ensure that the table contains data
                empty_check = EmptyTableValidationCheck(table_info[2])
@@ -79,7 +88,7 @@
                    table_info[1],
                    source_tech=source_tech,
                    extract_path=extract_location,
-                   schema_path=schema_def_path,
+                   schema_path=str(schema_def_path),
                )
                validation_checks.append(schema_check)
            report = build_validation_report(validation_checks, duck_conn)
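
The move from a Path(__file__)-relative string to importlib.resources is what lets the schema definitions be found even when the package is installed as a zipped wheel (though the in-diff TODO questions whether these files ship with the package at all). A minimal sketch of the pattern, with a hypothetical data package and file name:

from importlib import resources

import my_package.data as data_pkg  # hypothetical package containing data files

schema_def = resources.files(data_pkg).joinpath("example_schema_def.yml")
# as_file() yields a concrete filesystem path, materialising a temporary
# copy if the package is zipped, for APIs that need a str or Path.
with resources.as_file(schema_def) as schema_path:
    text = schema_path.read_text(encoding="utf-8")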

src/databricks/labs/lakebridge/assessments/pipeline.py

Lines changed: 0 additions & 1 deletion
@@ -18,7 +18,6 @@
 from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, FetchResult
 
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
 
 DB_NAME = "profiler_extract.db"
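
This file and configure_assessment.py both stop calling logger.setLevel(logging.INFO) at import time. The point of removing it: a module logger left at the default NOTSET defers to its nearest configured ancestor, so the level set once on the "databricks" logger by initialize_logging() (or by the CLI App) takes effect everywhere. A small sketch of that behaviour:

import logging

logger = logging.getLogger("databricks.labs.example")  # hypothetical module logger

logging.basicConfig()
logging.getLogger("databricks").setLevel(logging.INFO)
# The child logger is NOTSET, so its effective level is inherited:
assert logger.getEffectiveLevel() == logging.INFO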

src/databricks/labs/lakebridge/assessments/profiler.py

Lines changed: 3 additions & 4 deletions
@@ -39,11 +39,10 @@ def supported_platforms(cls) -> list[str]:
 
     @staticmethod
     def path_modifier(*, config_file: str | Path, path_prefix: Path = PRODUCT_PATH_PREFIX) -> PipelineConfig:
-        # TODO: Make this work install during developer mode
+        # TODO: Choose a better name for this.
         config = PipelineClass.load_config_from_yaml(config_file)
-        for step in config.steps:
-            step.extract_source = f"{path_prefix}/{step.extract_source}"
-        return config
+        new_steps = [step.copy(extract_source=str(path_prefix / step.extract_source)) for step in config.steps]
+        return config.copy(steps=new_steps)
 
     def profile(
         self,

Lines changed: 11 additions & 12 deletions
@@ -1,30 +1,29 @@
+import dataclasses
 from dataclasses import dataclass, field
 
 
-@dataclass
+@dataclass(frozen=True)
 class Step:
     name: str
     type: str | None
     extract_source: str
-    mode: str | None
-    frequency: str | None
-    flag: str | None
+    mode: str = "append"
+    frequency: str = "once"
+    flag: str = "active"
     dependencies: list[str] = field(default_factory=list)
     comment: str | None = None
 
-    def __post_init__(self):
-        if self.frequency is None:
-            self.frequency = "once"
-        if self.flag is None:
-            self.flag = "active"
-        if self.mode is None:
-            self.mode = "append"
+    def copy(self, /, **changes) -> "Step":
+        return dataclasses.replace(self, **changes)
 
 
-@dataclass
+@dataclass(frozen=True)
 class PipelineConfig:
     name: str
     version: str
     extract_folder: str
     comment: str | None = None
     steps: list[Step] = field(default_factory=list)
+
+    def copy(self, /, **changes) -> "PipelineConfig":
+        return dataclasses.replace(self, **changes)
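
With frozen=True these configs are immutable, so call sites such as path_modifier above update them by copying instead of mutating. A short usage sketch (field values are made up):

step = Step(name="extract", type=None, extract_source="queries/extract.sql")
# Attribute assignment on a frozen dataclass raises FrozenInstanceError;
# copy() returns a new Step with only the named fields replaced.
prefixed = step.copy(extract_source="/opt/prefix/queries/extract.sql")
assert step.extract_source == "queries/extract.sql"  # original unchanged
assert prefixed.mode == "append"  # defaults now live on the field declarations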

src/databricks/labs/lakebridge/assessments/profiler_validator.py

Lines changed: 3 additions & 2 deletions
@@ -1,6 +1,7 @@
 import os
 from dataclasses import dataclass
 from collections.abc import Sequence
+from pathlib import Path
 
 import yaml
 from duckdb import DuckDBPyConnection, CatalogException, ParserException, Error
@@ -201,7 +202,7 @@ def validate(self, connection) -> ValidationOutcome:
         )
 
 
-def get_profiler_extract_path(pipeline_config_path: str) -> str:
+def get_profiler_extract_path(pipeline_config_path: Path) -> Path:
     """
     Returns the filesystem path of the profiler extract database.
     input:
@@ -211,7 +212,7 @@ def get_profiler_extract_path(pipeline_config_path: str) -> str:
     """
     pipeline_config = PipelineClass.load_config_from_yaml(pipeline_config_path)
     normalized_db_path = os.path.normpath(pipeline_config.extract_folder)
-    database_path = f"{normalized_db_path}/{PROFILER_DB_NAME}"
+    database_path = Path(normalized_db_path) / PROFILER_DB_NAME
     return database_path
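
Returning a Path instead of a formatted string keeps the join portable and lets callers chain pathlib operations directly. The change in isolation, with a hypothetical extract_folder value (and assuming PROFILER_DB_NAME matches the DB_NAME constant seen in pipeline.py):

import os
from pathlib import Path

PROFILER_DB_NAME = "profiler_extract.db"

extract_folder = "profiles/./output"  # hypothetical config value
normalized_db_path = os.path.normpath(extract_folder)  # "profiles/output"
database_path = Path(normalized_db_path) / PROFILER_DB_NAME
assert database_path == Path("profiles/output/profiler_extract.db")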

src/databricks/labs/lakebridge/base_install.py

Lines changed: 0 additions & 28 deletions
This file was deleted.

src/databricks/labs/lakebridge/cli.py

Lines changed: 24 additions & 14 deletions
@@ -16,7 +16,7 @@
 from databricks.sdk import WorkspaceClient
 
 from databricks.labs.blueprint.cli import App
-from databricks.labs.blueprint.entrypoint import is_in_debug
+from databricks.labs.blueprint.entrypoint import get_logger
 from databricks.labs.blueprint.installation import RootJsonValue, JsonObject, JsonValue
 from databricks.labs.blueprint.tui import Prompts
@@ -30,7 +30,6 @@
 from databricks.labs.lakebridge.connections.credential_manager import cred_file
 from databricks.labs.lakebridge.helpers.recon_config_utils import ReconConfigPrompts
 from databricks.labs.lakebridge.helpers.telemetry_utils import make_alphanum_or_semver
-from databricks.labs.lakebridge.install import installer
 from databricks.labs.lakebridge.reconcile.runner import ReconcileRunner
 from databricks.labs.lakebridge.lineage import lineage_generator
 from databricks.labs.lakebridge.reconcile.recon_config import RECONCILE_OPERATION_NAME, AGG_RECONCILE_OPERATION_NAME
@@ -48,7 +47,6 @@
 
 # Subclass to allow controlled access to protected methods.
 class Lakebridge(App):
-    _logger_instance: logging.Logger | None = None
 
     def create_workspace_client(self) -> WorkspaceClient:
         """Create a workspace client, with the appropriate product and version information.
@@ -58,15 +56,25 @@ def create_workspace_client(self) -> WorkspaceClient:
         self._patch_databricks_host()
         return self._workspace_client()
 
-    def get_logger(self) -> logging.Logger:
-        if self._logger_instance is None:
-            self._logger_instance = self._logger
-            self._logger_instance.setLevel(logging.INFO)
-        return self._logger_instance
+    def _log_level(self, raw: str) -> int:
+        """Convert the log-level provided by the Databricks CLI into a logging level supported by Python."""
+        log_level = super()._log_level(raw)
+        # Due to an issue in the handoff of the intended logging level from the Databricks CLI to our
+        # application, we can't currently distinguish between --log-level=WARN and nothing at all, where we
+        # prefer (and the application logging expects) INFO.
+        #
+        # Rather than default to only have WARNING logs show, it's preferable to default to INFO and have
+        # --log-level=WARN not work for now.
+        #
+        # See: https://github.com/databrickslabs/lakebridge/issues/2167
+        # TODO: Remove this once #2167 has been resolved.
+        if log_level == logging.WARNING:
+            log_level = logging.INFO
+        return log_level
 
 
 lakebridge = Lakebridge(__file__)
-logger = lakebridge.get_logger()
+logger = get_logger(__file__)
 
 
 def raise_validation_exception(msg: str) -> NoReturn:
@@ -745,6 +753,9 @@ def install_transpile(
     transpiler_repository: TranspilerRepository = TranspilerRepository.user_home(),
 ) -> None:
     """Install or upgrade the Lakebridge transpilers."""
+    # Avoid circular imports.
+    from databricks.labs.lakebridge.install import installer  # pylint: disable=cyclic-import, import-outside-toplevel
+
     is_interactive = interactive_mode(interactive)
     ctx = ApplicationContext(w)
     ctx.add_user_agent_extra("cmd", "install-transpile")
@@ -804,6 +815,9 @@ def configure_reconcile(
     transpiler_repository: TranspilerRepository = TranspilerRepository.user_home(),
 ) -> None:
     """Configure the Lakebridge reconciliation module"""
+    # Avoid circular imports.
+    from databricks.labs.lakebridge.install import installer  # pylint: disable=cyclic-import, import-outside-toplevel
+
     ctx = ApplicationContext(w)
     ctx.add_user_agent_extra("cmd", "configure-reconcile")
     user = w.current_user
@@ -1017,8 +1031,4 @@ def create_profiler_dashboard(
 
 
 if __name__ == "__main__":
-    app = lakebridge
-    logger = app.get_logger()
-    if is_in_debug():
-        logger.setLevel(logging.DEBUG)
-    app()
+    lakebridge()
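
The two function-level imports of installer are the standard way to break an import cycle: cli no longer needs install at import time, so both modules can finish initialising before the name is resolved. A minimal sketch of the pattern with two hypothetical modules:

# a.py
import b  # safe: b does not import a at module scope

def run() -> None:
    b.helper()

# b.py
def helper() -> None:
    # Deferring the import to call time breaks the a -> b -> a cycle:
    # by the time helper() runs, both modules are fully initialised.
    from a import run  # pylint: disable=import-outside-toplevel
    print(run.__name__)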
