-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimap.py
More file actions
153 lines (124 loc) · 4.74 KB
/
Copy pathimap.py
File metadata and controls
153 lines (124 loc) · 4.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
IMAP connection and helper functions for the S/MIME decryption tool.
Uses ``imapclient`` for all IMAP interaction — folder listing, message
fetching, flag management, and APPEND operations. The underlying
``imaplib`` response parsing is handled by ``imapclient`` so no manual
regex extraction is needed.
"""
from __future__ import annotations
import ssl
import sys
from imapclient import IMAPClient
# ---------------------------------------------------------------------------
# Connection
# ---------------------------------------------------------------------------
def connect_to_server(host: str, port: int, quiet: bool = False) -> IMAPClient:
"""Connect to the IMAP server with STARTTLS and return the client."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
if not quiet:
print(f"Connecting to {host}:{port}...")
client = IMAPClient(host, port, ssl=False)
if not quiet:
print("Upgrading connection with STARTTLS...")
client.starttls(ssl_context=ctx)
return client
def login(conn: IMAPClient, user: str, password: str, quiet: bool = False):
"""Authenticate against the IMAP server. Exits on failure."""
try:
if not quiet:
print(f"Logging in as {user}...")
conn.login(user, password)
if not quiet:
print("Login successful.")
except Exception as exc:
print(f"ERROR: Login failed: {exc}", file=sys.stderr)
try:
conn.logout()
except Exception:
pass
sys.exit(1)
# ---------------------------------------------------------------------------
# Folder helpers
# ---------------------------------------------------------------------------
def get_all_folders(conn: IMAPClient):
"""
List all folders using the LIST command.
Returns a list of ``(flags, delimiter, name)`` tuples where *flags*
is a tuple of bytes (e.g. ``(b'\\\\HasChildren',)``), *delimiter* is
a string, and *name* is a decoded string.
"""
return conn.list_folders()
def select_folder(conn: IMAPClient, folder_name: str, readonly: bool = False):
"""Select a folder. Returns the message count or None on failure."""
try:
result = conn.select_folder(folder_name, readonly=readonly)
return result.get(b"EXISTS", 0)
except Exception:
return None
def ensure_folder_exists(conn: IMAPClient, folder_name: str) -> bool:
"""Create a folder if it does not already exist. Returns True on success."""
try:
conn.create_folder(folder_name)
return True
except Exception:
pass
# Folder might already exist — try subscribing
try:
conn.subscribe_folder(folder_name)
return True
except Exception:
pass
return True
# ---------------------------------------------------------------------------
# Batch operations
# ---------------------------------------------------------------------------
def batch_store_deleted(conn: IMAPClient, folder_name: str, uids: list[int],
dbg=None):
"""
SELECT *folder_name*, STORE ``\\Deleted`` on all *uids* in a single
command, then UNSELECT to release dotlocks.
This amortises the SELECT/UNSELECT cost across an entire batch
instead of paying it per-message.
"""
if not uids:
return
if dbg:
dbg(f"SELECT {folder_name} for batch STORE ({len(uids)} UIDs)")
select_folder(conn, folder_name, readonly=False)
if dbg:
dbg(f"STORE \\Deleted on UIDs: {uids[:5]}{'...' if len(uids) > 5 else ''}")
try:
conn.add_flags(uids, [b"\\Deleted"])
except Exception as exc:
if dbg:
dbg(f"batch STORE error: {exc}")
raise
if dbg:
dbg("UNSELECT after batch STORE")
try:
conn.unselect_folder()
except Exception:
try:
conn.close_folder()
except Exception:
pass
# ---------------------------------------------------------------------------
# Flag utilities
# ---------------------------------------------------------------------------
def clean_flags(flags: list, exclude: set[str] | None = None) -> list[bytes]:
"""
Filter a flag list, removing entries whose lowercase string form
is in *exclude* (default: ``{\\\\deleted, \\\\recent}``).
Accepts flags as bytes or str. Returns a list of bytes suitable for
``imapclient.append()`` and ``imapclient.add_flags()``.
"""
if exclude is None:
exclude = {"\\deleted", "\\recent"}
result = []
for f in flags:
s = f.decode("ascii", errors="replace") if isinstance(f, bytes) else str(f)
if s.lower() not in exclude:
result.append(f if isinstance(f, bytes) else f.encode("ascii"))
return result