Skip to content

FIX: Handles invalid TCK files, prevents infinite loop #1140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions nibabel/streamlines/tck.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ def is_correct_format(cls, fileobj):
otherwise returns False.
"""
with Opener(fileobj) as f:
magic_number = asstr(f.fobj.readline())
f.seek(-len(magic_number), os.SEEK_CUR)
magic_number = f.read(len(cls.MAGIC_NUMBER))
f.seek(-len(cls.MAGIC_NUMBER), os.SEEK_CUR)

return magic_number.strip() == cls.MAGIC_NUMBER
return asstr(magic_number) == cls.MAGIC_NUMBER

@classmethod
def create_empty_header(cls):
Expand Down Expand Up @@ -287,8 +287,8 @@ def _write_header(fileobj, header):
fileobj.write(asbytes(str(new_offset) + "\n"))
fileobj.write(asbytes("END\n"))

@staticmethod
def _read_header(fileobj):
@classmethod
def _read_header(cls, fileobj):
""" Reads a TCK header from a file.

Parameters
Expand All @@ -304,23 +304,56 @@ def _read_header(fileobj):
header : dict
Metadata associated with this tractogram file.
"""
# Record start position if this is a file-like object
start_position = fileobj.tell() if hasattr(fileobj, 'tell') else None

# Build header dictionary from the buffer
hdr = {}
offset_data = 0

with Opener(fileobj) as f:

# Record start position
start_position = f.tell()

# Make sure we are at the beginning of the file
f.seek(0, os.SEEK_SET)

# Read magic number
magic_number = f.fobj.readline().strip()
magic_number = f.read(len(cls.MAGIC_NUMBER))

if asstr(magic_number) != cls.MAGIC_NUMBER:
raise HeaderError(f"Invalid magic number: {magic_number}")

hdr[Field.MAGIC_NUMBER] = magic_number

# Read all key-value pairs contained in the header.
buf = asstr(f.fobj.readline())
while not buf.rstrip().endswith("END"):
buf += asstr(f.fobj.readline())
f.seek(1, os.SEEK_CUR) # Skip \n

found_end = False

# Read all key-value pairs contained in the header, stop at EOF
for n_line, line in enumerate(f, 1):
line = asstr(line).strip()

if not line: # Skip empty lines
continue

if line == "END": # End of the header
found_end = True
break

if ':' not in line: # Invalid header line
raise HeaderError(f"Invalid header (line {n_line}): {line}")

key, value = line.split(":", 1)
hdr[key.strip()] = value.strip()

if not found_end:
raise HeaderError("Missing END in the header.")

offset_data = f.tell()

# Build header dictionary from the buffer.
hdr = dict(item.split(': ') for item in buf.rstrip().split('\n')[:-1])
hdr[Field.MAGIC_NUMBER] = magic_number
# Set the file position where it was, in case it was previously open
if start_position is not None:
f.seek(start_position, os.SEEK_SET)

# Check integrity of TCK header.
if 'datatype' not in hdr:
Expand Down Expand Up @@ -352,10 +385,6 @@ def _read_header(fileobj):
# Keep the file position where the data begin.
hdr['_offset_data'] = int(hdr['file'].split()[1])

# Set the file position where it was, if it was previously open.
if start_position is not None:
fileobj.seek(start_position, os.SEEK_SET)

return hdr

@classmethod
Expand Down
27 changes: 27 additions & 0 deletions nibabel/streamlines/tests/test_tck.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def setup_module():
global DATA

DATA['empty_tck_fname'] = pjoin(data_path, "empty.tck")
DATA['no_magic_number_tck_fname'] = pjoin(data_path, "no_magic_number.tck")
DATA['no_header_end_tck_fname'] = pjoin(data_path, "no_header_end.tck")
DATA['no_header_end_eof_tck_fname'] = pjoin(data_path, "no_header_end_eof.tck")
# simple.tck contains only streamlines
DATA['simple_tck_fname'] = pjoin(data_path, "simple.tck")
DATA['simple_tck_big_endian_fname'] = pjoin(data_path,
Expand All @@ -50,6 +53,30 @@ def test_load_empty_file(self):
with pytest.warns(Warning) if lazy_load else error_warnings():
assert_tractogram_equal(tck.tractogram, DATA['empty_tractogram'])

def test_load_no_magic_number_file(self):
for lazy_load in [False, True]:
with pytest.raises(HeaderError):
TckFile.load(
DATA['no_magic_number_tck_fname'],
lazy_load=lazy_load
)

def test_load_no_header_end_file(self):
for lazy_load in [False, True]:
with pytest.raises(HeaderError):
TckFile.load(
DATA['no_header_end_tck_fname'],
lazy_load=lazy_load
)

def test_load_no_header_end_eof_file(self):
for lazy_load in [False, True]:
with pytest.raises(HeaderError):
TckFile.load(
DATA['no_header_end_eof_tck_fname'],
lazy_load=lazy_load
)

def test_load_simple_file(self):
for lazy_load in [False, True]:
tck = TckFile.load(DATA['simple_tck_fname'], lazy_load=lazy_load)
Expand Down
Binary file added nibabel/tests/data/no_header_end.tck
Binary file not shown.
4 changes: 4 additions & 0 deletions nibabel/tests/data/no_header_end_eof.tck
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mrtrix tracks
count: 0000000000
datatype: Float32LE
file: . 67
Binary file added nibabel/tests/data/no_magic_number.tck
Binary file not shown.