Skip to content

Commit ae09ab8

Browse files
authored
Merge branch 'master' into ssl_optimizations
2 parents 11ad720 + 5910a18 commit ae09ab8

File tree

6 files changed

+92
-96
lines changed

6 files changed

+92
-96
lines changed

tests/test_fs_event.py

Lines changed: 67 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,100 +1,87 @@
11
import asyncio
2+
import contextlib
23
import os.path
34
import tempfile
45

56
from uvloop import _testbase as tb
67
from uvloop.loop import FileSystemEvent
78

89

9-
class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase):
10-
async def _file_writer(self):
11-
f = await self.q.get()
12-
while True:
13-
f.write('hello uvloop\n')
14-
f.flush()
15-
x = await self.q.get()
16-
if x is None:
17-
return
18-
19-
def fs_event_setup(self):
20-
self.change_event_count = 0
21-
self.fname = ''
22-
self.q = asyncio.Queue()
23-
24-
def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
25-
_d, fn = os.path.split(self.fname)
26-
self.assertEqual(ev_fname, fn)
27-
self.assertEqual(evt, FileSystemEvent.CHANGE)
28-
self.change_event_count += 1
29-
if self.change_event_count < 4:
30-
self.q.put_nowait(0)
31-
else:
32-
self.q.put_nowait(None)
10+
class Test_UV_FS_Event(tb.UVTestCase):
11+
def setUp(self):
12+
super().setUp()
13+
self.exit_stack = contextlib.ExitStack()
14+
self.tmp_dir = self.exit_stack.enter_context(
15+
tempfile.TemporaryDirectory()
16+
)
17+
18+
def tearDown(self):
19+
self.exit_stack.close()
20+
super().tearDown()
3321

3422
def test_fs_event_change(self):
35-
self.fs_event_setup()
36-
37-
async def run(write_task):
38-
self.q.put_nowait(tf)
39-
try:
40-
await asyncio.wait_for(write_task, 4)
41-
except asyncio.TimeoutError:
42-
write_task.cancel()
43-
44-
with tempfile.NamedTemporaryFile('wt') as tf:
45-
self.fname = tf.name.encode()
46-
h = self.loop._monitor_fs(tf.name, self.event_cb)
23+
change_event_count = 0
24+
filename = "fs_event_change.txt"
25+
path = os.path.join(self.tmp_dir, filename)
26+
q = asyncio.Queue()
27+
28+
with open(path, 'wt') as f:
29+
async def file_writer():
30+
while True:
31+
f.write('hello uvloop\n')
32+
f.flush()
33+
x = await q.get()
34+
if x is None:
35+
return
36+
37+
def event_cb(ev_fname: bytes, evt: FileSystemEvent):
38+
nonlocal change_event_count
39+
self.assertEqual(ev_fname, filename.encode())
40+
self.assertEqual(evt, FileSystemEvent.CHANGE)
41+
change_event_count += 1
42+
if change_event_count < 4:
43+
q.put_nowait(0)
44+
else:
45+
q.put_nowait(None)
46+
47+
h = self.loop._monitor_fs(path, event_cb)
48+
self.loop.run_until_complete(
49+
asyncio.sleep(0.1) # let monitor start
50+
)
4751
self.assertFalse(h.cancelled())
4852

49-
self.loop.run_until_complete(run(
50-
self.loop.create_task(self._file_writer())))
53+
self.loop.run_until_complete(asyncio.wait_for(file_writer(), 4))
5154
h.cancel()
5255
self.assertTrue(h.cancelled())
5356

54-
self.assertEqual(self.change_event_count, 4)
55-
56-
57-
class Test_UV_FS_EVENT_RENAME(tb.UVTestCase):
58-
async def _file_renamer(self):
59-
await self.q.get()
60-
os.rename(os.path.join(self.dname, self.changed_name),
61-
os.path.join(self.dname, self.changed_name + "-new"))
62-
await self.q.get()
63-
64-
def fs_event_setup(self):
65-
self.dname = ''
66-
self.changed_name = "hello_fs_event.txt"
67-
self.changed_set = {self.changed_name, self.changed_name + '-new'}
68-
self.q = asyncio.Queue()
69-
70-
def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
71-
ev_fname = ev_fname.decode()
72-
self.assertEqual(evt, FileSystemEvent.RENAME)
73-
self.changed_set.remove(ev_fname)
74-
if len(self.changed_set) == 0:
75-
self.q.put_nowait(None)
57+
self.assertEqual(change_event_count, 4)
7658

