Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4c94acf
add exit code dependent retry policy
aspiringmind-code Feb 24, 2026
e70893e
adding retry delay condition to needsDefer
aspiringmind-code Feb 24, 2026
595b71b
add exit code dependent ability to change maxmemory, maxjobruntime an…
aspiringmind-code Feb 24, 2026
3614084
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Mar 30, 2026
b76d758
add resubmit_counter and eff max retries
aspiringmind-code Mar 30, 2026
69afe55
use ExprTree
aspiringmind-code Mar 31, 2026
5b92334
remove abort and change jobads too
aspiringmind-code Mar 31, 2026
9bd8224
remove jobconst, add to adstoPort
aspiringmind-code Mar 31, 2026
9fca35e
strictly policy dependent
aspiringmind-code Mar 31, 2026
8b3db7e
avoid exprtree
aspiringmind-code Mar 31, 2026
1e980e2
proper use of inkey and test with 8020
aspiringmind-code Apr 1, 2026
f492af6
be free of use_resubmit_info
aspiringmind-code Apr 1, 2026
d308f89
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 10, 2026
12183e9
add handler, use_resubmit_info correct and increase limit
aspiringmind-code Apr 10, 2026
30b6246
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 13, 2026
b288bb8
use upperbounds
aspiringmind-code Apr 13, 2026
ef5ba93
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 17, 2026
1306f2b
address comments
aspiringmind-code Apr 17, 2026
3342bdd
add handler info
aspiringmind-code Apr 20, 2026
b6ebe51
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 20, 2026
7f7fa6a
add handler info
aspiringmind-code Apr 20, 2026
d117922
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 22, 2026
b2f179d
use job specific resubmit_counter
aspiringmind-code Apr 22, 2026
3bdc41d
fix import
aspiringmind-code Apr 22, 2026
359bc85
fix order of store_retry_actions
aspiringmind-code Apr 22, 2026
97d83ec
fix site nomenclature
aspiringmind-code Apr 22, 2026
6cc2f4f
final commit
aspiringmind-code Apr 22, 2026
c76f844
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 22, 2026
d8051c8
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 24, 2026
2434ef3
address comments
aspiringmind-code Apr 24, 2026
e90b1bf
no need to import exit retry policy anymore
aspiringmind-code Apr 24, 2026
f23a770
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 28, 2026
72ca8cc
try fixed max_retries
aspiringmind-code Apr 28, 2026
8f596dd
typo
aspiringmind-code Apr 28, 2026
fff0317
fix formula and add logging
aspiringmind-code Apr 29, 2026
034d93b
test with 2
aspiringmind-code Apr 29, 2026
ab29d86
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/python/TaskWorker/Actions/DagmanCreator.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ def createSubdag(self, splitterResult, **kwargs):

## In the future this parameter may be set by the user in the CRAB configuration
## file and we would take it from the Task DB.
self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 2)
self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 9)

runtime = self.task['tm_split_args'].get('minutes_per_job', -1)

