99from datetime import datetime
1010import time
1111from copy import deepcopy
12- import asyncio
12+ from concurrent . futures import ThreadPoolExecutor , Future
1313
1414import numpy as np
1515import pandas as pd
@@ -515,11 +515,26 @@ def get_parent(self, index: Union[int, str, UUID]) -> Union[UUID, None]:
515515 return r ["uuid" ]
516516
517517
518- class DummyProcess :
518+ class Waitable (Protocol ):
519+ """An object that we can call "wait" on"""
520+ def wait (self ) -> int : ...
521+
522+
523+ class DummyProcess (Waitable ):
519524 """Dummy process for local backend"""
520525
521- def wait (self ):
522- pass
526+ def wait (self ) -> int :
527+ return 0
528+
529+
530+ class WaitableFuture (Waitable ):
531+ """Adaptor for future returned from Executor.submit"""
532+ def __init__ (self , future : Future [None ]):
533+ self .future = future
534+
535+ def wait (self ) -> int :
536+ self .future .result ()
537+ return 0
523538
524539
525540@pd .api .extensions .register_series_accessor ("caiman" )
@@ -530,7 +545,7 @@ class CaimanSeriesExtensions:
530545
531546 def __init__ (self , s : pd .Series ):
532547 self ._series = s
533- self .process : Popen = None
548+ self .process : Optional [ Waitable ] = None
534549
535550 def _run_local (
536551 self ,
@@ -540,9 +555,15 @@ def _run_local(
540555 data_path : Union [Path , None ],
541556 dview = None
542557 ) -> DummyProcess :
543- coroutine = self ._run_local_async (algo , batch_path , uuid , data_path , dview )
544- asyncio .run (coroutine )
545- return DummyProcess ()
558+ algo_module = getattr (algorithms , algo )
559+ algo_module .run_algo (
560+ batch_path = str (batch_path ),
561+ uuid = str (uuid ),
562+ data_path = str (data_path ),
563+ dview = dview
564+ )
565+ self .process = DummyProcess ()
566+ return self .process
546567
547568 def _run_local_async (
548569 self ,
@@ -551,11 +572,18 @@ def _run_local_async(
551572 uuid : UUID ,
552573 data_path : Union [Path , None ],
553574 dview = None
554- ) -> Coroutine :
575+ ) -> WaitableFuture :
555576 algo_module = getattr (algorithms , algo )
556- return algo_module .run_algo_async (
557- batch_path = str (batch_path ), uuid = str (uuid ), data_path = str (data_path ), dview = dview
558- )
577+ with ThreadPoolExecutor (max_workers = 1 ) as executor :
578+ future = executor .submit (
579+ algo_module .run_algo ,
580+ batch_path = str (batch_path ),
581+ uuid = str (uuid ),
582+ data_path = str (data_path ),
583+ dview = dview
584+ )
585+ self .process = WaitableFuture (future )
586+ return self .process
559587
560588 def _run_subprocess (self , runfile_path : str , wait : bool , ** kwargs ):
561589
0 commit comments