-
Notifications
You must be signed in to change notification settings - Fork 78
Profiler CLI #1623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Profiler CLI #1623
Changes from 94 commits
e61dd3e
bfa8490
65bd96f
e578178
73b36c2
8e95927
cabf93a
ec33f67
440cc70
d6fcabb
2988569
f0c87d6
0764e01
d188705
68b0f83
5bd9408
5899a3c
3cdbf7e
822da6b
299d293
9186b89
18f9b42
877a239
492a12f
d0ca03f
603aa0f
204c639
1af4de9
75fbfb5
de87a2a
3288543
2768763
b5bf390
1d9673f
fb66dd3
be954be
92f3d68
aea2178
0d6a047
c528807
eabdf29
9308475
c1a5913
79100e4
35aa15d
7458e3b
f4c63b9
8912f1f
af3ffdc
b192d10
8e6bffa
f053e7d
ab4ae41
b3a4cdd
02a0a56
cc5aea8
f58c9c9
d54589b
72ebe58
699d317
bdb0052
8bf4023
7ea7a3c
4782c14
37caa5f
e91a6d9
37e3b6e
a74cf51
a0324c5
0e7af90
2af79ca
8665b3e
849f67e
e541c22
f642683
05d20e3
40fc5ec
e153a91
dab0fbf
ba305ba
76d2e1f
ef09e51
414574f
0dbe824
480a590
d90cc7b
b23ad5f
1fbd1b2
1359234
ebce184
094e442
1991d8f
01f7850
7900fba
c0e7bf8
9bd424f
a4c9491
5e2709b
c43fd7f
3c76d7f
e271bc3
570384b
dff71af
6508981
f5216c5
4951a22
c0b98ae
08503d9
dfa0e85
9151f69
a2f5289
bbc9775
81ce857
e44af22
1d0375c
1bf1106
ffdb9a8
b00ff9e
b5120b3
5682bfe
3e874d8
a7f585c
741eb63
228ed24
dec2b71
37fd059
01b0868
dfc2d87
3cc3c83
d7ee063
562e3a6
29cdca6
b3f531b
0130766
89f908e
bc9239d
d38abfc
2678720
9219f0f
d0960b7
d7c9196
10077f3
67545d6
c36bb95
5f13f0f
46dcee6
153f224
7cf7c5c
b969785
7b0acd5
8f4405c
8a745dc
400b586
bfe2c37
33a42c8
abbe23b
34b6d9b
c095e8f
9e56b8e
a230739
be9b358
dcf8ee9
3dedb84
ee2a99e
a045625
aeb2e00
6c0a539
028fadf
18ff36c
177e69b
26fc1ed
eee4c24
ffc33aa
9853440
4c386bd
ac77f81
f916a99
afd8272
75c5b5b
8a631ed
26fd78a
11e38d3
e1e23f2
baef44e
9111c31
6d0de29
a604e0b
ec2c1e4
599cbf5
baf0eb3
da0c85c
e8f59a4
b8a6c70
ae841e7
354a461
d56958c
9925b44
d305754
eac40a3
c79c01a
2195df6
ccba2df
9a9e909
635b84a
c23cb0e
53b3fa9
be777f6
4763885
6f8997c
4992e6b
1a5288f
9df6d0b
9e55fe8
393dbe9
11da6d9
3dc6f88
6085fca
cb38bf5
13417bf
53429c4
b660e4a
3dd7455
5dbb664
b262013
6f547d2
fc076cd
fe50a8b
a08f4ab
3fb3165
773cbba
5643750
1c40b90
451caf2
77cc4a7
e2c0819
a5630cd
9847822
a3d03d6
0e659b0
37029bb
fca09b9
e3f7215
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| import logging | ||
| from pathlib import Path | ||
|
|
||
| from databricks.labs.lakebridge.assessments.pipeline import PipelineClass | ||
| from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig | ||
| from databricks.labs.lakebridge.connections.database_manager import DatabaseManager | ||
| from databricks.labs.lakebridge.connections.credential_manager import ( | ||
| create_credential_manager, | ||
| ) | ||
| from databricks.labs.lakebridge.connections.env_getter import EnvGetter | ||
|
|
||
# Maps each supported source platform to the location of its pipeline
# configuration file. The value is joined onto PRODUCT_PATH_PREFIX before use.
_PLATFORM_TO_SOURCE_TECHNOLOGY = {
    "Synapse": "src/databricks/labs/lakebridge/resources/assessments/synapse/pipeline_config.yml",
}

# Per-platform flag: when False, no DatabaseManager is created for the platform
# and the pipeline runs without an extractor.
_CONNECTOR_REQUIRED = {
    "Synapse": False,
}

PRODUCT_NAME = "lakebridge"
# ``Path.home`` is a classmethod, so it is called on the class itself; going
# through ``Path(__file__)`` would wrongly suggest the result depends on this file.
PRODUCT_PATH_PREFIX = Path.home() / ".databricks" / "labs" / PRODUCT_NAME / "lib"

