-
Notifications
You must be signed in to change notification settings - Fork 555
Open
Description
Operating system: Any
wxPython version & source: Latest PYPI
Python version & source: 3.5+
Coroutines are very helpful for GUI development: they allow time-consuming tasks to run alongside the main event loop while keeping the program logic in a synchronous-looking style. With coroutines, wx.Yield() is no longer necessary. Sample code demonstrating the integration is attached at the end.
Because these helper classes are so useful, I think it would be great to add them somewhere under the wx.lib package, but I have some questions. How should I submit the patch? Is a pull request sufficient, and in which sub-package should these classes be located? Thank you very much!
#!/usr/bin/env python3
import asyncio
import concurrent.futures
import threading
import time
from asyncio.events import AbstractEventLoop
from asyncio.futures import Future
from typing import Optional, Callable, Any, Type
import wx
class WxTimerHandle(asyncio.TimerHandle):
    """An ``asyncio.TimerHandle`` with one extra slot, ``call_later``.

    The slot lets the scheduling code attach the underlying timer object to
    the handle so that it can be reached again when the handle is cancelled.
    """

    __slots__ = ('call_later',)
class WxEventLoop(asyncio.AbstractEventLoop):
    """A minimal asyncio event loop that schedules callbacks on the wx main loop.

    Immediate callbacks are posted with ``wx.CallAfter`` and delayed ones with
    ``wx.CallLater``. Every callback is wrapped in an asyncio handle so that
    cancellation, ``contextvars`` contexts and the loop's exception handler
    behave the way asyncio code expects.

    ``run_until_complete`` and ``shutdown_asyncgens`` are not implemented.
    """

    def __init__(self, app: "wx.AppConsole"):
        """Wrap *app*, whose ``MainLoop`` is driven by :meth:`run_forever`."""
        self._closed = False
        self._app = app
        self._default_executor = None
        self._debug = False
        self._exception_handler = None
        self._task_factory = None

    @staticmethod
    def _run_handle(handle: asyncio.Handle) -> None:
        """Run *handle* unless it was cancelled after being scheduled."""
        # A cancelled Handle has dropped its callback, so it must not be run.
        if not handle.cancelled():
            handle._run()

    def run_forever(self) -> None:
        """Run the wx main loop, with this loop registered as the running loop."""
        # Without this, asyncio.get_running_loop() raises inside callbacks.
        asyncio._set_running_loop(self)
        try:
            self._app.MainLoop()
        finally:
            asyncio._set_running_loop(None)

    def stop(self) -> None:
        """Ask the wx main loop to exit."""
        self._app.ExitMainLoop()

    def is_running(self) -> bool:
        """Return True while the wx application reports an active main loop."""
        return self._app.GetMainLoop() is not None

    def close(self) -> None:
        """Mark the loop closed and shut down the default executor without waiting."""
        executor = self._default_executor
        if executor is not None:
            self._default_executor = None
            executor.shutdown(wait=False)
        self._closed = True

    def _timer_handle_cancelled(self, handle: "WxTimerHandle") -> None:
        """Stop the wx timer behind *handle*; called by ``TimerHandle.cancel``."""
        handle.call_later.Stop()

    def call_soon(self, callback: Callable[..., Any], *args, context=None) -> asyncio.Handle:
        """Schedule ``callback(*args)`` on the wx loop and return its handle."""
        return self.call_soon_threadsafe(callback, *args, context=context)

    def call_at(self, when, callback: Callable[..., Any], *args, context=None) -> "WxTimerHandle":
        """Schedule ``callback(*args)`` at loop time *when* (seconds, see :meth:`time`)."""
        # context must be passed by keyword: passed positionally it would be
        # appended to *args* and handed to the callback.
        return self.call_later(when - self.time(), callback, *args, context=context)

    def call_later(self, delay: float, callback: Callable[..., Any], *args: Any,
                   context=None) -> "WxTimerHandle":
        """Schedule ``callback(*args)`` after *delay* seconds and return its handle.

        Cancelling the returned handle stops the underlying ``wx.CallLater``.
        """
        # Handle deadlines are in seconds on the same clock as time().
        handle = WxTimerHandle(self.time() + delay, callback, args, self, context)
        # wx.CallLater takes milliseconds; clamp so zero or negative delays
        # (e.g. call_at with a past deadline) still produce a positive interval.
        millis = max(1, int(delay * 1000))
        handle.call_later = wx.CallLater(millis, self._run_handle, handle)
        return handle

    def time(self) -> float:
        """Return the loop clock in seconds (``time.monotonic``)."""
        return time.monotonic()

    def create_future(self) -> asyncio.Future:
        """Return a new future bound to this loop."""
        return asyncio.Future(loop=self)

    def create_task(self, coro, **kwargs) -> asyncio.Task:
        """Wrap *coro* in a task bound to this loop.

        Extra keyword arguments (such as ``name`` or ``context``) are forwarded
        unchanged to ``asyncio.Task`` or to the installed task factory.
        """
        if self._task_factory is None:
            return asyncio.Task(coro, loop=self, **kwargs)
        return self._task_factory(self, coro, **kwargs)

    def call_soon_threadsafe(self, callback: Callable[..., Any], *args, context=None) -> asyncio.Handle:
        """Post ``callback(*args)`` to the wx loop via ``wx.CallAfter``.

        Returns a handle. A callback cancelled before wx dispatches it is skipped.
        """
        handle = asyncio.Handle(callback, args, self, context)
        wx.CallAfter(self._run_handle, handle)
        return handle

    def run_in_executor(self, executor: concurrent.futures.ThreadPoolExecutor,
                        func: Callable[..., Any], *args) -> asyncio.Future:
        """Run ``func(*args)`` in *executor* and return an awaitable future.

        When *executor* is None the default executor is used; a
        ``ThreadPoolExecutor`` is created on first use if none has been set.
        """
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor()
                self._default_executor = executor
        return asyncio.wrap_future(executor.submit(func, *args), loop=self)

    def set_default_executor(self, executor: concurrent.futures.ThreadPoolExecutor) -> None:
        """Set the executor used when ``run_in_executor`` receives None."""
        self._default_executor = executor

    def get_exception_handler(self):
        """Return the custom exception handler, or None if unset."""
        return self._exception_handler

    def set_exception_handler(self, handler):
        """Install *handler*, called as ``handler(loop, context)``; None resets it."""
        self._exception_handler = handler

    def default_exception_handler(self, context):
        """Print *context*; used when no custom handler is installed."""
        print('Got exception: ' + repr(context))

    def call_exception_handler(self, context):
        """Dispatch *context* to the custom handler, or to the default one."""
        if self._exception_handler is None:
            self.default_exception_handler(context)
        else:
            self._exception_handler(self, context)

    def get_debug(self) -> bool:
        """Return the debug flag."""
        return self._debug

    def set_debug(self, enabled: bool) -> None:
        """Set the debug flag."""
        self._debug = enabled

    def run_until_complete(self, future):
        """Not supported by this loop."""
        raise NotImplementedError

    def is_closed(self) -> bool:
        """Return True once :meth:`close` has been called."""
        return self._closed

    async def shutdown_asyncgens(self):
        """Not supported by this loop."""
        raise NotImplementedError

    def set_task_factory(self, factory) -> None:
        """Install *factory*, called as ``factory(loop, coro, **kwargs)``."""
        self._task_factory = factory

    def get_task_factory(self):
        """Return the installed task factory, or None."""
        return self._task_factory
