Skip to content

Commit a2846f9

Browse files
authored
Merge pull request #14 from AnswerDotAI/nested-tx-claim
Fix: Preserve claim_one status update when transaction aborts
2 parents 4847a1b + dcd84d7 commit a2846f9

2 files changed

Lines changed: 84 additions & 9 deletions

File tree

fastasyncpg/core.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -734,12 +734,15 @@ async def claim_one(self:Table, status_col='status', pending='pending', complete
734734
p = _ClaimCtx()
735735
async with self.db.transaction() as txn:
736736
tbl = txn.t[self.name]
737-
p.db = txn
738737
p.evt = await tbl.claim(where=f'"{status_col}"=$1', where_args=[pending], order_by=order_by)
739-
try: yield p
740-
except Exception as e: p.failed, p.exc, p.tb = True, e, traceback.format_exc()
738+
p.db = txn
739+
async with txn.conn.transaction():
740+
try: yield p # create save point in case tx is aborted
741+
except Exception as e: p.failed, p.exc, p.tb = True, e, traceback.format_exc()
741742
if p.evt is None: return
742743
pk = self.pks[0]
743744
new = failed if p.failed else (None if p.retry else completed)
744745
stmt = f'UPDATE {self} SET "{status_col}"=$1 WHERE "{pk}"=$2'
745-
if new: await txn.execute(stmt, new, get_field(p.evt, pk))
746+
if new:
747+
await txn.execute(stmt, new, get_field(p.evt, pk))
748+
p.evt = await tbl.selectone(f'"{pk}"=$1', [get_field(p.evt, pk)])

nbs/00_core.ipynb

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5091,15 +5091,18 @@
50915091
" p = _ClaimCtx()\n",
50925092
" async with self.db.transaction() as txn:\n",
50935093
" tbl = txn.t[self.name]\n",
5094-
" p.db = txn\n",
50955094
" p.evt = await tbl.claim(where=f'\"{status_col}\"=$1', where_args=[pending], order_by=order_by)\n",
5096-
" try: yield p\n",
5097-
" except Exception as e: p.failed, p.exc, p.tb = True, e, traceback.format_exc()\n",
5095+
" p.db = txn\n",
5096+
" async with txn.conn.transaction():\n",
5097+
" try: yield p # create save point in case tx is aborted\n",
5098+
" except Exception as e: p.failed, p.exc, p.tb = True, e, traceback.format_exc()\n",
50985099
" if p.evt is None: return\n",
50995100
" pk = self.pks[0]\n",
51005101
" new = failed if p.failed else (None if p.retry else completed)\n",
51015102
" stmt = f'UPDATE {self} SET \"{status_col}\"=$1 WHERE \"{pk}\"=$2'\n",
5102-
" if new: await txn.execute(stmt, new, get_field(p.evt, pk))"
5103+
" if new: \n",
5104+
" await txn.execute(stmt, new, get_field(p.evt, pk))\n",
5105+
" p.evt = await tbl.selectone(f'\"{pk}\"=$1', [get_field(p.evt, pk)])"
51035106
]
51045107
},
51055108
{
@@ -5293,10 +5296,79 @@
52935296
"await jobs(\"payload=$1\", [pl])"
52945297
]
52955298
},
5299+
{
5300+
"cell_type": "markdown",
5301+
"id": "a866bdaa",
5302+
"metadata": {},
5303+
"source": [
5304+
"Test that when the transaction is aborted the claimed event row is still set to failed."
5305+
]
5306+
},
5307+
{
5308+
"cell_type": "code",
5309+
"execution_count": null,
5310+
"id": "e1e45f08",
5311+
"metadata": {},
5312+
"outputs": [],
5313+
"source": [
5314+
"await jobs.delete_where()\n",
5315+
"await jobs.inserts([Job(payload='repro', status='pending')])\n",
5316+
"\n",
5317+
"async with jobs.claim_one(order_by='id') as p:\n",
5318+
" await p.db.execute('select definitely_not_a_real_function()')\n",
5319+
" await p.db.t.job.update(p.evt, status='processing')\n",
5320+
"\n",
5321+
"test_eq(p.evt.status, 'failed')\n",
5322+
"test_eq(p.failed, True)"
5323+
]
5324+
},
5325+
{
5326+
"cell_type": "markdown",
5327+
"id": "81bb8295",
5328+
"metadata": {},
5329+
"source": [
5330+
"Test that if the `txn.conn.transaction` fails, we allow the exception to surface rather than having the generator swallow it and return a `RuntimeError(\"generator didn't yield\")`."
5331+
]
5332+
},
5333+
{
5334+
"cell_type": "code",
5335+
"execution_count": null,
5336+
"id": "77029d63",
5337+
"metadata": {},
5338+
"outputs": [],
5339+
"source": [
5340+
"await jobs.delete_where()\n",
5341+
"await jobs.inserts([Job(payload='x', status='pending')])"
5342+
]
5343+
},
5344+
{
5345+
"cell_type": "code",
5346+
"execution_count": null,
5347+
"id": "ccacb161",
5348+
"metadata": {},
5349+
"outputs": [],
5350+
"source": [
5351+
"async with db.acquire() as c: ConnType = type(c.conn)\n",
5352+
"orig = ConnType.transaction\n",
5353+
"calls = []\n",
5354+
"from unittest.mock import patch as mock_patch\n",
5355+
"\n",
5356+
"def boom(self, *a, **k):\n",
5357+
" calls.append(1)\n",
5358+
" if len(calls) >= 2: raise ConnectionError(\"savepoint boom\")\n",
5359+
" return orig(self, *a, **k)\n",
5360+
"\n",
5361+
"with mock_patch.object(ConnType, 'transaction', boom):\n",
5362+
" try:\n",
5363+
" async with jobs.claim_one(order_by='id') as p: pass\n",
5364+
" assert False, \"expected ConnectionError\"\n",
5365+
" except ConnectionError: pass"
5366+
]
5367+
},
52965368
{
52975369
"cell_type": "code",
52985370
"execution_count": null,
5299-
"id": "c7e5cb98",
5371+
"id": "089f1a43",
53005372
"metadata": {},
53015373
"outputs": [],
53025374
"source": [

0 commit comments

Comments
 (0)