Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4c94acf
add exit code dependent retry policy
aspiringmind-code Feb 24, 2026
e70893e
adding retry delay condition to needsDefer
aspiringmind-code Feb 24, 2026
595b71b
add exit code dependent ability to change maxmemory, maxjobruntime an…
aspiringmind-code Feb 24, 2026
3614084
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Mar 30, 2026
b76d758
add resubmit_counter and eff max retries
aspiringmind-code Mar 30, 2026
69afe55
use ExprTree
aspiringmind-code Mar 31, 2026
5b92334
remove abort and chnage jobads too
aspiringmind-code Mar 31, 2026
9bd8224
remove jobconst, add to adstoPort
aspiringmind-code Mar 31, 2026
9fca35e
strictly policy dependent
aspiringmind-code Mar 31, 2026
8b3db7e
avoid exprtree
aspiringmind-code Mar 31, 2026
1e980e2
proper use of inkey and test with 8020
aspiringmind-code Apr 1, 2026
f492af6
be free of use_resubmit_info
aspiringmind-code Apr 1, 2026
d308f89
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 10, 2026
12183e9
add handler, use_resubmit_info correct and increase limit
aspiringmind-code Apr 10, 2026
30b6246
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 13, 2026
b288bb8
use upperbounds
aspiringmind-code Apr 13, 2026
ef5ba93
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 17, 2026
1306f2b
address comments
aspiringmind-code Apr 17, 2026
3342bdd
add handler info
aspiringmind-code Apr 20, 2026
b6ebe51
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 20, 2026
7f7fa6a
add handler info
aspiringmind-code Apr 20, 2026
d117922
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 22, 2026
b2f179d
use job specific resubmit_counter
aspiringmind-code Apr 22, 2026
3bdc41d
fix import
aspiringmind-code Apr 22, 2026
359bc85
fix order of store_retry_actions
aspiringmind-code Apr 22, 2026
97d83ec
fix site nomenclature
aspiringmind-code Apr 22, 2026
6cc2f4f
final commit
aspiringmind-code Apr 22, 2026
c76f844
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 22, 2026
d8051c8
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 24, 2026
2434ef3
address comments
aspiringmind-code Apr 24, 2026
e90b1bf
no need to import exit retry policy anymore
aspiringmind-code Apr 24, 2026
f23a770
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code Apr 28, 2026
72ca8cc
try fixed max_retries
aspiringmind-code Apr 28, 2026
8f596dd
typo
aspiringmind-code Apr 28, 2026
fff0317
fix formula and add logging
aspiringmind-code Apr 29, 2026
034d93b
test with 2
aspiringmind-code Apr 29, 2026
ab29d86
Merge branch 'dmwm:master' into improve_resubmit
aspiringmind-code May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/python/ServerUtilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,15 @@
# Fatal error limits for job resource usage
# Defaults are used if unable to load from .job.ad
# Otherwise it uses these values.
MAX_WALLTIME = 21 * 60 * 60 + 30 * 60
MAX_MEMORY = 2 * 1024
MAX_WALLTIME = 21 * 60 * 60 + 30 * 60 # 21.5 hours in seconds
MAX_MEMORY = 2 * 1024 # 2048 MB
# see https://github.com/dmwm/CRABServer/issues/5995
MAX_MEMORY_PER_CORE = 2500
MAX_MEMORY_SINGLE_CORE = 3000
MAX_MEMORY_SINGLE_CORE_ON_RESUBMIT = 5000
MAX_MEMORY_PER_CORE = 2500 # 2500 MB
MAX_MEMORY_SINGLE_CORE = 3000 # 3000 MB
MAX_MEMORY_SINGLE_CORE_ON_RESUBMIT = 5000 # 5000 MB
MAX_DISK_SPACE = 20000000 # Disk usage is not used from .job.ad as CRAB3 is not seeting it. 20GB is max.

MAX_MEMORY_AUTOMATIC_RESUBMIT = 7500 # 7500 MB
MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT = 47 * 60 # 47 hours in minutes
MAX_IDLE_JOBS = 1000
MAX_POST_JOBS = 10

