Skip to content

Commit 5d0fe57

Browse files
committed
Add retry and db conn check
1 parent 7a40281 commit 5d0fe57

4 files changed

Lines changed: 680 additions & 30 deletions

File tree

biomero_importer/main.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
STAGE_NEW_ORDER,
2222
STAGE_INGEST_STARTED,
2323
STAGE_INGEST_FAILED,
24-
STAGE_IMPORTED
24+
STAGE_IMPORTED,
25+
_mask_url,
2526
)
2627
from .utils.ingest_tracker import IngestionTracking, Base, get_ingest_tracker
2728
from .db_migrate import run_migrations_on_startup
@@ -211,7 +212,16 @@ def __init__(self, config, executor, poll_interval=5):
211212
self.ingest_tracker = get_ingest_tracker() # global instance
212213
self.IngestionTracking = IngestionTracking
213214

214-
self.logger.debug(f"Poller ready: {self.__dict__}")
215+
# Log poller state without leaking credentials
216+
safe_state = {
217+
k: (
218+
{ck: (_mask_url(cv) if 'db' in ck else cv)
219+
for ck, cv in v.items()}
220+
if isinstance(v, dict) else v
221+
)
222+
for k, v in self.__dict__.items()
223+
}
224+
self.logger.debug(f"Poller ready: {safe_state}")
215225

216226
# Ensure tables exist
217227
Base.metadata.create_all(self.ingest_tracker.engine)

biomero_importer/utils/importer.py

Lines changed: 109 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,21 @@
2323

2424
MAX_RETRIES = 5 # Maximum number of retries
2525
RETRY_DELAY = 5 # Delay between retries (in seconds)
26+
IMPORT_MAX_RETRIES = 3 # Maximum retries for transient import errors (e.g. Ice race conditions)
27+
IMPORT_RETRY_DELAY = 10 # Delay between import retries (in seconds)
2628
TMP_OUTPUT_FOLDER = "OMERO_inplace"
2729
PROCESSED_DATA_FOLDER = ".processed"
2830

31+
# Error patterns that indicate a transient/retryable import failure
32+
# (e.g. concurrent Ice session race conditions)
33+
RETRYABLE_IMPORT_ERRORS = [
34+
'Ice.ObjectNotExistException',
35+
'INTERNAL_EXCEPTION',
36+
'Ice.ConnectionLostException',
37+
'Ice.ConnectionRefusedException',
38+
'Ice.TimeoutException',
39+
]
40+
2941
# Canonical keys for storing preprocessing artifacts on the data_package
3042
PREPROC_META_KEY = "_preprocessing_metadata"
3143
PREPROC_RESULTS_KEY = "_preprocessing_results"
@@ -45,6 +57,39 @@ def get_tmp_output_path(data_package):
4557
return os.path.join("/OMERO", TMP_OUTPUT_FOLDER, data_package.get('UUID'))
4658

4759

