-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnetsc_flat.py
More file actions
374 lines (317 loc) · 13.4 KB
/
Copy pathnetsc_flat.py
File metadata and controls
374 lines (317 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import asyncio
import ipaddress
import json
import csv
import re
import socket
import sys
import os
import subprocess
from typing import List, Dict, Optional, Tuple
from datetime import datetime
# Optional plotting
try:
import matplotlib.pyplot as plt
HAS_MPL = True
except Exception:
HAS_MPL = False
# ----------------------- Network scanning utilities -----------------------
# TCP ports probed by the `scan` command when neither --ports nor --ports-file is given.
DEFAULT_PORTS = [21, 22, 23, 25, 53, 80, 110, 139, 143, 443, 445, 3306, 3389, 8080]
# Upper bound on the number of bytes read from a freshly opened connection as its banner.
BANNER_READ_BYTES = 1024
async def try_connect(ip: str, port: int, timeout: float = 3.0, read_timeout: float = 1.0) -> Optional[bytes]:
    """Attempt a TCP connect and read whatever banner the service volunteers.

    Args:
        ip: Target address.
        port: Target TCP port.
        timeout: Seconds allowed for the connection to be established.
        read_timeout: Seconds to wait for banner bytes once connected.

    Returns:
        ``None`` when the connection could not be established (refused,
        timed out, unreachable, or an invalid port number).  Otherwise the
        bytes received, which is ``b''`` for a port that is open but sent
        nothing -- so callers can tell "open and silent" from "closed" with
        an ``is not None`` check.
    """
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host=ip, port=port), timeout=timeout
        )
    except (asyncio.TimeoutError, OSError, OverflowError):
        # ConnectionRefusedError is an OSError; OverflowError is what the
        # socket layer raises for a port outside 0-65535.
        return None

    banner = b''
    try:
        # HTTP servers stay silent until spoken to, so send a minimal HEAD
        # request on the usual HTTP ports to elicit a response.
        if port in (80, 8080, 8000, 8888):
            try:
                writer.write(b'HEAD / HTTP/1.0\r\nHost: example.com\r\n\r\n')
                await writer.drain()
            except OSError:
                # Peer may already have hung up; still try to read below.
                pass
        banner = await asyncio.wait_for(reader.read(BANNER_READ_BYTES), timeout=read_timeout)
    except (asyncio.IncompleteReadError, asyncio.TimeoutError, OSError):
        # No banner within the window, or the peer reset the connection.
        # The port was still open, so fall through with an empty banner.
        pass
    finally:
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            # Best-effort teardown; a failed close must not mask the result.
            pass
    return banner
