Skip to content

Commit f7ba752

Browse files
authored
Merge pull request #72 from digital-asset/python-startup-fixes
python: Fix a bug that caused ready() calls to perpetually hang under certain circumstances
2 parents baa9eb9 + 7056587 commit f7ba752

File tree

18 files changed

+704
-608
lines changed

18 files changed

+704
-608
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
6.6.4
1+
6.7.0

python/dazl/client/_network_client_impl.py

Lines changed: 19 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,16 @@
2020
from ._base_model import IfMissingPartyBehavior, CREATE_IF_MISSING, NONE_IF_MISSING, \
2121
EXCEPTION_IF_MISSING
2222
from ._party_client_impl import _PartyClientImpl
23-
from ._run_level import RunState
2423
from .bots import Bot, BotCollection
25-
from .config import AnonymousNetworkConfig, NetworkConfig, URLConfig, \
26-
DEFAULT_CONNECT_TIMEOUT_SECONDS
24+
from .config import AnonymousNetworkConfig, NetworkConfig, URLConfig
2725
from ..metrics import MetricEvents
28-
from ..model.core import Party, RunLevel, DazlPartyMissingError
26+
from ..model.core import Party, DazlPartyMissingError
2927
from ..model.ledger import LedgerMetadata
3028
from ..model.network import connection_settings
3129
from ..model.reading import InitEvent, ReadyEvent, BaseEvent
3230
from ..protocols import LedgerNetwork
3331
from ..protocols.autodetect import AutodetectLedgerNetwork
34-
from ..util.asyncio_util import execute_in_loop, Invoker, safe_create_future
32+
from ..scheduler import Invoker, RunLevel
3533
from ..util.dar import get_dar_package_ids
3634
from ..util.prim_types import to_timedelta, TimeDeltaConvertible
3735

@@ -41,16 +39,15 @@
4139
class _NetworkImpl:
4240

4341
__slots__ = ('_lock', '_main_thread', 'invoker', '_party_clients', '_global_impls',
44-
'_party_impls', '_run_state', 'bots', '_config', '_pool', '_pool_init',
42+
'_party_impls', 'bots', '_config', '_pool', '_pool_init',
4543
'_cached_metadata', '_metrics')
4644

4745
def __init__(self, metrics: 'Optional[MetricEvents]' = None):
4846
self.invoker = Invoker()
4947
self._lock = RLock()
5048
self._main_thread = None # type: Optional[Thread]
51-
self._run_state = None # type: Optional[RunState]
5249
self._pool = None # type: Optional[LedgerNetwork]
53-
self._pool_init = safe_create_future()
50+
self._pool_init = self.invoker.create_future()
5451
if metrics is None:
5552
# create a default set of metrics
5653
try:
@@ -71,27 +68,6 @@ def __init__(self, metrics: 'Optional[MetricEvents]' = None):
7168
self._config = dict()
7269
self.bots = BotCollection(None)
7370

74-
@overload
75-
def run_in_loop_threadsafe(self,
76-
cb: Callable[[], Union[None, Awaitable[None]]],
77-
timeout: float = 30) -> None: ...
78-
79-
@overload
80-
def run_in_loop_threadsafe(self,
81-
cb: Callable[[], Union[Awaitable[T], T]],
82-
timeout: float = 30) -> T: ...
83-
84-
def run_in_loop_threadsafe(self, cb, timeout=30):
85-
"""
86-
Schedule a callback to be run on the event loop. This can either be a normal function or a
87-
coroutine.
88-
89-
:param cb: The callback to invoke.
90-
:param timeout: The timeout, in seconds, to abort.
91-
:return: The returned value from the function or coroutine.
92-
"""
93-
return self.invoker.run_in_loop(cb, timeout=timeout)
94-
9571
def set_config(self, *configs: 'Union[NetworkConfig, AnonymousNetworkConfig]', **kwargs):
9672
for config in configs:
9773
self._config.update({k: v for k, v in asdict(config).items() if v is not None})
@@ -111,25 +87,18 @@ def resolved_config(self) -> 'NetworkConfig':
11187
def resolved_anonymous_config(self) -> 'AnonymousNetworkConfig':
11288
return AnonymousNetworkConfig.parse_kwargs(**self._config)
11389

