Skip to content

Commit e36b932

Browse files
authored
Merge pull request #294 from digital-asset/python-grpc-offsets
python: Expose starting offsets for query and streams (gRPC only)
2 parents d4b1df6 + 0488404 commit e36b932

File tree

6 files changed

+155
-59
lines changed

6 files changed

+155
-59
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
7.5.12
1+
7.5.13

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
[tool.poetry]
55
name = "dazl"
6-
version = "7.5.12"
6+
version = "7.5.13"
77
description = "high-level Ledger API client for Daml ledgers"
88
license = "Apache-2.0"
99
authors = ["Davin K. Tanabe <davin.tanabe@digitalasset.com>"]

python/dazl/ledger/_offsets.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
__all__ = [
1212
"LedgerOffsetRange",
13+
"End",
14+
"END",
1315
"UNTIL_END",
1416
"FROM_BEGINNING_UNTIL_FOREVER",
1517
"from_offset_until_forever",
@@ -21,6 +23,8 @@ class End:
2123
Marker object that denotes the current end of the ledger.
2224
"""
2325

26+
__slots__ = ()
27+
2428
def __hash__(self):
2529
return 0
2630

@@ -39,7 +43,7 @@ class LedgerOffsetRange:
3943
so this class actually represents the commonality between these two interfaces.
4044
"""
4145

42-
def __init__(self, __begin: "Union[None, str]", __end: "Union[None, End]"):
46+
def __init__(self, __begin: "Union[None, str]", __end: "Union[None, str, End]"):
4347
"""
4448
Initialize a :class:`LedgerOffsetRange`.
4549
@@ -48,9 +52,11 @@ def __init__(self, __begin: "Union[None, str]", __end: "Union[None, End]"):
4852
Otherwise, must be a legal ledger offset.
4953
:param __end:
5054
The end of the stream. If ``None``, then keep reading from the stream forever; if
51-
``END``, then terminate when reaching the _current_ end of stream. Note that offsets
52-
are *not* allowed here, as the HTTP JSON API does not provide a mechanism for reading
53-
*to* a specific transaction offset.
55+
``END``, then terminate when reaching the _current_ end of stream.
56+
57+
Note that offsets are only allowed here on the gRPC Ledger API; they are *not*
58+
allowed here on the HTTP JSON API, which does not provide a mechanism for reading *to* a
59+
specific transaction offset.
5460
"""
5561
self.begin = __begin
5662
self.end = __end
@@ -65,6 +71,9 @@ def __eq__(self, other: Any) -> bool:
6571
def __hash__(self):
6672
return hash(self.begin) ^ hash(self.end)
6773

74+
def __repr__(self):
75+
return f"({self.begin}, {self.end})"
76+
6877

6978
UNTIL_END = LedgerOffsetRange(None, END)
7079
FROM_BEGINNING_UNTIL_FOREVER = LedgerOffsetRange(None, None)

python/dazl/ledger/grpc/codec_aio.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from ...prim import ContractData, ContractId, Party
4545
from ...values import Context
4646
from ...values.protobuf import ProtobufDecoder, ProtobufEncoder, set_value
47+
from .._offsets import END, End
4748
from ..aio import PackageLoader
4849
from ..pkgcache import SHARED_PACKAGE_DATABASE
4950

@@ -217,11 +218,22 @@ def encode_identifier(name: TypeConName) -> lapipb.Identifier:
217218

218219
@staticmethod
219220
def encode_begin_offset(offset: Optional[str]) -> lapipb.LedgerOffset:
220-
return (
221-
lapipb.LedgerOffset(absolute=offset)
222-
if offset is not None
223-
else lapipb.LedgerOffset(boundary=0)
224-
)
221+
if offset is None:
222+
return lapipb.LedgerOffset(boundary=0)
223+
else:
224+
return lapipb.LedgerOffset(absolute=offset)
225+
226+
@staticmethod
227+
def encode_end_offset(offset: Union[str, None, End]) -> Optional[lapipb.LedgerOffset]:
228+
if offset is None:
229+
# there is no ending offset (the stream will never naturally terminate)
230+
return None
231+
elif isinstance(offset, End):
232+
# the offset goes up until the current end of the ledger
233+
return lapipb.LedgerOffset(boundary=1)
234+
else:
235+
# the offset is absolute
236+
return lapipb.LedgerOffset(absolute=offset)
225237

226238
async def decode_created_event(self, event: lapipb.CreatedEvent) -> CreateEvent:
227239
cid = self.decode_contract_id(event)

python/dazl/ledger/grpc/conn_aio.py

Lines changed: 71 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313
from grpc.aio import Channel, UnaryStreamCall
1414

1515
from .. import aio
16+
from ... import LOG
1617
from ..._gen.com.daml.ledger.api import v1 as lapipb
1718
from ..._gen.com.daml.ledger.api.v1 import admin as lapiadminpb
1819
from ...damlast.daml_lf_1 import PackageRef, TypeConName
1920
from ...damlast.util import is_match
2021
from ...prim import LEDGER_STRING_REGEX, ContractData, ContractId, Party
2122
from ...query import Filter, Queries, Query, parse_query
22-
from .._offsets import UNTIL_END, LedgerOffsetRange, from_offset_until_forever
23+
from .._offsets import END, UNTIL_END, End, LedgerOffsetRange, from_offset_until_forever
2324
from ..api_types import ArchiveEvent, Boundary, Command, CreateEvent, ExerciseResponse, PartyInfo
2425
from ..config import Config
2526
from ..config.access import PropertyBasedAccessConfig
@@ -455,7 +456,12 @@ def _submit_and_wait_request(
455456
# region Read API
456457

457458
def query(
458-
self, __template_id: Union[str, TypeConName] = "*", __query: Query = None
459+
self,
460+
__template_id: Union[str, TypeConName] = "*",
461+
__query: Query = None,
462+
*,
463+
begin_offset: Optional[str] = None,
464+
end_offset: Optional[str] = None,
459465
) -> "QueryStream":
460466
"""
461467
Return the create events from the active contract set service as a stream.
@@ -468,12 +474,25 @@ def query(
468474
The name of the template for which to fetch contracts.
469475
:param __query:
470476
A filter to apply to the set of returned contracts.
471-
"""
477+
:param begin_offset:
478+
The starting offset at which to read an active contract set. If ``None``, contracts
479+
are read from the beginning, using the Active Contract Set Service instead of the
480+
Transaction Service.
481+
:param end_offset:
482+
The ending offset. If ``None``, contracts are read until the end of the stream.
483+
In order to read indefinitely, use :meth:`stream` instead.
484+
"""
485+
offset = LedgerOffsetRange(begin_offset, end_offset if end_offset is not None else END)
472486
return QueryStream(
473-
self, parse_query({__template_id: __query}, server_side_filters=False), UNTIL_END
487+
self, parse_query({__template_id: __query}, server_side_filters=False), offset
474488
)
475489

476-
def query_many(self, *queries: Queries) -> "QueryStream":
490+
def query_many(
491+
self,
492+
*queries: Queries,
493+
begin_offset: Optional[str] = None,
494+
end_offset: Optional[str] = None,
495+
) -> "QueryStream":
477496
"""
478497
Return the create events from the active contract set service as a stream.
479498
@@ -483,8 +502,16 @@ def query_many(self, *queries: Queries) -> "QueryStream":
483502
484503
:param queries:
485504
A map of template IDs to filter to apply to the set of returned contracts.
505+
:param begin_offset:
506+
The starting offset at which to read an active contract set. If ``None``, contracts
507+
are read from the beginning, using the Active Contract Set Service instead of the
508+
Transaction Service.
509+
:param end_offset:
510+
The ending offset. If ``None``, contracts are read until the end of the stream.
511+
In order to read indefinitely, use :meth:`stream_many` instead.
486512
"""
487-
return QueryStream(self, parse_query(*queries, server_side_filters=False), UNTIL_END)
513+
offset = LedgerOffsetRange(begin_offset, end_offset if end_offset is not None else END)
514+
return QueryStream(self, parse_query(*queries, server_side_filters=False), offset)
488515

489516
def stream(
490517
self,
@@ -643,8 +670,10 @@ async def items(self):
643670
filters_by_party = {party: filters for party in self.conn.config.access.read_as}
644671
tx_filter_pb = lapipb.TransactionFilter(filters_by_party=filters_by_party)
645672

646-
offset = None
647-
if self._offset_range.begin is None:
673+
offset = self._offset_range.begin
674+
if offset:
675+
log.debug("Skipped reading from the ACS because begin offset is %r", offset)
676+
else:
648677
# when starting from the beginning of the ledger, the Active Contract Set service
649678
# lets us catch up more quickly than having to parse every create/archive event
650679
# ourselves
@@ -658,31 +687,30 @@ async def items(self):
658687
else:
659688
warnings.warn(f"Received an unknown event: {event}", ProtocolWarning)
660689
yield event
661-
else:
662-
log.debug(
663-
"Skipped reading from the ACS because begin offset is %r",
664-
self._offset_range.begin,
665-
)
666690

667-
if self._offset_range != UNTIL_END:
668-
# now start returning events as they come off the transaction stream; note this
669-
# stream will never naturally close, so it's on the caller to call close() or to
670-
# otherwise exit our current context
671-
log.debug("Reading a transaction stream: %s", self._offset_range)
672-
async for event in self._tx_events(tx_filter_pb, offset):
673-
if isinstance(event, CreateEvent):
674-
await self._emit_create(event)
675-
elif isinstance(event, ArchiveEvent):
676-
await self._emit_archive(event)
677-
elif isinstance(event, Boundary):
678-
await self._emit_boundary(event)
679-
else:
680-
warnings.warn(f"Received an unknown event: {event}", ProtocolWarning)
681-
yield event
682-
else:
683-
log.debug(
684-
"Not reading from transaction stream because we were only asked for a snapshot."
685-
)
691+
# when reading from the Active Contract Set service, if we're supposed to stop
692+
# at "the end", then the Active Contract Set data is all that we'll return
693+
if self._offset_range.end == END:
694+
log.debug(
695+
"Not reading from transaction stream because we were only asked for a snapshot."
696+
)
697+
return
698+
699+
# now start returning events as they come off the transaction stream; note this
700+
# stream will never naturally close, so it's on the caller to call close() or to
701+
# otherwise exit our current context
702+
log.debug("Reading a transaction stream: %s", self._offset_range)
703+
async for event in self._tx_events(tx_filter_pb, offset, self._offset_range.end):
704+
log.debug("Received an event: %s", event)
705+
if isinstance(event, CreateEvent):
706+
await self._emit_create(event)
707+
elif isinstance(event, ArchiveEvent):
708+
await self._emit_archive(event)
709+
elif isinstance(event, Boundary):
710+
await self._emit_boundary(event)
711+
else:
712+
warnings.warn(f"Received an unknown event: {event}", ProtocolWarning)
713+
yield event
686714

687715
async def _acs_events(
688716
self, filter_pb: lapipb.TransactionFilter
@@ -696,23 +724,33 @@ async def _acs_events(
696724

697725
offset = None
698726
async for response in response_stream:
727+
LOG.debug(
728+
"ACS start (offset %r, %d event(s))",
729+
response.offset,
730+
len(response.active_contracts),
731+
)
699732
for event in response.active_contracts:
700733
c_evt = await self.conn.codec.decode_created_event(event)
701734
if self._is_match(c_evt):
702735
yield c_evt
703736
# for ActiveContractSetResponse messages, only the last offset is actually relevant
737+
LOG.debug("ACS end (offset %r)", response.offset)
704738
offset = response.offset
705739
yield Boundary(offset)
706740

707741
async def _tx_events(
708-
self, filter_pb: lapipb.TransactionFilter, begin_offset: Optional[str]
742+
self,
743+
filter_pb: lapipb.TransactionFilter,
744+
begin_offset: Optional[str],
745+
end_offset: "Union[None, str, End]",
709746
) -> AsyncIterable[Union[CreateEvent, ArchiveEvent, Boundary]]:
710747
stub = lapipb.TransactionServiceStub(self.conn.channel)
711748

712749
request = lapipb.GetTransactionsRequest(
713750
ledger_id=self.conn.config.access.ledger_id,
714751
filter=filter_pb,
715752
begin=self.conn.codec.encode_begin_offset(begin_offset),
753+
end=self.conn.codec.encode_end_offset(end_offset),
716754
)
717755

718756
self._response_stream = response_stream = stub.GetTransactions(request)
Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,88 @@
11
# Copyright (c) 2017-2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from asyncio import gather
4+
from asyncio import gather, wait_for
5+
import logging
56

67
import dazl
8+
from dazl.ledger import Boundary, CreateEvent
9+
from dazl.ledger.aio import Connection
10+
from dazl.ledger.grpc import Connection as GrpcConnection
11+
from dazl.prim import ContractData, Party
712
import pytest
813
from tests.unit import dars
914

15+
TEMPLATE = "Simple:OperatorNotification"
16+
17+
18+
def payload(operator: Party, text: str) -> ContractData:
19+
return {"operator": operator, "theObservers": [], "text": text}
20+
1021

1122
@pytest.mark.asyncio
12-
async def test_query_no_filter(sandbox):
23+
async def test_query_no_filter(sandbox) -> None:
1324
async with dazl.connect(url=sandbox, admin=True) as conn:
1425
party_info, _ = await gather(
1526
conn.allocate_party(), conn.upload_package(dars.Simple.read_bytes())
1627
)
1728

1829
async with dazl.connect(url=sandbox, act_as=party_info.party) as conn:
19-
for text in ["Red", "Red", "Green", "Blue", "Blue", "Blue"]:
20-
await conn.create(
21-
"Simple:OperatorNotification",
22-
{"operator": party_info.party, "theObservers": [], "text": text},
23-
)
30+
texts = ["Red", "Red", "Green", "Blue", "Blue", "Blue"]
31+
for text in texts:
32+
await conn.create(TEMPLATE, payload(party_info.party, text))
2433

25-
async with conn.query("Simple:OperatorNotification") as stream:
26-
events = []
27-
async for event in stream.creates():
28-
events.append(event)
34+
# it could be possible for the creates to succeed, but not yet be reflected on the
35+
# read side; we'll give it as much as 30 seconds before we fail this test
36+
boundary = await wait_for(wait_for_n_notifications(conn, len(texts)), timeout=30)
37+
logging.info("Created %s contracts, and our offset is now %s.", len(texts), boundary.offset)
2938

30-
assert len(events) == 6
39+
await conn.create(TEMPLATE, payload(party_info.party, "Yellow"))
3140

32-
async with conn.query("Simple:OperatorNotification", {"text": "Red"}) as stream:
41+
async with conn.query(TEMPLATE, {"text": "Red"}) as stream:
3342
events = []
3443
async for event in stream.creates():
3544
events.append(event)
3645

3746
assert len(events) == 2
3847

39-
async with conn.query("Simple:OperatorNotification", {"text": "Green"}) as stream:
48+
async with conn.query(TEMPLATE, {"text": "Green"}) as stream:
4049
events = []
4150
async for event in stream.creates():
4251
events.append(event)
4352

4453
assert len(events) == 1
4554

46-
async with conn.query("Simple:OperatorNotification", {"text": "Blue"}) as stream:
55+
async with conn.query(TEMPLATE, {"text": "Blue"}) as stream:
4756
events = []
4857
async for event in stream.creates():
4958
events.append(event)
5059

5160
assert len(events) == 3
61+
62+
logging.info("Making sure that our last event made it in...")
63+
final_offset = await wait_for(wait_for_n_notifications(conn, len(texts) + 1), timeout=30)
64+
65+
logging.info("Yep it's there, and the final offset is %s", final_offset.offset)
66+
67+
# offset-based queries only work over the gRPC API
68+
assert isinstance(conn, GrpcConnection)
69+
70+
async with conn.query(TEMPLATE, begin_offset=boundary.offset) as stream:
71+
async for event in stream.creates():
72+
# only one create is supposed to come after this part in the stream, and that is
73+
# "Yellow"
74+
assert event.payload["text"] == "Yellow"
75+
76+
77+
async def wait_for_n_notifications(conn: Connection, n: int) -> Boundary:
78+
async with conn.stream("Simple:OperatorNotification") as stream:
79+
event_count = 0
80+
async for obj in stream:
81+
if isinstance(obj, CreateEvent):
82+
event_count += 1
83+
elif isinstance(obj, Boundary):
84+
# return the first Boundary object we see after the expected number of CreateEvents
85+
if event_count >= n:
86+
return obj
87+
88+
raise Exception("unexpected end of stream")

0 commit comments

Comments
 (0)