Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions nettacker/core/lib/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def socket_icmp(self, host, timeout):
Note that ICMP messages can only be sent from processes running as root.
Derived from ping.c distributed in Linux's netkit. That code is
copyright (c) 1989 by The Regents of the University of California.
That code is in turn derived from code written by Mike Muuss of the
That code is in turn derived from code written by Mike Muess of the
US Army Ballistic Research Laboratory in December, 1983 and
placed in the public domain. They have my thanks.
Bugs are naturally mine. I'd be glad to hear about them. There are
Expand Down Expand Up @@ -205,6 +205,10 @@ def socket_icmp(self, host, timeout):
header + data, (socket.gethostbyname(host), 1)
) # Don't know about the 1

delay = None
icmp_response_type = None
icmp_response_code = None

while True:
started_select = time.time()
what_ready = select.select([socket_connection], [], [], timeout)
Expand All @@ -221,17 +225,41 @@ def socket_icmp(self, host, timeout):
packet_id,
packet_sequence,
) = struct.unpack("bbHHh", icmp_header)

if packet_id == random_integer:
packet_bytes = struct.calcsize("d")
time_sent = struct.unpack("d", received_packet[28 : 28 + packet_bytes])[0]
delay = time_received - time_sent
# Store ICMP type and code for validation
icmp_response_type = packet_type
icmp_response_code = packet_code
break

timeout = timeout - how_long_in_select
if timeout <= 0:
break
socket_connection.close()
return {"host": host, "response_time": delay, "ssl_flag": False}

# Return response with ICMP type validation
if delay is not None:
status = "alive" if icmp_response_type == 0 else "unreachable"
return {
"host": host,
"response_time": delay,
"ssl_flag": False,
"icmp_type": icmp_response_type,
"icmp_code": icmp_response_code,
"status": status,
}
else:
return {
"host": host,
"response_time": None,
"ssl_flag": False,
"icmp_type": None,
"icmp_code": None,
"status": "filtered",
}


class SocketEngine(BaseEngine):
Expand Down Expand Up @@ -288,7 +316,11 @@ def response_conditions_matched(self, sub_step, response):
return condition_results if condition_results else []
return []
if sub_step["method"] == "socket_icmp":
return response
# Only count ICMP Echo Reply (type 0) as success
if isinstance(response, dict) and response.get("status") == "alive":
return response
else:
return []
return []

def apply_extra_data(self, sub_step, response):
Expand Down
71 changes: 65 additions & 6 deletions tests/core/lib/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,46 @@


class Responses:
tcp_connect_only = socket_icmp = {}
tcp_connect_only = {}

# ICMP responses with type/code validation
socket_icmp = {}

socket_icmp_echo_reply = {
"host": "127.0.0.1",
"response_time": 0.001,
"ssl_flag": False,
"icmp_type": 0, # Echo Reply - SUCCESS
"icmp_code": 0,
"status": "alive",
}

socket_icmp_dest_unreachable = {
"host": "127.0.0.1",
"response_time": None,
"ssl_flag": False,
"icmp_type": 3, # Destination Unreachable
"icmp_code": 3, # Port Unreachable
"status": "unreachable",
}

socket_icmp_network_unreachable = {
"host": "192.0.2.1",
"response_time": None,
"ssl_flag": False,
"icmp_type": 3, # Destination Unreachable
"icmp_code": 0, # Network Unreachable
"status": "unreachable",
}

socket_icmp_no_response = {
"host": "10.255.255.1",
"response_time": None,
"ssl_flag": False,
"icmp_type": None,
"icmp_code": None,
"status": "filtered",
}

tcp_connect_send_and_receive = {
"response": 'HTTP/1.1 400 Bad Request\r\nServer: Apache/2.4.62 (Debian)\r\nContent-Length: 302\r\nConnection: close\r\nContent-Type: text/html; charset=iso-8859-1\r\n\r\n<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">\n<html><head>\n<title>400 Bad Request</title>\n</head><body>\n<h1>Bad Request</h1>\n<p>Your browser sent a request that this server could not understand.<br />\n</p>\n<hr>\n<address>Apache/2.4.62 (Debian)</address>\n</body></html>\n',
Expand Down Expand Up @@ -153,11 +192,31 @@ def test_create_tcp_socket(self, mock_wrap, mock_socket):
socket_instance.connect.assert_called_with((HOST, PORT))
mock_wrap.assert_called_with(socket_instance)

def test_response_conditions_matched_socket_icmp(self, socket_engine, substeps, responses):
result = socket_engine.response_conditions_matched(
substeps.socket_icmp, responses.socket_icmp
)
assert result == responses.socket_icmp
def test_socket_icmp_echo_reply_should_match(self, socket_engine, substeps):
"""ICMP Echo Reply (type 0) should be treated as successful detection"""
response = Responses.socket_icmp_echo_reply
result = socket_engine.response_conditions_matched(substeps.socket_icmp, response)
assert result == response # Should return the response (match)
assert result.get("status") == "alive"
assert result.get("icmp_type") == 0

def test_socket_icmp_dest_unreachable_should_not_match(self, socket_engine, substeps):
"""ICMP Destination Unreachable (type 3) should NOT be treated as success"""
response = Responses.socket_icmp_dest_unreachable
result = socket_engine.response_conditions_matched(substeps.socket_icmp, response)
assert result == [] # Should return empty list (no match)

def test_socket_icmp_network_unreachable_should_not_match(self, socket_engine, substeps):
"""ICMP Network Unreachable should NOT be treated as success"""
response = Responses.socket_icmp_network_unreachable
result = socket_engine.response_conditions_matched(substeps.socket_icmp, response)
assert result == [] # Should return empty list (no match)

def test_socket_icmp_no_response_should_not_match(self, socket_engine, substeps):
"""No ICMP response (filtered/timeout) should NOT be treated as success"""
response = Responses.socket_icmp_no_response
result = socket_engine.response_conditions_matched(substeps.socket_icmp, response)
assert result == [] # Should return empty list (no match)

def test_response_conditions_matched_tcp_connect_send_and_receive(
self, socket_engine, substeps, responses
Expand Down