From c3e11bd150befec01155379077e91a5f30927df1 Mon Sep 17 00:00:00 2001 From: aldbr Date: Fri, 9 Jan 2026 17:03:55 +0100 Subject: [PATCH 1/2] chore: include a simple script to test computing resources without going through the WMS --- .../Resources/computingelements.rst | 9 + tests/System/ce_test.py | 289 ++++++++++++++++++ 2 files changed, 298 insertions(+) create mode 100644 tests/System/ce_test.py diff --git a/docs/source/AdministratorGuide/Resources/computingelements.rst b/docs/source/AdministratorGuide/Resources/computingelements.rst index f33497dc7d2..7f62b1db0c0 100644 --- a/docs/source/AdministratorGuide/Resources/computingelements.rst +++ b/docs/source/AdministratorGuide/Resources/computingelements.rst @@ -205,3 +205,12 @@ These can be specified in the CEDefaults section to apply a standardised slot si } } } + +Debugging Computing Element Issues +@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ + +When troubleshooting Computing Element connectivity or job submission problems, you can use the +`ce_test.py <../../../../tests/System/ce_test.py>`_ script to systematically test CE interactions. + +This script validates CE functionality by testing status retrieval, job submission, monitoring, and output collection. +For detailed usage instructions, prerequisites, and examples, refer to the script's docstring. diff --git a/tests/System/ce_test.py b/tests/System/ce_test.py new file mode 100644 index 00000000000..54535aba394 --- /dev/null +++ b/tests/System/ce_test.py @@ -0,0 +1,289 @@ +"""Test the interactions with a given set of Computing Elements (CE). For each CE: +- Get the CE status if available +- Submit a job to the CE +- Get the job status +- Get the job output/error/log if available + +Conditions: +- The CEs must be configured in the DIRAC configuration +- The script should be executed with an admin proxy: used to fetch a pilot proxy and a token +- The script should be executed: + - in a DIRAC client environment for normal CEs, such as AREX and HTCondor + - in a DIRAC host environment for SSH/Local CEs (credentials would not be available otherwise) +""" +import concurrent.futures +import time +from pathlib import Path +from typing import List + +import typer + +import DIRAC +from DIRAC import gLogger +from DIRAC.ConfigurationSystem.Client.Helpers import Operations, Registry +from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues +from DIRAC.Core.Base.Script import Script +from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager +from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager +from DIRAC.Resources.Computing.ComputingElement import ComputingElement +from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES +from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved + +app = typer.Typer(help="Test the interactions with a given set of Computing Elements (CE)") + + +def findGenericCreds(vo: str): + """ + Find the generic pilot credentials for the given VO. + :param vo: The Virtual Organization to use for credentials. + :return: A tuple containing the pilot DN and group. + """ + opsHelper = Operations.Operations(vo=vo) + + pilotGroup = opsHelper.getValue("Pilot/GenericPilotGroup", "") + pilotDN = opsHelper.getValue("Pilot/GenericPilotDN", "") + if not pilotDN: + pilotUser = opsHelper.getValue("Pilot/GenericPilotUser", "") + if pilotUser: + result = Registry.getDNForUsername(pilotUser) + if result["OK"]: + pilotDN = result["Value"][0] + return pilotDN, pilotGroup + + +def getCredentials(pilotDN: str, pilotGroup: str, ce: ComputingElement): + """ + Get the pilot credentials for the dn/group. + :return: A tuple containing the proxy and the token. + """ + # Get the pilot proxy from the ProxyManager + result = gProxyManager.getPilotProxyFromDIRACGroup(pilotDN, pilotGroup, 3600) + if not result["OK"]: + gLogger.error("Cannot get pilot proxy", result["Message"]) + return None, None + proxy = result["Value"] + + # Get the pilot token from the TokenManager + result = gTokenManager.getToken( + userGroup=pilotGroup, + scope=PILOT_SCOPES, + audience=ce.audienceName, + requiredTimeLeft=1200, + ) + if not result["OK"]: + gLogger.error("Cannot get pilot token", result["Message"]) + return None, None + token = result["Value"] + + return proxy, token + + +def buildQueues(vo: str, sites: list[str], ces: list[str], ceTypes: list[str]) -> dict: + """ + Get the list of queues for the given community, site list, CE list, and CE type list. + :param vo: The Virtual Organization to use for credentials. + :param sites: The list of sites to filter the queues. + :param ces: The list of Computing Elements to filter the queues. + :param ceTypes: The list of Computing Element types to filter the queues. + :return: A dictionary containing the queues for the given parameters. + """ + result = getQueues( + community=vo, + siteList=sites, + ceList=ces, + ceTypeList=ceTypes, + ) + if not result["OK"]: + gLogger.error("Cannot get queues", result["Message"]) + return {} + + result = getQueuesResolved( + siteDict=result["Value"], + queueCECache={}, + vo=vo, + instantiateCEs=True, + ) + if not result["OK"]: + gLogger.error("Cannot resolve queues", result["Message"]) + return {} + + return result["Value"] + + +def interactWithCE(ce: ComputingElement): + """ + Interact with a given Computing Element (CE). + :param ceName: The name of the CE. + :param port: The port of the CE. + :param vo: The Virtual Organization to use for credentials. + """ + checks = { + "ce_status": {"OK": False, "Message": ""}, + "job_submit": {"OK": False, "Message": ""}, + "job_status": {"OK": False, "Message": ""}, + "job_output": {"OK": False, "Message": ""}, + "job_log": {"OK": False, "Message": ""}, + } + + # Get CE Status + gLogger.info(f"[{ce.ceName}]Getting CE status") + result = ce.getCEStatus() + if not result["OK"] and ce.ceType != "HTCondorCE": + gLogger.error(f"[{ce.ceName}]Cannot get CE status: {result['Message']}") + checks["ce_status"]["Message"] = result["Message"] + return checks + checks["ce_status"]["OK"] = True + + # Submit a job to the CE + gLogger.info(f"[{ce.ceName}]Submitting a job") + + res = ce.submitJob("workloadExec.sh", None) + if not res["OK"]: + gLogger.error(f"[{ce.ceName}]Cannot submit job to CE: {res['Message']}") + checks["job_submit"]["Message"] = res["Message"] + return checks + checks["job_submit"]["OK"] = True + + job_id = res["Value"][0] + stamp = res["PilotStampDict"][job_id] + + # Wait for the job to finish + gLogger.info(f"[{ce.ceName}]Getting job status") + + status = "Waiting" + while status != "Done" and status != "Failed": + res = ce.getJobStatus([job_id]) + if not res["OK"]: + gLogger.error(f"[{ce.ceName}]Cannot get job status: {res['Message']}") + checks["job_status"]["Message"] = res["Message"] + return checks + status = res["Value"][job_id] + time.sleep(5) + gLogger.verbose(f"[{ce.ceName}]Job {job_id} in status {status}") + checks["job_status"]["OK"] = True + + # Get job output, error, and log + gLogger.info(f"[{ce.ceName}]Getting job output and log") + + res = ce.getJobOutput(job_id + ":::" + stamp) + if not res["OK"]: + gLogger.error(f"[{ce.ceName}]Cannot get job output: {res['Message']}") + checks["job_output"]["Message"] = res["Message"] + return checks + checks["job_output"]["OK"] = True + + if hasattr(ce, "getJobLog"): + res = ce.getJobLog(job_id + ":::" + stamp) + if not res["OK"]: + gLogger.error(f"[{ce.ceName}]Cannot get job log: {res['Message']}") + checks["job_log"]["Message"] = res["Message"] + return checks + checks["job_log"]["OK"] = True + return checks + + +@app.command() +def main( + vo: str = typer.Argument(help="Select Virtual Organization"), + sites: list[str] = typer.Option([], "--site", help="Select sites (can be used multiple times)"), + ces: list[str] = typer.Option([], "--ce", help="Select CEs (can be used multiple times)"), + ce_types: list[str] = typer.Option([], "--ce-type", help="Select CE types (can be used multiple times)"), + log_level: str = typer.Option("INFO", "--log-level", help="Set the log level (DEBUG, VERBOSE, INFO)"), + script: str = typer.Option(None, "--script", help="Path to custom executable script (default: workloadExec.sh)"), +): + """Test the interactions with a given set of Computing Elements (CE).""" + Script.initialize() + + if log_level: + gLogger.setLevel(log_level.upper()) + # If you set a log level for a specific backend and want more details to debug + # then uncomment the next line + # gLogger._backendsList[0]._handler.setLevel(log_level.upper()) + + # Get credentials for the given VO + pilotDN, pilotGroup = findGenericCreds(vo) + if not pilotDN or not pilotGroup: + gLogger.error("Cannot get pilot credentials") + DIRAC.exit(1) + + # Get the queues + queueDict = buildQueues( + vo=vo, + sites=sites, + ces=ces, + ceTypes=ce_types, + ) + if not queueDict: + gLogger.error("Cannot get queues") + DIRAC.exit(1) + + if script: + # Use the provided custom script + gLogger.info(f"Using custom script: {script}") + executable = Path(script) + if not executable.exists() or not executable.is_file(): + gLogger.error(f"Provided script {script} does not exist or is not a file") + DIRAC.exit(1) + else: + # Create default workloadExec.sh + gLogger.info("Creating default workloadExec.sh") + executable = Path("workloadExec.sh") + with open(executable, "w") as f: + f.write("#!/bin/bash\n") + f.write("echo 'Hello from DIRAC!'\n") + + # Make sure the script is executable + executable.chmod(0o755) + + # Prepare to interact with each CE + overallState = {} + + def process_queue(queueName): + ce = queueDict[queueName]["CE"] + if ce.ceType != "SSH": + gLogger.info(f"Getting creds for CE: {ce.ceName} ({ce.ceType})") + proxy, token = getCredentials(pilotDN, pilotGroup, ce) + if not proxy or not token: + DIRAC.exit(1) + ce.setProxy(proxy) + if "Token" in ce.ceParameters.get("Tag", []): + ce.setToken(token) + if ce.ceType == "HTCondorCE": + ce.workingDirectory = str(Path.cwd()) + gLogger.info(f"Interacting with CE: {ce.ceName} ({ce.ceType})") + return queueName, interactWithCE(ce) + + with concurrent.futures.ThreadPoolExecutor() as executor: + results = executor.map(process_queue, list(queueDict.keys())) + for queueName, state in results: + overallState[queueName] = state + + # Clean up the script file after submission (only if auto-generated) + if not script: + executable.unlink() + + gLogger.info("Overall interaction state:") + total = len(overallState) + # human-friendly names for each check + pretty = { + "ce_status": "reported CE status", + "job_submit": "submitted a job", + "job_status": "retrieved job status", + "job_output": "fetched job output", + "job_log": "fetched job log", + } + + for check in pretty: + okCount = sum(1 for queueState in overallState.values() if queueState[check]["OK"]) + issueCount = total - okCount + pct = int(okCount / total * 100) if total else 0 + typer.echo(f"- {pct}% of the queues correctly {pretty[check]}. " f"Issues with {issueCount} queue(s):") + for qname, qState in overallState.items(): + if not qState[check]["OK"]: + msg = qState[check]["Message"] or "unknown error" + typer.echo(f" - {qname}: {msg}") + typer.echo("") + + +if __name__ == "__main__": + app() From 7e7e9323cfbd5ded3775ffcffa3add14753bb2be Mon Sep 17 00:00:00 2001 From: aldbr Date: Wed, 11 Feb 2026 17:55:36 +0100 Subject: [PATCH 2/2] refactor: turn ce_test into a dirac admin command --- .../Resources/computingelements.rst | 6 +- setup.cfg | 1 + .../scripts/dirac_admin_debug_ce.py | 199 +++++++++++------- 3 files changed, 132 insertions(+), 74 deletions(-) rename tests/System/ce_test.py => src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_debug_ce.py (59%) diff --git a/docs/source/AdministratorGuide/Resources/computingelements.rst b/docs/source/AdministratorGuide/Resources/computingelements.rst index 7f62b1db0c0..9ecfa12a820 100644 --- a/docs/source/AdministratorGuide/Resources/computingelements.rst +++ b/docs/source/AdministratorGuide/Resources/computingelements.rst @@ -210,7 +210,7 @@ Debugging Computing Element Issues @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ When troubleshooting Computing Element connectivity or job submission problems, you can use the -`ce_test.py <../../../../tests/System/ce_test.py>`_ script to systematically test CE interactions. +:py:mod:`~DIRAC.WorkloadManagementSystem.scripts.dirac_admin_debug_ce` command to systematically test CE interactions. -This script validates CE functionality by testing status retrieval, job submission, monitoring, and output collection. -For detailed usage instructions, prerequisites, and examples, refer to the script's docstring. +This command validates CE functionality by testing status retrieval, job submission, monitoring, and output collection. +For detailed usage instructions, prerequisites, and examples, run ``dirac-admin-debug-ce --help``. diff --git a/setup.cfg b/setup.cfg index f03d63d3f29..7becf2289f7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -307,6 +307,7 @@ console_scripts = dirac-transformation-update-derived = DIRAC.TransformationSystem.scripts.dirac_transformation_update_derived:main [admin] # WorkloadManagementSystem dirac-admin-add-pilot = DIRAC.WorkloadManagementSystem.scripts.dirac_admin_add_pilot:main [pilot] + dirac-admin-debug-ce = DIRAC.WorkloadManagementSystem.scripts.dirac_admin_debug_ce:main [admin] dirac-admin-kill-pilot = DIRAC.WorkloadManagementSystem.scripts.dirac_admin_kill_pilot:main [admin] dirac-admin-show-task-queues = DIRAC.WorkloadManagementSystem.scripts.dirac_admin_show_task_queues:main [admin] dirac-admin-sync-pilot = DIRAC.WorkloadManagementSystem.scripts.dirac_admin_sync_pilot:main [admin] diff --git a/tests/System/ce_test.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_debug_ce.py similarity index 59% rename from tests/System/ce_test.py rename to src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_debug_ce.py index 54535aba394..dcae26da313 100644 --- a/tests/System/ce_test.py +++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_debug_ce.py @@ -1,45 +1,80 @@ -"""Test the interactions with a given set of Computing Elements (CE). For each CE: +#!/usr/bin/env python +""" +Test the interactions with a given set of Computing Elements (CE). For each CE: + - Get the CE status if available - Submit a job to the CE - Get the job status - Get the job output/error/log if available Conditions: + - The CEs must be configured in the DIRAC configuration - The script should be executed with an admin proxy: used to fetch a pilot proxy and a token - The script should be executed: + - in a DIRAC client environment for normal CEs, such as AREX and HTCondor - in a DIRAC host environment for SSH/Local CEs (credentials would not be available otherwise) + +Usage: + dirac-admin-debug-ce [--site ] [--ce ] [--ce-type ] [--script ] + +Example: + $ dirac-admin-debug-ce dteam --site LCG.CERN.cern --ce-type HTCondorCE """ import concurrent.futures import time from pathlib import Path -from typing import List - -import typer import DIRAC -from DIRAC import gLogger -from DIRAC.ConfigurationSystem.Client.Helpers import Operations, Registry -from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues +from DIRAC import S_OK, gLogger from DIRAC.Core.Base.Script import Script -from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager -from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager -from DIRAC.Resources.Computing.ComputingElement import ComputingElement -from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES -from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved -app = typer.Typer(help="Test the interactions with a given set of Computing Elements (CE)") +# Maximum time (in seconds) to wait for a job to reach a final state +JOB_STATUS_TIMEOUT = 1800 +sites = [] +ces = [] +ceTypes = [] +scriptPath = None -def findGenericCreds(vo: str): - """ - Find the generic pilot credentials for the given VO. - :param vo: The Virtual Organization to use for credentials. + +def setTimeout(val): + global JOB_STATUS_TIMEOUT + JOB_STATUS_TIMEOUT = int(val) + return S_OK() + + +def addSite(val): + sites.append(val) + return S_OK() + + +def addCE(val): + ces.append(val) + return S_OK() + + +def addCEType(val): + ceTypes.append(val) + return S_OK() + + +def setScript(val): + global scriptPath + scriptPath = val + return S_OK() + + +def findGenericCreds(vo): + """Find the generic pilot credentials for the given VO. + + :param str vo: The Virtual Organization to use for credentials. :return: A tuple containing the pilot DN and group. """ - opsHelper = Operations.Operations(vo=vo) + from DIRAC.ConfigurationSystem.Client.Helpers import Operations, Registry + opsHelper = Operations.Operations(vo=vo) pilotGroup = opsHelper.getValue("Pilot/GenericPilotGroup", "") pilotDN = opsHelper.getValue("Pilot/GenericPilotDN", "") if not pilotDN: @@ -51,19 +86,24 @@ def findGenericCreds(vo: str): return pilotDN, pilotGroup -def getCredentials(pilotDN: str, pilotGroup: str, ce: ComputingElement): - """ - Get the pilot credentials for the dn/group. - :return: A tuple containing the proxy and the token. +def getCredentials(pilotDN, pilotGroup, ce): + """Get the pilot credentials for the dn/group. + + :param str pilotDN: The pilot DN. + :param str pilotGroup: The pilot group. + :param ce: The Computing Element instance. + :return: A tuple containing the proxy and the token, or (None, None) on failure. """ - # Get the pilot proxy from the ProxyManager + from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager + from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager + from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES + result = gProxyManager.getPilotProxyFromDIRACGroup(pilotDN, pilotGroup, 3600) if not result["OK"]: gLogger.error("Cannot get pilot proxy", result["Message"]) return None, None proxy = result["Value"] - # Get the pilot token from the TokenManager result = gTokenManager.getToken( userGroup=pilotGroup, scope=PILOT_SCOPES, @@ -78,15 +118,18 @@ def getCredentials(pilotDN: str, pilotGroup: str, ce: ComputingElement): return proxy, token -def buildQueues(vo: str, sites: list[str], ces: list[str], ceTypes: list[str]) -> dict: - """ - Get the list of queues for the given community, site list, CE list, and CE type list. - :param vo: The Virtual Organization to use for credentials. - :param sites: The list of sites to filter the queues. - :param ces: The list of Computing Elements to filter the queues. - :param ceTypes: The list of Computing Element types to filter the queues. +def buildQueues(vo, sites, ces, ceTypes): + """Get the list of queues for the given community, site list, CE list, and CE type list. + + :param str vo: The Virtual Organization to use for credentials. + :param list sites: The list of sites to filter the queues. + :param list ces: The list of Computing Elements to filter the queues. + :param list ceTypes: The list of Computing Element types to filter the queues. :return: A dictionary containing the queues for the given parameters. """ + from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues + from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved + result = getQueues( community=vo, siteList=sites, @@ -110,12 +153,11 @@ def buildQueues(vo: str, sites: list[str], ces: list[str], ceTypes: list[str]) - return result["Value"] -def interactWithCE(ce: ComputingElement): - """ - Interact with a given Computing Element (CE). - :param ceName: The name of the CE. - :param port: The port of the CE. - :param vo: The Virtual Organization to use for credentials. +def interactWithCE(ce): + """Interact with a given Computing Element (CE). + + :param ce: The Computing Element instance. + :return: A dictionary with the result of each check. """ checks = { "ce_status": {"OK": False, "Message": ""}, @@ -136,7 +178,6 @@ def interactWithCE(ce: ComputingElement): # Submit a job to the CE gLogger.info(f"[{ce.ceName}]Submitting a job") - res = ce.submitJob("workloadExec.sh", None) if not res["OK"]: gLogger.error(f"[{ce.ceName}]Cannot submit job to CE: {res['Message']}") @@ -149,9 +190,13 @@ def interactWithCE(ce: ComputingElement): # Wait for the job to finish gLogger.info(f"[{ce.ceName}]Getting job status") - status = "Waiting" + deadline = time.monotonic() + JOB_STATUS_TIMEOUT while status != "Done" and status != "Failed": + if time.monotonic() > deadline: + gLogger.error(f"[{ce.ceName}]Timed out after {JOB_STATUS_TIMEOUT}s waiting for job {job_id}") + checks["job_status"]["Message"] = f"Timed out after {JOB_STATUS_TIMEOUT}s (last status: {status})" + return checks res = ce.getJobStatus([job_id]) if not res["OK"]: gLogger.error(f"[{ce.ceName}]Cannot get job status: {res['Message']}") @@ -164,7 +209,6 @@ def interactWithCE(ce: ComputingElement): # Get job output, error, and log gLogger.info(f"[{ce.ceName}]Getting job output and log") - res = ce.getJobOutput(job_id + ":::" + stamp) if not res["OK"]: gLogger.error(f"[{ce.ceName}]Cannot get job output: {res['Message']}") @@ -182,23 +226,34 @@ def interactWithCE(ce: ComputingElement): return checks -@app.command() -def main( - vo: str = typer.Argument(help="Select Virtual Organization"), - sites: list[str] = typer.Option([], "--site", help="Select sites (can be used multiple times)"), - ces: list[str] = typer.Option([], "--ce", help="Select CEs (can be used multiple times)"), - ce_types: list[str] = typer.Option([], "--ce-type", help="Select CE types (can be used multiple times)"), - log_level: str = typer.Option("INFO", "--log-level", help="Set the log level (DEBUG, VERBOSE, INFO)"), - script: str = typer.Option(None, "--script", help="Path to custom executable script (default: workloadExec.sh)"), -): - """Test the interactions with a given set of Computing Elements (CE).""" - Script.initialize() - - if log_level: - gLogger.setLevel(log_level.upper()) - # If you set a log level for a specific backend and want more details to debug - # then uncomment the next line - # gLogger._backendsList[0]._handler.setLevel(log_level.upper()) +@Script() +def main(): + global scriptPath + + Script.registerSwitch("", "site=", "Select site (can be used multiple times)", addSite) + Script.registerSwitch("", "ce=", "Select CE (can be used multiple times)", addCE) + Script.registerSwitch("", "ce-type=", "Select CE type (can be used multiple times)", addCEType) + Script.registerSwitch("", "script=", "Path to custom executable script (default: workloadExec.sh)", setScript) + Script.registerSwitch("", "timeout=", "Timeout in seconds for job status polling (default: 1800)", setTimeout) + Script.registerArgument("VO: Virtual Organization") + Script.parseCommandLine() + + from DIRAC.Core.Security.Properties import SecurityProperty + from DIRAC.Core.Security.ProxyInfo import getProxyInfo + + # Check credentials + result = getProxyInfo() + if not result["OK"]: + gLogger.error("Do you have a valid proxy?") + gLogger.error(result["Message"]) + DIRAC.exit(1) + proxyProps = result["Value"] + + if SecurityProperty.FULL_DELEGATION not in proxyProps.get("groupProperties", []): + gLogger.error("You need an admin proxy (with FullDelegation property) to run this script") + DIRAC.exit(1) + + vo = Script.getPositionalArgs()[0] # Get credentials for the given VO pilotDN, pilotGroup = findGenericCreds(vo) @@ -211,21 +266,19 @@ def main( vo=vo, sites=sites, ces=ces, - ceTypes=ce_types, + ceTypes=ceTypes, ) if not queueDict: gLogger.error("Cannot get queues") DIRAC.exit(1) - if script: - # Use the provided custom script - gLogger.info(f"Using custom script: {script}") - executable = Path(script) + if scriptPath: + gLogger.info(f"Using custom script: {scriptPath}") + executable = Path(scriptPath) if not executable.exists() or not executable.is_file(): - gLogger.error(f"Provided script {script} does not exist or is not a file") + gLogger.error(f"Provided script {scriptPath} does not exist or is not a file") DIRAC.exit(1) else: - # Create default workloadExec.sh gLogger.info("Creating default workloadExec.sh") executable = Path("workloadExec.sh") with open(executable, "w") as f: @@ -244,7 +297,11 @@ def process_queue(queueName): gLogger.info(f"Getting creds for CE: {ce.ceName} ({ce.ceType})") proxy, token = getCredentials(pilotDN, pilotGroup, ce) if not proxy or not token: - DIRAC.exit(1) + gLogger.error(f"[{ce.ceName}]Failed to get credentials, skipping") + return queueName, { + check: {"OK": False, "Message": "Failed to get credentials"} + for check in ("ce_status", "job_submit", "job_status", "job_output", "job_log") + } ce.setProxy(proxy) if "Token" in ce.ceParameters.get("Tag", []): ce.setToken(token) @@ -259,10 +316,10 @@ def process_queue(queueName): overallState[queueName] = state # Clean up the script file after submission (only if auto-generated) - if not script: + if not scriptPath: executable.unlink() - gLogger.info("Overall interaction state:") + gLogger.notice("Overall interaction state:") total = len(overallState) # human-friendly names for each check pretty = { @@ -277,13 +334,13 @@ def process_queue(queueName): okCount = sum(1 for queueState in overallState.values() if queueState[check]["OK"]) issueCount = total - okCount pct = int(okCount / total * 100) if total else 0 - typer.echo(f"- {pct}% of the queues correctly {pretty[check]}. " f"Issues with {issueCount} queue(s):") + gLogger.notice(f"- {pct}% of the queues correctly {pretty[check]}. Issues with {issueCount} queue(s):") for qname, qState in overallState.items(): if not qState[check]["OK"]: msg = qState[check]["Message"] or "unknown error" - typer.echo(f" - {qname}: {msg}") - typer.echo("") + gLogger.notice(f" - {qname}: {msg}") + gLogger.notice("") if __name__ == "__main__": - app() + main()