Skip to content

Commit 8105b54

Browse files
authored
Merge pull request DIRACGrid#8481 from fstagni/SD_proxy_acc
SiteDirector._setCredentials should renew the proxy only if below the queue CPUTime
2 parents dda0c0b + 9a2ab78 commit 8105b54

File tree

2 files changed

+29
-29
lines changed

2 files changed

+29
-29
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -302,25 +302,6 @@ def _submitPilotsPerQueue(self, queueName: str):
302302
self.failedQueues[queueName] += 1
303303
return S_OK(0)
304304

305-
# Adjust queueCPUTime: needed to generate the proxy
306-
if "CPUTime" not in queueDictionary["ParametersDict"]:
307-
self.log.error("CPU time limit is not specified, skipping", f"queue {queueName}")
308-
return S_ERROR(f"CPU time limit is not specified, skipping queue {queueName}")
309-
310-
queueCPUTime = int(queueDictionary["ParametersDict"]["CPUTime"])
311-
if queueCPUTime > self.maxQueueLength:
312-
queueCPUTime = self.maxQueueLength
313-
314-
# Get CE instance
315-
ce = self.queueDict[queueName]["CE"]
316-
317-
# Set credentials
318-
cpuTime = queueCPUTime + 86400
319-
result = self._setCredentials(ce, cpuTime)
320-
if not result["OK"]:
321-
self.log.error("Failed to set credentials:", result["Message"])
322-
return result
323-
324305
# Get the number of available slots on the target site/queue
325306
totalSlots, waitingPilots = self._getQueueSlots(queueName)
326307
if totalSlots <= 0:
@@ -341,6 +322,7 @@ def _submitPilotsPerQueue(self, queueName: str):
341322
)
342323

343324
# Now really submitting
325+
ce = self.queueDict[queueName]["CE"]
344326
result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName)
345327
if not result["OK"]:
346328
self.log.info("Failed pilot submission", f"Queue: {queueName}")
@@ -457,6 +439,12 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
457439
jobProxy = result["Value"]
458440
executable = self._getExecutable(queue, proxy=jobProxy, jobExecDir=jobExecDir, envVariables=envVariables)
459441

442+
# Add the credentials to the CE
443+
result = self._setCredentials(ce, 3600)
444+
if not result["OK"]:
445+
self.log.error("Failed to set credentials:", result["Message"])
446+
return result
447+
460448
# Submit the job
461449
submitResult = ce.submitJob(executable, "", pilotsToSubmit)
462450
# In case the CE does not need the executable after the submission, we delete it
@@ -901,20 +889,18 @@ def __supportToken(self, ce: ComputingElement) -> bool:
901889
return "Token" in ce.ceParameters.get("Tag", []) or f"Token:{self.vo}" in ce.ceParameters.get("Tag", [])
902890

903891
def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: int):
904-
"""
892+
"""Add a proxy and a token to the ComputingElement.
905893
906894
:param ce: ComputingElement instance
907895
:param proxyMinimumRequiredValidity: number of seconds needed to perform an operation with the proxy
908-
:param tokenMinimumRequiredValidity: number of seconds needed to perform an operation with the token
909896
"""
910897
getNewProxy = False
911898

912899
# If the CE does not already embed a proxy, we need one
913900
if not ce.proxy:
914901
getNewProxy = True
915-
916-
# If the CE embeds a proxy that is too short to perform a given operation, we need a new one
917-
if ce.proxy:
902+
else:
903+
# If the CE embeds a proxy that is too short to perform a given operation, we need a new one
918904
result = ce.proxy.getRemainingSecs()
919905
if not result["OK"]:
920906
return result
@@ -924,11 +910,19 @@ def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: in
924910

925911
# Generate a new proxy if needed
926912
if getNewProxy:
927-
self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.vo} {proxyMinimumRequiredValidity} long")
913+
proxyRequestedValidity = max(proxyMinimumRequiredValidity, 86400)
914+
self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.vo} {proxyRequestedValidity} long")
928915
pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup")
929-
result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, proxyMinimumRequiredValidity)
916+
result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, proxyRequestedValidity)
930917
if not result["OK"]:
931918
return result
919+
result_validity = result["Value"].getRemainingSecs()
920+
if not result_validity["OK"]:
921+
return result_validity
922+
if result_validity["Value"] < proxyRequestedValidity:
923+
self.log.warn(
924+
f"The validity of the generated proxy ({result_validity['Value']} seconds) is less than the requested {proxyRequestedValidity} seconds"
925+
)
932926
ce.setProxy(result["Value"])
933927

934928
# Get valid token if needed

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
""" Test class for SiteDirector
2-
"""
1+
"""Test class for SiteDirector"""
32
# pylint: disable=protected-access
43

54
import datetime
@@ -145,6 +144,7 @@
145144

146145
mockPMProxy = MagicMock()
147146
mockPMProxy.dumpAllToString.return_value = {"OK": True, "Value": "fakeProxy"}
147+
mockPMProxy.getRemainingSecs.return_value = {"OK": True, "Value": 1000}
148148
mockPMProxyReply = MagicMock()
149149
mockPMProxyReply.return_value = {"OK": True, "Value": mockPMProxy}
150150

@@ -183,6 +183,10 @@ def mock_getElementStatus(ceNamesList, *args, **kwargs):
183183
mocker.patch(
184184
"DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.downloadProxy", side_effect=mockPMProxyReply
185185
)
186+
mocker.patch(
187+
"DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.getPilotProxyFromDIRACGroup",
188+
side_effect=mockPMProxyReply,
189+
)
186190
sd = SiteDirector()
187191

188192
# Set logger
@@ -288,7 +292,8 @@ def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory):
288292
assert os.path.exists(res) and os.path.isfile(res)
289293

290294

291-
def test__submitPilotsToQueue(sd):
295+
@pytest.mark.parametrize("proxy_validity", [1, 1000, 900000])
296+
def test__submitPilotsToQueue(sd, proxy_validity):
292297
"""Testing SiteDirector()._submitPilotsToQueue()"""
293298
# Create a MagicMock that does not have the workingDirectory
294299
# attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes)
@@ -297,6 +302,7 @@ def test__submitPilotsToQueue(sd):
297302
del ceMock.workingDirectory
298303
proxyObject_mock = MagicMock()
299304
proxyObject_mock.dumpAllToString.return_value = S_OK("aProxy")
305+
proxyObject_mock.getRemainingSecs.return_value = S_OK(proxy_validity)
300306
ceMock.proxy = proxyObject_mock
301307

302308
sd.queueCECache = {"ce1.site1.com_condor": {"CE": ceMock, "Hash": "3d0dd0c60fffa900c511d7442e9c7634"}}

0 commit comments

Comments
 (0)