39 changes: 35 additions & 4 deletions joshua/joshua_agent.py
@@ -134,10 +134,12 @@ def trim_jobqueue(cutoff_date, remove_jobs=True):


def log(outputText, newline=True):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"[{timestamp}] {outputText}"
Member:
Is there a reason not to use the logging package? That would add the timestamp. Probably more refactoring?

Contributor Author:
Yeah, that would be better, but I was thinking a minimal change. Should I go for it?

Member:
We could do that in another PR if we think it's worth refactoring :)

return (
print(outputText, file=getFileHandle())
print(message, file=getFileHandle())
if newline
else getFileHandle().write(outputText)
else getFileHandle().write(message)
)
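
For context on the review thread above, a minimal sketch of what a logging-based log() might look like. This is hypothetical and not part of the PR; a real refactor would still need to write through getFileHandle() rather than a plain stream handler, and the newline=False path would need separate handling.

import logging

# Hypothetical sketch: a module-level logger whose formatter adds the same
# "[YYYY-MM-DD HH:MM:SS] message" prefix that log() builds by hand above.
logger = logging.getLogger("joshua_agent")
_handler = logging.StreamHandler()  # the real agent writes to getFileHandle() instead
_handler.setFormatter(
    logging.Formatter("[%(asctime)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
)
logger.addHandler(_handler)
logger.setLevel(logging.INFO)


def log(outputText, newline=True):
    # logging always terminates records, so newline=False would need extra handling.
    logger.info(outputText)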


@@ -811,6 +813,12 @@ def agent(

start = time.time() # Used later to limit time agent runs.
idle_start = start # Used to determine idle duration
agent_timeout_approaching = False # Flag to indicate graceful shutdown mode

if agent_timeout:
log(f"Joshua agent started with timeout: {agent_timeout} seconds")
else:
log("Joshua agent started with no timeout")

try:
# Run all of the sanity tests first, and if any of them fail, exit.
@@ -948,13 +956,26 @@ def agent(
agent_idle_timeout is not None
and now - idle_start >= agent_idle_timeout
):
log("Agent timed out")
log(f"Agent timed out after {now - start:.1f} seconds (no active ensembles)")
break
else:
continue

assert ensembles_can_run

# Check if agent timeout is approaching - if so, finish gracefully
now = time.time()
if agent_timeout is not None and (now - start) >= agent_timeout:
if not agent_timeout_approaching:
agent_timeout_approaching = True
log(f"Agent timeout reached after {now - start:.1f} seconds - will exit gracefully after current job completes")
break

# Warn when timeout is approaching (5 minutes before)
if agent_timeout is not None and not agent_timeout_approaching and (now - start) >= (agent_timeout - 300):
log(f"Agent timeout approaching in {agent_timeout - (now - start):.0f} seconds")
agent_timeout_approaching = True

# Pick an ensemble to run. Weight by amount of time spent on each one.

# print('{} Picking from {} ensembles'.format(threading.current_thread().name, len(ensembles)))
@@ -971,12 +992,15 @@ def agent(
chosen_ensemble = ensemble
break
assert chosen_ensemble is not None

log(f"Starting job from ensemble: {chosen_ensemble}")
retcode = run_ensemble(
chosen_ensemble,
save_on,
work_dir=work_dir,
timeout_command_timeout=timeout_command_timeout,
)
log(f"Completed job from ensemble: {chosen_ensemble}, result: {retcode}")
# Exit agent gracefully via stopfile on probable zombie process
if retcode == -1 or retcode == -2:
if stop_file is None:
@@ -1098,13 +1122,20 @@ def reap_children():
)
arguments = parser.parse_args()

if arguments.apoptosis is not None:
# Check for AGENT_TIMEOUT environment variable first, then command line argument
agent_timeout_env = os.environ.get("AGENT_TIMEOUT", None)
if agent_timeout_env is not None:
agent_timeout = int(agent_timeout_env)
log(f"Using AGENT_TIMEOUT from environment: {agent_timeout} seconds")
elif arguments.apoptosis is not None:
# Timeout is equal to the given argument with a random fuzz up to 50% of the argument.
# This is added to avoid having 500+ Mesos boxes suddenly crying out in terror and
# being suddenly silenced.
agent_timeout = int(arguments.apoptosis * (1 + 0.5 * random.random()))
log(f"Using --apoptosis timeout: {agent_timeout} seconds")
else:
agent_timeout = None
log("No agent timeout configured - agent will run indefinitely")

joshua_model.open(arguments.cluster_file, arguments.dir_path)
agent_init(arguments.work_dir)
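
As a reading aid for the fuzzed --apoptosis timeout in the hunk above, a small illustration with a hypothetical value: random.random() is in [0, 1), so the effective timeout lands uniformly in [apoptosis, 1.5 * apoptosis), which keeps a large fleet of agents from all exiting at the same moment.

import random

apoptosis = 3600  # hypothetical example value, in seconds
agent_timeout = int(apoptosis * (1 + 0.5 * random.random()))
# With apoptosis=3600, agent_timeout falls somewhere in [3600, 5400).
assert 3600 <= agent_timeout < 5400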
11 changes: 10 additions & 1 deletion joshua/joshua_model.py
@@ -680,8 +680,17 @@ def should_run_ensemble(tr: fdb.Transaction, ensemble_id: str) -> bool:
"""
props = _get_ensemble_properties(tr, ensemble_id)
started = props.get("started", 0)
passed = props.get("pass", 0)
failed = props.get("fail", 0)
completed = passed + failed
max_runs = props.get("max_runs", 0)

# max_runs == 0 means run forever
if max_runs > 0 and completed >= max_runs:
# Ensemble has reached its completion target, no more work needed
return False

# Check if we're approaching the limit to avoid overshooting
if max_runs > 0 and started >= max_runs:
current_time = time.time()
max_seed = None
@@ -713,7 +722,7 @@ def should_run_ensemble(tr: fdb.Transaction, ensemble_id: str) -> bool:
return True
return False
else:
# max_runs == 0 or started < max_runs
# max_runs == 0 or started < max_runs and completed < max_runs
return True
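
As a reading aid only, a standalone sketch (a hypothetical helper, not part of joshua_model) of the completion rule these hunks introduce: max_runs == 0 means the ensemble runs forever, while a positive max_runs stops it once pass + fail reaches that target.

def needs_more_runs(passed: int, failed: int, max_runs: int) -> bool:
    # max_runs == 0 means the ensemble runs forever.
    if max_runs > 0 and (passed + failed) >= max_runs:
        return False  # completion target reached, no more work needed
    return True


assert needs_more_runs(passed=0, failed=0, max_runs=0)         # unlimited ensemble
assert needs_more_runs(passed=40, failed=9, max_runs=100)      # still short of the target
assert not needs_more_runs(passed=95, failed=5, max_runs=100)  # target reached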


10 changes: 5 additions & 5 deletions k8s/agent-scaler/agent-scaler.sh
@@ -53,7 +53,7 @@ while true; do
# cleanup finished jobs (status 1/1)
# Filter by AGENT_NAME and check 3rd column for "1/1" (completions)
for job in $(kubectl get jobs -n "${namespace}" --no-headers | { grep -E -e "^${AGENT_NAME}-[0-9]+(-[0-9]+)?\\s" || true; } | awk '$3 == "1/1" {print $1}'); do
echo "=== Job $job Completed (1/1) - deleting from get jobs === (AGENT_NAME: ${AGENT_NAME})"
echo "$(date -Iseconds) === Job $job Completed (1/1) - deleting from get jobs === (AGENT_NAME: ${AGENT_NAME})"
kubectl delete job "$job" -n "${namespace}"
done

@@ -68,7 +68,7 @@ while true; do
if [ -n "$job_prefix_from_pod" ]; then
# Validate that the derived job_prefix_from_pod actually matches the expected format for this agent's jobs
if [[ "${job_prefix_from_pod}" =~ ^${AGENT_NAME}-[0-9]+(-[0-9]+)?$ ]]; then
echo "=== Deleting Job based on pod status: $job_prefix_from_pod === (AGENT_NAME: ${AGENT_NAME})"
echo "$(date -Iseconds) === Deleting Job based on pod status: $job_prefix_from_pod === (AGENT_NAME: ${AGENT_NAME})"
kubectl delete job "$job_prefix_from_pod" -n "${namespace}" --ignore-not-found=true
else
# This case can occur if AGENT_NAME is unusual (e.g., 'foo-bar' and a pod 'foo-bar-baz-TIMESTAMP-...' exists)
@@ -89,12 +89,12 @@ while true; do
else
num_ensembles=$(python3 /tools/ensemble_count.py -C "${FDB_CLUSTER_FILE}")
fi
echo "${num_ensembles} ensembles in the queue (global)"
echo "$(date -Iseconds) ${num_ensembles} ensembles in the queue (global)"

# Calculate the number of all active Joshua jobs (any type) -- 'jobs' are effectively pod instances (a pod instance usually does batch_size joshua runs and then completes).
# Active jobs are those with .status.active > 0 (i.e., pods are running/pending but not yet succeeded/failed overall for the job)
num_all_active_joshua_jobs=$(kubectl get jobs -n "${namespace}" -o 'jsonpath={range .items[?(@.status.active > 0)]}{.metadata.name}{"\n"}{end}' 2>/dev/null | grep -Ec '^joshua-(rhel9-)?agent-[0-9]+(-[0-9]+)?$')
echo "${num_all_active_joshua_jobs} total active joshua jobs of any type that are running. Global max_jobs: ${max_jobs}."
echo "$(date -Iseconds) ${num_all_active_joshua_jobs} total active joshua jobs of any type that are running. Global max_jobs: ${max_jobs}."

new_jobs=0 # Initialize jobs to start this cycle for this scaler

@@ -177,7 +177,7 @@ while true; do
fi
fi
# Standardized log message based on new_jobs calculated for this iteration for this agent type
echo "${new_jobs} jobs of type ${AGENT_NAME} were targeted for starting in this iteration."
echo "$(date -Iseconds) ${new_jobs} jobs of type ${AGENT_NAME} were targeted for starting in this iteration."

# Use consistent check delay for all queue sizes
adaptive_delay=${check_delay}