Expand Down
23 changes: 21 additions & 2 deletions src/python/TaskWorker/Actions/PreJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,19 +456,38 @@ def needsDefer(self):
slow release of jobs in a task.
The function return True if CRAB_JobReleaseTimeout is defined and not 0, and if the submit
time of the task plus the defer time is greater than the current time.
Additionally retry policy delay
"""
deferTime = int(self.task_ad.get("CRAB_JobReleaseTimeout", 0))

currentTime = time.time()

# Check retry delay from resubmit_info
retry_info_file = f"resubmit_info/job.{self.job_id}.txt"
if os.path.exists(retry_info_file):
try:
with open(retry_info_file, "r", encoding="utf-8") as fd:
retry_info = literal_eval(fd.read())
Comment thread
belforte marked this conversation as resolved.
key = str(self.dag_retry)
if key in retry_info:
retry_delay_until = retry_info[key].get("retry_delay_until")
if retry_delay_until and currentTime < retry_delay_until:
wait = int(retry_delay_until - currentTime)
self.logger.info(f"Retry delay not elapsed yet. Deferring for {wait} seconds.")
return True
except Exception:
self.logger.exception("Error checking retry delay in resubmit_info")

if deferTime:
self.logger.info('Release timeout specified in extraJDL:')
totalDefer = deferTime * int(self.job_id)
submitTime = int(self.task_ad["CRAB_TaskSubmitTime"])
currentTime = time.time()
if currentTime < (submitTime + totalDefer):
msg = f" Defer time of this job ({totalDefer} seconds since initial task submission)"
msg += f" not elapsed yet, deferring for {totalDefer} seconds"
self.logger.info(msg)
return True
self.logger.info(' Continuing normally since current time is greater than requested starttime of the job')
self.logger.info('Continuing normally since current time is greater than requested starttime of the job')
return False

def execute(self, *args):
Expand Down
116 changes: 82 additions & 34 deletions src/python/TaskWorker/Actions/RetryJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import shutil
import subprocess
import socket
import time
from collections import namedtuple

from ServerUtilities import executeCommand, getLock
Expand All @@ -15,6 +16,40 @@

JOB_RETURN_CODES = namedtuple('JobReturnCodes', 'OK RECOVERABLE_ERROR FATAL_ERROR')(0, 1, 2)

# ----------------------------------------------------------------------
# Exit-code dependent retry policy
# ----------------------------------------------------------------------

EXIT_RETRY_POLICY = {
Comment thread
belforte marked this conversation as resolved.
1: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to bootstrap CMSSW; likely a worker node issue."},
50513: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."},
81: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not find functioning CMSSW on worker node."},
50115: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job did not produce a FJR; will retry."},
195: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job did not produce a FJR; will retry."},
137: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "SIGKILL; likely an unrelated batch system kill."},
10034: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."},
50: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Required application version not found at the site."},
10040: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Site Error: failed to generate cmsRun cfg file at runtime."},
60403: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout."},
243: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Timeout during attempted file stageout."},
60307: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."},
147: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."},
60311: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."},
151: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Error during attempted file stageout."},
8028: {"type": "recoverable", "max_retries": 9, "delay": 900, "msg": "Job failed to open local and fallback files."},
8021: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileReadError (May be a site error)."},
8020: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FileOpenError (Likely a site error)."},
8022: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "FatalRootError."},
84: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Some required file not found; check logs for name of missing file."},
85: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."},
86: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."},
92: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Job failed to open local and fallback files."},
134: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Abort (ANSI) or IOT trap (4.2 BSD) (most likely user application crashed)."},
8001: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "Other CMS Exception."},
65: {"type": "recoverable", "max_retries": 2, "delay": 900, "msg": "End of job from user application (CMSSW)."},
"default": {"type": "neutral", "max_retries": 2, "delay": 900, "msg": "Taking default exit code retry policy route."}
}

# strings in fatal root exception text which indicate code problem, not corrupted file
# a small "knowledge data base"
NOT_FILE_RELATED_FATAL_ROOT_ERRORS = [
Expand Down Expand Up @@ -98,6 +133,51 @@ def get_job_ad_from_condor_q(self):

# = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def store_retry_delay(self, delay):
    """
    Record until when the next retry of this job has to be held back.

    The file resubmit_info/job.<job_id>.txt holds the textual form of a
    dictionary keyed by the retry number (as a string). This method sets
    "retry_delay_until" (now + delay, as a Unix timestamp) in the entry for
    the current self.crab_retry, keeps every other entry as found, and
    rewrites the file through a temporary file followed by a rename.

    :param delay: number of seconds, counted from now, to hold the retry.
    """
    # Local import so that this method does not depend on module-level imports.
    from ast import literal_eval

    retry_info_file = f"resubmit_info/job.{self.job_id}.txt"
    try:
        with open(retry_info_file, "r", encoding="utf-8") as fd:
            # literal_eval only accepts Python literals: unlike eval() it never
            # executes code that may have ended up in the file.
            retry_info = literal_eval(fd.read())
    except FileNotFoundError:
        # It is fine if the file is not there yet: start from scratch.
        retry_info = {}
    except (OSError, ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # Unreadable or corrupted content: keep the best-effort behaviour
        # (start from an empty dictionary) but leave a trace in the log.
        self.logger.warning("Could not parse %s, starting from an empty retry info", retry_info_file)
        retry_info = {}
    if not isinstance(retry_info, dict):
        # A valid literal which is not a dictionary cannot be updated below.
        retry_info = {}

    key = str(self.crab_retry)
    retry_info.setdefault(key, {})["retry_delay_until"] = time.time() + delay

    # Write to a temporary file first, then rename it over the real one, so
    # that a reader never finds a partially written file.
    tmp_file = retry_info_file + ".tmp"
    with open(tmp_file, "w", encoding="utf-8") as fd:
        fd.write(str(retry_info))
    os.rename(tmp_file, retry_info_file)

# = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =


def apply_retry_policy(self, exitCode):
    """
    Apply the exit-code dependent retry policy to a finished job.

    The policy is looked up in EXIT_RETRY_POLICY, falling back to its
    "default" entry. Only policies of type "recoverable" trigger an action:
      - FatalError is raised once self.crab_retry has reached "max_retries";
      - otherwise the policy delay is recorded through store_retry_delay;
      - RecoverableError is then raised, except for a fixed set of exit
        codes for which the method returns normally to its caller.
    Any other policy type (e.g. "neutral") returns without doing anything.
    """
    fallback = EXIT_RETRY_POLICY["default"]
    rule = EXIT_RETRY_POLICY.get(exitCode, fallback)

    # Nothing to enforce unless the policy is a "recoverable" one.
    if rule["type"] != "recoverable":
        return

    retry_cap = rule["max_retries"]
    if self.crab_retry >= retry_cap:
        raise FatalError(f"Retry limit reached for exit {exitCode}: {rule['msg']}")

    wait_seconds = rule.get("delay", 900)
    self.logger.info(f"Sleeping {wait_seconds} seconds before retry (exit code {exitCode})")
    self.store_retry_delay(wait_seconds)

    # For these exit codes the delay is stored but no RecoverableError is raised.
    passthrough_codes = (8020, 8021, 8022, 8028, 84, 85, 86, 92, 134, 8001, 65)
    if exitCode not in passthrough_codes:
        raise RecoverableError(rule["msg"])

# = = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def get_job_ad_from_file(self):
"""
Need a doc string here
Expand Down Expand Up @@ -307,6 +387,8 @@ def check_exit_code(self):
return 1
try:
exitCode = int(self.report['exitCode'])
# Apply retry policy
self.apply_retry_policy(exitCode)
except ValueError:
msg = "Unable to extract job's wrapper exit code from job report."
self.logger.warning(msg)
Expand All @@ -319,7 +401,6 @@ def check_exit_code(self):
return 0

