Skip to content

Commit 32b076f

Browse files
authored
python: Fix a bug that would cause long-running streams to be aborted due to idle timeout, even if the connection had active traffic. (#549)
1 parent 02a0221 commit 32b076f

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

python/dazl/ledger/grpc/_args_aio.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,16 @@ def grpc_kwargs(self) -> GrpcKwargs:
177177
kwargs["metadata"] = aio.Metadata(("authorization", f"Bearer {self.token}"))
178178
return kwargs
179179

180+
@property
181+
def grpc_kwargs_infinite_timeout(self) -> GrpcKwargs:
182+
"""
183+
Generate keyword arguments that should accompany a gRPC Ledger API call.
184+
"""
185+
kwargs = GrpcKwargs()
186+
if self.token:
187+
kwargs["metadata"] = aio.Metadata(("authorization", f"Bearer {self.token}"))
188+
return kwargs
189+
180190

181191
class Cache:
182192
pass

python/dazl/ledger/grpc/conn_aio.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,19 +1293,20 @@ async def _acs_events(
12931293
) -> AsyncIterable[CreateEvent | Boundary]:
12941294
stub = self._call.grpc_stub(lapipb.ActiveContractsServiceStub)
12951295
request = lapipb.GetActiveContractsRequest(ledger_id=self._call.ledger_id, filter=filter_pb)
1296-
self._response_stream = response_stream = stub.GetActiveContracts(
1297-
request,
1298-
**self._call.grpc_kwargs,
1299-
)
1300-
1301-
offset = None
13021296

13031297
# Unidirectional gRPC streams cannot sensibly have a deadline because the stream may be
13041298
# open indefinitely. However, if fetching an individual message from the stream takes a
13051299
# long time here, we can reasonably assume that the stream is dead, because Active
13061300
# Contract Set messages are really supposed to be sent as quickly as the server can send
13071301
# them. In other words, a long timeout pause in the middle of pulling down ACS messages
13081302
# would be highly unusual, so we treat them as fatal
1303+
self._response_stream = response_stream = stub.GetActiveContracts(
1304+
request,
1305+
**self._call.grpc_kwargs_infinite_timeout,
1306+
)
1307+
1308+
offset = None
1309+
13091310
i = response_stream.__aiter__()
13101311
while True:
13111312
try:
@@ -1347,7 +1348,7 @@ async def _tx_events(
13471348

13481349
self._response_stream = response_stream = stub.GetTransactions(
13491350
request,
1350-
**self._call.grpc_kwargs,
1351+
**self._call.grpc_kwargs_infinite_timeout,
13511352
)
13521353
i = response_stream.__aiter__()
13531354

0 commit comments

Comments
 (0)