|
| 1 | +import logging |
| 2 | +from pathlib import Path |
| 3 | + |
| 4 | +from databricks.labs.lakebridge.assessments.pipeline import PipelineClass |
| 5 | +from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig |
| 6 | +from databricks.labs.lakebridge.connections.database_manager import DatabaseManager |
| 7 | +from databricks.labs.lakebridge.connections.credential_manager import ( |
| 8 | + create_credential_manager, |
| 9 | +) |
| 10 | +from databricks.labs.lakebridge.connections.env_getter import EnvGetter |
| 11 | +from databricks.labs.lakebridge.assessments import ( |
| 12 | + PRODUCT_NAME, |
| 13 | + PRODUCT_PATH_PREFIX, |
| 14 | + PLATFORM_TO_SOURCE_TECHNOLOGY_CFG, |
| 15 | + CONNECTOR_REQUIRED, |
| 16 | +) |
| 17 | + |
| 18 | +logger = logging.getLogger(__name__) |
| 19 | + |
| 20 | + |
class Profiler:
    """Run the assessment profiling pipeline for a single source platform.

    A profiler pairs a platform name with an optional default
    ``PipelineConfig``. ``profile`` executes that configuration (or one
    supplied per call) through ``PipelineClass``, creating a
    ``DatabaseManager`` on demand when ``CONNECTOR_REQUIRED`` marks the
    platform as needing one.
    """

    def __init__(self, platform: str, pipeline_configs: PipelineConfig | None = None):
        """Store the platform name and its default pipeline configuration.

        Args:
            platform: Source platform name. ``profile`` lower-cases it before use.
            pipeline_configs: Configuration used when ``profile`` is called
                without an explicit one. May be ``None``.
        """
        self._platform = platform
        self._pipeline_config = pipeline_configs

    @classmethod
    def create(cls, platform: str) -> "Profiler":
        """Build a profiler from the configuration registered for *platform*.

        The registry is consulted with the name exactly as given and then with
        its lower-cased form. ``profile`` lower-cases the platform name before
        using it, so the fallback keeps both entry points in agreement for
        mixed-case input (assumes registry keys are lower-case; verify against
        ``PLATFORM_TO_SOURCE_TECHNOLOGY_CFG``).

        Args:
            platform: Source platform name.

        Returns:
            A profiler whose default configuration is ``None`` when nothing is
            registered for the platform.

        Raises:
            FileNotFoundError: If the registered configuration file does not exist.
        """
        config_path = PLATFORM_TO_SOURCE_TECHNOLOGY_CFG.get(platform)
        if not config_path:
            config_path = PLATFORM_TO_SOURCE_TECHNOLOGY_CFG.get(platform.lower())
        if not config_path:
            return cls(platform, None)
        # Resolve the helpers through ``cls`` so subclass overrides are honoured.
        absolute_config_path = cls._locate_config(config_path)
        return cls(platform, cls.path_modifier(config_file=absolute_config_path))

    @classmethod
    def supported_platforms(cls) -> list[str]:
        """Return the platform names that have a registered pipeline configuration."""
        return list(PLATFORM_TO_SOURCE_TECHNOLOGY_CFG)

    @staticmethod
    def path_modifier(*, config_file: str | Path, path_prefix: Path = PRODUCT_PATH_PREFIX) -> PipelineConfig:
        """Load a pipeline configuration and prefix every step's extract source.

        Args:
            config_file: YAML file handed to ``PipelineClass.load_config_from_yaml``.
            path_prefix: Prefix placed, followed by ``/``, in front of each
                step's ``extract_source``. Defaults to ``PRODUCT_PATH_PREFIX``.

        Returns:
            The loaded configuration; its steps are updated in place.
        """
        # TODO: Make this work for installs in developer mode.
        config = PipelineClass.load_config_from_yaml(config_file)
        for step in config.steps:
            step.extract_source = f"{path_prefix}/{step.extract_source}"
        return config

    def profile(
        self,
        *,
        extractor: DatabaseManager | None = None,
        pipeline_config: PipelineConfig | None = None,
    ) -> None:
        """Execute the profiling pipeline for this profiler's platform.

        Args:
            extractor: Database manager to run with. When ``None``, one is set
                up from stored credentials if the platform requires it.
            pipeline_config: Configuration for this run. Falls back to the
                configuration given at construction time.

        Raises:
            ValueError: If neither a per-call nor a default configuration is available.
            FileNotFoundError: If execution reports a missing file.
            RuntimeError: If execution fails for any other reason.
        """
        platform = self._platform.lower()
        effective_config = pipeline_config or self._pipeline_config
        if not effective_config:
            raise ValueError(f"Cannot Proceed without a valid pipeline configuration for {platform}")
        self._execute(platform, effective_config, extractor)

    @staticmethod
    def _setup_extractor(platform: str) -> DatabaseManager | None:
        """Create the database manager for *platform*, if it needs one.

        Returns ``None`` when ``CONNECTOR_REQUIRED[platform]`` is falsy. The
        lookup is a plain subscript, so a platform missing from
        ``CONNECTOR_REQUIRED`` raises from that lookup.
        """
        if not CONNECTOR_REQUIRED[platform]:
            return None
        cred_manager = create_credential_manager(PRODUCT_NAME, EnvGetter())
        connect_config = cred_manager.get_credentials(platform)
        return DatabaseManager(platform, connect_config)

    def _execute(
        self,
        platform: str,
        pipeline_config: PipelineConfig,
        extractor: DatabaseManager | None = None,
    ) -> None:
        """Run the pipeline and translate failures into caller-facing errors.

        Args:
            platform: Lower-cased platform name, used for the connector lookup and in messages.
            pipeline_config: Configuration to execute.
            extractor: Pre-built database manager, or ``None`` to set one up here.

        Raises:
            FileNotFoundError: Re-raised with platform context when a file is missing.
            RuntimeError: Wraps every other exception raised during setup or execution.
        """
        try:
            if extractor is None:
                extractor = Profiler._setup_extractor(platform)

            result = PipelineClass(pipeline_config, extractor).execute()
            logger.info(
                "Profile execution has completed successfully for %s for more info check: %s.",
                platform,
                result,
            )
        except FileNotFoundError as e:
            # Handled before the generic clause so callers still see FileNotFoundError.
            logger.error("Configuration file not found for source %s: %s", platform, e)
            raise FileNotFoundError(f"Configuration file not found for source {platform}: {e}") from e
        except Exception as e:
            logger.error("Error executing pipeline for source %s: %s", platform, e)
            raise RuntimeError(f"Pipeline execution failed for source {platform} : {e}") from e

    @staticmethod
    def _locate_config(config_path: str | Path) -> Path:
        """Resolve *config_path* under ``PRODUCT_PATH_PREFIX`` and check it exists.

        Raises:
            FileNotFoundError: If the resolved path does not exist.
        """
        config_file = PRODUCT_PATH_PREFIX / config_path
        if not config_file.exists():
            raise FileNotFoundError(f"Configuration file not found: {config_file}")
        return config_file
0 commit comments