Skip to content

Commit ad7f46f

Browse files
author
Nekokatt
authored
Bugfix/77 wait for race (#82)
* Optimised raw event dispatching to uncover a bug. It looks like, at least on my machine, asyncio immediately invokes anything you await rather than first switching to another task on the queue, unless the call performs raw I/O. I have confirmed this with the Epoll, Poll and Select selector implementations on a non-debug asyncio SelectorEventLoop. This means that the bulk of the work of dispatching an event currently happens as soon as the event is dispatched rather than after another task runs, which could lead to an immediate slowdown if other tasks are queued. Switching to synchronous dispatching and using `create_task` to invoke the callback management "later" seems to speed up this implementation significantly, and it allows other race conditions that we have not properly accounted for as part of #77 to be detected with test scripts that saturate the event loop.
* Updated CLI script to show OS type as well.
* Added code to allow debugging of asyncio loop blocking incidents.
* Fixes #77 dispatcher wait_for race condition.
* Removed async predicates for wait_for, hopefully removing the last parts of the race condition.
* Fixes #77 dispatcher wait_for race condition.
1 parent 5fb11fd commit ad7f46f

10 files changed

Lines changed: 204 additions & 156 deletions

File tree

hikari/api/event_dispatcher.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535

3636
EventT_co = typing.TypeVar("EventT_co", bound=base_events.Event, covariant=True)
3737
EventT_inv = typing.TypeVar("EventT_inv", bound=base_events.Event)
38-
PredicateT = typing.Callable[[EventT_co], typing.Union[bool, typing.Coroutine[typing.Any, typing.Any, bool]]]
39-
AsyncCallbackT = typing.Callable[[EventT_inv], typing.Coroutine[typing.Any, typing.Any, None]]
38+
PredicateT = typing.Callable[[EventT_co], bool]
39+
CallbackT = typing.Callable[[EventT_inv], typing.Coroutine[typing.Any, typing.Any, None]]
4040

4141

4242
class EventDispatcher(abc.ABC):
@@ -134,9 +134,7 @@ async def on_everyone_mentioned(event):
134134
# For the sake of UX, I will check this at runtime instead and let the
135135
# user use a static type checker.
136136
@abc.abstractmethod
137-
def subscribe(
138-
self, event_type: typing.Type[typing.Any], callback: AsyncCallbackT[typing.Any]
139-
) -> AsyncCallbackT[typing.Any]:
137+
def subscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> CallbackT[typing.Any]:
140138
"""Subscribe a given callback to a given event type.
141139
142140
Parameters
@@ -180,7 +178,7 @@ async def on_message(event):
180178
# For the sake of UX, I will check this at runtime instead and let the
181179
# user use a static type checker.
182180
@abc.abstractmethod
183-
def unsubscribe(self, event_type: typing.Type[typing.Any], callback: AsyncCallbackT[typing.Any]) -> None:
181+
def unsubscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
184182
"""Unsubscribe a given callback from a given event type, if present.
185183
186184
Parameters
@@ -210,7 +208,7 @@ async def on_message(event):
210208
@abc.abstractmethod
211209
def get_listeners(
212210
self, event_type: typing.Type[EventT_co], *, polymorphic: bool = True,
213-
) -> typing.Collection[AsyncCallbackT[EventT_co]]:
211+
) -> typing.Collection[CallbackT[EventT_co]]:
214212
"""Get the listeners for a given event type, if there are any.
215213
216214
Parameters
@@ -240,7 +238,7 @@ def get_listeners(
240238
@abc.abstractmethod
241239
def listen(
242240
self, event_type: typing.Optional[typing.Type[EventT_co]] = None,
243-
) -> typing.Callable[[AsyncCallbackT[EventT_co]], AsyncCallbackT[EventT_co]]:
241+
) -> typing.Callable[[CallbackT[EventT_co]], CallbackT[EventT_co]]:
244242
"""Generate a decorator to subscribe a callback to an event type.
245243
246244
This is a second-order decorator.
@@ -285,11 +283,13 @@ async def wait_for(
285283
The event type to listen for. This will listen for subclasses of
286284
this type additionally.
287285
predicate
288-
A function or coroutine taking the event as the single parameter.
286+
A function taking the event as the single parameter.
289287
This should return `builtins.True` if the event is one you want to
290288
return, or `builtins.False` if the event should not be returned.
291289
If left as `None` (the default), then the first matching event type
292290
that the bot receives (or any subtype) will be the one returned.
291+
292+
ASYNC PREDICATES ARE NOT SUPPORTED.
293293
timeout : typing.Optional[builtins.float or builtins.int]
294294
The amount of time to wait before raising an `asyncio.TimeoutError`
295295
and giving up instead. This is measured in seconds. If

hikari/cli.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,4 @@ def main() -> None:
4848
sys.stderr.write(f"hikari v{version} {sha1}\n")
4949
sys.stderr.write(f"located at {path}\n")
5050
sys.stderr.write(f"{py_impl} {py_ver} {py_compiler}\n")
51+
sys.stderr.write(" ".join(frag.strip() for frag in platform.uname() if frag and frag.strip()) + "\n")

hikari/events/base_events.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -162,21 +162,6 @@ class ExceptionEvent(Event, typing.Generic[FailedEventT]):
162162
side-effects on the application runtime.
163163
"""
164164

165-
app: traits.RESTAware = attr.ib(metadata={attr_extensions.SKIP_DEEP_COPY: True})
166-
# <<inherited docstring from Event>>.
167-
168-
shard: typing.Optional[gateway_shard.GatewayShard] = attr.ib(metadata={attr_extensions.SKIP_DEEP_COPY: True})
169-
"""Shard that received the event.
170-
171-
Returns
172-
-------
173-
hikari.api.shard.GatewayShard
174-
Shard that raised this exception.
175-
176-
This may be `builtins.None` if no specific shard was the cause of this
177-
exception (e.g. when starting up or shutting down).
178-
"""
179-
180165
exception: Exception = attr.ib()
181166
"""Exception that was raised.
182167
@@ -201,6 +186,28 @@ class ExceptionEvent(Event, typing.Generic[FailedEventT]):
201186
# for us to remove this effect. This functionally changes nothing but it helps MyPy.
202187
_failed_callback: FailedCallbackT[FailedEventT] = attr.ib()
203188

189+
@property
190+
def app(self) -> traits.RESTAware:
191+
# <<inherited docstring from Event>>.
192+
return self.failed_event.app
193+
194+
@property
195+
def shard(self) -> typing.Optional[gateway_shard.GatewayShard]:
196+
"""Shard that received the event, if there was one associated.
197+
198+
Returns
199+
-------
200+
typing.Optional[hikari.api.shard.GatewayShard]
201+
Shard that raised this exception.
202+
203+
This may be `builtins.None` if no specific shard was the cause of this
204+
exception (e.g. when starting up or shutting down).
205+
"""
206+
shard = getattr(self.failed_event, "shard", None)
207+
if isinstance(shard, gateway_shard.GatewayShard):
208+
return shard
209+
return None
210+
204211
@property
205212
def failed_callback(self) -> FailedCallbackT[FailedEventT]:
206213
"""Event callback that threw an exception.

hikari/impl/bot.py

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
from hikari.impl import stateless_event_manager
5858
from hikari.impl import stateless_guild_chunker as stateless_guild_chunker_impl
5959
from hikari.impl import voice
60+
from hikari.utilities import aio
6061
from hikari.utilities import art
6162
from hikari.utilities import constants
6263
from hikari.utilities import date
@@ -574,30 +575,30 @@ async def start(self) -> None:
574575
def listen(
575576
self, event_type: typing.Optional[typing.Type[event_dispatcher.EventT_co]] = None,
576577
) -> typing.Callable[
577-
[event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co]],
578-
event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co],
578+
[event_dispatcher.CallbackT[event_dispatcher.EventT_co]],
579+
event_dispatcher.CallbackT[event_dispatcher.EventT_co],
579580
]:
580581
# <<inherited docstring from event_dispatcher.EventDispatcher>>
581582
return self.dispatcher.listen(event_type)
582583

583584
def get_listeners(
584585
self, event_type: typing.Type[event_dispatcher.EventT_co], *, polymorphic: bool = True,
585-
) -> typing.Collection[event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co]]:
586+
) -> typing.Collection[event_dispatcher.CallbackT[event_dispatcher.EventT_co]]:
586587
# <<inherited docstring from event_dispatcher.EventDispatcher>>
587588
return self.dispatcher.get_listeners(event_type, polymorphic=polymorphic)
588589

589590
def subscribe(
590591
self,
591592
event_type: typing.Type[event_dispatcher.EventT_co],
592-
callback: event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co],
593-
) -> event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co]:
593+
callback: event_dispatcher.CallbackT[event_dispatcher.EventT_co],
594+
) -> event_dispatcher.CallbackT[event_dispatcher.EventT_co]:
594595
# <<inherited docstring from event_dispatcher.EventDispatcher>>
595596
return self.dispatcher.subscribe(event_type, callback)
596597

597598
def unsubscribe(
598599
self,
599600
event_type: typing.Type[event_dispatcher.EventT_co],
600-
callback: event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co],
601+
callback: event_dispatcher.CallbackT[event_dispatcher.EventT_co],
601602
) -> None:
602603
# <<inherited docstring from event_dispatcher.EventDispatcher>>
603604
return self.dispatcher.unsubscribe(event_type, callback)
@@ -642,7 +643,12 @@ async def close(self) -> None:
642643
await self._connector_factory.close()
643644
self._global_ratelimit.close()
644645

645-
def run(self) -> None:
646+
def run(
647+
self,
648+
*,
649+
loop: typing.Optional[asyncio.AbstractEventLoop] = None,
650+
slow_callback_duration: typing.Optional[float] = None,
651+
) -> None:
646652
"""Run this application on the current thread in an event loop.
647653
648654
This will use the event loop that is set for the current thread, or
@@ -658,19 +664,47 @@ def run(self) -> None:
658664
659665
The application is always guaranteed to be shut down before this
660666
function completes or propagates any exception.
667+
668+
Parameters
669+
----------
670+
loop : typing.Optional[asyncio.AbstractEventLoop]
671+
Event loop to run on. This defaults to `builtins.None`.
672+
673+
If `builtins.None`, the event loop set for the current thread will
674+
be used. If the thread does not have an event loop, then one will
675+
be created first and registered to the running thread.
676+
677+
It is advisable to only have one event loop per thread. Generally
678+
you should not have a need to specify this.
679+
slow_callback_duration : typing.Optional[builtins.float]
680+
How long a coroutine should block for in seconds before it shows a
681+
warning.
682+
683+
This defaults to being `builtins.None`, which will disable the
684+
feature (since it may cause a small increase in execution latency).
685+
If specified as a number, it will be enabled.
661686
"""
662-
try:
663-
loop = asyncio.get_event_loop()
664-
except RuntimeError:
665-
_LOGGER.debug("no event loop registered on this thread; now creating one...")
666-
loop = asyncio.new_event_loop()
667-
asyncio.set_event_loop(loop)
687+
if loop is None:
688+
try:
689+
loop = asyncio.get_event_loop()
690+
_LOGGER.debug("using default thread's event loop")
691+
except RuntimeError:
692+
_LOGGER.debug("no event loop registered on this thread; now creating one...")
693+
loop = asyncio.new_event_loop()
694+
asyncio.set_event_loop(loop)
695+
696+
# We always expect this to be populated by now.
697+
loop: asyncio.AbstractEventLoop
698+
699+
if slow_callback_duration and slow_callback_duration > 0:
700+
aio.patch_slow_callback_detection(slow_callback_duration)
668701

669702
try:
670703
self._map_signal_handlers(
671704
loop.add_signal_handler,
672705
lambda *_: loop.create_task(self.close(), name="signal interrupt shutting down application"),
673706
)
707+
_LOGGER.debug("using default thread's event loop", loop)
674708
loop.run_until_complete(self._shard_management_lifecycle())
675709

676710
except KeyboardInterrupt as ex:

0 commit comments

Comments
 (0)