Skip to content

Cluster Cleanup Rework #1155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jun 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 122 additions & 93 deletions cron/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@
from lib.configuration_check_error import ConfigurationCheckError, Status

# We currently have this dynamically as it will probably change quite a bit
STATUS_LIST = ['cooldown', 'warmup', 'job_no', 'job_start', 'job_error', 'job_end', 'cleanup_start', 'cleanup_end', 'measurement_control_start', 'measurement_control_end', 'measurement_control_error']
STATUS_LIST = ['cooldown', 'warmup', 'job_no', 'job_start', 'job_error', 'job_end', 'maintenance_start', 'maintenance_end', 'measurement_control_start', 'measurement_control_end', 'measurement_control_error']
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

def set_status(status_code, cur_temp, cooldown_time_after_job, data=None, run_id=None):
def set_status(status_code, data=None, run_id=None):
if not hasattr(set_status, "last_status"):
set_status.last_status = status_code # static variable
elif set_status.last_status == status_code:
return # no need to update status, if it has not changed since last time
set_status.last_status = status_code

# pylint: disable=redefined-outer-name
config = GlobalConfig().config
config = GlobalConfig().config # pylint: disable=redefined-outer-name
client = config['cluster']['client']

if status_code not in STATUS_LIST:
Expand All @@ -48,14 +47,14 @@ def set_status(status_code, cur_temp, cooldown_time_after_job, data=None, run_id

query = """
UPDATE machines
SET status_code=%s, cooldown_time_after_job=%s, current_temperature=%s, base_temperature=%s, jobs_processing=%s, gmt_hash=%s, gmt_timestamp=%s, configuration=%s
SET status_code=%s, base_temperature=%s, jobs_processing=%s, gmt_hash=%s, gmt_timestamp=%s, configuration=%s
WHERE id = %s
"""

gmt_hash, gmt_timestamp = get_repo_info(CURRENT_DIR)

params = (
status_code, cooldown_time_after_job, cur_temp,
status_code,
config['machine']['base_temperature_value'], client['jobs_processing'],
gmt_hash, gmt_timestamp,
json.dumps({'measurement': config['measurement'], 'machine': config['machine'], 'cluster': config['cluster']}),
Expand All @@ -64,17 +63,94 @@ def set_status(status_code, cur_temp, cooldown_time_after_job, data=None, run_id
)
DB().query(query=query, params=params)

def do_maintenance():
    """Run the cluster maintenance script (package updates / cleanup).

    Sets the machine status around the run and stores the script output.

    Returns:
        True when new packages were installed (caller must re-run the
        validation workload), otherwise None.
    """
    config = GlobalConfig().config # pylint: disable=redefined-outer-name

    set_status('maintenance_start')

    # cleanup.py needs root privileges; capture its output so it can be
    # reported in the machine status and, if packages changed, the changelog
    result = subprocess.check_output(['sudo', os.path.join(os.path.dirname(os.path.abspath(__file__)),'../tools/cluster/cleanup.py')], encoding='UTF-8')

    set_status('maintenance_end', data=result)

    if '<<<< NO PACKAGES UPDATED - NO NEED TO RUN VALIDATION WORKLOAD >>>>' not in result:
        # New packages were installed -> persist what changed for auditing
        DB().query('INSERT INTO changelog (message, machine_id) VALUES (%s, %s)', params=(result, config['machine']['id']))

        return True # must run validation workload again. New packages installed

    return None

def validate_temperature():
    """Check the machine temperature against the configured base temperature band.

    Updates the machine's current temperature in the DB on every call and
    tracks retry state in function attributes (poor man's static variables).

    Returns:
        True once the temperature is within the acceptable band
        (base_temperature_value - 10, base_temperature_value]; False after
        starting a cooldown sleep or a warmup spin so the caller can retry.

    Raises:
        RuntimeError: after 10 consecutive out-of-band readings.
    """
    if not hasattr(validate_temperature, "temperature_errors") or not hasattr(validate_temperature, "cooldown_time"):
        validate_temperature.temperature_errors = 0 # initialize static variable
        validate_temperature.cooldown_time = 0 # initialize static variable

    config = GlobalConfig().config # pylint: disable=redefined-outer-name

    current_temperature = get_temperature(
        config['machine']['base_temperature_chip'],
        config['machine']['base_temperature_feature']
    )

    # Bugfix: the machine id was missing from the params tuple although the
    # query has two placeholders
    DB().query('UPDATE machines SET current_temperature=%s WHERE id = %s', params=(current_temperature, config['machine']['id']))

    if current_temperature > config['machine']['base_temperature_value']:
        if validate_temperature.temperature_errors >= 10:
            raise RuntimeError(f"Temperature could not be stabilized in time. Was {current_temperature} but should be {config['machine']['base_temperature_value']}. Please check logs ...")

        print(f"Machine is still too hot: {current_temperature}°. Sleeping for 1 minute")
        set_status('cooldown')
        validate_temperature.cooldown_time += 60
        validate_temperature.temperature_errors += 1
        time.sleep(60)
        return False

    if current_temperature <= (config['machine']['base_temperature_value'] - 10):
        if validate_temperature.temperature_errors >= 10:
            raise RuntimeError(f"Temperature could not be stabilized in time. Was {current_temperature} but should be {config['machine']['base_temperature_value']}. Please check logs ...")

        print(f"Machine is too cool: {current_temperature}°. Warming up and retrying")
        set_status('warmup')
        validate_temperature.temperature_errors += 1
        # deliberate busy-wait instead of time.sleep: the spinning CPU
        # generates load and thus heat to warm the machine up
        current_time = time.time()
        while True: # spinlock
            if time.time() > (current_time + 10):
                break
        return False

    DB().query('UPDATE machines SET cooldown_time_after_job=%s WHERE id = %s', params=(validate_temperature.cooldown_time, config['machine']['id']))

    validate_temperature.temperature_errors = 0 # reset
    validate_temperature.cooldown_time = 0 # reset

    return True

def do_measurement_control():
    """Run the control workload and validate its STDDEV against history.

    On successful validation optionally sends a status mail. On any
    validation failure the machine status is set to error and the process
    sleeps for one validation window, which keeps it looping in validation
    until manually handled.
    """
    config = GlobalConfig().config # pylint: disable=redefined-outer-name
    # use the local config instead of the module-global client_main so this
    # function also works when imported outside of the __main__ loop
    client = config['cluster']['client']
    cwl = client['control_workload']

    set_status('measurement_control_start')
    validate.run_workload(cwl['name'], cwl['uri'], cwl['filename'], cwl['branch'])
    set_status('measurement_control_end')

    stddev_data = validate.get_workload_stddev(cwl['uri'], cwl['filename'], cwl['branch'], config['machine']['id'], cwl['comparison_window'], cwl['phase'], cwl['metrics'])
    print('get_workload_stddev returned: ', stddev_data)

    try:
        message = validate.validate_workload_stddev(stddev_data, cwl['metrics'])
        if client['send_control_workload_status_mail'] and config['admin']['notification_email']:
            Job.insert(
                'email',
                user_id=0, # User 0 is the [GMT-SYSTEM] user
                email=config['admin']['notification_email'],
                name=f"{config['machine']['description']} is operating normally. All STDDEV fine.",
                message='\n'.join(message)
            )

    except Exception as exception: # pylint: disable=broad-except
        validate.handle_validate_exception(exception)
        set_status('measurement_control_error')
        # the process will now go to sleep for 'time_between_control_workload_validations''
        # This is as long as the next validation is needed and thus it will loop
        # endlessly in validation until manually handled, which is what we want.
        time.sleep(client['time_between_control_workload_validations'])

if __name__ == '__main__':
try:
Expand All @@ -94,14 +170,16 @@ def do_cleanup(cur_temp, cooldown_time_after_job):
config_main = GlobalConfig().config

client_main = config_main['cluster']['client']
cwl = client_main['control_workload']
cooldown_time = 0
last_cooldown_time = 0
current_temperature = -1
temperature_errors = 0
last_cleanup = 0
must_revalidate_bc_new_packages = False
last_24h_maintenance = 0

while True:

# run forced maintenance with cleanup every 12 hours
if not args.testing and last_24h_maintenance < (time.time() - 43200): # every 12 hours
must_revalidate_bc_new_packages = do_maintenance()
last_24h_maintenance = time.time()

job = Job.get_job('run')
if job and job.check_job_running():
error_helpers.log_error('Job is still running. This is usually an error case! Continuing for now ...', machine=config_main['machine']['description'])
Expand All @@ -110,80 +188,23 @@ def do_cleanup(cur_temp, cooldown_time_after_job):
continue

if not args.testing:

if last_cleanup < (time.time() - 43200): # every 12 hours
do_cleanup(current_temperature, last_cooldown_time)
last_cleanup = time.time()

current_temperature = get_temperature(
GlobalConfig().config['machine']['base_temperature_chip'],
GlobalConfig().config['machine']['base_temperature_feature']
)

if current_temperature > config_main['machine']['base_temperature_value']:
if temperature_errors >= 10:
raise RuntimeError(f"Temperature could not be stabilized in time. Was {current_temperature} but should be {GlobalConfig().config['machine']['base_temperature_value']}. Pleae check logs ...")

print(f"Machine is still too hot: {current_temperature}°. Sleeping for 1 minute")
set_status('cooldown', current_temperature, last_cooldown_time)
cooldown_time += 60
temperature_errors += 1
if not args.testing:
time.sleep(60)
continue

if current_temperature <= (config_main['machine']['base_temperature_value'] - 10):
if temperature_errors >= 10:
raise RuntimeError(f"Temperature could not be stabilized in time. Was {current_temperature} but should be {GlobalConfig().config['machine']['base_temperature_value']}. Pleae check logs ...")

print(f"Machine is too cool: {current_temperature}°. Warming up and retrying")
set_status('warmup', current_temperature, last_cooldown_time)
temperature_errors += 1
current_time = time.time()
while True: # spinlock
if time.time() > (current_time + 10):
break
continue # still retry loop and make all checks again

print('Machine is temperature is good. Continuing ...')
last_cooldown_time = cooldown_time
cooldown_time = 0
temperature_errors = 0

if not args.testing and validate.is_validation_needed(config_main['machine']['id'], client_main['time_between_control_workload_validations']):
set_status('measurement_control_start', current_temperature, last_cooldown_time)
validate.run_workload(cwl['name'], cwl['uri'], cwl['filename'], cwl['branch'])
set_status('measurement_control_end', current_temperature, last_cooldown_time)

stddev_data = validate.get_workload_stddev(cwl['uri'], cwl['filename'], cwl['branch'], config_main['machine']['id'], cwl['comparison_window'], cwl['phase'], cwl['metrics'])
print('get_workload_stddev returned: ', stddev_data)

try:
message = validate.validate_workload_stddev(stddev_data, cwl['metrics'])
if client_main['send_control_workload_status_mail'] and config_main['admin']['notification_email']:
Job.insert(
'email',
user_id=0, # User 0 is the [GMT-SYSTEM] user
email=config_main['admin']['notification_email'],
name=f"{config_main['machine']['description']} is operating normally. All STDDEV fine.",
message='\n'.join(message)
)
except Exception as exception: # pylint: disable=broad-except
validate.handle_validate_exception(exception)
set_status('measurement_control_error', current_temperature, last_cooldown_time)
# the process will now go to sleep for 'time_between_control_workload_validations''
# This is as long as the next validation is needed and thus it will loop
# endlessly in validation until manually handled, which is what we want.
if not args.testing:
time.sleep(client_main['time_between_control_workload_validations'])

elif job:
set_status('job_start', current_temperature, last_cooldown_time, run_id=job._run_id)
if validate_temperature():
print('Machine is temperature is good. Continuing ...')
else:
continue # retry all checks

if not args.testing and (must_revalidate_bc_new_packages or validate.is_validation_needed(config_main['machine']['id'], client_main['time_between_control_workload_validations'])):
do_measurement_control()
must_revalidate_bc_new_packages = False # reset as measurement control has run. even if failed
continue # re-do temperature checks

if job:
set_status('job_start', run_id=job._run_id)
try:
job.process(docker_prune=True)
set_status('job_end', current_temperature, last_cooldown_time, run_id=job._run_id)
set_status('job_end', run_id=job._run_id)
except ConfigurationCheckError as exc: # ConfigurationChecks indicate that before the job ran, some setup with the machine was incorrect. So we soft-fail here with sleeps
set_status('job_error', current_temperature, last_cooldown_time, data=str(exc), run_id=job._run_id)
set_status('job_error', data=str(exc), run_id=job._run_id)
if exc.status == Status.WARN: # Warnings is something like CPU% too high. Here short sleep
error_helpers.log_error('Job processing in cluster failed (client.py)', exception_context=exc.__context__, last_exception=exc, status=exc.status, run_id=job._run_id, name=job._name, url=job._url, machine=config_main['machine']['description'], sleep_duration=600)
if not args.testing:
Expand All @@ -194,7 +215,7 @@ def do_cleanup(cur_temp, cooldown_time_after_job):
time.sleep(client_main['time_between_control_workload_validations'])

except Exception as exc: # pylint: disable=broad-except
set_status('job_error', current_temperature, last_cooldown_time, data=str(exc), run_id=job._run_id)
set_status('job_error', data=str(exc), run_id=job._run_id)
error_helpers.log_error('Job processing in cluster failed (client.py)',
exception_context=exc.__context__,
last_exception=exc,
Expand All @@ -215,13 +236,21 @@ def do_cleanup(cur_temp, cooldown_time_after_job):
name='Measurement Job on Green Metrics Tool Cluster failed',
message=f"Run-ID: {job._run_id}\nName: {job._name}\nMachine: {job._machine_description}\n\nDetails can also be found in the log under: {config_main['cluster']['metrics_url']}/stats.html?id={job._run_id}\n\nError message: {exc.__context__}\n{exc}\n"
)
finally: # run periodic maintenance with cleanup in between every run
if not args.testing:
must_revalidate_bc_new_packages = do_maintenance() # when new packages are installed, we must revalidate
last_24h_maintenance = time.time()

else:
set_status('job_no', current_temperature, last_cooldown_time)
set_status('job_no')
if client_main['shutdown_on_job_no']:
subprocess.check_output(['sync'])
time.sleep(60) # sleep for 60 before going to suspend to allow logins to cluster when systems are fresh rebooted for maintenance
subprocess.check_output(['sudo', 'systemctl', client_main['shutdown_on_job_no']])

if not args.testing:
time.sleep(client_main['sleep_time_no_job'])

if args.testing:
print('Successfully ended testing run of client.py')
break
Expand Down
13 changes: 13 additions & 0 deletions docker/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,16 @@ CREATE TABLE carbon_intensity (
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (latitude, longitude, created_at)
);

-- Audit log of maintenance messages reported by cluster machines
-- (one row per maintenance run that installed new packages).
CREATE TABLE changelog (
    id SERIAL PRIMARY KEY,
    message text,
    -- keep the log entry even if the machine is deleted
    machine_id integer REFERENCES machines(id) ON DELETE SET NULL ON UPDATE CASCADE,
    created_at timestamp with time zone DEFAULT now(),
    updated_at timestamp with time zone
);

-- Keep updated_at current on every row update (moddatetime extension)
CREATE TRIGGER changelog_moddatetime
    BEFORE UPDATE ON changelog
    FOR EACH ROW
    EXECUTE PROCEDURE moddatetime (updated_at);
4 changes: 2 additions & 2 deletions frontend/js/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ $(document).ready(function () {
case 'job_start': return `${el} <span data-inverted data-tooltip="Current Job has started running"><i class="ui question circle icon fluid"></i></span>`;
case 'job_error': return `${el} <span data-inverted data-tooltip="Last job failed"></i></span>`;
case 'job_end': return `${el} <span data-inverted data-tooltip="Current job ended"></i></span>`;
case 'cleanup_start': return `${el} <span data-inverted data-tooltip="Cleanup after job has started"><i class="ui question circle icon fluid"></i></span>`;
case 'cleanup_end': return `${el} <span data-inverted data-tooltip="Cleanup after job has finished"><i class="ui question circle icon fluid"></i></span>`;
case 'maintenance_start': return `${el} <span data-inverted data-tooltip="Maintenance after job has started"><i class="ui question circle icon fluid"></i></span>`;
case 'maintenance_end': return `${el} <span data-inverted data-tooltip="Maintenance after job has finished"><i class="ui question circle icon fluid"></i></span>`;
case 'measurement_control_start': return `${el} <span data-inverted data-tooltip="Periodic Measurement Control job has started"><i class="ui question circle icon fluid"></i></span>`;
case 'cooldown': return `${el} <span data-inverted data-tooltip="Machine is currently cooling down to base temperature"><i class="ui question circle icon fluid"></i></span>`;
case 'measurement_control_error': return `${el} <span data-inverted data-tooltip="Last periodic Measurement Control job has failed"><i class="ui question circle icon fluid"></i></span>`;
Expand Down
Loading