Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 61 additions & 31 deletions aexpect/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@
self.encoding = encoding
self.reader_fds = {}
base_dir = os.path.join(BASE_DIR, f"aexpect_{self.a_id}")
self._close_lockfile = os.path.join(
BASE_DIR, f"aexpect_{self.a_id}.lock"
)

# Define filenames for communication with server
utils_path.init_dir(base_dir)
Expand Down Expand Up @@ -263,8 +266,8 @@
# Wait for the server to complete its initialization
full_output = ""
pattern = f"Server {self.a_id} ready"
end_time = time.time() + 60
while time.time() < end_time:
end_time = time.monotonic() + 60
while time.monotonic() < end_time:
output = sub.stdout.readline().decode(self.encoding, "ignore")
if pattern in output:
break
Expand Down Expand Up @@ -418,7 +421,7 @@
return utils_process.process_in_ptree_is_defunct(self.get_pid())

def kill(self, sig=signal.SIGKILL):
"""
Kill the child process if alive
"""
# Kill it if it's alive
Expand All @@ -431,21 +434,48 @@

:param sig: The signal to send the process when attempting to kill it.
"""
if not self.closed:
self.kill(sig=sig)
# Wait for the server to exit
wait_for_lock(self.lock_server_running_filename)
# Call all cleanup routines
for hook in self.close_hooks:
hook(self)
# Close reader file descriptors
self._close_reader_fds()
self.reader_fds = {}
# Remove all used files
if "AEXPECT_DEBUG" not in os.environ:
shutil.rmtree(os.path.join(BASE_DIR, f"aexpect_{self.a_id}"))
self._close_aexpect_helper()
self.closed = True
if self.closed:
return
lock = None
try:
try:
lock = get_lock_fd(self._close_lockfile, timeout=60)

Check warning

Code scanning / CodeQL

File is not always closed Warning

File is opened but is not closed.
except FileNotFoundError:
if not self.closed:
raise
if not self.closed:
self.kill(sig=sig)
# Wait for the server to exit
if not wait_for_lock(
self.lock_server_running_filename, timeout=60
):
LOG.warning(
"Failed to get lock, the aexpect_helper "
"process might be left behind. Proceeding "
"anyway..."
)
# Call all cleanup routines
for hook in self.close_hooks:
hook(self)
# Close reader file descriptors
self._close_reader_fds()
self.reader_fds = {}
# Remove all used files
if "AEXPECT_DEBUG" not in os.environ:
shutil.rmtree(
os.path.join(BASE_DIR, f"aexpect_{self.a_id}"),
ignore_errors=True,
)
self._close_aexpect_helper()
self.closed = True
finally:
if lock is not None:
try:
unlock_fd(lock)
os.unlink(self._close_lockfile)
except FileNotFoundError:
# File already removed by other thread
pass

def set_linesep(self, linesep):
"""
Expand Down Expand Up @@ -866,7 +896,7 @@
internal_timeout *= 1000
end_time = None
if timeout:
end_time = time.time() + timeout
end_time = time.monotonic() + timeout
expect_pipe = self._get_fd("expect")
poller = select.poll()
poller.register(expect_pipe, select.POLLIN)
Expand All @@ -885,7 +915,7 @@
data += raw_data.decode(self.encoding, "ignore")
else:
return read, data
if end_time and time.time() > end_time:
if end_time and time.monotonic() > end_time:
return read, data

def read_nonblocking(self, internal_timeout=None, timeout=None):
Expand Down Expand Up @@ -979,10 +1009,10 @@
poller = select.poll()
poller.register(expect_pipe, select.POLLIN)
output = ""
end_time = time.time() + timeout
end_time = time.monotonic() + timeout
while True:
try:
max_ms = int((end_time - time.time()) * 1000)
max_ms = int((end_time - time.monotonic()) * 1000)
poll_timeout_ms = max(0, max_ms)
poll_status = poller.poll(poll_timeout_ms)
except select.error:
Expand All @@ -991,7 +1021,7 @@
raise ExpectTimeoutError(patterns, output)
# Read data from child
read, data = self._read_nonblocking(
internal_timeout, end_time - time.time()
internal_timeout, end_time - time.monotonic()
)
if not read:
break
Expand Down Expand Up @@ -1261,10 +1291,10 @@
# Send a newline
self.sendline()
# Wait up to timeout seconds for some output from the child
end_time = time.time() + timeout
while time.time() < end_time:
end_time = time.monotonic() + timeout
while time.monotonic() < end_time:
time.sleep(0.5)
if self.read_nonblocking(0, end_time - time.time()).strip():
if self.read_nonblocking(0, end_time - time.monotonic()).strip():
return True
# No output -- report unresponsive
return False
Expand Down Expand Up @@ -1384,8 +1414,8 @@
self.sendline(cmd)
out = ""
success = False
start_time = time.time()
while (time.time() - start_time) < timeout:
start_time = time.monotonic()
while (time.monotonic() - start_time) < timeout:
try:
out += self.read_up_to_prompt(0.5)
success = True
Expand Down Expand Up @@ -1721,8 +1751,8 @@
encoding=encoding,
)

