Skip to content

Named pipe and tshark support #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os

def read_from_pipe(pipe_path='/tmp/tshark_pipe'):
if not os.path.exists(pipe_path):
print(f"Error: The named pipe {pipe_path} does not exist. Make sure that TsharkLive has created it.")
return

try:
with open(pipe_path, 'r') as pipe:
for line in pipe:
print(f"{line.strip()}")
except KeyboardInterrupt:
print("\n")
except Exception as e:
print(f"Error: {e}")

if __name__ == "__main__":
read_from_pipe()
10 changes: 9 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def main():

modules.add_argument('--info', action = 'store_true', help = 'Read generic information about the baseband device.')
modules.add_argument('--pcap-dump', metavar = 'PCAP_FILE', type = FileType('ab'), help = 'Generate a PCAP file containing GSMTAP frames for 2G/3G/4G, to be loaded using Wireshark.')
modules.add_argument('--tshark', action = 'store_true', help = 'Same as --pcap-dump, but directly spawn tshark.')
modules.add_argument('--analyze', action = 'store_true', help = 'Same as --pcap-dump, but send the output to a named pipe.')
modules.add_argument('--wireshark-live', action = 'store_true', help = 'Same as --pcap-dump, but directly spawn a Wireshark instance.')
# modules.add_argument('--efs-dump', metavar = 'OUTPUT_DIR', help = 'Dump the internal EFS filesystem of the device.')
modules.add_argument('--memory-dump', metavar = 'OUTPUT_DIR', help = 'Dump the memory of the device (may not or partially work with recent devices).')
Expand All @@ -73,7 +75,7 @@ def main():
'To be used in combination with --adb.')
modules.add_argument('--decoded-sibs-dump', action = 'store_true', help = 'Print decoded SIBs to stdout (experimental, requires pycrate).')

pcap_options = parser.add_argument_group(title = 'PCAP generation options', description = 'To be used along with --pcap-dump or --wireshark-live.')
pcap_options = parser.add_argument_group(title = 'PCAP generation options', description = 'To be used along with --pcap-dump, --wireshark-live, --tshark or --analyze.')

pcap_options.add_argument('--reassemble-sibs', action = 'store_true', help = 'Include reassembled UMTS SIBs as supplementary frames, also embedded fragmented in RRC frames.')
pcap_options.add_argument('--decrypt-nas', action = 'store_true', help = 'Include unencrypted LTE NAS as supplementary frames, also embedded ciphered in RRC frames.')
Expand Down Expand Up @@ -154,6 +156,12 @@ def parse_modules_args(args):
if args.wireshark_live:
from .modules.pcap_dump import WiresharkLive
diag_input.add_module(WiresharkLive(diag_input, args.reassemble_sibs, args.decrypt_nas, args.include_ip_traffic))
if args.analyze and not args.tshark:
from .modules.pcap_dump import ExternalAnalyze
diag_input.add_module(ExternalAnalyze(diag_input, args.reassemble_sibs, args.decrypt_nas, args.include_ip_traffic))
if args.tshark and not args.analyze:
from .modules.pcap_dump import TsharkLive
diag_input.add_module(TsharkLive(diag_input, args.reassemble_sibs, args.decrypt_nas, args.include_ip_traffic))
if args.json_geo_dump:
diag_input.add_module(JsonGeoDumper(diag_input, args.json_geo_dump))
if args.decoded_sibs_dump:
Expand Down
100 changes: 59 additions & 41 deletions src/modules/pcap_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
from logging import warning
from sys import platform
import gzip

import sys
import os
from ..modules._enable_log_mixin import EnableLogMixin, TYPES_FOR_RAW_PACKET_LOGGING
from ..modules.decoded_sibs_dump import DecodedSibsDumper

MODULES_DIR = realpath(dirname(__file__))
SRC_WIRESHARK_PLUGIN_DIR = realpath(MODULES_DIR + '/wireshark_plugin')