def parse_mac_from_arp(ip: str) -> Optional[str]:
    """Resolve a MAC address for `ip` (best-effort).

    Strategy:
      1. If `scapy` can be imported and packets can be sent, perform an
         active ARP request.
      2. Otherwise query the neighbour table with `ip neigh show <ip>`.
      3. Otherwise fall back to `arp -n <ip>` (Linux/macOS/BSD) or parse
         `arp -a` (any other platform, i.e. Windows).

    Missing tools, non-zero exit codes and hung commands are treated as
    "unknown" rather than raised.

    Returns:
        Lowercase, colon-separated MAC string with zero-padded octets, or
        None when the address could not be resolved.
    """
    def normalize(raw: str) -> str:
        # BSD/macOS `arp` drops leading zeros ("0:1c:42:a:b:c") and Windows
        # uses dashes; fold both into the canonical aa:bb:cc:dd:ee:ff form.
        return ':'.join(octet.zfill(2) for octet in re.split(r'[:-]', raw)).lower()

    def run(cmd: List[str]) -> Optional[str]:
        # OSError covers a missing binary (FileNotFoundError); SubprocessError
        # covers non-zero exit status and the timeout.
        try:
            return subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True, timeout=5)
        except (subprocess.SubprocessError, OSError, UnicodeDecodeError):
            return None

    # 1. Active ARP with scapy if available.
    try:
        from scapy.all import ARP, Ether, srp
    except Exception:
        srp = None
    if srp is not None:
        try:
            # Broadcast an ARP request on layer 2; srp returns (answered, unanswered).
            ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip), timeout=2, verbose=0)
            for _, rcv in ans:
                mac = rcv[Ether].src
                if mac:
                    return mac.lower()
        except Exception:
            # Deliberate best-effort: lacking raw-socket permission or any
            # other scapy failure just means we fall back to the ARP cache.
            pass

    # 2. iproute2 neighbour table.
    out = run(['ip', 'neigh', 'show', ip])
    if out:
        m = re.search(r'((?:[0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2})', out)
        if m:
            return m.group(1).lower()

    # 3. Legacy arp tooling.
    if sys.platform.startswith(('linux', 'darwin')) or 'bsd' in sys.platform:
        out = run(['arp', '-n', ip])
        if not out:
            return None
        # Accept 1-2 hex digits per octet so unpadded BSD/macOS output matches.
        m = re.search(r'\b([0-9a-fA-F]{1,2}(?::[0-9a-fA-F]{1,2}){5})\b', out)
        return normalize(m.group(1)) if m else None

    # On Windows, `arp -a` lists the whole table; pick the row for `ip`.
    out = run(['arp', '-a'])
    if not out:
        return None
    for line in out.splitlines():
        fields = line.split()
        # Compare the address column exactly: a substring test would let
        # 10.0.0.1 match the row for 10.0.0.10.
        if fields and fields[0] == ip:
            m = re.search(r'((?:[0-9a-fA-F]{2}-){5}[0-9a-fA-F]{2})', line)
            if m:
                return normalize(m.group(1))
    return None
async def scan_host_ports(ip: str, ports: List[int], timeout: float, sem: asyncio.Semaphore) -> Dict:
    """Scan a single host for the given ports asynchronously.

    Args:
        ip: Host address to probe.
        ports: TCP ports to try.
        timeout: Per-connection timeout in seconds.
        sem: Shared semaphore; one slot is held per in-flight connection so
            its size bounds the number of sockets open at once.

    Returns:
        ``{'ip': ..., 'mac': ..., 'open_ports': [{'port': int, 'banner': str}, ...]}``
        with `open_ports` listed in the same order as `ports`.
    """
    async def probe(port: int) -> Tuple[int, Optional[bytes]]:
        # Take the slot per connection, not per host: holding it around the
        # whole host would allow slots * len(ports) simultaneous sockets and
        # exhaust file descriptors on large scans.
        async with sem:
            return await _scan_port_task(ip, port, timeout)

    probes = await asyncio.gather(*(probe(port) for port in ports))
    results = [
        {
            'port': port,
            'banner': banner.decode(errors='replace') if isinstance(banner, (bytes, bytearray)) else str(banner),
        }
        for port, banner in probes
        if banner is not None
    ]
    # The MAC lookup shells out (and may wait on an ARP reply), so run it in
    # a worker thread rather than stalling every other probe on the loop.
    mac = await asyncio.get_running_loop().run_in_executor(None, parse_mac_from_arp, ip)
    return {'ip': ip, 'mac': mac, 'open_ports': results}
async def _scan_port_task(ip: str, port: int, timeout: float) -> Tuple[int, Optional[bytes]]:
    """Probe one port and pair the port number with whatever `try_connect` returned."""
    return port, await try_connect(ip, port, timeout)
