Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions tests/test-client-verify-cn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python3

"""
Tests that --verify-cn flag works correctly on the client.
"""

from common import LOCALHOST, RootCert, STATUS_PORT, SocketPair, TcpClient, \
TlsServer, print_ok, run_ghostunnel, terminate, LISTEN_PORT, TARGET_PORT, \
assert_connection_rejected

ghostunnel = None
try:
# create certs
root = RootCert('root')
root.create_signed_cert('client')
root.create_signed_cert(
'server1',
san='IP:127.0.0.1,IP:::1,DNS:localhost')
root.create_signed_cert(
'server2',
san='IP:127.0.0.1,IP:::1,DNS:localhost')

# start ghostunnel with --verify-cn=server1
ghostunnel = run_ghostunnel(['client',
'--listen={0}:{1}'.format(LOCALHOST, LISTEN_PORT),
'--target=localhost:{0}'.format(TARGET_PORT),
'--keystore=client.p12',
'--verify-cn=server1',
'--cacert=root.crt',
'--status={0}:{1}'.format(LOCALHOST,
STATUS_PORT)])

# connect to server1 (CN=server1), should succeed
pair = SocketPair(TcpClient(LISTEN_PORT), TlsServer(
'server1', 'root', TARGET_PORT))
pair.validate_can_send_from_client(
"hello world", "1: client -> server")
pair.validate_can_send_from_server(
"hello world", "1: server -> client")
pair.validate_closing_client_closes_server(
"1: client closed -> server closed")

# connect to server2 (CN=server2), should be rejected
assert_connection_rejected(
TcpClient(LISTEN_PORT), TlsServer('server2', 'root', TARGET_PORT),
"server2", timeout_ok=False)

print_ok("OK")
finally:
terminate(ghostunnel)
53 changes: 53 additions & 0 deletions tests/test-client-verify-ou.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python3

"""
Tests that --verify-ou flag works correctly on the client.

Note: create_signed_cert(name) sets both CN and OU to the given name,
so --verify-ou=server1 checks the OU field of the server certificate.
"""

from common import LOCALHOST, RootCert, STATUS_PORT, SocketPair, TcpClient, \
TlsServer, print_ok, run_ghostunnel, terminate, LISTEN_PORT, TARGET_PORT, \
assert_connection_rejected

ghostunnel = None
try:
# create certs
root = RootCert('root')
root.create_signed_cert('client')
root.create_signed_cert(
'server1',
san='IP:127.0.0.1,IP:::1,DNS:localhost')
root.create_signed_cert(
'server2',
san='IP:127.0.0.1,IP:::1,DNS:localhost')

# start ghostunnel with --verify-ou=server1
ghostunnel = run_ghostunnel(['client',
'--listen={0}:{1}'.format(LOCALHOST, LISTEN_PORT),
'--target=localhost:{0}'.format(TARGET_PORT),
'--keystore=client.p12',
'--verify-ou=server1',
'--cacert=root.crt',
'--status={0}:{1}'.format(LOCALHOST,
STATUS_PORT)])

# connect to server1 (OU=server1), should succeed
pair = SocketPair(TcpClient(LISTEN_PORT), TlsServer(
'server1', 'root', TARGET_PORT))
pair.validate_can_send_from_client(
"hello world", "1: client -> server")
pair.validate_can_send_from_server(
"hello world", "1: server -> client")
pair.validate_closing_client_closes_server(
"1: client closed -> server closed")

# connect to server2 (OU=server2), should be rejected
assert_connection_rejected(
TcpClient(LISTEN_PORT), TlsServer('server2', 'root', TARGET_PORT),
"server2", timeout_ok=False)

print_ok("OK")
finally:
terminate(ghostunnel)
68 changes: 68 additions & 0 deletions tests/test-server-allow-all.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python3

"""
Tests that --allow-all flag accepts any client with a valid certificate,
regardless of CN/OU/SAN, but still rejects clients with untrusted CAs.
"""

from common import LOCALHOST, RootCert, STATUS_PORT, SocketPair, TcpServer, \
TlsClient, print_ok, run_ghostunnel, terminate, LISTEN_PORT, TARGET_PORT, \
assert_connection_rejected

ghostunnel = None
try:
# create certs
root = RootCert('root')
root.create_signed_cert('server')
root.create_signed_cert('client1')
root.create_signed_cert(
'client2',
san='DNS:other-client,IP:127.0.0.1,IP:::1,DNS:localhost')

# create a cert signed by a different CA (not trusted by ghostunnel)
other_root = RootCert('other_root')
other_root.create_signed_cert('untrusted_client')

# create a combined CA bundle so the untrusted client can verify the
# server cert (signed by root) while presenting its own cert (signed
# by other_root)
with open('combined_ca.crt', 'w') as f:
with open('root.crt') as r:
f.write(r.read())
with open('other_root.crt') as r:
f.write(r.read())

# start ghostunnel with --allow-all
ghostunnel = run_ghostunnel(['server',
'--listen={0}:{1}'.format(LOCALHOST, LISTEN_PORT),
'--target={0}:{1}'.format(LOCALHOST, TARGET_PORT),
'--keystore=server.p12',
'--cacert=root.crt',
'--allow-all',
'--status={0}:{1}'.format(LOCALHOST,
STATUS_PORT)])