7759
def test_fs_event_rename(self):
78-
self.fs_event_setup()
79-
80-
async def run(write_task):
81-
self.q.put_nowait(0)
82-
try:
83-
await asyncio.wait_for(write_task, 4)
84-
except asyncio.TimeoutError:
85-
write_task.cancel()
86-
87-
with tempfile.TemporaryDirectory() as td_name:
88-
self.dname = td_name
89-
f = open(os.path.join(td_name, self.changed_name), 'wt')
60+
orig_name = "hello_fs_event.txt"
61+
new_name = "hello_fs_event_rename.txt"
62+
changed_set = {orig_name, new_name}
63+
event = asyncio.Event()
64+
65+
async def file_renamer():
66+
os.rename(os.path.join(self.tmp_dir, orig_name),
67+
os.path.join(self.tmp_dir, new_name))
68+
await event.wait()
69+
70+
def event_cb(ev_fname: bytes, evt: FileSystemEvent):
71+
ev_fname = ev_fname.decode()
72+
self.assertEqual(evt, FileSystemEvent.RENAME)
73+
changed_set.discard(ev_fname)
74+
if len(changed_set) == 0:
75+
event.set()
76+
77+
with open(os.path.join(self.tmp_dir, orig_name), 'wt') as f:
9078
f.write('hello!')
91-
f.close()
92-
h = self.loop._monitor_fs(td_name, self.event_cb)
93-
self.assertFalse(h.cancelled())
79+
h = self.loop._monitor_fs(self.tmp_dir, event_cb)
80+
self.loop.run_until_complete(asyncio.sleep(0.5)) # let monitor start
81+
self.assertFalse(h.cancelled())
9482

95-
self.loop.run_until_complete(run(
96-
self.loop.create_task(self._file_renamer())))
97-
h.cancel()
98-
self.assertTrue(h.cancelled())
83+
self.loop.run_until_complete(asyncio.wait_for(file_renamer(), 4))
84+
h.cancel()
85+
self.assertTrue(h.cancelled())
9986

100-
self.assertEqual(len(self.changed_set), 0)
87+
self.assertEqual(len(changed_set), 0)

uvloop/cbhandles.pyx

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ cdef class Handle:
99
cdef inline _set_loop(self, Loop loop):
1010
self.loop = loop
1111
if UVLOOP_DEBUG:
12-
loop._debug_cb_handles_total += 1
13-
loop._debug_cb_handles_count += 1
12+
system.__atomic_fetch_add(
13+
&loop._debug_cb_handles_total, 1, system.__ATOMIC_RELAXED)
14+
system.__atomic_fetch_add(
15+
&loop._debug_cb_handles_count, 1, system.__ATOMIC_RELAXED)
1416
if loop._debug:
1517
self._source_traceback = extract_stack()
1618

@@ -21,7 +23,8 @@ cdef class Handle:
2123

2224
def __dealloc__(self):
2325
if UVLOOP_DEBUG and self.loop is not None:
24-
self.loop._debug_cb_handles_count -= 1
26+
system.__atomic_fetch_sub(
27+
&self.loop._debug_cb_handles_count, 1, system.__ATOMIC_RELAXED)
2528
if self.loop is None:
2629
raise RuntimeError('Handle.loop is None in Handle.__dealloc__')
2730

@@ -174,8 +177,10 @@ cdef class TimerHandle:
174177
self._cancelled = 0
175178

176179
if UVLOOP_DEBUG:
177-
self.loop._debug_cb_timer_handles_total += 1
178-
self.loop._debug_cb_timer_handles_count += 1
180+
system.__atomic_fetch_add(
181+
&self.loop._debug_cb_timer_handles_total, 1, system.__ATOMIC_RELAXED)
182+
system.__atomic_fetch_add(
183+
&self.loop._debug_cb_timer_handles_count, 1, system.__ATOMIC_RELAXED)
179184

180185
if context is None:
181186
context = Context_CopyCurrent()
@@ -205,7 +210,8 @@ cdef class TimerHandle:
205210

206211
def __dealloc__(self):
207212
if UVLOOP_DEBUG:
208-
self.loop._debug_cb_timer_handles_count -= 1
213+
system.__atomic_fetch_sub(
214+
&self.loop._debug_cb_timer_handles_count, 1, system.__ATOMIC_RELAXED)
209215
if self.timer is not None:
210216
raise RuntimeError('active TimerHandle is deallacating')
211217

