Skip to content

Commit 0109ed7

Browse files
committed
rebuild test_smtp
1 parent 5ceae46 commit 0109ed7

File tree

1 file changed

+192
-134
lines changed

1 file changed

+192
-134
lines changed

test/test_smtp/test_smtp.py

Lines changed: 192 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -1,138 +1,196 @@
1-
import base64
2-
import io
3-
import re
4-
import unittest
1+
import random
2+
import string
3+
from email import message_from_string
4+
from email.message import Message
5+
from io import BytesIO, StringIO
6+
from unittest.mock import MagicMock
7+
8+
import pytest
9+
from pytest_mock import MockerFixture
510

611
from parsons import SMTP
12+
from parsons.notifications.sendmail import EmptyListError
13+
14+
15+
@pytest.fixture
16+
def mock_conn(mocker: MockerFixture) -> MagicMock:
17+
mock_smtp_class = mocker.patch("parsons.notifications.smtp.smtplib.SMTP", autospec=True)
18+
conn = mock_smtp_class.return_value
19+
conn.sendmail.return_value = None
20+
21+
def get_msg() -> Message | None:
22+
if not conn.sendmail.called:
23+
return None
24+
args, _ = conn.sendmail.call_args
25+
return message_from_string(args[2])
26+
27+
def get_types() -> list[str]:
28+
msg = get_msg()
29+
if not msg:
30+
return []
31+
return [p.get_content_type() for p in msg.walk() if not p.is_multipart()]
32+
33+
conn.get_sent_msg = get_msg
34+
conn.get_sent_types = get_types
35+
return conn
36+
37+
38+
@pytest.fixture
39+
def smtp(mock_conn: MagicMock) -> SMTP:
40+
"""Configure a Parsons SMTP instance with fake credentials."""
41+
return SMTP("fake.example.com", username="fake", password="fake")
42+
43+
44+
def create_virtual_file(name: str, content: str = "data") -> StringIO:
45+
f = StringIO(content)
46+
f.name = name
47+
return f
48+
49+
50+
@pytest.mark.parametrize(
51+
("html", "files", "expected_types"),
52+
[
53+
(None, None, ["text/plain"]),
54+
("<p>HTML</p>", None, ["text/plain", "text/html"]),
55+
(None, ["test.csv"], ["text/plain", "text/csv"]),
56+
("<p>HTML</p>", ["test.csv"], ["text/plain", "text/html", "text/csv"]),
57+
(
58+
None,
59+
["1.csv", "2.csv"],
60+
["text/plain", "text/csv", "text/csv"],
61+
),
62+
],
63+
ids=(
64+
"Simple Body",
65+
"Body + HTML",
66+
"Body + Attachment",
67+
"Body + HTML + Attachment",
68+
"Multiple Attachments",
69+
),
70+
)
71+
def test_send_email_content(
72+
smtp: SMTP,
73+
mock_conn: MagicMock,
74+
html: str | None,
75+
files: list[str] | None,
76+
expected_types: list[str],
77+
mocker: MockerFixture,
78+
):
79+
files_with_data = (
80+
[
81+
create_virtual_file(
82+
f, content="".join(random.choices(string.ascii_letters + string.digits, k=8))
83+
)
84+
for f in files
85+
]
86+
if files
87+
else None
88+
)
89+
90+
smtp.send_email(
91+
"f@ex.com",
92+
"t@ex.com",
93+
"We've been trying to reach you...",
94+
"Your car's warranty is about to expire!",
95+
message_html=html,
96+
files=files_with_data,
97+
)
98+
99+
msg = mock_conn.get_sent_msg()
100+
assert msg["Subject"] == "We've been trying to reach you..."
101+
assert msg["To"] == "t@ex.com"
102+
assert msg["From"] == "f@ex.com"
103+
sent_payloads = [p.get_payload(decode=True) for p in msg.walk() if not p.is_multipart()]
104+
assert b"Your car's warranty is about to expire!" in sent_payloads
105+
106+
if html:
107+
assert html.encode() in sent_payloads
108+
109+
if files_with_data:
110+
for content in files_with_data:
111+
assert content.read().encode() in sent_payloads
112+
113+
sent_types = mock_conn.get_sent_types()
114+
for t in expected_types:
115+
assert t in sent_types
116+
117+
118+
def test_binary_attachment_integrity(smtp: SMTP, mock_conn: MagicMock):
119+
gif_data = bytes([71, 73, 70, 56, 57, 97, 1, 0, 1, 0])
120+
gif_file = BytesIO(gif_data)
121+
gif_file.name = "tiny.gif"
122+
123+
smtp.send_email("f@ex.com", "t@ex.com", "Sub", "Body", files=[gif_file])
124+
125+
msg = mock_conn.get_sent_msg()
126+
gif_part = next(p for p in msg.walk() if p.get_filename() == "tiny.gif")
127+
128+
assert gif_part.get_content_type() == "image/gif"
129+
assert gif_part.get_payload(decode=True) == gif_data
130+
131+
132+
def test_attachment_disposition(smtp: SMTP, mock_conn: MagicMock):
133+
named_file = StringIO("content")
134+
named_file.name = "report.pdf"
135+
136+
smtp.send_email("f@ex.com", "t@ex.com", "Sub", "Body", files=[named_file])
137+
138+
msg = mock_conn.get_sent_msg()
139+
file_part = next(p for p in msg.walk() if p.get_filename() == "report.pdf")
140+
assert "attachment" in file_part.get("Content-Disposition")
141+
142+
143+
def test_send_email_files_as_single_string(smtp: SMTP, mock_conn: MagicMock, mocker: MockerFixture):
144+
# We mock the attachment creator because we only care about the input conversion here
145+
mock_create = mocker.patch.object(smtp, "_create_message_attachments")
146+
filename = "single_report.pdf"
147+
148+
smtp.send_email("sender@ex.com", "to@ex.com", "Subject", "Body", files=filename)
149+
150+
args, _ = mock_create.call_args
151+
assert args[4] == [filename]
152+
153+
154+
def test_send_email_empty_recipient_list(smtp: SMTP):
155+
with pytest.raises(EmptyListError, match="Must contain at least 1 email."):
156+
smtp.send_email("sender@ex.com", [], "Sub", "Body")
157+
158+
159+
@pytest.mark.parametrize(
160+
("input_to", "expected_header"),
161+
[
162+
(["a@b.com", "c@d.com"], "a@b.com, c@d.com"),
163+
(['"Name" <a@b.com>', "c@d.com"], '"Name" <a@b.com>, c@d.com'),
164+
],
165+
)
166+
def test_recipient_string_joining(
167+
smtp: SMTP, mock_conn: MagicMock, input_to: list[str], expected_header: str
168+
):
169+
smtp.send_email("f@ex.com", input_to, "Sub", "Body")
170+
171+
msg = mock_conn.get_sent_msg()
172+
assert msg["To"] == expected_header
173+
174+
175+
@pytest.mark.parametrize(("close_manually", "expected_quit_count"), [(True, 0), (False, 1)])
176+
def test_connection_closing_logic(
177+
mock_conn: MagicMock, close_manually: bool, expected_quit_count: int
178+
) -> None:
179+
smtp_inst = SMTP(
180+
"fake.example.com", username="fake", password="fake", close_manually=close_manually
181+
)
182+
183+
smtp_inst.send_email("a@b.com", "c@d.com", "Sub", "Body")
184+
185+
assert mock_conn.quit.call_count == expected_quit_count
186+
187+
188+
def test_send_message_error_handling(smtp: SMTP, mock_conn: MagicMock) -> None:
189+
"""Ensure partial failures are returned correctly."""
190+
fail_response = {"bad@ex.com": (550, "User unknown")}
191+
mock_conn.sendmail.return_value = fail_response
7192

