|
62 | 62 | except ImportError: |
63 | 63 | # Python 3's subprocess module contains a compatibility layer |
64 | 64 | import subprocess as commands |
| 65 | +import datetime |
65 | 66 | import errno |
| 67 | +import threading |
66 | 68 |
|
67 | 69 | from DIRAC import S_OK, S_ERROR, gConfig |
68 | 70 | from DIRAC.Resources.Computing.ComputingElement import ComputingElement |
@@ -164,6 +166,10 @@ class HTCondorCEComputingElement(ComputingElement): |
164 | 166 | implementing the functions jobSubmit, getJobOutput |
165 | 167 | """ |
166 | 168 |
|
| 169 | + # static variables to ensure single cleanup every minute |
| 170 | + _lastCleanupTime = datetime.datetime.utcnow() |
| 171 | + _cleanupLock = threading.Lock() |
| 172 | + |
167 | 173 | ############################################################################# |
168 | 174 | def __init__(self, ceUniqueID): |
169 | 175 | """ Standard constructor. |
@@ -529,21 +535,33 @@ def __cleanup(self): |
529 | 535 | # FIXME: again some issue with the working directory... |
530 | 536 | # workingDirectory = self.ceParameters.get( 'WorkingDirectory', DEFAULT_WORKINGDIRECTORY ) |
531 | 537 |
|
| 538 | + if not HTCondorCEComputingElement._cleanupLock.acquire(False): |
| 539 | + return |
| 540 | + |
| 541 | + now = datetime.datetime.utcnow() |
| 542 | + if (now - HTCondorCEComputingElement._lastCleanupTime).total_seconds() < 60: |
| 543 | + HTCondorCEComputingElement._cleanupLock.release() |
| 544 | + return |
| 545 | + |
| 546 | + HTCondorCEComputingElement._lastCleanupTime = now |
| 547 | + |
532 | 548 | self.log.debug("Cleaning working directory: %s" % self.workingDirectory) |
533 | 549 |
|
534 | 550 | # remove all files older than 120 minutes starting with DIRAC_ Condor will |
535 | 551 | # push files on submission, but it takes at least a few seconds until this |
536 | 552 | # happens so we can't directly unlink after condor_submit |
537 | | - status, stdout = commands.getstatusoutput('find %s -mmin +120 -name "DIRAC_*" -delete ' % self.workingDirectory) |
| 553 | + status, stdout = commands.getstatusoutput('find -O3 %s -maxdepth 1 -mmin +120 -name "DIRAC_*" -delete ' % |
| 554 | + self.workingDirectory) |
538 | 555 | if status: |
539 | 556 | self.log.error("Failure during HTCondorCE __cleanup", stdout) |
540 | 557 |
|
541 | | - # remove all out/err/log files older than "DaysToKeepLogs" days in the CE part of the working Directory |
542 | | - workDir = os.path.join(self.workingDirectory, self.ceName) |
543 | | - findPars = dict(workDir=workDir, days=self.daysToKeepLogs) |
| 558 | + # remove all out/err/log files older than "DaysToKeepLogs" days in the working directory |
| 559 | + # not running this for each CE so we do global cleanup |
| 560 | + findPars = dict(workDir=self.workingDirectory, days=self.daysToKeepLogs) |
544 | 561 | # remove all out/err/log files older than "DaysToKeepLogs" days |
545 | 562 | status, stdout = commands.getstatusoutput( |
546 | 563 | r'find %(workDir)s -mtime +%(days)s -type f \( -name "*.out" -o -name "*.err" -o -name "*.log" \) -delete ' % |
547 | 564 | findPars) |
548 | 565 | if status: |
549 | 566 | self.log.error("Failure during HTCondorCE __cleanup", stdout) |
| 567 | + self._cleanupLock.release() |
0 commit comments