class WxEventLoopPolicy(asyncio.AbstractEventLoopPolicy):
    """Event loop policy that serves a ``WxEventLoop`` to the main thread.

    Every other thread, and every other policy operation, is forwarded to a
    delegate policy.
    """

    def __init__(self, app: "Type[wx.AppConsole]",
                 delegate: Optional[asyncio.AbstractEventLoopPolicy] = None):
        """Store the application factory and the fallback policy.

        :param app: callable (typically the ``wx.App`` class) invoked with no
            arguments to create the application the first time the main
            thread asks for its loop.
        :param delegate: policy used for non-main threads and for
            ``new_event_loop``. Defaults to the policy installed when this
            object is constructed.
        """
        # Resolve the default here, not in the signature: a default written in
        # the signature is evaluated once, when the class body is executed.
        if delegate is None:
            delegate = asyncio.get_event_loop_policy()
        self._app = app
        self._loop = None
        self._delegate = delegate

    def get_event_loop(self) -> AbstractEventLoop:
        """Return the wx loop on the main thread, else the delegate's loop."""
        if threading.current_thread() is threading.main_thread():
            if self._loop is None:
                # Created lazily so the wx application is only instantiated
                # when the main thread first needs a loop.
                self._loop = WxEventLoop(self._app())
            return self._loop
        return self._delegate.get_event_loop()

    def set_event_loop(self, loop: AbstractEventLoop) -> None:
        """Forward to the delegate policy."""
        self._delegate.set_event_loop(loop)

    def new_event_loop(self) -> AbstractEventLoop:
        """Return a fresh loop created by the delegate policy."""
        return self._delegate.new_event_loop()

    def get_child_watcher(self) -> Any:
        """Forward to the delegate policy."""
        return self._delegate.get_child_watcher()

    def set_child_watcher(self, watcher: Any) -> None:
        """Forward to the delegate policy."""
        self._delegate.set_child_watcher(watcher)
