Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

## v0.3.*

### v0.3.2 - 2025-08-21

**Bugfix**

[#9](https://github.com/p2p-ld/torrent-models/pull/9) - Fixed incorrectly finding start of piece
using modulo rather than subtraction, which could cause duplicate/incorrect ranges
in the cae that e.g. multiple files that were exactly piece length were at the start of a torrent.

### v0.3.1 - 2025-08-04

[#8](https://github.com/p2p-ld/torrent-models/pull/8)
Expand Down
2 changes: 1 addition & 1 deletion src/torrent_models/torrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def v1_piece_range(self, piece_idx: int) -> V1PieceRange:
if file.length + size_idx > start_range:
# range starts in this file
# create the range from the first file
file_range_start = start_range % size_idx if size_idx > 0 else start_range
file_range_start = start_range - size_idx
file_range_end = min(file.length, file_range_start + self.info.piece_length)
found_len += file_range_end - file_range_start
ranges.append(
Expand Down
63 changes: 47 additions & 16 deletions tests/test_torrent/test_piece_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,43 @@

from torrent_models import KiB, TorrentCreate, TorrentVersion
from torrent_models.const import EXCLUDE_FILES
from torrent_models.hashing.v1 import sort_v1

SIZES = [10 * KiB, 20 * KiB, 32 * KiB, 40 * KiB, 100 * KiB]


@pytest.fixture(params=SIZES)
def file_size(request: pytest.FixtureRequest, tmp_path: Path) -> int:
"""Create a set of files that are smaller than, equal to, and larger than a 32 KiB piece size"""
@pytest.fixture(params=[*SIZES, "multi"])
def random_data(tmp_path: Path, request: pytest.FixtureRequest) -> list[int]:
"""
Create a set of files that are smaller than, equal to, and larger than a 32 KiB piece size

handle the case of all the same size (e.g. to test cases with repeated piece alignment vs not,)
as well as random samples of all sizes
"""

size = request.param
for name in string.ascii_letters[0:10]:
with open(tmp_path / name, "wb") as f:
sizes = []
for _ in range(10):
if request.param == "multi":
sizes.extend(list(random.sample(SIZES, k=len(SIZES))))
else:
sizes.extend([request.param] * len(SIZES))
for i, size in enumerate(sizes):
with open(tmp_path / (string.ascii_letters[i] + str(i)), "wb") as f:
f.write(random.randbytes(size))
return size
return sizes


@pytest.mark.parametrize("version", [TorrentVersion.v1, TorrentVersion.hybrid])
def test_v1_piece_range(file_size: int, version: TorrentVersion, tmp_path: Path):
def test_v1_piece_range(random_data: list[int], version: TorrentVersion, tmp_path: Path):
"""
We can get piece ranges from v1 torrents and validate data against them
"""
files = [p for p in tmp_path.iterdir() if p.name not in EXCLUDE_FILES]
assert len(files) == 10
assert all([(tmp_path / p).stat().st_size == file_size for p in files])

create = TorrentCreate(paths=files, path_root=tmp_path, piece_length=32 * KiB)
torrent = create.generate(version=version)
assert len(random_data) == len(torrent.files)

seen_files = set()
for i, piece in enumerate(torrent.info.pieces):
range = torrent.v1_piece_range(i)
Expand All @@ -55,25 +66,23 @@ def test_v1_piece_range(file_size: int, version: TorrentVersion, tmp_path: Path)
fake_data = [random.randbytes(len(d)) for d in data]
assert not range.validate_data(fake_data)

assert seen_files == {letter for letter in string.ascii_letters[0:10]}
assert seen_files == {f.name for f in files}


@pytest.mark.parametrize("version", [TorrentVersion.v2, TorrentVersion.hybrid])
def test_v2_piece_range(file_size: int, version: TorrentVersion, tmp_path: Path):
def test_v2_piece_range(random_data: list[int], version: TorrentVersion, tmp_path: Path):
"""
We can get piece ranges from v2 torrents and validate data against them
"""
files = [p for p in tmp_path.iterdir() if p.name not in EXCLUDE_FILES]
assert len(files) == 10
assert all([(tmp_path / p).stat().st_size == file_size for p in files])

create = TorrentCreate(paths=files, path_root=tmp_path, piece_length=32 * KiB)
torrent = create.generate(version=version)
assert set(torrent.flat_files.keys()) == {letter for letter in string.ascii_letters[0:10]}
assert len(random_data) == len(torrent.files)
for path, file_info in torrent.flat_files.items():
root = file_info["pieces root"]
n_pieces = 1 if root not in torrent.piece_layers else len(torrent.piece_layers[root])
assert n_pieces == ceil(file_size / (32 * KiB))
assert n_pieces == ceil(file_info["length"] / (32 * KiB))
for piece_idx in range(n_pieces):
piece_range = torrent.v2_piece_range(path, piece_idx)
assert piece_range.range_start == piece_idx * 32 * KiB
Expand All @@ -87,3 +96,25 @@ def test_v2_piece_range(file_size: int, version: TorrentVersion, tmp_path: Path)
# reject random data in the right shape
data = [random.randbytes(len(d)) for d in data]
assert not piece_range.validate_data(data)


def test_v1_piece_ranges_sequential(random_data: list[int], tmp_path: Path):
"""
Test that our piece ranges are what we expect from v1 files laid end to end,
particularly no repeats.
"""
files = sort_v1([p for p in tmp_path.iterdir() if p.name not in EXCLUDE_FILES])
create = TorrentCreate(paths=files, path_root=tmp_path, piece_length=32 * KiB)
torrent = create.generate(version="v1")
assert len(random_data) == len(torrent.files)

for left, right in zip(
range(0, len(torrent.info.pieces) - 1), range(1, len(torrent.info.pieces))
):
left_piece = torrent.v1_piece_range(left)
right_piece = torrent.v1_piece_range(right)
if left_piece.ranges[-1].path == right_piece.ranges[0].path:
assert left_piece.ranges[-1].range_end == right_piece.ranges[0].range_start
else:
assert left_piece.ranges[-1].range_end == left_piece.ranges[-1].length
assert right_piece.ranges[0].range_start == 0
Loading