Skip to content

Commit b731683

Browse files
fix(preprod): Prevent duplicate snapshot PR comments under fanout (#117467)
Re-derive the existing snapshot PR comment id under a `select_for_update` lock that spans the GitHub call, so concurrent post tasks for the same PR serialize and update the shared comment in place instead of each creating its own.

Previously `post_snapshot_pr_comment_task` made the GitHub create/update call with no lock, deciding create-vs-update from a comment id captured earlier by `create_preprod_snapshot_pr_comment_task`. A snapshot PR comment is a single PR-wide table with one row per artifact, so when several artifacts on a commit finished near-simultaneously, each post task saw no existing comment and created its own top-level comment. The result was duplicate comments on the PR, one of which became a permanently orphaned, never-updated copy once the surviving comment id was persisted.

The fix moves the existing-comment-id resolution and the GitHub call inside one transaction holding `select_for_update` locks on every `CommitComparison` row for the PR. The first poster creates and persists the comment id; later posters block on the lock, observe the persisted id, and update. This mirrors the existing build-distribution path (`create_preprod_pr_comment_task`), which already posts inside the lock. The shared lock-and-resolve logic is extracted into `lock_pr_comparisons_for_update`.

Holding the GitHub call inside the lock is deliberate: releasing the lock before the call would reintroduce the race (two posters both reading "no comment"). Lock hold time is bounded by the integration client timeout (30s), which matches the task's processing deadline.

Areas worth careful review:

- The lock now spans a network round-trip to GitHub. This is required for correctness and consistent with the build-distribution sibling, but reviewers should confirm they're comfortable with the bounded lock-held I/O.
- `existing_comment_id` is removed from `post_snapshot_pr_comment_task`'s signature; in-flight queued messages that still pass it are absorbed by `**kwargs` during deploy.
1 parent fcc6311 commit b731683

3 files changed

Lines changed: 190 additions & 226 deletions

File tree

src/sentry/preprod/vcs/pr_comments/snapshot_tasks.py

Lines changed: 57 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
format_solo_snapshot_pr_comment,
2020
format_waiting_for_base_snapshot_pr_comment,
2121
)
22-
from sentry.preprod.vcs.pr_comments.tasks import find_existing_comment_id, save_pr_comment_result
22+
from sentry.preprod.vcs.pr_comments.tasks import (
23+
lock_pr_comparisons_for_update,
24+
save_pr_comment_result,
25+
)
2326
from sentry.shared_integrations.exceptions import ApiError
2427
from sentry.silo.base import SiloMode
2528
from sentry.tasks.base import instrumented_task
@@ -113,23 +116,14 @@ def create_preprod_snapshot_pr_comment_task(
113116
db_alias = router.db_for_write(CommitComparison)
114117

115118
with transaction.atomic(db_alias):
116-
all_for_pr = list(
117-
CommitComparison.objects.select_for_update()
118-
.filter(
119-
organization_id=commit_comparison.organization_id,
120-
head_repo_name=commit_comparison.head_repo_name,
121-
pr_number=commit_comparison.pr_number,
122-
)
123-
.order_by("id")
119+
cc, existing_comment_id = lock_pr_comparisons_for_update(
120+
organization_id=commit_comparison.organization_id,
121+
head_repo_name=commit_comparison.head_repo_name,
122+
pr_number=commit_comparison.pr_number,
123+
target_id=commit_comparison.id,
124+
comment_type="snapshots",
124125
)
125126

126-
try:
127-
cc = next(c for c in all_for_pr if c.id == commit_comparison.id)
128-
except StopIteration:
129-
raise CommitComparison.DoesNotExist(
130-
f"CommitComparison {commit_comparison.id} was deleted before lock acquisition"
131-
)
132-
133127
all_artifacts = list(artifact.get_sibling_artifacts_for_commit())
134128

135129
artifact_ids = [a.id for a in all_artifacts]
@@ -165,7 +159,6 @@ def create_preprod_snapshot_pr_comment_task(
165159

166160
is_solo = not base_artifact_map
167161

168-
existing_comment_id = find_existing_comment_id(all_for_pr, "snapshots")
169162
cc_id = cc.id
170163

171164
if is_solo:
@@ -241,7 +234,6 @@ def create_preprod_snapshot_pr_comment_task(
241234
commit_comparison_id=cc_id,
242235
artifact_id=artifact.id,
243236
comment_body=comment_body,
244-
existing_comment_id=existing_comment_id,
245237
)
246238

247239

@@ -261,7 +253,6 @@ def post_snapshot_pr_comment_task(
261253
commit_comparison_id: int,
262254
artifact_id: int | None = None,
263255
comment_body: str,
264-
existing_comment_id: str | None,
265256
**kwargs: Any,
266257
) -> None:
267258
try:
@@ -283,38 +274,53 @@ def post_snapshot_pr_comment_task(
283274

284275
comment_id: str | None = None
285276
api_error: Exception | None = None
277+
db_alias = router.db_for_write(CommitComparison)
286278

287279
try:
288-
if existing_comment_id:
289-
client.update_comment(
290-
repo=repo_name,
291-
issue_id=str(pr_number),
292-
comment_id=str(existing_comment_id),
293-
data={"body": comment_body},
294-
)
295-
comment_id = existing_comment_id
296-
else:
297-
resp = client.create_comment(
298-
repo=repo_name,
299-
issue_id=str(pr_number),
300-
data={"body": comment_body},
280+
# The comment_id is re-derived under the lock instead of trusting the
281+
# value passed from the create task: when several artifacts on a commit
282+
# post at once, each create task reads no existing comment, so the
283+
# first post here would otherwise create a duplicate comment instead of
284+
# updating the shared one. The GitHub call is held inside the lock (as
285+
# in create_preprod_pr_comment_task) so concurrent posters serialize on
286+
# the decision; lock hold is bounded by the client timeout, which
287+
# matches this task's processing deadline.
288+
with transaction.atomic(db_alias):
289+
cc, comment_id = lock_pr_comparisons_for_update(
290+
organization_id=organization.id,
291+
head_repo_name=repo_name,
292+
pr_number=pr_number,
293+
target_id=commit_comparison_id,
294+
comment_type="snapshots",
301295
)
302-
comment_id = str(resp["id"])
303-
except Exception as e:
304-
extra: dict[str, Any] = {
305-
"commit_comparison_id": commit_comparison_id,
306-
"organization_id": organization_id,
307-
"error_type": type(e).__name__,
308-
}
309-
if isinstance(e, ApiError):
310-
extra["status_code"] = e.code
311-
logger.exception("preprod.snapshot_pr_comments.post.failed", extra=extra)
312-
api_error = e
296+
is_update = comment_id is not None
297+
298+
try:
299+
if comment_id:
300+
client.update_comment(
301+
repo=repo_name,
302+
issue_id=str(pr_number),
303+
comment_id=str(comment_id),
304+
data={"body": comment_body},
305+
)
306+
else:
307+
resp = client.create_comment(
308+
repo=repo_name,
309+
issue_id=str(pr_number),
310+
data={"body": comment_body},
311+
)
312+
comment_id = str(resp["id"])
313+
except Exception as e:
314+
extra: dict[str, Any] = {
315+
"commit_comparison_id": commit_comparison_id,
316+
"organization_id": organization_id,
317+
"error_type": type(e).__name__,
318+
}
319+
if isinstance(e, ApiError):
320+
extra["status_code"] = e.code
321+
logger.exception("preprod.snapshot_pr_comments.post.failed", extra=extra)
322+
api_error = e
313323

314-
db_alias = router.db_for_write(CommitComparison)
315-
try:
316-
with transaction.atomic(db_alias):
317-
cc = CommitComparison.objects.select_for_update().get(id=commit_comparison_id)
318324
if api_error is not None:
319325
save_pr_comment_result(cc, "snapshots", success=False, error=api_error)
320326
else:
@@ -328,7 +334,7 @@ def post_snapshot_pr_comment_task(
328334
"comment_id": comment_id,
329335
"repo_name": repo_name,
330336
"pr_number": pr_number,
331-
"is_update": existing_comment_id is not None,
337+
"is_update": is_update,
332338
},
333339
)
334340
except CommitComparison.DoesNotExist:
@@ -338,6 +344,9 @@ def post_snapshot_pr_comment_task(
338344
)
339345
return
340346

347+
# Re-raised outside the transaction so the failure record is committed
348+
# before the retry fires. Terminal 4xx (except 429) are swallowed; 429,
349+
# 5xx, and network errors re-raise to trigger the task's retry policy.
341350
if api_error is not None:
342351
if (
343352
isinstance(api_error, ApiError)

src/sentry/preprod/vcs/pr_comments/tasks.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,39 @@ def find_existing_comment_id(
194194
return None
195195

196196

197+
def lock_pr_comparisons_for_update(
    *,
    organization_id: int,
    head_repo_name: str,
    pr_number: int,
    target_id: int,
    comment_type: str,
) -> tuple[CommitComparison, str | None]:
    """Lock all CommitComparison rows of a PR and resolve its comment id.

    Issues a ``SELECT ... FOR UPDATE`` over every ``CommitComparison`` that
    matches the given organization, head repo name and PR number, ordered by
    ``id`` so that every caller acquires the row locks in the same order.
    Because the query uses ``select_for_update``, it must be evaluated inside
    an open transaction; callers are expected to wrap the call in
    ``transaction.atomic``.

    Returns:
        A ``(row, comment_id)`` tuple: the locked row whose ``id`` equals
        ``target_id``, and whatever ``find_existing_comment_id`` reports for
        ``comment_type`` across the locked rows (``None`` when it finds none).

    Raises:
        CommitComparison.DoesNotExist: if no locked row has ``target_id``.
    """
    pr_rows_query = (
        CommitComparison.objects.select_for_update()
        .filter(
            organization_id=organization_id,
            head_repo_name=head_repo_name,
            pr_number=pr_number,
        )
        .order_by("id")
    )
    # Materialize the queryset here: evaluating it is what takes the locks.
    locked_rows = list(pr_rows_query)

    for row in locked_rows:
        if row.id == target_id:
            # The comment id is derived from the full locked set, not just
            # the target row.
            return row, find_existing_comment_id(locked_rows, comment_type)

    raise CommitComparison.DoesNotExist(
        f"CommitComparison {target_id} was deleted before lock acquisition"
    )
228+
229+
197230
def save_pr_comment_result(
198231
commit_comparison: CommitComparison,
199232
comment_type: str,

0 commit comments

Comments
 (0)