Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions asdf/_block/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,21 @@ def read_blocks(fd, memmap=False, lazy_load=False, validate_checksums=False, aft
# load all blocks serially
return _read_blocks_serially(fd, memmap, lazy_load, validate_checksums, after_magic)

# try to find block index
# store starting offset
starting_offset = fd.tell()
magic_len = len(constants.BLOCK_MAGIC)

# we should be at the first block, have no blocks, or have padding
if not after_magic:
# seek until the first magic is found
try:
fd.seek_until(b"(" + constants.BLOCK_MAGIC + b")", magic_len)
except DelimiterNotFoundError:
fd.seek(starting_offset)
return []
first_block_offset = fd.tell() - magic_len

# try to find block index
index_offset = bio.find_block_index(fd, starting_offset)
if index_offset is None:
# if failed, load all blocks serially
Expand All @@ -218,21 +231,21 @@ def read_blocks(fd, memmap=False, lazy_load=False, validate_checksums=False, aft
warnings.warn(msg, AsdfBlockIndexWarning)
fd.seek(starting_offset)
return _read_blocks_serially(fd, memmap, lazy_load, validate_checksums, after_magic)
# skip magic for each block
magic_len = len(constants.BLOCK_MAGIC)
blocks = [ReadBlock(offset + magic_len, fd, memmap, lazy_load, validate_checksums) for offset in block_index]
try:
# load first and last blocks to check if the index looks correct
for index in (0, -1):
fd.seek(block_index[index])
buff = fd.read(magic_len)
if buff != constants.BLOCK_MAGIC:
msg = "Invalid block magic"
raise OSError(msg)
blocks[index].load()
except (OSError, ValueError) as e:
msg = f"Invalid block index contents for block {index}, falling back to serial reading: {e!s}"

# index says no blocks but we found block magic above, this so the index is wrong
if not len(block_index):
msg = "Failed to read block index, falling back to serial reading"
warnings.warn(msg, AsdfBlockIndexWarning)
fd.seek(starting_offset)
return _read_blocks_serially(fd, memmap, lazy_load, validate_checksums, after_magic)

# check that the offset for the first block matches
if block_index[0] != first_block_offset:
msg = "Invalid block index contents for block 0, falling back to serial reading"
warnings.warn(msg, AsdfBlockIndexWarning)
fd.seek(starting_offset)
return _read_blocks_serially(fd, memmap, lazy_load, after_magic)
return blocks

# skip magic for each block
fd.seek(starting_offset)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seek is here only to keep sphinx-asdf from crashing for asdf directives that show blocks. Once #2057 is merged we can remove this seek.

return [ReadBlock(offset + magic_len, fd, memmap, lazy_load, validate_checksums) for offset in block_index]
6 changes: 2 additions & 4 deletions asdf/_tests/_block/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ def test_read(tmp_path, lazy_load, memmap, with_index, validate_checksums, paddi
) as (fd, check):
r = read_blocks(fd, memmap=memmap, lazy_load=lazy_load, validate_checksums=validate_checksums)
if lazy_load and with_index and not streamed:
assert r[0].loaded
assert r[-1].loaded
for blk in r[1:-1]:
for blk in r:
assert not blk.loaded
# getting the header should load the block
blk.header
Expand Down Expand Up @@ -131,7 +129,7 @@ def test_read_post_padding_non_null_bytes():
check(read_blocks(fd))


@pytest.mark.parametrize("invalid_block_index", [0, 1, -1, "junk"])
@pytest.mark.parametrize("invalid_block_index", [0, "junk"])
def test_invalid_block_index(tmp_path, invalid_block_index):
fn = tmp_path / "test.bin"
with gen_blocks(fn=fn, with_index=True) as (fd, check):
Expand Down
2 changes: 1 addition & 1 deletion asdf/_tests/tags/core/tests/test_ndarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def test_dont_load_data():
repr(ff.tree)

for block in ff._blocks.blocks:
assert callable(block._data)
assert not block.loaded


def test_table_inline():
Expand Down
2 changes: 1 addition & 1 deletion asdf/_tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ def test_array_access_after_file_close(tmp_path):
# the file has been closed:
with asdf.open(path) as af:
tree = af.tree
with pytest.raises(OSError, match=r"ASDF file has already been closed. Can not get the data."):
with pytest.raises(OSError, match=r"Attempt to load block from closed file"):
tree["data"][0]

# With memory mapping disabled and copying arrays enabled,
Expand Down
40 changes: 4 additions & 36 deletions asdf/_tests/test_array_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,15 +601,13 @@ def test_block_index():
buff.seek(0)
with asdf.open(buff) as ff2:
assert len(ff2._blocks.blocks) == 100
assert ff2._blocks.blocks[0].loaded
for i in range(2, 99):
for i in range(100):
assert not ff2._blocks.blocks[i].loaded
assert ff2._blocks.blocks[99].loaded

# Force the loading of one array
ff2.tree["arrays"][50] * 2

for i in range(2, 99):
for i in range(100):
if i == 50:
assert ff2._blocks.blocks[i].loaded
else:
Expand Down Expand Up @@ -646,7 +644,6 @@ def test_large_block_index():

buff.seek(0)
with asdf.open(buff) as ff2:
assert ff2._blocks.blocks[0].loaded
assert len(ff2._blocks.blocks) == narrays


Expand Down Expand Up @@ -711,37 +708,7 @@ def test_short_file_find_block_index():
assert ff._blocks.blocks[1].loaded


def test_invalid_block_index_values():
# This adds a value in the block index that points to something
# past the end of the file. In that case, we should just reject
# the index altogether.

buff = generic_io.get_file(io.BytesIO(), mode="w")

arrays = []
for i in range(10):
arrays.append(np.ones((8, 8)) * i)

tree = {"arrays": arrays}

ff = asdf.AsdfFile(tree)
ff.write_to(buff, include_block_index=True)
buff.seek(0)
offset = bio.find_block_index(buff)
buff.seek(offset)
block_index = bio.read_block_index(buff)
block_index.append(123456789)
buff.seek(offset)
bio.write_block_index(buff, block_index)

buff.seek(0)
with pytest.warns(AsdfBlockIndexWarning, match="Invalid block index contents"):
with asdf.open(buff) as ff:
assert len(ff._blocks.blocks) == 10
assert ff._blocks.blocks[1].loaded


@pytest.mark.parametrize("block_index_index", [0, -1])
@pytest.mark.parametrize("block_index_index", [0])
def test_invalid_block_index_offset(block_index_index):
"""
This adds a value in the block index that points to something
Expand Down Expand Up @@ -994,6 +961,7 @@ def test_open_memmap_from_closed_file(tmp_path):
with pytest.raises(OSError, match=msg):
view[:]

msg = r"Attempt to load block from closed file"
with pytest.raises(OSError, match=msg):
base2[:]

Expand Down
1 change: 1 addition & 0 deletions changes/2056.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Change block index parsing to only check the first block header. This improves IO performance (especially for remote files) by removing several file seeks and reads.
Loading