diff --git a/src/mvt/database.py b/src/mvt/database.py
index 83deae6..f39544d 100644
--- a/src/mvt/database.py
+++ b/src/mvt/database.py
@@ -1,4 +1,7 @@
 import sqlite3
+import os
+import json
+import re

 # This script creates a SQLite database to store user information.
 # It includes functions to create a connection to the database, create a table for users,
@@ -119,4 +122,410 @@ def get_all_prompts(conn):
         return cur.fetchall()
     except sqlite3.Error as e:
         print(f"Error retrieving prompts: {e}")
-        return []
\ No newline at end of file
+        return []
+
+# ==================== ADDITIONAL FUNCTIONS FOR OTHER TABLES ====================
+
+def create_response_table(conn):
+    """Create a table to store responses"""
+    try:
+        sql = '''CREATE TABLE IF NOT EXISTS responses (
+                    id integer PRIMARY KEY,
+                    answer text NOT NULL,
+                    question text NOT NULL,
+                    id_user integer,
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    FOREIGN KEY (id_user) REFERENCES users (id)
+                );'''
+        conn.cursor().execute(sql)
+        conn.commit()
+    except sqlite3.Error as e:
+        print(f"Error creating responses table: {e}")
+
+def create_document_table(conn):
+    """Create a table to store documents"""
+    try:
+        sql = '''CREATE TABLE IF NOT EXISTS documents (
+                    id integer PRIMARY KEY,
+                    source text NOT NULL,
+                    metadata text,
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                );'''
+        conn.cursor().execute(sql)
+        conn.commit()
+    except sqlite3.Error as e:
+        print(f"Error creating documents table: {e}")
+
+def create_docs_response_table(conn):
+    """Create a junction table to link documents and responses"""
+    try:
+        sql = '''CREATE TABLE IF NOT EXISTS docs_response (
+                    id integer PRIMARY KEY,
+                    id_response integer NOT NULL,
+                    id_document integer NOT NULL,
+                    date_p TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    FOREIGN KEY (id_response) REFERENCES responses (id),
+                    FOREIGN KEY (id_document) REFERENCES documents (id)
+                );'''
+        conn.cursor().execute(sql)
+        conn.commit()
+    except sqlite3.Error as e:
+        print(f"Error creating docs_response table: {e}")
+
+# Response table functions
+def insert_response(conn, answer, question, id_user=None):
+    """Insert a new response into the database"""
+    try:
+        sql = '''INSERT INTO responses(answer, question, id_user) VALUES(?,?,?)'''
+        cur = conn.cursor()
+        cur.execute(sql, (answer, question, id_user))
+        conn.commit()
+        return cur.lastrowid
+    except sqlite3.Error as e:
+        print(f"Error inserting response: {e}")
+        return None
+
+def get_response(conn, response_id):
+    """Retrieve a response by ID"""
+    try:
+        cur = conn.cursor()
+        cur.execute("SELECT * FROM responses WHERE id = ?", (response_id,))
+        return cur.fetchone()
+    except sqlite3.Error as e:
+        print(f"Error retrieving response: {e}")
+        return None
+
+def get_responses_by_user(conn, user_id):
+    """Retrieve all responses for a specific user"""
+    try:
+        cur = conn.cursor()
+        cur.execute("SELECT * FROM responses WHERE id_user = ? ORDER BY created_at DESC", (user_id,))
+        return cur.fetchall()
+    except sqlite3.Error as e:
+        print(f"Error retrieving user responses: {e}")
+        return []
+
+def update_response(conn, response_id, answer=None, question=None):
+    """Update a response"""
+    try:
+        updates = []
+        params = []
+
+        if answer is not None:
+            updates.append("answer = ?")
+            params.append(answer)
+        if question is not None:
+            updates.append("question = ?")
+            params.append(question)
+
+        if not updates:
+            return False
+
+        params.append(response_id)
+        sql = f"UPDATE responses SET {', '.join(updates)} WHERE id = ?"
+
+        cur = conn.cursor()
+        cur.execute(sql, params)
+        conn.commit()
+        return True
+    except sqlite3.Error as e:
+        print(f"Error updating response: {e}")
+        return False
+
+def delete_response(conn, response_id):
+    """Delete a response and its associated document links"""
+    try:
+        cur = conn.cursor()
+        # First delete associated docs_response entries
+        cur.execute("DELETE FROM docs_response WHERE id_response = ?", (response_id,))
+        # Then delete the response
+        cur.execute("DELETE FROM responses WHERE id = ?", (response_id,))
+        conn.commit()
+        return True
+    except sqlite3.Error as e:
+        print(f"Error deleting response: {e}")
+        return False
+
+# Document table functions
+def insert_document(conn, source, metadata=None):
+    """Insert a new document into the database"""
+    try:
+        sql = '''INSERT INTO documents(source, metadata) VALUES(?,?)'''
+        cur = conn.cursor()
+        cur.execute(sql, (source, metadata))
+        conn.commit()
+        return cur.lastrowid
+    except sqlite3.Error as e:
+        print(f"Error inserting document: {e}")
+        return None
+
+def get_document(conn, document_id):
+    """Retrieve a document by ID"""
+    try:
+        cur = conn.cursor()
+        cur.execute("SELECT * FROM documents WHERE id = ?", (document_id,))
+        return cur.fetchone()
+    except sqlite3.Error as e:
+        print(f"Error retrieving document: {e}")
+        return None
+
+def get_document_by_source(conn, source):
+    """Retrieve a document by source"""
+    try:
+        cur = conn.cursor()
+        cur.execute("SELECT * FROM documents WHERE source = ?", (source,))
+        return cur.fetchone()
+    except sqlite3.Error as e:
+        print(f"Error retrieving document by source: {e}")
+        return None
+
+def get_all_documents(conn):
+    """Retrieve all documents"""
+    try:
+        cur = conn.cursor()
+        cur.execute("SELECT * FROM documents ORDER BY created_at DESC")
+        return cur.fetchall()
+    except sqlite3.Error as e:
+        print(f"Error retrieving documents: {e}")
+        return []
+
+def update_document(conn, document_id, source=None, metadata=None):
+    """Update a document"""
+    try:
+        updates = []
+        params = []
+
+        if source is not None:
+            updates.append("source = ?")
+            params.append(source)
+        if metadata is not None:
+            updates.append("metadata = ?")
+            params.append(metadata)
+
+        if not updates:
+            return False
+
+        params.append(document_id)
+        sql = f"UPDATE documents SET {', '.join(updates)} WHERE id = ?"
+
+        cur = conn.cursor()
+        cur.execute(sql, params)
+        conn.commit()
+        return True
+    except sqlite3.Error as e:
+        print(f"Error updating document: {e}")
+        return False
+
+def delete_document(conn, document_id):
+    """Delete a document and its associated response links"""
+    try:
+        cur = conn.cursor()
+        # First delete associated docs_response entries
+        cur.execute("DELETE FROM docs_response WHERE id_document = ?", (document_id,))
+        # Then delete the document
+        cur.execute("DELETE FROM documents WHERE id = ?", (document_id,))
+        conn.commit()
+        return True
+    except sqlite3.Error as e:
+        print(f"Error deleting document: {e}")
+        return False
+
+# docs_response junction table functions
+def link_document_response(conn, id_response, id_document):
+    """Link a document to a response"""
+    try:
+        sql = '''INSERT INTO docs_response(id_response, id_document) VALUES(?,?)'''
+        cur = conn.cursor()
+        cur.execute(sql, (id_response, id_document))
+        conn.commit()
+        return cur.lastrowid
+    except sqlite3.Error as e:
+        print(f"Error linking document to response: {e}")
+        return None
+
+def get_documents_for_response(conn, response_id):
+    """Get all documents linked to a specific response"""
+    try:
+        cur = conn.cursor()
+        sql = '''SELECT d.* FROM documents d
+                 JOIN docs_response dr ON d.id = dr.id_document
+                 WHERE dr.id_response = ?'''
+        cur.execute(sql, (response_id,))
+        return cur.fetchall()
+    except sqlite3.Error as e:
+        print(f"Error retrieving documents for response: {e}")
+        return []
+
+def get_responses_for_document(conn, document_id):
+    """Get all responses linked to a specific document"""
+    try:
+        cur = conn.cursor()
+        sql = '''SELECT r.* FROM responses r
+                 JOIN docs_response dr ON r.id = dr.id_response
+                 WHERE dr.id_document = ?'''
+        cur.execute(sql, (document_id,))
+        return cur.fetchall()
+    except sqlite3.Error as e:
+        print(f"Error retrieving responses for document: {e}")
+        return []
+
+def unlink_document_response(conn, id_response, id_document):
+    """Remove the link between a document and response"""
+    try:
+        cur = conn.cursor()
+        cur.execute("DELETE FROM docs_response WHERE id_response = ? AND id_document = ?",
AND id_document = ?", + (id_response, id_document)) + conn.commit() + return True + except sqlite3.Error as e: + print(f"Error unlinking document from response: {e}") + return False + +def get_all_document_response_links(conn): + """Get all document-response links with details""" + try: + cur = conn.cursor() + sql = '''SELECT dr.id, dr.date_p, r.question, r.answer, d.source, d.metadata + FROM docs_response dr + JOIN responses r ON dr.id_response = r.id + JOIN documents d ON dr.id_document = d.id + ORDER BY dr.date_p DESC''' + cur.execute(sql) + return cur.fetchall() + except sqlite3.Error as e: + print(f"Error retrieving document-response links: {e}") + return [] + +# Utility function to create all tables +def create_all_tables(conn): + """Create all tables in the database""" + create_table(conn) # users table + create_prompts_table(conn) + create_response_table(conn) + create_document_table(conn) + create_docs_response_table(conn) + +def get_all_responses_with_documents(conn): + """Get all responses with their associated documents""" + try: + cur = conn.cursor() + sql = '''SELECT r.id, r.question, r.answer, r.created_at, r.id_user, + GROUP_CONCAT(d.id || '|||' || d.source || '|||' || COALESCE(d.metadata, '')) as documents + FROM responses r + LEFT JOIN docs_response dr ON r.id = dr.id_response + LEFT JOIN documents d ON dr.id_document = d.id + GROUP BY r.id, r.question, r.answer, r.created_at, r.id_user + ORDER BY r.created_at DESC''' + cur.execute(sql) + results = cur.fetchall() + + # Process results to separate documents + processed_results = [] + for row in results: + response_id, question, answer, created_at, id_user, documents_str = row + documents = [] + + if documents_str: + doc_parts = documents_str.split(',') + for doc_part in doc_parts: + if '|||' in doc_part: + parts = doc_part.split('|||') + if len(parts) >= 3: + doc_id, source, metadata = parts[0], parts[1], parts[2] + documents.append({ + 'id': doc_id, + 'source': source, + 'metadata': metadata + }) + + processed_results.append({ + 'id': response_id, + 'question': question, + 'answer': answer, + 'created_at': created_at, + 'id_user': id_user, + 'documents': documents + }) + + return processed_results + except sqlite3.Error as e: + print(f"Error retrieving responses with documents: {e}") + return [] + +def migrate_text_file_to_database(conn, responses_file="responses.txt"): + """Migrate existing responses.txt data to database""" + if not os.path.exists(responses_file): + return 0 + + migrated_count = 0 + try: + with open(responses_file, 'r', encoding='utf-8') as f: + lines = f.read().strip().split('\n') + + for line in lines: + if line.strip(): + try: + # Parse the response line (similar to admin_responses.py logic) + # Extract the input (question) + input_match = re.search(r"'input': '((?:[^'\\]|\\.)*)'", line) + question = input_match.group(1) if input_match else "Migrated question" + question = question.replace("\\'", "'").replace("\\n", "\n") + + # Extract the answer + answer_match = re.search(r"'answer': '((?:[^'\\]|\\.)*)'(?=\})", line) + answer = answer_match.group(1) if answer_match else "Migrated answer" + answer = answer.replace("\\'", "'").replace("\\n", "\n") + + # Check if this response already exists to avoid duplicates + cur = conn.cursor() + cur.execute("SELECT id FROM responses WHERE question = ? AND answer = ? 
LIMIT 1", (question, answer)) + if cur.fetchone(): + continue # Skip if already exists + + # Insert response + response_id = insert_response(conn, answer, question, None) + + if response_id: + # Extract and process documents + doc_pattern = r"Document\(id='([^']*)', metadata=\{([^}]*)\}, page_content='((?:[^'\\]|\\.)*)'\)" + doc_matches = re.findall(doc_pattern, line) + + for doc_match in doc_matches: + doc_id, metadata_str, content = doc_match + + # Parse metadata + metadata = {'id': doc_id} + metadata_pairs = re.findall(r"'([^']*)': '([^']*)'", metadata_str) + for key, value in metadata_pairs: + metadata[key] = value + metadata['content'] = content.replace("\\'", "'").replace("\\n", "\n") + + source = metadata.get('source', doc_id) + + # Check if document exists + existing_doc = get_document_by_source(conn, source) + if existing_doc: + db_doc_id = existing_doc[0] + else: + # Insert new document + metadata_json = json.dumps(metadata) + db_doc_id = insert_document(conn, source, metadata_json) + + # Link document to response + if db_doc_id: + link_document_response(conn, response_id, db_doc_id) + + migrated_count += 1 + + except Exception as e: + print(f"Error migrating line: {e}") + continue + + except Exception as e: + print(f"Error reading migration file: {e}") + + return migrated_count + +if __name__ == "__main__": + conn = create_connection() + insert_user(conn,"dev","dev@example.com","admin") \ No newline at end of file diff --git a/src/mvt/pages/admin_responses.py b/src/mvt/pages/admin_responses.py index 3f6800d..c8fcc40 100644 --- a/src/mvt/pages/admin_responses.py +++ b/src/mvt/pages/admin_responses.py @@ -1,8 +1,10 @@ import streamlit as st import os import re +import json from menu import menu_with_redirect from utils import load_yaml_file_with_db_prompts +from database import create_connection, get_all_document_response_links, get_documents_for_response, get_response, get_all_responses_with_documents, migrate_text_file_to_database # Redirect to app.py if not logged in, otherwise show the navigation menu menu_with_redirect() @@ -15,8 +17,49 @@ st.markdown("# Admin Responses") st.markdown("View previously asked user questions, AI-generated answers, and source documents.") -def parse_responses_file(): - """Parse the responses.txt file and return a list of question-answer pairs with context""" +def get_responses_from_database(): + """Get responses from database instead of parsing text file""" + conn = create_connection() + if not conn: + return [] + + try: + responses = [] + db_responses = get_all_responses_with_documents(conn) + + for db_response in db_responses: + # Convert database format to expected format + context_docs = [] + for doc in db_response.get('documents', []): + try: + metadata_dict = json.loads(doc['metadata']) if doc['metadata'] else {} + except: + metadata_dict = {} + + context_docs.append({ + 'id': doc['source'], + 'metadata': metadata_dict, + 'page_content': metadata_dict.get('content', 'No content available'), + 'source': doc['source'] + }) + + responses.append({ + 'input': db_response['question'], + 'answer': db_response['answer'], + 'context': context_docs, + 'created_at': db_response['created_at'] + }) + + return responses + + except Exception as e: + st.error(f"Error loading responses from database: {e}") + return [] + finally: + conn.close() + +def get_responses_fallback(): + """Fallback to parse responses from text file if database is empty""" responses_file = "responses.txt" if not os.path.exists(responses_file): return [] @@ -90,6 +133,10 @@ def 
diff --git a/src/mvt/pages/admin_responses.py b/src/mvt/pages/admin_responses.py
index 3f6800d..c8fcc40 100644
--- a/src/mvt/pages/admin_responses.py
+++ b/src/mvt/pages/admin_responses.py
@@ -1,8 +1,10 @@
 import streamlit as st
 import os
 import re
+import json
 from menu import menu_with_redirect
 from utils import load_yaml_file_with_db_prompts
+from database import create_connection, get_all_document_response_links, get_documents_for_response, get_response, get_all_responses_with_documents, migrate_text_file_to_database

 # Redirect to app.py if not logged in, otherwise show the navigation menu
 menu_with_redirect()
@@ -15,8 +17,49 @@
 st.markdown("# Admin Responses")
 st.markdown("View previously asked user questions, AI-generated answers, and source documents.")

-def parse_responses_file():
-    """Parse the responses.txt file and return a list of question-answer pairs with context"""
+def get_responses_from_database():
+    """Get responses from database instead of parsing text file"""
+    conn = create_connection()
+    if not conn:
+        return []
+
+    try:
+        responses = []
+        db_responses = get_all_responses_with_documents(conn)
+
+        for db_response in db_responses:
+            # Convert database format to expected format
+            context_docs = []
+            for doc in db_response.get('documents', []):
+                try:
+                    metadata_dict = json.loads(doc['metadata']) if doc['metadata'] else {}
+                except (json.JSONDecodeError, TypeError):
+                    metadata_dict = {}
+
+                context_docs.append({
+                    'id': doc['source'],
+                    'metadata': metadata_dict,
+                    'page_content': metadata_dict.get('content', 'No content available'),
+                    'source': doc['source']
+                })
+
+            responses.append({
+                'input': db_response['question'],
+                'answer': db_response['answer'],
+                'context': context_docs,
+                'created_at': db_response['created_at']
+            })
+
+        return responses
+
+    except Exception as e:
+        st.error(f"Error loading responses from database: {e}")
+        return []
+    finally:
+        conn.close()
+
+def get_responses_fallback():
+    """Fallback to parse responses from text file if database is empty"""
     responses_file = "responses.txt"
     if not os.path.exists(responses_file):
         return []
@@ -90,6 +133,10 @@ def display_source_document(doc, index):

     # Display content
     content = doc.get('page_content', '')
+    if not content or content == 'No content available':
+        # Try to get content from metadata if not in page_content
+        content = doc.get('metadata', {}).get('content', 'No content available')
+
     st.markdown("**Content:**")
     st.text(content)

@@ -97,18 +144,95 @@ def display_source_document(doc, index):
     st.markdown("**Metadata:**")
     metadata = doc.get('metadata', {})

+    # Show source prominently
+    source = doc.get('source', metadata.get('source', 'Unknown'))
+    st.write(f"- **Source:** {source}")
+
+    # Show other metadata excluding content and source
     for key, value in metadata.items():
-        st.write(f"- **{key.title()}:** {value}")
+        if key not in ['content', 'source']:
+            st.write(f"- **{key.title()}:** {value}")
+
+    # Show database info if available
+    if 'id' in doc and doc['id'] != source:
+        st.write(f"- **Document ID:** {doc['id']}")

 # Load and display responses
 config_data = load_yaml_file_with_db_prompts("config.yaml")
 k_value = config_data.get("nr_retrieved_documents")
 print(k_value)
-responses = parse_responses_file()
+# Try to get responses from database first, fall back to the text file
+responses = get_responses_from_database()
+if not responses:
+    # Try to migrate from text file if database is empty
+    conn = create_connection()
+    if conn:
+        migrated_count = migrate_text_file_to_database(conn)
+        conn.close()
+        if migrated_count > 0:
+            st.success(f"Migrated {migrated_count} responses from text file to database!")
+            responses = get_responses_from_database()  # Try again after migration
+
+    if not responses:
+        st.info("No responses found in database, falling back to text file...")
+        responses = get_responses_fallback()
+
+# Add a utility section for admins
+if responses and st.session_state.user_type == "admin":
+    with st.sidebar:
+        st.markdown("### 🛠️ Admin Utilities")
+
+        # Check if text file exists and offer migration
+        if os.path.exists("responses.txt"):
+            st.info("responses.txt file detected")
+            if st.button("🔄 Re-run Migration"):
+                conn = create_connection()
+                if conn:
+                    migrated_count = migrate_text_file_to_database(conn)
+                    conn.close()
+                    if migrated_count > 0:
+                        st.success(f"Migrated {migrated_count} additional responses!")
+                        st.rerun()
+                    else:
+                        st.info("No new responses to migrate")
+
+            # Option to back up and remove the text file after successful migration
+            db_responses = get_responses_from_database()
+            if db_responses:
+                if st.button("🗑️ Archive responses.txt", help="Move responses.txt to responses_backup.txt"):
+                    try:
+                        import shutil
+                        shutil.move("responses.txt", "responses_backup.txt")
+                        st.success("Text file archived successfully!")
+                        st.rerun()
+                    except Exception as e:
+                        st.error(f"Error archiving file: {e}")
+
+        # Database stats
+        conn = create_connection()
+        if conn:
+            try:
+                cur = conn.cursor()
+                cur.execute("SELECT COUNT(*) FROM responses")
+                response_count = cur.fetchone()[0]
+                cur.execute("SELECT COUNT(*) FROM documents")
+                document_count = cur.fetchone()[0]
+                cur.execute("SELECT COUNT(*) FROM docs_response")
+                link_count = cur.fetchone()[0]
+
+                st.markdown("### 📈 Database Stats")
+                st.write(f"Responses: {response_count}")
+                st.write(f"Documents: {document_count}")
+                st.write(f"Links: {link_count}")
+            except Exception as e:
+                st.error(f"Error getting stats: {e}")
+            finally:
+                conn.close()
+
 if responses:
-    st.markdown(f"### Overview ({len(responses)} responses found)")
+    source_info = "📊 **Database**" if get_responses_from_database() else "📄 **Text File**"
+    st.markdown(f"### Overview ({len(responses)} responses found) - Source: {source_info}")

     # Search functionality
     search_term = st.text_input("Search questions or answers:", placeholder="Enter search term...")
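For orientation, this is roughly the record shape `get_responses_from_database()` hands to the existing rendering code in this page; the values are invented for illustration:

```python
# Illustrative only: the per-response dict the admin page builds from the database rows.
example_response = {
    "input": "What is the refund policy?",            # maps from responses.question
    "answer": "Refunds are issued within 30 days.",   # maps from responses.answer
    "created_at": "2024-01-01 12:00:00",
    "context": [
        {
            "id": "docs/policy.pdf",      # the document's source doubles as its display id
            "source": "docs/policy.pdf",
            "metadata": {"page": 2, "content": "Refunds are issued ..."},  # decoded from the JSON column
            "page_content": "Refunds are issued ...",  # taken from metadata['content'] when present
        }
    ],
}
```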
diff --git a/src/mvt/pages/chatbot.py b/src/mvt/pages/chatbot.py
index 23c425b..96b6d36 100644
--- a/src/mvt/pages/chatbot.py
+++ b/src/mvt/pages/chatbot.py
@@ -4,10 +4,18 @@
 from menu import menu_with_redirect
 from chat_history import init_db, save_message, get_messages
 from query_rewriting import query_rewriting_llm
+from database import create_connection, create_all_tables, insert_response, insert_document, link_document_response, get_document_by_source, get_user
+import json

 # Initialize DB
 init_db()

+# Initialize main database and create tables
+conn = create_connection()
+if conn:
+    create_all_tables(conn)
+    conn.close()
+
 # Redirect to app.py if not logged in
 menu_with_redirect()
@@ -67,7 +75,50 @@
         # Use rewritten query or original prompt based on config
         query = rewritten_query if config_data.get("use_query_rewriting", True) else prompt
         response = rag_chain.invoke({"input": query})
-        # save response in a text file
+
+        # Save response to database instead of text file
+        conn = create_connection()
+        if conn:
+            try:
+                # Get user ID if logged in
+                user_id = None
+                if hasattr(st.session_state, 'email') and st.session_state.email:
+                    user = get_user(conn, st.session_state.email)
+                    user_id = user[0] if user else None
+
+                # Insert response to database
+                response_id = insert_response(conn, response["answer"], query, user_id)
+
+                # Process and save source documents
+                if response_id and "context" in response:
+                    for doc in response["context"]:
+                        # Extract source from document metadata
+                        source = ""
+                        if hasattr(doc, 'metadata') and doc.metadata:
+                            source = doc.metadata.get('source', '')
+                        elif hasattr(doc, 'page_content'):
+                            source = f"content_{hash(doc.page_content) % 10000}"
+
+                        if source:
+                            # Check if document already exists
+                            existing_doc = get_document_by_source(conn, source)
+                            if existing_doc:
+                                doc_id = existing_doc[0]
+                            else:
+                                # Insert new document with metadata as JSON
+                                metadata_json = json.dumps(doc.metadata if hasattr(doc, 'metadata') and doc.metadata else {})
+                                doc_id = insert_document(conn, source, metadata_json)
+
+                            # Link document to response
+                            if doc_id:
+                                link_document_response(conn, response_id, doc_id)
+
+            except Exception as e:
+                st.error(f"Error saving to database: {e}")
+            finally:
+                conn.close()
+
+        # Keep text file backup for now (can be removed later)
         print(response, file=open('responses.txt', 'a', encoding='utf-8'))
         st.markdown(response["answer"])
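If the migration ever needs to run outside Streamlit (for example from a one-off shell session), the same helpers can be driven directly. A sketch, assuming responses.txt sits in the working directory alongside the database file:

```python
# Hypothetical one-off migration driver; reuses only functions defined in src/mvt/database.py.
from database import create_connection, create_all_tables, migrate_text_file_to_database

conn = create_connection()
if conn:
    create_all_tables(conn)  # ensure responses, documents, and docs_response exist
    migrated = migrate_text_file_to_database(conn, responses_file="responses.txt")
    print(f"Migrated {migrated} response(s) from responses.txt")
    conn.close()
```

Re-running it is harmless: the duplicate check on (question, answer) inside `migrate_text_file_to_database` skips lines that are already in the database.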