forked from marlonluo2018/outlook-mcp-server
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoutlook_graph_api.py
More file actions
320 lines (257 loc) · 12.9 KB
/
outlook_graph_api.py
File metadata and controls
320 lines (257 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import requests
import json
from datetime import datetime, timedelta
class OutlookGraphAPI:
    """Small client for the Outlook mail endpoints of Microsoft Graph v1.0.

    Each public method sends one HTTPS request (``batch_forward_emails``
    sends one per batch) using the bearer token given to the constructor.
    When Graph answers with a status code other than the one the method
    expects, an ``Exception`` is raised whose message contains the status
    code and the response text.
    """

    # Message fields requested by every list/search method.
    _SUMMARY_FIELDS = (
        'id,subject,from,toRecipients,ccRecipients,receivedDateTime,hasAttachments'
    )

    def __init__(self, access_token, timeout=30, session=None):
        """Store the credentials and transport settings.

        Args:
            access_token: OAuth bearer token sent in the Authorization header.
            timeout: Seconds passed as ``timeout=`` to every HTTP call so a
                stalled connection cannot block forever.
            session: Optional object exposing ``get``/``post``/``delete`` with
                the same call signature as the ``requests`` module (for
                example a ``requests.Session``). Defaults to ``requests``.
        """
        self.access_token = access_token
        self.base_url = "https://graph.microsoft.com/v1.0"
        self.headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
        self.timeout = timeout
        self._http = session if session is not None else requests

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _call(self, verb, path, expected_status, failure_label, **kwargs):
        """Send one request and return the response, or raise on a bad status.

        ``verb`` is the transport method name ('get', 'post' or 'delete'),
        ``path`` is appended to ``self.base_url`` and ``failure_label`` is the
        prefix of the exception message.
        """
        send = getattr(self._http, verb)
        response = send(
            f'{self.base_url}{path}',
            headers=self.headers,
            timeout=self.timeout,
            **kwargs
        )
        if response.status_code != expected_status:
            raise Exception(
                f'{failure_label}: {response.status_code} - {response.text}'
            )
        return response

    @staticmethod
    def _cutoff(days):
        """Return the UTC timestamp ``days`` days ago as an OData literal."""
        # NOTE: datetime.utcnow() is deprecated since Python 3.12; it is kept
        # because the replacement needs ``timezone``, which this file does
        # not import.
        moment = datetime.utcnow() - timedelta(days=days)
        return moment.strftime('%Y-%m-%dT%H:%M:%SZ')

    @staticmethod
    def _quote(text):
        """Return ``text`` as an OData string literal.

        Embedded single quotes are doubled so that user input cannot end the
        literal early and alter the rest of the ``$filter`` expression.
        """
        return "'" + str(text).replace("'", "''") + "'"

    @staticmethod
    def _recipients(addresses):
        """Build the Graph recipient list from e-mail address strings.

        A single string is treated as one address rather than being iterated
        character by character.
        """
        if isinstance(addresses, str):
            addresses = [addresses]
        return [{"emailAddress": {"address": address}} for address in addresses]

    def _find_messages(self, odata_filter, failure_label, top=None):
        """Run a filtered, newest-first query against ``/me/messages``."""
        params = {
            '$filter': odata_filter,
            '$orderby': 'receivedDateTime desc',
            '$select': self._SUMMARY_FIELDS
        }
        if top is not None:
            params['$top'] = top
        response = self._call(
            'get', '/me/messages', 200, failure_label, params=params
        )
        return response.json().get('value', [])

    # ------------------------------------------------------------------
    # Reading and searching
    # ------------------------------------------------------------------

    def list_recent_emails(self, days=7, top=30):
        """Return up to ``top`` messages received in the last ``days`` days."""
        return self._find_messages(
            f'receivedDateTime ge {self._cutoff(days)}', '获取邮件失败', top=top
        )

    def search_emails_by_subject(self, search_term, days=7):
        """Return recent messages whose subject contains ``search_term``."""
        # The $orderby property has to come first in $filter; Graph rejects
        # the query as an inefficient filter otherwise.
        odata_filter = (
            f'receivedDateTime ge {self._cutoff(days)}'
            f' and contains(subject, {self._quote(search_term)})'
        )
        return self._find_messages(odata_filter, '搜索邮件失败')

    def search_emails_by_sender(self, sender_name, days=7):
        """Return recent messages whose sender display name contains ``sender_name``."""
        # Same ordering rule as in search_emails_by_subject.
        odata_filter = (
            f'receivedDateTime ge {self._cutoff(days)}'
            f' and contains(from/emailAddress/name, {self._quote(sender_name)})'
        )
        return self._find_messages(odata_filter, '搜索邮件失败')

    def search_emails_by_body(self, search_term, days=7):
        """Return recent messages matching ``search_term`` via ``$search``.

        Graph does not accept ``$filter`` or ``$orderby`` together with
        ``$search`` on messages, so the date window and the newest-first
        ordering are applied to the returned page locally.
        """
        cutoff = self._cutoff(days)
        params = {
            '$search': f'"{search_term}"',
            '$select': self._SUMMARY_FIELDS
        }
        response = self._call(
            'get', '/me/messages', 200, '搜索邮件失败', params=params
        )
        # Timestamps share the cutoff's ISO-8601 layout, so plain string
        # comparison orders them chronologically (to the second).
        recent = [
            message for message in response.json().get('value', [])
            if (message.get('receivedDateTime') or '') >= cutoff
        ]
        recent.sort(key=lambda message: message['receivedDateTime'], reverse=True)
        return recent

    def get_email_details(self, email_id):
        """Return the full JSON representation of one message."""
        # NOTE(review): 'attachments' may need $expand rather than $select to
        # be returned by Graph - confirm against the API before relying on it.
        params = {
            '$select': 'id,subject,from,toRecipients,ccRecipients,bccRecipients,receivedDateTime,body,hasAttachments,attachments'
        }
        response = self._call(
            'get', f'/me/messages/{email_id}', 200, '获取邮件详情失败', params=params
        )
        return response.json()

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    def compose_email(self, to_recipients, subject, body, cc_recipients=None, bcc_recipients=None):
        """Send a new HTML message through ``/me/sendMail``.

        Recipient arguments are iterables of address strings (a single
        string is accepted as one address). Returns a status dict on HTTP 202.
        """
        message = {
            "subject": subject,
            "body": {
                "contentType": "HTML",
                "content": body
            },
            "toRecipients": self._recipients(to_recipients)
        }
        if cc_recipients:
            message["ccRecipients"] = self._recipients(cc_recipients)
        if bcc_recipients:
            message["bccRecipients"] = self._recipients(bcc_recipients)
        self._call('post', '/me/sendMail', 202, '发送邮件失败', json={"message": message})
        return {"status": "success", "message": "邮件发送成功"}

    def reply_to_email(self, email_id, reply_text, reply_all=False):
        """Reply (or reply-all when ``reply_all`` is true) with an HTML body."""
        endpoint = 'replyAll' if reply_all else 'reply'
        reply_data = {
            "message": {
                "body": {
                    "contentType": "HTML",
                    "content": reply_text
                }
            }
        }
        self._call(
            'post', f'/me/messages/{email_id}/{endpoint}', 202, '回复邮件失败',
            json=reply_data
        )
        return {"status": "success", "message": "回复成功"}

    def forward_email(self, email_id, to_recipients, comment=""):
        """Forward a message; a non-empty ``comment`` is sent as the HTML body."""
        message = {"toRecipients": self._recipients(to_recipients)}
        if comment:
            message["body"] = {
                "contentType": "HTML",
                "content": comment
            }
        self._call(
            'post', f'/me/messages/{email_id}/forward', 202, '转发邮件失败',
            json={"message": message}
        )
        return {"status": "success", "message": "转发成功"}

    # ------------------------------------------------------------------
    # Folders and message management
    # ------------------------------------------------------------------

    def list_folders(self):
        """Return the mail folders listed by ``/me/mailFolders``."""
        response = self._call('get', '/me/mailFolders', 200, '获取文件夹失败')
        return response.json().get('value', [])

    def create_folder(self, folder_name, parent_folder_id=None):
        """Create a folder, nested under ``parent_folder_id`` when given."""
        if parent_folder_id:
            path = f'/me/mailFolders/{parent_folder_id}/childFolders'
        else:
            path = '/me/mailFolders'
        response = self._call(
            'post', path, 201, '创建文件夹失败', json={"displayName": folder_name}
        )
        return response.json()

    def move_email(self, email_id, destination_folder_id):
        """Move a message to another folder and return the response JSON."""
        response = self._call(
            'post', f'/me/messages/{email_id}/move', 201, '移动邮件失败',
            json={"destinationId": destination_folder_id}
        )
        return response.json()

    def delete_email(self, email_id):
        """Delete a message; returns a status dict on HTTP 204."""
        self._call('delete', f'/me/messages/{email_id}', 204, '删除邮件失败')
        return {"status": "success", "message": "邮件删除成功"}

    def batch_forward_emails(self, email_id, recipient_list, batch_size=500, custom_text=""):
        """Forward one message to many recipients, ``batch_size`` per request.

        A failing batch is recorded and the remaining batches are still
        attempted. Progress is printed for every batch.

        Returns:
            Dict with ``total_recipients``, per-batch entries under
            ``batches``, and the ``successful`` / ``failed`` recipient counts.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1. A negative size
                would otherwise send nothing while reporting no failures.
        """
        if batch_size < 1:
            raise ValueError('batch_size must be a positive integer')

        recipient_count = len(recipient_list)
        total_batches = (recipient_count + batch_size - 1) // batch_size
        results = {
            "total_recipients": recipient_count,
            "batches": [],
            "successful": 0,
            "failed": 0
        }
        for batch_num, start in enumerate(range(0, recipient_count, batch_size), 1):
            batch = recipient_list[start:start + batch_size]
            entry = {
                "batch": batch_num,
                "total_batches": total_batches,
                "recipients": len(batch)
            }
            try:
                self.forward_email(email_id, batch, custom_text)
            except Exception as e:
                entry["status"] = "failed"
                entry["error"] = str(e)
                results["failed"] += len(batch)
                print(f'批次 {batch_num}/{total_batches} 发送失败: {str(e)}')
            else:
                entry["status"] = "success"
                results["successful"] += len(batch)
                print(f'批次 {batch_num}/{total_batches} 发送成功 ({len(batch)} 封邮件)')
            results["batches"].append(entry)
        return results
# Interactive demo: asks for an access token, then lists recent mail and
# folders and runs one subject search. With no token it only prints the
# list of available methods.
if __name__ == '__main__':
    print('=== Outlook Graph API 跨平台演示 ===\n')
    print('请先运行 python graph_api_auth_local.py 获取访问令牌\n')
    access_token = input('请输入访问令牌 (或按Enter使用演示模式): ').strip()
    if not access_token:
        print('\n演示模式: 展示API功能说明\n')
        print('可用功能:')
        print('1. list_recent_emails(days=7, top=30) - 获取最近邮件')
        print('2. search_emails_by_subject(search_term, days=7) - 按主题搜索')
        print('3. search_emails_by_sender(sender_name, days=7) - 按发件人搜索')
        print('4. search_emails_by_body(search_term, days=7) - 按正文搜索')
        print('5. get_email_details(email_id) - 获取邮件详情')
        print('6. compose_email(to_recipients, subject, body) - 发送邮件')
        print('7. reply_to_email(email_id, reply_text, reply_all=False) - 回复邮件')
        print('8. forward_email(email_id, to_recipients, comment="") - 转发邮件')
        print('9. list_folders() - 获取文件夹列表')
        print('10. create_folder(folder_name, parent_folder_id=None) - 创建文件夹')
        print('11. move_email(email_id, destination_folder_id) - 移动邮件')
        print('12. delete_email(email_id) - 删除邮件')
        print('13. batch_forward_emails(email_id, recipient_list, batch_size=500) - 批量转发')
        print('\n✓ 这些功能可以在Windows、macOS、Linux上运行!')
        print('✓ 无需Win32com,仅需网络连接和有效的访问令牌')
        # SystemExit instead of exit(): exit() is installed by the site
        # module for interactive use and is not guaranteed to exist.
        raise SystemExit(0)
    try:
        outlook = OutlookGraphAPI(access_token)

        print('\n1. 获取最近5封邮件...')
        emails = outlook.list_recent_emails(days=7, top=5)
        print(f'找到 {len(emails)} 封邮件\n')
        for i, email in enumerate(emails, 1):
            print(f'{i}. {email.get("subject", "(无主题)")}')
            print(f' 发件人: {email.get("from", {}).get("emailAddress", {}).get("address", "未知")}')
            # Keep "YYYY-MM-DDTHH:MM:SS" and show the T separator as a space.
            print(f' 时间: {email.get("receivedDateTime", "未知时间")[:19].replace("T", " ")}')
            print()

        print('\n2. 获取文件夹列表...')
        folders = outlook.list_folders()
        print(f'找到 {len(folders)} 个文件夹\n')
        for folder in folders:
            print(f'- {folder.get("displayName", "未知")} (ID: {folder.get("id", "未知")})')

        print('\n3. 测试搜索功能...')
        search_results = outlook.search_emails_by_subject('Red Hat', days=7)
        print(f'找到 {len(search_results)} 封包含"Red Hat"的邮件\n')

        print('✓ 所有测试通过!')
        print('\n提示: 这个API可以在Windows、macOS、Linux上运行!')
    except Exception as e:
        # Top-level boundary of the demo: report the failure and exit non-zero.
        print(f'\n错误: {str(e)}')
        print('\n如果令牌过期,请重新运行: python graph_api_auth_local.py')
        raise SystemExit(1)