From 4c94acfad6328efafe4a64dcf12e8359b157417c Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 24 Feb 2026 14:28:21 +0100 Subject: [PATCH 01/27] add exit code dependent retry policy --- .../TaskWorker/Actions/DagmanCreator.py | 2 +- src/python/TaskWorker/Actions/RetryJob.py | 92 ++++++++++++------- 2 files changed, 59 insertions(+), 35 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 8c917a3098..05634d15f1 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -887,7 +887,7 @@ def createSubdag(self, splitterResult, **kwargs): ## In the future this parameter may be set by the user in the CRAB configuration ## file and we would take it from the Task DB. - self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 2) + self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 9) runtime = self.task['tm_split_args'].get('minutes_per_job', -1) diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index b4d11a5c0e..a003351d38 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -5,6 +5,7 @@ import shutil import subprocess import socket +import time from collections import namedtuple from ServerUtilities import executeCommand, getLock @@ -15,6 +16,40 @@ JOB_RETURN_CODES = namedtuple('JobReturnCodes', 'OK RECOVERABLE_ERROR FATAL_ERROR')(0, 1, 2) +# ---------------------------------------------------------------------- +# Exit-code dependent retry policy +# ---------------------------------------------------------------------- + +EXIT_RETRY_POLICY = { + 1: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to bootstrap CMSSW; likely a worker node issue."}, + 50513: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, + 81: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, + 50115: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job did not produce a FJR; will retry."}, + 195: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not produce a FJR; will retry."}, + 137: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "SIGKILL; likely an unrelated batch system kill."}, + 10034: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."}, + 50: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."}, + 10040: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Site Error: failed to generate cmsRun cfg file at runtime."}, + 60403: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout."}, + 243: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout."}, + 60307: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, + 147: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, + 60311: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, + 151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, + 8028: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job failed to open local and fallback files."}, + 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error)."}, + 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error)."}, + 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError."}, + 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file."}, + 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, + 86: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, + 92: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, + 134: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Abort (ANSI) or IOT trap (4.2 BSD) (most likely user application crashed)."}, + 8001: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Other CMS Exception."}, + 65: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "End of job from user application (CMSSW)."}, + "default": {"type": "neutral", "max_retries": 2, "delay": 900, "msg": "Taking default exit code retry policy route."} +} + # strings in fatal root exception text which indicate code problem, not corrupted file # a small "knowledge data base" NOT_FILE_RELATED_FATAL_ROOT_ERRORS = [ @@ -98,6 +133,27 @@ def get_job_ad_from_condor_q(self): # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + def apply_retry_policy(self, exitCode): + """ + Enforce exit-code dependent retry limits and delay. + Raises FatalError if retry limit exceeded. + """ + policy = EXIT_RETRY_POLICY.get(exitCode, EXIT_RETRY_POLICY["default"]) + + if policy["type"] == "recoverable": + if self.crab_retry >= policy["max_retries"]: + raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") + delay = policy.get("delay", 900) + self.logger.info(f"Sleeping {delay} seconds before retry (exit code {exitCode})") + time.sleep(delay) + if exitCode in [8020, 8021, 8022, 8028, 84, 85, 86, 92, 134, 8001, 65]: + return + raise RecoverableError(policy["msg"]) + + if policy["type"] == "neutral": + return + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + def get_job_ad_from_file(self): """ Need a doc string here @@ -307,6 +363,8 @@ def check_exit_code(self): return 1 try: exitCode = int(self.report['exitCode']) + # Apply retry policy + self.apply_retry_policy(exitCode) except ValueError: msg = "Unable to extract job's wrapper exit code from job report." self.logger.warning(msg) @@ -319,7 +377,6 @@ def check_exit_code(self): return 0 msg = "Job or stageout wrapper finished with exit code %d." % (exitCode) - msg += " Trying to determine the meaning of the exit code and if it is a recoverable or fatal error." self.logger.info(msg) # Wrapper script sometimes returns the posix return code (8 bits). @@ -335,17 +392,6 @@ def check_exit_code(self): self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) # retry the job raise RecoverableError("Job failed to open local and fallback files.") - if exitCode == 1: - raise RecoverableError("Job failed to bootstrap CMSSW; likely a worker node issue.") - - if exitCode == 50513 or exitCode == 81: - raise RecoverableError("Job did not find functioning CMSSW on worker node.") - - # This is a difficult one -- right now CMSRunAnalysis.py will turn things like - # segfaults into an invalid FJR. Will revisit this decision later. - if exitCode == 50115 or exitCode == 195: - raise RecoverableError("Job did not produce a FJR; will retry.") - if exitCode == 134: recoverable_signal = False try: @@ -379,28 +425,6 @@ def check_exit_code(self): if cvmfs_issue: raise RecoverableError("CVMFS issue detected.") - # Another difficult case -- so far, SIGKILL has mostly been observed at T2_CH_CERN, and it has nothing to do - # with an issue of the job itself. Typically, this isn't the user code's fault - # it was often a site or pilot misconfiguration that led to the pilot exhausting its allocated runtime. - # We should revisit this issue if we see SIGKILL happening for other cases that are the users' fault. - if exitCode == 137: - raise RecoverableError("SIGKILL; likely an unrelated batch system kill.") - - if exitCode == 10034 or exitCode == 50: - raise RecoverableError("Required application version not found at the site.") - - if exitCode == 10040: - raise RecoverableError("Site Error: failed to generate cmsRun cfg file at runtime.") - - if exitCode == 60403 or exitCode == 243: - raise RecoverableError("Timeout during attempted file stageout.") - - if exitCode == 60307 or exitCode == 147: - raise RecoverableError("Error during attempted file stageout.") - - if exitCode == 60311 or exitCode == 151: - raise RecoverableError("Error during attempted file stageout.") - if exitCode: raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n '))) From e70893e394eb912a318989f20577967846a0ec60 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 24 Feb 2026 18:25:24 +0100 Subject: [PATCH 02/27] adding retry delay condition to needsDefer --- src/python/TaskWorker/Actions/PreJob.py | 23 ++++++++- src/python/TaskWorker/Actions/RetryJob.py | 58 ++++++++++++++++------- 2 files changed, 62 insertions(+), 19 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 837b78f6d9..b14640cd63 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -456,19 +456,38 @@ def needsDefer(self): slow release of jobs in a task. The function return True if CRAB_JobReleaseTimeout is defined and not 0, and if the submit time of the task plus the defer time is greater than the current time. + Additionally retry policy delay """ deferTime = int(self.task_ad.get("CRAB_JobReleaseTimeout", 0)) + + currentTime = time.time() + + # Check retry delay from resubmit_info + retry_info_file = f"resubmit_info/job.{self.job_id}.txt" + if os.path.exists(retry_info_file): + try: + with open(retry_info_file, "r", encoding="utf-8") as fd: + retry_info = literal_eval(fd.read()) + key = str(self.dag_retry) + if key in retry_info: + retry_delay_until = retry_info[key].get("retry_delay_until") + if retry_delay_until and currentTime < retry_delay_until: + wait = int(retry_delay_until - currentTime) + self.logger.info(f"Retry delay not elapsed yet. Deferring for {wait} seconds.") + return True + except Exception: + self.logger.exception("Error checking retry delay in resubmit_info") + if deferTime: self.logger.info('Release timeout specified in extraJDL:') totalDefer = deferTime * int(self.job_id) submitTime = int(self.task_ad["CRAB_TaskSubmitTime"]) - currentTime = time.time() if currentTime < (submitTime + totalDefer): msg = f" Defer time of this job ({totalDefer} seconds since initial task submission)" msg += f" not elapsed yet, deferring for {totalDefer} seconds" self.logger.info(msg) return True - self.logger.info(' Continuing normally since current time is greater than requested starttime of the job') + self.logger.info('Continuing normally since current time is greater than requested starttime of the job') return False def execute(self, *args): diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index a003351d38..ec4fbd0e20 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -133,25 +133,49 @@ def get_job_ad_from_condor_q(self): # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + def store_retry_delay(self, delay): + retry_info_file = f"resubmit_info/job.{self.job_id}.txt" + retry_info = {} + if os.path.exists(retry_info_file): + try: + with open(retry_info_file, "r", encoding="utf-8") as fd: + retry_info = eval(fd.read()) + except Exception: + retry_info = {} + + key = str(self.crab_retry) + if key not in retry_info: + retry_info[key] = {} + + retry_info[key]["retry_delay_until"] = time.time() + delay + + with open(retry_info_file + ".tmp", "w", encoding="utf-8") as fd: + fd.write(str(retry_info)) + os.rename(retry_info_file + ".tmp", retry_info_file) + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + + def apply_retry_policy(self, exitCode): - """ - Enforce exit-code dependent retry limits and delay. - Raises FatalError if retry limit exceeded. - """ - policy = EXIT_RETRY_POLICY.get(exitCode, EXIT_RETRY_POLICY["default"]) - - if policy["type"] == "recoverable": - if self.crab_retry >= policy["max_retries"]: - raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") - delay = policy.get("delay", 900) - self.logger.info(f"Sleeping {delay} seconds before retry (exit code {exitCode})") - time.sleep(delay) - if exitCode in [8020, 8021, 8022, 8028, 84, 85, 86, 92, 134, 8001, 65]: - return - raise RecoverableError(policy["msg"]) + """ + Enforce exit-code dependent retry limits and delay. + Raises FatalError if retry limit exceeded. + """ + policy = EXIT_RETRY_POLICY.get(exitCode, EXIT_RETRY_POLICY["default"]) + + if policy["type"] == "recoverable": + if self.crab_retry >= policy["max_retries"]: + raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") + delay = policy.get("delay", 900) + self.logger.info(f"Sleeping {delay} seconds before retry (exit code {exitCode})") + self.store_retry_delay(delay) + if exitCode in [8020, 8021, 8022, 8028, 84, 85, 86, 92, 134, 8001, 65]: + return + raise RecoverableError(policy["msg"]) - if policy["type"] == "neutral": - return + if policy["type"] == "neutral": + return + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = def get_job_ad_from_file(self): From 595b71b9158d31f59ce03028eac3ae0bdff8a792 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 24 Feb 2026 19:39:21 +0100 Subject: [PATCH 03/27] add exit code dependent ability to change maxmemory, maxjobruntime and site --- src/python/TaskWorker/Actions/PreJob.py | 28 ++++++++++++++++++++ src/python/TaskWorker/Actions/RetryJob.py | 31 +++++++++++++++-------- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index b14640cd63..8211be170d 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -298,6 +298,25 @@ def alter_submit(self, crab_retry): maxmemory = self.resubmit_info[inkey].get('maxmemory') numcores = self.resubmit_info[inkey].get('numcores') priority = self.resubmit_info[inkey].get('priority') + + #ExitCode Dependent change in resubmission parameters for retries + + retry_data = self.resubmit_info.get(inkey, {}) + + if retry_data.get("increase_memory") and maxmemory: + factor = retry_data.get("memory_factor", 1.2) + new_memory = int(int(maxmemory) * factor) + if hasattr(self, "MAX_MEMORY"): + new_memory = min(new_memory, self.MAX_MEMORY) + self.logger.info(f"Increasing memory from {maxmemory} to {new_memory}") + maxmemory = new_memory + + if retry_data.get("increase_runtime") and maxjobruntime: + factor = retry_data.get("runtime_factor", 1.2) + new_runtime = int(int(maxjobruntime) * factor) + self.logger.info(f"Increasing walltime from {maxjobruntime} to {new_runtime}") + maxjobruntime = new_runtime + ## Save the (new) values of the resubmission parameters in self.resubmit_info ## for the current job retry number. outkey = str(crab_retry) @@ -414,6 +433,15 @@ def redoSites(self, crab_retry, use_resubmit_info): self.logger.error("Can not submit since DESIRED_Sites list is empty") self.prejob_exit_code = 1 sys.exit(self.prejob_exit_code) + + # ExitCode Dependent discard of previous_site + retry_data = self.resubmit_info.get(str(crab_retry), {}) + previous_site = retry_data.get("previous_site") + if retry_data.get("change_site") and previous_site: + if previous_site in availableSet: + self.logger.info(f"Removing previous site {previous_site} from candidate sites") + availableSet.discard(previous_site) + ## Make sure that attributest which will be used in MatchMaking are SORTED lists available = list(availableSet) available.sort() diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index ec4fbd0e20..0843a7bb2c 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -7,6 +7,7 @@ import socket import time from collections import namedtuple +from ast import literal_eval from ServerUtilities import executeCommand, getLock from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY @@ -24,21 +25,21 @@ 1: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to bootstrap CMSSW; likely a worker node issue."}, 50513: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, 81: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, - 50115: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job did not produce a FJR; will retry."}, - 195: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not produce a FJR; will retry."}, + 50115: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, + 195: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, 137: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "SIGKILL; likely an unrelated batch system kill."}, 10034: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."}, 50: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."}, 10040: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Site Error: failed to generate cmsRun cfg file at runtime."}, - 60403: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout."}, - 243: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout."}, + 60403: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, + 243: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, 60307: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 147: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 60311: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 8028: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job failed to open local and fallback files."}, - 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error)."}, - 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error)."}, + 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True}, + 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True}, 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError."}, 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file."}, 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, @@ -133,13 +134,14 @@ def get_job_ad_from_condor_q(self): # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = - def store_retry_delay(self, delay): + def store_retry_actions(self, policy): retry_info_file = f"resubmit_info/job.{self.job_id}.txt" retry_info = {} + if os.path.exists(retry_info_file): try: with open(retry_info_file, "r", encoding="utf-8") as fd: - retry_info = eval(fd.read()) + retry_info = literal_eval(fd.read()) except Exception: retry_info = {} @@ -147,8 +149,16 @@ def store_retry_delay(self, delay): if key not in retry_info: retry_info[key] = {} + delay = policy.get("delay", 900) retry_info[key]["retry_delay_until"] = time.time() + delay + retry_info[key]["increase_memory"] = policy.get("increase_memory", False) + retry_info[key]["increase_runtime"] = policy.get("increase_runtime", False) + retry_info[key]["change_site"] = policy.get("change_site", False) + retry_info[key]["memory_factor"] = policy.get("memory_factor", 1.0) + retry_info[key]["runtime_factor"] = policy.get("runtime_factor", 1.0) + retry_info[key]["previous_site"] = getattr(self, "site", None) + with open(retry_info_file + ".tmp", "w", encoding="utf-8") as fd: fd.write(str(retry_info)) os.rename(retry_info_file + ".tmp", retry_info_file) @@ -166,9 +176,8 @@ def apply_retry_policy(self, exitCode): if policy["type"] == "recoverable": if self.crab_retry >= policy["max_retries"]: raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") - delay = policy.get("delay", 900) - self.logger.info(f"Sleeping {delay} seconds before retry (exit code {exitCode})") - self.store_retry_delay(delay) + self.logger.info(f"Applying retry policy for exit code {exitCode}") + self.store_retry_actions(policy) if exitCode in [8020, 8021, 8022, 8028, 84, 85, 86, 92, 134, 8001, 65]: return raise RecoverableError(policy["msg"]) From b76d758413a05f1efaebeec8d8a43911615ff079 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Mon, 30 Mar 2026 16:57:25 +0200 Subject: [PATCH 04/27] add resubmit_counter and eff max retries --- .../TaskWorker/Actions/DagmanResubmitter.py | 2 ++ src/python/TaskWorker/Actions/PreJob.py | 4 ++- src/python/TaskWorker/Actions/RetryJob.py | 26 ++++++++++++++++++- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index 0a4e815e0f..5c3ad3461f 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -96,6 +96,8 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument else: newAdValue = str(task['resubmit_'+taskparam]) schedd.edit(rootConst, adparam, newAdValue) + # Set CRAB_ResubmitCounter + schedd.edit(rootConst, "CRAB_ResubmitCounter","ifThenElse(isUndefined(CRAB_ResubmitCounter), 1, CRAB_ResubmitCounter + 1)") # finally restart the dagman with the 3 lines below schedd.act(htcondor.JobAction.Hold, rootConst) schedd.edit(rootConst, "HoldKillSig", 'SIGUSR1') diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 8211be170d..d71904b541 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -319,6 +319,8 @@ def alter_submit(self, crab_retry): ## Save the (new) values of the resubmission parameters in self.resubmit_info ## for the current job retry number. + #get resubmission counter + resubmit_counter = int(self.task_ad.get("CRAB_ResubmitCounter", 0)) outkey = str(crab_retry) if outkey not in self.resubmit_info: self.resubmit_info[outkey] = {} @@ -328,7 +330,7 @@ def alter_submit(self, crab_retry): self.resubmit_info[outkey]['priority'] = priority self.resubmit_info[outkey]['use_resubmit_info'] = use_resubmit_info self.resubmit_info[outkey]['CRAB_ResubmitList_in_taskad'] = CRAB_ResubmitList_in_taskad - + self.resubmit_info[outkey]["resubmit_counter"] = resubmit_counter ## Add the resubmission parameters to the Job..submit content. ## if self.stage == "probe": diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index 0843a7bb2c..9d683df5c2 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -165,6 +165,29 @@ def store_retry_actions(self, policy): # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + def calculate_effective_max_retries(self, policy): + retry_info_file = f"resubmit_info/job.{self.job_id}.txt" + retry_info = {} + + if os.path.exists(retry_info_file): + try: + with open(retry_info_file, "r", encoding="utf-8") as fd: + retry_info = literal_eval(fd.read()) + except Exception: + retry_info = {} + + key = str(self.crab_retry) + if key not in retry_info: + retry_info[key] = {} + + entry = retry_info.get(key, {}) + resubmit_counter = entry.get("resubmit_counter", 0) + base_max = policy["max_retries"] + effective_max_retries = (base_max + 1)*(resubmit_counter + 1) - 1 + self.logger.info(f"Resubmit counter = {resubmit_counter}, effective max retries = {effective_max_retries}") + + return effective_max_retries + def apply_retry_policy(self, exitCode): """ @@ -174,7 +197,8 @@ def apply_retry_policy(self, exitCode): policy = EXIT_RETRY_POLICY.get(exitCode, EXIT_RETRY_POLICY["default"]) if policy["type"] == "recoverable": - if self.crab_retry >= policy["max_retries"]: + effective_max_retries = self.calculate_effective_max_retries(policy) + if self.crab_retry >= effective_max_retries: raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") self.logger.info(f"Applying retry policy for exit code {exitCode}") self.store_retry_actions(policy) From 69afe5510f8349032ebf1b39ff03584af8d2fcad Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 31 Mar 2026 10:14:18 +0200 Subject: [PATCH 05/27] use ExprTree --- src/python/TaskWorker/Actions/DagmanResubmitter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index 5c3ad3461f..cb407d1830 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -97,7 +97,8 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument newAdValue = str(task['resubmit_'+taskparam]) schedd.edit(rootConst, adparam, newAdValue) # Set CRAB_ResubmitCounter - schedd.edit(rootConst, "CRAB_ResubmitCounter","ifThenElse(isUndefined(CRAB_ResubmitCounter), 1, CRAB_ResubmitCounter + 1)") + resubmitCounterExpr = classad.ExprTree("ifThenElse(isUndefined(CRAB_ResubmitCounter), 1, CRAB_ResubmitCounter + 1)") + schedd.edit(rootConst, "CRAB_ResubmitCounter", resubmitCounterExpr) # finally restart the dagman with the 3 lines below schedd.act(htcondor.JobAction.Hold, rootConst) schedd.edit(rootConst, "HoldKillSig", 'SIGUSR1') From 5b923346ddfb112a0fc13a9545e4686008f725af Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 31 Mar 2026 11:55:33 +0200 Subject: [PATCH 06/27] remove abort and chnage jobads too --- src/python/TaskWorker/Actions/DagmanCreator.py | 3 +-- src/python/TaskWorker/Actions/DagmanResubmitter.py | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index c38202c08a..b9ae6bb00b 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -62,7 +62,6 @@ VARS Job{count} My.CRAB_localOutputFiles="\\"{localOutputFiles}\\"" VARS Job{count} My.CRAB_DataBlock="\\"{block}\\"" VARS Job{count} My.CRAB_Destination="\\"{destination}\\"" -ABORT-DAG-ON Job{count} 3 """ @@ -346,7 +345,7 @@ def makeJobSubmit(self): jobSubmit['My.CRAB_Publish'] = "1" if task['tm_publication'] == 'T' else "0" jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) - + jobSubmit['My.CRAB_ResubmitCounter'] = "0" # note about Lists # in the JDL everything is a string, we can't use the simple classAd[name]=somelist diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index cb407d1830..47a74273e2 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -96,9 +96,11 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument else: newAdValue = str(task['resubmit_'+taskparam]) schedd.edit(rootConst, adparam, newAdValue) - # Set CRAB_ResubmitCounter + # Set CRAB_ResubmitCounter - increment on both DAG ad and individual job ads resubmitCounterExpr = classad.ExprTree("ifThenElse(isUndefined(CRAB_ResubmitCounter), 1, CRAB_ResubmitCounter + 1)") + jobConst = f"(CRAB_ReqName =?= {classad.quote(workflow)} && CRAB_DAGType =?= \"Job\")" schedd.edit(rootConst, "CRAB_ResubmitCounter", resubmitCounterExpr) + schedd.edit(jobConst, "CRAB_ResubmitCounter", resubmitCounterExpr) # finally restart the dagman with the 3 lines below schedd.act(htcondor.JobAction.Hold, rootConst) schedd.edit(rootConst, "HoldKillSig", 'SIGUSR1') From 9bd82248a6f6b715958ba05f8e7877cb6af3d22e Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 31 Mar 2026 13:29:33 +0200 Subject: [PATCH 07/27] remove jobconst, add to adstoPort --- src/python/TaskWorker/Actions/DagmanResubmitter.py | 4 +--- src/python/TaskWorker/Actions/DagmanSubmitter.py | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index 47a74273e2..cb407d1830 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -96,11 +96,9 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument else: newAdValue = str(task['resubmit_'+taskparam]) schedd.edit(rootConst, adparam, newAdValue) - # Set CRAB_ResubmitCounter - increment on both DAG ad and individual job ads + # Set CRAB_ResubmitCounter resubmitCounterExpr = classad.ExprTree("ifThenElse(isUndefined(CRAB_ResubmitCounter), 1, CRAB_ResubmitCounter + 1)") - jobConst = f"(CRAB_ReqName =?= {classad.quote(workflow)} && CRAB_DAGType =?= \"Job\")" schedd.edit(rootConst, "CRAB_ResubmitCounter", resubmitCounterExpr) - schedd.edit(jobConst, "CRAB_ResubmitCounter", resubmitCounterExpr) # finally restart the dagman with the 3 lines below schedd.act(htcondor.JobAction.Hold, rootConst) schedd.edit(rootConst, "HoldKillSig", 'SIGUSR1') diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py index 3bcb0b0d5c..4e4969d151 100644 --- a/src/python/TaskWorker/Actions/DagmanSubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py @@ -87,6 +87,7 @@ def addJobSubmitInfoToDagJobJDL(dagJdl, jobSubmit): 'My.CMS_Type', 'My.CMS_WMTool', 'My.CMS_TaskType', + 'My.CRAB_ResubmitCounter', ] for adName in adsToPort: From 9fca35e89cbe6bc0370a83d2e18a7c218dab99a4 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 31 Mar 2026 15:28:01 +0200 Subject: [PATCH 08/27] strictly policy dependent --- src/python/TaskWorker/Actions/RetryJob.py | 98 +++++++++++------------ 1 file changed, 48 insertions(+), 50 deletions(-) diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index 9d683df5c2..e15de078cd 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -202,8 +202,6 @@ def apply_retry_policy(self, exitCode): raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") self.logger.info(f"Applying retry policy for exit code {exitCode}") self.store_retry_actions(policy) - if exitCode in [8020, 8021, 8022, 8028, 84, 85, 86, 92, 134, 8001, 65]: - return raise RecoverableError(policy["msg"]) if policy["type"] == "neutral": @@ -432,58 +430,58 @@ def check_exit_code(self): msg = "Job and stageout wrappers finished successfully (exit code %d)." % (exitCode) self.logger.info(msg) return 0 + # else: + # raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n '))) msg = "Job or stageout wrapper finished with exit code %d." % (exitCode) self.logger.info(msg) - # Wrapper script sometimes returns the posix return code (8 bits). - if exitCode in [8020, 8021, 8022, 8028] or exitCode in [84, 85, 86, 92]: - try: # the following is still a bit experimental, make sure it never crashes the PJ - corruptedInputFile = self.check_corrupted_file(exitCode) - except Exception as e: # pylint: disable=broad-except - msg = f"check_corrupted_file raised an exception:\n{e}\nIgnore and go on" - self.logger.error(msg) - corruptedInputFile = False - if corruptedInputFile: - exitMsg = "Fatal Root Error maybe a corrupted input file. This error is being reported" - self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) # retry the job - raise RecoverableError("Job failed to open local and fallback files.") - - if exitCode == 134: - recoverable_signal = False - try: - fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) - with open(fname, encoding='utf-8') as fd: - for line in fd: - if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"): - recoverable_signal = True - break - except Exception: # pylint: disable=broad-except - msg = "Error analyzing abort signal." - msg += "\nDetails follow:" - self.logger.exception(msg) - if recoverable_signal: - raise RecoverableError("SIGILL; may indicate a worker node issue.") - - if exitCode == 8001 or exitCode == 65: - cvmfs_issue = False - try: - fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) - cvmfs_issue_re = re.compile("== CMSSW: unable to load /cvmfs/.*file too short") - with open(fname, encoding='utf-8') as fd: - for line in fd: - if cvmfs_issue_re.match(line): - cvmfs_issue = True - break - except Exception: # pylint: disable=broad-except - msg = "Error analyzing output for CVMFS issues." - msg += "\nDetails follow:" - self.logger.exception(msg) - if cvmfs_issue: - raise RecoverableError("CVMFS issue detected.") - - if exitCode: - raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n '))) + # # Wrapper script sometimes returns the posix return code (8 bits). + # if exitCode in [8020, 8021, 8022, 8028] or exitCode in [84, 85, 86, 92]: + # try: # the following is still a bit experimental, make sure it never crashes the PJ + # corruptedInputFile = self.check_corrupted_file(exitCode) + # except Exception as e: # pylint: disable=broad-except + # msg = f"check_corrupted_file raised an exception:\n{e}\nIgnore and go on" + # self.logger.error(msg) + # corruptedInputFile = False + # if corruptedInputFile: + # exitMsg = "Fatal Root Error maybe a corrupted input file. This error is being reported" + # self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) # retry the job + # raise RecoverableError("Job failed to open local and fallback files.") + + # if exitCode == 134: + # recoverable_signal = False + # try: + # fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) + # with open(fname, encoding='utf-8') as fd: + # for line in fd: + # if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"): + # recoverable_signal = True + # break + # except Exception: # pylint: disable=broad-except + # msg = "Error analyzing abort signal." + # msg += "\nDetails follow:" + # self.logger.exception(msg) + # if recoverable_signal: + # raise RecoverableError("SIGILL; may indicate a worker node issue.") + + # if exitCode == 8001 or exitCode == 65: + # cvmfs_issue = False + # try: + # fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) + # cvmfs_issue_re = re.compile("== CMSSW: unable to load /cvmfs/.*file too short") + # with open(fname, encoding='utf-8') as fd: + # for line in fd: + # if cvmfs_issue_re.match(line): + # cvmfs_issue = True + # break + # except Exception: # pylint: disable=broad-except + # msg = "Error analyzing output for CVMFS issues." + # msg += "\nDetails follow:" + # self.logger.exception(msg) + # if cvmfs_issue: + # raise RecoverableError("CVMFS issue detected.") + return 0 From 8b3db7ea515b7221cc4e7f37126544dcbd212d21 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 31 Mar 2026 16:37:06 +0200 Subject: [PATCH 09/27] avoid exprtree --- src/python/TaskWorker/Actions/DagmanResubmitter.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index cb407d1830..f398ac0de9 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -96,9 +96,11 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument else: newAdValue = str(task['resubmit_'+taskparam]) schedd.edit(rootConst, adparam, newAdValue) - # Set CRAB_ResubmitCounter - resubmitCounterExpr = classad.ExprTree("ifThenElse(isUndefined(CRAB_ResubmitCounter), 1, CRAB_ResubmitCounter + 1)") - schedd.edit(rootConst, "CRAB_ResubmitCounter", resubmitCounterExpr) + dagAds = schedd.query(rootConst, ['CRAB_ResubmitCounter']) + current = 0 + if dagAds and isinstance(dagAds[0].get('CRAB_ResubmitCounter'), int): + current = dagAds[0]['CRAB_ResubmitCounter'] + schedd.edit(rootConst, "CRAB_ResubmitCounter", str(current + 1)) # finally restart the dagman with the 3 lines below schedd.act(htcondor.JobAction.Hold, rootConst) schedd.edit(rootConst, "HoldKillSig", 'SIGUSR1') From 1e980e2d9bd9f52d46f5f184217adc155aee0799 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Wed, 1 Apr 2026 13:11:55 +0200 Subject: [PATCH 10/27] proper use of inkey and test with 8020 --- src/python/TaskWorker/Actions/PreJob.py | 11 +++++++---- src/python/TaskWorker/Actions/RetryJob.py | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index d71904b541..d468857e9a 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -394,6 +394,9 @@ def redoSites(self, crab_retry, use_resubmit_info): siteWhiteList = [] siteBlackSet = set() siteWhiteSet = set() + inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) + while inkey not in self.resubmit_info and int(inkey) > 0: + inkey = str(int(inkey) - 1) if not use_resubmit_info: if 'CRAB_SiteBlacklist' in self.task_ad: if self.task_ad['CRAB_SiteBlacklist']: # skip ad='' @@ -404,9 +407,6 @@ def redoSites(self, crab_retry, use_resubmit_info): siteWhiteList = self.task_ad['CRAB_SiteWhitelist'] siteWhiteSet = set(siteWhiteList) else: - inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) - while inkey not in self.resubmit_info and int(inkey) > 0: - inkey = str(int(inkey) - 1) siteBlackSet = set(self.resubmit_info[inkey].get('site_blacklist', [])) siteWhiteSet = set(self.resubmit_info[inkey].get('site_whitelist', [])) ## Save the current site black- and whitelists in self.resubmit_info for the @@ -437,12 +437,15 @@ def redoSites(self, crab_retry, use_resubmit_info): sys.exit(self.prejob_exit_code) # ExitCode Dependent discard of previous_site - retry_data = self.resubmit_info.get(str(crab_retry), {}) + retry_data = self.resubmit_info.get(inkey, {}) previous_site = retry_data.get("previous_site") if retry_data.get("change_site") and previous_site: + self.logger.info(f"Last Exit Code indicated that a change in site might help. inkey was {inkey}") if previous_site in availableSet: self.logger.info(f"Removing previous site {previous_site} from candidate sites") availableSet.discard(previous_site) + else: + self.logger.info(f"No need to discard last site. inkey was {inkey}") ## Make sure that attributest which will be used in MatchMaking are SORTED lists available = list(availableSet) diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index e15de078cd..af0287e44f 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -39,7 +39,7 @@ 151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 8028: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job failed to open local and fallback files."}, 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True}, - 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True}, + 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "increase_memory": True, "memory_factor": 1.3, "increase_runtime": True, "runtime_factor": 1.3}, 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError."}, 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file."}, 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, From f492af66b849d4ab390beaca4b2ea6ac96cb8d11 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Wed, 1 Apr 2026 15:58:14 +0200 Subject: [PATCH 11/27] be free of use_resubmit_info --- src/python/TaskWorker/Actions/PreJob.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index d468857e9a..86711d8e2d 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -268,6 +268,10 @@ def alter_submit(self, crab_retry): maxmemory = None numcores = None priority = None + inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) + while inkey not in self.resubmit_info and int(inkey) > 0: + inkey = str(int(inkey) - 1) + self.logger.info(f"use_resubmit_info is {use_resubmit_info} and inkey is {inkey}") if not use_resubmit_info: # means thad we resubmit with new params from crab resubmit #if 'MaxWallTimeMins_RAW' in self.task_ad: # if self.task_ad['MaxWallTimeMins_RAW'] != 1315: @@ -291,9 +295,6 @@ def alter_submit(self, crab_retry): priority = 20 #the maximum for splitting jobs else: # means we resubmit with same params as previous try ## SB most likely much (all) of this string/int conversions can be simplified - inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) - while inkey not in self.resubmit_info and int(inkey) > 0: - inkey = str(int(inkey) - 1) maxjobruntime = self.resubmit_info[inkey].get('maxjobruntime') maxmemory = self.resubmit_info[inkey].get('maxmemory') numcores = self.resubmit_info[inkey].get('numcores') @@ -301,8 +302,8 @@ def alter_submit(self, crab_retry): #ExitCode Dependent change in resubmission parameters for retries + if self.resubmit_info: retry_data = self.resubmit_info.get(inkey, {}) - if retry_data.get("increase_memory") and maxmemory: factor = retry_data.get("memory_factor", 1.2) new_memory = int(int(maxmemory) * factor) From 12183e9a8fd47ebff9d284403394e478864a041b Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Fri, 10 Apr 2026 12:20:44 +0200 Subject: [PATCH 12/27] add handler, use_resubmit_info correct and increase limit --- .../TaskWorker/Actions/DagmanCreator.py | 2 +- src/python/TaskWorker/Actions/PreJob.py | 40 ++--- src/python/TaskWorker/Actions/RetryJob.py | 145 ++++++++++-------- 3 files changed, 105 insertions(+), 82 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index b9ae6bb00b..859cb97cb7 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -884,7 +884,7 @@ def createSubdag(self, splitterResult, **kwargs): ## In the future this parameter may be set by the user in the CRAB configuration ## file and we would take it from the Task DB. - self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 9) + self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 100) runtime = self.task['tm_split_args'].get('minutes_per_job', -1) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 86711d8e2d..8c1a80b503 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -268,11 +268,7 @@ def alter_submit(self, crab_retry): maxmemory = None numcores = None priority = None - inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) - while inkey not in self.resubmit_info and int(inkey) > 0: - inkey = str(int(inkey) - 1) - self.logger.info(f"use_resubmit_info is {use_resubmit_info} and inkey is {inkey}") - if not use_resubmit_info: # means thad we resubmit with new params from crab resubmit + if not use_resubmit_info: # means that we resubmit with new params from crab resubmit #if 'MaxWallTimeMins_RAW' in self.task_ad: # if self.task_ad['MaxWallTimeMins_RAW'] != 1315: # maxjobruntime = self.task_ad.lookup('MaxWallTimeMins_RAW') @@ -295,6 +291,10 @@ def alter_submit(self, crab_retry): priority = 20 #the maximum for splitting jobs else: # means we resubmit with same params as previous try ## SB most likely much (all) of this string/int conversions can be simplified + inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) + while inkey not in self.resubmit_info and int(inkey) > 0: + inkey = str(int(inkey) - 1) + self.logger.info(f"use_resubmit_info is {use_resubmit_info} and inkey is {inkey}") maxjobruntime = self.resubmit_info[inkey].get('maxjobruntime') maxmemory = self.resubmit_info[inkey].get('maxmemory') numcores = self.resubmit_info[inkey].get('numcores') @@ -302,21 +302,21 @@ def alter_submit(self, crab_retry): #ExitCode Dependent change in resubmission parameters for retries - if self.resubmit_info: - retry_data = self.resubmit_info.get(inkey, {}) - if retry_data.get("increase_memory") and maxmemory: - factor = retry_data.get("memory_factor", 1.2) - new_memory = int(int(maxmemory) * factor) - if hasattr(self, "MAX_MEMORY"): - new_memory = min(new_memory, self.MAX_MEMORY) - self.logger.info(f"Increasing memory from {maxmemory} to {new_memory}") - maxmemory = new_memory - - if retry_data.get("increase_runtime") and maxjobruntime: - factor = retry_data.get("runtime_factor", 1.2) - new_runtime = int(int(maxjobruntime) * factor) - self.logger.info(f"Increasing walltime from {maxjobruntime} to {new_runtime}") - maxjobruntime = new_runtime + if self.resubmit_info: + retry_data = self.resubmit_info.get(inkey, {}) + if retry_data.get("increase_memory") and maxmemory: + factor = retry_data.get("memory_factor", 1.2) + new_memory = int(int(maxmemory) * factor) + if hasattr(self, "MAX_MEMORY"): + new_memory = min(new_memory, self.MAX_MEMORY) + self.logger.info(f"Increasing memory from {maxmemory} to {new_memory}") + maxmemory = new_memory + + if retry_data.get("increase_runtime") and maxjobruntime: + factor = retry_data.get("runtime_factor", 1.2) + new_runtime = int(int(maxjobruntime) * factor) + self.logger.info(f"Increasing walltime from {maxjobruntime} to {new_runtime}") + maxjobruntime = new_runtime ## Save the (new) values of the resubmission parameters in self.resubmit_info ## for the current job retry number. diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index af0287e44f..16f7e3cad8 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -37,17 +37,17 @@ 147: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 60311: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, - 8028: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job failed to open local and fallback files."}, - 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True}, - 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "increase_memory": True, "memory_factor": 1.3, "increase_runtime": True, "runtime_factor": 1.3}, - 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError."}, - 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file."}, - 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, - 86: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, - 92: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."}, - 134: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Abort (ANSI) or IOT trap (4.2 BSD) (most likely user application crashed)."}, - 8001: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Other CMS Exception."}, - 65: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "End of job from user application (CMSSW)."}, + 8028: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, + 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "increase_memory": True, "memory_factor": 1.3, "increase_runtime": True, "runtime_factor": 1.3, "handler": "handle_file_open_or_root_error"}, + 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError.", "handler": "handle_file_open_or_root_error"}, + 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file.", "handler": "handle_file_open_or_root_error"}, + 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 86: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 92: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 134: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Abort (ANSI) or IOT trap (4.2 BSD) (most likely user application crashed).", "handler": "handle_sigabrt"}, + 8001: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Other CMS Exception.", "handler": "handle_cvmfs_or_cms_exception"}, + 65: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "End of job from user application (CMSSW).", "handler": "handle_cvmfs_or_cms_exception"}, "default": {"type": "neutral", "max_retries": 2, "delay": 900, "msg": "Taking default exit code retry policy route."} } @@ -188,6 +188,68 @@ def calculate_effective_max_retries(self, policy): return effective_max_retries + def handle_file_open_or_root_error(self, exitCode): + """ + Handle exit codes related to file open/read/root failures (8020, 8021, 8022, 8028, 84, 85, 86, 92). + Checks for corrupted input files; if found, creates a fake FJR with code 8022 + and allows a retry. Otherwise raises RecoverableError with the policy message. + """ + try: + corruptedInputFile = self.check_corrupted_file(exitCode) + except Exception as e: # pylint: disable=broad-except + self.logger.error(f"check_corrupted_file raised an exception:\n{e}\nIgnore and go on") + corruptedInputFile = False + if corruptedInputFile: + exitMsg = "Fatal Root Error: possible corrupted input file. Reporting and retrying." + self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) + raise RecoverableError(EXIT_RETRY_POLICY[exitCode]["msg"]) + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + + def handle_sigabrt(self, exitCode): + """ + Handle exit code 134 (SIGABRT / IOT trap). + Checks job stdout for a SIGILL pattern; if found, promotes to a recoverable + SIGILL error. Otherwise raises the default RecoverableError. + """ + recoverable_signal = False + try: + fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) + with open(fname, encoding='utf-8') as fd: + for line in fd: + if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"): + recoverable_signal = True + break + except Exception: # pylint: disable=broad-except + msg = "Error analyzing abort signal.\nDetails follow:" + self.logger.exception(msg) + if recoverable_signal: + raise RecoverableError("SIGILL; may indicate a worker node issue.") + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + + def handle_cvmfs_or_cms_exception(self, exitCode): + """ + Handle exit codes 8001 and 65 (CMS exceptions / end-of-job). + Scans job stdout for a truncated CVMFS file pattern; if found raises a + specific RecoverableError, otherwise raises the default one. + """ + cvmfs_issue = False + try: + fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) + cvmfs_issue_re = re.compile(r"== CMSSW: unable to load /cvmfs/.*file too short") + with open(fname, encoding='utf-8') as fd: + for line in fd: + if cvmfs_issue_re.match(line): + cvmfs_issue = True + break + except Exception: # pylint: disable=broad-except + msg = "Error analyzing output for CVMFS issues.\nDetails follow:" + self.logger.exception(msg) + if cvmfs_issue: + raise RecoverableError("CVMFS issue detected.") + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = def apply_retry_policy(self, exitCode): """ @@ -202,6 +264,14 @@ def apply_retry_policy(self, exitCode): raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") self.logger.info(f"Applying retry policy for exit code {exitCode}") self.store_retry_actions(policy) + # Dispatch to a handler if one is registered for this exit code. + handler_name = policy.get("handler") + if handler_name: + handler = getattr(self, handler_name, None) + if handler is None: + self.logger.error(f"Handler '{handler_name}' not found on RetryJob.") + else: + handler(exitCode) raise RecoverableError(policy["msg"]) if policy["type"] == "neutral": @@ -430,58 +500,11 @@ def check_exit_code(self): msg = "Job and stageout wrappers finished successfully (exit code %d)." % (exitCode) self.logger.info(msg) return 0 - # else: - # raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n '))) + else: + raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n '))) msg = "Job or stageout wrapper finished with exit code %d." % (exitCode) - self.logger.info(msg) - - # # Wrapper script sometimes returns the posix return code (8 bits). - # if exitCode in [8020, 8021, 8022, 8028] or exitCode in [84, 85, 86, 92]: - # try: # the following is still a bit experimental, make sure it never crashes the PJ - # corruptedInputFile = self.check_corrupted_file(exitCode) - # except Exception as e: # pylint: disable=broad-except - # msg = f"check_corrupted_file raised an exception:\n{e}\nIgnore and go on" - # self.logger.error(msg) - # corruptedInputFile = False - # if corruptedInputFile: - # exitMsg = "Fatal Root Error maybe a corrupted input file. This error is being reported" - # self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) # retry the job - # raise RecoverableError("Job failed to open local and fallback files.") - - # if exitCode == 134: - # recoverable_signal = False - # try: - # fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) - # with open(fname, encoding='utf-8') as fd: - # for line in fd: - # if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"): - # recoverable_signal = True - # break - # except Exception: # pylint: disable=broad-except - # msg = "Error analyzing abort signal." - # msg += "\nDetails follow:" - # self.logger.exception(msg) - # if recoverable_signal: - # raise RecoverableError("SIGILL; may indicate a worker node issue.") - - # if exitCode == 8001 or exitCode == 65: - # cvmfs_issue = False - # try: - # fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) - # cvmfs_issue_re = re.compile("== CMSSW: unable to load /cvmfs/.*file too short") - # with open(fname, encoding='utf-8') as fd: - # for line in fd: - # if cvmfs_issue_re.match(line): - # cvmfs_issue = True - # break - # except Exception: # pylint: disable=broad-except - # msg = "Error analyzing output for CVMFS issues." - # msg += "\nDetails follow:" - # self.logger.exception(msg) - # if cvmfs_issue: - # raise RecoverableError("CVMFS issue detected.") - + self.logger.info(msg) return 0 From b288bb8059c3f9e8f9471cba333f107dddba5379 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Mon, 13 Apr 2026 13:44:04 +0200 Subject: [PATCH 13/27] use upperbounds --- src/python/ServerUtilities.py | 3 ++- src/python/TaskWorker/Actions/PreJob.py | 8 ++++---- src/python/TaskWorker/Actions/RetryJob.py | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index 9aa00ef192..d8183ca43a 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -68,7 +68,8 @@ MAX_MEMORY_SINGLE_CORE = 3000 MAX_MEMORY_SINGLE_CORE_ON_RESUBMIT = 5000 MAX_DISK_SPACE = 20000000 # Disk usage is not used from .job.ad as CRAB3 is not seeting it. 20GB is max. - +MAX_MEMORY_AUTOMATIC_RESUBMIT = 7500 +MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT = 47 * 60 * 60 MAX_IDLE_JOBS = 1000 MAX_POST_JOBS = 10 diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 8c1a80b503..152a3ef4ba 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -13,7 +13,7 @@ import logging from ast import literal_eval -from ServerUtilities import getWebdirForDb, insertJobIdSid, pythonListToClassAdExprTree +from ServerUtilities import getWebdirForDb, insertJobIdSid, pythonListToClassAdExprTree, MAX_MEMORY_AUTOMATIC_RESUBMIT, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT from TaskWorker.Actions.RetryJob import JOB_RETURN_CODES import htcondor2 as htcondor @@ -300,21 +300,21 @@ def alter_submit(self, crab_retry): numcores = self.resubmit_info[inkey].get('numcores') priority = self.resubmit_info[inkey].get('priority') - #ExitCode Dependent change in resubmission parameters for retries + #ExitCode Dependent automatic change in resubmission parameters if self.resubmit_info: retry_data = self.resubmit_info.get(inkey, {}) if retry_data.get("increase_memory") and maxmemory: factor = retry_data.get("memory_factor", 1.2) new_memory = int(int(maxmemory) * factor) - if hasattr(self, "MAX_MEMORY"): - new_memory = min(new_memory, self.MAX_MEMORY) + new_memory = min(new_memory, MAX_MEMORY_AUTOMATIC_RESUBMIT) self.logger.info(f"Increasing memory from {maxmemory} to {new_memory}") maxmemory = new_memory if retry_data.get("increase_runtime") and maxjobruntime: factor = retry_data.get("runtime_factor", 1.2) new_runtime = int(int(maxjobruntime) * factor) + new_runtime = min(new_runtime, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT) self.logger.info(f"Increasing walltime from {maxjobruntime} to {new_runtime}") maxjobruntime = new_runtime diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index 16f7e3cad8..2bba47324a 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -39,7 +39,7 @@ 151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 8028: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, - 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "increase_memory": True, "memory_factor": 1.3, "increase_runtime": True, "runtime_factor": 1.3, "handler": "handle_file_open_or_root_error"}, + 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError.", "handler": "handle_file_open_or_root_error"}, 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file.", "handler": "handle_file_open_or_root_error"}, 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, From 1306f2b0afc5df0d649a329f868855616f244e6e Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Fri, 17 Apr 2026 13:44:35 +0200 Subject: [PATCH 14/27] address comments --- src/python/ServerUtilities.py | 14 +++++++------- src/python/TaskWorker/Actions/DagmanCreator.py | 3 ++- .../TaskWorker/Actions/DagmanResubmitter.py | 2 +- src/python/TaskWorker/Actions/DagmanSubmitter.py | 2 +- src/python/TaskWorker/Actions/PreJob.py | 16 +++++++++++----- src/python/TaskWorker/Actions/RetryJob.py | 7 ++++--- 6 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index d8183ca43a..6a164af50f 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -61,15 +61,15 @@ # Fatal error limits for job resource usage # Defaults are used if unable to load from .job.ad # Otherwise it uses these values. -MAX_WALLTIME = 21 * 60 * 60 + 30 * 60 -MAX_MEMORY = 2 * 1024 +MAX_WALLTIME = 21 * 60 * 60 + 30 * 60 # 21.5 hours in seconds +MAX_MEMORY = 2 * 1024 # 2048 MB # see https://github.com/dmwm/CRABServer/issues/5995 -MAX_MEMORY_PER_CORE = 2500 -MAX_MEMORY_SINGLE_CORE = 3000 -MAX_MEMORY_SINGLE_CORE_ON_RESUBMIT = 5000 +MAX_MEMORY_PER_CORE = 2500 # 2500 MB +MAX_MEMORY_SINGLE_CORE = 3000 # 3000 MB +MAX_MEMORY_SINGLE_CORE_ON_RESUBMIT = 5000 # 5000 MB MAX_DISK_SPACE = 20000000 # Disk usage is not used from .job.ad as CRAB3 is not seeting it. 20GB is max. -MAX_MEMORY_AUTOMATIC_RESUBMIT = 7500 -MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT = 47 * 60 * 60 +MAX_MEMORY_AUTOMATIC_RESUBMIT = 7500 # 7500 MB +MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT = 47 * 60 * 60 # 47 hours in seconds MAX_IDLE_JOBS = 1000 MAX_POST_JOBS = 10 diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 859cb97cb7..45e87f3972 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -62,6 +62,7 @@ VARS Job{count} My.CRAB_localOutputFiles="\\"{localOutputFiles}\\"" VARS Job{count} My.CRAB_DataBlock="\\"{block}\\"" VARS Job{count} My.CRAB_Destination="\\"{destination}\\"" +ABORT-DAG-ON Job{count} 3 """ @@ -345,7 +346,7 @@ def makeJobSubmit(self): jobSubmit['My.CRAB_Publish'] = "1" if task['tm_publication'] == 'T' else "0" jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) - jobSubmit['My.CRAB_ResubmitCounter'] = "0" + jobSubmit['My.CRAB_ResubmitCounter'] = 0 # note about Lists # in the JDL everything is a string, we can't use the simple classAd[name]=somelist diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index f398ac0de9..cf9c8187f3 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -86,7 +86,7 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument # is saving the values of the parameters for each job retry in text files (the # files are in the directory resubmit_info in the schedd). # We use classAds here as way to pass informations to PreJobs. - # Value in schedd.exit(const, ad, value) can be string or ExprTree + # Value in schedd.edit(const, ad, value) can be string or ExprTree # https://htcondor.readthedocs.io/en/latest/apis/python-bindings/api/version2/htcondor2/schedd.html#htcondor2.Schedd.edit # We need ExprTree when the ad correspond to a python list for adparam, taskparam in params.items(): diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py index 4e4969d151..faff27d5b0 100644 --- a/src/python/TaskWorker/Actions/DagmanSubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py @@ -61,6 +61,7 @@ def addJobSubmitInfoToDagJobJDL(dagJdl, jobSubmit): 'My.MaxWallTimeMinsRun', 'My.MaxWallTimeMinsProbe', 'My.MaxWallTimeMinsTail', + 'My.CRAB_ResubmitCounter', # these are used in PostJob only 'My.CRAB_UserRole', 'My.CRAB_UserGroup', @@ -87,7 +88,6 @@ def addJobSubmitInfoToDagJobJDL(dagJdl, jobSubmit): 'My.CMS_Type', 'My.CMS_WMTool', 'My.CMS_TaskType', - 'My.CRAB_ResubmitCounter', ] for adName in adsToPort: diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 152a3ef4ba..d7dfc74f69 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -306,22 +306,23 @@ def alter_submit(self, crab_retry): retry_data = self.resubmit_info.get(inkey, {}) if retry_data.get("increase_memory") and maxmemory: factor = retry_data.get("memory_factor", 1.2) - new_memory = int(int(maxmemory) * factor) + new_memory = int(maxmemory * factor) new_memory = min(new_memory, MAX_MEMORY_AUTOMATIC_RESUBMIT) self.logger.info(f"Increasing memory from {maxmemory} to {new_memory}") maxmemory = new_memory if retry_data.get("increase_runtime") and maxjobruntime: factor = retry_data.get("runtime_factor", 1.2) - new_runtime = int(int(maxjobruntime) * factor) + new_runtime = int(maxjobruntime * factor) new_runtime = min(new_runtime, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT) self.logger.info(f"Increasing walltime from {maxjobruntime} to {new_runtime}") maxjobruntime = new_runtime + ## get resubmission counter + resubmit_counter = self.task_ad.get("CRAB_ResubmitCounter", 0) + ## Save the (new) values of the resubmission parameters in self.resubmit_info ## for the current job retry number. - #get resubmission counter - resubmit_counter = int(self.task_ad.get("CRAB_ResubmitCounter", 0)) outkey = str(crab_retry) if outkey not in self.resubmit_info: self.resubmit_info[outkey] = {} @@ -442,7 +443,7 @@ def redoSites(self, crab_retry, use_resubmit_info): previous_site = retry_data.get("previous_site") if retry_data.get("change_site") and previous_site: self.logger.info(f"Last Exit Code indicated that a change in site might help. inkey was {inkey}") - if previous_site in availableSet: + if previous_site in availableSet and len(availableSet) > 1: self.logger.info(f"Removing previous site {previous_site} from candidate sites") availableSet.discard(previous_site) else: @@ -497,6 +498,11 @@ def needsDefer(self): currentTime = time.time() # Check retry delay from resubmit_info + # Inside SPOOL_DIR we have resubmit_info folder with text files job.{self.job_id}.txt for each job id + # Each file is a dictionary of the format example {0:{'maxmemory':2000, ...}, 1:{}, 2:{}, ...} + # where 0,1,2,... are crab_retry numbers. A lot of vital info like site_whitelist, retry_delay_until, + # resubmit_counter, use_resubmit_info, etc is stored and read from this file for each retry across resubmissions. + # ToDo: Convert this file to json format retry_info_file = f"resubmit_info/job.{self.job_id}.txt" if os.path.exists(retry_info_file): try: diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index 2bba47324a..cc69d57d30 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -20,12 +20,13 @@ # ---------------------------------------------------------------------- # Exit-code dependent retry policy # ---------------------------------------------------------------------- +# ToDo: Reduce Redundance across short and long exit codes EXIT_RETRY_POLICY = { 1: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to bootstrap CMSSW; likely a worker node issue."}, 50513: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, 81: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, - 50115: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, + 50115: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, 195: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, 137: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "SIGKILL; likely an unrelated batch system kill."}, 10034: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."}, @@ -37,9 +38,9 @@ 147: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 60311: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, - 8028: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 8028: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, - 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, + 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "handler": "handle_file_open_or_root_error", "increase_runtime": True, "runtime_factor": 1.3, "increase_memory": True, "memory_factor": 1.3}, 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError.", "handler": "handle_file_open_or_root_error"}, 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file.", "handler": "handle_file_open_or_root_error"}, 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, From 3342bddd7eda79ad0ca5c1998b5132c3642c2d7a Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Mon, 20 Apr 2026 15:04:09 +0200 Subject: [PATCH 15/27] add handler info --- src/python/TaskWorker/Actions/RetryJob.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index cc69d57d30..adedce1323 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -203,6 +203,7 @@ def handle_file_open_or_root_error(self, exitCode): if corruptedInputFile: exitMsg = "Fatal Root Error: possible corrupted input file. Reporting and retrying." self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) + self.logger.info(f"Handler function ran successfully for exit Code {exitCode}") raise RecoverableError(EXIT_RETRY_POLICY[exitCode]["msg"]) # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = @@ -224,6 +225,7 @@ def handle_sigabrt(self, exitCode): except Exception: # pylint: disable=broad-except msg = "Error analyzing abort signal.\nDetails follow:" self.logger.exception(msg) + self.logger.info(f"Handler function ran successfully for exit Code {exitCode}") if recoverable_signal: raise RecoverableError("SIGILL; may indicate a worker node issue.") @@ -247,6 +249,7 @@ def handle_cvmfs_or_cms_exception(self, exitCode): except Exception: # pylint: disable=broad-except msg = "Error analyzing output for CVMFS issues.\nDetails follow:" self.logger.exception(msg) + self.logger.info(f"Handler function ran successfully for exit Code {exitCode}") if cvmfs_issue: raise RecoverableError("CVMFS issue detected.") From 7f7fa6a1fe0f5bce041a56b695e8734d7274f35f Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Mon, 20 Apr 2026 16:19:17 +0200 Subject: [PATCH 16/27] add handler info --- src/python/TaskWorker/Actions/DagmanCreator.py | 2 +- src/python/TaskWorker/Actions/PreJob.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 45e87f3972..a774735d37 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -346,7 +346,7 @@ def makeJobSubmit(self): jobSubmit['My.CRAB_Publish'] = "1" if task['tm_publication'] == 'T' else "0" jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) - jobSubmit['My.CRAB_ResubmitCounter'] = 0 + jobSubmit['My.CRAB_ResubmitCounter'] = "0" # note about Lists # in the JDL everything is a string, we can't use the simple classAd[name]=somelist diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index d7dfc74f69..b1c73d77c9 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -319,7 +319,7 @@ def alter_submit(self, crab_retry): maxjobruntime = new_runtime ## get resubmission counter - resubmit_counter = self.task_ad.get("CRAB_ResubmitCounter", 0) + resubmit_counter = int(self.task_ad.get("CRAB_ResubmitCounter", 0)) ## Save the (new) values of the resubmission parameters in self.resubmit_info ## for the current job retry number. From b2f179d09e8f07e0a68a0010e347df8c48eaea6e Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Wed, 22 Apr 2026 13:41:48 +0200 Subject: [PATCH 17/27] use job specific resubmit_counter --- src/python/TaskWorker/Actions/PreJob.py | 36 +++++++++++++++++++---- src/python/TaskWorker/Actions/RetryJob.py | 6 ++-- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index b1c73d77c9..60ea776639 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -12,7 +12,7 @@ import errno import logging from ast import literal_eval - +from RetryJob import EXIT_RETRY_POLICY from ServerUtilities import getWebdirForDb, insertJobIdSid, pythonListToClassAdExprTree, MAX_MEMORY_AUTOMATIC_RESUBMIT, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT from TaskWorker.Actions.RetryJob import JOB_RETURN_CODES @@ -182,6 +182,22 @@ def get_resubmit_info(self): with open(file_name, 'r', encoding='utf-8') as fd: self.resubmit_info = literal_eval(fd.read()) + def get_resubmit_counter(self, exit_code, crab_retry): + """ + Calculate the resubmit counter based on the exit code's max_retries + and the current crab_retry value. + + Formula: resubmit_counter = crab_retry // (max_retries + 1) + + Examples: + max_retries=2: [0,2] -> 0, [3,5] -> 1, [6,8] -> 2, ... + max_retries=9: [0,9] -> 0, [10,19] -> 1, [20,29] -> 2, ... + """ + policy = EXIT_RETRY_POLICY.get(exit_code, EXIT_RETRY_POLICY["default"]) + max_retries = policy["max_retries"] + resubmit_counter = crab_retry // (max_retries + 1) + self.logger.info(f"Resubmit Counter was calculated to be {resubmit_counter}") + return resubmit_counter def save_resubmit_info(self): """ @@ -268,6 +284,9 @@ def alter_submit(self, crab_retry): maxmemory = None numcores = None priority = None + inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) + while inkey not in self.resubmit_info and int(inkey) > 0: + inkey = str(int(inkey) - 1) if not use_resubmit_info: # means that we resubmit with new params from crab resubmit #if 'MaxWallTimeMins_RAW' in self.task_ad: # if self.task_ad['MaxWallTimeMins_RAW'] != 1315: @@ -291,9 +310,6 @@ def alter_submit(self, crab_retry): priority = 20 #the maximum for splitting jobs else: # means we resubmit with same params as previous try ## SB most likely much (all) of this string/int conversions can be simplified - inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) - while inkey not in self.resubmit_info and int(inkey) > 0: - inkey = str(int(inkey) - 1) self.logger.info(f"use_resubmit_info is {use_resubmit_info} and inkey is {inkey}") maxjobruntime = self.resubmit_info[inkey].get('maxjobruntime') maxmemory = self.resubmit_info[inkey].get('maxmemory') @@ -319,7 +335,17 @@ def alter_submit(self, crab_retry): maxjobruntime = new_runtime ## get resubmission counter - resubmit_counter = int(self.task_ad.get("CRAB_ResubmitCounter", 0)) + if inkey == "0": + resubmit_counter = 0 + self.logger.info("inkey is 0. resubmit_counter is set to 0.") + else: + lastExitCode = self.resubmit_info[inkey].get('exitCode') + self.logger.info(f"Last Exit Code was {lastExitCode}") + try: + resubmit_counter = self.get_resubmit_counter(lastExitCode, crab_retry) + except Exception: + self.logger.info("Exception occured while trying to get resubmit counter. Setting to 0") + resubmit_counter = 0 ## Save the (new) values of the resubmission parameters in self.resubmit_info ## for the current job retry number. diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index adedce1323..b8a5ae9ada 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -135,7 +135,7 @@ def get_job_ad_from_condor_q(self): # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = - def store_retry_actions(self, policy): + def store_retry_actions(self, policy, exitCode): retry_info_file = f"resubmit_info/job.{self.job_id}.txt" retry_info = {} @@ -152,7 +152,7 @@ def store_retry_actions(self, policy): delay = policy.get("delay", 900) retry_info[key]["retry_delay_until"] = time.time() + delay - + retry_info[key]["exitCode"] = exitCode retry_info[key]["increase_memory"] = policy.get("increase_memory", False) retry_info[key]["increase_runtime"] = policy.get("increase_runtime", False) retry_info[key]["change_site"] = policy.get("change_site", False) @@ -267,7 +267,7 @@ def apply_retry_policy(self, exitCode): if self.crab_retry >= effective_max_retries: raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") self.logger.info(f"Applying retry policy for exit code {exitCode}") - self.store_retry_actions(policy) + self.store_retry_actions(policy, exitCode) # Dispatch to a handler if one is registered for this exit code. handler_name = policy.get("handler") if handler_name: From 3bdc41dcd3c2d34571a99adccb9d132e943ecc89 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Wed, 22 Apr 2026 15:33:40 +0200 Subject: [PATCH 18/27] fix import --- src/python/TaskWorker/Actions/PreJob.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 60ea776639..43c28fb0d8 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -12,9 +12,9 @@ import errno import logging from ast import literal_eval -from RetryJob import EXIT_RETRY_POLICY + from ServerUtilities import getWebdirForDb, insertJobIdSid, pythonListToClassAdExprTree, MAX_MEMORY_AUTOMATIC_RESUBMIT, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT -from TaskWorker.Actions.RetryJob import JOB_RETURN_CODES +from TaskWorker.Actions.RetryJob import JOB_RETURN_CODES, EXIT_RETRY_POLICY import htcondor2 as htcondor import classad2 as classad From 359bc859c8f1a8f9be114e525c634236bfa03c27 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Wed, 22 Apr 2026 17:45:43 +0200 Subject: [PATCH 19/27] fix order of store_retry_actions --- src/python/ServerUtilities.py | 2 +- src/python/TaskWorker/Actions/RetryJob.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index 6a164af50f..3e5f773ef7 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -69,7 +69,7 @@ MAX_MEMORY_SINGLE_CORE_ON_RESUBMIT = 5000 # 5000 MB MAX_DISK_SPACE = 20000000 # Disk usage is not used from .job.ad as CRAB3 is not seeting it. 20GB is max. MAX_MEMORY_AUTOMATIC_RESUBMIT = 7500 # 7500 MB -MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT = 47 * 60 * 60 # 47 hours in seconds +MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT = 47 * 60 # 47 hours in minutes MAX_IDLE_JOBS = 1000 MAX_POST_JOBS = 10 diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index b8a5ae9ada..d2a192bb36 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -261,13 +261,12 @@ def apply_retry_policy(self, exitCode): Raises FatalError if retry limit exceeded. """ policy = EXIT_RETRY_POLICY.get(exitCode, EXIT_RETRY_POLICY["default"]) - + self.logger.info(f"Applying retry policy for exit code {exitCode}") + self.store_retry_actions(policy, exitCode) if policy["type"] == "recoverable": effective_max_retries = self.calculate_effective_max_retries(policy) if self.crab_retry >= effective_max_retries: raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") - self.logger.info(f"Applying retry policy for exit code {exitCode}") - self.store_retry_actions(policy, exitCode) # Dispatch to a handler if one is registered for this exit code. handler_name = policy.get("handler") if handler_name: From 97d83eccaeabafad56b3ae4be0412df0aa91e58f Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Wed, 22 Apr 2026 19:24:49 +0200 Subject: [PATCH 20/27] fix site nomenclature --- src/python/TaskWorker/Actions/PreJob.py | 6 +++--- src/python/TaskWorker/Actions/RetryJob.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 43c28fb0d8..f3b67bb1bd 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -335,9 +335,9 @@ def alter_submit(self, crab_retry): maxjobruntime = new_runtime ## get resubmission counter - if inkey == "0": + if crab_retry == 0: resubmit_counter = 0 - self.logger.info("inkey is 0. resubmit_counter is set to 0.") + self.logger.info("crab_retry is 0. resubmit_counter is set to 0.") else: lastExitCode = self.resubmit_info[inkey].get('exitCode') self.logger.info(f"Last Exit Code was {lastExitCode}") @@ -466,7 +466,7 @@ def redoSites(self, crab_retry, use_resubmit_info): # ExitCode Dependent discard of previous_site retry_data = self.resubmit_info.get(inkey, {}) - previous_site = retry_data.get("previous_site") + previous_site = retry_data.get("site") if retry_data.get("change_site") and previous_site: self.logger.info(f"Last Exit Code indicated that a change in site might help. inkey was {inkey}") if previous_site in availableSet and len(availableSet) > 1: diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index d2a192bb36..abf864bd81 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -158,7 +158,7 @@ def store_retry_actions(self, policy, exitCode): retry_info[key]["change_site"] = policy.get("change_site", False) retry_info[key]["memory_factor"] = policy.get("memory_factor", 1.0) retry_info[key]["runtime_factor"] = policy.get("runtime_factor", 1.0) - retry_info[key]["previous_site"] = getattr(self, "site", None) + retry_info[key]["site"] = getattr(self, "site", None) with open(retry_info_file + ".tmp", "w", encoding="utf-8") as fd: fd.write(str(retry_info)) From 6cc2f4ff30b1cfba2480a9a4bae4a541d631a90c Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Wed, 22 Apr 2026 19:47:37 +0200 Subject: [PATCH 21/27] final commit --- src/python/TaskWorker/Actions/DagmanCreator.py | 1 - src/python/TaskWorker/Actions/DagmanResubmitter.py | 5 ----- src/python/TaskWorker/Actions/DagmanSubmitter.py | 1 - src/python/TaskWorker/Actions/RetryJob.py | 2 +- 4 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index a774735d37..8ca326c2f3 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -346,7 +346,6 @@ def makeJobSubmit(self): jobSubmit['My.CRAB_Publish'] = "1" if task['tm_publication'] == 'T' else "0" jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) - jobSubmit['My.CRAB_ResubmitCounter'] = "0" # note about Lists # in the JDL everything is a string, we can't use the simple classAd[name]=somelist diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index cf9c8187f3..abaeecc97c 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -96,11 +96,6 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument else: newAdValue = str(task['resubmit_'+taskparam]) schedd.edit(rootConst, adparam, newAdValue) - dagAds = schedd.query(rootConst, ['CRAB_ResubmitCounter']) - current = 0 - if dagAds and isinstance(dagAds[0].get('CRAB_ResubmitCounter'), int): - current = dagAds[0]['CRAB_ResubmitCounter'] - schedd.edit(rootConst, "CRAB_ResubmitCounter", str(current + 1)) # finally restart the dagman with the 3 lines below schedd.act(htcondor.JobAction.Hold, rootConst) schedd.edit(rootConst, "HoldKillSig", 'SIGUSR1') diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py index faff27d5b0..3bcb0b0d5c 100644 --- a/src/python/TaskWorker/Actions/DagmanSubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py @@ -61,7 +61,6 @@ def addJobSubmitInfoToDagJobJDL(dagJdl, jobSubmit): 'My.MaxWallTimeMinsRun', 'My.MaxWallTimeMinsProbe', 'My.MaxWallTimeMinsTail', - 'My.CRAB_ResubmitCounter', # these are used in PostJob only 'My.CRAB_UserRole', 'My.CRAB_UserGroup', diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index abf864bd81..591974491a 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -40,7 +40,7 @@ 151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, 8028: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, - 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "handler": "handle_file_open_or_root_error", "increase_runtime": True, "runtime_factor": 1.3, "increase_memory": True, "memory_factor": 1.3}, + 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError.", "handler": "handle_file_open_or_root_error"}, 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file.", "handler": "handle_file_open_or_root_error"}, 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, From 2434ef3c4bc40794b74ff3e16a476a45713f7011 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Fri, 24 Apr 2026 13:23:06 +0200 Subject: [PATCH 22/27] address comments --- src/python/TaskWorker/Actions/PreJob.py | 30 +++++++++------- src/python/TaskWorker/Actions/RetryJob.py | 42 ++++++++++++++++++++--- 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index f3b67bb1bd..a09a25ca5a 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -184,19 +184,23 @@ def get_resubmit_info(self): def get_resubmit_counter(self, exit_code, crab_retry): """ - Calculate the resubmit counter based on the exit code's max_retries - and the current crab_retry value. - - Formula: resubmit_counter = crab_retry // (max_retries + 1) - - Examples: - max_retries=2: [0,2] -> 0, [3,5] -> 1, [6,8] -> 2, ... - max_retries=9: [0,9] -> 0, [10,19] -> 1, [20,29] -> 2, ... + Read the resubmit_counter persisted by RetryJob.store_retry_actions(). + RetryJob is the source of truth: it tracks retries-consumed-per-exit-code + and resets the count when the exit code changes, so the counter correctly + reflects only retries under the current exit code and is not contaminated + by previous exit codes with different max_retries values. + Falls back to 0 if no prior RetryJob data exists. """ - policy = EXIT_RETRY_POLICY.get(exit_code, EXIT_RETRY_POLICY["default"]) - max_retries = policy["max_retries"] - resubmit_counter = crab_retry // (max_retries + 1) - self.logger.info(f"Resubmit Counter was calculated to be {resubmit_counter}") + # inkey resolution mirrors the existing pattern in alter_submit + inkey = "0" if crab_retry == 0 else str(crab_retry - 1) + while inkey not in self.resubmit_info and int(inkey) > 0: + inkey = str(int(inkey) - 1) + + resubmit_counter = self.resubmit_info.get(inkey, {}).get("resubmit_counter", 0) + self.logger.info( + f"Read resubmit_counter={resubmit_counter} from resubmit_info[{inkey}] " + f"(crab_retry={crab_retry}, exit_code={exit_code})" + ) return resubmit_counter def save_resubmit_info(self): @@ -284,7 +288,7 @@ def alter_submit(self, crab_retry): maxmemory = None numcores = None priority = None - inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) + inkey = "0" if crab_retry == 0 else str(crab_retry - 1) while inkey not in self.resubmit_info and int(inkey) > 0: inkey = str(int(inkey) - 1) if not use_resubmit_info: # means that we resubmit with new params from crab resubmit diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index 591974491a..1909c02752 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -143,13 +143,41 @@ def store_retry_actions(self, policy, exitCode): try: with open(retry_info_file, "r", encoding="utf-8") as fd: retry_info = literal_eval(fd.read()) - except Exception: + except FileNotFoundError: retry_info = {} key = str(self.crab_retry) if key not in retry_info: retry_info[key] = {} + # --- compute retries_consumed_for_ec --- + # Find the most recent previous entry (walk backwards if needed) + prev_consumed = 0 + prev_ec = None + if self.crab_retry > 0: + prev_key = str(self.crab_retry - 1) + while prev_key not in retry_info and int(prev_key) > 0: + prev_key = str(int(prev_key) - 1) + prev_entry = retry_info.get(prev_key, {}) + prev_ec = prev_entry.get("exitCode") + prev_consumed = prev_entry.get("retries_consumed_for_ec", 0) + + if prev_ec is not None and prev_ec != exitCode: + # Exit code changed: start a fresh count for this new code + retries_consumed_for_ec = 0 + self.logger.info( + f"Exit code changed from {prev_ec} to {exitCode} at retry {self.crab_retry}. " + f"Resetting retries_consumed_for_ec to 0." + ) + else: + retries_consumed_for_ec = prev_consumed + 1 + + # Compute resubmit_counter: how many full cycles of (max_retries+1) have elapsed + # for THIS exit code. + max_retries = policy["max_retries"] + resubmit_counter = retries_consumed_for_ec // (max_retries + 1) + # --- end --- + delay = policy.get("delay", 900) retry_info[key]["retry_delay_until"] = time.time() + delay retry_info[key]["exitCode"] = exitCode @@ -159,6 +187,8 @@ def store_retry_actions(self, policy, exitCode): retry_info[key]["memory_factor"] = policy.get("memory_factor", 1.0) retry_info[key]["runtime_factor"] = policy.get("runtime_factor", 1.0) retry_info[key]["site"] = getattr(self, "site", None) + retry_info[key]["retries_consumed_for_ec"] = retries_consumed_for_ec + retry_info[key]["resubmit_counter"] = resubmit_counter with open(retry_info_file + ".tmp", "w", encoding="utf-8") as fd: fd.write(str(retry_info)) @@ -174,7 +204,7 @@ def calculate_effective_max_retries(self, policy): try: with open(retry_info_file, "r", encoding="utf-8") as fd: retry_info = literal_eval(fd.read()) - except Exception: + except FileNotFoundError: retry_info = {} key = str(self.crab_retry) @@ -182,18 +212,20 @@ def calculate_effective_max_retries(self, policy): retry_info[key] = {} entry = retry_info.get(key, {}) + # resubmit_counter was written by store_retry_actions() just before this call resubmit_counter = entry.get("resubmit_counter", 0) + retries_consumed = entry.get("retries_consumed_for_ec", 0) base_max = policy["max_retries"] effective_max_retries = (base_max + 1)*(resubmit_counter + 1) - 1 - self.logger.info(f"Resubmit counter = {resubmit_counter}, effective max retries = {effective_max_retries}") + self.logger.info(f"EC retries consumed: {retries_consumed}, resubmit_counter: {resubmit_counter}, effective_max_retries: {effective_max_retries}") return effective_max_retries def handle_file_open_or_root_error(self, exitCode): """ Handle exit codes related to file open/read/root failures (8020, 8021, 8022, 8028, 84, 85, 86, 92). - Checks for corrupted input files; if found, creates a fake FJR with code 8022 - and allows a retry. Otherwise raises RecoverableError with the policy message. + Checks for corrupted input files; if found, creates a fake FJR with code 8022. + Raises RecoverableError with the policy message. """ try: corruptedInputFile = self.check_corrupted_file(exitCode) From e90b1bf738c5c7edae9afc3bc98c8df5274fc017 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Fri, 24 Apr 2026 13:34:27 +0200 Subject: [PATCH 23/27] no need to import exit retry policy anymore --- src/python/TaskWorker/Actions/PreJob.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index a09a25ca5a..b70050d51f 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -14,7 +14,7 @@ from ast import literal_eval from ServerUtilities import getWebdirForDb, insertJobIdSid, pythonListToClassAdExprTree, MAX_MEMORY_AUTOMATIC_RESUBMIT, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT -from TaskWorker.Actions.RetryJob import JOB_RETURN_CODES, EXIT_RETRY_POLICY +from TaskWorker.Actions.RetryJob import JOB_RETURN_CODES import htcondor2 as htcondor import classad2 as classad From 72ca8cc4936f209b9358c63e042c064a9f4d6631 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 28 Apr 2026 15:03:55 +0200 Subject: [PATCH 24/27] try fixed max_retries --- src/python/TaskWorker/Actions/PreJob.py | 39 ++++----- src/python/TaskWorker/Actions/RetryJob.py | 97 ++++++++--------------- 2 files changed, 50 insertions(+), 86 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index b70050d51f..48757aea5b 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -38,6 +38,7 @@ def __init__(self): self.resubmit_info = {} self.prejob_exit_code = None self.logger = logging.getLogger() + self.max_retries = 9 def calculate_crab_retry(self): @@ -182,27 +183,6 @@ def get_resubmit_info(self): with open(file_name, 'r', encoding='utf-8') as fd: self.resubmit_info = literal_eval(fd.read()) - def get_resubmit_counter(self, exit_code, crab_retry): - """ - Read the resubmit_counter persisted by RetryJob.store_retry_actions(). - RetryJob is the source of truth: it tracks retries-consumed-per-exit-code - and resets the count when the exit code changes, so the counter correctly - reflects only retries under the current exit code and is not contaminated - by previous exit codes with different max_retries values. - Falls back to 0 if no prior RetryJob data exists. - """ - # inkey resolution mirrors the existing pattern in alter_submit - inkey = "0" if crab_retry == 0 else str(crab_retry - 1) - while inkey not in self.resubmit_info and int(inkey) > 0: - inkey = str(int(inkey) - 1) - - resubmit_counter = self.resubmit_info.get(inkey, {}).get("resubmit_counter", 0) - self.logger.info( - f"Read resubmit_counter={resubmit_counter} from resubmit_info[{inkey}] " - f"(crab_retry={crab_retry}, exit_code={exit_code})" - ) - return resubmit_counter - def save_resubmit_info(self): """ Need a doc string here. @@ -270,6 +250,7 @@ def alter_submit(self, crab_retry): ## TaskWorker than modify the dagman job classAds ! CRAB_ResubmitList_in_taskad = 'CRAB_ResubmitList' in self.task_ad use_resubmit_info = False + increase_resubmission_counter = False resubmit_jobids = [] if 'CRAB_ResubmitList' in self.task_ad: # SB this map is most likley useless, keep it in "Works/Don't Touch" spirit @@ -278,6 +259,10 @@ def alter_submit(self, crab_retry): resubmit_jobids = set(resubmit_jobids) if resubmit_jobids and self.job_id not in resubmit_jobids: use_resubmit_info = True + base_max = self.max_retries + if self.job_id in resubmit_jobids and self.crab_retry % (base_max + 1) == base_max: + increase_resubmission_counter = True + self.logger.info(f"increase resubmission counter was set to True as {self.job_id} was in {resubmit_jobids}") except TypeError: resubmit_jobids = True ## If there is no resubmit_info, we can of course not use it. @@ -288,6 +273,7 @@ def alter_submit(self, crab_retry): maxmemory = None numcores = None priority = None + # read information about last retry (if any) inkey = "0" if crab_retry == 0 else str(crab_retry - 1) while inkey not in self.resubmit_info and int(inkey) > 0: inkey = str(int(inkey) - 1) @@ -343,10 +329,15 @@ def alter_submit(self, crab_retry): resubmit_counter = 0 self.logger.info("crab_retry is 0. resubmit_counter is set to 0.") else: - lastExitCode = self.resubmit_info[inkey].get('exitCode') - self.logger.info(f"Last Exit Code was {lastExitCode}") + lastResubmitCounter = self.resubmit_info[inkey].get('resubmit_counter') + self.logger.info(f"Last Resubmit Counter was {lastResubmitCounter}") try: - resubmit_counter = self.get_resubmit_counter(lastExitCode, crab_retry) + if increase_resubmission_counter: + resubmit_counter = lastResubmitCounter + 1 + self.logger.info(f"New Resubmit Counter was increased from {lastResubmitCounter} to {resubmit_counter}") + else: + resubmit_counter = lastResubmitCounter + self.logger.info(f"Last resubmission counter same as new {resubmit_counter}") except Exception: self.logger.info("Exception occured while trying to get resubmit counter. Setting to 0") resubmit_counter = 0 diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index 1909c02752..69bf93c9ab 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -23,33 +23,33 @@ # ToDo: Reduce Redundance across short and long exit codes EXIT_RETRY_POLICY = { - 1: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to bootstrap CMSSW; likely a worker node issue."}, - 50513: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, - 81: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, - 50115: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, - 195: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, - 137: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "SIGKILL; likely an unrelated batch system kill."}, - 10034: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."}, - 50: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."}, - 10040: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Site Error: failed to generate cmsRun cfg file at runtime."}, - 60403: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, - 243: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, - 60307: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, - 147: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, - 60311: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, - 151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."}, - 8028: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, - 8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, - 8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, - 8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError.", "handler": "handle_file_open_or_root_error"}, - 84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file.", "handler": "handle_file_open_or_root_error"}, - 85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, - 86: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, - 92: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, - 134: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Abort (ANSI) or IOT trap (4.2 BSD) (most likely user application crashed).", "handler": "handle_sigabrt"}, - 8001: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Other CMS Exception.", "handler": "handle_cvmfs_or_cms_exception"}, - 65: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "End of job from user application (CMSSW).", "handler": "handle_cvmfs_or_cms_exception"}, - "default": {"type": "neutral", "max_retries": 2, "delay": 900, "msg": "Taking default exit code retry policy route."} + 1: {"type": "recoverable", "delay": 900, "msg": "Job failed to bootstrap CMSSW; likely a worker node issue."}, + 50513: {"type": "recoverable", "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, + 81: {"type": "recoverable", "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, + 50115: {"type": "recoverable", "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, + 195: {"type": "recoverable", "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, + 137: {"type": "recoverable", "delay": 900, "msg": "SIGKILL; likely an unrelated batch system kill."}, + 10034: {"type": "recoverable", "delay": 900, "msg": "Required application version not found at the site."}, + 50: {"type": "recoverable", "delay": 900, "msg": "Required application version not found at the site."}, + 10040: {"type": "recoverable", "delay": 900, "msg": "Site Error: failed to generate cmsRun cfg file at runtime."}, + 60403: {"type": "recoverable", "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, + 243: {"type": "recoverable", "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, + 60307: {"type": "recoverable", "delay": 900, "msg": "Error during attempted file stageout."}, + 147: {"type": "recoverable", "delay": 900, "msg": "Error during attempted file stageout."}, + 60311: {"type": "recoverable", "delay": 900, "msg": "Error during attempted file stageout."}, + 151: {"type": "recoverable", "delay": 900, "msg": "Error during attempted file stageout."}, + 8028: {"type": "recoverable", "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 8021: {"type": "recoverable", "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, + 8020: {"type": "recoverable", "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, + 8022: {"type": "recoverable", "delay": 900, "msg": "FatalRootError.", "handler": "handle_file_open_or_root_error"}, + 84: {"type": "recoverable", "delay": 900, "msg": "Some required file not found; check logs for name of missing file.", "handler": "handle_file_open_or_root_error"}, + 85: {"type": "recoverable", "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 86: {"type": "recoverable", "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 92: {"type": "recoverable", "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 134: {"type": "recoverable", "delay": 900, "msg": "Abort (ANSI) or IOT trap (4.2 BSD) (most likely user application crashed).", "handler": "handle_sigabrt"}, + 8001: {"type": "recoverable", "delay": 900, "msg": "Other CMS Exception.", "handler": "handle_cvmfs_or_cms_exception"}, + 65: {"type": "recoverable", "delay": 900, "msg": "End of job from user application (CMSSW).", "handler": "handle_cvmfs_or_cms_exception"}, + "default": {"type": "neutral", "delay": 900, "msg": "Taking default exit code retry policy route."} } # strings in fatal root exception text which indicate code problem, not corrupted file @@ -97,6 +97,7 @@ def __init__(self): self.report = {} self.validreport = True self.integrated_job_time = 0 + self.max_retries = 9 self.MAX_DISK_SPACE = MAX_DISK_SPACE self.MAX_WALLTIME = MAX_WALLTIME @@ -150,33 +151,8 @@ def store_retry_actions(self, policy, exitCode): if key not in retry_info: retry_info[key] = {} - # --- compute retries_consumed_for_ec --- - # Find the most recent previous entry (walk backwards if needed) - prev_consumed = 0 - prev_ec = None - if self.crab_retry > 0: - prev_key = str(self.crab_retry - 1) - while prev_key not in retry_info and int(prev_key) > 0: - prev_key = str(int(prev_key) - 1) - prev_entry = retry_info.get(prev_key, {}) - prev_ec = prev_entry.get("exitCode") - prev_consumed = prev_entry.get("retries_consumed_for_ec", 0) - - if prev_ec is not None and prev_ec != exitCode: - # Exit code changed: start a fresh count for this new code - retries_consumed_for_ec = 0 - self.logger.info( - f"Exit code changed from {prev_ec} to {exitCode} at retry {self.crab_retry}. " - f"Resetting retries_consumed_for_ec to 0." - ) - else: - retries_consumed_for_ec = prev_consumed + 1 - - # Compute resubmit_counter: how many full cycles of (max_retries+1) have elapsed - # for THIS exit code. - max_retries = policy["max_retries"] - resubmit_counter = retries_consumed_for_ec // (max_retries + 1) - # --- end --- + # max_retries = self.max_retries + # resubmit_counter = self.crab_retry // (max_retries + 1) delay = policy.get("delay", 900) retry_info[key]["retry_delay_until"] = time.time() + delay @@ -187,8 +163,7 @@ def store_retry_actions(self, policy, exitCode): retry_info[key]["memory_factor"] = policy.get("memory_factor", 1.0) retry_info[key]["runtime_factor"] = policy.get("runtime_factor", 1.0) retry_info[key]["site"] = getattr(self, "site", None) - retry_info[key]["retries_consumed_for_ec"] = retries_consumed_for_ec - retry_info[key]["resubmit_counter"] = resubmit_counter + # retry_info[key]["resubmit_counter"] = resubmit_counter with open(retry_info_file + ".tmp", "w", encoding="utf-8") as fd: fd.write(str(retry_info)) @@ -196,7 +171,7 @@ def store_retry_actions(self, policy, exitCode): # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = - def calculate_effective_max_retries(self, policy): + def calculate_effective_max_retries(self): retry_info_file = f"resubmit_info/job.{self.job_id}.txt" retry_info = {} @@ -212,12 +187,10 @@ def calculate_effective_max_retries(self, policy): retry_info[key] = {} entry = retry_info.get(key, {}) - # resubmit_counter was written by store_retry_actions() just before this call resubmit_counter = entry.get("resubmit_counter", 0) - retries_consumed = entry.get("retries_consumed_for_ec", 0) - base_max = policy["max_retries"] + base_max = self.max_retries effective_max_retries = (base_max + 1)*(resubmit_counter + 1) - 1 - self.logger.info(f"EC retries consumed: {retries_consumed}, resubmit_counter: {resubmit_counter}, effective_max_retries: {effective_max_retries}") + self.logger.info(f"resubmit_counter: {resubmit_counter}, effective_max_retries: {effective_max_retries}") return effective_max_retries @@ -296,7 +269,7 @@ def apply_retry_policy(self, exitCode): self.logger.info(f"Applying retry policy for exit code {exitCode}") self.store_retry_actions(policy, exitCode) if policy["type"] == "recoverable": - effective_max_retries = self.calculate_effective_max_retries(policy) + effective_max_retries = self.calculate_effective_max_retries() if self.crab_retry >= effective_max_retries: raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") # Dispatch to a handler if one is registered for this exit code. From 8f596ddcaadf8699acc7f7b1b1edd467f1791da5 Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Tue, 28 Apr 2026 17:21:56 +0200 Subject: [PATCH 25/27] typo --- src/python/TaskWorker/Actions/PreJob.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 48757aea5b..17750e142a 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -260,7 +260,7 @@ def alter_submit(self, crab_retry): if resubmit_jobids and self.job_id not in resubmit_jobids: use_resubmit_info = True base_max = self.max_retries - if self.job_id in resubmit_jobids and self.crab_retry % (base_max + 1) == base_max: + if self.job_id in resubmit_jobids and crab_retry % (base_max + 1) == base_max: increase_resubmission_counter = True self.logger.info(f"increase resubmission counter was set to True as {self.job_id} was in {resubmit_jobids}") except TypeError: From fff0317f27edc5af827d7346f25dc58ab3f4c5ab Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Wed, 29 Apr 2026 10:53:05 +0200 Subject: [PATCH 26/27] fix formula and add logging --- src/python/TaskWorker/Actions/PreJob.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 17750e142a..7bd2f3ecb1 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -257,14 +257,16 @@ def alter_submit(self, crab_retry): resubmit_jobids = map(str, self.task_ad['CRAB_ResubmitList']) try: resubmit_jobids = set(resubmit_jobids) + self.logger.info(f"resubmit_jobids is {resubmit_jobids} and job id is {self.job_id}") if resubmit_jobids and self.job_id not in resubmit_jobids: use_resubmit_info = True base_max = self.max_retries - if self.job_id in resubmit_jobids and crab_retry % (base_max + 1) == base_max: + if crab_retry % (base_max + 1) == 0 and crab_retry != 0: increase_resubmission_counter = True self.logger.info(f"increase resubmission counter was set to True as {self.job_id} was in {resubmit_jobids}") except TypeError: resubmit_jobids = True + self.logger.info(f"resubmit_jobids is {resubmit_jobids} and job id is {self.job_id}") ## If there is no resubmit_info, we can of course not use it. if not self.resubmit_info: use_resubmit_info = False From 034d93b11c441cf1f48a0a707bcc25ba5a91051c Mon Sep 17 00:00:00 2001 From: Vijay Chakravarty Date: Wed, 29 Apr 2026 10:59:25 +0200 Subject: [PATCH 27/27] test with 2 --- src/python/TaskWorker/Actions/PreJob.py | 2 +- src/python/TaskWorker/Actions/RetryJob.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 7bd2f3ecb1..e5fe509afb 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -38,7 +38,7 @@ def __init__(self): self.resubmit_info = {} self.prejob_exit_code = None self.logger = logging.getLogger() - self.max_retries = 9 + self.max_retries = 2 def calculate_crab_retry(self): diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index 69bf93c9ab..9936ef2813 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -97,7 +97,7 @@ def __init__(self): self.report = {} self.validreport = True self.integrated_job_time = 0 - self.max_retries = 9 + self.max_retries = 2 self.MAX_DISK_SPACE = MAX_DISK_SPACE self.MAX_WALLTIME = MAX_WALLTIME