Skip to content

Commit 46dae11

Browse files
authored
Implement PreferRepeatableRead, apply TransactionOptions to config state (#609)
Currently it is enabled by setting default_transaction_isolation or isolation in TransactionOptions to the enum value PreferRepeatableRead. If that is set, the parse and execute paths will strip it from the state that is sent, and then potentially inject RepeatableRead as the value when doing execute, if it is safe.
1 parent 553e3ce commit 46dae11

8 files changed

+271
-8
lines changed

gel/abstract.py

+22
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class QueryContext(typing.NamedTuple):
6969
state: typing.Optional[options.State]
7070
warning_handler: options.WarningHandler
7171
annotations: typing.Dict[str, str]
72+
transaction_options: typing.Optional[options.TransactionOptions]
7273

7374
def lower(
7475
self, *, allow_capabilities: enums.Capability
@@ -86,6 +87,7 @@ def lower(
8687
allow_capabilities=allow_capabilities,
8788
state=self.state.as_dict() if self.state else None,
8889
annotations=self.annotations,
90+
transaction_options=self.transaction_options,
8991
)
9092

9193

@@ -96,6 +98,7 @@ class ExecuteContext(typing.NamedTuple):
9698
state: typing.Optional[options.State]
9799
warning_handler: options.WarningHandler
98100
annotations: typing.Dict[str, str]
101+
transaction_options: typing.Optional[options.TransactionOptions]
99102

100103
def lower(
101104
self, *, allow_capabilities: enums.Capability
@@ -111,6 +114,7 @@ def lower(
111114
allow_capabilities=allow_capabilities,
112115
state=self.state.as_dict() if self.state else None,
113116
annotations=self.annotations,
117+
transaction_options=self.transaction_options,
114118
)
115119

116120

@@ -220,6 +224,7 @@ def query(self, query: str, *args, **kwargs) -> list:
220224
query_options=_query_opts,
221225
retry_options=self._get_retry_options(),
222226
state=self._get_state(),
227+
transaction_options=self._get_active_tx_options(),
223228
warning_handler=self._get_warning_handler(),
224229
annotations=self._get_annotations(),
225230
))
@@ -233,6 +238,7 @@ def query_single(
233238
query_options=_query_single_opts,
234239
retry_options=self._get_retry_options(),
235240
state=self._get_state(),
241+
transaction_options=self._get_active_tx_options(),
236242
warning_handler=self._get_warning_handler(),
237243
annotations=self._get_annotations(),
238244
))
@@ -244,6 +250,7 @@ def query_required_single(self, query: str, *args, **kwargs) -> typing.Any:
244250
query_options=_query_required_single_opts,
245251
retry_options=self._get_retry_options(),
246252
state=self._get_state(),
253+
transaction_options=self._get_active_tx_options(),
247254
warning_handler=self._get_warning_handler(),
248255
annotations=self._get_annotations(),
249256
))
@@ -255,6 +262,7 @@ def query_json(self, query: str, *args, **kwargs) -> str:
255262
query_options=_query_json_opts,
256263
retry_options=self._get_retry_options(),
257264
state=self._get_state(),
265+
transaction_options=self._get_active_tx_options(),
258266
warning_handler=self._get_warning_handler(),
259267
annotations=self._get_annotations(),
260268
))
@@ -266,6 +274,7 @@ def query_single_json(self, query: str, *args, **kwargs) -> str:
266274
query_options=_query_single_json_opts,
267275
retry_options=self._get_retry_options(),
268276
state=self._get_state(),
277+
transaction_options=self._get_active_tx_options(),
269278
warning_handler=self._get_warning_handler(),
270279
annotations=self._get_annotations(),
271280
))
@@ -277,6 +286,7 @@ def query_required_single_json(self, query: str, *args, **kwargs) -> str:
277286
query_options=_query_required_single_json_opts,
278287
retry_options=self._get_retry_options(),
279288
state=self._get_state(),
289+
transaction_options=self._get_active_tx_options(),
280290
warning_handler=self._get_warning_handler(),
281291
annotations=self._get_annotations(),
282292
))
@@ -293,6 +303,7 @@ def query_sql(self, query: str, *args, **kwargs) -> list[datatypes.Record]:
293303
query_options=_query_opts,
294304
retry_options=self._get_retry_options(),
295305
state=self._get_state(),
306+
transaction_options=self._get_active_tx_options(),
296307
warning_handler=self._get_warning_handler(),
297308
annotations=self._get_annotations(),
298309
))
@@ -307,6 +318,7 @@ def execute(self, commands: str, *args, **kwargs) -> None:
307318
cache=self._get_query_cache(),
308319
retry_options=self._get_retry_options(),
309320
state=self._get_state(),
321+
transaction_options=self._get_active_tx_options(),
310322
warning_handler=self._get_warning_handler(),
311323
annotations=self._get_annotations(),
312324
))
@@ -322,6 +334,7 @@ def execute_sql(self, commands: str, *args, **kwargs) -> None:
322334
cache=self._get_query_cache(),
323335
retry_options=self._get_retry_options(),
324336
state=self._get_state(),
337+
transaction_options=self._get_active_tx_options(),
325338
warning_handler=self._get_warning_handler(),
326339
annotations=self._get_annotations(),
327340
))
@@ -349,6 +362,7 @@ async def query(self, query: str, *args, **kwargs) -> list:
349362
query_options=_query_opts,
350363
retry_options=self._get_retry_options(),
351364
state=self._get_state(),
365+
transaction_options=self._get_active_tx_options(),
352366
warning_handler=self._get_warning_handler(),
353367
annotations=self._get_annotations(),
354368
))
@@ -360,6 +374,7 @@ async def query_single(self, query: str, *args, **kwargs) -> typing.Any:
360374
query_options=_query_single_opts,
361375
retry_options=self._get_retry_options(),
362376
state=self._get_state(),
377+
transaction_options=self._get_active_tx_options(),
363378
warning_handler=self._get_warning_handler(),
364379
annotations=self._get_annotations(),
365380
))
@@ -376,6 +391,7 @@ async def query_required_single(
376391
query_options=_query_required_single_opts,
377392
retry_options=self._get_retry_options(),
378393
state=self._get_state(),
394+
transaction_options=self._get_active_tx_options(),
379395
warning_handler=self._get_warning_handler(),
380396
annotations=self._get_annotations(),
381397
))
@@ -387,6 +403,7 @@ async def query_json(self, query: str, *args, **kwargs) -> str:
387403
query_options=_query_json_opts,
388404
retry_options=self._get_retry_options(),
389405
state=self._get_state(),
406+
transaction_options=self._get_active_tx_options(),
390407
warning_handler=self._get_warning_handler(),
391408
annotations=self._get_annotations(),
392409
))
@@ -398,6 +415,7 @@ async def query_single_json(self, query: str, *args, **kwargs) -> str:
398415
query_options=_query_single_json_opts,
399416
retry_options=self._get_retry_options(),
400417
state=self._get_state(),
418+
transaction_options=self._get_active_tx_options(),
401419
warning_handler=self._get_warning_handler(),
402420
annotations=self._get_annotations(),
403421
))
@@ -414,6 +432,7 @@ async def query_required_single_json(
414432
query_options=_query_required_single_json_opts,
415433
retry_options=self._get_retry_options(),
416434
state=self._get_state(),
435+
transaction_options=self._get_active_tx_options(),
417436
warning_handler=self._get_warning_handler(),
418437
annotations=self._get_annotations(),
419438
))
@@ -430,6 +449,7 @@ async def query_sql(self, query: str, *args, **kwargs) -> typing.Any:
430449
query_options=_query_opts,
431450
retry_options=self._get_retry_options(),
432451
state=self._get_state(),
452+
transaction_options=self._get_active_tx_options(),
433453
warning_handler=self._get_warning_handler(),
434454
annotations=self._get_annotations(),
435455
))
@@ -444,6 +464,7 @@ async def execute(self, commands: str, *args, **kwargs) -> None:
444464
cache=self._get_query_cache(),
445465
retry_options=self._get_retry_options(),
446466
state=self._get_state(),
467+
transaction_options=self._get_active_tx_options(),
447468
warning_handler=self._get_warning_handler(),
448469
annotations=self._get_annotations(),
449470
))
@@ -459,6 +480,7 @@ async def execute_sql(self, commands: str, *args, **kwargs) -> None:
459480
cache=self._get_query_cache(),
460481
retry_options=self._get_retry_options(),
461482
state=self._get_state(),
483+
transaction_options=self._get_active_tx_options(),
462484
warning_handler=self._get_warning_handler(),
463485
annotations=self._get_annotations(),
464486
))

