-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathGmailToGCS.py
More file actions
78 lines (61 loc) · 2.44 KB
/
GmailToGCS.py
File metadata and controls
78 lines (61 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import email
import getpass
import imaplib
import os
import string
import sys
from datetime import timedelta
from os import environ

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class ExtractAttachment(BaseOperator):
    """
    Save the attachments of unseen Gmail messages to local files.

    The operator logs in to ``imap.gmail.com`` over IMAP/SSL, selects
    ``inbox_name``, searches it for unseen messages and writes every named
    attachment to ``file_path + <attachment file name>``. Files that already
    exist are left untouched. Nothing in this class uploads to GCS; it only
    produces the local files.

    :param inbox_name: mailbox name passed to the IMAP ``SELECT`` command.
    :param file_path: string prepended verbatim to each attachment file name
        (no path separator is inserted, so include a trailing one when this
        is a directory). Defaults to the original placeholder value.
    :param imap_username: account used for the IMAP login. Defaults to the
        original placeholder value.
    :param imap_password: password used for the IMAP login. Defaults to the
        original placeholder value.
    """

    @apply_defaults
    def __init__(
            self,
            inbox_name,
            *args,
            file_path='filepath_to_save_CSV',
            imap_username='your username',
            imap_password='your password',
            **kwargs):
        super(ExtractAttachment, self).__init__(*args, **kwargs)
        self.inbox_name = inbox_name
        self.file_path = file_path
        self.imap_username = imap_username
        self.imap_password = imap_password

    def _save_attachments(self, email_message):
        """Write every named attachment of ``email_message`` to disk."""
        for part in email_message.walk():
            # Multipart containers only group other parts; they carry no file.
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get('Content-Disposition') is None:
                continue

            file_name = part.get_filename()
            if not file_name:
                continue

            # The attachment name is chosen by the sender, so keep only its
            # last path component to stop it escaping the target location.
            safe_name = os.path.basename(file_name.replace('\\', '/'))
            if safe_name in ('', '.', '..'):
                continue

            target_path = self.file_path + safe_name
            print(target_path)
            if os.path.isfile(target_path):
                continue

            payload = part.get_payload(decode=True)
            if payload is None:
                # Parts without a decodable body (e.g. an attached message)
                # have nothing to write.
                continue

            print(file_name)
            with open(target_path, 'wb') as fp:
                fp.write(payload)

    def __extract_email_attachment(self, execution_date):
        """
        Download attachments from all unseen messages in ``self.inbox_name``.

        :param execution_date: accepted for interface compatibility; it is
            not used by the extraction itself.
        :raises imaplib.IMAP4.error: when the login, mailbox selection or
            search is rejected by the server.
        """
        imap_session = imaplib.IMAP4_SSL('imap.gmail.com')
        mailbox_selected = False
        try:
            # login() raises imaplib.IMAP4.error itself on failure, so no
            # status check is needed here.
            imap_session.login(self.imap_username, self.imap_password)

            typ, _ = imap_session.select(self.inbox_name)
            if typ != 'OK':
                raise imaplib.IMAP4.error(
                    'Not able to select mailbox %r' % (self.inbox_name,))
            mailbox_selected = True

            typ, data = imap_session.search(None, 'Unseen')
            if typ != 'OK':
                raise imaplib.IMAP4.error('Error searching Inbox.')

            # Iterating over all emails
            for msg_id in (data[0] or b'').split():
                typ, message_parts = imap_session.fetch(msg_id, '(RFC822)')
                if (typ != 'OK' or not message_parts
                        or not isinstance(message_parts[0], tuple)):
                    print('Error fetching mail.')
                    continue

                # Parse the raw bytes directly; decoding the whole message
                # as UTF-8 first fails on mails using other encodings.
                email_message = email.message_from_bytes(message_parts[0][1])
                self._save_attachments(email_message)

                # search() returned message sequence numbers, so the flag is
                # set with STORE rather than UID STORE.
                imap_session.store(msg_id, '+FLAGS', '\\Seen')
        finally:
            try:
                # CLOSE is only valid once a mailbox has been selected.
                if mailbox_selected:
                    imap_session.close()
            finally:
                imap_session.logout()

    def execute(self, context):
        """Run the attachment extraction for the current task instance."""
        execution_date = context.get('execution_date')
        self.__extract_email_attachment(execution_date)