114-
async def aio_run(self, *coroutines, run_state: Optional[RunState] = None) -> None:
90+
async def aio_run(self, *coroutines) -> None:
11591
"""
11692
Coroutine where all network activity is scheduled from.
11793
11894
:param coroutines:
11995
Optional additional coroutines to run. ``aio_run`` is only considered complete once all
12096
of the additional coroutines are also finished.
121-
:param run_state:
122-
:class:`RunState` that specifies when the loop is to be terminated. If ``None`` is
123-
supplied, a default :class:`RunState` that runs until explicitly stopped is assigned.
12497
"""
12598
from ..metrics.instrumenters import AioLoopPerfMonitor
12699
from ..protocols import LedgerConnectionOptions
127100

128-
if run_state is None:
129-
run_state = RunState(RunLevel.RUN_FOREVER)
130-
131101
with self._lock:
132-
self._run_state = run_state
133102
self.invoker.set_context_as_current()
134103
config = self.resolved_config()
135104

@@ -157,21 +126,20 @@ async def aio_run(self, *coroutines, run_state: Optional[RunState] = None) -> No
157126

158127
options = LedgerConnectionOptions(connect_timeout=connect_timeout)
159128

160-
self._pool = pool = AutodetectLedgerNetwork(
161-
options=options, loop=self.invoker.get_loop(), executor=self.invoker.get_executor(),
162-
run_state=self._run_state)
129+
self._pool = pool = AutodetectLedgerNetwork(self.invoker, options)
163130
self._pool_init.set_result(pool)
164131

165132
try:
166-
runner = _NetworkRunner(pool, run_state, self, coroutines)
133+
runner = _NetworkRunner(pool, self, coroutines)
167134
await runner.run()
168135
finally:
169136
await self._pool.close()
170137
if site is not None:
171138
await site.stop()
139+
await self.invoker.shutdown(0)
172140
self._pool = None
173141

174-
def start(self, run_state: RunState, daemon: bool) -> None:
142+
def start(self, daemon: bool) -> None:
175143
"""
176144
Create a background thread, start an event loop, and run the application.
177145
"""
@@ -184,7 +152,7 @@ def background_main():
184152
loop = new_event_loop()
185153
set_event_loop(loop)
186154

187-
loop.run_until_complete(self.aio_run(run_state=run_state))
155+
loop.run_until_complete(self.aio_run())
188156
except:
189157
LOG.exception('The main event loop died!')
190158

@@ -207,11 +175,7 @@ def shutdown(self) -> None:
207175
"""
208176
with self._lock:
209177
LOG.info('Shutting down...')
210-
loop = self.invoker.get_loop()
211-
run_state = self._run_state
212-
if loop is None:
213-
raise RuntimeError('shutdown() called on a non-stopped Network')
214-
loop.call_soon_threadsafe(run_state.handle_sigint)
178+
self.invoker.handle_sigint()
215179

216180
def abort(self) -> None:
217181
"""
@@ -221,11 +185,7 @@ def abort(self) -> None:
221185
"""
222186
with self._lock:
223187
LOG.info('Aborting...')
224-
loop = self.invoker.get_loop()
225-
run_state = self._run_state
226-
if loop is None:
227-
raise RuntimeError('abort() called on a non-stopped Network')
228-
loop.call_soon_threadsafe(run_state.handle_sigquit)
188+
self.invoker.handle_sigquit()
229189

230190
def join(self, timeout=None):
231191
"""
@@ -241,10 +201,10 @@ def join(self, timeout=None):
241201