60+
def is_retryable_import_error(errs_file, logger=None):
    """
    Check if an import error file contains a retryable (transient) error.

    Reads the .errs file at ``errs_file`` and looks for any of the
    substrings listed in the module-level ``RETRYABLE_IMPORT_ERRORS``.
    Patterns are tried in list order and the first one found wins.

    Args:
        errs_file: Path to the .errs file to inspect.
        logger: Optional logger; when given, a warning is emitted for a
            missing file, an unreadable file, or a matched pattern.

    Returns:
        A ``(retryable, pattern)`` tuple:
        * ``(True, "no_errs_file")`` if the file does not exist,
        * ``(True, <matched pattern>)`` if a retryable pattern is found,
        * ``(False, None)`` if no pattern matches or the file cannot be read.
    """
    if not os.path.exists(errs_file):
        if logger:
            logger.warning(
                f"No .errs file found at {errs_file} — import likely crashed "
                f"before writing output. Treating as retryable."
            )
        return True, "no_errs_file"

    try:
        # errors="replace" keeps a stray undecodable byte in the error output
        # from raising UnicodeDecodeError, which would otherwise hide a
        # retryable pattern and make the failure look permanent. The patterns
        # are plain ASCII, so replaced characters cannot affect matching.
        with open(errs_file, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
    except OSError as e:
        # File exists but cannot be read (permissions, is a directory, I/O
        # error): there is no evidence of a transient error, so do not retry.
        if logger:
            logger.warning(f"Could not read errs file {errs_file}: {e}")
        return False, None

    for pattern in RETRYABLE_IMPORT_ERRORS:
        if pattern in content:
            if logger:
                logger.warning(
                    f"Retryable error detected in {errs_file}: {pattern}"
                )
            return True, pattern
    return False, None
92+
4893
def connection(func):
4994
"""
5095
A decorator that wraps a function so that it receives an OMERO user connection.
@@ -421,7 +466,8 @@ def __init__(self, config, data_package, ttl_for_user_conn=6000000):
421466
self.imported = False
422467

423468
@connection
424-
def import_to_omero(self, conn, file_path, target_id, target_type, uuid, transfer_type="ln_s", depth=None):
469+
def import_to_omero(self, conn, file_path, target_id, target_type, uuid, transfer_type="ln_s", depth=None, log_id=None):
470+
log_id = log_id or uuid
425471
self.logger.debug(
426472
f"Starting import to OMERO for file: {file_path}, Target: {target_id} ({target_type})")
427473
cli = CLI()
@@ -434,8 +480,8 @@ def import_to_omero(self, conn, file_path, target_id, target_type, uuid, transfe
434480
'-p', str(conn.port),
435481
f'--transfer={transfer_type}',
436482
'--no-upgrade',
437-
'--file', f"logs/cli.{uuid}.logs",
438-
'--errs', f"logs/cli.{uuid}.errs",
483+
'--file', f"logs/cli.{log_id}.logs",
484+
'--errs', f"logs/cli.{log_id}.errs",
439485
]
440486
if 'parallel_upload_per_worker' in self.config:
441487
arguments += ['--parallel-upload',
@@ -631,7 +677,7 @@ def get_image_paths(self, conn, file_path, dataset_id):
631677
return [], template_prefixes # Return format consistent with get_plate_ids
632678

633679
@connection
634-
def import_dataset(self, conn, target, dataset, transfer="ln_s", depth=None):
680+
def import_dataset(self, conn, target, dataset, transfer="ln_s", depth=None, file_index=None):
635681
kwargs = {"transfer": transfer}
636682
if 'parallel_upload_per_worker' in self.config:
637683
kwargs['parallel-upload'] = str(
@@ -644,18 +690,49 @@ def import_dataset(self, conn, target, dataset, transfer="ln_s", depth=None):
644690
if depth:
645691
kwargs['depth'] = str(depth)
646692
uuid = self.data_package.get('UUID')
647-
kwargs['file'] = f"logs/cli.{uuid}.logs"
648-
kwargs['errs'] = f"logs/cli.{uuid}.errs"
649-
self.logger.debug(f"EZImport: {conn} {target} {int(dataset)} {kwargs}")
650-
result = ezomero.ezimport(conn=conn, target=target, dataset=int(dataset), **kwargs)
651-
# Check if import succeeded - ezimport returns None on failure, list (possibly empty) on success
652-
if result is not None:
653-
self.imported = True
654-
self.logger.info(f"Import succeeded, got image IDs: {result}")
655-
else:
656-
self.imported = False
657-
self.logger.error("Import failed - ezimport returned None")
658-
return result
693+
# Use per-file log IDs to avoid log collisions when importing multiple files
694+
log_id = f"{uuid}_{file_index}" if file_index is not None else uuid
695+
696+
for attempt in range(1, IMPORT_MAX_RETRIES + 1):
697+
# Use attempt-specific log/err files so retries don't overwrite previous evidence
698+
attempt_log_id = f"{log_id}_attempt{attempt}" if attempt > 1 else log_id
699+
kwargs['file'] = f"logs/cli.{attempt_log_id}.logs"
700+
kwargs['errs'] = f"logs/cli.{attempt_log_id}.errs"
701+
702+
self.logger.debug(f"EZImport (attempt {attempt}/{IMPORT_MAX_RETRIES}): {conn} {target} {int(dataset)} {kwargs}")
703+
result = ezomero.ezimport(conn=conn, target=target, dataset=int(dataset), **kwargs)
704+
705+
# Check if import succeeded - ezimport returns None on failure, list (possibly empty) on success
706+
if result is not None:
707+
self.imported = True
708+
self.logger.info(f"Import succeeded, got image IDs: {result}")
709+
return result
710+
711+
# Import failed — check if the error is transient/retryable
712+
errs_file = kwargs['errs']
713+
retryable, pattern = is_retryable_import_error(errs_file, self.logger)
714+
715+
if retryable and attempt < IMPORT_MAX_RETRIES:
716+
delay = IMPORT_RETRY_DELAY * attempt # increasing backoff
717+
self.logger.warning(
718+
f"Import failed with retryable error ({pattern}). "
719+
f"Retrying in {delay}s (attempt {attempt}/{IMPORT_MAX_RETRIES})..."
720+
)
721+
time.sleep(delay)
722+
continue
723+
else:
724+
if retryable:
725+
self.logger.error(
726+
f"Import failed with retryable error ({pattern}) but max retries "
727+
f"({IMPORT_MAX_RETRIES}) exhausted."
728+
)
729+
else:
730+
self.logger.error(
731+
f"Import failed - ezimport returned None (non-retryable error). "
732+
f"Check {errs_file} for details."
733+
)
734+
self.imported = False
735+
return result
659736

660737
def upload_files(self, conn, file_paths, dataset_id=None, screen_id=None, local_paths=None):
661738
uuid = self.data_package.get('UUID')
@@ -696,12 +773,14 @@ def upload_files(self, conn, file_paths, dataset_id=None, screen_id=None, local_
696773
# and in local_paths folder on the omero server storage
697774
# we will import now in-place from the omero server storage
698775
# and then we'll switch the in-place symlinks to the remote storage (subfolder)
776+
log_id = f"{uuid}_{i}"
699777
imported = self.import_to_omero(
700778
file_path=local_path,
701779
target_id=screen_id,
702780
target_type='Screen',
703781
uuid=uuid,
704-
depth=10
782+
depth=10,
783+
log_id=log_id
705784
)
706785
self.logger.debug("Upload done. Retrieving plate id.")
707786
image_ids, local_file_dir = self.get_plate_ids(
@@ -712,16 +791,19 @@ def upload_files(self, conn, file_paths, dataset_id=None, screen_id=None, local_
712791
image_ids = self.import_dataset(
713792
target=local_path,
714793
dataset=dataset_id,
715-
transfer="ln_s"
794+
transfer="ln_s",
795+
file_index=i
716796
)
717797
self.logger.debug(f"EZimport returned ids {image_ids} for {str(file_path)} ({dataset_id})")
718798
else:
799+
log_id = f"{uuid}_{i}"
719800
imported = self.import_to_omero(
720801
file_path=local_path,
721802
target_id=dataset_id,
722803
target_type='Dataset',
723804
uuid=uuid,
724-
depth=10
805+
depth=10,
806+
log_id=log_id
725807
)
726808
image_ids = dataset_id
727809

@@ -787,12 +869,14 @@ def upload_files(self, conn, file_paths, dataset_id=None, screen_id=None, local_
787869
)
788870
else:
789871
if screen_id: # screen
872+
log_id = f"{uuid}_{i}"
790873
imported = self.import_to_omero(
791874
file_path=str(file_path),
792875
target_id=screen_id,
793876
target_type='Screen',
794877
uuid=uuid,
795-
depth=10
878+
depth=10,
879+
log_id=log_id
796880
)
797881
image_ids, _ = self.get_plate_ids(
798882
str(file_path), screen_id)
@@ -801,16 +885,19 @@ def upload_files(self, conn, file_paths, dataset_id=None, screen_id=None, local_
801885
image_ids = self.import_dataset(
802886
target=str(file_path),
803887
dataset=dataset_id,
804-
transfer="ln_s"
888+
transfer="ln_s",
889+
file_index=i
805890
)
806891
self.logger.debug(f"EZimport returned ids {image_ids} for {str(file_path)} ({dataset_id})")
807892
elif os.path.isdir(file_path):
893+
log_id = f"{uuid}_{i}"
808894
imported = self.import_to_omero(
809895
file_path=str(file_path),
810896
target_id=dataset_id,
811897
target_type='Dataset',
812898
uuid=uuid,
813-
depth=10
899+
depth=10,
900+
log_id=log_id
814901
)
815902
image_ids = dataset_id
816903
self.logger.debug(f"Set ids {image_ids} to the dataset {dataset_id}")

biomero_importer/utils/ingest_tracker.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,9 @@ def __init__(self, config):
190190

191191
self.engine = create_engine(
192192
self.database_url,
193-
connect_args=connect_args
193+
connect_args=connect_args,
194+
pool_pre_ping=True,
195+
pool_recycle=1800,
194196
)
195197
self.Session = sessionmaker(bind=self.engine)
196198

@@ -339,29 +341,42 @@ def db_log_ingestion_event(
339341
self.logger.debug(
340342
f"Created database entry: {new_entry.id}")
341343
return new_entry.id
342-
except (SQLAlchemyError, psycopg2.OperationalError) as e:
344+
except (SQLAlchemyError, psycopg2.OperationalError,
345+
psycopg2.DatabaseError) as e:
343346
error_msg = str(e).lower()
344347
is_connection_issue = any(phrase in error_msg for phrase in [
345348
"closed the connection",
346349
"name or service not known",
347350
"connection refused",
348351
"could not connect",
349352
"network is unreachable",
350-
"timeout expired"
353+
"timeout expired",
354+
"pgres_tuples_ok",
355+
"does not return rows",
356+
"server closed the connection",
357+
"connection already closed",
358+
"ssl connection has been closed",
351359
])
352360

353361
if is_connection_issue and attempt < max_retries - 1:
354362
wait_time = 0.5 * (2 ** attempt) # Exponential backoff
355-
self.logger.warning(f"Database connection issue (attempt {attempt + 1}/{max_retries}), retrying in {wait_time}s: {e}")
363+
self.logger.warning(
364+
f"Database connection issue "
365+
f"(attempt {attempt + 1}/{max_retries}), "
366+
f"retrying in {wait_time}s: {e}"
367+
)
356368
time.sleep(wait_time)
357369
continue
358370
elif is_connection_issue:
359-
self.logger.warning(f"Database connection persistently failing after {max_retries} attempts: {e}")
371+
self.logger.warning(
372+
f"Database connection persistently failing "
373+
f"after {max_retries} attempts: {e}"
374+
)
360375
else:
361376
self.logger.error(
362377
f"Database error logging ingestion step: {e}",
363378
exc_info=True,
364-
)
379+
)
365380
return None
366381
except Exception as e:
367382
self.logger.error(

0 commit comments

Comments
 (0)