 from ..database.models import Project
 from .config import VectorConfig, load_vector_config
 from .monitoring.file_watcher import create_file_watcher, FileWatcher
+from .providers.voyage_client import VoyageClient, create_voyage_client
+from .providers.turbopuffer_client import create_turbopuffer_client
+from .services.embedding_service import EmbeddingService
+from .services.vector_storage_service import VectorStorageService
 
 from .monitoring.change_detector import FileChange, ChangeType
-from .chunking.ast_chunker import ASTChunker
+from .chunking.ast_chunker import ASTChunker, CodeChunk
 from .types import (
     ScanProjectTask,
     VectorDaemonTaskType,
@@ -69,6 +73,16 @@ def __init__(
             "last_activity": time.time(),
         }
 
+        # Initialize VoyageClient and EmbeddingService for embedding generation
+        self._voyage_client = create_voyage_client(self.config)
+        self._embedding_service = EmbeddingService(self._voyage_client, self.config)
+
+        # Initialize TurbopufferClient and VectorStorageService for vector storage
+        self._turbopuffer_client = create_turbopuffer_client(self.config)
+        self._vector_storage_service = VectorStorageService(
+            self._turbopuffer_client, self.config
+        )
+
         # Signal handling is delegated to the parent process
 
     def _on_file_change(self, project_name: str) -> callable:
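Note on the context line above: `_on_file_change` is a callback factory, returning a per-project closure so one handler implementation can serve every watched project (`typing.Callable` would be a tighter return annotation than the bare `callable` builtin). A minimal, self-contained sketch of that pattern, using hypothetical names (`make_on_change`, `sink`) rather than the daemon's actual code:

```python
from typing import Callable


def make_on_change(
    project_name: str, enqueue: Callable[[str, str], None]
) -> Callable[[str], None]:
    # Each watcher gets a closure that captures its own project_name,
    # so change events arrive already tagged with their project.
    def _callback(path: str) -> None:
        enqueue(project_name, path)

    return _callback


# Usage: two independent callbacks feeding one shared sink.
events: list[tuple[str, str]] = []


def sink(project: str, path: str) -> None:
    events.append((project, path))


make_on_change("proj-a", sink)("src/main.py")
make_on_change("proj-b", sink)("lib/util.py")
assert events == [("proj-a", "src/main.py"), ("proj-b", "lib/util.py")]
```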
@@ -285,23 +299,26 @@ async def _process_file_change_task(
         project_name: str = task["project_name"]
         change: FileChange = task["change"]
         logger.info(
-            f"Worker {worker_id}: File change detected for project {project_name}: {change.path} ({change.change_type.value})",
-            extra={
-                "structured_data": {
-                    "worker_id": worker_id,
-                    "project_name": project_name,
-                    "file_path": str(change.path),
-                    "change_type": change.change_type.value,
-                    "timestamp": change.timestamp.isoformat(),
-                    "size": change.size,
-                }
-            },
+            f"Worker {worker_id}: File change detected for project {project_name}: {change.path} ({change.change_type.value})"
         )
 
         try:
-            # Skip deleted files - only process created/modified files
+            # Handle deleted files by removing their vectors from the database
             if change.change_type == ChangeType.DELETED:
-                logger.debug(f"Worker {worker_id}: Skipping deleted file {change.path}")
+                logger.info(
+                    f"Worker {worker_id}: Deleting vectors for deleted file {change.path}"
+                )
+                try:
+                    await self._vector_storage_service.delete_vectors_for_file(
+                        project_name, str(change.path)
+                    )
+                    logger.info(
+                        f"Worker {worker_id}: Successfully deleted vectors for {change.path}"
+                    )
+                except Exception as e:
+                    logger.error(
+                        f"Worker {worker_id}: Failed to delete vectors for {change.path}: {e}"
+                    )
                 return
 
             # Initialize ASTChunker with default settings
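The new DELETED branch logs the failure and still falls through to `return`, so a storage hiccup cannot crash the worker loop. One way to exercise the branch is to stub the storage service with `unittest.mock.AsyncMock`; a hedged sketch against a stand-in object (the real daemon class and its constructor are not shown in this diff):

```python
import asyncio
from unittest.mock import AsyncMock


async def handle_deleted(daemon, project_name: str, path: str) -> None:
    # Stand-in for the branch above: delete vectors, tolerate storage errors.
    try:
        await daemon._vector_storage_service.delete_vectors_for_file(project_name, path)
    except Exception:
        pass  # the real code logs here and still returns


class FakeDaemon:
    def __init__(self) -> None:
        self._vector_storage_service = AsyncMock()


daemon = FakeDaemon()
asyncio.run(handle_deleted(daemon, "demo", "src/old.py"))
daemon._vector_storage_service.delete_vectors_for_file.assert_awaited_once_with(
    "demo", "src/old.py"
)
```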
@@ -335,40 +352,16 @@ async def _process_file_change_task(
                     redacted_count += 1
 
             logger.info(
-                f"Worker {worker_id}: Chunked {change.path} into {chunk_count} chunks",
-                extra={
-                    "structured_data": {
-                        "worker_id": worker_id,
-                        "project_name": project_name,
-                        "file_path": str(change.path),
-                        "chunk_count": chunk_count,
-                        "chunk_types": chunk_types,
-                        "redacted_chunks": redacted_count,
-                        "file_size": change.size,
-                    }
-                },
-            )
-
-            # Log detailed debug information
-            sample_chunks = [
-                f"  [{i}] {chunk.chunk_type.value} - {chunk.name or 'unnamed'} "
-                f"(lines {chunk.start_line}-{chunk.end_line}, "
-                f"{len(chunk.content)} chars, redacted: {chunk.redacted})"
-                for i, chunk in enumerate(chunks[:3])  # Show first 3 chunks
-            ]
-            logger.debug(
-                f"Worker {worker_id}: File {change.path} chunking details: "
-                f"Total chunks: {chunk_count}, "
-                f"Chunk types: {chunk_types}, "
-                f"Redacted chunks: {redacted_count}, "
-                f"Sample chunks: {', '.join(sample_chunks)}"
+                f"Worker {worker_id}: Chunked {change.path} into {chunk_count} chunks"
             )
 
             # Generate and store embeddings for chunks
             embeddings = await self._generate_embeddings(
                 chunks, project_name, change.path
             )
-            await self._store_embeddings(embeddings, project_name, change.path)
+            await self._store_embeddings(
+                embeddings, chunks, project_name, str(change.path)
+            )
 
             # Only increment stats for successfully chunked files
             self.stats["files_processed"] += 1
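`_store_embeddings` now receives the chunks alongside their vectors, which implies a positional pairing between the two lists. A small sketch of the invariant the storage layer presumably enforces (an assumption; `VectorStorageService`'s real validation is not shown in this diff), using a hypothetical `pair_chunks_with_vectors` helper:

```python
from typing import Sequence


def pair_chunks_with_vectors(
    chunks: Sequence[object], embeddings: Sequence[list[float]]
) -> list[tuple[object, list[float]]]:
    # The i-th embedding must describe the i-th chunk; fail loudly otherwise.
    if len(chunks) != len(embeddings):
        raise ValueError(
            f"chunk/embedding count mismatch: {len(chunks)} vs {len(embeddings)}"
        )
    return list(zip(chunks, embeddings))


# Example: three chunks paired with three 4-dimensional vectors.
pairs = pair_chunks_with_vectors(["a", "b", "c"], [[0.1] * 4, [0.2] * 4, [0.3] * 4])
assert len(pairs) == 3
```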
@@ -430,22 +423,15 @@ async def _process_project_scan(self, task: dict, worker_id: str) -> None:
                 logger.debug(f"VectorDaemon: Created watcher for {project_name}")
                 # Initialize the watcher
                 await watcher.initialize()
-                logger.debug(
-                    f"VectorDaemon: Initialized watcher for {project_name}"
-                )
+
                 # Add change callback
                 watcher.add_change_callback(self._on_file_change(project_name))
-                logger.debug(
-                    f"VectorDaemon: Added change callback for {project_name}"
-                )
 
                 # Start watching
                 watcher.start_watching()
-                logger.debug(f"VectorDaemon: Started watching for {project_name}")
 
                 # Store watcher for later cleanup
                 self.file_watchers[project_name] = watcher
-                logger.debug(f"VectorDaemon: Stored watcher for {project_name}")
 
                 logger.info(
                     f"File watcher started for project {project_name}",
@@ -464,14 +450,6 @@ async def _process_project_scan(self, task: dict, worker_id: str) -> None:
 
             self.stats["files_processed"] += 1
 
-            # TODO: Implement remaining vector processing:
-            # 1. Use watcher to get changed files since last scan
-            # 2. Chunk modified files using AST
-            # 3. Apply secret redaction
-            # 4. Generate embeddings via Voyage
-            # 5. Store in Turbopuffer
-            # 6. Update database metadata
-
         except Exception as e:
             logger.error(f"Error processing project {project_name}: {e}", exc_info=True)
             self.stats["errors_count"] += 1
@@ -557,18 +535,41 @@ def get_status(self) -> dict:
         }
 
     async def _generate_embeddings(
-        self, chunks: list, project_name: str, file_path
+        self, chunks: list[CodeChunk], project_name: str, file_path: Path
     ) -> list[list[float]]:
-        """Generate embeddings for file chunks."""
-        # TODO: implement
-        return []
+        """Generate embeddings for file chunks using EmbeddingService."""
+        try:
+            embeddings = await self._embedding_service.generate_embeddings_for_chunks(
+                chunks, project_name, file_path
+            )
+
+            # Update daemon statistics
+            self.stats["embeddings_generated"] += len(embeddings)
+            self.stats["last_activity"] = time.time()
+
+            return embeddings
+
+        except Exception:
+            # Update error statistics
+            self.stats["errors_count"] += 1
+            raise
 
     async def _store_embeddings(
-        self, embeddings: list[list[float]], project_name: str, file_path
+        self,
+        embeddings: list[list[float]],
+        chunks: list[CodeChunk],
+        project_name: str,
+        file_path: str,
     ) -> None:
         """Store embeddings in vector database."""
-        # TODO: implement
-        pass
+        try:
+            await self._vector_storage_service.store_embeddings(
+                embeddings, chunks, project_name, file_path
+            )
+        except Exception:
+            # Update error statistics
+            self.stats["errors_count"] += 1
+            raise
 
 
 async def start_vector_daemon(
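Both new helpers share one shape: delegate to a service, bump a `stats` counter on success, and on failure increment `errors_count` and re-raise so callers still observe the exception. A compact, self-contained sketch of just that accounting pattern, with hypothetical `StatsTracked` and `_boom` names rather than the daemon's API:

```python
import asyncio


class StatsTracked:
    def __init__(self) -> None:
        self.stats = {"errors_count": 0}

    async def call_service(self, coro_fn):
        # Delegate, count the error, and re-raise so callers still see failures.
        try:
            return await coro_fn()
        except Exception:
            self.stats["errors_count"] += 1
            raise


async def _boom():
    raise RuntimeError("service down")


obj = StatsTracked()
try:
    asyncio.run(obj.call_service(_boom))
except RuntimeError:
    pass
assert obj.stats["errors_count"] == 1
```

One thing to watch with this pattern: if `_process_file_change_task`'s own exception handler also bumps `errors_count`, a single failure gets counted twice.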