-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathtcpdump-translate
More file actions
executable file
·237 lines (215 loc) · 10.2 KB
/
Copy pathtcpdump-translate
File metadata and controls
executable file
·237 lines (215 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python3
import os, sys, io, re, itertools as it, contextlib as cl
ep_map_default = '''
# "<prefix/net/addr> <replacement>" pairs go here, newline/comma separated
# Exact-match full address should end with "/". Example: 1.2 mynet, 1.2.3.4/ myaddr
127.0.0. lo4., :: lo6.'''
class Highlight:
hue = dict((c, 30+n) for n, c in enumerate(
'black red green yellow blue magenta cyan white'.split() ))
mod = dict(
bold=lambda c,mods: (c, mods+[1]),
bright=lambda c,mods: (c+60 if c and c < 60 else c, mods) )
aliases = dict(v.split('=') for v in ( 'bk=black rd=red gn=green'
' ye=yellow bu=blue pink=magenta pk=magenta turquoise=cyan'
' tq=cyan wh=white br=bright bt=bright bo=bold bd=bold' ).split())
esc = '\033['
reset = f'{esc}0m'
def __init__(self, enabled=True):
self.enabled, self.glyphs = enabled, dict()
g = self.glyphs['reset'] = '\ue000'; self.glyphs[g] = self.reset
def color_list(self):
'''Yield two lists, one for mods and second one of color hues.
Each list has (name, alias1, alias2, ...) tuples as elements.'''
alias_map = dict()
for a, k in self.aliases.items(): alias_map.setdefault(k, list()).append(a)
return tuple(list( (k, *alias_map.get(k, list()))
for k in lst ) for lst in (self.mod, self.hue))
def glyph(self, spec):
if not spec: return ''
if g := self.glyphs.get(spec): return g
sc, smods = list(), list()
for c in spec.lower().split('-'):
for c in c, self.aliases.get(c):
if hue := self.hue.get(c): sc.append(hue)
if mod := self.mod.get(c): smods.append(mod)
c, mods = sc and sc[-1], list()
for mod in smods: c, mods = mod(c, mods)
if c: mods.append(c)
s = (self.esc + ';'.join(map(str, mods)) + 'm') if mods else ''
if g := self.glyphs.get(s): return g
g = self.glyphs[spec] = chr(0xe000 + len(self.glyphs))
self.glyphs[g] = s; self.glyphs[s] = g
return g
def wrap(self, spec, s):
'Wraps s into unicode-glyph-colors to set/reset spec-color'
if not (hl := self.glyph(spec)): return s
return hl + s + self.glyph('reset')
def glyph_esc(self, g, pre=True):
s = self.glyphs.get(g, g)
if not pre and s: s = s.removeprefix(self.esc)
return s
def term(self, s, trunc=9_999):
'Decode unicode-glyph-colors to terminal ansi sequences, truncate as-needed'
if not self.enabled: return s[:trunc]
if len(s) <= trunc: return ''.join(self.glyph_esc(c) for c in s)
n, out, reset = trunc, list(), None
for c in s: # truncation doesn't count invisible color-glyphs
out.append(self.glyph_esc(c))
if 0xe000 <= (cn := ord(c)) <= 0xf8ff: reset = cn == 0xe000
else:
n -= 1
if n == 0: break
if reset is False: out.append(self.glyph_esc('\ue000'))
return ''.join(out)
def print_color_map(hl, ep_map, trunc):
mods, hues = hl.color_list()
hue_host, hue_info = dict(), dict((h[0], (f'{h[0]}' + ( ''
if len(h) == 1 else ' ['+' '.join(h[1:])+']' ))) for h in hues)
for pre, repl, hls in ep_map:
if not (h := hl.glyph(hls)): continue
hue_host[h] = repl if h not in hue_host else f'{hue_host[h]}+'
for h, info in hue_info.items():
if hh := hue_host.get(hl.glyph(h)): hue_info[h] = f'{info} - {hh}'
hue_info_len = max(map(len, hue_info.values()))
print('Format: <esc>: <hue-pure> :: <hue+mods-1> :: <hue+mods-2> ...')
mss = list(ms for n in range(1, len(mods)+1) for ms in it.combinations(mods, n))
def _mod_suffix(hue=''):
line, mods_used = '', dict()
for ms in mss:
if not hue and len(ms) == 1:
c = (m := ms[0])[0]; ext = c + ('' if len(m) == 1 else ' ['+' '.join(m[1:])+']')
else: ext = c = '-'.join(m[0] for m in ms) + ('' if not hue else f'-{hue}')
if not hl.glyph_esc(g := hl.glyph(c), False): ext += ' (no change)'
elif cc := mods_used.get(g): ext += f' (same as {cc})'
elif hh := hue_host.get(g): ext += f' - {hh}'
mods_used[g] = c; line += ' :: ' + hl.wrap(c, ext)
return line
print(hl.term('Color mods' + _mod_suffix(), trunc), end='\n\n')
for hue, info in hue_info.items():
pre = hl.glyph_esc(hl.glyph(hue), False) + ': '
line = pre + hl.wrap(hue, f'{{:<{hue_info_len}}}'.format(info))
print(hl.term(line + _mod_suffix(hue), trunc))
print()
def main(args=None):
import argparse, textwrap
dd = lambda text: re.sub( r' \t+', ' ',
textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter, usage='%(prog)s [opts] [lines]',
description=dd('''
Regexp-replacer for addrs/nets in tcpdump output for regular packets.
Expects "tcpdump -ln" output piped to stdin, passing it through to stdout.
Only matches and tweaks regular TCP/UDP/ICMP packet lines,
replacing networks/addresses there, passing unmatched lines through as-is.'''))
parser.add_argument('lines', nargs='*', help=dd('''
Specific tcpdump lines to parse and exit, instead of default stdin-mode.
Intended for spot-checks of lines in already-collected output or testing.'''))
parser.add_argument('-m', '--map-file', metavar='file', help=dd('''
File with a list of prefixes, networks or addresses or replace/translate.
%%-prefixed file descriptor number can be used to read map from there (e.g. %%3).
Format is newline/comma separated "<prefix/net/addr> <replacement>" pairs,
with any empty lines or #-comments ignored.
Each src/dst address from tcpdump is matched as "<address>/" against
all <prefix/net/addr> values, in longest-first order, with first match replacing
the matched part with <replacement>.
Example patterns:
127.0.0 lo4 # 127.0.0.1 -> lo4.1, 127.0.0.37 -> lo4.37, etc
:: lo6. # ::1 -> lo6.1, ::37 -> lo6.37, fe80::123 - no match, doesn't start with ::
1.2.3.4/ me # 1.2.3.4 -> me, but won't match 1.2.3.44 due to / at the end
Extra <!highlight> element can be added after <replacement>,
e.g. "1.2.3.4/ me !bold-red" to set terminal font style/color there.
Supported color/style specs: bk/black rd/red gn/green ye/yellow bu/blue
pk/pink/ma/magenta tq/turquoise/cn/cyan wh/white br/bt/bright bo/bd/bold.'''))
parser.add_argument('-f', '--filter', action='store_true', help=dd('''
Print only lines that match TCP/UDP/ICMP packets and some pattern in -m/--map-file.
Intended for filtering tcpdump output by a list of known addrs/nets easily.'''))
parser.add_argument('-g', '--grep', metavar='regexp', action='append', help=dd('''
Python regexp to filter original tcpdump lines by.
Can be used multiple times, in which case lines must match all regexps.'''))
parser.add_argument('-e', '--err-fail', action='store_true', help=dd('''
Fail on first <translate-fail> error, instead of passing those through with suffix.'''))
parser.add_argument('-t', '--truncate', type=int, metavar='chars', help=dd('''
Truncate all passed-through lines to specified length. Default: terminal width.'''))
parser.add_argument('--cm', '--color-map', action='store_true', help=dd('''
Print known colors and whether they're used in -m/--map-file (if specified) and exit.'''))
parser.add_argument('-C', '--no-color', action='store_true', help=dd('''
Disable colors, if used in -m/--map-file. Same as setting NO_COLOR env variable.'''))
opts = parser.parse_args(sys.argv[1:] if args is None else args)
@cl.contextmanager
def in_file(path):
if not path or path == '-': return (yield sys.stdin)
if path[0] == '%': path = int(path[1:])
with open(path) as src: yield src
if opts.map_file:
with in_file(opts.map_file) as src: ep_map = src.read()
else: ep_map = ep_map_default
ep_map = ep_map.split('\n')
for n, line in enumerate(ep_map):
ls = ep_map[n] = line.strip()
if (m := ls.find('#')) >= 0: ep_map[n] = ls[:m].strip()
ep_map = '\n'.join(filter(None, ep_map)).replace(',', '\n').split('\n')
for n, ls in enumerate(ep_map):
if not ls: continue
try:
try: pre, repl = ls.split(); hl = ''
except:
pre, repl, hl = ls.split()
if hl[0] != '!': raise ValueError(ls, hl)
else: hl = hl[1:]
except: parser.error(f'Invalid "<addr/net> <repl> [!hl]" spec in -m/--map-file: {ls!r}')
ep_map[n] = pre, repl, hl
ep_map.sort(key=lambda ab: (len(ab[0]), ab), reverse=True)
if not opts.lines: tcpdump = sys.stdin.buffer
else: tcpdump = io.BytesIO('\n'.join(opts.lines).encode()); tcpdump.seek(0)
trunc = opts.truncate
if not trunc:
try: trunc = os.get_terminal_size().columns
except OSError: trunc = 9_999
hl = Highlight(enabled=not (opts.no_color or os.environ.get('NO_COLOR')))
if opts.cm: return print_color_map(hl, ep_map, trunc)
def _ep(addr, has_port=False):
if has_port: addr, port = addr.rsplit('.', 1)
addr += '/'
for pre, repl, hls in ep_map:
if addr.startswith(pre): addr = repl + addr[len(pre):]; break
addr = hl.wrap(hls, addr.rstrip('/'))
if has_port: addr += f'.{port}'
return addr
re_pkt = re.compile( r'(?P<ts>\d\d:\d\d:\d\d\.\d+)'
r'(?P<any> +(?P<iface>\S+) +(?P<op>P|In|Out))?'
r'(?P<addrs> +(?P<af>IP|IP6) +(?P<src>[\da-f:.]+) +\> +(?P<dst>[\da-f:.]+):)'
r'(?P<p> (?P<proto>UDP|TCP|ICMP|ICMP6|Flags))(?P<tail>.*)' )
re_icmp = re.compile(r'\b(who has|tgt is) (?P<addr>[\da-f:.]+),')
re_grep = list(map(re.compile, opts.grep or list()))
while line := tcpdump.readline():
line = line.decode(errors='surrogateescape').rstrip()
if re_grep and not any(rx.search(line) for rx in re_grep): continue
if m := re_pkt.fullmatch(line):
src0 = src = m['src']; dst0 = dst = m['dst']
suff, has_port = '', m['proto'] in ['TCP', 'UDP']
try: src, dst = (_ep(m[k], has_port) for k in ['src', 'dst'])
except:
if opts.err_fail: raise
suff = ' <translate-fail>'
ep_match = not (src == src0 and dst == dst0)
addrs = f' {m["af"]} {src} > {dst}:'
line = m.expand(fr'\g<ts>\g<any>{addrs}\g<p>\g<tail>')
if m['proto'] in ['ICMP', 'ICMP6'] and (m2 := re_icmp.search(line)):
addr0 = addr = m2['addr']
try:
addr = _ep(addr)
ep_match = ep_match or addr != addr0
except:
if opts.err_fail: raise
suff = ' <icmp-translate-fail>'
line = line[:m2.start()] + m2.expand(f'{m2[1]} {addr},') + line[m2.end():]
if suff: line = hl.wrap('br-red', f'{line}{suff}'); ep_match = True
if opts.filter and not ep_match: continue
print(hl.term(line, trunc))
if __name__ == '__main__':
try: sys.exit(main())
except KeyboardInterrupt: sys.exit(1)
except BrokenPipeError: # stdout pipe closed
os.dup2(os.open(os.devnull, os.O_WRONLY), sys.stdout.fileno())
sys.exit(1)