Expand Down
3 changes: 1 addition & 2 deletions src/python/TaskWorker/Actions/DagmanCreator.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ def makeJobSubmit(self):
jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url'])
jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url'])


# note about Lists
# in the JDL everything is a string, we can't use the simple classAd[name]=somelist
# but need the ExprTree format (what classAd.lookup() would return)
Expand Down Expand Up @@ -885,7 +884,7 @@ def createSubdag(self, splitterResult, **kwargs):

## In the future this parameter may be set by the user in the CRAB configuration
## file and we would take it from the Task DB.
self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 2)
self.task['numautomjobretries'] = getattr(self.config.TaskWorker, 'numAutomJobRetries', 100)

runtime = self.task['tm_split_args'].get('minutes_per_job', -1)

Expand Down
2 changes: 1 addition & 1 deletion src/python/TaskWorker/Actions/DagmanResubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def executeInternal(self, *args, **kwargs): #pylint: disable=unused-argument
# is saving the values of the parameters for each job retry in text files (the
# files are in the directory resubmit_info in the schedd).
# We use classAds here as way to pass informations to PreJobs.
# Value in schedd.exit(const, ad, value) can be string or ExprTree
# Value in schedd.edit(const, ad, value) can be string or ExprTree
# https://htcondor.readthedocs.io/en/latest/apis/python-bindings/api/version2/htcondor2/schedd.html#htcondor2.Schedd.edit
# We need ExprTree when the ad correspond to a python list
for adparam, taskparam in params.items():
Expand Down
106 changes: 94 additions & 12 deletions src/python/TaskWorker/Actions/PreJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import logging
from ast import literal_eval

from ServerUtilities import getWebdirForDb, insertJobIdSid, pythonListToClassAdExprTree
from ServerUtilities import getWebdirForDb, insertJobIdSid, pythonListToClassAdExprTree, MAX_MEMORY_AUTOMATIC_RESUBMIT, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT
from TaskWorker.Actions.RetryJob import JOB_RETURN_CODES

import htcondor2 as htcondor
Expand All @@ -38,6 +38,7 @@ def __init__(self):
self.resubmit_info = {}
self.prejob_exit_code = None
self.logger = logging.getLogger()
self.max_retries = 2


def calculate_crab_retry(self):
Expand Down Expand Up @@ -182,7 +183,6 @@ def get_resubmit_info(self):
with open(file_name, 'r', encoding='utf-8') as fd:
self.resubmit_info = literal_eval(fd.read())