logger = logging.getLogger(__name__)
|
|
||
|
|
||
class Profiler:
    """Runs the assessment profiling pipeline for a supported source platform."""

    @classmethod
    def supported_source_technologies(cls) -> list[str]:
        """Return the platform names that have a pipeline configuration registered."""
        return list(_PLATFORM_TO_SOURCE_TECHNOLOGY)

    @staticmethod
    def path_modifier(config_file: str | Path) -> PipelineConfig:
        """Load the pipeline config and prefix each step's ``extract_source`` with the install path.

        :param config_file: location of the pipeline YAML configuration.
        :return: the loaded configuration, with every step's ``extract_source`` rewritten in place.
        """
        # TODO: Make this work for installs done in developer mode.
        config = PipelineClass.load_config_from_yaml(config_file)
        for step in config.steps:
            step.extract_source = f"{PRODUCT_PATH_PREFIX}/{step.extract_source}"
        return config

    def profile(self, platform: str, extractor: DatabaseManager | None = None):
        """Run the profiling pipeline for ``platform``.

        :param platform: one of the names returned by ``supported_source_technologies``.
        :param extractor: optional pre-built database manager; when omitted, one is set up
            only if the platform is flagged as requiring a connector.
        :raises ValueError: if no pipeline configuration is registered for ``platform``.
        """
        config_path = _PLATFORM_TO_SOURCE_TECHNOLOGY.get(platform)
        if not config_path:
            raise ValueError(f"Unsupported platform: {platform}")
        self._execute(platform, config_path, extractor)

    def _setup_extractor(self, platform: str) -> DatabaseManager | None:
        """Build a ``DatabaseManager`` for ``platform``, or return ``None`` when no connector is required."""
        if not _CONNECTOR_REQUIRED[platform]:
            return None
        cred_manager = create_credential_manager(PRODUCT_NAME, EnvGetter())
        connect_config = cred_manager.get_credentials(platform)
        return DatabaseManager(platform, connect_config)

    def _execute(self, platform: str, config_path: str, extractor: DatabaseManager | None = None):
        """Locate and load the configuration, run the pipeline and log each step's outcome.

        :raises FileNotFoundError: if the configuration file cannot be found.
        :raises RuntimeError: for any other failure while preparing or running the pipeline.
        """
        try:
            config_full_path = self._locate_config(config_path)
            config = Profiler.path_modifier(config_full_path)

            if extractor is None:
                extractor = self._setup_extractor(platform)

            results = PipelineClass(config, extractor).execute()

            for result in results:
                logger.info(
                    "Step: %s, Status: %s, Error: %s",
                    result.step_name,
                    result.status,
                    result.error_message,
                )

        except FileNotFoundError as e:
            # Report through the module logger (not the root logger) so these records
            # follow the same configuration as the step results logged above.
            logger.error("Configuration file not found for source %s: %s", platform, e)
            raise FileNotFoundError(f"Configuration file not found for source {platform}: {e}") from e
        except Exception as e:
            logger.error("Error executing pipeline for source %s: %s", platform, e)
            raise RuntimeError(f"Pipeline execution failed for source {platform} : {e}") from e

    def _locate_config(self, config_path: str) -> Path:
        """Resolve ``config_path`` under the product install directory.

        :raises FileNotFoundError: if the resolved file does not exist.
        """
        config_file = PRODUCT_PATH_PREFIX / config_path
        if not config_file.exists():
            raise FileNotFoundError(f"Configuration file not found: {config_file}")
        return config_file
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| from pathlib import Path | ||
| import yaml | ||
| from databricks.labs.remorph.connections.credential_manager import ( | ||
| CredentialManager, | ||
| LocalSecretProvider, | ||
| EnvSecretProvider, | ||
| DatabricksSecretProvider, | ||
| ) | ||
|
|
||
| from databricks.labs.remorph.connections.env_getter import EnvGetter | ||
| from databricks.labs.remorph.resources.assessments.synapse.common.functions import get_synapse_jdbc_settings | ||
| from sqlalchemy import create_engine | ||
| from sqlalchemy.engine import Engine, Result, URL | ||
| from sqlalchemy.orm import sessionmaker | ||
| from sqlalchemy import text | ||
|
|
||
|
|
||
def _load_credentials(path: Path) -> dict:
    """Read the YAML credentials file at ``path`` and return the parsed content.

    :param path: location of the credentials file; it is opened as UTF-8 text.
    :raises FileNotFoundError: when the file is missing; the message names ``path``
        and the original error is chained as the cause.
    """
    try:
        with open(path, encoding="utf-8") as handle:
            parsed = yaml.safe_load(handle)
    except FileNotFoundError as exc:
        raise FileNotFoundError(f"Credentials file not found at {path}") from exc
    return parsed
|
|
||
|
|
||
def create_credential_manager(file_path: Path):
    """Build a ``CredentialManager`` from the credentials file at ``file_path``.

    The manager receives the parsed credentials together with one secret provider
    for each of the keys ``local``, ``env`` and ``databricks``.

    :param file_path: location of the YAML credentials file.
    :return: the configured ``CredentialManager``.
    """
    env_getter = EnvGetter()
    secret_providers = dict(
        local=LocalSecretProvider(),
        env=EnvSecretProvider(env_getter),
        databricks=DatabricksSecretProvider(),
    )
    # The file is read only after the providers exist, as in the original ordering.
    credentials = _load_credentials(file_path)
    return CredentialManager(credentials, secret_providers)
|
|
||
|
|
||
def get_sqlpool_reader(config: dict, db_name: str, endpoint_key='dedicated_sql_endpoint'):
    """Open a SQLAlchemy session against the given SQL pool database.

    :param config: connection settings; the keys ``driver``, ``sql_user``, ``sql_password``
        and the one named by ``endpoint_key`` are read, plus an optional ``port``
        (1433 when absent).
    :param db_name: name of the database to connect to.
    :param endpoint_key: key in ``config`` that holds the host to connect to.
    :return: a session created by a ``sessionmaker`` bound to a ``mssql+pyodbc`` engine.
    """
    odbc_options = dict(driver=config['driver'], loginTimeout="30")

    url = URL.create(
        "mssql+pyodbc",
        username=config['sql_user'],
        password=config['sql_password'],
        host=config[endpoint_key],
        port=config.get('port', 1433),
        database=db_name,
        query=odbc_options,
    )

    session_factory = sessionmaker(bind=create_engine(url))
    return session_factory()
Uh oh!
There was an error while loading. Please reload this page.