-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgmail_ingestnew.py
More file actions
131 lines (106 loc) · 4.97 KB
/
gmail_ingestnew.py
File metadata and controls
131 lines (106 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import json
import os
import ssl
import time
from datetime import datetime, timedelta
from email.errors import HeaderParseError
from email.header import decode_header, make_header

from dotenv import load_dotenv
from elasticsearch import Elasticsearch, helpers
from imapclient import IMAPClient
# Populate os.environ from a local .env file before any setting is read.
load_dotenv()

# --- 1. CREDENTIALS (loaded from .env) ---
# Each of these is None when the corresponding variable is not set.
GMAIL_USER = os.environ.get("GMAIL_EMAIL")
GMAIL_PASS = os.environ.get("GMAIL_PASSWORD")
ELASTIC_CLOUD_ID = os.environ.get("ELASTIC_CLOUD_ID")
ELASTIC_API_KEY = os.environ.get("ELASTIC_API_KEY")

# --- 2. CONFIGURATION ---
INDEX_NAME = "school-agent-final-data"  # Elasticsearch index documents are written to
LAST_CHECK_FILE = "last_check.txt"  # incremental-sync checkpoint, relative to the working directory
# The pipeline created in Kibana.
# NOTE(review): not referenced anywhere in this file; confirm the index applies it
# (e.g. as its default pipeline), otherwise documents are indexed without vectorization.
VECTORIZATION_PIPELINE = "school-rag-vectorizer"
# --- 3. ELASTICSEARCH CLIENT INITIALIZATION ---
# Connect at import time and fail fast: es.info() forces a round-trip, so bad
# credentials or an unreachable cluster surface here instead of mid-ingestion.
try:
    es = Elasticsearch(
        cloud_id=ELASTIC_CLOUD_ID,
        api_key=ELASTIC_API_KEY,
    )
    es.info()
    print("Elasticsearch client initialized successfully.")
except Exception as e:
    print(f"FATAL ELASTICSEARCH ERROR: Could not connect to Elastic Cloud. {e}")
    # The builtin exit() is injected by the `site` module (absent under `python -S`
    # or in frozen builds) and exits with status 0. Raise SystemExit(1) so that
    # schedulers and shells see the failure.
    raise SystemExit(1) from e
# --- 4. INGESTION UTILITIES ---
def get_last_check_date():
    """Return the datetime the next incremental IMAP search should start from.

    Parses the ``YYYY-MM-DD`` checkpoint stored in LAST_CHECK_FILE and steps back
    one extra day. When the file is missing or its contents cannot be parsed,
    falls back to seven days before now.
    """
    try:
        with open(LAST_CHECK_FILE, 'r') as checkpoint:
            stamp = checkpoint.read().strip()
        last_sync = datetime.strptime(stamp, '%Y-%m-%d')
    except (FileNotFoundError, ValueError):
        # First run or corrupt checkpoint: look back one week instead.
        return datetime.now() - timedelta(days=7)
    # Presumably a safety overlap with the previous run; IMAP SINCE only compares dates.
    return last_sync - timedelta(days=1)
def update_last_check_date():
    """Overwrite the sync checkpoint with today's local date as ``YYYY-MM-DD``."""
    today = datetime.now().strftime('%Y-%m-%d')
    with open(LAST_CHECK_FILE, 'w') as checkpoint:
        checkpoint.write(today)
def _as_text(value):
    """Decode IMAP bytes as UTF-8 (replacing bad bytes); pass other values through str()."""
    return value.decode('utf-8', errors='replace') if isinstance(value, bytes) else str(value)


def _decode_header_value(raw, default):
    """Return a readable string for a raw ENVELOPE header value, or *default* if empty.

    ENVELOPE fields arrive as undecoded bytes. They may contain RFC 2047
    encoded-words (``=?UTF-8?B?...?=``) or non-UTF-8 bytes. Decoding is
    best-effort: a malformed header falls back to its raw text instead of
    aborting the run.
    """
    if not raw:
        return default
    text = _as_text(raw)
    try:
        return str(make_header(decode_header(text)))
    except (HeaderParseError, LookupError, UnicodeError):
        # Broken encoded-word or unknown charset: keep the undecoded text.
        return text


def _sender_address(envelope):
    """Return the first From address of *envelope* as ``mailbox@host``.

    IMAPClient's Address exposes ``mailbox`` and ``host`` (there is no
    ``address`` attribute). Falls back to 'unknown@sender.com' when the From
    list is empty or the address is incomplete.
    """
    fallback = 'unknown@sender.com'
    senders = envelope.from_ or ()
    if not senders:
        return fallback
    first = senders[0]
    if not (first.mailbox and first.host):
        return fallback
    return f"{_as_text(first.mailbox)}@{_as_text(first.host)}"


def _collect_actions(client, messages):
    """Fetch the "via ParentSquare" messages among *messages*; return bulk index actions.

    Fetches happen in two phases: the From header for every message, then the
    full body and ENVELOPE only for the messages that pass the filter.
    BODY.PEEK is used so fetching does not set the \\Seen flag on the user's
    mail; servers still answer under the BODY[...] keys.
    """
    from_key = b'BODY[HEADER.FIELDS (FROM)]'
    headers = client.fetch(messages, [b'BODY.PEEK[HEADER.FIELDS (FROM)]'])

    # --- FILTER: STRICTLY ENFORCE "via ParentSquare" ---
    wanted = [
        msg_id
        for msg_id, data in headers.items()
        if 'via ParentSquare' in data.get(from_key, b'').decode('utf-8', errors='ignore')
    ]
    if not wanted:
        return []

    actions = []
    for msg_id, data in client.fetch(wanted, [b'BODY.PEEK[]', b'ENVELOPE']).items():
        envelope = data.get(b'ENVELOPE')
        raw_message = data.get(b'BODY[]')
        if envelope is None or raw_message is None:
            # Message disappeared between the two fetches; skip it rather than abort.
            continue
        doc = {
            '@timestamp': datetime.now().isoformat(),
            'subject': _decode_header_value(envelope.subject, 'No Subject'),
            'sender_address': _sender_address(envelope),
            'body_full': raw_message.decode('utf-8', errors='ignore'),
        }
        actions.append({
            '_op_type': 'index',
            '_index': INDEX_NAME,
            # msg_id is the IMAP UID under IMAPClient's default use_uid=True, so
            # re-processing the overlap window overwrites instead of duplicating.
            '_id': str(msg_id),
            '_source': doc,
        })
    return actions


def _bulk_index(actions):
    """Bulk-index *actions* into Elasticsearch; on failure print details and exit."""
    # NOTE(review): VECTORIZATION_PIPELINE is not passed here; documents rely on
    # the index's own default pipeline, if one is configured. Confirm.
    try:
        success, errors = helpers.bulk(es, actions, raise_on_error=True)
    except helpers.BulkIndexError as e:
        print("\n--- ELASTICSEARCH BULK ERRORS FOUND ---")
        if hasattr(e, 'errors'):
            print(json.dumps(e.errors, indent=2))
        print("---------------------------------------")
        # SystemExit is a BaseException, so the caller's `except Exception`
        # does not swallow it; the run halts for review.
        raise SystemExit(f"FATAL INDEXING ERROR: {e.args[0]}") from e
    print(f"Successfully indexed {success} documents. Errors: {len(errors)}")


def ingest_emails():
    """Index recent "via ParentSquare" INBOX mail from Gmail into Elasticsearch.

    Searches INBOX for messages since the date from get_last_check_date(),
    indexes the matching ones into INDEX_NAME, then advances the checkpoint.

    Bulk-indexing failures print per-document errors and raise SystemExit.
    Any other exception is printed and swallowed, and the checkpoint is left
    unchanged so the next run retries the same window.
    """
    last_check_date = get_last_check_date()
    # IMAP search requires date format: "07-Nov-2025"
    search_date = last_check_date.strftime("%d-%b-%Y")
    context = ssl.create_default_context()
    try:
        with IMAPClient('imap.gmail.com', port=993, ssl=True, ssl_context=context) as client:
            client.login(GMAIL_USER, GMAIL_PASS)
            client.select_folder('INBOX')
            messages = client.search([b'SINCE', search_date.encode('utf-8')])
            if not messages:
                print(f"No new emails found since {search_date}.")
                return
            print(f"Found {len(messages)} emails for processing. Starting ingestion...")
            actions = _collect_actions(client, messages)
        # The IMAP session is closed before the bulk upload starts.
        if actions:
            _bulk_index(actions)
        # Advance after every successful pass, even when nothing matched the filter.
        # Otherwise the SINCE window would keep growing on ParentSquare-free days.
        update_last_check_date()
    except Exception as e:
        print(f"FATAL IMAP CONNECTION ERROR: {e}")
# Script entry point: run a single ingestion pass only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    ingest_emails()