diff --git a/src/pyshark/capture/capture.py b/src/pyshark/capture/capture.py index 5e41b6fc..4ec0649f 100644 --- a/src/pyshark/capture/capture.py +++ b/src/pyshark/capture/capture.py @@ -1,3 +1,4 @@ +from concurrent.futures import ThreadPoolExecutor import asyncio import os import threading @@ -152,7 +153,8 @@ def _setup_eventloop(self): self.eventloop = asyncio.ProactorEventLoop() else: self.eventloop = asyncio.new_event_loop() - asyncio.set_event_loop(self.eventloop) + ThreadPoolExecutor().submit(self.eventloop.run_forever) + # asyncio.set_event_loop(self.eventloop) if os.name == 'posix' and isinstance(threading.current_thread(), threading._MainThread): asyncio.get_child_watcher().attach_loop(self.eventloop) @@ -205,17 +207,20 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None): :param packet_count: If given, stops after this amount of packets is captured. """ # NOTE: This has code duplication with the async version, think about how to solve this - tshark_process = existing_process or self.eventloop.run_until_complete(self._get_tshark_process()) - psml_structure, data = self.eventloop.run_until_complete(self._get_psml_struct(tshark_process.stdout)) + tshark_process = existing_process or asyncio.run_coroutine_threadsafe(self._get_tshark_process(), self.eventloop).result() + + psml_structure, data = asyncio.run_coroutine_threadsafe(self._get_psml_struct(tshark_process.stdout), self.eventloop).result() packets_captured = 0 data = b'' try: while True: try: - packet, data = self.eventloop.run_until_complete( + packet, data = asyncio.run_coroutine_threadsafe( self._get_packet_from_stream(tshark_process.stdout, data, psml_structure=psml_structure, - got_first_packet=packets_captured > 0)) + got_first_packet=packets_captured > 0), + self.eventloop + ).result() except EOFError: self._log.debug('EOF reached (sync)') @@ -227,7 +232,7 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None): if packet_count and packets_captured >= packet_count: break finally: - self.eventloop.run_until_complete(self._cleanup_subprocess(tshark_process)) + asyncio.run_coroutine_threadsafe(self._cleanup_subprocess(tshark_process), self.eventloop).result() def apply_on_packets(self, callback, timeout=None, packet_count=None): """ @@ -245,7 +250,7 @@ def print_callback(pkt): coro = self.packets_from_tshark(callback, packet_count=packet_count) if timeout is not None: coro = asyncio.wait_for(coro, timeout) - return self.eventloop.run_until_complete(coro) + return asyncio.run_coroutine_threadsafe(coro, self.eventloop).result() async def packets_from_tshark(self, packet_callback, packet_count=None, close_tshark=True): """ @@ -400,7 +405,7 @@ async def _cleanup_subprocess(self, process): % process.returncode) def close(self): - self.eventloop.run_until_complete(self._close_async()) + asyncio.run_coroutine_threadsafe(self._close_async(), self.eventloop).result() async def _close_async(self): for process in self._running_processes: