Skip to content

Commit 896e6b7

Browse files
committed
Fix database concurrency conflicts with retry mechanism and enhanced error handling
- Add retry_on_database_conflict decorator with exponential backoff for database operations
- Implement transaction-safe database_transaction context manager with proper rollback
- Apply @retry_on_database_conflict to all WorkflowTracker event sourcing operations
- Add retry protection to view update operations in JobAccounting, WorkflowProgress, and JobProgress
- Enhance EngineManager with robust commit/rollback error handling and safe_commit method
- Add support for EventSourcing-specific exceptions (IntegrityError, OperationalError)
- Implement defensive checks for unknown workflows/tasks in event handlers
- Lower the log level for retryable conflicts from ERROR to WARNING
- Add comprehensive test coverage for retry mechanisms and concurrent access scenarios

This resolves production issues where database unique constraint violations and deadlocks were causing thread failures and incomplete workflow tracking in high-concurrency scenarios such as batch processing workflows.
1 parent 2eec75b commit 896e6b7

4 files changed

Lines changed: 665 additions & 31 deletions

File tree

biomero/database.py

Lines changed: 168 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,144 @@
2424
)
2525
from sqlalchemy.orm import sessionmaker, declarative_base, scoped_session
2626
from sqlalchemy.dialects.postgresql import UUID as PGUUID
27+
from sqlalchemy.exc import OperationalError, IntegrityError
2728
import os
29+
import time
30+
import random
31+
from contextlib import contextmanager
32+
from functools import wraps
33+
34+
# Import EventSourcing exceptions
35+
try:
36+
from eventsourcing.persistence import IntegrityError as EventSourcingIntegrityError
37+
from eventsourcing.persistence import OperationalError as EventSourcingOperationalError
38+
except ImportError:
39+
# Fallback for older versions
40+
EventSourcingIntegrityError = None
41+
EventSourcingOperationalError = None
2842

2943
logger = logging.getLogger(__name__)
3044

45+
# --------------------- CONCURRENCY HELPERS ---------------------------- #
46+
47+
# Lower-case substrings of error messages that indicate a transient
# concurrency conflict worth retrying.
_RETRYABLE_CONFLICT_MARKERS = (
    'unique constraint',
    'duplicate key',
    'concurrent update',
    'transaction is aborted',
    'deadlock detected',
)

# Lower-case substrings that indicate a permanent data validation error.
# These win over the retryable markers when both appear in a message.
_NON_RETRYABLE_MARKERS = (
    'not null constraint',
    'null value in column',
    'cannot be null',
)


def _conflict_exception_types():
    """Return the exception classes that may carry a database conflict.

    The EventSourcing variants are included only when their import succeeded
    (they are set to ``None`` otherwise).
    """
    candidates = (
        IntegrityError,
        OperationalError,
        EventSourcingIntegrityError,
        EventSourcingOperationalError,
    )
    return tuple(exc_type for exc_type in candidates if exc_type is not None)


def _is_retryable_conflict(exc):
    """Return True when the message of *exc* (or its direct cause) matches a
    retryable conflict marker and no non-retryable marker."""
    error_msg = str(exc).lower()

    # Wrapping exceptions may hide the driver message in ``__cause__``.
    cause = getattr(exc, '__cause__', None)
    if cause:
        error_msg += " " + str(cause).lower()

    if any(marker in error_msg for marker in _NON_RETRYABLE_MARKERS):
        logger.debug(f"Non-retryable data validation error: {exc}")
        return False

    return any(marker in error_msg for marker in _RETRYABLE_CONFLICT_MARKERS)


def retry_on_database_conflict(max_retries=10, base_delay=0.1, max_delay=5.0):
    """Decorator to retry database operations on concurrency conflicts.

    The wrapped callable is invoked up to ``max_retries + 1`` times. A retry
    happens only when it raises one of the database exception types and the
    error message matches a known conflict marker (unique/duplicate key,
    concurrent update, aborted transaction, deadlock). Any other exception,
    a data validation error, or the final failed attempt is re-raised
    unchanged.

    Args:
        max_retries: Maximum number of retry attempts. Negative values are
            treated as 0 (the callable still runs once).
        base_delay: Base delay between retries in seconds; doubled on every
            attempt, plus up to 0.1s of random jitter.
        max_delay: Maximum delay between retries in seconds.

    Returns:
        A decorator that wraps the target callable with the retry loop.

    Raises:
        Whatever the wrapped callable raises once no further retry applies.
    """
    # Guard against a negative value: range() would be empty and the
    # callable would never run.
    total_attempts = max(max_retries, 0) + 1

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(total_attempts):
                try:
                    return func(*args, **kwargs)
                # The except expression is only evaluated when something was
                # raised, so the happy path never builds the tuple.
                except _conflict_exception_types() as exc:
                    is_last_attempt = attempt == total_attempts - 1
                    if is_last_attempt or not _is_retryable_conflict(exc):
                        # Report the number of attempts actually made.
                        logger.error(
                            f"Database operation failed after {attempt + 1} "
                            f"attempts: {exc}"
                        )
                        raise

                    # Exponential backoff with jitter, capped at max_delay.
                    delay = min(
                        base_delay * (2 ** attempt) + random.uniform(0, 0.1),
                        max_delay,
                    )
                    logger.warning(
                        f"Database conflict on attempt {attempt + 1}/{total_attempts}: {exc}. "
                        f"Retrying in {delay:.2f}s..."
                    )

                    # Roll back BEFORE sleeping so the failed transaction is
                    # not kept open for the whole back-off period.
                    # Best-effort: a failing rollback must not mask the retry.
                    try:
                        EngineManager.rollback()
                    except Exception as rollback_error:
                        logger.debug(
                            f"Rollback before retry failed (ignored): {rollback_error}"
                        )

                    time.sleep(delay)

        return wrapper
    return decorator
135+
136+
137+
@contextmanager
def database_transaction(isolation_level=None):
    """Wrap a block of work in commit-on-success / rollback-on-error handling.

    The session lifecycle is left to the scoped session; this helper only
    marks the transaction boundary.

    Args:
        isolation_level: Optional SQLAlchemy isolation level (e.g.,
            'READ_COMMITTED', 'SERIALIZABLE') applied to the session's
            connection before the block runs.

    Raises:
        Exception: Any exception from the block or from the commit is
            re-raised after a rollback has been attempted.
    """
    try:
        if isolation_level:
            # Bind the requested isolation level to this session's connection.
            EngineManager.get_session().connection(
                execution_options={'isolation_level': isolation_level}
            )

        yield

        # The block finished without raising: make the changes permanent.
        EngineManager.commit()

    except Exception as error:
        try:
            EngineManager.rollback()
            logger.debug(f"Transaction rolled back due to: {error}")
        except Exception as rollback_failure:
            # A failed rollback is only logged; the original error still wins.
            logger.error(f"Error during rollback: {rollback_failure}")
        raise
164+
31165
# --------------------- VIEWS DB tables/classes ---------------------------- #
32166

33167
# Base class for declarative class definitions
@@ -247,7 +381,40 @@ def commit(cls):
247381
"""
248382
Commits the current transaction in the scoped session.
249383
"""
250-
cls._session.commit()
384+
try:
385+
cls._session.commit()
386+
except Exception as e:
387+
logger.warning(f"Database commit failed (will be retried if applicable): {e}")
388+
cls.rollback()
389+
raise
390+
391+
@classmethod
def rollback(cls):
    """
    Undo the current transaction of the scoped session.

    If the rollback itself raises, a warning is logged, the session is
    removed from the scoped session and the error is re-raised.
    """
    try:
        cls._session.rollback()
    except Exception as rollback_error:
        logger.warning(f"Database rollback failed (forcing session removal): {rollback_error}")
        # Drop the session whose rollback failed before propagating the error.
        cls._session.remove()
        raise
403+
404+
@classmethod
def remove_session(cls):
    """
    Discard the current session by calling ``remove()`` on the scoped session.
    """
    cls._session.remove()
410+
411+
@classmethod
def safe_commit(cls):
    """
    Commits the current transaction and propagates any failure to the caller.

    This method deliberately does NOT retry the commit on its own.
    ``commit()`` rolls the session back before re-raising, which discards
    the pending changes. A second ``commit()`` would then have nothing left
    to write and would succeed, silently losing the data it was meant to
    persist. Retrying is only safe for the whole unit of work (load, modify,
    save, commit), so callers that need it should wrap that whole operation
    in ``retry_on_database_conflict``.

    Raises:
        Exception: Whatever ``commit()`` raises; the session has already
            been rolled back when the exception reaches the caller.
    """
    cls.commit()
251418

252419
@classmethod
253420
def close_engine(cls):

biomero/eventsourcing.py

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from typing import Any, Dict, List
1919
from fabric import Result
2020
import logging
21-
from biomero.database import EngineManager
21+
from biomero.database import EngineManager, retry_on_database_conflict
2222

2323

2424
# Create a logger for this module
@@ -346,6 +346,7 @@ class WorkflowTracker(Application):
346346
update_task_progress: Updates the progress of a task.
347347
"""
348348

349+
@retry_on_database_conflict(max_retries=3)
349350
def initiate_workflow(self,
350351
name: str,
351352
description: str,
@@ -366,9 +367,10 @@ def initiate_workflow(self,
366367
logger.debug(f"[WFT] Initiating workflow: name={name}, description={description}, user={user}, group={group}")
367368
workflow = WorkflowRun(name, description, user, group)
368369
self.save(workflow)
369-
EngineManager.commit()
370+
EngineManager.safe_commit()
370371
return workflow.id
371372

373+
@retry_on_database_conflict(max_retries=3)
372374
def add_task_to_workflow(self,
373375
workflow_id: UUID,
374376
task_name: str,
@@ -397,13 +399,14 @@ def add_task_to_workflow(self,
397399
input_data,
398400
kwargs)
399401
self.save(task)
400-
EngineManager.commit()
402+
EngineManager.safe_commit()
401403
workflow: WorkflowRun = self.repository.get(workflow_id)
402404
workflow.add_task(task.id)
403405
self.save(workflow)
404-
EngineManager.commit()
406+
EngineManager.safe_commit()
405407
return task.id
406408

409+
@retry_on_database_conflict(max_retries=3)
407410
def start_workflow(self, workflow_id: UUID):
408411
"""
409412
Starts the workflow with the given UUID.
@@ -416,8 +419,9 @@ def start_workflow(self, workflow_id: UUID):
416419
workflow: WorkflowRun = self.repository.get(workflow_id)
417420
workflow.start_workflow()
418421
self.save(workflow)
419-
EngineManager.commit()
422+
EngineManager.safe_commit()
420423

