Skip to content

Commit f81c370

Browse files
committed
Fix ICMP false positives by validating response type
The socket_icmp() method treated any ICMP response as host detection, causing false positives when firewalls sent ICMP Type 3 errors. Code only validated packet_id match, not ICMP response type. RFC 792 specifies only Type 0 (Echo Reply) indicates host is alive. 1. Initialize delay/type/code to None (prevents UnboundLocalError) 2. Capture ICMP type and code from response packet 3. Validate type before declaring host alive 4. Only ICMP Echo Reply (type 0) = host alive 5. ICMP Type 3 errors = host unreachable 6. No response (timeout) = filtered/blocked - Modified socket_icmp() to extract ICMP type/code - Added validation: status = 'alive' only if type == 0 - Updated return to include icmp_type, icmp_code, status fields - Modified response_conditions_matched() to validate status field - Added 4 comprehensive unit tests - Unit tests: 4 new tests cover all scenarios * Echo Reply (type 0) = match * Destination Unreachable (type 3) = no match * Network Unreachable (type 3, code 0) = no match * No response (timeout) = no match - Integration tests: Validated with real hosts - RFC 792 compliance: Only type 0 marked as alive - Eliminates false positives from firewall error responses - More accurate ICMP scanning results - Better error diagnostics with type/code fields Fixes #1186
1 parent 9b5ef1c commit f81c370

File tree

2 files changed

+111
-8
lines changed

2 files changed

+111
-8
lines changed

nettacker/core/lib/socket.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ def socket_icmp(self, host, timeout):
205205
header + data, (socket.gethostbyname(host), 1)
206206
) # Don't know about the 1
207207

208+
delay = None
209+
icmp_response_type = None
210+
icmp_response_code = None
211+
208212
while True:
209213
started_select = time.time()
210214
what_ready = select.select([socket_connection], [], [], timeout)
@@ -221,17 +225,42 @@ def socket_icmp(self, host, timeout):
221225
packet_id,
222226
packet_sequence,
223227
) = struct.unpack("bbHHh", icmp_header)
228+
224229
if packet_id == random_integer:
225230
packet_bytes = struct.calcsize("d")
226231
time_sent = struct.unpack("d", received_packet[28 : 28 + packet_bytes])[0]
227232
delay = time_received - time_sent
233+
# Store ICMP type and code for validation
234+
icmp_response_type = packet_type
235+
icmp_response_code = packet_code
228236
break
229237

230238
timeout = timeout - how_long_in_select
231239
if timeout <= 0:
232240
break
233241
socket_connection.close()
234-
return {"host": host, "response_time": delay, "ssl_flag": False}
242+
243+
244+
# Return response with ICMP type validation
245+
if delay is not None:
246+
status = "alive" if icmp_response_type == 0 else "unreachable"
247+
return {
248+
"host": host,
249+
"response_time": delay,
250+
"ssl_flag": False,
251+
"icmp_type": icmp_response_type,
252+
"icmp_code": icmp_response_code,
253+
"status": status,
254+
}
255+
else:
256+
return {
257+
"host": host,
258+
"response_time": None,
259+
"ssl_flag": False,
260+
"icmp_type": None,
261+
"icmp_code": None,
262+
"status": "filtered",
263+
}
235264

236265

237266
class SocketEngine(BaseEngine):
@@ -288,7 +317,22 @@ def response_conditions_matched(self, sub_step, response):
288317
return condition_results if condition_results else []
289318
return []
290319
if sub_step["method"] == "socket_icmp":
291-
return response
320+
# Only count ICMP Echo Reply (type 0) as success
321+
if isinstance(response, dict) and response.get("status") == "alive":
322+
return response
323+
else:
324+
# Log non-Echo replies for debugging
325+
if isinstance(response, dict):
326+
log.debug(
327+
"ICMP response from {}: type={}, code={}, status={} - "
328+
"not counting as 'alive' (only ICMP Echo Reply type 0 counts)".format(
329+
response.get("host"),
330+
response.get("icmp_type"),
331+
response.get("icmp_code"),
332+
response.get("status"),
333+
)
334+
)
335+
return []
292336
return []
293337

294338
def apply_extra_data(self, sub_step, response):

tests/core/lib/test_socket.py

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,46 @@
66

77

