Skip to content

Commit 35e472a

Browse files
committed
fix: pin bulk submission delete to primary DB
delete_xform_submissions and its async wrapper lack @use_master, so the submission_count correction reads from a replica that may have replication lag. The stale count matches num_of_submissions, skipping the correction entirely.
1 parent 3f398c3 commit 35e472a

File tree

4 files changed

+36
-0
lines changed

4 files changed

+36
-0
lines changed

onadata/apps/api/tasks.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from django.utils.datastructures import MultiValueDict
1818

1919
from celery.result import AsyncResult
20+
from multidb.pinning import use_master
2021

2122
from onadata.apps.api import tools
2223
from onadata.apps.logger.models import Instance, Project, ProjectInvitation, XForm
@@ -213,6 +214,7 @@ def share_project_async(project_id, username, role, remove=False):
213214

214215

215216
@app.task(base=AutoRetryTask)
217+
@use_master
216218
def delete_xform_submissions_async(
217219
xform_id: int,
218220
deleted_by_id: int,

onadata/apps/api/tests/test_tasks.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,3 +231,13 @@ def test_user_id_invalid(self, mock_logger, mock_delete):
231231
delete_xform_submissions_async.delay(self.xform.pk, sys.maxsize)
232232
self.assertFalse(mock_delete.called)
233233
mock_logger.assert_called_once()
234+
235+
def test_pinned_to_primary_db(self, mock_delete):
236+
"""Thread is pinned to primary DB during task execution"""
237+
from multidb.pinning import this_thread_is_pinned
238+
239+
mock_delete.side_effect = lambda *a, **kw: self.assertTrue(
240+
this_thread_is_pinned(),
241+
"Thread was not pinned to primary DB when calling delete_xform_submissions",
242+
)
243+
delete_xform_submissions_async.delay(self.xform.pk, self.user.pk)

onadata/libs/tests/utils/test_logger_tools.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,29 @@ def test_decrypted_submission_count_updated(self):
11861186
self.xform.refresh_from_db()
11871187
self.assertEqual(self.xform.num_of_decrypted_submissions, 3)
11881188

1189+
def test_pinned_to_primary_db(self):
1190+
"""Thread is pinned to primary DB during deletion"""
1191+
from multidb.pinning import this_thread_is_pinned
1192+
1193+
pinned_states = []
1194+
original_submission_count = type(self.xform).submission_count
1195+
1196+
def capture_pinned(xform_self, force_update=False):
1197+
if force_update:
1198+
pinned_states.append(this_thread_is_pinned())
1199+
return original_submission_count(xform_self, force_update=force_update)
1200+
1201+
with patch.object(
1202+
type(self.xform), "submission_count", capture_pinned
1203+
):
1204+
delete_xform_submissions(self.xform, self.user)
1205+
1206+
self.assertTrue(pinned_states, "submission_count was not called with force_update")
1207+
self.assertTrue(
1208+
all(pinned_states),
1209+
"Thread was not pinned to primary DB during submission_count",
1210+
)
1211+
11891212

11901213
class ResponseWithMimetypeAndNameTestCase(TestBase):
11911214
"""Tests for method `response_with_mimetype_and_name`"""

onadata/libs/utils/logger_tools.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,7 @@ def publish_xform(self):
13131313
return publish_xml_form(self.xml_file, self.user, self.project)
13141314

13151315

1316+
@use_master
13161317
def delete_xform_submissions(
13171318
xform: XForm,
13181319
deleted_by: User,

0 commit comments

Comments
 (0)