diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index 9aa00ef192..3e5f773ef7 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -61,14 +61,15 @@ # Fatal error limits for job resource usage # Defaults are used if unable to load from .job.ad # Otherwise it uses these values. -MAX_WALLTIME = 21 * 60 * 60 + 30 * 60 -MAX_MEMORY = 2 * 1024 +MAX_WALLTIME = 21 * 60 * 60 + 30 * 60 # 21.5 hours in seconds +MAX_MEMORY = 2 * 1024 # 2048 MB # see https://github.com/dmwm/CRABServer/issues/5995 -MAX_MEMORY_PER_CORE = 2500 -MAX_MEMORY_SINGLE_CORE = 3000 -MAX_MEMORY_SINGLE_CORE_ON_RESUBMIT = 5000 +MAX_MEMORY_PER_CORE = 2500 # 2500 MB +MAX_MEMORY_SINGLE_CORE = 3000 # 3000 MB +MAX_MEMORY_SINGLE_CORE_ON_RESUBMIT = 5000 # 5000 MB MAX_DISK_SPACE = 20000000 # Disk usage is not used from .job.ad as CRAB3 is not seeting it. 20GB is max. - +MAX_MEMORY_AUTOMATIC_RESUBMIT = 7500 # 7500 MB +MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT = 47 * 60 # 47 hours in minutes MAX_IDLE_JOBS = 1000 MAX_POST_JOBS = 10 diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 6da53b038c..8ca326c2f3 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -347,7 +347,6 @@ def makeJobSubmit(self): jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) - # note about Lists # in the JDL everything is a string, we can't use the simple classAd[name]=somelist # but need the ExprTree format (what classAd.lookup() would return) @@ -885,7 +884,7 @@ def createSubdag(self, splitterResult, **kwargs): ## In the future this parameter may be set by the user in the CRAB configuration ## file and we would take it from the Task DB. - self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 2) + self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 100) runtime = self.task['tm_split_args'].get('minutes_per_job', -1) diff --git a/src/python/TaskWorker/Actions/DagmanResubmitter.py b/src/python/TaskWorker/Actions/DagmanResubmitter.py index 0a4e815e0f..abaeecc97c 100644 --- a/src/python/TaskWorker/Actions/DagmanResubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanResubmitter.py @@ -86,7 +86,7 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument # is saving the values of the parameters for each job retry in text files (the # files are in the directory resubmit_info in the schedd). # We use classAds here as way to pass informations to PreJobs. - # Value in schedd.exit(const, ad, value) can be string or ExprTree + # Value in schedd.edit(const, ad, value) can be string or ExprTree # https://htcondor.readthedocs.io/en/latest/apis/python-bindings/api/version2/htcondor2/schedd.html#htcondor2.Schedd.edit # We need ExprTree when the ad correspond to a python list for adparam, taskparam in params.items(): diff --git a/src/python/TaskWorker/Actions/PreJob.py b/src/python/TaskWorker/Actions/PreJob.py index 837b78f6d9..e5fe509afb 100644 --- a/src/python/TaskWorker/Actions/PreJob.py +++ b/src/python/TaskWorker/Actions/PreJob.py @@ -13,7 +13,7 @@ import logging from ast import literal_eval -from ServerUtilities import getWebdirForDb, insertJobIdSid, pythonListToClassAdExprTree +from ServerUtilities import getWebdirForDb, insertJobIdSid, pythonListToClassAdExprTree, MAX_MEMORY_AUTOMATIC_RESUBMIT, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT from TaskWorker.Actions.RetryJob import JOB_RETURN_CODES import htcondor2 as htcondor @@ -38,6 +38,7 @@ def __init__(self): self.resubmit_info = {} self.prejob_exit_code = None self.logger = logging.getLogger() + self.max_retries = 2 def calculate_crab_retry(self): @@ -182,7 +183,6 @@ def get_resubmit_info(self): with open(file_name, 'r', encoding='utf-8') as fd: self.resubmit_info = literal_eval(fd.read()) - def save_resubmit_info(self): """ Need a doc string here. @@ -250,16 +250,23 @@ def alter_submit(self, crab_retry): ## TaskWorker than modify the dagman job classAds ! CRAB_ResubmitList_in_taskad = 'CRAB_ResubmitList' in self.task_ad use_resubmit_info = False + increase_resubmission_counter = False resubmit_jobids = [] if 'CRAB_ResubmitList' in self.task_ad: # SB this map is most likley useless, keep it in "Works/Don't Touch" spirit resubmit_jobids = map(str, self.task_ad['CRAB_ResubmitList']) try: resubmit_jobids = set(resubmit_jobids) + self.logger.info(f"resubmit_jobids is {resubmit_jobids} and job id is {self.job_id}") if resubmit_jobids and self.job_id not in resubmit_jobids: use_resubmit_info = True + base_max = self.max_retries + if crab_retry % (base_max + 1) == 0 and crab_retry != 0: + increase_resubmission_counter = True + self.logger.info(f"increase resubmission counter was set to True as {self.job_id} was in {resubmit_jobids}") except TypeError: resubmit_jobids = True + self.logger.info(f"resubmit_jobids is {resubmit_jobids} and job id is {self.job_id}") ## If there is no resubmit_info, we can of course not use it. if not self.resubmit_info: use_resubmit_info = False @@ -268,7 +275,11 @@ def alter_submit(self, crab_retry): maxmemory = None numcores = None priority = None - if not use_resubmit_info: # means thad we resubmit with new params from crab resubmit + # read information about last retry (if any) + inkey = "0" if crab_retry == 0 else str(crab_retry - 1) + while inkey not in self.resubmit_info and int(inkey) > 0: + inkey = str(int(inkey) - 1) + if not use_resubmit_info: # means that we resubmit with new params from crab resubmit #if 'MaxWallTimeMins_RAW' in self.task_ad: # if self.task_ad['MaxWallTimeMins_RAW'] != 1315: # maxjobruntime = self.task_ad.lookup('MaxWallTimeMins_RAW') @@ -291,13 +302,48 @@ def alter_submit(self, crab_retry): priority = 20 #the maximum for splitting jobs else: # means we resubmit with same params as previous try ## SB most likely much (all) of this string/int conversions can be simplified - inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) - while inkey not in self.resubmit_info and int(inkey) > 0: - inkey = str(int(inkey) - 1) + self.logger.info(f"use_resubmit_info is {use_resubmit_info} and inkey is {inkey}") maxjobruntime = self.resubmit_info[inkey].get('maxjobruntime') maxmemory = self.resubmit_info[inkey].get('maxmemory') numcores = self.resubmit_info[inkey].get('numcores') priority = self.resubmit_info[inkey].get('priority') + + #ExitCode Dependent automatic change in resubmission parameters + + if self.resubmit_info: + retry_data = self.resubmit_info.get(inkey, {}) + if retry_data.get("increase_memory") and maxmemory: + factor = retry_data.get("memory_factor", 1.2) + new_memory = int(maxmemory * factor) + new_memory = min(new_memory, MAX_MEMORY_AUTOMATIC_RESUBMIT) + self.logger.info(f"Increasing memory from {maxmemory} to {new_memory}") + maxmemory = new_memory + + if retry_data.get("increase_runtime") and maxjobruntime: + factor = retry_data.get("runtime_factor", 1.2) + new_runtime = int(maxjobruntime * factor) + new_runtime = min(new_runtime, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT) + self.logger.info(f"Increasing walltime from {maxjobruntime} to {new_runtime}") + maxjobruntime = new_runtime + + ## get resubmission counter + if crab_retry == 0: + resubmit_counter = 0 + self.logger.info("crab_retry is 0. resubmit_counter is set to 0.") + else: + lastResubmitCounter = self.resubmit_info[inkey].get('resubmit_counter') + self.logger.info(f"Last Resubmit Counter was {lastResubmitCounter}") + try: + if increase_resubmission_counter: + resubmit_counter = lastResubmitCounter + 1 + self.logger.info(f"New Resubmit Counter was increased from {lastResubmitCounter} to {resubmit_counter}") + else: + resubmit_counter = lastResubmitCounter + self.logger.info(f"Last resubmission counter same as new {resubmit_counter}") + except Exception: + self.logger.info("Exception occured while trying to get resubmit counter. Setting to 0") + resubmit_counter = 0 + ## Save the (new) values of the resubmission parameters in self.resubmit_info ## for the current job retry number. outkey = str(crab_retry) @@ -309,7 +355,7 @@ def alter_submit(self, crab_retry): self.resubmit_info[outkey]['priority'] = priority self.resubmit_info[outkey]['use_resubmit_info'] = use_resubmit_info self.resubmit_info[outkey]['CRAB_ResubmitList_in_taskad'] = CRAB_ResubmitList_in_taskad - + self.resubmit_info[outkey]["resubmit_counter"] = resubmit_counter ## Add the resubmission parameters to the Job..submit content. ## if self.stage == "probe": @@ -373,6 +419,9 @@ def redoSites(self, crab_retry, use_resubmit_info): siteWhiteList = [] siteBlackSet = set() siteWhiteSet = set() + inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) + while inkey not in self.resubmit_info and int(inkey) > 0: + inkey = str(int(inkey) - 1) if not use_resubmit_info: if 'CRAB_SiteBlacklist' in self.task_ad: if self.task_ad['CRAB_SiteBlacklist']: # skip ad='' @@ -383,9 +432,6 @@ def redoSites(self, crab_retry, use_resubmit_info): siteWhiteList = self.task_ad['CRAB_SiteWhitelist'] siteWhiteSet = set(siteWhiteList) else: - inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1) - while inkey not in self.resubmit_info and int(inkey) > 0: - inkey = str(int(inkey) - 1) siteBlackSet = set(self.resubmit_info[inkey].get('site_blacklist', [])) siteWhiteSet = set(self.resubmit_info[inkey].get('site_whitelist', [])) ## Save the current site black- and whitelists in self.resubmit_info for the @@ -414,6 +460,18 @@ def redoSites(self, crab_retry, use_resubmit_info): self.logger.error("Can not submit since DESIRED_Sites list is empty") self.prejob_exit_code = 1 sys.exit(self.prejob_exit_code) + + # ExitCode Dependent discard of previous_site + retry_data = self.resubmit_info.get(inkey, {}) + previous_site = retry_data.get("site") + if retry_data.get("change_site") and previous_site: + self.logger.info(f"Last Exit Code indicated that a change in site might help. inkey was {inkey}") + if previous_site in availableSet and len(availableSet) > 1: + self.logger.info(f"Removing previous site {previous_site} from candidate sites") + availableSet.discard(previous_site) + else: + self.logger.info(f"No need to discard last site. inkey was {inkey}") + ## Make sure that attributest which will be used in MatchMaking are SORTED lists available = list(availableSet) available.sort() @@ -456,19 +514,43 @@ def needsDefer(self): slow release of jobs in a task. The function return True if CRAB_JobReleaseTimeout is defined and not 0, and if the submit time of the task plus the defer time is greater than the current time. + Additionally retry policy delay """ deferTime = int(self.task_ad.get("CRAB_JobReleaseTimeout", 0)) + + currentTime = time.time() + + # Check retry delay from resubmit_info + # Inside SPOOL_DIR we have resubmit_info folder with text files job.{self.job_id}.txt for each job id + # Each file is a dictionary of the format example {0:{'maxmemory':2000, ...}, 1:{}, 2:{}, ...} + # where 0,1,2,... are crab_retry numbers. A lot of vital info like site_whitelist, retry_delay_until, + # resubmit_counter, use_resubmit_info, etc is stored and read from this file for each retry across resubmissions. + # ToDo: Convert this file to json format + retry_info_file = f"resubmit_info/job.{self.job_id}.txt" + if os.path.exists(retry_info_file): + try: + with open(retry_info_file, "r", encoding="utf-8") as fd: + retry_info = literal_eval(fd.read()) + key = str(self.dag_retry) + if key in retry_info: + retry_delay_until = retry_info[key].get("retry_delay_until") + if retry_delay_until and currentTime < retry_delay_until: + wait = int(retry_delay_until - currentTime) + self.logger.info(f"Retry delay not elapsed yet. Deferring for {wait} seconds.") + return True + except Exception: + self.logger.exception("Error checking retry delay in resubmit_info") + if deferTime: self.logger.info('Release timeout specified in extraJDL:') totalDefer = deferTime * int(self.job_id) submitTime = int(self.task_ad["CRAB_TaskSubmitTime"]) - currentTime = time.time() if currentTime < (submitTime + totalDefer): msg = f" Defer time of this job ({totalDefer} seconds since initial task submission)" msg += f" not elapsed yet, deferring for {totalDefer} seconds" self.logger.info(msg) return True - self.logger.info(' Continuing normally since current time is greater than requested starttime of the job') + self.logger.info('Continuing normally since current time is greater than requested starttime of the job') return False def execute(self, *args): diff --git a/src/python/TaskWorker/Actions/RetryJob.py b/src/python/TaskWorker/Actions/RetryJob.py index b4d11a5c0e..9936ef2813 100644 --- a/src/python/TaskWorker/Actions/RetryJob.py +++ b/src/python/TaskWorker/Actions/RetryJob.py @@ -5,7 +5,9 @@ import shutil import subprocess import socket +import time from collections import namedtuple +from ast import literal_eval from ServerUtilities import executeCommand, getLock from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY @@ -15,6 +17,41 @@ JOB_RETURN_CODES = namedtuple('JobReturnCodes', 'OK RECOVERABLE_ERROR FATAL_ERROR')(0, 1, 2) +# ---------------------------------------------------------------------- +# Exit-code dependent retry policy +# ---------------------------------------------------------------------- +# ToDo: Reduce Redundance across short and long exit codes + +EXIT_RETRY_POLICY = { + 1: {"type": "recoverable", "delay": 900, "msg": "Job failed to bootstrap CMSSW; likely a worker node issue."}, + 50513: {"type": "recoverable", "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, + 81: {"type": "recoverable", "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."}, + 50115: {"type": "recoverable", "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, + 195: {"type": "recoverable", "delay": 900, "msg": "Job did not produce a FJR; will retry.", "increase_memory": True, "memory_factor": 1.3}, + 137: {"type": "recoverable", "delay": 900, "msg": "SIGKILL; likely an unrelated batch system kill."}, + 10034: {"type": "recoverable", "delay": 900, "msg": "Required application version not found at the site."}, + 50: {"type": "recoverable", "delay": 900, "msg": "Required application version not found at the site."}, + 10040: {"type": "recoverable", "delay": 900, "msg": "Site Error: failed to generate cmsRun cfg file at runtime."}, + 60403: {"type": "recoverable", "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, + 243: {"type": "recoverable", "delay": 900, "msg": "Timeout during attempted file stageout.", "increase_runtime": True, "runtime_factor": 1.3}, + 60307: {"type": "recoverable", "delay": 900, "msg": "Error during attempted file stageout."}, + 147: {"type": "recoverable", "delay": 900, "msg": "Error during attempted file stageout."}, + 60311: {"type": "recoverable", "delay": 900, "msg": "Error during attempted file stageout."}, + 151: {"type": "recoverable", "delay": 900, "msg": "Error during attempted file stageout."}, + 8028: {"type": "recoverable", "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 8021: {"type": "recoverable", "delay": 900, "msg": "FileReadError (May be a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, + 8020: {"type": "recoverable", "delay": 900, "msg": "FileOpenError (Likely a site error).", "change_site": True, "handler": "handle_file_open_or_root_error"}, + 8022: {"type": "recoverable", "delay": 900, "msg": "FatalRootError.", "handler": "handle_file_open_or_root_error"}, + 84: {"type": "recoverable", "delay": 900, "msg": "Some required file not found; check logs for name of missing file.", "handler": "handle_file_open_or_root_error"}, + 85: {"type": "recoverable", "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 86: {"type": "recoverable", "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 92: {"type": "recoverable", "delay": 900, "msg": "Job failed to open local and fallback files.", "handler": "handle_file_open_or_root_error"}, + 134: {"type": "recoverable", "delay": 900, "msg": "Abort (ANSI) or IOT trap (4.2 BSD) (most likely user application crashed).", "handler": "handle_sigabrt"}, + 8001: {"type": "recoverable", "delay": 900, "msg": "Other CMS Exception.", "handler": "handle_cvmfs_or_cms_exception"}, + 65: {"type": "recoverable", "delay": 900, "msg": "End of job from user application (CMSSW).", "handler": "handle_cvmfs_or_cms_exception"}, + "default": {"type": "neutral", "delay": 900, "msg": "Taking default exit code retry policy route."} +} + # strings in fatal root exception text which indicate code problem, not corrupted file # a small "knowledge data base" NOT_FILE_RELATED_FATAL_ROOT_ERRORS = [ @@ -60,6 +97,7 @@ def __init__(self): self.report = {} self.validreport = True self.integrated_job_time = 0 + self.max_retries = 2 self.MAX_DISK_SPACE = MAX_DISK_SPACE self.MAX_WALLTIME = MAX_WALLTIME @@ -98,6 +136,157 @@ def get_job_ad_from_condor_q(self): # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + def store_retry_actions(self, policy, exitCode): + retry_info_file = f"resubmit_info/job.{self.job_id}.txt" + retry_info = {} + + if os.path.exists(retry_info_file): + try: + with open(retry_info_file, "r", encoding="utf-8") as fd: + retry_info = literal_eval(fd.read()) + except FileNotFoundError: + retry_info = {} + + key = str(self.crab_retry) + if key not in retry_info: + retry_info[key] = {} + + # max_retries = self.max_retries + # resubmit_counter = self.crab_retry // (max_retries + 1) + + delay = policy.get("delay", 900) + retry_info[key]["retry_delay_until"] = time.time() + delay + retry_info[key]["exitCode"] = exitCode + retry_info[key]["increase_memory"] = policy.get("increase_memory", False) + retry_info[key]["increase_runtime"] = policy.get("increase_runtime", False) + retry_info[key]["change_site"] = policy.get("change_site", False) + retry_info[key]["memory_factor"] = policy.get("memory_factor", 1.0) + retry_info[key]["runtime_factor"] = policy.get("runtime_factor", 1.0) + retry_info[key]["site"] = getattr(self, "site", None) + # retry_info[key]["resubmit_counter"] = resubmit_counter + + with open(retry_info_file + ".tmp", "w", encoding="utf-8") as fd: + fd.write(str(retry_info)) + os.rename(retry_info_file + ".tmp", retry_info_file) + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + + def calculate_effective_max_retries(self): + retry_info_file = f"resubmit_info/job.{self.job_id}.txt" + retry_info = {} + + if os.path.exists(retry_info_file): + try: + with open(retry_info_file, "r", encoding="utf-8") as fd: + retry_info = literal_eval(fd.read()) + except FileNotFoundError: + retry_info = {} + + key = str(self.crab_retry) + if key not in retry_info: + retry_info[key] = {} + + entry = retry_info.get(key, {}) + resubmit_counter = entry.get("resubmit_counter", 0) + base_max = self.max_retries + effective_max_retries = (base_max + 1)*(resubmit_counter + 1) - 1 + self.logger.info(f"resubmit_counter: {resubmit_counter}, effective_max_retries: {effective_max_retries}") + + return effective_max_retries + + def handle_file_open_or_root_error(self, exitCode): + """ + Handle exit codes related to file open/read/root failures (8020, 8021, 8022, 8028, 84, 85, 86, 92). + Checks for corrupted input files; if found, creates a fake FJR with code 8022. + Raises RecoverableError with the policy message. + """ + try: + corruptedInputFile = self.check_corrupted_file(exitCode) + except Exception as e: # pylint: disable=broad-except + self.logger.error(f"check_corrupted_file raised an exception:\n{e}\nIgnore and go on") + corruptedInputFile = False + if corruptedInputFile: + exitMsg = "Fatal Root Error: possible corrupted input file. Reporting and retrying." + self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) + self.logger.info(f"Handler function ran successfully for exit Code {exitCode}") + raise RecoverableError(EXIT_RETRY_POLICY[exitCode]["msg"]) + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + + def handle_sigabrt(self, exitCode): + """ + Handle exit code 134 (SIGABRT / IOT trap). + Checks job stdout for a SIGILL pattern; if found, promotes to a recoverable + SIGILL error. Otherwise raises the default RecoverableError. + """ + recoverable_signal = False + try: + fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) + with open(fname, encoding='utf-8') as fd: + for line in fd: + if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"): + recoverable_signal = True + break + except Exception: # pylint: disable=broad-except + msg = "Error analyzing abort signal.\nDetails follow:" + self.logger.exception(msg) + self.logger.info(f"Handler function ran successfully for exit Code {exitCode}") + if recoverable_signal: + raise RecoverableError("SIGILL; may indicate a worker node issue.") + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + + def handle_cvmfs_or_cms_exception(self, exitCode): + """ + Handle exit codes 8001 and 65 (CMS exceptions / end-of-job). + Scans job stdout for a truncated CVMFS file pattern; if found raises a + specific RecoverableError, otherwise raises the default one. + """ + cvmfs_issue = False + try: + fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) + cvmfs_issue_re = re.compile(r"== CMSSW: unable to load /cvmfs/.*file too short") + with open(fname, encoding='utf-8') as fd: + for line in fd: + if cvmfs_issue_re.match(line): + cvmfs_issue = True + break + except Exception: # pylint: disable=broad-except + msg = "Error analyzing output for CVMFS issues.\nDetails follow:" + self.logger.exception(msg) + self.logger.info(f"Handler function ran successfully for exit Code {exitCode}") + if cvmfs_issue: + raise RecoverableError("CVMFS issue detected.") + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + + def apply_retry_policy(self, exitCode): + """ + Enforce exit-code dependent retry limits and delay. + Raises FatalError if retry limit exceeded. + """ + policy = EXIT_RETRY_POLICY.get(exitCode, EXIT_RETRY_POLICY["default"]) + self.logger.info(f"Applying retry policy for exit code {exitCode}") + self.store_retry_actions(policy, exitCode) + if policy["type"] == "recoverable": + effective_max_retries = self.calculate_effective_max_retries() + if self.crab_retry >= effective_max_retries: + raise FatalError(f"Retry limit reached for exit {exitCode}: {policy['msg']}") + # Dispatch to a handler if one is registered for this exit code. + handler_name = policy.get("handler") + if handler_name: + handler = getattr(self, handler_name, None) + if handler is None: + self.logger.error(f"Handler '{handler_name}' not found on RetryJob.") + else: + handler(exitCode) + raise RecoverableError(policy["msg"]) + + if policy["type"] == "neutral": + return + + # = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = + def get_job_ad_from_file(self): """ Need a doc string here @@ -307,6 +496,8 @@ def check_exit_code(self): return 1 try: exitCode = int(self.report['exitCode']) + # Apply retry policy + self.apply_retry_policy(exitCode) except ValueError: msg = "Unable to extract job's wrapper exit code from job report." self.logger.warning(msg) @@ -317,92 +508,11 @@ def check_exit_code(self): msg = "Job and stageout wrappers finished successfully (exit code %d)." % (exitCode) self.logger.info(msg) return 0 + else: + raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n '))) msg = "Job or stageout wrapper finished with exit code %d." % (exitCode) - msg += " Trying to determine the meaning of the exit code and if it is a recoverable or fatal error." - self.logger.info(msg) - - # Wrapper script sometimes returns the posix return code (8 bits). - if exitCode in [8020, 8021, 8022, 8028] or exitCode in [84, 85, 86, 92]: - try: # the following is still a bit experimental, make sure it never crashes the PJ - corruptedInputFile = self.check_corrupted_file(exitCode) - except Exception as e: # pylint: disable=broad-except - msg = f"check_corrupted_file raised an exception:\n{e}\nIgnore and go on" - self.logger.error(msg) - corruptedInputFile = False - if corruptedInputFile: - exitMsg = "Fatal Root Error maybe a corrupted input file. This error is being reported" - self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) # retry the job - raise RecoverableError("Job failed to open local and fallback files.") - - if exitCode == 1: - raise RecoverableError("Job failed to bootstrap CMSSW; likely a worker node issue.") - - if exitCode == 50513 or exitCode == 81: - raise RecoverableError("Job did not find functioning CMSSW on worker node.") - - # This is a difficult one -- right now CMSRunAnalysis.py will turn things like - # segfaults into an invalid FJR. Will revisit this decision later. - if exitCode == 50115 or exitCode == 195: - raise RecoverableError("Job did not produce a FJR; will retry.") - - if exitCode == 134: - recoverable_signal = False - try: - fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) - with open(fname, encoding='utf-8') as fd: - for line in fd: - if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"): - recoverable_signal = True - break - except Exception: # pylint: disable=broad-except - msg = "Error analyzing abort signal." - msg += "\nDetails follow:" - self.logger.exception(msg) - if recoverable_signal: - raise RecoverableError("SIGILL; may indicate a worker node issue.") - - if exitCode == 8001 or exitCode == 65: - cvmfs_issue = False - try: - fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry)) - cvmfs_issue_re = re.compile("== CMSSW: unable to load /cvmfs/.*file too short") - with open(fname, encoding='utf-8') as fd: - for line in fd: - if cvmfs_issue_re.match(line): - cvmfs_issue = True - break - except Exception: # pylint: disable=broad-except - msg = "Error analyzing output for CVMFS issues." - msg += "\nDetails follow:" - self.logger.exception(msg) - if cvmfs_issue: - raise RecoverableError("CVMFS issue detected.") - - # Another difficult case -- so far, SIGKILL has mostly been observed at T2_CH_CERN, and it has nothing to do - # with an issue of the job itself. Typically, this isn't the user code's fault - # it was often a site or pilot misconfiguration that led to the pilot exhausting its allocated runtime. - # We should revisit this issue if we see SIGKILL happening for other cases that are the users' fault. - if exitCode == 137: - raise RecoverableError("SIGKILL; likely an unrelated batch system kill.") - - if exitCode == 10034 or exitCode == 50: - raise RecoverableError("Required application version not found at the site.") - - if exitCode == 10040: - raise RecoverableError("Site Error: failed to generate cmsRun cfg file at runtime.") - - if exitCode == 60403 or exitCode == 243: - raise RecoverableError("Timeout during attempted file stageout.") - - if exitCode == 60307 or exitCode == 147: - raise RecoverableError("Error during attempted file stageout.") - - if exitCode == 60311 or exitCode == 151: - raise RecoverableError("Error during attempted file stageout.") - - if exitCode: - raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n '))) + self.logger.info(msg) return 0