Skip to content

Commit 813c511

Browse files
committed
refactor: prefer asyncio subprocess, fall back to Popen on Windows
Use asyncio.create_subprocess_exec for non-blocking pipe reads when the event loop supports it. Fall back to subprocess.Popen with daemon threads on Windows SelectorEventLoop (NotImplementedError).
1 parent 5f52a7b commit 813c511

1 file changed

Lines changed: 61 additions & 31 deletions

File tree

backend/syft_space/components/dataset_types/chromadb_local/chromadb_provisioner.py

Lines changed: 61 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -110,39 +110,69 @@ async def start(cls, config: dict[str, Any]) -> dict[str, Any]:
110110
"--host",
111111
"0.0.0.0",
112112
]
113-
# Use subprocess.Popen instead of asyncio.create_subprocess_exec
114-
# because Windows SelectorEventLoop (used by uvicorn) does not
115-
# support asyncio subprocess creation.
116-
proc = subprocess.Popen(
117-
cmd,
118-
stdout=subprocess.PIPE,
119-
stderr=subprocess.PIPE,
120-
)
113+
# Prefer asyncio subprocess for non-blocking pipe reads; fall back
114+
# to sync Popen on Windows SelectorEventLoop which doesn't support it.
115+
pid: int
116+
try:
117+
async_proc = await asyncio.create_subprocess_exec(
118+
*cmd,
119+
stdout=asyncio.subprocess.PIPE,
120+
stderr=asyncio.subprocess.PIPE,
121+
)
122+
if async_proc.pid is None:
123+
raise RuntimeError("Failed to start ChromaDB: no PID")
124+
pid = async_proc.pid
125+
126+
# Log streams via async tasks
127+
async def _async_log(stream: asyncio.StreamReader, level: str) -> None:
128+
while True:
129+
line = await stream.readline()
130+
if not line:
131+
break
132+
text = line.decode(errors="replace").rstrip()
133+
if text:
134+
if level == "err":
135+
logger.warning(f"[chromadb] {text}")
136+
else:
137+
logger.debug(f"[chromadb] {text}")
138+
139+
if async_proc.stdout:
140+
asyncio.create_task(_async_log(async_proc.stdout, "out"))
141+
if async_proc.stderr:
142+
asyncio.create_task(_async_log(async_proc.stderr, "err"))
143+
144+
except NotImplementedError:
145+
# Windows SelectorEventLoop — fall back to sync subprocess
146+
sync_proc = subprocess.Popen(
147+
cmd,
148+
stdout=subprocess.PIPE,
149+
stderr=subprocess.PIPE,
150+
)
151+
pid = sync_proc.pid
152+
153+
# Log streams via daemon threads
154+
def _sync_log(stream, level: str) -> None:
155+
for line in stream:
156+
text = line.decode(errors="replace").rstrip()
157+
if text:
158+
if level == "err":
159+
logger.warning(f"[chromadb] {text}")
160+
else:
161+
logger.debug(f"[chromadb] {text}")
162+
163+
if sync_proc.stdout:
164+
threading.Thread(
165+
target=_sync_log, args=(sync_proc.stdout, "out"), daemon=True
166+
).start()
167+
if sync_proc.stderr:
168+
threading.Thread(
169+
target=_sync_log, args=(sync_proc.stderr, "err"), daemon=True
170+
).start()
121171

122172
# Save PID for later management (async write)
123-
await pid_file.write_text(str(proc.pid))
124-
125-
logger.info(f"Started ChromaDB server with PID {proc.pid}")
126-
127-
# Log subprocess output in background threads (not async tasks,
128-
# since we're using sync Popen pipes)
129-
def _log_stream(stream, level: str) -> None:
130-
for line in stream:
131-
text = line.decode(errors="replace").rstrip()
132-
if text:
133-
if level == "err":
134-
logger.warning(f"[chromadb] {text}")
135-
else:
136-
logger.debug(f"[chromadb] {text}")
137-
138-
if proc.stdout:
139-
threading.Thread(
140-
target=_log_stream, args=(proc.stdout, "out"), daemon=True
141-
).start()
142-
if proc.stderr:
143-
threading.Thread(
144-
target=_log_stream, args=(proc.stderr, "err"), daemon=True
145-
).start()
173+
await pid_file.write_text(str(pid))
174+
175+
logger.info(f"Started ChromaDB server with PID {pid}")
146176

147177
# Wait for health
148178
await cls._wait_for_healthy(http_port)

0 commit comments

Comments
 (0)