end_time = time.time() + timeout
while time.time() < end_time and bg_process.is_alive():
end_time = time.monotonic() + timeout
while time.monotonic() < end_time and bg_process.is_alive():
time.sleep(0.1)

return bg_process
Expand Down Expand Up @@ -1774,8 +1804,8 @@
encoding=encoding,
)

end_time = time.time() + timeout
while time.time() < end_time and bg_process.is_alive():
end_time = time.monotonic() + timeout
while time.monotonic() < end_time and bg_process.is_alive():
time.sleep(0.1)

return bg_process
Expand Down
8 changes: 4 additions & 4 deletions aexpect/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,9 @@ def wait_for_login(
client,
timeout,
)
end_time = time.time() + timeout
end_time = time.monotonic() + timeout
verbose = False
while time.time() < end_time:
while time.monotonic() < end_time:
try:
return remote_login(
client,
Expand Down Expand Up @@ -1375,9 +1375,9 @@ def transfer(*args, **kwargs):
msg = f"Copy file from {args[0]}:{args[5]} to {args[6]}, "
else:
msg = f"Copy file from {args[5]} to {args[0]}:{args[6]}, "
start_time = time.time()
start_time = time.monotonic()
ret = func(*args, **kwargs)
elapsed_time = time.time() - start_time
elapsed_time = time.monotonic() - start_time
if kwargs.get("filesize", None) is not None:
throughput = kwargs["filesize"] / elapsed_time
msg += f"estimated throughput: {throughput:.2f} MB/s"
Expand Down
28 changes: 14 additions & 14 deletions aexpect/rss_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def __init__(self, address, port, log_func=None, timeout=20):
) from timeout_error
self._send(struct.pack("=i", CHUNKSIZE))
self._log_func = log_func
self._last_time = time.time()
self._last_time = time.monotonic()
self._last_transferred = 0
self.transferred = 0

Expand Down Expand Up @@ -173,10 +173,10 @@ def _send(self, data, timeout=60):

def _receive(self, size, timeout=60):
strs = []
end_time = time.time() + timeout
end_time = time.monotonic() + timeout
try:
while size > 0:
timeout = end_time - time.time()
timeout = end_time - time.monotonic()
if timeout <= 0:
raise socket.timeout
self._socket.settimeout(timeout)
Expand All @@ -202,15 +202,15 @@ def _receive(self, size, timeout=60):

def _report_stats(self, data):
if self._log_func:
delta = time.time() - self._last_time
delta = time.monotonic() - self._last_time
if delta >= 1:
transferred = self.transferred / 1048576.0
speed = (self.transferred - self._last_transferred) / delta
speed /= 1048576.0
self._log_func(
f"{data} {transferred:.3f} MB ({speed:.3f}" " MB/sec)"
)
self._last_time = time.time()
self._last_time = time.monotonic()
self._last_transferred = self.transferred

