131 changes: 66 additions & 65 deletions src/mcp_code_indexer/vector_mode/daemon.py
@@ -16,9 +16,13 @@
from ..database.models import Project
from .config import VectorConfig, load_vector_config
from .monitoring.file_watcher import create_file_watcher, FileWatcher
from .providers.voyage_client import VoyageClient, create_voyage_client
from .providers.turbopuffer_client import create_turbopuffer_client
from .services.embedding_service import EmbeddingService
from .services.vector_storage_service import VectorStorageService

from .monitoring.change_detector import FileChange, ChangeType
from .chunking.ast_chunker import ASTChunker
from .chunking.ast_chunker import ASTChunker, CodeChunk
from .types import (
ScanProjectTask,
VectorDaemonTaskType,
@@ -69,6 +73,16 @@ def __init__(
"last_activity": time.time(),
}

# Initialize VoyageClient and EmbeddingService for embedding generation
self._voyage_client = create_voyage_client(self.config)
self._embedding_service = EmbeddingService(self._voyage_client, self.config)

# Initialize TurbopufferClient and VectorStorageService for vector storage
self._turbopuffer_client = create_turbopuffer_client(self.config)
self._vector_storage_service = VectorStorageService(
self._turbopuffer_client, self.config
)

# Signal handling is delegated to the parent process

def _on_file_change(self, project_name: str) -> callable:
Expand Down Expand Up @@ -285,23 +299,26 @@ async def _process_file_change_task(
project_name: str = task["project_name"]
change: FileChange = task["change"]
logger.info(
f"Worker {worker_id}: File change detected for project {project_name}: {change.path} ({change.change_type.value})",
extra={
"structured_data": {
"worker_id": worker_id,
"project_name": project_name,
"file_path": str(change.path),
"change_type": change.change_type.value,
"timestamp": change.timestamp.isoformat(),
"size": change.size,
}
},
f"Worker {worker_id}: File change detected for project {project_name}: {change.path} ({change.change_type.value})"
)

try:
# Skip deleted files - only process created/modified files
# Handle deleted files by removing their vectors from the database
if change.change_type == ChangeType.DELETED:
logger.debug(f"Worker {worker_id}: Skipping deleted file {change.path}")
logger.info(
f"Worker {worker_id}: Deleting vectors for deleted file {change.path}"
)
try:
await self._vector_storage_service.delete_vectors_for_file(
project_name, str(change.path)
)
logger.info(
f"Worker {worker_id}: Successfully deleted vectors for {change.path}"
)
except Exception as e:
logger.error(
f"Worker {worker_id}: Failed to delete vectors for {change.path}: {e}"
)
return

# Initialize ASTChunker with default settings
@@ -335,40 +352,16 @@ async def _process_file_change_task(
redacted_count += 1

logger.info(
f"Worker {worker_id}: Chunked {change.path} into {chunk_count} chunks",
extra={
"structured_data": {
"worker_id": worker_id,
"project_name": project_name,
"file_path": str(change.path),
"chunk_count": chunk_count,
"chunk_types": chunk_types,
"redacted_chunks": redacted_count,
"file_size": change.size,
}
},
)

# Log detailed debug information
sample_chunks = [
f" [{i}] {chunk.chunk_type.value} - {chunk.name or 'unnamed'} "
f"(lines {chunk.start_line}-{chunk.end_line}, "
f"{len(chunk.content)} chars, redacted: {chunk.redacted})"
for i, chunk in enumerate(chunks[:3]) # Show first 3 chunks
]
logger.debug(
f"Worker {worker_id}: File {change.path} chunking details: "
f"Total chunks: {chunk_count}, "
f"Chunk types: {chunk_types}, "
f"Redacted chunks: {redacted_count}, "
f"Sample chunks: {', '.join(sample_chunks)}"
f"Worker {worker_id}: Chunked {change.path} into {chunk_count} chunks"
)

# Generate and store embeddings for chunks
embeddings = await self._generate_embeddings(
chunks, project_name, change.path
)
await self._store_embeddings(embeddings, project_name, change.path)
await self._store_embeddings(
embeddings, chunks, project_name, change.path
)

# Only increment stats for successfully chunked files
self.stats["files_processed"] += 1
@@ -430,22 +423,15 @@ async def _process_project_scan(self, task: dict, worker_id: str) -> None:
logger.debug(f"VectorDaemon: Created watcher for {project_name}")
# Initialize the watcher
await watcher.initialize()
logger.debug(
f"VectorDaemon: Initialized watcher for {project_name}"
)

# Add change callback
watcher.add_change_callback(self._on_file_change(project_name))
logger.debug(
f"VectorDaemon: Added change callback for {project_name}"
)

# Start watching
watcher.start_watching()
logger.debug(f"VectorDaemon: Started watching for {project_name}")

# Store watcher for later cleanup
self.file_watchers[project_name] = watcher
logger.debug(f"VectorDaemon: Stored watcher for {project_name}")

logger.info(
f"File watcher started for project {project_name}",
@@ -464,14 +450,6 @@

self.stats["files_processed"] += 1

# TODO: Implement remaining vector processing:
# 1. Use watcher to get changed files since last scan
# 2. Chunk modified files using AST
# 3. Apply secret redaction
# 4. Generate embeddings via Voyage
# 5. Store in Turbopuffer
# 6. Update database metadata

except Exception as e:
logger.error(f"Error processing project {project_name}: {e}", exc_info=True)
self.stats["errors_count"] += 1
@@ -557,18 +535,41 @@ def get_status(self) -> dict:
}

async def _generate_embeddings(
self, chunks: list, project_name: str, file_path
self, chunks: list[CodeChunk], project_name: str, file_path: Path
) -> list[list[float]]:
"""Generate embeddings for file chunks."""
# TODO: implement
return []
"""Generate embeddings for file chunks using EmbeddingService."""
try:
embeddings = await self._embedding_service.generate_embeddings_for_chunks(
chunks, project_name, file_path
)

# Update daemon statistics
self.stats["embeddings_generated"] += len(embeddings)
self.stats["last_activity"] = time.time()

return embeddings

except Exception as e:
# Update error statistics
self.stats["errors_count"] += 1
raise

async def _store_embeddings(
self, embeddings: list[list[float]], project_name: str, file_path
self,
embeddings: list[list[float]],
chunks: list[CodeChunk],
project_name: str,
file_path: str,
) -> None:
"""Store embeddings in vector database."""
# TODO: implement
pass
try:
await self._vector_storage_service.store_embeddings(
embeddings, chunks, project_name, file_path
)
except Exception as e:
# Update error statistics
self.stats["errors_count"] += 1
raise


async def start_vector_daemon(
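Taken together, the daemon.py changes replace the two TODO stubs with a working chunk → embed → store pipeline and give deleted files a real handler instead of a skip. A minimal sketch of the resulting flow, using only the interfaces visible in this diff; the `ASTChunker.chunk_file()` call and a zero-argument `load_vector_config()` are assumptions, since the diff elides the chunking call itself:

```python
# Sketch of the change-handling flow introduced in this PR (not the
# literal daemon code). Assumed: ASTChunker.chunk_file() exists and
# load_vector_config() takes no arguments.
from mcp_code_indexer.vector_mode.config import load_vector_config
from mcp_code_indexer.vector_mode.chunking.ast_chunker import ASTChunker
from mcp_code_indexer.vector_mode.monitoring.change_detector import ChangeType, FileChange
from mcp_code_indexer.vector_mode.providers.voyage_client import create_voyage_client
from mcp_code_indexer.vector_mode.providers.turbopuffer_client import create_turbopuffer_client
from mcp_code_indexer.vector_mode.services.embedding_service import EmbeddingService
from mcp_code_indexer.vector_mode.services.vector_storage_service import VectorStorageService


async def handle_change(change: FileChange, project_name: str) -> None:
    config = load_vector_config()
    embedder = EmbeddingService(create_voyage_client(config), config)
    storage = VectorStorageService(create_turbopuffer_client(config), config)

    if change.change_type == ChangeType.DELETED:
        # Deletions now remove stale vectors instead of being skipped.
        await storage.delete_vectors_for_file(project_name, str(change.path))
        return

    chunks = ASTChunker().chunk_file(change.path)  # assumed API
    embeddings = await embedder.generate_embeddings_for_chunks(
        chunks, project_name, change.path
    )
    # store_embeddings now receives the chunks alongside the embeddings,
    # so chunk metadata can be persisted with each vector.
    await storage.store_embeddings(embeddings, chunks, project_name, str(change.path))
```

Note the widened `_store_embeddings` signature: passing `chunks` through lets the storage layer keep chunk metadata (type, line range, redaction flag) next to each vector rather than re-deriving it later.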
36 changes: 33 additions & 3 deletions src/mcp_code_indexer/vector_mode/monitoring/file_watcher.py
@@ -12,6 +12,7 @@
from typing import Callable, Optional, List, Dict, Any
import time
from concurrent.futures import ThreadPoolExecutor
from abc import ABC, abstractmethod

try:
from watchdog.observers import Observer, ObserverType
@@ -30,7 +31,28 @@
logger = logging.getLogger(__name__)


class BaseFileWatcher(ABC):
"""Abstract base class for file watchers."""

@abstractmethod
async def initialize(self) -> None:
"""Initialize the file watcher."""
pass

@abstractmethod
def add_change_callback(self, callback: Callable[[FileChange], None]) -> None:
"""Add a callback to be called when files change."""
pass

@abstractmethod
def start_watching(self) -> None:
"""Start watching for file changes."""
pass

@abstractmethod
def get_stats(self) -> Dict[str, Any]:
"""Get watcher statistics."""
pass


class VectorModeEventHandler(FileSystemEventHandler):
@@ -134,7 +156,7 @@ async def _process_after_debounce(self, file_path: str) -> None:
self.debounce_tasks.pop(file_path, None)


class FileWatcher:
class FileWatcher(BaseFileWatcher):
"""
Real-time file system watcher for vector mode.
@@ -372,7 +394,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):


# Fallback implementation for when watchdog is not available
class PollingFileWatcher:
class PollingFileWatcher(BaseFileWatcher):
"""
Fallback file watcher using polling instead of OS events.
@@ -442,14 +464,22 @@ async def _poll_loop(self) -> None:
logger.error(f"Error in polling loop: {e}")
await asyncio.sleep(self.poll_interval)

def get_stats(self) -> Dict[str, Any]:
"""Get watcher statistics."""
return {
"project_root": str(self.project_root),
"project_id": self.project_id,
"is_watching": self.is_watching,
}

def cleanup(self) -> None:
"""Clean up resources."""
self.stop_watching()


def create_file_watcher(
project_root: Path, project_id: str, use_polling: bool = False, **kwargs
) -> Any:
) -> BaseFileWatcher:
"""
Create appropriate file watcher based on availability.
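The new `BaseFileWatcher` ABC gives `FileWatcher` and its polling fallback a shared contract, so `create_file_watcher` can return one concrete type annotation instead of `Any`. A minimal usage sketch against the interface shown above; the `on_change` callback, `project_id` value, and five-second run window are illustrative only:

```python
# Sketch of driving a watcher through the BaseFileWatcher interface.
import asyncio
from pathlib import Path

from mcp_code_indexer.vector_mode.monitoring.file_watcher import (
    BaseFileWatcher,
    create_file_watcher,
)
from mcp_code_indexer.vector_mode.monitoring.change_detector import FileChange


def on_change(change: FileChange) -> None:
    # Callbacks receive the FileChange produced by the debounced handler.
    print(f"changed: {change.path}")


async def main() -> None:
    # use_polling=True forces the PollingFileWatcher fallback; the factory
    # also falls back automatically when watchdog is unavailable. Either
    # way the caller sees a BaseFileWatcher.
    watcher: BaseFileWatcher = create_file_watcher(
        Path("."), project_id="demo", use_polling=True
    )
    await watcher.initialize()
    watcher.add_change_callback(on_change)
    watcher.start_watching()
    await asyncio.sleep(5)  # let the poll loop observe some changes
    print(watcher.get_stats())


asyncio.run(main())
```

Adding `get_stats()` to `PollingFileWatcher` was required by the ABC, and it also means callers no longer need an isinstance check before reporting watcher status.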