Skip to content

Commit b99b6ad

Browse files
committed
fix: improve ChromaDB provisioner reliability and dataset deletion safety
- Verify actual OS process before reusing DB-marked-running provisioner
- Wait for ChromaDB HTTP server health before marking provisioner ready
- Use psutil for PID identity validation to prevent wrong-process kills
- Add wait_until_ready() to provisioner protocol with no-op default
- Bundle rapidocr data files in PyInstaller spec (docling dependency)
- Catch chromadb.errors.NotFoundError on collection delete for un-ingested datasets
- Stop ingestion (cancel pending jobs + stop watcher) before dataset deletion
- Add TOCTOU guard: re-check dataset exists before ingest() call
1 parent 33c2da7 commit b99b6ad

7 files changed

Lines changed: 125 additions & 9 deletions

File tree

backend/syft-space-backend.spec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ PACKAGES_WITH_DATA = [
2323
'docling_core',
2424
'docling_ibm_models',
2525
'docling_parse',
26+
'rapidocr',
2627
]
2728

2829
# Packages that use importlib.metadata at runtime (need their dist-info).

backend/syft_space/components/dataset_types/chromadb_local/chromadb_provisioner.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,20 +228,50 @@ async def status(cls, state: dict[str, Any]) -> str:
228228
else:
229229
return "starting"
230230

231+
@classmethod
async def wait_until_ready(cls, state: dict[str, Any]) -> None:
    """Wait for the ChromaDB HTTP server to be healthy.

    Args:
        state: State dict from start(). The port is read from the
            "httpPort" key, falling back to "http_port".

    Raises:
        TimeoutError: If the state holds no usable HTTP port, or if the
            health wait delegated to _wait_for_healthy() gives up.
    """
    raw_port = state.get("httpPort") or state.get("http_port")
    if not raw_port:
        raise TimeoutError("No HTTP port in provisioner state")

    # A malformed port can never become healthy; report it through the
    # documented TimeoutError instead of leaking ValueError/TypeError.
    try:
        http_port = int(raw_port)
    except (TypeError, ValueError) as exc:
        raise TimeoutError(
            f"Invalid HTTP port in provisioner state: {raw_port!r}"
        ) from exc

    await cls._wait_for_healthy(http_port)
245+
231246
@classmethod
async def _is_process_running(cls, pid: int) -> bool:
    """Check whether ``pid`` is a live ChromaDB process.

    When psutil is importable, the PID's command line must mention
    "chroma", so a PID recycled by an unrelated program is not mistaken
    for the server. Without psutil only process existence can be checked.

    Args:
        pid: Process ID to check

    Returns:
        True if the process exists and (when psutil is available) looks
        like a ChromaDB process, False otherwise
    """
    # Reject anything that is not a positive integer up front:
    #   * os.kill() treats 0 and negative values as process-group targets,
    #     so the existence probe would wrongly report success;
    #   * psutil.Process() raises ValueError for negative PIDs and treats
    #     None as "the current process".
    if not isinstance(pid, int) or pid <= 0:
        return False

    try:
        import psutil
    except ImportError:
        psutil = None

    if psutil is not None:
        try:
            cmdline = " ".join(psutil.Process(pid).cmdline()).lower()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            return False
        return "chroma" in cmdline

    # psutil not available: fall back to a bare existence check.
    if os.name == "nt":
        # On Windows os.kill(pid, 0) is not a probe: any signal other than
        # CTRL_C_EVENT / CTRL_BREAK_EVENT terminates the target process.
        # Query the process through the Win32 API instead.
        import ctypes
        from ctypes import wintypes

        process_query_limited_information = 0x1000
        still_active = 259

        kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
        kernel32.OpenProcess.restype = wintypes.HANDLE
        kernel32.OpenProcess.argtypes = (
            wintypes.DWORD,
            wintypes.BOOL,
            wintypes.DWORD,
        )
        kernel32.GetExitCodeProcess.restype = wintypes.BOOL
        kernel32.GetExitCodeProcess.argtypes = (
            wintypes.HANDLE,
            ctypes.POINTER(wintypes.DWORD),
        )
        kernel32.CloseHandle.argtypes = (wintypes.HANDLE,)

        handle = kernel32.OpenProcess(
            process_query_limited_information, False, pid
        )
        if not handle:
            return False
        try:
            exit_code = wintypes.DWORD()
            ok = kernel32.GetExitCodeProcess(handle, ctypes.byref(exit_code))
            return bool(ok) and exit_code.value == still_active
        finally:
            kernel32.CloseHandle(handle)

    try:
        # Signal 0 performs error checking only; nothing is delivered.
        os.kill(pid, 0)
    except OSError:
        # Covers ProcessLookupError (no such process) and PermissionError
        # (process exists but is not ours, so it is not our server).
        return False
    return True
246276

247277
@classmethod

backend/syft_space/components/dataset_types/chromadb_local/chromadb_type.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing import Any
1313

1414
from anyio import Path as AsyncPath
15+
from loguru import logger
1516
from pydantic import BaseModel, Field, ValidationError, field_validator
1617