uvloop/includes/stdlib.pxi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ cdef aio_wait = asyncio.wait
3737
cdef aio_wrap_future = asyncio.wrap_future
3838
cdef aio_logger = asyncio.log.logger
3939
cdef aio_iscoroutine = asyncio.iscoroutine
40-
cdef aio_iscoroutinefunction = asyncio.iscoroutinefunction
4140
cdef aio_BaseProtocol = asyncio.BaseProtocol
4241
cdef aio_Protocol = asyncio.Protocol
4342
cdef aio_isfuture = getattr(asyncio, 'isfuture', None)
@@ -65,6 +64,7 @@ cdef gc_disable = gc.disable
6564

6665
cdef iter_chain = itertools.chain
6766
cdef inspect_isgenerator = inspect.isgenerator
67+
cdef inspect_iscoroutinefunction = inspect.iscoroutinefunction
6868

6969
cdef int has_IPV6_V6ONLY = hasattr(socket, 'IPV6_V6ONLY')
7070
cdef int IPV6_V6ONLY = getattr(socket, 'IPV6_V6ONLY', -1)

uvloop/includes/system.pxd

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,11 @@ cdef extern from "includes/fork_handler.h":
9494
void setForkHandler(OnForkHandler handler)
9595
void resetForkHandler()
9696
void setMainThreadID(uint64_t id)
97+
98+
99+
cdef extern from * nogil:
100+
uint64_t __atomic_fetch_add(uint64_t *ptr, uint64_t val, int memorder)
101+
uint64_t __atomic_fetch_sub(uint64_t *ptr, uint64_t val, int memorder)
102+
103+
cdef enum:
104+
__ATOMIC_RELAXED

uvloop/loop.pxd

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ cdef class Loop:
5050
object _default_executor
5151
object _ready
5252
set _queued_streams, _executing_streams
53-
Py_ssize_t _ready_len
5453

5554
set _servers
5655

uvloop/loop.pyx

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ cdef class Loop:
181181
self._queued_streams = set()
182182
self._executing_streams = set()
183183
self._ready = col_deque()
184-
self._ready_len = 0
185184

186185
self.handler_async = UVAsync.new(
187186
self, <method_t>self._on_wake, self)
@@ -440,7 +439,7 @@ cdef class Loop:
440439
self.handler_async.send()
441440

442441
cdef _on_wake(self):
443-
if ((self._ready_len > 0 or self._stopping) and
442+
if ((len(self._ready) > 0 or self._stopping) and
444443
not self.handler_idle.running):
445444
self.handler_idle.start()
446445

@@ -481,8 +480,7 @@ cdef class Loop:
481480
if len(self._queued_streams):
482481
self._exec_queued_writes()
483482

484-
self._ready_len = len(self._ready)
485-
if self._ready_len == 0 and self.handler_idle.running:
483+
if len(self._ready) == 0 and self.handler_idle.running:
486484
self.handler_idle.stop()
487485

488486
if self._stopping:
@@ -570,7 +568,6 @@ cdef class Loop:
570568
for cb_handle in self._ready:
571569
cb_handle.cancel()
572570
self._ready.clear()
573-
self._ready_len = 0
574571

575572
if self._polls:
576573
for poll_handle in self._polls.values():
@@ -672,7 +669,6 @@ cdef class Loop:
672669
cdef inline _append_ready_handle(self, Handle handle):
673670
self._check_closed()
674671
self._ready.append(handle)
675-
self._ready_len += 1
676672

677673
cdef inline _call_soon_handle(self, Handle handle):
678674
self._append_ready_handle(handle)
@@ -2731,7 +2727,7 @@ cdef class Loop:
27312727
return transport, protocol
27322728

27332729
def run_in_executor(self, executor, func, *args):
2734-
if aio_iscoroutine(func) or aio_iscoroutinefunction(func):
2730+
if aio_iscoroutine(func) or inspect_iscoroutinefunction(func):
27352731
raise TypeError("coroutines cannot be used with run_in_executor()")
27362732

27372733
self._check_closed()
@@ -2910,7 +2906,7 @@ cdef class Loop:
29102906
'the main thread')
29112907

29122908
if (aio_iscoroutine(callback)
2913-
or aio_iscoroutinefunction(callback)):
2909+
or inspect_iscoroutinefunction(callback)):
29142910
raise TypeError(
29152911
"coroutines cannot be used with add_signal_handler()")
29162912

0 commit comments

Comments
 (0)