Skip to content

Commit ad6b19f

Browse files
committed
[components] Reenable dg dev deployment test on windows
1 parent c37f6d3 commit ad6b19f

File tree

3 files changed

+341
-110
lines changed

3 files changed

+341
-110
lines changed

Diff for: python_modules/libraries/dagster-dg/dagster_dg/cli/dev.py

+81-65
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,30 @@
1-
import os
2-
import signal
31
import subprocess
4-
import sys
52
import time
6-
from collections.abc import Iterator, Mapping, Sequence
3+
from collections.abc import Iterator, Mapping
74
from contextlib import contextmanager, nullcontext
85
from pathlib import Path
96
from tempfile import NamedTemporaryFile
107
from typing import Optional, TypeVar
118

129
import click
13-
import psutil
1410
import yaml
1511

1612
from dagster_dg.cli.global_options import dg_global_options
1713
from dagster_dg.config import normalize_cli_config
1814
from dagster_dg.context import DgContext
1915
from dagster_dg.error import DgError
20-
from dagster_dg.utils import DgClickCommand, exit_with_error, pushd
16+
from dagster_dg.utils import (
17+
DgClickCommand,
18+
exit_with_error,
19+
get_venv_executable,
20+
pushd,
21+
)
22+
from dagster_dg.utils.ipc import (
23+
get_ipc_shutdown_pipe,
24+
interrupt_on_ipc_shutdown_message,
25+
open_ipc_subprocess,
26+
send_ipc_shutdown_message,
27+
)
2128

2229
T = TypeVar("T")
2330

