Merged
29 changes: 26 additions & 3 deletions joshua/joshua_agent.py
@@ -581,16 +581,39 @@ def _run_ensemble(
# output = output.decode('utf-8')

break
except subprocess.TimeoutExpired:
except subprocess.TimeoutExpired as timeout_ex:
# TimeoutExpired exception contains partial output
# captured before timeout. We must preserve this!
if self._cancelled():
log("<cancelled>")
retcode = -1
output = b""
# Preserve any stdout captured before cancellation
output = timeout_ex.stdout if timeout_ex.stdout else b""
break
if timeout_time and time.time() > timeout_time:
log("<timed out>")
process.kill()
johscheuer (Member) commented on the process.kill() call:
I don't have a strong opinion here, but should we first try to terminate() the process (sends SIGTERM) and then send the kill() (sends SIGKILL) if it doesn't terminate after a fixed amount of time? That should give the process some additional time to write the logs before shutting down.

The PR author (Contributor) replied:
Sorry. Missed this @johscheuer. Making a new PR to implement.
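For illustration, the suggested terminate-then-kill escalation might look like the following. This is a minimal sketch, not the follow-up PR; the helper name and the 10-second grace period are assumptions.

```python
import subprocess

def stop_gracefully(process: subprocess.Popen, grace_seconds: float = 10.0) -> None:
    """Ask the child to exit with SIGTERM; escalate to SIGKILL only if it does not."""
    process.terminate()                      # SIGTERM: gives the process a chance to flush its logs
    try:
        process.wait(timeout=grace_seconds)  # wait a fixed amount of time for a clean exit
    except subprocess.TimeoutExpired:
        process.kill()                       # SIGKILL: the process did not shut down in time
        process.wait()                       # reap the child so no zombie is left behind
```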

# Get any remaining output after kill
remaining_out, remaining_err = process.communicate()
# Combine partial output from timeout exception + remaining after kill
output = (timeout_ex.stdout or b"") + (remaining_out or b"")
log(f"Captured {len(output)} bytes on timeout (timeout_partial={len(timeout_ex.stdout or b'')}, post_kill={len(remaining_out or b'')})")

# If we got NO output at all (hung before test started), generate fallback XML
if len(output) == 0 or output.strip() == b"":
log("No output captured - test hung before producing any results. Generating fallback XML.")
fallback_xml = (
f'<Test TestFile="UNKNOWN" RandomSeed="UNKNOWN" BuggifyEnabled="UNKNOWN" '
f'FaultInjectionEnabled="UNKNOWN" JoshuaSeed="{seed}" Ok="0" '
f'CrashReason="HungBeforeExecution">'
f'<JoshuaMessage Severity="40" Message="Test timed out without producing any output. '
f'Container may have hung during tarball unpacking or before test command execution. '
f'Seed={seed}"/></Test>'
).encode('utf-8')
output = fallback_xml
log(f"Generated fallback XML ({len(output)} bytes)")

retcode = -2
output = b""
break
getFileHandle().write(".")
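Taken together, the timeout path above relies on the following pattern. This is a self-contained sketch under stated assumptions rather than the actual joshua code: the helper names are made up, and whether TimeoutExpired carries partial stdout depends on how output is captured (subprocess.run() populates it, and Popen.communicate() does on recent CPython), hence the defensive `or b""` fallbacks.

```python
import subprocess
import xml.etree.ElementTree as ET

def run_preserving_output(cmd, timeout, seed):
    """Run cmd with a timeout, keeping whatever stdout was produced before a hang."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        out, _ = proc.communicate(timeout=timeout)
        return proc.returncode, out
    except subprocess.TimeoutExpired as ex:
        partial = ex.stdout or b""             # output captured before the timeout, if any
        proc.kill()
        remaining, _ = proc.communicate()      # drain whatever is still buffered after the kill
        out = partial + (remaining or b"")
        if not out.strip():
            # Nothing was produced at all: synthesize a well-formed fallback <Test> element
            test = ET.Element("Test", TestFile="UNKNOWN", JoshuaSeed=str(seed), Ok="0",
                              CrashReason="HungBeforeExecution")
            ET.SubElement(test, "JoshuaMessage", Severity="40",
                          Message="Test timed out without producing any output.")
            out = ET.tostring(test)
        return -2, out
```

Building the fallback element with ElementTree instead of an f-string is a variation on the change above; it guarantees attribute values are escaped, at the cost of slightly longer construction code.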

33 changes: 28 additions & 5 deletions k8s/agent-scaler/agent-scaler.sh
@@ -29,6 +29,10 @@ namespace=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)
# Its 'joshua-rhel9-agent' for rockylinux9 instances.
export AGENT_NAME=${AGENT_NAME:-"joshua-agent"}

# Scaling configuration for large test suites
# For very large queues (>1000 ensembles), scale more aggressively
# For smaller queues, use normal scaling to be fair to other users

# if AGENT_TAG is not set through --build-arg,
# use the default agent image and tag
export AGENT_TAG=${AGENT_TAG:-"foundationdb/joshua-agent:latest"}
@@ -120,14 +124,30 @@ while true; do
fi

# 3. The number of new jobs cannot exceed the per-cycle limit, if one is set.
# Simple scaling: 1000 jobs for very large queues, 100 for smaller queues
if [ -n "${MAX_NEW_JOBS}" ]; then
if [ "${num_to_attempt}" -gt "${MAX_NEW_JOBS}" ]; then
num_to_attempt=${MAX_NEW_JOBS}
if [ "${num_ensembles}" -gt 1000 ]; then
# For very large queues (>1000), allow up to 1000 jobs per cycle
# This helps clear large test suites quickly while being fair
large_queue_limit=1000
if [ "${num_to_attempt}" -gt "${large_queue_limit}" ]; then
num_to_attempt=${large_queue_limit}
fi
else
# For smaller queues, use normal MAX_NEW_JOBS to be fair to other users
if [ "${num_to_attempt}" -gt "${MAX_NEW_JOBS}" ]; then
num_to_attempt=${MAX_NEW_JOBS}
fi
fi
fi

if [ "${num_to_attempt}" -gt 0 ]; then
new_jobs=${num_to_attempt}
if [ "${num_ensembles}" -gt 1000 ]; then
echo "Large queue detected: ${num_ensembles} ensembles, targeting ${new_jobs} new jobs (large queue mode)"
else
echo "Normal scaling: ${num_ensembles} ensembles, targeting ${new_jobs} new jobs"
fi
fi

idx=0
@@ -151,16 +171,19 @@ while true; do
fi
done
# /tmp/joshua-agent.yaml contains up to $batch_size entries
echo "Starting a batch of ${i} jobs"
echo "Starting ${i} jobs"
kubectl apply -f /tmp/joshua-agent.yaml -n "${namespace}"
done
fi
fi
# Standardized log message based on the new_jobs value calculated for this agent type in this iteration
echo "${new_jobs} jobs of type ${AGENT_NAME} were targeted for starting in this iteration."

# check every check_delay seconds
sleep "${check_delay}"
# Use consistent check delay for all queue sizes
adaptive_delay=${check_delay}

# check every adaptive_delay seconds
sleep "${adaptive_delay}"
done
exit 0
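The per-cycle cap logic above reduces to a small pure function. The sketch below is only illustrative (the function name is invented; the 1000-ensemble threshold and 1000-job cap mirror the values hard-coded in the script):

```python
def per_cycle_cap(num_to_attempt, num_ensembles, max_new_jobs):
    """How many agent jobs a single scaling cycle is allowed to start."""
    if max_new_jobs is None:       # no MAX_NEW_JOBS configured: leave the request uncapped
        return num_to_attempt
    if num_ensembles > 1000:       # very large queue: allow up to 1000 jobs per cycle
        return min(num_to_attempt, 1000)
    return min(num_to_attempt, max_new_jobs)  # smaller queue: respect MAX_NEW_JOBS as before

# A 5000-ensemble queue asking for 2500 jobs is capped at 1000 per cycle,
# while a 300-ensemble queue asking for 250 jobs with MAX_NEW_JOBS=100 stays capped at 100.
assert per_cycle_cap(2500, 5000, 100) == 1000
assert per_cycle_cap(250, 300, 100) == 100
```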

4 changes: 3 additions & 1 deletion k8s/agent-scaler/agent-scaler.yaml
@@ -19,7 +19,9 @@ spec:
containers:
- env:
- name: MAX_JOBS
value: "5"
value: "100"
- name: MAX_NEW_JOBS
value: "100"
- name: CHECK_DELAY
value: "10"
image: foundationdb/agent-scaler:latest