Skip to content

Commit 9926879

Browse files
authored
feat: adds optional timeout for client connections (TCPServer) (#190)
* feat: adds optional timeout for client connections (TCPServer) * feat: add support for handling client timeouts in TCP server
1 parent 2c50d58 commit 9926879

File tree

4 files changed

+56
-27
lines changed

4 files changed

+56
-27
lines changed

CHANGELOG.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
====================
22
pyais CHANGELOG
33
====================
4+
-------------------------------------------------------------------------------
5+
Version 2.13.1 21 Aug 2025
6+
-------------------------------------------------------------------------------
7+
* add support for handling client timeouts in TCP server
48
-------------------------------------------------------------------------------
59
Version 2.13.0 11 Aug 2025
610
-------------------------------------------------------------------------------

pyais/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from pyais.tracker import AISTracker, AISTrack
66

77
__license__ = 'MIT'
8-
__version__ = '2.13.0'
8+
__version__ = '2.13.1'
99
__author__ = 'Leon Morten Richter'
1010

1111
__all__ = (

pyais/stream.py

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import selectors
2+
import time
23
import typing
34
import pathlib
45
import queue
@@ -422,14 +423,25 @@ class TCPServer(SocketStream):
422423
new connections, processing client data with proper message boundary detection,
423424
and cleaning up closed connections, making it suitable for applications requiring
424425
concurrent TCP communication with multiple clients.
426+
427+
Args:
428+
host: The hostname or IP address to bind to.
429+
port: The port number to bind to. Defaults to 80.
430+
preprocessor: Optional preprocessor for handling incoming data.
431+
If None, no preprocessing will be applied.
432+
tbq: Optional tag block queue for managing tagged data blocks.
433+
If None, no tag block queuing will be used.
434+
timeout: Connection timeout in seconds. Use -1 for no timeout.
435+
Defaults to -1.
425436
"""
426437

427438
def __init__(
428439
self,
429440
host: str,
430441
port: int = 80,
431442
preprocessor: typing.Optional[PreprocessorProtocol] = None,
432-
tbq: typing.Optional[TagBlockQueue] = None
443+
tbq: typing.Optional[TagBlockQueue] = None,
444+
timeout: float = -1
433445
) -> None:
434446
# Create a socket and bind it
435447
server_sock = socket(AF_INET, SOCK_STREAM)
@@ -447,6 +459,8 @@ def __init__(
447459

448460
# Keep track of client sockets to clean them up later
449461
self._client_sockets: typing.Set[socket] = set()
462+
self._client_last_activity: typing.Dict[socket, float] = {}
463+
self.timeout = timeout
450464

451465
super().__init__(server_sock, preprocessor=preprocessor, tbq=tbq)
452466

@@ -456,13 +470,9 @@ def recv(self) -> bytes:
456470
if self._message_queue:
457471
return self._message_queue.popleft()
458472

473+
last_cleanup = time.time()
459474
while True:
460-
events = self.sel.select()
461-
if not events:
462-
# No data available
463-
return b''
464-
465-
for key, mask in events:
475+
for key, mask in self.sel.select(timeout=1.0):
466476
if key.data is None:
467477
self.accept(key.fileobj) # type: ignore
468478
else:
@@ -471,6 +481,26 @@ def recv(self) -> bytes:
471481
if self._message_queue:
472482
return self._message_queue.popleft()
473483

484+
# Periodic cleanup for connection timeouts
485+
if self.timeout > 0 and time.time() - last_cleanup >= self.timeout:
486+
self._cleanup_idle_connections()
487+
last_cleanup = time.time()
488+
489+
def _cleanup_idle_connections(self) -> None:
490+
current_time = time.time()
491+
for sock, last_activity in self._client_last_activity.copy().items():
492+
if current_time - last_activity > self.timeout:
493+
self._close_client(sock)
494+
495+
def _close_client(self, client_sock: socket) -> None:
496+
try:
497+
self.sel.unregister(client_sock)
498+
except (KeyError, ValueError):
499+
pass
500+
self._client_last_activity.pop(client_sock, None)
501+
self._client_sockets.discard(client_sock)
502+
client_sock.close()
503+
474504
def read(self) -> typing.Generator[bytes, None, None]:
475505
"""Use custom implementation that handles multiple clients properly"""
476506
try:
@@ -482,34 +512,35 @@ def read(self) -> typing.Generator[bytes, None, None]:
482512
self.sel.close()
483513

484514
def accept(self, sock: socket) -> None:
485-
conn, addr = sock.accept()
486-
conn.setblocking(False)
487-
self._client_sockets.add(sock)
515+
client_sock, addr = sock.accept()
516+
client_sock.setblocking(False)
517+
self._client_sockets.add(client_sock)
488518

489519
data = ClientConnection(
490520
addr=addr,
491521
partial_buffer=b''
492522
)
493-
self.sel.register(conn, selectors.EVENT_READ, data=data)
523+
self.sel.register(client_sock, selectors.EVENT_READ, data=data)
524+
self._client_last_activity[client_sock] = time.time()
494525

495526
def service(self, key: selectors.SelectorKey, mask: int) -> None:
496527
"""Handle client data with per-connection buffering"""
497-
sock: socket = typing.cast(socket, key.fileobj)
528+
client_sock: socket = typing.cast(socket, key.fileobj)
498529
data = key.data
499530

500531
if mask & selectors.EVENT_READ:
501532
try:
502-
recv_data = sock.recv(self.BUF_SIZE)
533+
recv_data = client_sock.recv(self.BUF_SIZE)
503534
if recv_data:
504535
# Process the received data with the connection's buffer
505536
self._process_client_data(data, recv_data)
506537
else:
507538
# close
508-
self.sel.unregister(sock)
509-
sock.close()
539+
self._close_client(client_sock)
510540
except ConnectionResetError:
511-
self.sel.unregister(sock)
512-
sock.close()
541+
self._close_client(client_sock)
542+
543+
self._client_last_activity[client_sock] = time.time()
513544

514545
def _process_client_data(self, client_data: ClientConnection, new_data: bytes) -> None:
515546
"""Process data from a specific client, handling partial messages"""
@@ -536,12 +567,7 @@ def _process_client_data(self, client_data: ClientConnection, new_data: bytes) -
536567

537568
def close(self) -> None:
538569
"""Properly close all connections and selector"""
539-
for sock in self._client_sockets.copy():
540-
try:
541-
self.sel.unregister(sock)
542-
except (KeyError, ValueError):
543-
pass
544-
sock.close()
545-
self._client_sockets.discard(sock)
570+
for client_sock in self._client_sockets.copy():
571+
self._close_client(client_sock)
546572
self.sel.close()
547573
super().close()

tests/test_tcp_server.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ class TestMessageProcessing(unittest.TestCase):
1313

1414
def setUp(self):
1515
"""Set up server for message processing tests"""
16-
with patch('socket.socket'), \
17-
patch('selectors.DefaultSelector'):
16+
with patch('socket.socket'), patch('selectors.DefaultSelector'):
1817

1918
self.server = TCPServer('127.0.0.1', 8080)
2019
self.server._message_queue = deque()

0 commit comments

Comments
 (0)