Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 20 additions & 26 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,25 +302,6 @@ def _submitPilotsPerQueue(self, queueName: str):
self.failedQueues[queueName] += 1
return S_OK(0)

# Adjust queueCPUTime: needed to generate the proxy
if "CPUTime" not in queueDictionary["ParametersDict"]:
self.log.error("CPU time limit is not specified, skipping", f"queue {queueName}")
return S_ERROR(f"CPU time limit is not specified, skipping queue {queueName}")

queueCPUTime = int(queueDictionary["ParametersDict"]["CPUTime"])
if queueCPUTime > self.maxQueueLength:
queueCPUTime = self.maxQueueLength

# Get CE instance
ce = self.queueDict[queueName]["CE"]

# Set credentials
cpuTime = queueCPUTime + 86400
result = self._setCredentials(ce, cpuTime)
if not result["OK"]:
self.log.error("Failed to set credentials:", result["Message"])
return result

# Get the number of available slots on the target site/queue
totalSlots, waitingPilots = self._getQueueSlots(queueName)
if totalSlots <= 0:
Expand All @@ -341,6 +322,7 @@ def _submitPilotsPerQueue(self, queueName: str):
)

# Now really submitting
ce = self.queueDict[queueName]["CE"]
result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName)
if not result["OK"]:
self.log.info("Failed pilot submission", f"Queue: {queueName}")
Expand Down Expand Up @@ -457,6 +439,12 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
jobProxy = result["Value"]
executable = self._getExecutable(queue, proxy=jobProxy, jobExecDir=jobExecDir, envVariables=envVariables)

# Add the credentials to the CE
result = self._setCredentials(ce, 3600)
if not result["OK"]:
self.log.error("Failed to set credentials:", result["Message"])
return result

# Submit the job
submitResult = ce.submitJob(executable, "", pilotsToSubmit)
# In case the CE does not need the executable after the submission, we delete it
Expand Down Expand Up @@ -901,20 +889,18 @@ def __supportToken(self, ce: ComputingElement) -> bool:
return "Token" in ce.ceParameters.get("Tag", []) or f"Token:{self.vo}" in ce.ceParameters.get("Tag", [])

def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: int):
"""
"""Add a proxy and a token to the ComputingElement.

:param ce: ComputingElement instance
:param proxyMinimumRequiredValidity: number of seconds needed to perform an operation with the proxy
:param tokenMinimumRequiredValidity: number of seconds needed to perform an operation with the token
"""
getNewProxy = False

# If the CE does not already embed a proxy, we need one
if not ce.proxy:
getNewProxy = True

# If the CE embeds a proxy that is too short to perform a given operation, we need a new one
if ce.proxy:
else:
# If the CE embeds a proxy that is too short to perform a given operation, we need a new one
result = ce.proxy.getRemainingSecs()
if not result["OK"]:
return result
Expand All @@ -924,11 +910,19 @@ def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: in

# Generate a new proxy if needed
if getNewProxy:
self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.vo} {proxyMinimumRequiredValidity} long")
proxyRequestedValidity = max(proxyMinimumRequiredValidity, 86400)
self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.vo} {proxyRequestedValidity} long")
pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup")
result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, proxyMinimumRequiredValidity)
result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, proxyRequestedValidity)
if not result["OK"]:
return result
result_validity = result["Value"].getRemainingSecs()
if not result_validity["OK"]:
return result_validity
if result_validity["Value"] < proxyRequestedValidity:
self.log.warn(
f"The validity of the generated proxy ({result_validity['Value']} seconds) is less than the requested {proxyRequestedValidity} seconds"
)
ce.setProxy(result["Value"])

# Get valid token if needed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
""" Test class for SiteDirector
"""
"""Test class for SiteDirector"""
# pylint: disable=protected-access

import datetime
Expand Down Expand Up @@ -145,6 +144,7 @@

mockPMProxy = MagicMock()
mockPMProxy.dumpAllToString.return_value = {"OK": True, "Value": "fakeProxy"}
mockPMProxy.getRemainingSecs.return_value = {"OK": True, "Value": 1000}
mockPMProxyReply = MagicMock()
mockPMProxyReply.return_value = {"OK": True, "Value": mockPMProxy}

Expand Down Expand Up @@ -183,6 +183,10 @@ def mock_getElementStatus(ceNamesList, *args, **kwargs):
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.downloadProxy", side_effect=mockPMProxyReply
)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.getPilotProxyFromDIRACGroup",
side_effect=mockPMProxyReply,
)
sd = SiteDirector()

# Set logger
Expand Down Expand Up @@ -288,7 +292,8 @@ def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory):
assert os.path.exists(res) and os.path.isfile(res)


def test__submitPilotsToQueue(sd):
@pytest.mark.parametrize("proxy_validity", [1, 1000, 900000])
def test__submitPilotsToQueue(sd, proxy_validity):
"""Testing SiteDirector()._submitPilotsToQueue()"""
# Create a MagicMock that does not have the workingDirectory
# attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes)
Expand All @@ -297,6 +302,7 @@ def test__submitPilotsToQueue(sd):
del ceMock.workingDirectory
proxyObject_mock = MagicMock()
proxyObject_mock.dumpAllToString.return_value = S_OK("aProxy")
proxyObject_mock.getRemainingSecs.return_value = S_OK(proxy_validity)
ceMock.proxy = proxyObject_mock

sd.queueCECache = {"ce1.site1.com_condor": {"CE": ceMock, "Hash": "3d0dd0c60fffa900c511d7442e9c7634"}}
Expand Down
Loading