193+
simple_msg = smtp._create_message_simple("f@ex.com", "bad@ex.com", "Sub", "Body")
194+
result = smtp._send_message(simple_msg)
8195

9-
class FakeConnection:
10-
def __init__(self, result_obj):
11-
self.result_obj = result_obj
12-
13-
def sendmail(self, sender, to, message_body):
14-
self.result_obj.result = (sender, to, message_body)
15-
if "willfail@example.com" in to:
16-
return {"willfail@example.com": (550, "User unknown")}
17-
18-
def quit(self):
19-
self.result_obj.quit_ran = True
20-
21-
22-
class TestSMTP(unittest.TestCase):
23-
def setUp(self):
24-
self.smtp = SMTP("fake.example.com", username="fake", password="fake")
25-
self.smtp.conn = FakeConnection(self)
26-
self.result = None
27-
self.quit_ran = False
28-
29-
def test_send_message_simple(self):
30-
self.smtp.send_email(
31-
"foo@example.com", "recipient1@example.com", "Simple subject", "Fake body"
32-
)
33-
assert self.result[0] == "foo@example.com"
34-
assert self.result[1] == ["recipient1@example.com"]
35-
assert self.result[2].endswith(
36-
"\nto: recipient1@example.com\nfrom: foo@example.com"
37-
"\nsubject: Simple subject\n\nFake body"
38-
)
39-
assert self.quit_ran
40-
41-
def test_send_message_html(self):
42-
self.smtp.send_email(
43-
"foohtml@example.com",
44-
"recipienthtml@example.com",
45-
"Simple subject",
46-
"Fake body",
47-
"<p>Really Fake html</p>",
48-
)
49-
assert self.result[0] == "foohtml@example.com"
50-
assert self.result[1] == ["recipienthtml@example.com"]
51-
assert re.search(r"<p>Really Fake html</p>\n--=======", self.result[2])
52-
assert re.search(r"\nFake body\n--======", self.result[2])
53-
assert re.search(r"ubject: Simple subject\n", self.result[2])
54-
assert self.quit_ran
55-
56-
def test_send_message_manualclose(self):
57-
smtp = SMTP("fake.example.com", username="fake", password="fake", close_manually=True)
58-
smtp.conn = FakeConnection(self)
59-
smtp.send_email("foo@example.com", "recipient1@example.com", "Simple subject", "Fake body")
60-
assert not self.quit_ran
61-
62-
def test_send_message_files(self):
63-
named_file_content = "x,y,z\n1,2,3\r\n3,4,5\r\n"
64-
unnamed_file_content = "foo,bar\n1,2\r\n3,4\r\n"
65-
bytes_file_content = bytes(
66-
[
67-
71,
68-
73,
69-
70,
70-
56,
71-
57,
72-
97,
73-
1,
74-
0,
75-
1,
76-
0,
77-
0,
78-
255,
79-
0,
80-
44,
81-
0,
82-
0,
83-
0,
84-
0,
85-
1,
86-
0,
87-
1,
88-
0,
89-
0,
90-
2,
91-
0,
92-
59,
93-
]
94-
)
95-
named_file = io.StringIO(named_file_content)
96-
named_file.name = "xyz.csv"
97-
98-
bytes_file = io.BytesIO(bytes_file_content)
99-
bytes_file.name = "xyz.gif"
100-
101-
self.smtp.send_email(
102-
"foofiles@example.com",
103-
"recipientfiles@example.com",
104-
"Simple subject",
105-
"Fake body",
106-
files=[io.StringIO(unnamed_file_content), named_file, bytes_file],
107-
)
108-
assert self.result[0] == "foofiles@example.com"
109-
assert self.result[1] == ["recipientfiles@example.com"]
110-
assert re.search(r"\nFake body\n--======", self.result[2])
111-
found = re.findall(r'filename="file"\n\n([\w=/]+)\n\n--===', self.result[2])
112-
assert base64.b64decode(found[0]).decode() == unnamed_file_content
113-
found_named = re.findall(
114-
r'Content-Type: text/csv; charset="utf-8"\nMIME-Version: 1.0'
115-
r"\nContent-Transfer-Encoding: base64\nContent-Disposition: "
116-
r'attachment; filename="xyz.csv"\n\n([\w=/]+)\n\n--======',
117-
self.result[2],
118-
)
119-
assert base64.b64decode(found_named[0]).decode() == named_file_content
120-
121-
found_gif = re.findall(
122-
r"Content-Type: image/gif\nMIME-Version: 1.0"
123-
r"\nContent-Transfer-Encoding: base64\nContent-ID: <xyz.gif>"
124-
r'\nContent-Disposition: attachment; filename="xyz.gif"\n\n([\w=/]+)\n\n--==',
125-
self.result[2],
126-
)
127-
assert base64.b64decode(found_gif[0]) == bytes_file_content
128-
assert self.quit_ran
129-
130-
def test_send_message_partial_fail(self):
131-
simple_msg = self.smtp._create_message_simple(
132-
"foo@example.com",
133-
"recipient1@example.com, willfail@example.com",
134-
"Simple subject",
135-
"Fake body",
136-
)
137-
send_result = self.smtp._send_message(simple_msg)
138-
assert send_result == {"willfail@example.com": (550, "User unknown")}
196+
assert result == fail_response

0 commit comments

Comments
 (0)