def _send_packet(self, data, timeout=60):
Expand All @@ -232,10 +232,10 @@ def _send_file_chunks(self, filename, timeout=60):
self._log_func(f"Sending file {filename}")
with open(filename, "rb") as file_handle:
try:
end_time = time.time() + timeout
end_time = time.monotonic() + timeout
while True:
data = file_handle.read(CHUNKSIZE)
self._send_packet(data, int(end_time - time.time()))
self._send_packet(data, int(end_time - time.monotonic()))
if len(data) < CHUNKSIZE:
break
except FileTransferError as error:
Expand All @@ -247,9 +247,9 @@ def _receive_file_chunks(self, filename, timeout=60):
self._log_func(f"Receiving file {filename}")
with open(filename, "wb") as file_handle:
try:
end_time = time.time() + timeout
end_time = time.monotonic() + timeout
while True:
data = self._receive_packet(int(end_time - time.time()))
data = self._receive_packet(int(end_time - time.monotonic()))
file_handle.write(data)
if len(data) < CHUNKSIZE:
break
Expand Down Expand Up @@ -306,7 +306,7 @@ def _upload_file(self, path, end_time):
if os.path.isfile(path):
self._send_msg(RSS_CREATE_FILE)
self._send_packet(os.path.basename(path).encode())
self._send_file_chunks(path, end_time - time.time())
self._send_file_chunks(path, end_time - time.monotonic())
elif os.path.isdir(path):
self._send_msg(RSS_CREATE_DIR)
self._send_packet(os.path.basename(path).encode())
Expand Down Expand Up @@ -349,7 +349,7 @@ def upload(self, src_pattern, dst_path, timeout=600):
message to the client
:note: Other exceptions can be raised.
"""
end_time = time.time() + timeout
end_time = time.monotonic() + timeout
try:
try:
self._send_msg(RSS_SET_PATH)
Expand All @@ -371,7 +371,7 @@ def upload(self, src_pattern, dst_path, timeout=600):
"or directories"
)
# Look for RSS_OK or RSS_ERROR
msg = self._receive_msg(int(end_time - time.time()))
msg = self._receive_msg(int(end_time - time.monotonic()))
if msg == RSS_OK:
return
if msg == RSS_ERROR:
Expand Down Expand Up @@ -446,7 +446,7 @@ def download(self, src_pattern, dst_path, timeout=600):
:note: Other exceptions can be raised.
"""
dst_path = os.path.abspath(dst_path)
end_time = time.time() + timeout
end_time = time.monotonic() + timeout
file_count = 0
dir_count = 0
try:
Expand All @@ -463,7 +463,7 @@ def download(self, src_pattern, dst_path, timeout=600):
if os.path.isdir(dst_path):
dst_path = os.path.join(dst_path, filename)
self._receive_file_chunks(
dst_path, int(end_time - time.time())
dst_path, int(end_time - time.monotonic())
)
dst_path = os.path.dirname(dst_path)
file_count += 1
Expand Down
37 changes: 29 additions & 8 deletions aexpect/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,36 @@
import os
import fcntl
import termios
import time

BASE_DIR = os.environ.get("TMPDIR", "/tmp")


def get_lock_fd(filename):
def get_lock_fd(filename, timeout=-1):
"""Lock a file"""
if not os.path.exists(filename):
with open(filename, "w", encoding="utf-8"):
pass

lock_fd = os.open(filename, os.O_RDWR)
fcntl.lockf(lock_fd, fcntl.LOCK_EX)
lock_flags = fcntl.LOCK_EX
if timeout > 0:
lock_flags |= fcntl.LOCK_NB
end_time = time.monotonic() + timeout if timeout > 0 else -1
while True:
try:
fcntl.flock(lock_fd, lock_flags)
break
except IOError:
if time.monotonic() > end_time:
os.close(lock_fd)
raise
return lock_fd


def unlock_fd(lock_fd):
"""Unlock a file"""
fcntl.lockf(lock_fd, fcntl.LOCK_UN)
fcntl.flock(lock_fd, fcntl.LOCK_UN)
os.close(lock_fd)


Expand All @@ -41,19 +54,27 @@
except OSError:
return False
try:
fcntl.lockf(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
os.close(lock_fd)
return True
fcntl.lockf(lock_fd, fcntl.LOCK_UN)
fcntl.flock(lock_fd, fcntl.LOCK_UN)
os.close(lock_fd)
return False


def wait_for_lock(filename):
"""Wait until lock can be acquired, then release it"""
lock_fd = get_lock_fd(filename)
def wait_for_lock(filename, timeout=-1):
"""
Wait until lock can be acquired, then release it

:return: True on success, False on failure/timeout
"""
try:
lock_fd = get_lock_fd(filename, timeout)

Check warning

Code scanning / CodeQL

File is not always closed Warning

File is opened but is not closed.
except (IOError, FileNotFoundError):
return False
unlock_fd(lock_fd)
return True


def makeraw(shell_fd):
Expand Down
8 changes: 4 additions & 4 deletions aexpect/utils/wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ def wait_for(func, timeout, first=0.0, step=1.0, text=None):
:param step: Time to sleep between attempts in seconds
:param text: Text to print while waiting, for debug purposes
"""
start_time = time.time()
end_time = time.time() + timeout
start_time = time.monotonic()
end_time = time.monotonic() + timeout

time.sleep(first)

while time.time() < end_time:
while time.monotonic() < end_time:
if text:
_LOG.debug("%s (%f secs)", text, (time.time() - start_time))
_LOG.debug("%s (%f secs)", text, (time.monotonic() - start_time))

output = func()
if output:
Expand Down
Loading