1718
from syft_space.components.dataset_types.chunking import (
@@ -34,6 +35,12 @@
3435
from syft_space.components.shared.utils import ConfigSchemaGenerator
3536

3637

38+
try:
39+
from chromadb.errors import NotFoundError as _ChromaNotFoundError
40+
except ImportError:
41+
_ChromaNotFoundError = None
42+
43+
3744
def _import_chromadb() -> ModuleType:
3845
try:
3946
import chromadb
@@ -575,7 +582,14 @@ async def delete(self, ctx: IngestContext) -> None:
575582
try:
576583
await client.delete_collection(name=self.collection_name)
577584
except Exception as e:
578-
raise ValueError(f"Error deleting collection: {str(e)}") from e
585+
# Collection may not exist if no documents were ever ingested
586+
if _ChromaNotFoundError and isinstance(e, _ChromaNotFoundError):
587+
logger.info(
588+
f"Collection '{self.collection_name}' does not exist, "
589+
"skipping deletion"
590+
)
591+
else:
592+
raise ValueError(f"Error deleting collection: {str(e)}") from e
579593

580594
# Remove all page images for this collection
581595
await asyncio.to_thread(

backend/syft_space/components/dataset_types/interfaces.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,21 @@ async def is_running(cls, state: dict[str, Any]) -> bool:
318318
"""
319319
...
320320

321+
@classmethod
async def wait_until_ready(cls, state: dict[str, Any]) -> None:
    """Wait until the provisioned resource is ready to accept connections.

    Default is a no-op. Override in subclasses that need startup health
    checks (e.g., HTTP server readiness).

    Args:
        state: State dictionary returned from start()

    Raises:
        TimeoutError: If not ready within implementation-defined timeout
    """
    # Nothing to wait for by default; the state is deliberately unused.
    return None
335+
321336
@classmethod
322337
async def status(cls, state: dict[str, Any]) -> str:
323338
"""Get detailed status of the resource.

backend/syft_space/components/datasets/handlers.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,41 @@ async def _ensure_provisioner_running(
9191
# Remote type - no provisioner needed
9292
return None
9393

94-
# Check if already running
94+
# Check if already running (verify actual process, not just DB state)
9595
existing = await self.provisioner_state_repository.get_running_by_dtype(dtype)
9696
if existing:
97-
logger.info(f"Reusing existing provisioner for dtype '{dtype}'")
98-
return existing
97+
state = existing.state or {}
98+
if await provisioner_cls.is_running(state):
99+
# Process is alive, but the server may still be booting
100+
# (e.g. after an app restart the OS process survives but
101+
# ChromaDB's HTTP server hasn't finished starting yet).
102+
# Wait for it to be healthy before returning.
103+
try:
104+
await provisioner_cls.wait_until_ready(state)
105+
except (TimeoutError, Exception) as exc:
106+
logger.warning(
107+
f"Provisioner for '{dtype}' process alive but "
108+
f"not ready ({exc}), restarting..."
109+
)
110+
await provisioner_cls.stop(state)
111+
await self.provisioner_state_repository.upsert_status(
112+
dtype=dtype,
113+
status=ProvisionerStatus.STOPPED,
114+
)
115+
# Fall through to start a new one
116+
existing = None
117+
if existing:
118+
logger.info(f"Reusing existing provisioner for dtype '{dtype}'")
119+
return existing
120+
else:
121+
logger.warning(
122+
f"Provisioner for '{dtype}' marked as running in DB "
123+
f"but process is dead, restarting..."
124+
)
125+
await self.provisioner_state_repository.upsert_status(
126+
dtype=dtype,
127+
status=ProvisionerStatus.STOPPED,
128+
)
99129

100130
# Transition to STARTING (creates or updates, guards checked)
101131
logger.info(f"Starting provisioner for dtype '{dtype}'")

backend/syft_space/components/datasets/routes.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,23 @@ async def delete_dataset(
231231
) -> dict[str, str]:
232232
"""Delete a dataset.
233233
234+
Stops any active/pending ingestion before cleaning up resources.
235+
234236
Args:
235237
name: Dataset name
236238
tenant: Current tenant (injected)
237239
238240
Returns:
239241
Success message
240242
"""
243+
# Stop ingestion (cancel pending jobs, stop file watcher) before
244+
# deleting so that a queued job cannot recreate the collection after
245+
# we clean it up.
246+
if ingestion_manager:
247+
dataset = await handler.repository.get_by_name(name, tenant.id)
248+
if dataset:
249+
await ingestion_manager.stop_dataset_ingestion(dataset.id)
250+
241251
return await handler.delete_dataset(name, tenant)
242252

243253
@public_route

backend/syft_space/components/ingestion/manager.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,22 @@ async def _process_single_job(self, job: IngestionJob) -> None:
512512
# Create context (use system context for background ingestion)
513513
ctx = IngestContext(sender="system@openmined.org", dataset_id=dataset.id)
514514

515+
# Re-check dataset exists right before ingest to close TOCTOU
516+
# window (dataset could have been deleted during file I/O above)
517+
dataset = await self._dataset_repository.get_by_id(
518+
job.dataset_id, job.tenant_id
519+
)
520+
if not dataset:
521+
logger.info(
522+
f"Dataset deleted during ingestion prep for job {job.id}"
523+
)
524+
await self._ingestion_repository.update_status(
525+
job.id,
526+
IngestionJobStatus.CANCELLED,
527+
"Dataset deleted during processing",
528+
)
529+
return
530+
515531
# Call ingest (native async)
516532
await dataset_type.ingest(ctx, ingest_request)
517533

0 commit comments

Comments
 (0)