Skip to content

Commit 6a13bdb

Browse files
committed
[tests] added tests for socket connections
1 parent bc02638 commit 6a13bdb

File tree

3 files changed

+139
-0
lines changed

3 files changed

+139
-0
lines changed

.github/workflows/run-tests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ env:
88
GEOCOMPY_TEST_PORT_CLIENT: /tmp/geocompy_pty_client
99
GEOCOMPY_TEST_PORT_SERVER: /tmp/geocompy_pty_server
1010
GEOCOMPY_TEST_PORT_FAULTY: /tmp/geocompy_pty_faulty
11+
GEOCOMPY_TEST_TCPPORT_SERVER: 12345
1112

1213
jobs:
1314
testing:
@@ -30,6 +31,7 @@ jobs:
3031
run: |
3132
socat pty,raw,echo=0,link=$GEOCOMPY_TEST_PORT_CLIENT pty,raw,echo=0,link=$GEOCOMPY_TEST_PORT_SERVER &
3233
python tests/com_echo_server.py &
34+
python tests/tcp_echo_server.py &
3335
3436
- name: Test with pytest
3537
run: |

tests/tcp_echo_server.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from os import environ
2+
3+
import socket
4+
5+
6+
portname = environ.get("GEOCOMPY_TEST_TCPPORT_SERVER", "")
7+
if portname == "":
8+
raise ValueError(
9+
"Echo server serial port name must be set in "
10+
"'GEOCOMPY_TEST_TCPPORT_SERVER' environment variable"
11+
)
12+
13+
14+
def echo_server(port: int) -> None:
15+
with socket.socket(
16+
socket.AF_INET,
17+
socket.SOCK_STREAM,
18+
socket.IPPROTO_TCP
19+
) as soc:
20+
soc.bind(("127.0.0.1", port))
21+
while True:
22+
soc.listen(1)
23+
print(f"Listening on {port}")
24+
server, address = soc.accept()
25+
print(f"Connected to {address}")
26+
27+
with server:
28+
while True:
29+
try:
30+
data = server.recv(1024)
31+
if data == b"":
32+
break
33+
34+
data = data.strip(b"\r\n")
35+
if data != b"":
36+
server.send(data + b"\r\n")
37+
38+
except Exception:
39+
break
40+
41+
42+
if __name__ == "__main__":
43+
echo_server(int(portname))

tests/test_communication.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,28 @@
11
from os import environ
2+
import socket
3+
24
import pytest
35
from serial import Serial
46
from geocompy.communication import (
57
get_dummy_logger,
68
open_serial,
9+
open_socket,
710
SerialConnection,
11+
SocketConnection,
812
crc16_bitwise,
913
crc16_bytewise
1014
)
1115

1216

17+
@pytest.fixture
18+
def sock() -> socket.socket:
19+
return socket.socket(
20+
socket.AF_INET,
21+
socket.SOCK_STREAM,
22+
socket.IPPROTO_TCP
23+
)
24+
25+
1326
portname = environ.get("GEOCOMPY_TEST_PORT_CLIENT", "")
1427
if portname == "": # pragma: no coverage
1528
raise ValueError(
@@ -24,6 +37,13 @@
2437
"'GEOCOMPY_TEST_PORT_FAULTY' environment variable"
2538
)
2639

40+
tcpport = environ.get("GEOCOMPY_TEST_TCPPORT_SERVER", "")
41+
if faultyportname == "": # pragma: no coverage
42+
raise ValueError(
43+
"Echo server tcp port must be set in "
44+
"'GEOCOMPY_TEST_TCPPORT_SERVER' environment variable"
45+
)
46+
2747

2848
class TestDummyLogger:
2949
def test_get_dummy_logger(self) -> None:
@@ -32,6 +52,80 @@ def test_get_dummy_logger(self) -> None:
3252
assert len(log.handlers) == 1
3353

3454

55+
class TestSocketConnection:
56+
def test_init(self, sock: socket.socket) -> None:
57+
sock.connect(("127.0.0.1", int(tcpport)))
58+
with SocketConnection(sock) as client:
59+
assert client.is_open()
60+
61+
client.close()
62+
assert not client.is_open()
63+
64+
def test_open_socket(self) -> None:
65+
with open_socket("127.0.0.1", int(tcpport), "tcp") as soc:
66+
assert soc.is_open()
67+
68+
with pytest.raises(Exception):
69+
open_socket("127.0.0.1", int(tcpport), "rfcomm", timeout=1)
70+
71+
with pytest.raises(ValueError):
72+
open_socket(
73+
"127.0.0.1",
74+
int(tcpport),
75+
"mistake", # type: ignore[arg-type]
76+
timeout=1
77+
)
78+
79+
def test_messaging(self) -> None:
80+
with open_socket(
81+
"127.0.0.1",
82+
int(tcpport),
83+
"tcp"
84+
) as soc:
85+
soc.is_open()
86+
request = "Test"
87+
assert soc.exchange(request) == request
88+
89+
soc.send("ascii")
90+
assert soc.receive() == "ascii"
91+
92+
assert soc.exchange_binary(b"00\r\n") == b"00"
93+
94+
soc.reset()
95+
96+
with pytest.raises(ConnectionError):
97+
soc.send("closed")
98+
99+
with pytest.raises(ConnectionError):
100+
soc.receive()
101+
102+
with open_socket(
103+
"127.0.0.1",
104+
int(tcpport),
105+
"tcp",
106+
sync_after_timeout=True
107+
) as soc:
108+
soc.send("msg1")
109+
soc.send("msg2")
110+
soc.send("msg3")
111+
soc._timeout_counter = 3
112+
assert soc.exchange("recovered") == "recovered"
113+
114+
with soc.timeout_override(1):
115+
with pytest.raises(TimeoutError):
116+
soc.receive()
117+
118+
assert soc._timeout_counter == 1
119+
120+
with pytest.raises(TimeoutError):
121+
soc.receive()
122+
123+
assert soc._timeout_counter == 2
124+
125+
with pytest.raises(ConnectionError):
126+
soc.receive()
127+
128+
35129
class TestSerialConnection:
36130
def test_init(self) -> None:
37131
port = Serial(portname)

0 commit comments

Comments
 (0)