Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions joshua/joshua_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,13 @@ def trim_jobqueue(cutoff_date, remove_jobs=True):


def log(outputText, newline=True):
import datetime
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"[{timestamp}] {outputText}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason not to use the logging package? That would add the timestamp. Probably more refactoring?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that would be better but was thinking minimal change. Should I go for it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do that in another PR if we think it's worth to refactor :)

return (
print(outputText, file=getFileHandle())
print(message, file=getFileHandle())
if newline
else getFileHandle().write(outputText)
else getFileHandle().write(message)
)


Expand Down Expand Up @@ -811,6 +814,12 @@ def agent(

start = time.time() # Used later to limit time agent runs.
idle_start = start # Used to determine idle duration
agent_timeout_approaching = False # Flag to indicate graceful shutdown mode

if agent_timeout:
log(f"Joshua agent started with timeout: {agent_timeout} seconds")
else:
log("Joshua agent started with no timeout")

try:
# Run all of the sanity tests first, and if any of them fail, exit.
Expand Down Expand Up @@ -948,13 +957,26 @@ def agent(
agent_idle_timeout is not None
and now - idle_start >= agent_idle_timeout
):
log("Agent timed out")
log(f"Agent timed out after {now - start:.1f} seconds (no active ensembles)")
break
else:
continue

assert ensembles_can_run

# Check if agent timeout is approaching - if so, finish gracefully
now = time.time()
if agent_timeout is not None and (now - start) >= agent_timeout:
if not agent_timeout_approaching:
agent_timeout_approaching = True
log(f"Agent timeout reached after {now - start:.1f} seconds - will exit gracefully after current job completes")
break

# Warn when timeout is approaching (5 minutes before)
if agent_timeout is not None and not agent_timeout_approaching and (now - start) >= (agent_timeout - 300):
log(f"Agent timeout approaching in {agent_timeout - (now - start):.0f} seconds")
agent_timeout_approaching = True

# Pick an ensemble to run. Weight by amount of time spent on each one.

# print('{} Picking from {} ensembles'.format(threading.current_thread().name, len(ensembles)))
Expand All @@ -971,12 +993,15 @@ def agent(
chosen_ensemble = ensemble
break
assert chosen_ensemble is not None

log(f"Starting job from ensemble: {chosen_ensemble}")
retcode = run_ensemble(
chosen_ensemble,
save_on,
work_dir=work_dir,
timeout_command_timeout=timeout_command_timeout,
)
log(f"Completed job from ensemble: {chosen_ensemble}, result: {retcode}")
# Exit agent gracefully via stopfile on probable zombie process
if retcode == -1 or retcode == -2:
if stop_file is None:
Expand Down Expand Up @@ -1098,13 +1123,20 @@ def reap_children():
)
arguments = parser.parse_args()

if arguments.apoptosis is not None:
# Check for AGENT_TIMEOUT environment variable first, then command line argument
agent_timeout_env = os.environ.get("AGENT_TIMEOUT", None)
if agent_timeout_env is not None:
agent_timeout = int(agent_timeout_env)
log(f"Using AGENT_TIMEOUT from environment: {agent_timeout} seconds")
elif arguments.apoptosis is not None:
# Timeout is equal to the given argument with a random fuzz up to 50% of the argument.
# This is added to avoid having 500+ Mesos boxes suddenly crying out in terror and
# being suddenly silenced.
agent_timeout = int(arguments.apoptosis * (1 + 0.5 * random.random()))
log(f"Using --apoptosis timeout: {agent_timeout} seconds")
else:
agent_timeout = None
log("No agent timeout configured - agent will run indefinitely")

joshua_model.open(arguments.cluster_file, arguments.dir_path)
agent_init(arguments.work_dir)
Expand Down
11 changes: 10 additions & 1 deletion joshua/joshua_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,17 @@ def should_run_ensemble(tr: fdb.Transaction, ensemble_id: str) -> bool:
"""
props = _get_ensemble_properties(tr, ensemble_id)
started = props.get("started", 0)
passed = props.get("pass", 0)
failed = props.get("fail", 0)
completed = passed + failed
max_runs = props.get("max_runs", 0)

# max_runs == 0 means run forever
if max_runs > 0 and completed >= max_runs:
# Ensemble has reached its completion target, no more work needed
return False

# Check if we're approaching the limit to avoid overshooting
if max_runs > 0 and started >= max_runs:
current_time = time.time()
max_seed = None
Expand Down Expand Up @@ -713,7 +722,7 @@ def should_run_ensemble(tr: fdb.Transaction, ensemble_id: str) -> bool:
return True
return False
else:
# max_runs == 0 or started < max_runs
# max_runs == 0 or started < max_runs and completed < max_runs
return True


Expand Down
Loading