"""Data ingestion: load, split, embed, and store documents in a pgvector collection."""

import logging
import pathlib
from datetime import datetime

import pandas as pd
from langchain_huggingface.embeddings import HuggingFaceEmbeddings
from langchain_postgres import PGVector

from constants import (
    DEVICE,
    DIRECTORY_PATH,
    KNOWLEDGE_REPOSITORY_PATH,
    PGVECTOR_DATABASE_NAME,
    PGVECTOR_HOST,
    PGVECTOR_PASS,
    PGVECTOR_PORT,
    PGVECTOR_USER,
)
from split import load_documents, split_document

logger = logging.getLogger(__name__)


def get_embedder(embedding_model_name: str) -> HuggingFaceEmbeddings:
    """Initialize an embedder to convert text into vectors."""
    return HuggingFaceEmbeddings(
        model_name=embedding_model_name,
        model_kwargs={"device": DEVICE},
        show_progress=True,
    )
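
# Example (illustrative): an embedder returned by get_embedder() can also be
# used directly; embed_query() maps a string to a list[float] vector. The model
# name below is just the default used by ingest(), not a project requirement.
#
#   embedder = get_embedder("sentence-transformers/all-MiniLM-L6-v2")
#   vector = embedder.embed_query("How do I reset my password?")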


def ingest(
    meta_lookup: dict[pathlib.Path, dict],
    collection_name: str,
    chunk_size: int,
    chunk_overlap: int,
    ingest_threads: int = 8,
    embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
    mode: str = "overwrite",
    collection_metadata: dict | None = None,
):
    """Load documents from the knowledge repository into a pgvector vectorstore."""
    # Load documents and split each one into chunks
    all_documents = []
    origin_urls = {}
    documents = load_documents(KNOWLEDGE_REPOSITORY_PATH, ingest_threads=ingest_threads)
    for extension, document in documents:
        # Each item is (extension, documents); take the first document from the list
        document = document[0]
        # Keep the original path under "_source" and store the bare filename as "source"
        source = pathlib.Path(document.metadata["source"])
        file_name = source.stem
        document.metadata["_source"] = document.metadata["source"]
        document.metadata["source"] = file_name
        chunks = split_document(
            document, extension, chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )
        # Attach per-path metadata to each chunk
        path_metadata = meta_lookup.get(source, {})
        rel_path = source.relative_to(KNOWLEDGE_REPOSITORY_PATH)
        origin = rel_path.parts[0]
        for chunk in chunks:
            chunk.metadata = chunk.metadata | path_metadata
            # Record how many chunks this (origin, url) pair produced
            origin_urls[(origin, chunk.metadata.get("url"))] = len(chunks)
        all_documents.extend(chunks)

    # Create embeddings
    embedder = get_embedder(embedding_model_name)

    # Build the Postgres connection string
    connection_string = PGVector.connection_string_from_db_params(
        driver="psycopg",
        host=PGVECTOR_HOST,
        port=int(PGVECTOR_PORT),
        database=PGVECTOR_DATABASE_NAME,
        user=PGVECTOR_USER,
        password=PGVECTOR_PASS,
    )

    # Connect to the db
    db = PGVector(
        connection=connection_string,
        embeddings=embedder,
        collection_name=collection_name,
        collection_metadata=collection_metadata,
        use_jsonb=True,
    )

    # Overwrite the collection (if requested)
    if mode == "overwrite":
        db.delete_collection()
        logger.info(f"Collection {collection_name} deleted")
        db.create_collection()
        logger.info(f"Collection {collection_name} created")

    # Load the documents
    logger.info(
        f"Loading {len(all_documents)} embeddings to {PGVECTOR_HOST} - {PGVECTOR_DATABASE_NAME} - {collection_name}"
    )

    # Add documents to the DB in batches to accommodate the large number of parameters
    batch_size = 150
    for i in range(0, len(all_documents), batch_size):
        batch = all_documents[i : i + batch_size]
        logger.info(f"Ingesting batch {i // batch_size + 1} ({len(batch)} documents)")
        db.add_documents(documents=batch)

    logger.info(f"Successfully loaded {len(all_documents)} embeddings")

    # Write a per-(origin, url) chunk-count summary for auditing
    directory_source_url_chunks = [
        list(origin_url) + [chunks] for origin_url, chunks in origin_urls.items()
    ]
    df = pd.DataFrame(directory_source_url_chunks, columns=["origin", "url", "chunks"])
    # Use a filesystem-safe timestamp (no colons) in the log filename
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    filename = f"{PGVECTOR_HOST} - {collection_name} - {timestamp}.csv"
    outpath = DIRECTORY_PATH / "logs" / filename
    outpath.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(outpath, index=False)
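

if __name__ == "__main__":
    # Illustrative invocation only: the metadata lookup, collection name, and
    # chunking parameters below are placeholders, not project defaults.
    logging.basicConfig(level=logging.INFO)

    # meta_lookup maps a document's path on disk to extra metadata (e.g. a "url"
    # key) that gets merged into every chunk of that document.
    example_meta_lookup: dict[pathlib.Path, dict] = {
        # pathlib.Path(KNOWLEDGE_REPOSITORY_PATH) / "faq" / "reset_password.md":
        #     {"url": "https://example.com/faq/reset-password"},
    }

    ingest(
        meta_lookup=example_meta_lookup,
        collection_name="knowledge_base",
        chunk_size=1000,
        chunk_overlap=200,
    )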