This repository was archived by the owner on May 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathemail_handler.py
More file actions
78 lines (65 loc) · 2.84 KB
/
Copy pathemail_handler.py
File metadata and controls
78 lines (65 loc) · 2.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from googleapiclient.errors import HttpError
import base64
import logging
from email.mime.text import MIMEText
# Set up logging
logging.basicConfig(filename='app.log', filemode='a', format='%(name)s - %(levelname)s - %(message)s', level=logging.INFO)
def get_unread_emails(service):
try:
result = service.users().messages().list(userId='me', q="is:unread").execute()
messages = result.get('messages', [])
return messages
except HttpError as error:
logging.error(f'An error occurred: {error}')
return None
def parse_email_content(service, email):
try:
message = service.users().messages().get(userId='me', id=email['id']).execute()
payload = message['payload']
headers = payload.get("headers")
parts = payload.get("parts")
data = parts[0]
data_bytes = data['body']['data']
decoded_data = base64.urlsafe_b64decode(data_bytes)
str_data = decoded_data.decode('utf-8')
# Extract the 'To', 'From', and 'Subject' fields from headers
email_data = {
'Body': str_data,
'To': next(header['value'] for header in headers if header['name'] == 'From'),
'From': next(header['value'] for header in headers if header['name'] == 'To'),
'Subject': next(header['value'] for header in headers if header['name'] == 'Subject'),
'ThreadId': message['threadId'], # Add the ThreadId
'Id': email['id'] # Add the email Id
}
if email_data is None:
logging.error('Failed to parse email content.')
return email_data
except HttpError as error:
logging.error(f'An error occurred: {error}')
return None
def mark_read(service, user_id, email_id):
try:
service.users().messages().modify(userId=user_id, id=email_id, body={'removeLabelIds': ['UNREAD']}).execute()
logging.info(f'Email id: {email_id} marked as read.')
except HttpError as error:
logging.error(f'An error occurred: {error}')
def apply_label(service, user_id, email_id, label_id):
try:
# Create a dictionary with the labelIds
label_object = {'addLabelIds': [label_id]}
# Use the Gmail API to apply the label
service.users().messages().modify(userId=user_id, id=email_id, body=label_object).execute()
logging.info(f"Label applied to email {email_id}")
except Exception as e:
logging.error(f"An error occurred: {e}")
def get_label_id(service, user_id, label_name):
try:
response = service.users().labels().list(userId=user_id).execute()
labels = response['labels']
for label in labels:
if label['name'] == label_name:
return label['id']
return None
except Exception as e:
logging.error(f'An error occurred: {e}')
return None