def save_resubmit_info(self):
"""
Need a doc string here.
Expand Down Expand Up @@ -250,16 +250,23 @@ def alter_submit(self, crab_retry):
## TaskWorker than modify the dagman job classAds !
CRAB_ResubmitList_in_taskad = 'CRAB_ResubmitList' in self.task_ad
use_resubmit_info = False
increase_resubmission_counter = False
resubmit_jobids = []
if 'CRAB_ResubmitList' in self.task_ad:
# SB this map is most likley useless, keep it in "Works/Don't Touch" spirit
resubmit_jobids = map(str, self.task_ad['CRAB_ResubmitList'])
try:
resubmit_jobids = set(resubmit_jobids)
self.logger.info(f"resubmit_jobids is {resubmit_jobids} and job id is {self.job_id}")
if resubmit_jobids and self.job_id not in resubmit_jobids:
use_resubmit_info = True
base_max = self.max_retries
if crab_retry % (base_max + 1) == 0 and crab_retry != 0:
increase_resubmission_counter = True
self.logger.info(f"increase resubmission counter was set to True as {self.job_id} was in {resubmit_jobids}")
except TypeError:
resubmit_jobids = True
self.logger.info(f"resubmit_jobids is {resubmit_jobids} and job id is {self.job_id}")
## If there is no resubmit_info, we can of course not use it.
if not self.resubmit_info:
use_resubmit_info = False
Expand All @@ -268,7 +275,11 @@ def alter_submit(self, crab_retry):
maxmemory = None
numcores = None
priority = None
if not use_resubmit_info: # means thad we resubmit with new params from crab resubmit
# read information about last retry (if any)
inkey = "0" if crab_retry == 0 else str(crab_retry - 1)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am always confused by inkey and outkey, which predates your and mine putting hands in this. Please add a comment like
# read information about last retry (if any)
hopefully combination of that with the comment about outkey in lines 350-351 will help next time I read

while inkey not in self.resubmit_info and int(inkey) > 0:
inkey = str(int(inkey) - 1)
if not use_resubmit_info: # means that we resubmit with new params from crab resubmit
#if 'MaxWallTimeMins_RAW' in self.task_ad:
# if self.task_ad['MaxWallTimeMins_RAW'] != 1315:
# maxjobruntime = self.task_ad.lookup('MaxWallTimeMins_RAW')
Expand All @@ -291,13 +302,48 @@ def alter_submit(self, crab_retry):
priority = 20 #the maximum for splitting jobs
else: # means we resubmit with same params as previous try
## SB most likely much (all) of this string/int conversions can be simplified
inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1)
while inkey not in self.resubmit_info and int(inkey) > 0:
inkey = str(int(inkey) - 1)
self.logger.info(f"use_resubmit_info is {use_resubmit_info} and inkey is {inkey}")
maxjobruntime = self.resubmit_info[inkey].get('maxjobruntime')
maxmemory = self.resubmit_info[inkey].get('maxmemory')
numcores = self.resubmit_info[inkey].get('numcores')
priority = self.resubmit_info[inkey].get('priority')

#ExitCode Dependent automatic change in resubmission parameters

if self.resubmit_info:
retry_data = self.resubmit_info.get(inkey, {})
if retry_data.get("increase_memory") and maxmemory:
factor = retry_data.get("memory_factor", 1.2)
new_memory = int(maxmemory * factor)
new_memory = min(new_memory, MAX_MEMORY_AUTOMATIC_RESUBMIT)
self.logger.info(f"Increasing memory from {maxmemory} to {new_memory}")
maxmemory = new_memory

if retry_data.get("increase_runtime") and maxjobruntime:
factor = retry_data.get("runtime_factor", 1.2)
new_runtime = int(maxjobruntime * factor)
new_runtime = min(new_runtime, MAX_JOB_RUNTIME_AUTOMATIC_RESUBMIT)
self.logger.info(f"Increasing walltime from {maxjobruntime} to {new_runtime}")
maxjobruntime = new_runtime

## get resubmission counter
if crab_retry == 0:
resubmit_counter = 0
self.logger.info("crab_retry is 0. resubmit_counter is set to 0.")
else:
lastResubmitCounter = self.resubmit_info[inkey].get('resubmit_counter')
self.logger.info(f"Last Resubmit Counter was {lastResubmitCounter}")
try:
if increase_resubmission_counter:
resubmit_counter = lastResubmitCounter + 1
self.logger.info(f"New Resubmit Counter was increased from {lastResubmitCounter} to {resubmit_counter}")
else:
resubmit_counter = lastResubmitCounter
self.logger.info(f"Last resubmission counter same as new {resubmit_counter}")
except Exception:
self.logger.info("Exception occured while trying to get resubmit counter. Setting to 0")
resubmit_counter = 0

## Save the (new) values of the resubmission parameters in self.resubmit_info
## for the current job retry number.
outkey = str(crab_retry)
Expand All @@ -309,7 +355,7 @@ def alter_submit(self, crab_retry):
self.resubmit_info[outkey]['priority'] = priority
self.resubmit_info[outkey]['use_resubmit_info'] = use_resubmit_info
self.resubmit_info[outkey]['CRAB_ResubmitList_in_taskad'] = CRAB_ResubmitList_in_taskad

self.resubmit_info[outkey]["resubmit_counter"] = resubmit_counter
## Add the resubmission parameters to the Job.<job_id>.submit content.
##
if self.stage == "probe":
Expand Down Expand Up @@ -373,6 +419,9 @@ def redoSites(self, crab_retry, use_resubmit_info):
siteWhiteList = []
siteBlackSet = set()
siteWhiteSet = set()
inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1)
while inkey not in self.resubmit_info and int(inkey) > 0:
inkey = str(int(inkey) - 1)
if not use_resubmit_info:
if 'CRAB_SiteBlacklist' in self.task_ad:
if self.task_ad['CRAB_SiteBlacklist']: # skip ad=''
Expand All @@ -383,9 +432,6 @@ def redoSites(self, crab_retry, use_resubmit_info):
siteWhiteList = self.task_ad['CRAB_SiteWhitelist']
siteWhiteSet = set(siteWhiteList)
else:
inkey = str(crab_retry) if crab_retry == 0 else str(crab_retry - 1)
while inkey not in self.resubmit_info and int(inkey) > 0:
inkey = str(int(inkey) - 1)
siteBlackSet = set(self.resubmit_info[inkey].get('site_blacklist', []))
siteWhiteSet = set(self.resubmit_info[inkey].get('site_whitelist', []))
## Save the current site black- and whitelists in self.resubmit_info for the
Expand Down Expand Up @@ -414,6 +460,18 @@ def redoSites(self, crab_retry, use_resubmit_info):
self.logger.error("Can not submit since DESIRED_Sites list is empty")
self.prejob_exit_code = 1
sys.exit(self.prejob_exit_code)

# ExitCode Dependent discard of previous_site
Comment thread
aspiringmind-code marked this conversation as resolved.
retry_data = self.resubmit_info.get(inkey, {})
previous_site = retry_data.get("site")
if retry_data.get("change_site") and previous_site:
self.logger.info(f"Last Exit Code indicated that a change in site might help. inkey was {inkey}")
if previous_site in availableSet and len(availableSet) > 1:
self.logger.info(f"Removing previous site {previous_site} from candidate sites")
availableSet.discard(previous_site)
else:
self.logger.info(f"No need to discard last site. inkey was {inkey}")

## Make sure that attributest which will be used in MatchMaking are SORTED lists
available = list(availableSet)
available.sort()
Expand Down Expand Up @@ -456,19 +514,43 @@ def needsDefer(self):
slow release of jobs in a task.
The function return True if CRAB_JobReleaseTimeout is defined and not 0, and if the submit
time of the task plus the defer time is greater than the current time.
Additionally retry policy delay
"""
deferTime = int(self.task_ad.get("CRAB_JobReleaseTimeout", 0))

