Skip to content

Commit c774a09

Browse files
committed
test: include original tests for atomic_write
The tests were migrated from the original tests of the atomicwrites project: https://github.com/untitaker/python-atomicwrites
1 parent 97c79f8 commit c774a09

File tree

1 file changed

+116
-1
lines changed

1 file changed

+116
-1
lines changed

test/test_contacts.py

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44

55
import base64
66
import datetime
7+
import errno
8+
import os
9+
import pathlib
10+
import tempfile
711
import unittest
812
from unittest import mock
913

10-
from khard.contacts import Contact, multi_property_key
14+
from khard.contacts import Contact, atomic_write, multi_property_key
1115

1216

1317
class ContactFormatDateObject(unittest.TestCase):
@@ -80,3 +84,114 @@ def test_all_strings_are_sorted_before_dicts(self) -> None:
8084
my_list = ["a", {"c": "d"}, "e", {"f": "g"}]
8185
my_list.sort(key=multi_property_key) # type: ignore[arg-type]
8286
self.assertEqual(my_list, ["a", "e", {"c": "d"}, {"f": "g"}])
87+
88+
89+
class AtomicWrite(unittest.TestCase):
90+
"""Tests for the atomic_write functionself.
91+
92+
These tests have been migrated from the original atomicwrites repository.
93+
"""
94+
95+
def setUp(self) -> None:
96+
self.t = tempfile.TemporaryDirectory()
97+
self.tmpdir = pathlib.Path(self.t.name)
98+
return super().setUp()
99+
100+
def tearDown(self) -> None:
101+
self.t.cleanup()
102+
return super().tearDown()
103+
104+
def assertFileContents(self, file: pathlib.Path, contents: str) -> None:
105+
"""Assert the file contents of the given file"""
106+
with file.open() as f:
107+
return self.assertEqual(f.read(), contents)
108+
109+
@staticmethod
110+
def write(path: pathlib.Path, contents: str) -> None:
111+
"""Write a string to a file"""
112+
with path.open("w") as f:
113+
f.write(contents)
114+
115+
def test_atomic_write(self) -> None:
116+
fname = self.tmpdir / 'ha'
117+
for i in range(2):
118+
with atomic_write(str(fname), overwrite=True) as f:
119+
f.write('hoho')
120+
121+
with self.assertRaises(OSError) as excinfo:
122+
with atomic_write(str(fname), overwrite=False) as f:
123+
f.write('haha')
124+
125+
self.assertEqual(excinfo.exception.errno, errno.EEXIST)
126+
127+
self.assertFileContents(fname, 'hoho')
128+
self.assertEqual(len(list(self.tmpdir.iterdir())), 1)
129+
130+
131+
def test_teardown(self) -> None:
132+
fname = self.tmpdir / 'ha'
133+
with self.assertRaises(AssertionError):
134+
with atomic_write(str(fname), overwrite=True):
135+
self.fail("This code should not be reached")
136+
137+
self.assertFalse(any(self.tmpdir.iterdir()))
138+
139+
140+
def test_replace_simultaneously_created_file(self) -> None:
141+
fname = self.tmpdir / 'ha'
142+
with atomic_write(str(fname), overwrite=True) as f:
143+
f.write('hoho')
144+
self.write(fname, 'harhar')
145+
self.assertFileContents(fname, 'harhar')
146+
self.assertFileContents(fname, 'hoho')
147+
self.assertEqual(len(list(self.tmpdir.iterdir())), 1)
148+
149+
150+
def test_dont_remove_simultaneously_created_file(self) -> None:
151+
fname = self.tmpdir / 'ha'
152+
with self.assertRaises(OSError) as excinfo:
153+
with atomic_write(str(fname), overwrite=False) as f:
154+
f.write('hoho')
155+
self.write(fname, 'harhar')
156+
self.assertFileContents(fname, 'harhar')
157+
158+
self.assertEqual(excinfo.exception.errno, errno.EEXIST)
159+
self.assertFileContents(fname, 'harhar')
160+
self.assertEqual(len(list(self.tmpdir.iterdir())), 1)
161+
162+
163+
def test_open_reraise(self) -> None:
164+
"""Verify that nested exceptions during rollback do not overwrite the
165+
initial exception that triggered a rollback."""
166+
fname = self.tmpdir / 'ha'
167+
with self.assertRaises(AssertionError):
168+
with atomic_write(str(fname), overwrite=False):
169+
# Mess with internals; find and remove the temp file used by
170+
# atomic_write internally. We're testing that the initial
171+
# AssertionError triggered below is propagated up the stack,
172+
# not the second exception triggered during commit.
173+
tmp = next(self.tmpdir.iterdir())
174+
tmp.unlink()
175+
# Now trigger our own exception.
176+
self.fail("Intentional failure for testing purposes")
177+
178+
179+
def test_atomic_write_in_cwd(self) -> None:
180+
orig_curdir = os.getcwd()
181+
try:
182+
os.chdir(str(self.tmpdir))
183+
fname = 'ha'
184+
for i in range(2):
185+
with atomic_write(fname, overwrite=True) as f:
186+
f.write('hoho')
187+
188+
with self.assertRaises(OSError) as excinfo:
189+
with atomic_write(fname, overwrite=False) as f:
190+
f.write('haha')
191+
192+
self.assertEqual(excinfo.exception.errno, errno.EEXIST)
193+
194+
self.assertFileContents(pathlib.Path(fname), 'hoho')
195+
self.assertEqual(len(list(self.tmpdir.iterdir())), 1)
196+
finally:
197+
os.chdir(orig_curdir)

0 commit comments

Comments
 (0)