424+
@retry_on_database_conflict(max_retries=3)
421425
def complete_workflow(self, workflow_id: UUID):
422426
"""
423427
Marks the workflow with the given UUID as completed.
@@ -430,8 +434,9 @@ def complete_workflow(self, workflow_id: UUID):
430434
workflow: WorkflowRun = self.repository.get(workflow_id)
431435
workflow.complete_workflow()
432436
self.save(workflow)
433-
EngineManager.commit()
437+
EngineManager.safe_commit()
434438

439+
@retry_on_database_conflict(max_retries=3)
435440
def fail_workflow(self, workflow_id: UUID, error_message: str):
436441
"""
437442
Marks the workflow with the given UUID as failed with an error message.
@@ -445,8 +450,9 @@ def fail_workflow(self, workflow_id: UUID, error_message: str):
445450
workflow: WorkflowRun = self.repository.get(workflow_id)
446451
workflow.fail_workflow(error_message)
447452
self.save(workflow)
448-
EngineManager.commit()
453+
EngineManager.safe_commit()
449454

455+
@retry_on_database_conflict(max_retries=3)
450456
def start_task(self, task_id: UUID):
451457
"""
452458
Starts the task with the given UUID.
@@ -459,8 +465,9 @@ def start_task(self, task_id: UUID):
459465
task: Task = self.repository.get(task_id)
460466
task.start_task()
461467
self.save(task)
462-
EngineManager.commit()
468+
EngineManager.safe_commit()
463469

470+
@retry_on_database_conflict(max_retries=3)
464471
def complete_task(self, task_id: UUID, message: str):
465472
"""
466473
Marks the task with the given UUID as completed with a result message.
@@ -474,8 +481,9 @@ def complete_task(self, task_id: UUID, message: str):
474481
task: Task = self.repository.get(task_id)
475482
task.complete_task(message)
476483
self.save(task)
477-
EngineManager.commit()
484+
EngineManager.safe_commit()
478485

486+
@retry_on_database_conflict(max_retries=3)
479487
def fail_task(self, task_id: UUID, error_message: str):
480488
"""
481489
Marks the task with the given UUID as failed with an error message.
@@ -489,8 +497,9 @@ def fail_task(self, task_id: UUID, error_message: str):
489497
task: Task = self.repository.get(task_id)
490498
task.fail_task(error_message)
491499
self.save(task)
492-
EngineManager.commit()
500+
EngineManager.safe_commit()
493501

502+
@retry_on_database_conflict(max_retries=3)
494503
def add_job_id(self, task_id, slurm_job_id):
495504
"""
496505
Adds a Slurm job ID to the task with the given UUID.
@@ -504,8 +513,9 @@ def add_job_id(self, task_id, slurm_job_id):
504513
task: Task = self.repository.get(task_id)
505514
task.add_job_id(slurm_job_id)
506515
self.save(task)
507-
EngineManager.commit()
516+
EngineManager.safe_commit()
508517

518+
@retry_on_database_conflict(max_retries=3)
509519
def add_result(self, task_id, result):
510520
"""
511521
Adds a result to the task with the given UUID.
@@ -519,8 +529,9 @@ def add_result(self, task_id, result):
519529
task: Task = self.repository.get(task_id)
520530
task.add_result(result)
521531
self.save(task)
522-
EngineManager.commit()
532+
EngineManager.safe_commit()
523533

534+
@retry_on_database_conflict(max_retries=3)
524535
def update_task_status(self, task_id, status):
525536
"""
526537
Updates the status of the task with the given UUID.
@@ -534,8 +545,9 @@ def update_task_status(self, task_id, status):
534545
task: Task = self.repository.get(task_id)
535546
task.update_task_status(status)
536547
self.save(task)
537-
EngineManager.commit()
548+
EngineManager.safe_commit()
538549

550+
@retry_on_database_conflict(max_retries=3)
539551
def update_task_progress(self, task_id, progress):
540552
"""
541553
Updates the progress of the task with the given UUID.
@@ -549,7 +561,7 @@ def update_task_progress(self, task_id, progress):
549561
task: Task = self.repository.get(task_id)
550562
task.update_task_progress(progress)
551563
self.save(task)
552-
EngineManager.commit()
564+
EngineManager.safe_commit()
553565

554566

555567

0 commit comments

Comments
 (0)