msg = "Job or stageout wrapper finished with exit code %d." % (exitCode)
msg += " Trying to determine the meaning of the exit code and if it is a recoverable or fatal error."
self.logger.info(msg)

# Wrapper script sometimes returns the posix return code (8 bits).
Expand All @@ -335,17 +416,6 @@ def check_exit_code(self):
self.create_fake_fjr(exitMsg, 8022, 8022, fatalError=False) # retry the job
raise RecoverableError("Job failed to open local and fallback files.")

if exitCode == 1:
raise RecoverableError("Job failed to bootstrap CMSSW; likely a worker node issue.")

if exitCode == 50513 or exitCode == 81:
raise RecoverableError("Job did not find functioning CMSSW on worker node.")

# This is a difficult one -- right now CMSRunAnalysis.py will turn things like
# segfaults into an invalid FJR. Will revisit this decision later.
if exitCode == 50115 or exitCode == 195:
raise RecoverableError("Job did not produce a FJR; will retry.")

if exitCode == 134:
recoverable_signal = False
try:
Expand Down Expand Up @@ -379,28 +449,6 @@ def check_exit_code(self):
if cvmfs_issue:
raise RecoverableError("CVMFS issue detected.")

# Another difficult case -- so far, SIGKILL has mostly been observed at T2_CH_CERN, and it has nothing to do
# with an issue of the job itself. Typically, this isn't the user code's fault
# it was often a site or pilot misconfiguration that led to the pilot exhausting its allocated runtime.
# We should revisit this issue if we see SIGKILL happening for other cases that are the users' fault.
if exitCode == 137:
raise RecoverableError("SIGKILL; likely an unrelated batch system kill.")

if exitCode == 10034 or exitCode == 50:
raise RecoverableError("Required application version not found at the site.")

if exitCode == 10040:
raise RecoverableError("Site Error: failed to generate cmsRun cfg file at runtime.")

if exitCode == 60403 or exitCode == 243:
raise RecoverableError("Timeout during attempted file stageout.")

if exitCode == 60307 or exitCode == 147:
raise RecoverableError("Error during attempted file stageout.")

if exitCode == 60311 or exitCode == 151:
raise RecoverableError("Error during attempted file stageout.")

if exitCode:
raise FatalError("Job wrapper finished with exit code %d.\nExit message:\n %s" % (exitCode, exitMsg.replace('\n', '\n ')))

Expand Down