async def scan_network(cidr: str, ports: List[int], timeout: float = 2.0, concurrency: int = 500) -> List[Dict]:
    """Scan hosts in the given CIDR (or single IP) and return a list of host dicts.

    Network and broadcast addresses are excluded for IPv4 networks, as
    decided by `ipaddress`'s ``hosts()``.  A single-address network (a bare
    IP, /32 or /128) is scanned as that one address.

    Args:
        cidr: Network in CIDR notation or a single address; host bits are allowed.
        ports: TCP ports to probe on every host.
        timeout: Per-connection timeout in seconds.
        concurrency: Size of the semaphore shared by all host scans
            (values below 1 are treated as 1).

    Returns:
        Host dicts for hosts with at least one open port, sorted by address.

    Raises:
        ValueError: If `cidr` is not a valid network or address.
    """
    net = ipaddress.ip_network(cidr, strict=False)
    if net.num_addresses == 1:
        # hosts() yields nothing for a one-address network on older
        # interpreters, which would silently scan zero hosts.
        hosts = [str(net.network_address)]
    else:
        hosts = [str(addr) for addr in net.hosts()]
    # A semaphore of 0 would block forever and a negative one raises.
    sem = asyncio.Semaphore(max(1, concurrency))
    results = []
    tasks = [scan_host_ports(ip, ports, timeout, sem) for ip in hosts]
    for fut in asyncio.as_completed(tasks):
        try:
            r = await fut
            # only include hosts with open ports
            if r['open_ports']:
                results.append(r)
        except Exception as e:
            # non-fatal; continue scanning
            print(f"scan task error: {e}", file=sys.stderr)
    # Completion order is arbitrary; sort numerically so output is reproducible.
    results.sort(key=lambda host: ipaddress.ip_address(host['ip']))
    return results
# ----------------------- Log analyzer utilities -----------------------
# Access-log line: client address, two ignored fields, a [bracketed] timestamp,
# the "quoted" request line, a three-digit status and the response size field.
APACHE_COMMON_REGEX = re.compile(
r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] "(?P<request>[^"]*)" (?P<status>\d{3}) (?P<size>\S+)'
)
# Syslog-style line: "Mon DD HH:MM:SS host process[pid]: message" (the [pid] part is optional).
SYSLOG_REGEX = re.compile(r'(?P<time>\w{3} +\d{1,2} \d{2}:\d{2}:\d{2}) (?P<host>\S+) (?P<process>[^:\[]+)(?:\[\d+\])?: (?P<msg>.*)')
def analyze_logs(path: str) -> Dict:
    """Parse and summarize a log file (Apache access log or syslog-ish).

    Each line is classified by the first rule that matches:
      * Apache access line: counts the client IP and request URI; status
        >= 500 is an error, 404 is a not-found, 401/403 is an auth failure.
      * Syslog line: "failed password" / "authentication failure" /
        "auth failure" in the message is an auth failure, "error" is an
        error; any dotted-quad addresses on the line are counted.
      * Anything else: a literal "404" counts as a not-found and "error"
        (case-insensitive) as an error.

    Args:
        path: Log file to read; decoded as UTF-8 with undecodable bytes replaced.

    Returns:
        Dict with keys 'file', 'total_lines', 'errors', '404s',
        'auth_failures', and 'top_ips' / 'top_uris' (up to 20
        ``(value, count)`` pairs each, most frequent first).

    Raises:
        OSError: If the file cannot be opened.
    """
    from collections import Counter

    total = 0
    errors = 0
    not_found = 0
    auth_failures = 0
    ip_hits = Counter()
    uri_hits = Counter()
    ipv4_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')

    with open(path, 'r', encoding='utf-8', errors='replace') as fh:
        for raw in fh:
            total += 1
            line = raw.strip()

            # Try Apache first
            m = APACHE_COMMON_REGEX.search(line)
            if m:
                status = int(m.group('status'))
                parts = m.group('request').split(' ')
                uri = parts[1] if len(parts) >= 2 else '-'
                ip_hits[m.group('ip')] += 1
                uri_hits[uri] += 1
                if status >= 500:
                    errors += 1
                elif status == 404:
                    not_found += 1
                elif status in (401, 403):
                    # Use the status code: a substring test for "auth" would
                    # count every hit on /authors or /oauth/... as a failure.
                    auth_failures += 1
                continue

            # Fallback to syslog regex
            m2 = SYSLOG_REGEX.search(line)
            if m2:
                msg = m2.group('msg').lower()
                if 'failed password' in msg or 'authentication failure' in msg or 'auth failure' in msg:
                    auth_failures += 1
                if 'error' in msg:
                    errors += 1
                # Count every address mentioned on the line.
                ip_hits.update(ipv4_pattern.findall(line))
                continue

            # otherwise, some generic heuristics
            if '404' in line:
                not_found += 1
            if 'error' in line.lower():
                errors += 1

    return {
        'file': os.path.abspath(path),
        'total_lines': total,
        'errors': errors,
        '404s': not_found,
        'auth_failures': auth_failures,
        # most_common keeps first-seen order among equal counts.
        'top_ips': ip_hits.most_common(20),
        'top_uris': uri_hits.most_common(20),
    }