gel/base_client.py

+10
Original file line numberDiff line numberDiff line change
@@ -701,8 +701,18 @@ def _get_query_cache(self) -> abstract.QueryCache:
701701
)
702702

703703
def _get_retry_options(self) -> typing.Optional[_options.RetryOptions]:
704+
# This is overloaded in transaction.py to return None, to prevent
705+
# retrying *inside* a transaction.
704706
return self._options.retry_options
705707

708+
def _get_active_tx_options(self) -> typing.Optional[
709+
_options.TransactionOptions
710+
]:
711+
# This is overloaded in transaction.py to return None, since
712+
# the tx options are applied at the *start* of transactions,
713+
# not inside them.
714+
return self._options.transaction_options
715+
706716
def _get_state(self) -> _options.State:
707717
return self._options.state
708718

gel/options.py

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class IsolationLevel:
5454
"""Isolation level for transaction"""
5555
Serializable = "Serializable"
5656
RepeatableRead = "RepeatableRead"
57+
PreferRepeatableRead = "PreferRepeatableRead"
5758

5859
@staticmethod
5960
def _to_start_tx_str(v):

gel/protocol/protocol.pxd

+3-1
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,15 @@ cdef class ExecuteContext:
9090
uint64_t allow_capabilities
9191
object state
9292
object annotations
93+
object tx_options
9394

9495
# Contextual variables
9596
readonly bytes cardinality
9697
readonly BaseCodec in_dc
9798
readonly BaseCodec out_dc
9899
readonly uint64_t capabilities
99100
readonly tuple warnings
101+
readonly tuple unsafe_isolation_dangers
100102

101103
cdef inline bint has_na_cardinality(self)
102104
cdef bint load_from_cache(self)
@@ -168,7 +170,7 @@ cdef class SansIOProtocol:
168170

169171
cdef ensure_connected(self)
170172

171-
cdef WriteBuffer encode_parse_params(self, ExecuteContext ctx)
173+
cdef WriteBuffer encode_parse_params(self, ExecuteContext ctx, dict state)
172174

173175

174176
include "protocol_v0.pxd"

0 commit comments

Comments
 (0)