From e611b6959660bc88d8807145faf8d6ebf0788650 Mon Sep 17 00:00:00 2001 From: Soline Date: Mon, 7 Apr 2025 12:17:42 -0400 Subject: [PATCH 1/3] Remove users import from LDAP data --- README.md | 2 +- scripts/read_mila_ldap.py | 423 ---------------------------------- scripts/requirements.txt | 1 - scripts/requirements_orig.txt | 4 - 4 files changed, 1 insertion(+), 429 deletions(-) delete mode 100644 scripts/read_mila_ldap.py delete mode 100644 scripts/requirements_orig.txt diff --git a/README.md b/README.md index 58d2e1c1..fe699b7d 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ Used: ```bash # for main project -python3 -m pip install flask flask-login numpy pymongo oauthlib coverage black ldap3 toml +python3 -m pip install flask flask-login numpy pymongo oauthlib coverage black toml # if you want to OTLP log exporter python3 -m pip opentelemetry-sdk opentelemetry-exporter-otlp # for docs diff --git a/scripts/read_mila_ldap.py b/scripts/read_mila_ldap.py deleted file mode 100644 index 1aa82ae3..00000000 --- a/scripts/read_mila_ldap.py +++ /dev/null @@ -1,423 +0,0 @@ -""" -What this script does -===================== - -This script runs locally every day on a machine at Mila. -It queries our LDAP service for all users and it updates -the MongoDB instance for Clockwork so that the "users" -collection reflects those accounts. - -It's unclear whether we should automatically disable -the existing accounts in CW for members that are not being -mentioned in the LDAP results. We probably want to do -such a thing periodically, with special care instead of -doing it automatically. - -Two mutually-exclusive ways to input the data, by priority: - 1) use the --input_json_file argument - 2) use the LDAP - -Two ways to output the data, that can be used together: - 1) to the --output_json_file - 2) to a MongoDB instance - - -Useful note for later about "clockwork_api_key" -=============================================== - -The two extra fields "clockwork_api_key" and "cc_account_username" -are added when the entries are committed to the database, -but not when they are read or written to a json file. - - -Sample uses -=========== - -Two ways this can be used: - -:: - - python3 read_mila_ldap.py \\ - --local_private_key_file mila_ldap_readonly_credentials/Google_2025_01_26_68154.key \\ - --local_certificate_file mila_ldap_readonly_credentials/Google_2025_01_26_68154.crt \\ - --ldap_service_uri ldaps://ldap.google.com \\ - --mongodb_connection_string ${MONGODB_CONNECTION_STRING} \\ - --output_json_file mila_users.json - - python3 read_mila_ldap.py \\ - --mongodb_connection_string ${MONGODB_CONNECTION_STRING} \\ - --input_json_file mila_users.json - - -LDAP data structure -=================== - -When it comes to the LDAP entries that we get (and need to process), -they are structured as follows: - -:: - - { - "attributes": { - "apple-generateduid": [ - "AF54098F-29AE-990A-B1AC-F63F5A89B89" - ], - "cn": [ - "john.smith", - "John Smith" - ], - "departmentNumber": [], - "displayName": [ - "John Smith" - ], - "employeeNumber": [], - "employeeType": [], - "gecos": [ - "" - ], - "gidNumber": [ - "1500000001" - ], - "givenName": [ - "John" - ], - "googleUid": [ - "john.smith" - ], - "homeDirectory": [ - "/home/john.smith" - ], - "loginShell": [ - "/bin/bash" - ], - "mail": [ - "john.smith@mila.quebec" - ], - "memberOf": [], - "objectClass": [ - "top", - "person", - "organizationalPerson", - "inetOrgPerson", - "posixAccount" - ], - "physicalDeliveryOfficeName": [], - "posixUid": [ - "smithj" - ], - "sn": [ - "Smith" - ], - "suspended": [ - "false" - ], - "telephoneNumber": [], - "title": [], - "uid": [ - "john.smith" - ], - "uidNumber": [ - "1500000001" - ] - }, - "dn": "uid=john.smith,ou=IDT,ou=STAFF,ou=Users,dc=mila,dc=quebec" - } - -""" - -import os -import argparse -import json - -# Requirements -# - pip install ldap3 -from ldap3 import Server, Connection, Tls, ALL_ATTRIBUTES, SUBTREE -import ssl - -from pymongo import MongoClient, UpdateOne - - -parser = argparse.ArgumentParser( - description="Query LDAP and update the MongoDB database users based on values returned." -) -parser.add_argument( - "--local_private_key_file", - type=str, - help="local_private_key_file for LDAP connection", -) -parser.add_argument( - "--local_certificate_file", - type=str, - help="local_certificate_file for LDAP connection", -) -parser.add_argument( - "--ldap_service_uri", - type=str, - default="ldaps://ldap.google.com", - help="ldap service uri", -) -# We have two possible things that we can do with the data fetched. -# Dumping to a json file is possible. -parser.add_argument( - "--mongodb_connection_string", - default=None, - type=str, - help="(optional) MongoDB connection string. Contains username and password.", -) -parser.add_argument( - "--mongodb_database", - default="clockwork", - type=str, - help="(optional) MongoDB database to modify. Better left at default.", -) -parser.add_argument( - "--mongodb_collection", - default="users", - type=str, - help="(optional) MongoDB collection to modify. Better left at default.", -) -parser.add_argument( - "--input_json_file", - default=None, - type=str, - help="(optional) Ignore the LDAP and load from this json file instead.", -) -parser.add_argument( - "--output_json_file", - default=None, - type=str, - help="(optional) Write results to json file.", -) -parser.add_argument( - "--output_raw_LDAP_json_file", - default=None, - type=str, - help="(optional) Write results of the raw LDAP query to json file.", -) - - -def query_ldap(local_private_key_file, local_certificate_file, ldap_service_uri): - """ - Since we don't always query the LDAP (i.e. omitted when --input_json_file is given), - we'll make this a separate function. - """ - - assert os.path.exists( - local_private_key_file - ), f"Missing local_private_key_file {local_private_key_file}." - assert os.path.exists( - local_certificate_file - ), f"Missing local_certificate_file {local_certificate_file}." - - # Prepare TLS Settings - tls_conf = Tls( - local_private_key_file=local_private_key_file, - local_certificate_file=local_certificate_file, - validate=ssl.CERT_REQUIRED, - version=ssl.PROTOCOL_TLSv1_2, - ) - # Connect to LDAP - server = Server(ldap_service_uri, use_ssl=True, tls=tls_conf) - conn = Connection(server) - conn.open() - # Extract all the data - conn.search( - "dc=mila,dc=quebec", - "(objectClass=inetOrgPerson)", - search_scope=SUBTREE, - attributes=ALL_ATTRIBUTES, - ) - # We make the decision here to return only the "attributes" - # and leave out the "dn" field. - return [json.loads(entry.entry_to_json())["attributes"] for entry in conn.entries] - - -def process_user(user_raw: dict) -> dict: - """ - This takes a dict with a LOT of fields, as described by GEN-1744, - and it uses only the following ones, which are renamed. - Note that all entries from `user_raw` are lists, and we expect - them to contain only one element at [0]. - - mail[0] -> mila_email_username (includes the "@mila.quebec") - posixUid[0] -> mila_cluster_username - uidNumber[0] -> mila_cluster_uid - gidNumber[0] -> mila_cluster_gid - displayName[0] -> display_name - suspended[0] -> status (as string "enabled" or "disabled") - - It also asserts, as sanity check, that the entries for - "googleUid" and "uid" match that of "mail" (except for - the "@mila.quebec" suffix). - """ - user = { - # include the suffix "@mila.quebec" - "mila_email_username": user_raw["mail"][0], - "mila_cluster_username": user_raw["posixUid"][0], - "mila_cluster_uid": user_raw["uidNumber"][0], - "mila_cluster_gid": user_raw["gidNumber"][0], - "display_name": user_raw["displayName"][0], - "status": ( - "disabled" - if (user_raw["suspended"][0] in ["True", "true", True]) - else "enabled" - ), - } - assert user_raw["mail"][0].startswith(user_raw["googleUid"][0]) - assert user_raw["mail"][0].startswith(user_raw["uid"][0]) - return user - - -def client_side_user_updates(LD_users_DB, LD_users_LDAP): - """ - Instead of having complicated updates that depend on multiple MongoDB - updates to cover all cases involving the "status" field, we'll do all - that logic locally in this function. - - We have `LD_users_DB` from our database, we have `LD_users_LDAP` from - our LDAP server, and then we return list of updates to be commited. - - Note that both `LD_users_DB` and `LD_users_LDAP` use the same fields. - """ - # The first step is to index everything by unique id, which is - # the "mila_email_username". This is because we'll be matching - # entries from both lists and we want to avoid N^2 performance. - - DD_users_DB = dict((e["mila_email_username"], e) for e in LD_users_DB) - DD_users_LDAP = dict((e["mila_email_username"], e) for e in LD_users_LDAP) - - LD_users_to_update_or_insert = [] - for meu in set(list(DD_users_DB.keys()) + list(DD_users_LDAP.keys())): - # `meu` is short for the mila_email_username value - - if meu in DD_users_DB and not meu in DD_users_LDAP: - # User is in DB but not in the LDAP. Don't change it in DB. - # Don't even update it. This situation is an exception - # what might happen for special admin accounts or something, - # but never with regular student accounts. - continue - elif meu not in DD_users_DB and meu in DD_users_LDAP: - # User is not in DB but is in the LDAP. That's a new entry! - # We want to dress it up and commit it to the DB. - entry = DD_users_LDAP[meu] - entry["cc_account_username"] = None - entry["clockwork_api_key"] = None - entry["cc_account_update_key"] = None - # Any web_settings not present will pull values - # from `get_default_web_settings_values()` later down - # the road. It would actually be better to leave them out - # at this point and not fill them with anything hardcoded. - # Just make sure you make this a dict, though. - entry["web_settings"] = {} - assert "status" in entry # sanity check - else: - # User is in DB and in LDAP. Update it carefully and don't - # disturb the fields that shouldn't be touched. - entry = DD_users_DB[meu] - if DD_users_LDAP[meu]["status"] == "disabled": - # No matter what we said in the database, if the LDAP - # says that it's disabled, then we propagate that change. - # We wouldn't do the same thing with "enabled" in the LDAP, though. - entry["status"] = "disabled" - assert "cc_account_username" in entry # sanity check - assert "clockwork_api_key" in entry # sanity check - - LD_users_to_update_or_insert.append(entry) - return LD_users_to_update_or_insert - - -def run( - local_private_key_file=None, - local_certificate_file=None, - ldap_service_uri=None, - mongodb_connection_string=None, - mongodb_database=None, - mongodb_collection=None, - input_json_file=None, - output_json_file=None, - output_raw_LDAP_json_file=None, - LD_users=None, # for external testing purposes -): - - if LD_users is not None: - # Used mostly for testing purposes. - # Overrides the "input_json_file" argument. - # Just make sure it's a list of dict, at least. - assert isinstance(LD_users, list) - if LD_users: - assert isinstance(LD_users[0], dict) - elif input_json_file: - with open(input_json_file, "r") as f_in: - LD_users = json.load(f_in) - else: - # this is the usual branch taken in practice - LD_users_raw = query_ldap( - local_private_key_file, local_certificate_file, ldap_service_uri - ) - if output_raw_LDAP_json_file: - with open(output_raw_LDAP_json_file, "w") as f_out: - json.dump(LD_users_raw, f_out, indent=4) - print(f"Wrote {output_raw_LDAP_json_file}.") - - LD_users = [process_user(D_user_raw) for D_user_raw in LD_users_raw] - - if mongodb_connection_string and mongodb_database and mongodb_collection: - - users_collection = MongoClient(mongodb_connection_string)[mongodb_database][ - mongodb_collection - ] - - # The "enabled" component has to be dealt with differently. - # - # For a user that already exists in our database, - # - if the LDAP says "disabled", then we propagate that to our database; - # - if the LDAP says "enabled", then we ignore it. - # For a user that is not in our database, - # - we go with whatever the LDAP says. - - LD_users_DB = list(users_collection.find()) - - L_updated_users = client_side_user_updates( - LD_users_DB=LD_users_DB, LD_users_LDAP=LD_users - ) - - L_updates_to_do = [ - UpdateOne( - {"mila_email_username": updated_user["mila_email_username"]}, - { - # We set all the fields corresponding to the fields from `updated_user`, - # so that's a convenient way to do it. Note that this does not affect - # the fields in the database that are already present for that user. - "$set": updated_user, - }, - upsert=True, - ) - for updated_user in L_updated_users - ] - - if L_updates_to_do: - result = users_collection.bulk_write( - L_updates_to_do - ) # <- the actual commit - # print(result.bulk_api_result) - - if output_json_file: - with open(output_json_file, "w") as f_out: - json.dump(LD_users, f_out, indent=4) - print(f"Wrote {output_json_file}.") - - -if __name__ == "__main__": - - args = parser.parse_args() - run( - local_private_key_file=args.local_private_key_file, - local_certificate_file=args.local_certificate_file, - ldap_service_uri=args.ldap_service_uri, - mongodb_connection_string=args.mongodb_connection_string, - mongodb_database=args.mongodb_database, - mongodb_collection=args.mongodb_collection, - input_json_file=args.input_json_file, - output_json_file=args.output_json_file, - output_raw_LDAP_json_file=args.output_raw_LDAP_json_file, - ) diff --git a/scripts/requirements.txt b/scripts/requirements.txt index dce1a7e9..0d9db348 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -4,7 +4,6 @@ dnspython==2.6.1 Flask==3.0.0 itsdangerous==2.1.2 Jinja2==3.1.4 -ldap3==2.9.1 MarkupSafe==2.1.3 pyasn1==0.5.0 pymongo==4.6.3 diff --git a/scripts/requirements_orig.txt b/scripts/requirements_orig.txt deleted file mode 100644 index 5904bcd8..00000000 --- a/scripts/requirements_orig.txt +++ /dev/null @@ -1,4 +0,0 @@ -Flask==3.0.0 -ldap3==2.9.1 -pymongo==4.6.3 -Werkzeug==3.0.4 From ecca6f0e5ef6ad0762d46b056ddfe27afaef6c19 Mon Sep 17 00:00:00 2001 From: Soline Date: Fri, 2 May 2025 09:13:50 -0400 Subject: [PATCH 2/3] Delete cluster connections data and functions --- slurm_state/cluster_desc/beluga.json | 2 - slurm_state/cluster_desc/cedar.json | 2 - slurm_state/cluster_desc/graham.json | 2 - slurm_state/cluster_desc/mila.json | 2 - slurm_state/helpers/clusters_helper.py | 7 -- slurm_state/helpers/ssh_helper.py | 104 -------------------- slurm_state/parsers/entity_parser.py | 78 ++------------- slurm_state/parsers/job_parser.py | 31 ------ slurm_state/parsers/node_parser.py | 6 -- slurm_state/sacct_parser.py | 131 +------------------------ 10 files changed, 8 insertions(+), 357 deletions(-) delete mode 100644 slurm_state/helpers/ssh_helper.py diff --git a/slurm_state/cluster_desc/beluga.json b/slurm_state/cluster_desc/beluga.json index 5ceea780..9aabac56 100644 --- a/slurm_state/cluster_desc/beluga.json +++ b/slurm_state/cluster_desc/beluga.json @@ -1,7 +1,5 @@ {"name": "beluga", "hostname": "beluga.computecanada.ca", - "username": "mila-automation", - "port": 22, "slurm_login_nodes": [ ], "slurm_partitions": [ "cpularge_bycore_b4", "cpubase_bycore_b2", "gpubackfill", "cpubase_bycore_b4", diff --git a/slurm_state/cluster_desc/cedar.json b/slurm_state/cluster_desc/cedar.json index 1b31cf4e..09a4aac8 100644 --- a/slurm_state/cluster_desc/cedar.json +++ b/slurm_state/cluster_desc/cedar.json @@ -1,7 +1,5 @@ {"name": "cedar", "hostname": "cedar.computecanada.ca", - "username": "mila-automation", - "port": 22, "slurm_login_nodes": [ ], "slurm_partitions": [], "slurm_ignore_partitions": [], diff --git a/slurm_state/cluster_desc/graham.json b/slurm_state/cluster_desc/graham.json index e1a19c0d..bdad9a1d 100644 --- a/slurm_state/cluster_desc/graham.json +++ b/slurm_state/cluster_desc/graham.json @@ -1,7 +1,5 @@ {"name": "graham", "hostname": "graham.computecanada.ca", - "username": "mila-automation", - "port": 22, "slurm_login_nodes": [ ], "slurm_partitions": [ "cpularge_bycore_b4", "cpubase_bycore_b2", "cpularge_bynode_b6", "cpularge_bynode_b3", diff --git a/slurm_state/cluster_desc/mila.json b/slurm_state/cluster_desc/mila.json index 4093f847..0ec03432 100644 --- a/slurm_state/cluster_desc/mila.json +++ b/slurm_state/cluster_desc/mila.json @@ -1,7 +1,5 @@ {"name": "mila", "hostname": "login.server.mila.quebec", - "username": "alaingui", - "port": 2222, "slurm_login_nodes": [ "login-1", "login-2", "login-3", "login-4"], "slurm_partitions": ["debug", "long", "cpu_jobs_low-grace", "unkillable", "short-unkillable", "main", "long-grace", "cpu_jobs_low", "cpu_jobs", "main-grace"], "slurm_ignore_partitions": diff --git a/slurm_state/helpers/clusters_helper.py b/slurm_state/helpers/clusters_helper.py index c17a8293..bd8edd0e 100644 --- a/slurm_state/helpers/clusters_helper.py +++ b/slurm_state/helpers/clusters_helper.py @@ -7,7 +7,6 @@ string, optional_string, string_list, - integer, timezone, SubdictValidator, register_config, @@ -57,14 +56,8 @@ def _load_clusters_from_config(): clusters_valid.add_field("timezone", timezone) # SSH connection and data fetching variables - clusters_valid.add_field("remote_user", optional_string) clusters_valid.add_field("remote_hostname", optional_string) - clusters_valid.add_field("ssh_key_filename", string) - clusters_valid.add_field("ssh_port", integer) - - clusters_valid.add_field("sacct_path", optional_string) - clusters_valid.add_field("sinfo_path", optional_string) clusters_valid.add_field("slurm_version", optional_string, default=None) # Load the clusters from the configuration file, asserting that it uses the diff --git a/slurm_state/helpers/ssh_helper.py b/slurm_state/helpers/ssh_helper.py deleted file mode 100644 index 851ea878..00000000 --- a/slurm_state/helpers/ssh_helper.py +++ /dev/null @@ -1,104 +0,0 @@ -import os - -from paramiko import SSHClient, AutoAddPolicy, ssh_exception, RSAKey - - -def open_connection(hostname, username, ssh_key_path, port=22): - """ - If successful, this will connect to the remote server and - the value of self.ssh_client will be usable. - Otherwise, this will set self.ssh_client=None or it will quit(). - """ - - ssh_client = SSHClient() - ssh_client.set_missing_host_key_policy(AutoAddPolicy()) - ssh_client.load_system_host_keys() - assert os.path.exists( - ssh_key_path - ), f"Error. The absolute path given for ssh_key_path does not exist: {ssh_key_path} ." - pkey = RSAKey.from_private_key_file(ssh_key_path) - - # The call to .connect was seen to raise an exception now and then. - # raise AuthenticationException("Authentication timeout.") - # paramiko.ssh_exception.AuthenticationException: Authentication timeout. - # When it happens, we should simply give up on the attempt - # and log the error to stdout. - try: - # For some reason, we really need to specify which key_filename to use. - ssh_client.connect( - hostname, username=username, port=port, pkey=pkey, look_for_keys=False - ) - print(f"Successful SSH connection to {username}@{hostname} port {port}.") - except ssh_exception.AuthenticationException as inst: - print(f"Error in SSH connection to {username}@{hostname} port {port}.") - print(type(inst)) - print(inst) - # set the ssh_client to None as a way to communicate - # to the parent that we got into trouble - ssh_client = None - except Exception as inst: - print(f"Error in SSH connection to {username}@{hostname} port {port}.") - print(type(inst)) - print(inst) - ssh_client = None - - return ssh_client - - -def launch_slurm_command(command, hostname, username, ssh_key_filename, port=22): - """ - Launch a Slurm command through SSH and retrieve its response. - - Parameters: - command The Slurm command to launch through SSH - hostname The hostname used for the SSH connection to launch the Slurm command - username The username used for the SSH connection to launch the Slurm command - ssh_key_filename The name of the private key in .ssh folder used for the SSH connection to launch the Slurm command - port The port used for the SSH connection to launch the sinfo command - """ - # Print the command to use - print(f"The command launched through SSH is:\n{command}") - - # Check the given SSH key - assert ssh_key_filename, "Missing ssh_key_filename from config." - - # Now this is the private ssh key that we are using with Paramiko. - ssh_key_path = os.path.join(os.path.expanduser("~"), ".ssh", ssh_key_filename) - - # Connect through SSH - try: - ssh_client = open_connection( - hostname, username, ssh_key_path=ssh_key_path, port=port - ) - except Exception as inst: - print( - f"Error. Failed to connect to {hostname} to launch the command:\n{command}" - ) - print(inst) - return [] - - # If a connection has been established - if ssh_client: - ssh_stdin, ssh_stdout, ssh_stderr = ssh_client.exec_command(command) - - # We should find a better option to retrieve stderr - """ - response_stderr = "".join(ssh_stderr.readlines()) - if len(response_stderr): - print( - f"Stderr in sinfo call on {hostname}. This doesn't mean that the call failed entirely, though.\n{response_stderr}" - ) - """ - stdout = ssh_stdout.readlines() - ssh_client.close() - return stdout - - else: - print( - f"Error. Failed to connect to {hostname} to make the call. Returned `None` but no exception was thrown." - ) - - # If no SSH connection has been established, raise an exception - raise Exception( - f"No SSH connection has been established while trying to run {command}." - ) diff --git a/slurm_state/parsers/entity_parser.py b/slurm_state/parsers/entity_parser.py index 4e1ff3a0..341f0ad7 100644 --- a/slurm_state/parsers/entity_parser.py +++ b/slurm_state/parsers/entity_parser.py @@ -18,80 +18,16 @@ def __init__(self, entity, cluster_name, slurm_command=None, slurm_version=None) self.cluster = get_all_clusters()[cluster_name] self.cluster["name"] = cluster_name - self.slurm_command = slurm_command - # Retrieve the path to the Slurm command we want to launch on the cluster - # It is stored in the cluster data under the key "sacct_path" for the sacct command - # and "sinfo_path" for the sinfo command - self.slurm_command_path = self.cluster[f"{self.slurm_command}_path"] - # Check if slurm_command_path exists - assert ( - self.slurm_command_path - ), f"Error. We have called the function to make updates with {self.slurm_command} but the {self.slurm_command}_path config is empty." - assert self.slurm_command_path.endswith( - self.slurm_command - ), f"Error. The {self.slurm_command}_path configuration needs to end with '{self.slurm_command}'. It is currently {self.slurm_command_path} ." - + # Get the Slurm version associated to the cluster if slurm_version is not None: self.slurm_version = slurm_version + elif "slurm_version" in self.cluster and self.cluster["slurm_version"] is not None: + self.slurm_version = self.cluster["slurm_version"] else: - # If no Slurm version is provided, retrieve the version of Slurm installed on the current cluster - self.slurm_version = self.get_slurm_version() - - def get_slurm_version(self): - """ - Get the Slurm version - """ - if ( - "slurm_version" in self.cluster - and self.cluster["slurm_version"] is not None - ): - # If the Slurm version has been added to the configuration file, - # return the value of the configuration - return self.cluster["slurm_version"] - else: - # Launch the sacct or sinfo command to get its version - remote_command = f"{self.slurm_command_path} -V" - response = self.launch_slurm_command(remote_command) - assert len(response) == 1 - version_regex = re.compile(r"^slurm (\d+\.\d+\.\d+)$") - if m := version_regex.match(response[0]): - return m.group(1) - # If the version has not been identified, raise an error - raise Exception( - f'The version "{response[0]}" has not been recognized as a Slurm version.' - ) - - def launch_slurm_command(self, remote_command): - """ """ - return launch_slurm_command( - remote_command, - self.cluster["remote_hostname"], - self.cluster["remote_user"], - self.cluster["ssh_key_filename"], - self.cluster["ssh_port"], - ) - - def generate_report(self, remote_command, file_name): - """ - Launch a Slurm command in order to retrieve JSON report containing - jobs or nodes information - - Parameters: - cluster_name The name of the cluster on which the Slurm command will be launched - remote_command The command used to retrieve the data from Slurm - file_name The path of the report file to write - """ - # Launch the requested command in order to retrieve Slurm information - stdout = self.launch_slurm_command(remote_command) - - # Create directories if needed - os.makedirs(os.path.dirname(file_name), exist_ok=True) - - # Write the command output to a file - with open(file_name, "w") as outfile: - for line in stdout: - outfile.write(line) - + # If no Slurm version is provided, whether by configuration file nor parameters, + # raise an error + raise Exception(f"No Slurm version has been identified fo the {self.cluster['name']} cluster. Please provide it through the configuration file or the command parameters.") + class IdentityParser(EntityParser): def __init__(self, entity, cluster_name): diff --git a/slurm_state/parsers/job_parser.py b/slurm_state/parsers/job_parser.py index 01a92508..fe8b42d1 100644 --- a/slurm_state/parsers/job_parser.py +++ b/slurm_state/parsers/job_parser.py @@ -25,37 +25,6 @@ class JobParser(EntityParser): def __init__(self, cluster_name, slurm_version=None): super().__init__("jobs", cluster_name, "sacct", slurm_version=slurm_version) - def generate_report(self, file_name): - - # Retrieve the allocations associated to the cluster - allocations = self.cluster["allocations"] - - if allocations == []: - # If the cluster has no associated allocation, nothing is requested - print( - f"The cluster {self.cluster['name']} has no allocation related to it. Thus, no job has been retrieved. Associated allocations can be provided in the Clockwork configuration file." - ) - return [] - else: - # Set the sacct command - # -S is a condition on the start time, 600 being in seconds - # -E is a condition on the end time - # -X means "Only show statistics relevant to the job allocation itself, not taking steps into consideration." - # --associations is used in order to limit the fetched jobs to the ones related to Mila and/or professors who - # may use Clockwork - if allocations == "*": - # We do not provide --associations information because the default for this parameter - # is "all associations" - remote_command = ( - f"{self.slurm_command_path} -S now-600 -E now -X --allusers --json" - ) - else: - accounts_list = ",".join(allocations) - remote_command = f"{self.slurm_command_path} -S now-600 -E now -X --accounts={accounts_list} --allusers --json" - print(f"remote_command is\n{remote_command}") - - return super().generate_report(remote_command, file_name) - def parser(self, f): """ """ if re.search(r"^21\..*$", self.slurm_version): diff --git a/slurm_state/parsers/node_parser.py b/slurm_state/parsers/node_parser.py index 411cee42..2708a31a 100644 --- a/slurm_state/parsers/node_parser.py +++ b/slurm_state/parsers/node_parser.py @@ -19,12 +19,6 @@ class NodeParser(EntityParser): def __init__(self, cluster_name, slurm_version=None): super().__init__("nodes", cluster_name, "sinfo", slurm_version=slurm_version) - def generate_report(self, file_name): - # The command to be launched through SSH is "sinfo --json" - remote_command = f"{self.slurm_command_path} --json" - - return super().generate_report(remote_command, file_name) - def parser(self, f): """ """ if re.search(r"^21\..*$", self.slurm_version): diff --git a/slurm_state/sacct_parser.py b/slurm_state/sacct_parser.py index 39f87ecb..293bc630 100644 --- a/slurm_state/sacct_parser.py +++ b/slurm_state/sacct_parser.py @@ -3,12 +3,7 @@ to jobs in the format used by Clockwork. """ -import json, os - -# Imports related to sacct call -# https://docs.paramiko.org/en/stable/api/client.html -from slurm_state.helpers.ssh_helper import open_connection -from slurm_state.helpers.clusters_helper import get_all_clusters +import json # These functions are translators used in order to handle the values # we could encounter while parsing a job dictionary retrieved from a @@ -325,127 +320,3 @@ def job_parser(f): # If no translator has been provided: ignore the field yield res_job - - -# The functions used to create the report file, gathering the information to parse - - -def generate_job_report( - cluster_name, - file_name, -): - """ - Launch a sacct command in order to retrieve a JSON report containing - jobs information - - Parameters: - cluster_name The name of the cluster on which the sinfo command will be launched - file_name Path to store the generated sacct report - - """ - # Retrieve the cluster's information from the configuration file - cluster = get_all_clusters()[cluster_name] - - # Retrieve from the configuration file the elements used to establish a SSH connection - # to a remote cluster and launch the sacct command on it - username = cluster[ - "remote_user" - ] # The username used for the SSH connection to launch the sacct command - hostname = cluster[ - "remote_hostname" - ] # The hostname used for the SSH connection to launch the sacct command - port = cluster[ - "ssh_port" - ] # The port used for the SSH connection to launch the sacct command - sacct_path = cluster[ - "sacct_path" - ] # The path of the sacct executable on the cluster side - ssh_key_filename = cluster[ - "ssh_key_filename" - ] # The name of the private key in .ssh folder used for the SSH connection to launch the sacct command - - # sacct path checks - assert ( - sacct_path - ), "Error. We have called the function to make updates with sacct but the sacct_path config is empty." - assert sacct_path.endswith( - "sacct" - ), f"Error. The sacct_path configuration needs to end with 'sacct'. It's currently {sacct_path} ." - - # SSH key check - assert ssh_key_filename, "Missing ssh_key_filename from config." - - # Now this is the private ssh key that we'll be using with Paramiko. - ssh_key_path = os.path.join(os.path.expanduser("~"), ".ssh", ssh_key_filename) - - # Note : It doesn't work to simply start the command with "sacct". - # For some reason due to paramiko not loading the environment variables, - # sacct is not found in the PATH. - # This does not work - # remote_cmd = "sacct -X -j " + ",".join(L_job_ids) - # but if we use - # remote_cmd = "/opt/slurm/bin/sacct ..." - # then it works. We have to hardcode the path in each cluster, it seems. - - # Retrieve the allocations associated to the cluster - allocations = cluster["allocations"] - - if allocations == []: - # If the cluster has no associated allocation, nothing is requested - print( - f"The cluster {cluster_name} has no allocation related to it. Thus, no job has been retrieved. Associated allocations can be provided in the Clockwork configuration file." - ) - return [] - else: - # Set the sacct command - # -S is a condition on the start time, 600 being in seconds - # -E is a condition on the end time - # -X means "Only show statistics relevant to the job allocation itself, not taking steps into consideration." - # --associations is used in order to limit the fetched jobs to the ones related to Mila and/or professors who - # may use Clockwork - if allocations == "*": - # We do not provide --associations information because the default for this parameter - # is "all associations" - remote_cmd = f"{sacct_path} -S now-600 -E now -X --allusers --json" - else: - accounts_list = ",".join(allocations) - remote_cmd = f"{sacct_path} -S now-600 -E now -X --accounts={accounts_list} --allusers --json" - print(f"remote_cmd is\n{remote_cmd}") - - # Connect through SSH - try: - ssh_client = open_connection( - hostname, username, ssh_key_path=ssh_key_path, port=port - ) - except Exception as inst: - print(f"Error. Failed to connect to {hostname} to make a call to sacct.") - print(inst) - return [] - - if ssh_client: - # those three variables are file-like, not strings - ssh_stdin, ssh_stdout, ssh_stderr = ssh_client.exec_command(remote_cmd) - - # We should find a better option to retrieve stderr - """ - response_stderr = "".join(ssh_stderr.readlines()) - if len(response_stderr): - print( - f"Stderr in sacct call on {hostname}. This doesn't mean that the call failed entirely, though.\n{response_stderr}" - ) - """ - - # Create directories if needed - os.makedirs(os.path.dirname(file_name), exist_ok=True) - - # Write the command output to a file - with open(file_name, "w") as outfile: - for line in ssh_stdout.readlines(): - outfile.write(line) - - ssh_client.close() - - else: - print( - f"Error. Failed to connect to {hostname} to make call to sacct. Returned `None` but no exception was thrown." - ) From 355aabb8da6f2ec2b18e9f34c0968d28a70c725a Mon Sep 17 00:00:00 2001 From: Soline Date: Fri, 2 May 2025 09:15:06 -0400 Subject: [PATCH 3/3] Black reformatting --- slurm_state/parsers/entity_parser.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/slurm_state/parsers/entity_parser.py b/slurm_state/parsers/entity_parser.py index 341f0ad7..876e8402 100644 --- a/slurm_state/parsers/entity_parser.py +++ b/slurm_state/parsers/entity_parser.py @@ -21,13 +21,18 @@ def __init__(self, entity, cluster_name, slurm_command=None, slurm_version=None) # Get the Slurm version associated to the cluster if slurm_version is not None: self.slurm_version = slurm_version - elif "slurm_version" in self.cluster and self.cluster["slurm_version"] is not None: + elif ( + "slurm_version" in self.cluster + and self.cluster["slurm_version"] is not None + ): self.slurm_version = self.cluster["slurm_version"] else: # If no Slurm version is provided, whether by configuration file nor parameters, # raise an error - raise Exception(f"No Slurm version has been identified fo the {self.cluster['name']} cluster. Please provide it through the configuration file or the command parameters.") - + raise Exception( + f"No Slurm version has been identified fo the {self.cluster['name']} cluster. Please provide it through the configuration file or the command parameters." + ) + class IdentityParser(EntityParser): def __init__(self, entity, cluster_name):