
Commit 7e93fc0

feat: add classic cluster option for Switch job deployment (#2222)
## Changes

### What does this PR do?

Add support for a classic job cluster for Switch jobs via the environment variable `LAKEBRIDGE_CLUSTER_TYPE=CLASSIC`. This provides a workaround for workspaces where serverless compute is not available.

### Relevant implementation details

- Added a `switch_use_serverless` flag to `LakebridgeConfiguration` (not persisted, used only during install)
- The environment variable `LAKEBRIDGE_CLUSTER_TYPE=CLASSIC` triggers classic cluster mode
- `SwitchDeployment.install()` now accepts a `use_serverless` parameter
- When `use_serverless=False`, creates a job cluster with:
  - an LTS Spark version
  - a single worker node (16 GB+ memory)
  - `USER_ISOLATION` data security mode

### Usage

```bash
# Default: serverless (no change needed)
databricks labs lakebridge install-transpile --include-llm-transpiler true

# Classic cluster
LAKEBRIDGE_CLUSTER_TYPE=CLASSIC databricks labs lakebridge install-transpile --include-llm-transpiler true
```

### Caveats/things to watch out for when reviewing

- Default behavior remains serverless (backwards compatible)
- Classic cluster sizing (16 GB+ memory, 1 worker) is fixed for now

### Linked issues

Resolves #2219

### Functionality

- [ ] added relevant user documentation
- [ ] added new CLI command
- [x] modified existing command: `databricks labs lakebridge install-transpile`

### Tests

- [x] manually tested
- [x] added unit tests
- [ ] added integration tests
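As a quick orientation for reviewers: the classic-mode cluster amounts to the following, assembled with the same Databricks SDK selector calls the deployment code uses (a standalone illustration, not code from this PR; assumes an authenticated `WorkspaceClient` named `w`):

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute
from databricks.sdk.service.jobs import JobCluster

w = WorkspaceClient()  # assumes workspace auth is already configured

# Sketch of the classic-mode job cluster this PR provisions (see switch.py below)
classic_cluster = JobCluster(
    job_cluster_key="Switch_Cluster",
    new_cluster=compute.ClusterSpec(
        # Latest LTS Databricks Runtime, resolved at install time
        spark_version=w.clusters.select_spark_version(latest=True, long_term_support=True),
        # Smallest available local-disk node type with at least 16 GB of memory
        node_type_id=w.clusters.select_node_type(local_disk=True, min_memory_gb=16),
        num_workers=1,
        data_security_mode=compute.DataSecurityMode.USER_ISOLATION,
    ),
)
```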
Parent: f665611

File tree

8 files changed (+172, -12 lines)


src/databricks/labs/lakebridge/cli.py

Lines changed: 8 additions & 1 deletion
```diff
@@ -761,16 +761,23 @@ def install_transpile(
     ctx.add_user_agent_extra("cmd", "install-transpile")
     if artifact:
         ctx.add_user_agent_extra("artifact-overload", Path(artifact).name)
+    # Internal: use LAKEBRIDGE_CLUSTER_TYPE=CLASSIC env var to use classic job cluster
+    switch_use_serverless = os.environ.get("LAKEBRIDGE_CLUSTER_TYPE", "").upper() != "CLASSIC"
     if include_llm_transpiler:
         ctx.add_user_agent_extra("include-llm-transpiler", "true")
         # Decision was made not to prompt when include_llm_transpiler is set, and we expect users to use llm-transpile
         # and pass all the arguments.
         logger.info("Including LLM transpiler as part of install, interactive mode disabled: will skip questionnaire.")
         is_interactive = False
+
     user = w.current_user
     logger.debug(f"User: {user}")
     transpile_installer = installer(
-        w, transpiler_repository, is_interactive=is_interactive, include_llm=include_llm_transpiler
+        w,
+        transpiler_repository,
+        is_interactive=is_interactive,
+        include_llm=include_llm_transpiler,
+        switch_use_serverless=switch_use_serverless,
     )
     transpile_installer.run(module="transpile", artifact=artifact)
```
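The toggle added above is a single case-insensitive string comparison against the environment. A minimal sketch of its behavior, mirroring that expression (illustrative only; the parametrized test further down covers the same cases):

```python
import os

def use_serverless() -> bool:
    # Same expression as the line added to install_transpile above
    return os.environ.get("LAKEBRIDGE_CLUSTER_TYPE", "").upper() != "CLASSIC"

os.environ["LAKEBRIDGE_CLUSTER_TYPE"] = "Classic"  # matching is case-insensitive
assert use_serverless() is False

del os.environ["LAKEBRIDGE_CLUSTER_TYPE"]  # unset falls back to the serverless default
assert use_serverless() is True
```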

src/databricks/labs/lakebridge/config.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -276,3 +276,5 @@ class LakebridgeConfiguration:
     reconcile: ReconcileConfig | None
     # Temporary flag, indicating whether to include the LLM-based Switch transpiler.
     include_switch: bool = False
+    # Internal: Use serverless compute for Switch job. Set via LAKEBRIDGE_CLUSTER_TYPE env var.
+    switch_use_serverless: bool = True
```

src/databricks/labs/lakebridge/deployment/installation.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -97,7 +97,7 @@ def install(self, config: LakebridgeConfiguration):
             self._recon_deployment.install(config.reconcile, wheel_path)
         if config.include_switch:
             logger.info("Installing Switch transpiler to workspace.")
-            self._switch_deployment.install()
+            self._switch_deployment.install(use_serverless=config.switch_use_serverless)

     def uninstall(self, config: LakebridgeConfiguration):
         # This will remove all the Lakebridge modules
```

src/databricks/labs/lakebridge/deployment/switch.py

Lines changed: 27 additions & 9 deletions
```diff
@@ -12,7 +12,8 @@
 from databricks.labs.blueprint.paths import WorkspacePath
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.errors import InvalidParameterValue, NotFound
-from databricks.sdk.service.jobs import JobParameterDefinition, JobSettings, NotebookTask, Source, Task
+from databricks.sdk.service import compute
+from databricks.sdk.service.jobs import JobCluster, JobParameterDefinition, JobSettings, NotebookTask, Source, Task


 logger = logging.getLogger(__name__)
@@ -32,12 +33,12 @@ def __init__(
         self._installation = installation
         self._install_state = install_state

-    def install(self) -> None:
+    def install(self, use_serverless: bool = True) -> None:
         """Deploy Switch to workspace and configure resources."""
         logger.debug("Deploying Switch resources to workspace...")
         try:
             self._deploy_resources_to_workspace()
-            self._setup_job()
+            self._setup_job(use_serverless)
             logger.debug("Switch deployment completed")
         except (RuntimeError, ValueError, InvalidParameterValue) as e:
             msg = f"Failed to setup required resources for Switch llm transpiler: {e}"
@@ -108,11 +109,11 @@ def _enumerate_resources(

         yield from _enumerate_resources(root)

-    def _setup_job(self) -> None:
+    def _setup_job(self, use_serverless: bool = True) -> None:
         """Create or update Switch job."""
         existing_job_id = self._get_existing_job_id()
         logger.debug("Setting up Switch job in workspace...")
-        job_id = self._create_or_update_switch_job(existing_job_id)
+        job_id = self._create_or_update_switch_job(existing_job_id, use_serverless)
         self._install_state.jobs[self._INSTALL_STATE_KEY] = job_id
         self._install_state.save()
         job_url = f"{self._ws.config.host}/jobs/{job_id}"
@@ -129,9 +130,9 @@ def _get_existing_job_id(self) -> str | None:
         except (InvalidParameterValue, NotFound, ValueError):
             return None

-    def _create_or_update_switch_job(self, job_id: str | None) -> str:
+    def _create_or_update_switch_job(self, job_id: str | None, use_serverless: bool = True) -> str:
         """Create or update Switch job, returning job ID."""
-        job_settings = self._get_switch_job_settings()
+        job_settings = self._get_switch_job_settings(use_serverless)

         # Try to update existing job
         if job_id:
@@ -149,7 +150,7 @@ def _create_or_update_switch_job(self, job_id: str | None) -> str:
         assert new_job_id is not None
         return new_job_id

-    def _get_switch_job_settings(self) -> dict[str, Any]:
+    def _get_switch_job_settings(self, use_serverless: bool = True) -> dict[str, Any]:
         """Build job settings for Switch transpiler."""
         job_name = "Lakebridge_Switch"
         notebook_path = self._get_switch_workspace_path() / "notebooks" / "00_main"
@@ -160,14 +161,31 @@ def _get_switch_job_settings(self) -> dict[str, Any]:
             disable_auto_optimization=True,  # To disable retries on failure
         )

-        return {
+        settings: dict[str, Any] = {
             "name": job_name,
             "tags": {"created_by": self._ws.current_user.me().user_name, "switch_version": f"v{switch_version}"},
             "tasks": [task],
             "parameters": self._generate_switch_job_parameters(),
             "max_concurrent_runs": 100,  # Allow simultaneous transpilations
         }

+        if not use_serverless:
+            job_cluster_key = "Switch_Cluster"
+            settings["job_clusters"] = [
+                JobCluster(
+                    job_cluster_key=job_cluster_key,
+                    new_cluster=compute.ClusterSpec(
+                        spark_version=self._ws.clusters.select_spark_version(latest=True, long_term_support=True),
+                        node_type_id=self._ws.clusters.select_node_type(local_disk=True, min_memory_gb=16),
+                        num_workers=1,
+                        data_security_mode=compute.DataSecurityMode.USER_ISOLATION,
+                    ),
+                )
+            ]
+            task.job_cluster_key = job_cluster_key
+
+        return settings
+
     @staticmethod
     def _generate_switch_job_parameters() -> Sequence[JobParameterDefinition]:
         # Add required runtime parameters, static for now.
```
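To summarize the branch above: with `use_serverless=True` the settings carry no `job_clusters` entry and the task's `job_cluster_key` stays unset, so the job runs on serverless compute; in classic mode both are populated and point at the shared cluster. A schematic of the two shapes (keys abridged and illustrative; real values come from `_get_switch_job_settings`):

```python
# Schematic only: the two settings shapes produced by _get_switch_job_settings()
serverless_settings = {
    "name": "Lakebridge_Switch",
    "tasks": [...],              # task.job_cluster_key is None -> serverless compute
    "parameters": [...],
    "max_concurrent_runs": 100,
}

classic_settings = {
    "name": "Lakebridge_Switch",
    "tasks": [...],              # task.job_cluster_key == "Switch_Cluster"
    "job_clusters": [...],       # single JobCluster keyed "Switch_Cluster"
    "parameters": [...],
    "max_concurrent_runs": 100,
}
```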

src/databricks/labs/lakebridge/install.py

Lines changed: 9 additions & 1 deletion
```diff
@@ -57,6 +57,7 @@ def __init__( # pylint: disable=too-many-arguments
         *,
         is_interactive: bool = True,
         include_llm: bool = False,
+        switch_use_serverless: bool = True,
         transpiler_repository: TranspilerRepository = TranspilerRepository.user_home(),
         transpiler_installers: Sequence[Callable[[TranspilerRepository], TranspilerInstaller]] = (
             BladebridgeInstaller,
@@ -73,6 +74,7 @@ def __init__( # pylint: disable=too-many-arguments
         # TODO: Refactor the 'prompts' property in preference to using this flag, which should be redundant.
         self._is_interactive = is_interactive
         self._include_llm = include_llm
+        self._switch_use_serverless = switch_use_serverless
         self._transpiler_repository = transpiler_repository
         self._transpiler_installer_factories = transpiler_installers

@@ -142,7 +144,10 @@ def configure(self, module: str) -> LakebridgeConfiguration:
             case "transpile":
                 logger.info("Configuring lakebridge `transpile`.")
                 return LakebridgeConfiguration(
-                    self._configure_transpile(), reconcile=None, include_switch=self._include_llm
+                    self._configure_transpile(),
+                    reconcile=None,
+                    include_switch=self._include_llm,
+                    switch_use_serverless=self._switch_use_serverless,
                 )
             case "reconcile":
                 logger.info("Configuring lakebridge `reconcile`.")
@@ -153,6 +158,7 @@ def configure(self, module: str) -> LakebridgeConfiguration:
                     self._configure_transpile(),
                     self._configure_reconcile(),
                     include_switch=self._include_llm,
+                    switch_use_serverless=self._switch_use_serverless,
                 )
             case _:
                 raise ValueError(f"Invalid input: {module}")
@@ -410,6 +416,7 @@ def installer(
     *,
     is_interactive: bool,
     include_llm: bool = False,
+    switch_use_serverless: bool = True,
 ) -> WorkspaceInstaller:
     app_context = ApplicationContext(_verify_workspace_client(ws))
     return WorkspaceInstaller(
@@ -423,6 +430,7 @@ def installer(
         transpiler_repository=transpiler_repository,
         is_interactive=is_interactive,
         include_llm=include_llm,
+        switch_use_serverless=switch_use_serverless,
     )
```

tests/unit/deployment/test_switch.py

Lines changed: 60 additions & 0 deletions
```diff
@@ -10,6 +10,7 @@
 from databricks.labs.lakebridge.deployment.switch import SwitchDeployment
 from databricks.sdk import WorkspaceClient, JobsExt
 from databricks.sdk.errors import NotFound, InvalidParameterValue
+from databricks.sdk.service import compute
 from databricks.sdk.service.jobs import CreateResponse
 from databricks.sdk.service.iam import User

@@ -319,3 +320,62 @@ def test_install_creates_new_job_on_update_failure(
     assert install_state.jobs["Switch"] == str(expected_job_id)
     mock_workspace_client.jobs.reset.assert_called_once()
     mock_workspace_client.jobs.create.assert_called_once()
+
+
+# Tests for serverless vs classic cluster selection
+
+
+def test_install_with_serverless_creates_job_without_cluster_key(
+    switch_deployment: SwitchDeployment, mock_workspace_client: Any
+) -> None:
+    """Test serverless mode does not include job_clusters in settings."""
+    new_job = CreateResponse(job_id=123)
+    mock_workspace_client.jobs.create.return_value = new_job
+
+    switch_deployment.install(use_serverless=True)
+
+    call_kwargs = mock_workspace_client.jobs.create.call_args.kwargs
+    assert "job_clusters" not in call_kwargs
+    assert call_kwargs["tasks"][0].job_cluster_key is None
+
+
+def test_install_with_classic_cluster_creates_job_with_cluster_key(
+    switch_deployment: SwitchDeployment, mock_workspace_client: Any
+) -> None:
+    """Test classic cluster mode includes job_clusters and job_cluster_key."""
+    new_job = CreateResponse(job_id=123)
+    mock_workspace_client.jobs.create.return_value = new_job
+    mock_workspace_client.clusters.select_spark_version.return_value = "15.4.x-scala2.12"
+    mock_workspace_client.clusters.select_node_type.return_value = "m5.xlarge"
+
+    switch_deployment.install(use_serverless=False)
+
+    call_kwargs = mock_workspace_client.jobs.create.call_args.kwargs
+    assert "job_clusters" in call_kwargs
+    assert len(call_kwargs["job_clusters"]) == 1
+    assert call_kwargs["job_clusters"][0].job_cluster_key == "Switch_Cluster"
+    assert call_kwargs["tasks"][0].job_cluster_key == "Switch_Cluster"
+
+
+def test_install_with_classic_cluster_configures_correct_cluster_spec(
+    switch_deployment: SwitchDeployment, mock_workspace_client: Any
+) -> None:
+    """Test classic cluster mode configures ClusterSpec correctly."""
+    new_job = CreateResponse(job_id=123)
+    mock_workspace_client.jobs.create.return_value = new_job
+    mock_workspace_client.clusters.select_spark_version.return_value = "15.4.x-scala2.12"
+    mock_workspace_client.clusters.select_node_type.return_value = "m5.xlarge"
+
+    switch_deployment.install(use_serverless=False)
+
+    call_kwargs = mock_workspace_client.jobs.create.call_args.kwargs
+    cluster_spec = call_kwargs["job_clusters"][0].new_cluster
+
+    assert cluster_spec.spark_version == "15.4.x-scala2.12"
+    assert cluster_spec.node_type_id == "m5.xlarge"
+    assert cluster_spec.num_workers == 1
+    assert cluster_spec.data_security_mode == compute.DataSecurityMode.USER_ISOLATION
+
+    # Verify cluster selection methods were called with correct parameters
+    mock_workspace_client.clusters.select_spark_version.assert_called_once_with(latest=True, long_term_support=True)
+    mock_workspace_client.clusters.select_node_type.assert_called_once_with(local_disk=True, min_memory_gb=16)
```

tests/unit/test_cli_other.py

Lines changed: 30 additions & 0 deletions
```diff
@@ -1,4 +1,5 @@
 import io
+import os
 from unittest.mock import Mock, patch, MagicMock, create_autospec, PropertyMock

 import pytest
@@ -102,3 +103,32 @@ def test_prompts_question():
     prompts = MockPrompts({"Some question": "something"})
     response = option.prompt_for_value(prompts)
     assert response == "something"
+
+
+@pytest.mark.parametrize(
+    ("env_value", "expected_serverless"),
+    (
+        (None, True),  # Default: serverless (no env var)
+        ("", True),  # Empty string: serverless
+        ("CLASSIC", False),  # CLASSIC: use classic cluster
+        ("classic", False),  # Lowercase: use classic cluster
+        ("Classic", False),  # Mixed case: use classic cluster
+        ("SERVERLESS", True),  # Any other value: serverless
+        ("other", True),  # Any other value: serverless
+    ),
+)
+def test_install_transpile_cluster_type_env_var(
+    env_value: str | None,
+    expected_serverless: bool,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Test LAKEBRIDGE_CLUSTER_TYPE env var determines serverless vs classic cluster."""
+    if env_value is None:
+        monkeypatch.delenv("LAKEBRIDGE_CLUSTER_TYPE", raising=False)
+    else:
+        monkeypatch.setenv("LAKEBRIDGE_CLUSTER_TYPE", env_value)
+
+    # The logic from cli.py install_transpile:
+    # switch_use_serverless = os.environ.get("LAKEBRIDGE_CLUSTER_TYPE", "").upper() != "CLASSIC"
+    actual = os.environ.get("LAKEBRIDGE_CLUSTER_TYPE", "").upper() != "CLASSIC"
+    assert actual == expected_serverless
```

tests/unit/test_install.py

Lines changed: 35 additions & 0 deletions
```diff
@@ -1726,3 +1726,38 @@ def test_transpiler_installers_llm_flag(
     )
     assert installer.configure("transpile").include_switch == should_include_switch
     assert installer.configure("all").include_switch == should_include_switch
+
+
+@pytest.mark.parametrize(
+    ("switch_use_serverless", "expected_serverless"),
+    (
+        (True, True),  # Default: use serverless
+        (False, False),  # Flag disabled: use classic cluster
+    ),
+)
+def test_configure_switch_use_serverless_flag(
+    ws_installer: Callable[..., WorkspaceInstaller],
+    ws: WorkspaceClient,
+    switch_use_serverless: bool,
+    expected_serverless: bool,
+) -> None:
+    """Test switch_use_serverless configuration flag is passed through correctly."""
+    ctx = ApplicationContext(ws).replace(
+        product_info=ProductInfo.for_testing(LakebridgeConfiguration),
+        prompts=MockPrompts({}),
+        installation=MockInstallation({}),
+    )
+    installer = ws_installer(
+        ctx.workspace_client,
+        ctx.prompts,
+        ctx.installation,
+        ctx.install_state,
+        ctx.product_info,
+        ctx.resource_configurator,
+        ctx.workspace_installation,
+        is_interactive=False,
+        include_llm=True,
+        switch_use_serverless=switch_use_serverless,
+    )
+    config = installer.configure("transpile")
+    assert config.switch_use_serverless == expected_serverless
```