# client1 should be accepted
pair1 = SocketPair(
TlsClient('client1', 'root', LISTEN_PORT), TcpServer(TARGET_PORT))
pair1.validate_can_send_from_client("toto", "client1 accepted")
pair1.validate_can_send_from_server("toto", "client1 accepted")
pair1.cleanup()

# client2 (different OU/SAN) should also be accepted with --allow-all
pair2 = SocketPair(
TlsClient('client2', 'root', LISTEN_PORT), TcpServer(TARGET_PORT))
pair2.validate_can_send_from_client("toto", "client2 accepted (different OU)")
pair2.validate_can_send_from_server("toto", "client2 accepted (different OU)")
pair2.cleanup()

# client signed by a different CA should still be rejected
# use combined_ca so the TLS client trusts the server cert, but the
# server won't trust the client cert (signed by other_root)
assert_connection_rejected(
TlsClient('untrusted_client', 'combined_ca', LISTEN_PORT), TcpServer(TARGET_PORT),
"untrusted_client")

print_ok("OK")
finally:
terminate(ghostunnel)
124 changes: 124 additions & 0 deletions tests/test-server-max-concurrent-conns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python3

"""
Tests that --max-concurrent-conns limits the number of simultaneous connections.
"""

from common import LOCALHOST, RootCert, STATUS_PORT, SocketPair, TcpServer, \
TlsClient, print_ok, run_ghostunnel, terminate, LISTEN_PORT, TARGET_PORT
import socket
import ssl
import time

ghostunnel = None
try:
# create certs
root = RootCert('root')
root.create_signed_cert('server')
root.create_signed_cert('client')

# start ghostunnel with --max-concurrent-conns=2 and a short connect
# timeout so that any stale TCP connections in the backlog are cleaned
# up quickly by ghostunnel
ghostunnel = run_ghostunnel(['server',
'--listen={0}:{1}'.format(LOCALHOST, LISTEN_PORT),
'--target={0}:{1}'.format(LOCALHOST, TARGET_PORT),
'--keystore=server.p12',
'--cacert=root.crt',
'--allow-ou=client',
'--max-concurrent-conns=2',
'--connect-timeout=1s',
'--status={0}:{1}'.format(LOCALHOST,
STATUS_PORT)])

# open first connection
pair1 = SocketPair(
TlsClient('client', 'root', LISTEN_PORT), TcpServer(TARGET_PORT))
pair1.validate_can_send_from_client("hello1", "pair1 works")
print_ok("connection 1 established")

# open second connection
pair2 = SocketPair(
TlsClient('client', 'root', LISTEN_PORT), TcpServer(TARGET_PORT))
pair2.validate_can_send_from_client("hello2", "pair2 works")
print_ok("connection 2 established")

# third connection: the semaphore is full so ghostunnel won't even accept
# the TCP connection. A TLS connect attempt should time out.
blocked = False
sock3 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock3.settimeout(2)
try:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_verify_locations(cafile='root.crt')
ctx.load_cert_chain('client.crt', 'client.key')
tls_sock = ctx.wrap_socket(sock3, server_hostname=LOCALHOST)
tls_sock.connect((LOCALHOST, LISTEN_PORT))
# if we get here, connection was accepted — check if backend is reachable
# (it shouldn't be since semaphore is full)
backend3 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
backend3.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
so_reuseport = getattr(socket, 'SO_REUSEPORT', None)
if so_reuseport is not None:
backend3.setsockopt(socket.SOL_SOCKET, so_reuseport, 1)
backend3.settimeout(2)
backend3.bind((LOCALHOST, TARGET_PORT))
backend3.listen(1)
try:
backend3.accept()
raise Exception("3rd connection should not have reached backend")
except socket.timeout:
blocked = True
finally:
backend3.close()
tls_sock.close()
except (socket.timeout, ssl.SSLError, ConnectionError, OSError):
blocked = True
finally:
try:
sock3.close()
except OSError:
pass # best-effort cleanup, socket may already be closed

if not blocked:
raise Exception("3rd connection was not blocked by concurrency limit")
print_ok("3rd connection correctly blocked by concurrency limit")

# close first connection to free up a slot
pair1.cleanup()
print_ok("connection 1 closed")

# Wait for ghostunnel to fully release the semaphore slot. The stale
# sock3 TCP connection may still be in the listen backlog — ghostunnel
# will accept it, fail the handshake (connect-timeout=1s), and release
# the semaphore. We need to wait for that cycle to complete.
deadline = time.time() + 10
pair3 = None
last_error = None
while time.time() < deadline:
try:
pair3 = SocketPair(
TlsClient('client', 'root', LISTEN_PORT), TcpServer(TARGET_PORT))
pair3.validate_can_send_from_client("hello3", "pair3 works after slot freed")
print_ok("connection 3 established after freeing slot")
break
except Exception as exc:
last_error = exc
if pair3 is not None:
try:
pair3.cleanup()
except Exception:
pass # best-effort cleanup during retry
pair3 = None
time.sleep(0.5)

if pair3 is None:
raise Exception("3rd connection did not succeed after freeing slot") from last_error

pair2.cleanup()
pair3.cleanup()

print_ok("OK")
finally:
terminate(ghostunnel)
Loading
Loading