# ----------------------- IO helpers -----------------------
def save_json(data, path: str):
    """Write `data` to `path` as JSON indented by two spaces."""
    encoder = json.JSONEncoder(indent=2)
    with open(path, 'w') as sink:
        # Stream the encoder's chunks straight into the file.
        sink.writelines(encoder.iterencode(data))
def save_csv_from_scan(scan_results: List[Dict], path: str):
    """Write scan results to `path` as CSV, one row per open port.

    Columns are ip, mac, port, banner; a host with no open ports
    contributes no rows.
    """
    columns = ['ip', 'mac', 'port', 'banner']
    # Flatten host -> ports into one record per (host, port) pair.
    flattened = [
        {
            'ip': host['ip'],
            'mac': host.get('mac'),
            'port': entry['port'],
            'banner': entry.get('banner'),
        }
        for host in scan_results
        for entry in host['open_ports']
    ]
    with open(path, 'w', newline='', encoding='utf-8') as sink:
        sheet = csv.DictWriter(sink, fieldnames=columns)
        sheet.writeheader()
        sheet.writerows(flattened)
def save_csv_from_summary(summary: Dict, path: str):
    """Write a log summary to `path` as a three-column CSV (metric, value, count).

    Ranked entries come first (top_ip rows, then top_uri rows), followed by
    the scalar counters with an empty value column.  Missing keys default to
    an empty ranking or a zero count.
    """
    ranked_sections = (('top_ip', 'top_ips'), ('top_uri', 'top_uris'))
    scalar_keys = ('total_lines', 'errors', '404s', 'auth_failures')
    with open(path, 'w', newline='', encoding='utf-8') as sink:
        sheet = csv.writer(sink)
        sheet.writerow(['metric', 'value', 'count'])
        for label, key in ranked_sections:
            sheet.writerows([label, item, hits] for item, hits in summary.get(key, []))
        for key in scalar_keys:
            sheet.writerow([key, '', summary.get(key, 0)])
def plot_summary(summary: Dict, out_png: str):
    """Save a bar chart of the errors / 404s / auth_failures counters to `out_png`.

    Counters missing from `summary` are plotted as zero.

    Raises:
        RuntimeError: If matplotlib could not be imported at module load.
    """
    if not HAS_MPL:
        raise RuntimeError('matplotlib not available')
    categories = ['errors', '404s', 'auth_failures']
    heights = [summary.get(name, 0) for name in categories]
    figure, axes = plt.subplots(figsize=(6, 4))
    axes.bar(categories, heights)
    axes.set_title('Log summary')
    figure.tight_layout()
    figure.savefig(out_png)
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(figure)
# ----------------------- CLI -----------------------
def _resolve_ports(args: argparse.Namespace) -> List[int]:
    """Return the validated, de-duplicated port list chosen on the command line."""
    if args.ports_file:
        # A ports file takes precedence over --ports.  Blank lines and
        # '#' comments (whole-line or trailing) are ignored.
        ports = []
        with open(args.ports_file) as fh:
            for raw in fh:
                entry = raw.split('#', 1)[0].strip()
                if entry:
                    ports.append(int(entry))
    else:
        ports = list(args.ports or DEFAULT_PORTS)
    invalid = [p for p in ports if not 1 <= p <= 65535]
    if invalid:
        # Fail up front instead of letting every host scan die on the same bad port.
        raise ValueError(f'port(s) outside 1-65535: {invalid}')
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(ports))


