|
8 | 8 |
|
9 | 9 | from collections import OrderedDict |
10 | 10 | from datetime import datetime |
11 | | -import fcntl |
12 | 11 | import io |
13 | 12 | import logging |
14 | 13 | import os |
15 | 14 | import re |
16 | 15 | import sys |
17 | | -import time |
18 | 16 | import uuid |
| 17 | +import portalocker |
| 18 | +import timeout_decorator |
19 | 19 | import yaml.resolver |
20 | 20 | import yaml.scanner |
21 | 21 | import glob |
@@ -213,46 +213,30 @@ def _acquire_flock_open( |
213 | 213 | :param exclusive: config exclusive lock (True) or shared lock (False), defaults to True |
214 | 214 | :return: opened file object if successful else None |
215 | 215 | """ |
| 216 | + |
| 217 | + @timeout_decorator.timeout( |
| 218 | + seconds=timeout_seconds, timeout_exception=FailureToAcquireLockException |
| 219 | + ) |
| 220 | + def get_lock(file_obj: io.IOBase): |
| 221 | + flags = ( |
| 222 | + portalocker.LockFlags.EXCLUSIVE |
| 223 | + if exclusive |
| 224 | + else portalocker.LockFlags.SHARED |
| 225 | + ) |
| 226 | + portalocker.lock( |
| 227 | + file_obj, |
| 228 | + flags, |
| 229 | + ) |
| 230 | + return file_obj |
| 231 | + |
216 | 232 | # pylint: disable=unspecified-encoding,consider-using-with |
217 | 233 | file_obj = open(lock_file, mode) |
218 | | - pid = os.getpid() |
219 | 234 | lock_file_obj = None |
220 | | - retry_sleep_seconds = 0.5 |
221 | | - |
222 | | - start_time = current_time = time.time() |
223 | | - duration_seconds = current_time - start_time |
224 | | - |
225 | | - while duration_seconds < timeout_seconds: |
226 | | - try: |
227 | | - # The LOCK_NB means non-blocking |
228 | | - # More information here: |
229 | | - # https://docs.python.org/3/library/fcntl.html#fcntl.flock |
230 | | - if exclusive: |
231 | | - # Obtain an exclusive lock, blocks shared (read) locks |
232 | | - fcntl.flock(file_obj, fcntl.LOCK_EX | fcntl.LOCK_NB) |
233 | | - else: |
234 | | - # Allow multiple people to read from the file at the same time |
235 | | - # If another instance adds an exclusive lock in the save method below, |
236 | | - # this will block until the lock is released |
237 | | - fcntl.flock(file_obj, fcntl.LOCK_SH | fcntl.LOCK_NB) |
238 | | - except (IOError, OSError): |
239 | | - # Limit the amount of logs written while waiting for the file lock |
240 | | - num_seconds_between_log_writes = 10 |
241 | | - if ( |
242 | | - 0 |
243 | | - <= (duration_seconds % num_seconds_between_log_writes) |
244 | | - <= retry_sleep_seconds |
245 | | - ): |
246 | | - logger.info( |
247 | | - f"PID:{pid} waiting for file lock on {lock_file} after {duration_seconds} seconds" |
248 | | - ) |
249 | | - else: |
250 | | - lock_file_obj = file_obj |
251 | | - break |
252 | | - time.sleep(retry_sleep_seconds) |
253 | | - duration_seconds = time.time() - start_time |
| 235 | + pid = os.getpid() |
254 | 236 |
|
255 | | - if lock_file_obj is None: |
| 237 | + try: |
| 238 | + lock_file_obj = get_lock(file_obj) |
| 239 | + except FailureToAcquireLockException: |
256 | 240 | file_obj.close() |
257 | 241 | raise FailureToAcquireLockException( |
258 | 242 | f"PID:{pid} failed to acquire file lock for {lock_file} after timeout of {timeout_seconds} seconds" |
@@ -322,6 +306,6 @@ def release_flock( |
322 | 306 | """ |
323 | 307 | if lock_file_obj is None: |
324 | 308 | return |
325 | | - fcntl.flock(lock_file_obj, fcntl.LOCK_UN) |
| 309 | + portalocker.unlock(lock_file_obj) |
326 | 310 | lock_file_obj.close() |
327 | 311 | logger.write(f"PID:{os.getpid()} released and closed file {lock_file_obj}") |
0 commit comments