242202
def global_impl(self, ctor: 'Callable[[_NetworkImpl], T]') -> T:
243203
with self._lock:
244-
inst = self._global_impls.get(ctor)
204+
inst = self._global_impls.get(ctor) # type: ignore
245205
if inst is None:
246206
inst = ctor(self)
247-
self._global_impls[ctor] = inst
207+
self._global_impls[ctor] = inst # type: ignore
248208
return inst
249209

250210
@overload
@@ -342,7 +302,7 @@ def simple_metadata(self, timeout: 'TimeDeltaConvertible') -> LedgerMetadata:
342302
return self._cached_metadata
343303

344304
with self._lock:
345-
return execute_in_loop(self.invoker.get_loop(), self._pool.ledger, timeout=timeout)
305+
return self.invoker.run_in_loop(self._pool.ledger, timeout=timeout)
346306

347307
async def aio_metadata(self) -> LedgerMetadata:
348308
"""
@@ -417,9 +377,8 @@ async def connect_anonymous(self):
417377

418378

419379
class _NetworkRunner:
420-
def __init__(self, pool: LedgerNetwork, run_state: RunState, network_impl: '_NetworkImpl', user_coroutines):
380+
def __init__(self, pool: LedgerNetwork, network_impl: '_NetworkImpl', user_coroutines):
421381
self.pool = pool
422-
self.run_state = run_state
423382
self.initialized_parties = set() # type: Set[Party]
424383
self._network_impl = network_impl
425384
self._config = self._network_impl.resolved_config()
@@ -483,10 +442,10 @@ async def beat(self, party_impls: 'Collection[_PartyClientImpl]') -> Tuple[str,
483442
self._user_coroutines = [fut for fut in self._user_coroutines if not fut.done()]
484443

485444
# If there are no more commands in flight, there is no more activity
486-
if self.run_state.level >= RunLevel.TERMINATE_GRACEFULLY:
445+
if self._network_impl.invoker.level >= RunLevel.TERMINATE_GRACEFULLY:
487446
LOG.info('network_run terminating on user request...')
488447
return offset, False
489-
elif self.run_state.level >= RunLevel.RUN_UNTIL_IDLE:
448+
elif self._network_impl.invoker.level >= RunLevel.RUN_UNTIL_IDLE:
490449
if not self._read_completions and not self._user_coroutines:
491450
if all(pi.writer_idle() for pi in party_impls):
492451
LOG.info('network_run: terminating because all writers are idle and all '

python/dazl/client/_party_client_impl.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
InitEvent, ReadyEvent, ActiveContractSetEvent, PackagesAddedEvent
2727
from ..model.writing import CommandBuilder, CommandDefaults, CommandPayload, EventHandlerResponse
2828
from ..protocols import LedgerNetwork, LedgerClient
29-
from ..util.asyncio_util import ServiceQueue, await_then, completed, safe_create_future, \
30-
named_gather
29+
from ..util.asyncio_util import ServiceQueue, await_then, completed, named_gather
3130
from ..util.prim_natural import n_things
3231
from ..util.typing import safe_cast
3332

@@ -51,10 +50,10 @@ def __init__(self, parent: '_NetworkImpl', party: 'Party'):
5150
self._pool = None # type: Optional[LedgerNetwork]
5251
self._pool_fut = None # type: Optional[Awaitable[LedgerNetwork]]
5352
self._client_fut = None # type: Optional[Awaitable[LedgerClient]]
54-
self._ready_fut = safe_create_future()
53+
self._ready_fut = self.invoker.create_future()
5554
self._known_packages = set() # type: Set[str]
5655

57-
self._acs = ActiveContractSet()
56+
self._acs = ActiveContractSet(self.invoker)
5857
self.bots = BotCollection(party)
5958
self._reader = _PartyClientReaderState()
6059
self._writer = _PartyClientWriterState()

0 commit comments

Comments
 (0)