35 changes: 17 additions & 18 deletions src/borg/fuse.py
@@ -6,6 +6,7 @@
 import struct
 import sys
 import tempfile
+import threading
 import time
 from collections import defaultdict
 from signal import SIGINT
@@ -155,17 +156,15 @@ def iter_archive_items(self, archive_item_ids, filter=None, consider_part_files=
         last_chunk_length = 0
         msgpacked_bytes = b''
 
-        write_offset = self.write_offset
-        meta = self.meta
         pack_indirect_into = self.indirect_entry_struct.pack_into
 
         for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)):
             # Store the chunk ID in the meta-array
-            if write_offset + 32 >= len(meta):
-                self.meta = meta = meta + bytes(self.GROW_META_BY)
-            meta[write_offset:write_offset + 32] = key
-            current_id_offset = write_offset
-            write_offset += 32
+            if self.write_offset + 32 >= len(self.meta):
+                self.meta.extend(bytes(self.GROW_META_BY))
+            self.meta[self.write_offset:self.write_offset + 32] = key
+            current_id_offset = self.write_offset
+            self.write_offset += 32
 
             chunk_begin += last_chunk_length
             last_chunk_length = len(data)
@@ -199,8 +198,8 @@ def iter_archive_items(self, archive_item_ids, filter=None, consider_part_files=
                 current_spans_chunks = stream_offset - current_item_length < chunk_begin
                 msgpacked_bytes = b''
 
-                if write_offset + 9 >= len(meta):
-                    self.meta = meta = meta + bytes(self.GROW_META_BY)
+                if self.write_offset + 9 >= len(self.meta):
+                    self.meta.extend(bytes(self.GROW_META_BY))
 
                 # item entries in the meta-array come in two different flavours, both nine bytes long.
                 # (1) for items that span chunks:
@@ -221,19 +220,17 @@ def iter_archive_items(self, archive_item_ids, filter=None, consider_part_files=
                 if current_spans_chunks:
                     pos = self.fd.seek(0, io.SEEK_END)
                     self.fd.write(current_item)
-                    meta[write_offset:write_offset + 9] = b'S' + pos.to_bytes(8, 'little')
+                    self.meta[self.write_offset:self.write_offset + 9] = b'S' + pos.to_bytes(8, 'little')
                     self.direct_items += 1
                 else:
                     item_offset = stream_offset - current_item_length - chunk_begin
-                    pack_indirect_into(meta, write_offset, b'I', write_offset - current_id_offset, item_offset)
+                    pack_indirect_into(self.meta, self.write_offset, b'I', self.write_offset - current_id_offset, item_offset)
                     self.indirect_items += 1
-                inode = write_offset + self.offset
-                write_offset += 9
+                inode = self.write_offset + self.offset
+                self.write_offset += 9
 
                 yield inode, item
 
-        self.write_offset = write_offset
-
 
 class FuseBackend:
     """Virtual filesystem based on archive(s) to provide information to fuse
@@ -269,6 +266,7 @@ def __init__(self, key, manifest, repository, args, decrypted_repository):
         self.uid_forced = None
         self.gid_forced = None
         self.umask = 0
+        self.lock = threading.Lock()
 
     def _create_filesystem(self):
         self._create_dir(parent=1)  # first call, create root dir (inode == 1)
@@ -304,9 +302,10 @@ def get_item(self, inode):
 
     def check_pending_archive(self, inode):
         # Check if this is an archive we need to load
-        archive_name = self.pending_archives.pop(inode, None)
-        if archive_name is not None:
-            self._process_archive(archive_name, [os.fsencode(archive_name)])
+        with self.lock:
+            archive_name = self.pending_archives.pop(inode, None)
+            if archive_name is not None:
+                self._process_archive(archive_name, [os.fsencode(archive_name)])
 
     def _allocate_inode(self):
         self.inode_count += 1
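Why moving the offset bookkeeping onto `self` fixes the collision: `iter_archive_items` is a generator, and the old code wrote the local `write_offset` back to `self.write_offset` only after the loop finished. If the consumer abandoned the generator partway through (for example, because a FUSE request failed), that write-back never ran, so the next call started from the stale offset and handed out inode numbers that collided with inodes already given to the kernel. A minimal, self-contained sketch of the pattern — hypothetical `BrokenCache`/`FixedCache` classes, not borg's `ItemCache`:

```python
# A minimal sketch (hypothetical classes, not borg code) of the generator
# write-back bug fixed above: deferring the update of shared state to the
# end of a generator loses it when the consumer abandons the generator.

class BrokenCache:
    """Old pattern: work on a local copy, write back after the loop."""
    def __init__(self):
        self.write_offset = 0

    def iter_items(self, n):
        write_offset = self.write_offset   # local snapshot
        for _ in range(n):
            write_offset += 9
            yield write_offset             # "inode" handed out to the caller
        self.write_offset = write_offset   # never reached if abandoned early

class FixedCache:
    """New pattern: update the shared attribute as each item is yielded."""
    def __init__(self):
        self.write_offset = 0

    def iter_items(self, n):
        for _ in range(n):
            self.write_offset += 9
            yield self.write_offset

for cls in (BrokenCache, FixedCache):
    cache = cls()
    first = next(cache.iter_items(2))      # consumer fails after the first item
    second = next(cache.iter_items(2))     # a later, independent request
    status = 'inode collision!' if second == first else 'ok'
    print(f'{cls.__name__}: {first}, {second} -> {status}')
# BrokenCache: 9, 9 -> inode collision!
# FixedCache: 9, 18 -> ok
```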
38 changes: 37 additions & 1 deletion src/borg/testsuite/archiver.py
@@ -22,7 +22,7 @@
 from datetime import timedelta
 from hashlib import sha256
 from io import BytesIO, StringIO
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
 from pathlib import Path
 
 import pytest
@@ -2940,6 +2940,42 @@ def test_fuse_versions_view(self):
         assert os.stat(hl2).st_ino == os.stat(hl3).st_ino
         assert open(hl3, 'rb').read() == b'123456'
 
+    @unittest.skipUnless(llfuse, 'llfuse not installed')
+    def test_fuse_inode_collision_on_failure(self):
+        # Direct test of ItemCache to ensure write_offset is updated even on failure.
+        # This avoids issues with FUSE process isolation in tests.
+        from ..fuse import ItemCache
+        from ..item import Item
+
+        decrypted_repo = MagicMock()
+        item1 = Item(path='file1', mode=0o100644)
+        data1 = msgpack.packb(item1.as_dict())
+
+        decrypted_repo.get_many.side_effect = lambda ids: [(len(data1), data1)]
+
+        cache = ItemCache(decrypted_repo)
+        assert cache.write_offset == 0
+
+        # First call to iter_archive_items; simulate a failure after the 1st item.
+        gen = cache.iter_archive_items([b'id1'])
+        inode, item = next(gen)
+        assert inode == 1000000 + 32  # 32 is the chunk ID offset
+
+        # At this point, write_offset should already be updated with the fix.
+        # Without the fix, it would still be 0.
+        assert cache.write_offset == 32 + 9  # 32 for the chunk ID, 9 for the item entry
+
+        # Even if we don't continue the generator (simulating a failure in the caller),
+        # the next call to iter_archive_items should start from the updated offset.
+
+        decrypted_repo.get_many.side_effect = lambda ids: [(len(data1), data1)]
+        gen2 = cache.iter_archive_items([b'id2'])
+        inode2, item2 = next(gen2)
+        # Without the fix, inode2 would be 1000000 + 32 = 1000032 (a COLLISION with inode).
+        # With the fix, it should be 1000000 + (32 + 9) + 32 = 1000073.
+        assert inode2 == 1000000 + (32 + 9) + 32
+        assert inode2 > inode
+
     @unittest.skipUnless(llfuse, 'llfuse not installed')
     def test_fuse_allow_damaged_files(self):
         self.cmd('init', '--encryption=repokey', self.repository_location)
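The other half of the change, the `threading.Lock` added to `FuseBackend`, serializes the check-then-process sequence in `check_pending_archive`, since FUSE callbacks can be dispatched concurrently and loading an archive mutates shared state. A hedged sketch of the guarded pattern, with illustrative names rather than borg's API — note that `dict.pop()` alone is already atomic under the GIL, so what really needs the lock is the multi-step state mutation that follows it:

```python
# Hedged sketch of the pattern the new lock guards (illustrative names, not
# borg's API): the whole "pop pending work, then process it" sequence must
# run atomically with respect to other threads, because the real
# _process_archive() performs a long sequence of dependent mutations.
import threading

class Backend:
    def __init__(self):
        self.lock = threading.Lock()
        self.pending = {2: 'archive-a'}    # inode -> archive not yet loaded
        self.loaded = []                   # stands in for the real inode tables

    def check_pending(self, inode):
        with self.lock:                    # one loader mutates state at a time
            name = self.pending.pop(inode, None)
            if name is not None:
                self.loaded.append(name)   # stands in for _process_archive()

backend = Backend()
threads = [threading.Thread(target=backend.check_pending, args=(2,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert backend.loaded == ['archive-a']     # loaded exactly once, never concurrently
```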