def _bind_async(self, event, handler):
def _handler(event):
asyncio.ensure_future(handler(event))
self.Bind(event, _handler)
# Attach the helper to wx.EvtHandler as ``BindAsync`` so that objects such as
# the frame and button created in main() can call ``obj.BindAsync(event, handler)``.
wx.EvtHandler.BindAsync = _bind_async
def main():
    """Run the demo: a wx frame whose button starts an async event handler.

    The handler awaits, in turn, a sleep on the wx-backed loop, a blocking
    call in the thread pool, and a coroutine running on a second event loop
    that lives in a background thread.
    """
    asyncio.set_event_loop_policy(WxEventLoopPolicy(app=wx.App))
    asyncio.get_event_loop().set_debug(True)

    # Create the secondary loop before its thread starts, so the handlers
    # below can never observe a not-yet-assigned loop.
    another_loop = asyncio.new_event_loop()

    def _another_loop_thread():
        asyncio.set_event_loop(another_loop)
        another_loop.run_forever()
        another_loop.close()

    another_loop_thread = threading.Thread(target=_another_loop_thread)
    another_loop_thread.start()

    async def on_close(event):
        def stop_and_close():
            another_loop.stop()

        # stop() must run on the secondary loop's own thread.
        another_loop.call_soon_threadsafe(stop_and_close)
        frame.Destroy()

    frame = wx.Frame(None, title='Coroutine Integration in wxPython', size=wx.Size(800, 600))
    frame.BindAsync(wx.EVT_CLOSE, on_close)
    frame.CreateStatusBar()
    frame.GetStatusBar().StatusText = 'Ready'

    counter = 1

    async def on_click(event):
        def log(message: str):
            frame.GetStatusBar().StatusText = message
            print(message)

        nonlocal counter
        count = ' [' + str(counter) + ']'
        counter += 1

        log('Starting the event handler' + count)
        await asyncio.sleep(1)  # Sleep in the current event loop

        log('Running in the thread pool' + count)
        # time.sleep is used to emulate synchronous time-consuming tasks
        await asyncio.get_event_loop().run_in_executor(None, time.sleep, 1)

        log('Running in another loop' + count)
        # Socket operations are theoretically unsupported in WxEventLoop,
        # so a default event loop in a separate thread is sometimes required.
        # asyncio.sleep is used to emulate such asynchronous tasks.
        await asyncio.wrap_future(asyncio.run_coroutine_threadsafe(asyncio.sleep(1), another_loop))

        log('Ready' + count)

    button = wx.Button(frame, label='\n'.join([
        'Click to start the asynchronous event handler',
        'The application will remain responsive while the handler is running',
        'Try click here multiple times to launch multiple coroutines',
        'These coroutines will not conflict each other as they are all in the same thread',
    ]))
    button.BindAsync(wx.EVT_BUTTON, on_click)

    frame.Show()
    asyncio.get_event_loop().run_forever()
    # Wait for the secondary loop's thread, which on_close asks to stop.
    another_loop_thread.join()
# Script entry point: run the demo only when this file is executed directly.
# (Page-scrape residue that was fused onto the call line: "LeonarddeR and kimalive".)
if __name__ == '__main__':
    main()
Metadata
Metadata
Assignees
Labels
No labels