 
 import pandas as pd
-from typing import List, Dict, Any, Optional
+from typing import List, Dict, Any, Optional, TypedDict
 from pathlib import Path
 import numpy as np
-from llama_index.core.schema import Document
-from llama_index.core.node_parser import SentenceSplitter
+from src.embedder.embedder import EmbeddingWrapper
 
 from datetime import datetime
 from dataclasses import dataclass
-
 from loguru import logger
-
 from src.config.config import Config
 
+
-@dataclass
-class DocumentMetadata:
+class DocumentMetadata(TypedDict):
1816 """Class to store document metadata"""
1917 source_file : str
2018 ingestion_timestamp : str
2119 last_updated_timestamp : str
2220 embedding_version : str
23- embedding_model : str
21+ embedding_model_name : str
2422 processing_status : str
2523
2624
+class ProcessedChunk(TypedDict):
+    """Type definition for processed file chunks."""
+    embeddings: List[float]
+    text: str
+    metadata: DocumentMetadata
+
+
 class CsvParser:
 
-    def __init__(self, data_dir: str, embedding_version: str = Config.EMBEDDING_VERSION_NUMBER, embedding_model: str = Config.EMBEDDING_MODEL) -> None:
+    def __init__(self, data_dir: str, embedding_version: str = Config.EMBEDDING_VERSION_NUMBER, embedding_model_name: str = Config.EMBEDDING_MODEL) -> None:
         self.data_dir = Path(data_dir)
         self.embedding_version = embedding_version
-        self.embedding_model = embedding_model
-        self.node_parser = SentenceSplitter(chunk_size=1200, chunk_overlap=200)
-
+        self.embedding_model_name = embedding_model_name
+        self.embedder = EmbeddingWrapper()
+        self.chunks: List[ProcessedChunk] = []
 
     def create_document_metadata(self, row: pd.Series, file_name: str,) -> DocumentMetadata:
         """Create comprehensive document metadata"""
@@ -42,7 +49,7 @@ def create_document_metadata(self, row: pd.Series, file_name: str,) -> DocumentM
             ingestion_timestamp=current_time,
             last_updated_timestamp=current_time,
             embedding_version=self.embedding_version,
-            embedding_model=self.embedding_model,  # In practice, this might be different
+            embedding_model_name=self.embedding_model_name,  # In practice, this might be different
             processing_status="processed",
         )
 
@@ -68,35 +75,35 @@ def read_file(self, file_path: Path) -> pd.DataFrame:
         return df
 
 
-    def process_file(self, file_path: Path) -> List[Document]:
+    def process_file(self, file_path: Path) -> None:
         """Process a single CSV file with enhanced metadata and version control"""
         try:
             logger.info(f"Processing file: {file_path}")
 
             # Read CSV file
             df = self.read_file(file_path)
 
-            documents = []
             for _, row in df.iterrows():
                 # Combine text fields
                 text_content = self.get_text(row)
 
                 # Create comprehensive metadata
                 metadata = self.create_document_metadata(row, file_path.name)
-
-                # Create Document object with enhanced metadata
-                doc = Document(
-                    text=text_content,
-                    metadata=metadata.__dict__
-                )
-
-                nodes = self.node_parser.get_nodes_from_documents([doc])
-                documents.extend(
-                    [Document(text=node.text, metadata=node.metadata) for node in nodes]
-                )
+                embeddings = self.embedder.generate_embeddings(text_content)
+
+                # Build a ProcessedChunk from the row text, its embeddings, and metadata
+                doc: ProcessedChunk = {
+                    "embeddings": embeddings,
+                    "text": text_content,
+                    "metadata": metadata,
+                }
+                self.chunks.append(doc)
 
-            logger.info(f"Successfully processed {len(documents)} documents from {file_path}")
-            return documents
+            logger.info(f"Successfully processed all documents from {file_path}")
 
         except Exception as e:
             logger.error(f"Error processing file {file_path}: {str(e)}")
@@ -126,18 +133,18 @@ def get_text(self, row: pd.Series) -> str:
         return " | ".join(text_parts)
 
 
-    def process_directory(self) -> List[Document]:
+    def process_directory(self) -> List[ProcessedChunk]:
         """Process all CSV files in directory"""
-        all_documents = []
 
         logger.info("Attempting to read all .csv files and indexing....")
         for file_path in self.data_dir.glob('*.csv'):
             try:
-                documents = self.process_file(file_path)
-                all_documents.extend(documents)
+                self.process_file(file_path)
             except Exception as e:
                 logger.error(f"Skipping file {file_path} due to error: {str(e)}")
                 continue
 
         logger.info("All .csv files processed. Returning chunks...")
-        return all_documents
+        return self.chunks
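
Since src/embedder/embedder.py is not shown in this diff, here is one possible shape of the wrapper the parser now depends on, inferred only from the two calls it makes (a no-argument constructor and generate_embeddings(text)). The sentence-transformers backend and model name are purely illustrative:

# One possible EmbeddingWrapper, sketched from the calls above; the
# sentence-transformers backend and default model are assumptions.
from typing import List
from sentence_transformers import SentenceTransformer

class EmbeddingWrapper:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2") -> None:
        self.model = SentenceTransformer(model_name)

    def generate_embeddings(self, text: str) -> List[float]:
        # encode() returns a numpy array; convert it to a plain list of floats
        return self.model.encode(text).tolist()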