@@ -69,6 +76,16 @@
6976
show_default=True,
7077
required=False,
7178
)
79+
@click.option(
80+
"--shutdown-pipe",
81+
type=click.INT,
82+
required=False,
83+
hidden=True,
84+
help=(
85+
"Internal use only. Pass a readable pipe file descriptor to the dg dev process"
86+
" that will be monitored for a shutdown signal. Useful to interrupt the process in CI."
87+
),
88+
)
7289
@dg_global_options
7390
@click.pass_context
7491
def dev_command(
@@ -79,6 +96,7 @@ def dev_command(
7996
port: Optional[int],
8097
host: Optional[str],
8198
live_data_poll_rate: int,
99+
shutdown_pipe: Optional[int],
82100
**global_options: Mapping[str, object],
83101
) -> None:
84102
"""Start a local deployment of your Dagster project.
@@ -99,16 +117,45 @@ def dev_command(
99117
*_format_forwarded_option("--live-data-poll-rate", live_data_poll_rate),
100118
]
101119

120+
read_fd, write_fd = get_ipc_shutdown_pipe()
121+
shutdown_pipe_options = ["--shutdown-pipe", str(read_fd)]
122+
123+
other_options = [*shutdown_pipe_options, *forward_options]
124+
102125
# In a code location context, we can just run `dagster dev` directly, using `dagster` from the
103126
# code location's environment.
104127
if dg_context.is_code_location:
105128
cmd_location = dg_context.get_executable("dagster")
106129
if dg_context.use_dg_managed_environment:
107-
cmd = ["uv", "run", "dagster", "dev", *forward_options]
130+
cmd = ["uv", "run", "dagster", "dev", *other_options]
108131
else:
109-
cmd = [cmd_location, "dev", *forward_options]
132+
cmd = [cmd_location, "dev", *other_options]
110133
temp_workspace_file_cm = nullcontext()
111134

135+
# In a deployment context with a venv containing dagster and dagster-webserver (both are
136+
# required for `dagster dev`), we can run `dagster dev` using whatever is installed in the
137+
# deployment venv.
138+
elif (
139+
dg_context.is_deployment
140+
and dg_context.has_venv
141+
and dg_context.has_executable("dagster")
142+
and dg_context.has_executable("dagster-webserver")
143+
):
144+
# --no-project because we might not have the necessary fields in deployment pyproject.toml
145+
cmd = [
146+
"uv",
147+
"run",
148+
# Unclear why this is necessary, but it seems to be in CI. May be a uv version issue.
149+
"--python",
150+
get_venv_executable(dg_context.venv_path),
151+
"--no-project",
152+
"dagster",
153+
"dev",
154+
*other_options,
155+
]
156+
cmd_location = dg_context.get_executable("dagster")
157+
temp_workspace_file_cm = _temp_workspace_file(dg_context)
158+
112159
# In a deployment context, dg dev will construct a temporary
113160
# workspace file that points at all defined code locations and invoke:
114161
#
@@ -127,7 +174,7 @@ def dev_command(
127174
"dagster-webserver",
128175
"dagster",
129176
"dev",
130-
*forward_options,
177+
*other_options,
131178
]
132179
cmd_location = "ephemeral dagster dev"
133180
temp_workspace_file_cm = _temp_workspace_file(dg_context)
@@ -138,32 +185,32 @@ def dev_command(
138185
print(f"Using {cmd_location}") # noqa: T201
139186
if workspace_file: # only non-None deployment context
140187
cmd.extend(["--workspace", workspace_file])
141-
uv_run_dagster_dev_process = _open_subprocess(cmd)
142-
try:
143-
while True:
144-
time.sleep(_CHECK_SUBPROCESS_INTERVAL)
145-
if uv_run_dagster_dev_process.poll() is not None:
146-
raise DgError(
147-
f"dagster-dev process shut down unexpectedly with return code {uv_run_dagster_dev_process.returncode}."
148-
)
149-
except KeyboardInterrupt:
150-
click.secho(
151-
"Received keyboard interrupt. Shutting down dagster-dev process.", fg="yellow"
152-
)
153-
finally:
154-
# For reasons not fully understood, directly interrupting the `uv run` process does not
155-
# work as intended. The interrupt signal is not correctly propagated to the `dagster
156-
# dev` process, and so that process never shuts down. Therefore, we send the signal
157-
# directly to the `dagster dev` process (the only child of the `uv run` process). This
158-
# will cause `dagster dev` to terminate which in turn will cause `uv run` to terminate.
159-
dagster_dev_pid = _get_child_process_pid(uv_run_dagster_dev_process)
160-
_interrupt_subprocess(dagster_dev_pid)
161-
188+
uv_run_dagster_dev_process = open_ipc_subprocess(cmd, pass_fds=[read_fd])
189+
with interrupt_on_ipc_shutdown_message(shutdown_pipe) if shutdown_pipe else nullcontext():
162190
try:
163-
uv_run_dagster_dev_process.wait(timeout=10)
164-
except subprocess.TimeoutExpired:
165-
click.secho("`dagster dev` did not terminate in time. Killing it.")
166-
uv_run_dagster_dev_process.kill()
191+
while True:
192+
time.sleep(_CHECK_SUBPROCESS_INTERVAL)
193+
if uv_run_dagster_dev_process.poll() is not None:
194+
raise DgError(
195+
f"dagster-dev process shut down unexpectedly with return code {uv_run_dagster_dev_process.returncode}."
196+
)
197+
except KeyboardInterrupt:
198+
click.secho(
199+
"Received keyboard interrupt. Shutting down dagster-dev process.", fg="yellow"
200+
)
201+
finally:
202+
# For reasons not fully understood, directly interrupting the `uv run` process does not
203+
# work as intended. The interrupt signal is not correctly propagated to the `dagster
204+
# dev` process, and so that process never shuts down. Therefore, we send the signal
205+
# directly to the `dagster dev` process (the only child of the `uv run` process). This
206+
# will cause `dagster dev` to terminate which in turn will cause `uv run` to terminate.
207+
send_ipc_shutdown_message(write_fd)
208+
209+
try:
210+
uv_run_dagster_dev_process.wait(timeout=10)
211+
except subprocess.TimeoutExpired:
212+
click.secho("`dagster dev` did not terminate in time. Killing it.")
213+
uv_run_dagster_dev_process.kill()
167214

168215

169216
@contextmanager
@@ -188,34 +235,3 @@ def _temp_workspace_file(dg_context: DgContext) -> Iterator[str]:
188235

189236
def _format_forwarded_option(option: str, value: object) -> list[str]:
190237
return [] if value is None else [option, str(value)]
191-
192-
193-
def _get_child_process_pid(proc: "subprocess.Popen") -> int:
194-
children = psutil.Process(proc.pid).children(recursive=False)
195-
if len(children) != 1:
196-
raise ValueError(f"Expected exactly one child process, but found {len(children)}")
197-
return children[0].pid
198-
199-
200-
# Windows subprocess termination utilities. See here for why we send CTRL_BREAK_EVENT on Windows:
201-
# https://stefan.sofa-rockers.org/2013/08/15/handling-sub-process-hierarchies-python-linux-os-x/
202-
203-
204-
def _interrupt_subprocess(pid: int) -> None:
205-
"""Send CTRL_BREAK_EVENT on Windows, SIGINT on other platforms."""
206-
if sys.platform == "win32":
207-
os.kill(pid, signal.CTRL_BREAK_EVENT)
208-
else:
209-
os.kill(pid, signal.SIGINT)
210-
211-
212-
def _open_subprocess(command: Sequence[str]) -> "subprocess.Popen":
213-
"""Sets the correct flags to support graceful termination."""
214-
creationflags = 0
215-
if sys.platform == "win32":
216-
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
217-
218-
return subprocess.Popen(
219-
command,
220-
creationflags=creationflags,
221-
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import _thread as thread
2+
import os
3+
import signal
4+
import subprocess
5+
import sys
6+
import threading
7+
from collections.abc import Iterator, Sequence
8+
from contextlib import contextmanager
9+
from typing import Any, Callable
10+
11+
# NOTE: This is copied over from dagster core. It primarily supports testing of dagster dev in
12+
# varied CI environments, where use of signals is dicey cross-platform.
13+
14+
15+
def open_ipc_subprocess(parts: Sequence[str], **kwargs: Any) -> "subprocess.Popen[Any]":
16+
creationflags = 0
17+
if sys.platform == "win32":
18+
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
19+
20+
# pass_fds is not supported on Windows. Instead we set close_fds to False, which will allow
21+
# any inheritable file descriptors marked as inheritable to be inherited by the child
22+
# process.
23+
if kwargs.get("pass_fds"):
24+
del kwargs["pass_fds"]
25+
kwargs["close_fds"] = False
26+
27+
return subprocess.Popen(
28+
parts,
29+
creationflags=creationflags,
30+
**kwargs,
31+
)
32+
33+
34+
_PIPE_SHUTDOWN_INDICATOR = "SHUTDOWN"
35+
36+
37+
def get_ipc_shutdown_pipe() -> tuple[int, int]:
38+
r_fd, w_fd = os.pipe()
39+
40+
# On windows, convert fd to a Windows handle so it can be reliably passed across processes.
41+
if sys.platform == "win32":
42+
import msvcrt
43+
44+
os.set_inheritable(r_fd, True)
45+
r_fd = msvcrt.get_osfhandle(r_fd)
46+
return r_fd, w_fd
47+
48+
49+
@contextmanager
50+
def monitor_ipc_shutdown_pipe(pipe_fd: int, handler: Callable[[], None]) -> Iterator[None]:
51+
"""Monitor the passed in pipe file descriptor for the shutdown indicator message.
52+
When received, trigger the handler.
53+
54+
Args:
55+
pipe_fd: The file descriptor of the pipe to monitor. Must be readable.
56+
If on windows, this is assumed to be a Windows handle rather than a regular file
57+
descriptor.
58+
handler: The handler to call when the shutdown indicator is received.
59+
"""
60+
# On windows, we expect to receive a raw Windows handle rather than a regular file descriptor.
61+
# Convert to a file descriptor before reading.
62+
if sys.platform == "win32":
63+
import msvcrt
64+
65+
pipe_fd = msvcrt.open_osfhandle(pipe_fd, os.O_RDONLY)
66+
67+
break_event = threading.Event()
68+
69+
def _watch_pipe_for_shutdown():
70+
with open(pipe_fd) as pipe:
71+
while not break_event.is_set():
72+
line = pipe.readline()
73+
if not line: # EOF or pipe closed
74+
break_event.set()
75+
elif _PIPE_SHUTDOWN_INDICATOR in line.strip():
76+
break_event.set()
77+
handler()
78+
79+
# Start a background thread that watches the pipe
80+
monitor_thread = threading.Thread(target=_watch_pipe_for_shutdown, daemon=True)
81+
monitor_thread.start()
82+
83+
try:
84+
yield
85+
finally:
86+
# Signal the thread to exit and wait for it to stop
87+
break_event.set()
88+
monitor_thread.join()
89+
90+
91+
@contextmanager
92+
def interrupt_on_ipc_shutdown_message(pipe_fd: int) -> Iterator[None]:
93+
"""Monitor the passed in pipe file descriptor for the shutdown indicator message. Interrupt the
94+
current process when the message is received.
95+
96+
Args:
97+
pipe_fd: The file descriptor of the pipe to monitor. Must be readable.
98+
If on windows, this is assumed to be raw Windows handle rather than a regular file
99+
descriptor.
100+
"""
101+
# Important to use `send_interrupt` here rather than unconditionally sending a signal. Sending a
102+
# signal, even to the process itself, often has strange behavior on windows.
103+
with monitor_ipc_shutdown_pipe(pipe_fd, handler=lambda: send_interrupt()):
104+
yield
105+
106+
107+
def send_ipc_shutdown_message(w_fd: int) -> None:
108+
os.write(w_fd, f"{_PIPE_SHUTDOWN_INDICATOR}\n".encode())
109+
os.close(w_fd)
110+
111+
112+
def send_interrupt() -> None:
113+
if sys.platform == "win32":
114+
thread.interrupt_main()
115+
else:
116+
# If on unix send an os level signal to interrupt any situation we may be stuck in
117+
os.kill(os.getpid(), signal.SIGINT)

0 commit comments

Comments
 (0)