try:
from os import setpgrp, getenv, setresgid, setresuid, setgroups, getgrouplist
from pwd import getpwuid
Expand All @@ -37,40 +37,40 @@
"""

class PcapDumper(DecodedSibsDumper):

def __init__(self, diag_input, pcap_file, reassemble_sibs, decrypt_nas, include_ip_traffic):

self.pcap_file = pcap_file

"""
Write a PCAP file header - https://wiki.wireshark.org/Development/LibpcapFileFormat#File_Format
"""

if not self.pcap_file.appending_to_file:

self.pcap_file.write(pack('<IHHi4xII',
0xa1b2c3d4, # PCAP Magic
2, 4, # Version
0, # Timezone
65535, # Max packet length
228 # LINKTYPE_IPV4 (for GSMTAP)
))

self.diag_input = diag_input

self.limit_registered_logs = TYPES_FOR_RAW_PACKET_LOGGING

self.current_rat = None # Radio access technology: "2g", "3g", "4g", "5g"

self.reassemble_sibs = reassemble_sibs
self.decrypt_nas = decrypt_nas
self.include_ip_traffic = include_ip_traffic

# Install the QCSuper Lua Wireshark plug-in, except if the
# corresponding environment variable is set.

self.install_wireshark_plugin()

def install_wireshark_plugin(self): # WIP

# See: https://www.wireshark.org/docs/wsug_html_chunked/ChPluginFolders.html
Expand Down Expand Up @@ -506,68 +506,86 @@ def __del__(self):
if getattr(self, 'pcap_file', None):
self.pcap_file.close()

class StdoutWrapper:
def __init__(self, stream):
self.stream = stream.buffer
self.appending_to_file = False

"""
This is the same module, except that il will launch directly a FIFO to
Wireshark rather than write the PCAP to a file
"""
def write(self, data):
self.stream.write(data)
self.stream.flush()

class WiresharkLive(PcapDumper):
def close(self):
pass

class ExternalAnalyze(PcapDumper):
def __init__(self, diag_input, reassemble_sibs, decrypt_nas, include_ip_traffic):

stdout_wrapper = StdoutWrapper(sys.stdout)
super().__init__(diag_input, stdout_wrapper, reassemble_sibs, decrypt_nas, include_ip_traffic)

class TsharkLive(PcapDumper):
def __init__(self, diag_input, reassemble_sibs, decrypt_nas, include_ip_traffic):
tshark = which('tshark')
if not tshark:
raise Exception('Could not find Tshark in $PATH')

tshark_pipe = Popen([tshark, '-i', '-'], stdin=PIPE)
tshark_pipe.stdin.appending_to_file = False

super().__init__(diag_input, tshark_pipe.stdin, reassemble_sibs, decrypt_nas, include_ip_traffic)

def __del__(self):
self.tshark_pipe.stdin.close()
self.tshark_pipe.wait()

class WiresharkLive(PcapDumper):
def __init__(self, diag_input, reassemble_sibs, decrypt_nas, include_ip_traffic):

wireshark = (
which('C:\\Program Files\\Wireshark\\Wireshark.exe') or
which('C:\\Program Files (x86)\\Wireshark\\Wireshark.exe') or
which('wireshark') or
which('wireshark-gtk')
)

if not wireshark:

raise Exception('Could not find Wireshark in $PATH')

if not IS_UNIX:

self.detach_process = None

wireshark_pipe = Popen([wireshark, '-k', '-i', '-'],
stdin = PIPE, stdout = DEVNULL, stderr = STDOUT,
preexec_fn = self.detach_process,
bufsize = 0
).stdin

wireshark_pipe.appending_to_file = False

super().__init__(diag_input, wireshark_pipe, reassemble_sibs, decrypt_nas, include_ip_traffic)

"""
Executed when we launch a Wireshark process, after fork()
"""

def detach_process(self):

try:

# Don't be hit by CTRL+C

setpgrp()

# Drop privileges if needed

uid, gid = getenv('SUDO_UID'), getenv('SUDO_GID')

if uid and gid:

uid, gid = int(uid), int(gid)

uid, gid = int(uid), int(gid)
setgroups(getgrouplist(getpwuid(uid).pw_name, gid))

setresgid(gid, gid, -1)

setresuid(uid, uid, -1)

except Exception:

print_exc()