@@ -11,6 +11,7 @@
"""
Client to interact with Databricks APIs
"""
import base64
import json
import traceback
from datetime import timedelta
@@ -372,3 +373,95 @@ def cache_lineage(self):
                continue
        self._job_column_lineage_executed = True
        logger.debug("Table and column lineage caching completed.")

    def get_pipeline_details(self, pipeline_id: str) -> Optional[dict]:
        """
        Get DLT pipeline configuration including libraries and notebooks
        """
        try:
            url = f"{self.base_url}/pipelines/{pipeline_id}"
            response = self.client.get(
                url,
                headers=self.headers,
                timeout=self.api_timeout,
            )
            if response.status_code == 200:
                return response.json()
            logger.warning(
                f"Failed to get pipeline details for {pipeline_id}: {response.status_code}"
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error getting pipeline details for {pipeline_id}: {exc}")
        return None
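
A minimal usage sketch for the method above, assuming `client` is an already-initialised `DatabricksClient` and the pipeline id is a placeholder; the `spec.libraries[].notebook.path` shape follows the Databricks Pipelines API "get pipeline" response:

details = client.get_pipeline_details("1234-abcd-5678")  # placeholder pipeline id
if details:
    # "spec" carries the pipeline configuration, including its libraries
    libraries = details.get("spec", {}).get("libraries", [])
    notebook_paths = [
        lib["notebook"]["path"] for lib in libraries if "notebook" in lib
    ]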

    def list_pipelines(self) -> Iterable[dict]:
        """
        List all DLT (Delta Live Tables) pipelines in the workspace.
        Uses the Pipelines API (/api/2.0/pipelines).
        """
        try:
            url = f"{self.base_url}/pipelines"
            params = {"max_results": PAGE_SIZE}

            response = self.client.get(
                url,
                params=params,
                headers=self.headers,
                timeout=self.api_timeout,
            )

            if response.status_code == 200:
                data = response.json()
                pipelines = data.get("statuses", [])
                logger.info(f"Found {len(pipelines)} DLT pipelines in the first page")
                yield from pipelines

                # Keep fetching pages while the response carries a next_page_token
                while data.get("next_page_token"):
                    params["page_token"] = data["next_page_token"]
                    response = self.client.get(
                        url,
                        params=params,
                        headers=self.headers,
                        timeout=self.api_timeout,
                    )
                    if response.status_code == 200:
                        data = response.json()
                        yield from data.get("statuses", [])
                    else:
                        break
            else:
                logger.warning(
                    f"Failed to list pipelines: {response.status_code} - {response.text}"
                )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error listing DLT pipelines: {exc}")

    def export_notebook_source(self, notebook_path: str) -> Optional[str]:
        """
        Export notebook source code from the Databricks workspace
        """
        try:
            url = f"{self.base_url}/workspace/export"
            params = {"path": notebook_path, "format": "SOURCE"}

            response = self.client.get(
                url,
                params=params,
                headers=self.headers,
                timeout=self.api_timeout,
            )

            if response.status_code == 200:
                content = response.json().get("content")
                if content:
                    return base64.b64decode(content).decode("utf-8")
            logger.warning(
                f"Failed to export notebook {notebook_path}: {response.status_code}"
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error exporting notebook {notebook_path}: {exc}")
        return None
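
A usage sketch with a hypothetical notebook path; the Workspace export API returns the source base64-encoded, which the method above decodes, yielding the raw source on success and None on any failure:

source = client.export_notebook_source("/Repos/project/dlt_notebook")  # hypothetical path
if source:
    print(f"exported {len(source.splitlines())} lines of notebook source")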
@@ -24,11 +24,7 @@
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
    TestConnectionResult,
)
from metadata.ingestion.connections.builders import (
    create_generic_db_connection,
    get_connection_args_common,
    init_empty_connection_arguments,
)
from metadata.ingestion.connections.builders import init_empty_connection_arguments
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.client import DatabricksClient
@@ -50,12 +46,9 @@ def get_connection(connection: DatabricksPipelineConnection) -> DatabricksClient
    connection.connectionArguments = init_empty_connection_arguments()
    connection.connectionArguments.root["http_path"] = connection.httpPath

    engine = create_generic_db_connection(
        connection=connection,
        get_connection_url_fn=get_connection_url,
        get_connection_args_fn=get_connection_args_common,
    )
    return DatabricksClient(connection, engine)
    # For pipeline source, we only need REST APIs, not SQL engine
    # Pass None as engine to avoid databricks-sqlalchemy dependency
    return DatabricksClient(connection, None)
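
A sketch tying the two changes together, assuming `pipeline_connection` is a valid `DatabricksPipelineConnection`: the REST-only client returned here is sufficient for the pipeline methods added above, with no SQLAlchemy engine involved:

client = get_connection(pipeline_connection)  # returns a REST-only DatabricksClient
for pipeline in client.list_pipelines():
    details = client.get_pipeline_details(pipeline["pipeline_id"])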


def test_connection(