currentTime = time.time()

# Check retry delay from resubmit_info
# Inside SPOOL_DIR we have resubmit_info folder with text files job.{self.job_id}.txt for each job id
# Each file is a dictionary of the format example {0:{'maxmemory':2000, ...}, 1:{}, 2:{}, ...}
# where 0,1,2,... are crab_retry numbers. A lot of vital info like site_whitelist, retry_delay_until,
# resubmit_counter, use_resubmit_info, etc is stored and read from this file for each retry across resubmissions.
# ToDo: Convert this file to json format
retry_info_file = f"resubmit_info/job.{self.job_id}.txt"
if os.path.exists(retry_info_file):
try:
with open(retry_info_file, "r", encoding="utf-8") as fd:
retry_info = literal_eval(fd.read())
Comment thread
belforte marked this conversation as resolved.
key = str(self.dag_retry)
if key in retry_info:
retry_delay_until = retry_info[key].get("retry_delay_until")
if retry_delay_until and currentTime < retry_delay_until:
wait = int(retry_delay_until - currentTime)
self.logger.info(f"Retry delay not elapsed yet. Deferring for {wait} seconds.")
return True
except Exception:
self.logger.exception("Error checking retry delay in resubmit_info")

if deferTime:
self.logger.info('Release timeout specified in extraJDL:')
totalDefer = deferTime * int(self.job_id)
submitTime = int(self.task_ad["CRAB_TaskSubmitTime"])
currentTime = time.time()
if currentTime < (submitTime + totalDefer):
msg = f" Defer time of this job ({totalDefer} seconds since initial task submission)"
msg += f" not elapsed yet, deferring for {totalDefer} seconds"
self.logger.info(msg)
return True
self.logger.info(' Continuing normally since current time is greater than requested starttime of the job')
self.logger.info('Continuing normally since current time is greater than requested starttime of the job')
return False

def execute(self, *args):
Expand Down
Loading