Skip to content

Commit 95cb8a6

Browse files
committed
Implement caching of whether RepeatableRead works
1 parent 674b4eb commit 95cb8a6

File tree

6 files changed

+127
-20
lines changed

6 files changed

+127
-20
lines changed

gel/base_client.py

+13
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,7 @@ class BasePoolImpl(abc.ABC):
415415
"_connect_args",
416416
"_codecs_registry",
417417
"_query_cache",
418+
"_tx_needs_serializable_cache",
418419
"_connection_factory",
419420
"_queue",
420421
"_user_max_concurrency",
@@ -444,6 +445,11 @@ def __init__(
444445
self._connect_args = connect_args
445446
self._codecs_registry = protocol.CodecsRegistry()
446447
self._query_cache = protocol.LRUMapping(maxsize=QUERY_CACHE_SIZE)
448+
# Whether a transaction() call from a particular source location
449+
# needs to use Serializable. See transaction.BaseRetry for details.
450+
self._tx_needs_serializable_cache = (
451+
protocol.LRUMapping(maxsize=QUERY_CACHE_SIZE)
452+
)
447453

448454
if max_concurrency is not None and max_concurrency <= 0:
449455
raise ValueError(
@@ -639,6 +645,13 @@ class BaseClient(abstract.BaseReadOnlyExecutor, _options._OptionsMixin):
639645
__slots__ = ("_impl", "_options")
640646
_impl_class = NotImplemented
641647

648+
# Number of stack frames that the Retry objects used by
649+
# transaction() need to look up to find their caller from for
650+
# caching purposes. We define this in a variable on BaseClient
651+
# basically to make it possible for a user to hackily override it
652+
# if they always have some wrapper they need to skip past...
653+
_TRANSACTION_FRAME_OFFSET = 2
654+
642655
def __init__(
643656
self,
644657
*,

gel/protocol/lru.pxd

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ cdef class LRUMapping:
2525
object _dict_move_to_end
2626
object _dict_get
2727

28-
cdef get(self, key, default)
28+
cpdef get(self, key, default)

gel/protocol/lru.pyx

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ cdef class LRUMapping:
5353
self._dict_get = self._dict.get
5454
self._maxsize = maxsize
5555

56-
cdef get(self, key, default):
56+
cpdef get(self, key, default):
5757
o = self._dict_get(key, _LRU_MARKER)
5858
if o is _LRU_MARKER:
5959
return default

gel/transaction.py

+16
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import typing
2121

2222
import enum
23+
import sys
2324

2425
from . import abstract
2526
from . import errors
@@ -242,13 +243,25 @@ def __init__(self, owner):
242243
self._done = False
243244
self._next_backoff = 0
244245
self._options = owner._options
246+
self._key = None
245247

246248
prefer_rr = (
247249
self._options.transaction_options._isolation
248250
== options.IsolationLevel.PreferRepeatableRead
249251
)
252+
if prefer_rr:
253+
owner = self._owner
254+
frame = sys._getframe(owner._TRANSACTION_FRAME_OFFSET)
255+
self._key = key = (frame.f_code.co_filename, frame.f_lineno)
256+
257+
# If we have seen this cache key before and it needed
258+
# Serializable, then do serializable.
259+
if owner._impl._tx_needs_serializable_cache.get(key, False):
260+
prefer_rr = False
261+
250262
self._optimistic_rr = prefer_rr
251263

264+
252265
def _retry(self, exc):
253266
self._last_exception = exc
254267
rule = self._options.retry_options.get_rule_for_exception(exc)
@@ -262,6 +275,9 @@ def _retry_rr_failure(self, exc):
262275
# Retry a failure due to REPEATABLE READ not working
263276
if not self._optimistic_rr:
264277
return False
278+
if self._key:
279+
self._owner._impl._tx_needs_serializable_cache[self._key] = True
280+
265281
self._optimistic_rr = False
266282
# Decrement _iteration count, since this one doesn't really count.
267283
self._iteration -= 1

tests/test_async_tx.py

+69-3
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,18 @@ class TestAsyncTx(tb.AsyncQueryTestCase):
3232
CREATE TYPE test::TransactionTest EXTENDING std::Object {
3333
CREATE PROPERTY name -> std::str;
3434
};
35-
'''
3635
37-
TEARDOWN = '''
38-
DROP TYPE test::TransactionTest;
36+
37+
CREATE TYPE test::Tmp {
38+
CREATE REQUIRED PROPERTY tmp -> std::str;
39+
};
40+
CREATE TYPE test::TmpConflict {
41+
CREATE REQUIRED PROPERTY tmp -> std::str {
42+
CREATE CONSTRAINT exclusive;
43+
}
44+
};
45+
46+
CREATE TYPE test::TmpConflictChild extending test::TmpConflict;
3947
'''
4048

4149
async def test_async_transaction_regular_01(self):
@@ -104,3 +112,61 @@ async def test_async_transaction_exclusive(self):
104112
):
105113
await asyncio.wait_for(f1, timeout=5)
106114
await asyncio.wait_for(f2, timeout=5)
115+
116+
117+
async def _try_bogus_rr_tx(self, con, first_try):
118+
# A transaction that needs to be serializable
119+
async for tx in con.transaction():
120+
async with tx:
121+
res1 = await tx.query_single('''
122+
select {
123+
ins := (insert test::Tmp { tmp := "test1" }),
124+
level := sys::get_transaction_isolation(),
125+
}
126+
''')
127+
# If this is the second time we've tried to run this
128+
# transaction, then the cache should ensure we *only*
129+
# try Serializable.
130+
if not first_try:
131+
self.assertEqual(res1.level, 'Serializable')
132+
133+
res2 = await tx.query_single('''
134+
select {
135+
ins := (insert test::TmpConflict {
136+
tmp := <str>random()
137+
}),
138+
level := sys::get_transaction_isolation(),
139+
}
140+
''')
141+
142+
# N.B: res1 will be RepeatableRead on the first
143+
# iteration, maybe, but contingent on the second query
144+
# succeeding it will be Serializable!
145+
self.assertEqual(res1.level, 'Serializable')
146+
self.assertEqual(res2.level, 'Serializable')
147+
148+
async def test_async_transaction_prefer_rr(self):
149+
if (
150+
str(self.server_version.stage) != 'dev'
151+
and (self.server_version.major, self.server_version.minor) < (6, 5)
152+
):
153+
self.skipTest("DML in RepeatableRead not supported yet")
154+
con = self.client.with_transaction_options(
155+
edgedb.TransactionOptions(
156+
isolation=edgedb.IsolationLevel.PreferRepeatableRead
157+
)
158+
)
159+
# A transaction that needs to be serializable
160+
await self._try_bogus_rr_tx(con, first_try=True)
161+
await self._try_bogus_rr_tx(con, first_try=False)
162+
163+
# And one that doesn't
164+
async for tx in con.transaction():
165+
async with tx:
166+
res = await tx.query_single('''
167+
select {
168+
ins := (insert test::Tmp { tmp := "test" }),
169+
level := sys::get_transaction_isolation(),
170+
}
171+
''')
172+
self.assertEqual(str(res.level), 'RepeatableRead')

tests/test_sync_tx.py

+27-15
Original file line numberDiff line numberDiff line change
@@ -108,39 +108,51 @@ async def test_sync_transaction_kinds(self):
108108
with tx:
109109
pass
110110

111-
def test_sync_transaction_prefer_rr(self):
112-
if (
113-
str(self.server_version.stage) != 'dev'
114-
and (self.server_version.major, self.server_version.minor) < (6, 5)
115-
):
116-
self.skipTest("DML in RepeatableRead not supported yet")
117-
con = self.client.with_transaction_options(
118-
edgedb.TransactionOptions(
119-
isolation=edgedb.IsolationLevel.PreferRepeatableRead
120-
)
121-
)
111+
def _try_bogus_rr_tx(self, con, first_try):
122112
# A transaction that needs to be serializable
123113
for tx in con.transaction():
124114
with tx:
125115
res1 = tx.query_single('''
126116
select {
127-
ins := (insert test::TmpConflict { tmp := "test1" }),
117+
ins := (insert test::Tmp { tmp := "test1" }),
128118
level := sys::get_transaction_isolation(),
129119
}
130120
''')
121+
# If this is the second time we've tried to run this
122+
# transaction, then the cache should ensure we *only*
123+
# try Serializable.
124+
if not first_try:
125+
self.assertEqual(res1.level, 'Serializable')
131126

132127
res2 = tx.query_single('''
133128
select {
134-
ins := (insert test::TmpConflict { tmp := "test2" }),
129+
ins := (insert test::TmpConflict {
130+
tmp := <str>random()
131+
}),
135132
level := sys::get_transaction_isolation(),
136133
}
137134
''')
138135

139136
# N.B: res1 will be RepeatableRead on the first
140137
# iteration, maybe, but contingent on the second query
141138
# succeeding it will be Serializable!
142-
self.assertEqual(str(res1.level), 'Serializable')
143-
self.assertEqual(str(res2.level), 'Serializable')
139+
self.assertEqual(res1.level, 'Serializable')
140+
self.assertEqual(res2.level, 'Serializable')
141+
142+
def test_sync_transaction_prefer_rr(self):
143+
if (
144+
str(self.server_version.stage) != 'dev'
145+
and (self.server_version.major, self.server_version.minor) < (6, 5)
146+
):
147+
self.skipTest("DML in RepeatableRead not supported yet")
148+
con = self.client.with_transaction_options(
149+
edgedb.TransactionOptions(
150+
isolation=edgedb.IsolationLevel.PreferRepeatableRead
151+
)
152+
)
153+
# A transaction that needs to be serializable
154+
self._try_bogus_rr_tx(con, first_try=True)
155+
self._try_bogus_rr_tx(con, first_try=False)
144156

145157
# And one that doesn't
146158
for tx in con.transaction():

0 commit comments

Comments
 (0)