diff --git a/keras_remote/backend/gke_client.py b/keras_remote/backend/gke_client.py index f8400ef..a9d8def 100644 --- a/keras_remote/backend/gke_client.py +++ b/keras_remote/backend/gke_client.py @@ -7,6 +7,7 @@ from kubernetes import client, config from kubernetes.client.rest import ApiException +from keras_remote.backend.log_streaming import LogStreamer from keras_remote.core import accelerators from keras_remote.core.accelerators import TpuConfig @@ -108,37 +109,48 @@ def wait_for_job(job, namespace="default", timeout=3600, poll_interval=10): start_time = time.time() logged_running = False - while True: - # Check timeout - elapsed = time.time() - start_time - if elapsed > timeout: - raise RuntimeError(f"GKE job {job_name} timed out after {timeout}s") - - # Get job status - try: - job_status = batch_v1.read_namespaced_job_status(job_name, namespace) - except ApiException as e: - raise RuntimeError(f"Failed to read job status: {e.reason}") from e - - # Check completion conditions - if job_status.status.succeeded and job_status.status.succeeded >= 1: - logging.info("Job %s completed successfully", job_name) - return "success" - - if job_status.status.failed and job_status.status.failed >= 1: - # Get pod logs for debugging - _print_pod_logs(core_v1, job_name, namespace) - raise RuntimeError(f"GKE job {job_name} failed") - - # Check for pod scheduling issues - _check_pod_scheduling(core_v1, job_name, namespace) - - # Job still running - if not logged_running: - logging.info("Job %s running...", job_name) - logged_running = True - - time.sleep(poll_interval) + with LogStreamer(core_v1, namespace) as streamer: + while True: + # Check timeout + elapsed = time.time() - start_time + if elapsed > timeout: + raise RuntimeError(f"GKE job {job_name} timed out after {timeout}s") + + # Get job status + try: + job_status = batch_v1.read_namespaced_job_status(job_name, namespace) + except ApiException as e: + raise RuntimeError(f"Failed to read job status: {e.reason}") from e + + # Check completion conditions + if job_status.status.succeeded and job_status.status.succeeded >= 1: + logging.info("Job %s completed successfully", job_name) + return "success" + + if job_status.status.failed and job_status.status.failed >= 1: + # Get pod logs for debugging + _print_pod_logs(core_v1, job_name, namespace) + raise RuntimeError(f"GKE job {job_name} failed") + + # Check for pod scheduling issues + _check_pod_scheduling(core_v1, job_name, namespace) + + # Start log streaming when pod is running + with suppress(ApiException): + pods = core_v1.list_namespaced_pod( + namespace, label_selector=f"job-name={job_name}" + ) + for pod in pods.items: + if pod.status.phase == "Running": + streamer.start(pod.metadata.name) + break + + # Job still running + if not logged_running: + logging.info("Job %s running...", job_name) + logged_running = True + + time.sleep(poll_interval) def cleanup_job(job_name, namespace="default"): diff --git a/keras_remote/backend/gke_client_test.py b/keras_remote/backend/gke_client_test.py index 116340c..c030523 100644 --- a/keras_remote/backend/gke_client_test.py +++ b/keras_remote/backend/gke_client_test.py @@ -167,6 +167,16 @@ def setUp(self): mock.patch("keras_remote.backend.gke_client._load_kube_config") ) + self.mock_streamer = MagicMock() + self.enterContext( + mock.patch( + "keras_remote.backend.gke_client.LogStreamer", + return_value=self.mock_streamer, + ) + ) + self.mock_streamer.__enter__ = MagicMock(return_value=self.mock_streamer) + self.mock_streamer.__exit__ = MagicMock(return_value=False) + def _make_mock_job(self): job = MagicMock() job.metadata.name = "keras-remote-job-abc" @@ -194,6 +204,7 @@ def test_first_poll_success(self): ): result = wait_for_job(self._make_mock_job()) self.assertEqual(result, "success") + self.mock_streamer.start.assert_not_called() def test_first_poll_failure(self): mock_batch = MagicMock() @@ -264,6 +275,72 @@ def test_polls_until_success(self): self.assertEqual(result, "success") mock_sleep.assert_called_with(5) + def test_starts_streaming_when_pod_running(self): + mock_batch = MagicMock() + running = MagicMock() + running.status.succeeded = None + running.status.failed = None + succeeded = MagicMock() + succeeded.status.succeeded = 1 + succeeded.status.failed = None + mock_batch.read_namespaced_job_status.side_effect = [running, succeeded] + + running_pod = MagicMock() + running_pod.status.phase = "Running" + running_pod.metadata.name = "keras-remote-job-abc-pod" + + mock_core = MagicMock() + mock_core.list_namespaced_pod.return_value.items = [running_pod] + + with ( + mock.patch( + "keras_remote.backend.gke_client.client.BatchV1Api", + return_value=mock_batch, + ), + mock.patch( + "keras_remote.backend.gke_client.client.CoreV1Api", + return_value=mock_core, + ), + mock.patch("keras_remote.backend.gke_client.time.sleep"), + ): + result = wait_for_job(self._make_mock_job()) + + self.assertEqual(result, "success") + self.mock_streamer.start.assert_called_once_with("keras-remote-job-abc-pod") + + def test_no_streaming_when_pod_pending(self): + mock_batch = MagicMock() + running = MagicMock() + running.status.succeeded = None + running.status.failed = None + succeeded = MagicMock() + succeeded.status.succeeded = 1 + succeeded.status.failed = None + mock_batch.read_namespaced_job_status.side_effect = [running, succeeded] + + pending_pod = MagicMock() + pending_pod.status.phase = "Pending" + pending_pod.status.conditions = None + + mock_core = MagicMock() + mock_core.list_namespaced_pod.return_value.items = [pending_pod] + + with ( + mock.patch( + "keras_remote.backend.gke_client.client.BatchV1Api", + return_value=mock_batch, + ), + mock.patch( + "keras_remote.backend.gke_client.client.CoreV1Api", + return_value=mock_core, + ), + mock.patch("keras_remote.backend.gke_client.time.sleep"), + ): + result = wait_for_job(self._make_mock_job()) + + self.assertEqual(result, "success") + self.mock_streamer.start.assert_not_called() + class TestLoadKubeConfig(absltest.TestCase): def test_kubeconfig_fallback(self): diff --git a/keras_remote/backend/log_streaming.py b/keras_remote/backend/log_streaming.py new file mode 100644 index 0000000..f5c656f --- /dev/null +++ b/keras_remote/backend/log_streaming.py @@ -0,0 +1,137 @@ +"""Live log streaming from Kubernetes pods. + +Provides utilities to stream pod logs to stdout in real-time using a +background daemon thread. Used by both GKE and Pathways backends during +job execution. +""" + +import sys +import threading +from collections import deque + +import urllib3 +from absl import logging +from kubernetes.client.rest import ApiException +from rich.console import Console +from rich.live import Live +from rich.panel import Panel + +_MAX_DISPLAY_LINES = 25 + + +def _stream_pod_logs(core_v1, pod_name, namespace): + """Stream pod logs to stdout. Designed to run in a daemon thread. + + Uses the Kubernetes follow API to tail logs in real-time. The stream + ends naturally when the container exits. + + In interactive terminals, logs are displayed in a Rich Live panel. + In non-interactive contexts (piped output, CI), logs are streamed + as raw lines with Rich Rule delimiters. + + Args: + core_v1: Kubernetes CoreV1Api client. + pod_name: Name of the pod to stream logs from. + namespace: Kubernetes namespace. + """ + console = Console() + resp = None + try: + resp = core_v1.read_namespaced_pod_log( + name=pod_name, + namespace=namespace, + follow=True, + _preload_content=False, + ) + if console.is_terminal: + _render_live_panel(resp, pod_name, console) + else: + _render_plain(resp, pod_name, console) + except ApiException: + pass # Pod deleted or not found + except urllib3.exceptions.ProtocolError: + pass # Connection broken mid-stream (pod terminated) + except Exception: + logging.warning( + "Log streaming from %s failed unexpectedly", pod_name, exc_info=True + ) + finally: + if resp is not None: + resp.release_conn() + + +def _render_live_panel(resp, pod_name, console): + """Render streaming logs inside a Rich Live panel.""" + lines = deque(maxlen=_MAX_DISPLAY_LINES) + title = f"Remote logs \u2022 {pod_name}" + buffer = "" + + with Live( + _make_log_panel(lines, title), + console=console, + refresh_per_second=4, + ) as live: + for chunk in resp.stream(decode_content=True): + buffer += chunk.decode("utf-8", errors="replace") + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + lines.append(line) + live.update(_make_log_panel(lines, title)) + + # Flush remaining partial line + if buffer.strip(): + lines.append(buffer) + live.update(_make_log_panel(lines, title)) + + +def _render_plain(resp, pod_name, console): + """Render streaming logs as raw lines with Rule delimiters.""" + console.rule(f"Remote logs ({pod_name})", style="blue") + for chunk in resp.stream(decode_content=True): + sys.stdout.write(chunk.decode("utf-8", errors="replace")) + sys.stdout.flush() + console.rule("End remote logs", style="blue") + + +def _make_log_panel(lines, title): + """Build a Panel renderable from accumulated log lines.""" + content = "\n".join(lines) if lines else "Waiting for output..." + return Panel(content, title=title, border_style="blue") + + +class LogStreamer: + """Context manager that owns the log-streaming thread lifecycle. + + Usage:: + + with LogStreamer(core_v1, namespace) as streamer: + while polling: + ... + if pod_is_running: + streamer.start(pod_name) # idempotent + """ + + def __init__(self, core_v1, namespace): + self._core_v1 = core_v1 + self._namespace = namespace + self._thread = None + + def __enter__(self): + return self + + def __exit__(self, *exc): + if self._thread is not None: + self._thread.join(timeout=5) + return False + + def start(self, pod_name): + """Start streaming if not already active (idempotent).""" + if self._thread is not None: + return + logging.info("Streaming logs from %s...", pod_name) + self._thread = threading.Thread( + target=_stream_pod_logs, + args=(self._core_v1, pod_name, self._namespace), + daemon=True, + ) + self._thread.start() diff --git a/keras_remote/backend/log_streaming_test.py b/keras_remote/backend/log_streaming_test.py new file mode 100644 index 0000000..d07cf71 --- /dev/null +++ b/keras_remote/backend/log_streaming_test.py @@ -0,0 +1,228 @@ +"""Tests for keras_remote.backend.log_streaming — live pod log streaming.""" + +import io +from unittest import mock +from unittest.mock import MagicMock + +from absl.testing import absltest +from kubernetes.client.rest import ApiException + +from keras_remote.backend.log_streaming import ( + LogStreamer, + _render_live_panel, + _render_plain, + _stream_pod_logs, +) + + +class TestStreamPodLogs(absltest.TestCase): + """Tests for the top-level _stream_pod_logs orchestrator.""" + + def _make_mock_resp(self, chunks): + mock_resp = MagicMock() + mock_resp.stream.return_value = chunks + return mock_resp + + def test_calls_log_api_correctly(self): + mock_core = MagicMock() + mock_core.read_namespaced_pod_log.return_value = self._make_mock_resp([]) + + with mock.patch( + "keras_remote.backend.log_streaming.Console" + ) as mock_console_cls: + mock_console_cls.return_value.is_terminal = False + _stream_pod_logs(mock_core, "pod-1", "default") + + mock_core.read_namespaced_pod_log.assert_called_once_with( + name="pod-1", + namespace="default", + follow=True, + _preload_content=False, + ) + + def test_routes_by_terminal(self): + for is_terminal in (True, False): + mock_core = MagicMock() + mock_core.read_namespaced_pod_log.return_value = self._make_mock_resp( + [b"hello\n"] + ) + + with ( + mock.patch( + "keras_remote.backend.log_streaming.Console" + ) as mock_console_cls, + mock.patch( + "keras_remote.backend.log_streaming._render_live_panel" + ) as mock_live, + mock.patch( + "keras_remote.backend.log_streaming._render_plain" + ) as mock_plain, + ): + mock_console_cls.return_value.is_terminal = is_terminal + _stream_pod_logs(mock_core, "pod-1", "default") + + if is_terminal: + mock_live.assert_called_once() + mock_plain.assert_not_called() + else: + mock_plain.assert_called_once() + mock_live.assert_not_called() + + def test_releases_conn_on_api_exception(self): + mock_core = MagicMock() + mock_resp = MagicMock() + mock_resp.stream.side_effect = ApiException(status=404, reason="Not Found") + mock_core.read_namespaced_pod_log.return_value = mock_resp + + with mock.patch( + "keras_remote.backend.log_streaming.Console" + ) as mock_console_cls: + mock_console_cls.return_value.is_terminal = False + _stream_pod_logs(mock_core, "pod-1", "default") + + mock_resp.release_conn.assert_called_once() + + def test_handles_error_before_response(self): + mock_core = MagicMock() + mock_core.read_namespaced_pod_log.side_effect = ApiException( + status=404, reason="Not Found" + ) + + # Should not raise even when resp is None + _stream_pod_logs(mock_core, "pod-1", "default") + + def test_logs_warning_on_unexpected_error(self): + mock_core = MagicMock() + mock_resp = MagicMock() + mock_resp.stream.side_effect = ValueError("something unexpected") + mock_core.read_namespaced_pod_log.return_value = mock_resp + + with ( + mock.patch( + "keras_remote.backend.log_streaming.Console" + ) as mock_console_cls, + mock.patch("keras_remote.backend.log_streaming.logging") as mock_log, + ): + mock_console_cls.return_value.is_terminal = False + _stream_pod_logs(mock_core, "pod-1", "default") + + mock_log.warning.assert_called_once() + self.assertIn("pod-1", mock_log.warning.call_args[0][1]) + mock_resp.release_conn.assert_called_once() + + +class TestRenderPlain(absltest.TestCase): + """Tests for the non-terminal plain rendering path.""" + + def test_streams_chunks_to_stdout(self): + mock_resp = MagicMock() + mock_resp.stream.return_value = [b"line 1\n", b"line 2\n"] + console = MagicMock() + + with mock.patch("sys.stdout", new_callable=io.StringIO) as mock_stdout: + _render_plain(mock_resp, "pod-1", console) + + self.assertIn("line 1", mock_stdout.getvalue()) + self.assertIn("line 2", mock_stdout.getvalue()) + + def test_prints_rule_delimiters(self): + mock_resp = MagicMock() + mock_resp.stream.return_value = [] + console = MagicMock() + + _render_plain(mock_resp, "pod-1", console) + + self.assertEqual(console.rule.call_count, 2) + # Opening rule contains pod name + self.assertIn("pod-1", console.rule.call_args_list[0][0][0]) + + def test_handles_utf8_decode_errors(self): + mock_resp = MagicMock() + mock_resp.stream.return_value = [b"valid\n", b"\xff\xfe invalid\n"] + console = MagicMock() + + with mock.patch("sys.stdout", new_callable=io.StringIO) as mock_stdout: + _render_plain(mock_resp, "pod-1", console) + + output = mock_stdout.getvalue() + self.assertIn("valid", output) + self.assertIn("invalid", output) + + +class TestRenderLivePanel(absltest.TestCase): + """Tests for the terminal Live panel rendering path.""" + + def test_handles_partial_lines(self): + mock_resp = MagicMock() + # "hello\nwor" then "ld\n" — "world" is split across chunks + mock_resp.stream.return_value = [b"hello\nwor", b"ld\n"] + console = MagicMock() + + with mock.patch("keras_remote.backend.log_streaming.Live") as mock_live_cls: + mock_live = MagicMock() + mock_live_cls.return_value.__enter__ = MagicMock(return_value=mock_live) + mock_live_cls.return_value.__exit__ = MagicMock(return_value=False) + _render_live_panel(mock_resp, "pod-1", console) + + # Check the final panel contains both complete lines + last_panel = mock_live.update.call_args_list[-1][0][0] + panel_content = last_panel.renderable + self.assertIn("hello", panel_content) + self.assertIn("world", panel_content) + + +class TestLogStreamer(absltest.TestCase): + def test_start_launches_daemon_thread(self): + mock_core = MagicMock() + + with ( + mock.patch( + "keras_remote.backend.log_streaming._stream_pod_logs" + ) as mock_stream, + LogStreamer(mock_core, "default") as streamer, + ): + streamer.start("pod-1") + self.assertIsNotNone(streamer._thread) + self.assertTrue(streamer._thread.daemon) + + mock_stream.assert_called_once_with(mock_core, "pod-1", "default") + + def test_start_is_idempotent(self): + mock_core = MagicMock() + + with ( + mock.patch( + "keras_remote.backend.log_streaming._stream_pod_logs" + ) as mock_stream, + LogStreamer(mock_core, "default") as streamer, + ): + streamer.start("pod-1") + streamer.start("pod-1") + streamer.start("pod-2") # different name, still no-op + + mock_stream.assert_called_once_with(mock_core, "pod-1", "default") + + def test_exit_joins_thread(self): + mock_core = MagicMock() + mock_thread = MagicMock() + + with ( + mock.patch( + "keras_remote.backend.log_streaming.threading.Thread", + return_value=mock_thread, + ), + LogStreamer(mock_core, "ns") as streamer, + ): + streamer.start("pod-1") + + mock_thread.join.assert_called_once_with(timeout=5) + + def test_exit_without_start_is_noop(self): + mock_core = MagicMock() + # Should not raise + with LogStreamer(mock_core, "default"): + pass + + +if __name__ == "__main__": + absltest.main() diff --git a/keras_remote/backend/pathways_client.py b/keras_remote/backend/pathways_client.py index f0dfdb8..51aed65 100644 --- a/keras_remote/backend/pathways_client.py +++ b/keras_remote/backend/pathways_client.py @@ -11,6 +11,7 @@ _parse_accelerator, _print_pod_logs, ) +from keras_remote.backend.log_streaming import LogStreamer from keras_remote.core import accelerators from keras_remote.infra import infra @@ -138,68 +139,74 @@ def wait_for_job(job_id, namespace="default", timeout=3600, poll_interval=10): # The leader pod is suffixed with '-0' by LWS leader_pod_name = f"{job_name}-0" - while True: - elapsed = time.time() - start_time - if elapsed > timeout: - raise RuntimeError(f"Pathways job {job_name} timed out after {timeout}s") - - try: - pod = core_v1.read_namespaced_pod(leader_pod_name, namespace) - if not logged_running: - logger.info(f"Found pod: {leader_pod_name}") - logged_running = True - - if pod.status.phase == "Succeeded": - logger.info(f"[REMOTE] Job {job_name} completed successfully") - return "success" - - if pod.status.phase == "Failed": - _print_pod_logs(core_v1, job_name, namespace) - raise RuntimeError(f"Pathways job {job_name} failed") - - elif pod.status.phase == "Pending": - _check_pod_scheduling(core_v1, job_name, namespace) - logger.debug("Pod is Pending...") - - except ApiException as e: - if e.status == 404: - # Pod might not be created yet - pod = None - else: + with LogStreamer(core_v1, namespace) as streamer: + while True: + elapsed = time.time() - start_time + if elapsed > timeout: raise RuntimeError( - f"Failed to read leader pod status: {e.reason}" - ) from e + f"Pathways job {job_name} timed out after {timeout}s" + ) - if pod is not None and pod.status.container_statuses: - container_status = pod.status.container_statuses[0] + try: + pod = core_v1.read_namespaced_pod(leader_pod_name, namespace) + if not logged_running: + logger.info(f"Found pod: {leader_pod_name}") + logged_running = True - # Check current state - if container_status.state.terminated: - if container_status.state.terminated.exit_code == 0: + if pod.status.phase == "Succeeded": logger.info(f"[REMOTE] Job {job_name} completed successfully") return "success" - else: + + if pod.status.phase == "Failed": _print_pod_logs(core_v1, job_name, namespace) - raise RuntimeError( - f"Pathways job {job_name} failed with exit code " - f"{container_status.state.terminated.exit_code}" - ) - - # Check last state (in case it restarted) - if container_status.last_state.terminated: - if container_status.last_state.terminated.exit_code == 0: - logger.info( - f"[REMOTE] Job {job_name} completed successfully (restarted)" - ) - return "success" + raise RuntimeError(f"Pathways job {job_name} failed") + + elif pod.status.phase == "Pending": + _check_pod_scheduling(core_v1, job_name, namespace) + logger.debug("Pod is Pending...") + + elif pod.status.phase == "Running": + streamer.start(leader_pod_name) + + except ApiException as e: + if e.status == 404: + # Pod might not be created yet + pod = None else: - _print_pod_logs(core_v1, job_name, namespace) raise RuntimeError( - f"Pathways job {job_name} failed previously with " - f"exit code {container_status.last_state.terminated.exit_code}" - ) - - time.sleep(poll_interval) + f"Failed to read leader pod status: {e.reason}" + ) from e + + if pod is not None and pod.status.container_statuses: + container_status = pod.status.container_statuses[0] + + # Check current state + if container_status.state.terminated: + if container_status.state.terminated.exit_code == 0: + logger.info(f"[REMOTE] Job {job_name} completed successfully") + return "success" + else: + _print_pod_logs(core_v1, job_name, namespace) + raise RuntimeError( + f"Pathways job {job_name} failed with exit code " + f"{container_status.state.terminated.exit_code}" + ) + + # Check last state (in case it restarted) + if container_status.last_state.terminated: + if container_status.last_state.terminated.exit_code == 0: + logger.info( + f"[REMOTE] Job {job_name} completed successfully (restarted)" + ) + return "success" + else: + _print_pod_logs(core_v1, job_name, namespace) + raise RuntimeError( + f"Pathways job {job_name} failed previously with " + f"exit code {container_status.last_state.terminated.exit_code}" + ) + + time.sleep(poll_interval) def cleanup_job(job_name, namespace="default"): diff --git a/keras_remote/backend/pathways_client_test.py b/keras_remote/backend/pathways_client_test.py index c81a55b..c4f5b06 100644 --- a/keras_remote/backend/pathways_client_test.py +++ b/keras_remote/backend/pathways_client_test.py @@ -233,10 +233,10 @@ def _get_created_body(self): ] def test_multi_node_tpu(self): - # v3-8 → 2 nodes → 1 worker - self._call(accelerator="v3-8") + # v3-16 → 4 nodes → 3 workers + self._call(accelerator="v3-16") body = self._get_created_body() - self.assertEqual(body["spec"]["leaderWorkerTemplate"]["size"], 2) + self.assertEqual(body["spec"]["leaderWorkerTemplate"]["size"], 4) def test_single_node_tpu(self): # v5litepod-4 → 1 node → 0 workers @@ -273,6 +273,13 @@ def setUp(self): mock.patch(f"{_MODULE}.client.CoreV1Api") ).return_value + self.mock_streamer = MagicMock() + self.enterContext( + mock.patch(f"{_MODULE}.LogStreamer", return_value=self.mock_streamer) + ) + self.mock_streamer.__enter__ = MagicMock(return_value=self.mock_streamer) + self.mock_streamer.__exit__ = MagicMock(return_value=False) + def _make_pod(self, phase, container_statuses=None): pod = MagicMock() pod.status.phase = phase @@ -298,6 +305,7 @@ def test_immediate_success_phase(self): ) result = wait_for_job("j1") self.assertEqual(result, "success") + self.mock_streamer.start.assert_not_called() def test_immediate_failure_phase(self): self.mock_core.read_namespaced_pod.return_value = self._make_pod("Failed") @@ -387,6 +395,22 @@ def test_leader_pod_name(self): "keras-pathways-j1-0", "default" ) + def test_starts_streaming_when_pod_running(self): + running = self._make_pod("Running", container_statuses=None) + succeeded = self._make_pod("Succeeded") + self.mock_core.read_namespaced_pod.side_effect = [running, succeeded] + result = wait_for_job("j1") + self.assertEqual(result, "success") + self.mock_streamer.start.assert_called_once_with("keras-pathways-j1-0") + + def test_no_streaming_when_pod_pending(self): + pending = self._make_pod("Pending", container_statuses=None) + succeeded = self._make_pod("Succeeded") + self.mock_core.read_namespaced_pod.side_effect = [pending, succeeded] + result = wait_for_job("j1") + self.assertEqual(result, "success") + self.mock_streamer.start.assert_not_called() + class TestCleanupJob(absltest.TestCase): def setUp(self): diff --git a/pyproject.toml b/pyproject.toml index 926605f..e986d2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "cloudpickle", "numpy", "keras", + "rich>=14.0", "google-cloud-artifact-registry>=1.11", "google-cloud-storage>=3.9", "google-cloud-build>=3.24", @@ -27,7 +28,6 @@ dependencies = [ [project.optional-dependencies] cli = [ "click>=8.1", - "rich>=14.0", "pulumi>=3.0", "pulumi-gcp>=9.0", "pulumi-command>=1.0",