Skip to content

Commit 9a2ab78

Browse files
committed
fix: add a short proxy only at submission time
1 parent 7b85a48 commit 9a2ab78

File tree

2 files changed

+24
-31
lines changed

2 files changed

+24
-31
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -302,24 +302,6 @@ def _submitPilotsPerQueue(self, queueName: str):
302302
self.failedQueues[queueName] += 1
303303
return S_OK(0)
304304

305-
# Adjust queueCPUTime: needed to generate the proxy
306-
if "CPUTime" not in queueDictionary["ParametersDict"]:
307-
self.log.error("CPU time limit is not specified, skipping", f"queue {queueName}")
308-
return S_ERROR(f"CPU time limit is not specified, skipping queue {queueName}")
309-
310-
queueCPUTime = int(queueDictionary["ParametersDict"]["CPUTime"])
311-
if queueCPUTime > self.maxQueueLength:
312-
queueCPUTime = self.maxQueueLength
313-
314-
# Get CE instance
315-
ce = self.queueDict[queueName]["CE"]
316-
317-
# Set credentials
318-
result = self._setCredentials(ce, queueCPUTime)
319-
if not result["OK"]:
320-
self.log.error("Failed to set credentials:", result["Message"])
321-
return result
322-
323305
# Get the number of available slots on the target site/queue
324306
totalSlots, waitingPilots = self._getQueueSlots(queueName)
325307
if totalSlots <= 0:
@@ -340,6 +322,7 @@ def _submitPilotsPerQueue(self, queueName: str):
340322
)
341323

342324
# Now really submitting
325+
ce = self.queueDict[queueName]["CE"]
343326
result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName)
344327
if not result["OK"]:
345328
self.log.info("Failed pilot submission", f"Queue: {queueName}")
@@ -456,6 +439,12 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
456439
jobProxy = result["Value"]
457440
executable = self._getExecutable(queue, proxy=jobProxy, jobExecDir=jobExecDir, envVariables=envVariables)
458441

442+
# Add the credentials to the CE
443+
result = self._setCredentials(ce, 3600)
444+
if not result["OK"]:
445+
self.log.error("Failed to set credentials:", result["Message"])
446+
return result
447+
459448
# Submit the job
460449
submitResult = ce.submitJob(executable, "", pilotsToSubmit)
461450
# In case the CE does not need the executable after the submission, we delete it
@@ -900,20 +889,18 @@ def __supportToken(self, ce: ComputingElement) -> bool:
900889
return "Token" in ce.ceParameters.get("Tag", []) or f"Token:{self.vo}" in ce.ceParameters.get("Tag", [])
901890

902891
def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: int):
903-
"""
892+
"""Add a proxy and a token to the ComputingElement.
904893
905894
:param ce: ComputingElement instance
906895
:param proxyMinimumRequiredValidity: number of seconds needed to perform an operation with the proxy
907-
:param tokenMinimumRequiredValidity: number of seconds needed to perform an operation with the token
908896
"""
909897
getNewProxy = False
910898

911899
# If the CE does not already embed a proxy, we need one
912900
if not ce.proxy:
913901
getNewProxy = True
914-
915-
# If the CE embeds a proxy that is too short to perform a given operation, we need a new one
916-
if ce.proxy:
902+
else:
903+
# If the CE embeds a proxy that is too short to perform a given operation, we need a new one
917904
result = ce.proxy.getRemainingSecs()
918905
if not result["OK"]:
919906
return result
@@ -923,18 +910,18 @@ def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: in
923910

924911
# Generate a new proxy if needed
925912
if getNewProxy:
926-
proxyMinimumRequiredValidity = proxyMinimumRequiredValidity + 86400
927-
self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.vo} {proxyMinimumRequiredValidity} long")
913+
proxyRequestedValidity = max(proxyMinimumRequiredValidity, 86400)
914+
self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.vo} {proxyRequestedValidity} long")
928915
pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup")
929-
result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, proxyMinimumRequiredValidity)
916+
result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, proxyRequestedValidity)
930917
if not result["OK"]:
931918
return result
932919
result_validity = result["Value"].getRemainingSecs()
933920
if not result_validity["OK"]:
934921
return result_validity
935-
if result_validity["Value"] < proxyMinimumRequiredValidity:
922+
if result_validity["Value"] < proxyRequestedValidity:
936923
self.log.warn(
937-
f"The validity of the generated proxy ({result_validity['Value']} seconds) is less than the requested {proxyMinimumRequiredValidity} seconds"
924+
f"The validity of the generated proxy ({result_validity['Value']} seconds) is less than the requested {proxyRequestedValidity} seconds"
938925
)
939926
ce.setProxy(result["Value"])
940927

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
""" Test class for SiteDirector
2-
"""
1+
"""Test class for SiteDirector"""
32
# pylint: disable=protected-access
43

54
import datetime
@@ -145,6 +144,7 @@
145144

146145
mockPMProxy = MagicMock()
147146
mockPMProxy.dumpAllToString.return_value = {"OK": True, "Value": "fakeProxy"}
147+
mockPMProxy.getRemainingSecs.return_value = {"OK": True, "Value": 1000}
148148
mockPMProxyReply = MagicMock()
149149
mockPMProxyReply.return_value = {"OK": True, "Value": mockPMProxy}
150150

@@ -183,6 +183,10 @@ def mock_getElementStatus(ceNamesList, *args, **kwargs):
183183
mocker.patch(
184184
"DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.downloadProxy", side_effect=mockPMProxyReply
185185
)
186+
mocker.patch(
187+
"DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.getPilotProxyFromDIRACGroup",
188+
side_effect=mockPMProxyReply,
189+
)
186190
sd = SiteDirector()
187191

188192
# Set logger
@@ -288,7 +292,8 @@ def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory):
288292
assert os.path.exists(res) and os.path.isfile(res)
289293

290294

291-
def test__submitPilotsToQueue(sd):
295+
@pytest.mark.parametrize("proxy_validity", [1, 1000, 900000])
296+
def test__submitPilotsToQueue(sd, proxy_validity):
292297
"""Testing SiteDirector()._submitPilotsToQueue()"""
293298
# Create a MagicMock that does not have the workingDirectory
294299
# attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes)
@@ -297,6 +302,7 @@ def test__submitPilotsToQueue(sd):
297302
del ceMock.workingDirectory
298303
proxyObject_mock = MagicMock()
299304
proxyObject_mock.dumpAllToString.return_value = S_OK("aProxy")
305+
proxyObject_mock.getRemainingSecs.return_value = S_OK(proxy_validity)
300306
ceMock.proxy = proxyObject_mock
301307

302308
sd.queueCECache = {"ce1.site1.com_condor": {"CE": ceMock, "Hash": "3d0dd0c60fffa900c511d7442e9c7634"}}

0 commit comments

Comments
 (0)