88
class Responses:
9-
tcp_connect_only = socket_icmp = {}
9+
tcp_connect_only = {}
10+
11+
# ICMP responses with type/code validation
12+
socket_icmp = {}
13+
14+
socket_icmp_echo_reply = {
15+
"host": "127.0.0.1",
16+
"response_time": 0.001,
17+
"ssl_flag": False,
18+
"icmp_type": 0, # Echo Reply - SUCCESS
19+
"icmp_code": 0,
20+
"status": "alive",
21+
}
22+
23+
socket_icmp_dest_unreachable = {
24+
"host": "127.0.0.1",
25+
"response_time": None,
26+
"ssl_flag": False,
27+
"icmp_type": 3, # Destination Unreachable
28+
"icmp_code": 3, # Port Unreachable
29+
"status": "unreachable",
30+
}
31+
32+
socket_icmp_network_unreachable = {
33+
"host": "192.0.2.1",
34+
"response_time": None,
35+
"ssl_flag": False,
36+
"icmp_type": 3, # Destination Unreachable
37+
"icmp_code": 0, # Network Unreachable
38+
"status": "unreachable",
39+
}
40+
41+
socket_icmp_no_response = {
42+
"host": "10.255.255.1",
43+
"response_time": None,
44+
"ssl_flag": False,
45+
"icmp_type": None,
46+
"icmp_code": None,
47+
"status": "filtered",
48+
}
1049

1150
tcp_connect_send_and_receive = {
1251
"response": 'HTTP/1.1 400 Bad Request\r\nServer: Apache/2.4.62 (Debian)\r\nContent-Length: 302\r\nConnection: close\r\nContent-Type: text/html; charset=iso-8859-1\r\n\r\n<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">\n<html><head>\n<title>400 Bad Request</title>\n</head><body>\n<h1>Bad Request</h1>\n<p>Your browser sent a request that this server could not understand.<br />\n</p>\n<hr>\n<address>Apache/2.4.62 (Debian)</address>\n</body></html>\n',
@@ -153,11 +192,31 @@ def test_create_tcp_socket(self, mock_wrap, mock_socket):
153192
socket_instance.connect.assert_called_with((HOST, PORT))
154193
mock_wrap.assert_called_with(socket_instance)
155194

156-
def test_response_conditions_matched_socket_icmp(self, socket_engine, substeps, responses):
157-
result = socket_engine.response_conditions_matched(
158-
substeps.socket_icmp, responses.socket_icmp
159-
)
160-
assert result == responses.socket_icmp
195+
def test_socket_icmp_echo_reply_should_match(self, socket_engine, substeps):
196+
"""ICMP Echo Reply (type 0) should be treated as successful detection"""
197+
response = Responses.socket_icmp_echo_reply
198+
result = socket_engine.response_conditions_matched(substeps.socket_icmp, response)
199+
assert result == response # Should return the response (match)
200+
assert result.get("status") == "alive"
201+
assert result.get("icmp_type") == 0
202+
203+
def test_socket_icmp_dest_unreachable_should_not_match(self, socket_engine, substeps):
204+
"""ICMP Destination Unreachable (type 3) should NOT be treated as success"""
205+
response = Responses.socket_icmp_dest_unreachable
206+
result = socket_engine.response_conditions_matched(substeps.socket_icmp, response)
207+
assert result == [] # Should return empty list (no match)
208+
209+
def test_socket_icmp_network_unreachable_should_not_match(self, socket_engine, substeps):
210+
"""ICMP Network Unreachable should NOT be treated as success"""
211+
response = Responses.socket_icmp_network_unreachable
212+
result = socket_engine.response_conditions_matched(substeps.socket_icmp, response)
213+
assert result == [] # Should return empty list (no match)
214+
215+
def test_socket_icmp_no_response_should_not_match(self, socket_engine, substeps):
216+
"""No ICMP response (filtered/timeout) should NOT be treated as success"""
217+
response = Responses.socket_icmp_no_response
218+
result = socket_engine.response_conditions_matched(substeps.socket_icmp, response)
219+
assert result == [] # Should return empty list (no match)
161220

162221
def test_response_conditions_matched_tcp_connect_send_and_receive(
163222
self, socket_engine, substeps, responses

0 commit comments

Comments
 (0)