diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 28dff17ed46..3ec19c137ab 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -302,25 +302,6 @@ def _submitPilotsPerQueue(self, queueName: str): self.failedQueues[queueName] += 1 return S_OK(0) - # Adjust queueCPUTime: needed to generate the proxy - if "CPUTime" not in queueDictionary["ParametersDict"]: - self.log.error("CPU time limit is not specified, skipping", f"queue {queueName}") - return S_ERROR(f"CPU time limit is not specified, skipping queue {queueName}") - - queueCPUTime = int(queueDictionary["ParametersDict"]["CPUTime"]) - if queueCPUTime > self.maxQueueLength: - queueCPUTime = self.maxQueueLength - - # Get CE instance - ce = self.queueDict[queueName]["CE"] - - # Set credentials - cpuTime = queueCPUTime + 86400 - result = self._setCredentials(ce, cpuTime) - if not result["OK"]: - self.log.error("Failed to set credentials:", result["Message"]) - return result - # Get the number of available slots on the target site/queue totalSlots, waitingPilots = self._getQueueSlots(queueName) if totalSlots <= 0: @@ -341,6 +322,7 @@ def _submitPilotsPerQueue(self, queueName: str): ) # Now really submitting + ce = self.queueDict[queueName]["CE"] result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName) if not result["OK"]: self.log.info("Failed pilot submission", f"Queue: {queueName}") @@ -457,6 +439,12 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: jobProxy = result["Value"] executable = self._getExecutable(queue, proxy=jobProxy, jobExecDir=jobExecDir, envVariables=envVariables) + # Add the credentials to the CE + result = self._setCredentials(ce, 3600) + if not result["OK"]: + self.log.error("Failed to set credentials:", result["Message"]) + return result + # Submit the job submitResult = ce.submitJob(executable, "", pilotsToSubmit) # In case the CE does not need the executable after the submission, we delete it @@ -901,20 +889,18 @@ def __supportToken(self, ce: ComputingElement) -> bool: return "Token" in ce.ceParameters.get("Tag", []) or f"Token:{self.vo}" in ce.ceParameters.get("Tag", []) def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: int): - """ + """Add a proxy and a token to the ComputingElement. :param ce: ComputingElement instance :param proxyMinimumRequiredValidity: number of seconds needed to perform an operation with the proxy - :param tokenMinimumRequiredValidity: number of seconds needed to perform an operation with the token """ getNewProxy = False # If the CE does not already embed a proxy, we need one if not ce.proxy: getNewProxy = True - - # If the CE embeds a proxy that is too short to perform a given operation, we need a new one - if ce.proxy: + else: + # If the CE embeds a proxy that is too short to perform a given operation, we need a new one result = ce.proxy.getRemainingSecs() if not result["OK"]: return result @@ -924,11 +910,19 @@ def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: in # Generate a new proxy if needed if getNewProxy: - self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.vo} {proxyMinimumRequiredValidity} long") + proxyRequestedValidity = max(proxyMinimumRequiredValidity, 86400) + self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.vo} {proxyRequestedValidity} long") pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup") - result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, proxyMinimumRequiredValidity) + result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, proxyRequestedValidity) if not result["OK"]: return result + result_validity = result["Value"].getRemainingSecs() + if not result_validity["OK"]: + return result_validity + if result_validity["Value"] < proxyRequestedValidity: + self.log.warn( + f"The validity of the generated proxy ({result_validity['Value']} seconds) is less than the requested {proxyRequestedValidity} seconds" + ) ce.setProxy(result["Value"]) # Get valid token if needed diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py index 8e2d7179b7e..6d720e2e446 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py @@ -1,5 +1,4 @@ -""" Test class for SiteDirector -""" +"""Test class for SiteDirector""" # pylint: disable=protected-access import datetime @@ -145,6 +144,7 @@ mockPMProxy = MagicMock() mockPMProxy.dumpAllToString.return_value = {"OK": True, "Value": "fakeProxy"} +mockPMProxy.getRemainingSecs.return_value = {"OK": True, "Value": 1000} mockPMProxyReply = MagicMock() mockPMProxyReply.return_value = {"OK": True, "Value": mockPMProxy} @@ -183,6 +183,10 @@ def mock_getElementStatus(ceNamesList, *args, **kwargs): mocker.patch( "DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.downloadProxy", side_effect=mockPMProxyReply ) + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.getPilotProxyFromDIRACGroup", + side_effect=mockPMProxyReply, + ) sd = SiteDirector() # Set logger @@ -288,7 +292,8 @@ def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory): assert os.path.exists(res) and os.path.isfile(res) -def test__submitPilotsToQueue(sd): +@pytest.mark.parametrize("proxy_validity", [1, 1000, 900000]) +def test__submitPilotsToQueue(sd, proxy_validity): """Testing SiteDirector()._submitPilotsToQueue()""" # Create a MagicMock that does not have the workingDirectory # attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes) @@ -297,6 +302,7 @@ def test__submitPilotsToQueue(sd): del ceMock.workingDirectory proxyObject_mock = MagicMock() proxyObject_mock.dumpAllToString.return_value = S_OK("aProxy") + proxyObject_mock.getRemainingSecs.return_value = S_OK(proxy_validity) ceMock.proxy = proxyObject_mock sd.queueCECache = {"ce1.site1.com_condor": {"CE": ceMock, "Hash": "3d0dd0c60fffa900c511d7442e9c7634"}}