def cmd_scan(args: argparse.Namespace):
    """Run the `scan` sub-command: probe the network and emit the results.

    Results are written to --json and/or --csv when given; with neither,
    the JSON document is printed to stdout.

    Raises:
        ValueError: If a port is not an integer in 1-65535, or the CIDR is invalid.
        OSError: If the ports file or an output file cannot be opened.
    """
    from datetime import timezone

    ports = _resolve_ports(args)
    # run event loop
    results = asyncio.run(scan_network(args.cidr, ports, timeout=args.timeout, concurrency=args.concurrency))
    # datetime.utcnow() is deprecated; take an aware UTC time and drop the
    # tzinfo so the serialized form stays "<naive ISO timestamp>Z".
    stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'
    out = {'scanned_cidr': args.cidr, 'timestamp': stamp, 'results': results}
    if args.json:
        save_json(out, args.json)
        print(f'Wrote JSON to {args.json}')
    if args.csv:
        save_csv_from_scan(results, args.csv)
        print(f'Wrote CSV to {args.csv}')
    if not args.json and not args.csv:
        print(json.dumps(out, indent=2))
def cmd_analyze(args: argparse.Namespace):
    """Run the `analyze` sub-command: summarize a log file and emit the summary.

    The summary goes to each of --json, --csv and --plot that was given; a
    plotting failure is reported on stderr without aborting.  When none of
    the three is requested, the summary is printed to stdout as JSON.
    """
    summary = analyze_logs(args.logfile)

    # (destination, writer, confirmation message) for the file exports.
    exports = (
        (args.json, save_json, f'Wrote JSON to {args.json}'),
        (args.csv, save_csv_from_summary, f'Wrote CSV to {args.csv}'),
    )
    for destination, write, confirmation in exports:
        if destination:
            write(summary, destination)
            print(confirmation)

    if args.plot:
        try:
            plot_summary(summary, args.plot)
            print(f'Wrote plot to {args.plot}')
        except Exception as e:
            print(f'Plot failed: {e}', file=sys.stderr)

    if not (args.json or args.csv or args.plot):
        print(json.dumps(summary, indent=2))
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with its `scan` and `analyze` sub-commands.

    The chosen sub-command name is stored in ``args.cmd`` (None when omitted).
    """
    parser = argparse.ArgumentParser(description='Async network scanner + log analyzer')
    commands = parser.add_subparsers(dest='cmd')

    # Each entry is (flag, add_argument keyword arguments), in help order.
    scan_options = (
        ('--cidr', dict(required=True, help='CIDR or single IP to scan, e.g. 192.168.1.0/24 or 10.0.0.5')),
        ('--ports', dict(nargs='+', type=int, help='List of ports to scan (space separated)')),
        ('--ports-file', dict(help='File with one port per line')),
        ('--timeout', dict(type=float, default=2.0, help='Timeout (sec) for each connect/read')),
        ('--concurrency', dict(type=int, default=500, help='Max concurrent connections')),
        ('--json', dict(help='Write JSON output to file')),
        ('--csv', dict(help='Write CSV output to file')),
    )
    analyze_options = (
        ('--logfile', dict(required=True, help='Path to log file')),
        ('--json', dict(help='Write JSON summary to file')),
        ('--csv', dict(help='Write CSV summary to file')),
        ('--plot', dict(help='Save a simple matplotlib bar chart to PNG')),
    )

    scan = commands.add_parser('scan', help='Scan a CIDR or single IP for open ports')
    for flag, spec in scan_options:
        scan.add_argument(flag, **spec)

    analyze = commands.add_parser('analyze', help='Analyze a syslog or Apache log file')
    for flag, spec in analyze_options:
        analyze.add_argument(flag, **spec)

    return parser
def main():
    """CLI entry point: parse arguments and dispatch to the chosen sub-command.

    Prints the help text and exits with status 1 when no sub-command is given.
    """
    parser = build_parser()
    args = parser.parse_args()
    if not args.cmd:
        parser.print_help()
        sys.exit(1)

    # Sub-command name -> handler.
    handlers = {'scan': cmd_scan, 'analyze': cmd_analyze}
    handler = handlers.get(args.cmd)
    if handler is not None:
        handler(args)


# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()