Commit d266b2f

Merge pull request #8531 from ThomasWaldmann/chunkindex
chunk index cache: use cache/chunks.<HASH>, see #8503
2 parents: 35c83df + 56493fc
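
The scheme change in a nutshell: instead of a single `cache/chunks` object validated against a separately stored `cache/chunks_hash`, each cached chunk index is now stored under a name that embeds the xxh64 hash of its serialized bytes, so the name itself carries the validity check and multiple valid indexes can coexist. A minimal, self-contained sketch of that idea (not borg's actual code; it assumes the PyPI `xxhash` package as a stand-in for `borg.checksums.xxh64`):

```python
import xxhash  # assumption: PyPI "xxhash" standing in for borg.checksums.xxh64

CHUNKINDEX_HASH_SEED = 2  # same seed as src/borg/cache.py below

def cache_name_for(data: bytes) -> str:
    # the object's name embeds the checksum of its own content
    digest = xxhash.xxh64(data, seed=CHUNKINDEX_HASH_SEED).hexdigest()
    return f"cache/chunks.{digest}"

def is_valid(name: str, data: bytes) -> bool:
    # a loaded cache is valid iff recomputing the hash reproduces the name
    return name == cache_name_for(data)

data = b"serialized chunk index"
name = cache_name_for(data)
assert is_valid(name, data)
assert not is_valid(name, data + b"garbage")
```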

File tree: 4 files changed, +109 −54 lines

src/borg/archive.py

Lines changed: 4 additions & 13 deletions

```diff
@@ -22,7 +22,7 @@
 
 from . import xattr
 from .chunker import get_chunker, Chunk
-from .cache import ChunkListEntry, build_chunkindex_from_repo
+from .cache import ChunkListEntry, build_chunkindex_from_repo, delete_chunkindex_cache
 from .crypto.key import key_factory, UnsupportedPayloadError
 from .compress import CompressionSpec
 from .constants import *  # NOQA
@@ -50,7 +50,7 @@
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import RemoteRepository, cache_if_remote
-from .repository import Repository, NoManifestError, StoreObjectNotFound
+from .repository import Repository, NoManifestError
 from .repoobj import RepoObj
 
 has_link = hasattr(os, "link")
@@ -2140,18 +2140,9 @@ def valid_item(obj):
 
     def finish(self):
         if self.repair:
+            # we may have deleted chunks, remove the chunks index cache!
             logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
-            # we may have deleted chunks, invalidate/remove the chunks index cache!
-            try:
-                self.repository.store_delete("cache/chunks_hash")
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                pass
-            try:
-                self.repository.store_delete("cache/chunks")
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                pass
+            delete_chunkindex_cache(self.repository)
         logger.info("Writing Manifest.")
         self.manifest.write()
```
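
With `delete_chunkindex_cache()` centralizing the cleanup, the repair path no longer needs per-name try/except blocks. A hedged sketch of what that helper amounts to, modeled over a plain dict standing in for the repository's key/value store (the hash names are made up):

```python
# plain dict as a stand-in for the repository store; hashes are hypothetical
store = {
    "cache/chunks.0f0f0f0f0f0f0f0f": b"<index A>",
    "cache/chunks.1e1e1e1e1e1e1e1e": b"<index B>",
    "config/manifest": b"<unrelated object>",
}

def delete_chunkindex_cache(store: dict) -> None:
    # drop every cached chunk index; tolerate entries already gone, like the
    # real code's except (ObjectNotFound, StoreObjectNotFound): pass
    for name in [n for n in store if n.startswith("cache/chunks.")]:
        store.pop(name, None)

delete_chunkindex_cache(store)
assert list(store) == ["config/manifest"]
```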

src/borg/archiver/compact_cmd.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -65,7 +65,7 @@ def save_chunk_index(self):
             # as we put the wrong size in there, we need to clean up the size:
             self.chunks[id] = entry._replace(size=0)
         # now self.chunks is an uptodate ChunkIndex, usable for general borg usage!
-        write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True)
+        write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True, delete_other=True)
        self.chunks = None  # nothing there (cleared!)
 
    def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
```
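
The new `delete_other=True` argument makes `borg compact`'s write self-cleaning: the rebuilt index is stored under its new hash-based name first, and only then are the other `cache/chunks.*` entries removed, so readers never observe a state without a valid index. A rough sketch of that ordering, under the same dict-as-store assumption as above:

```python
import xxhash  # assumption: PyPI "xxhash" standing in for borg.checksums.xxh64

def write_chunkindex(store: dict, data: bytes, *, delete_other: bool = False) -> str:
    new_hash = xxhash.xxh64(data, seed=2).hexdigest()
    store[f"cache/chunks.{new_hash}"] = data  # write the new index first
    if delete_other:
        # then drop the stale indexes (never the one we just wrote)
        stale = [n for n in store
                 if n.startswith("cache/chunks.") and not n.endswith(new_hash)]
        for name in stale:
            store.pop(name, None)
    return new_hash
```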

src/borg/cache.py

Lines changed: 103 additions & 39 deletions

```diff
@@ -13,6 +13,8 @@
 
 files_cache_logger = create_logger("borg.debug.files_cache")
 
+from borgstore.store import ItemInfo
+
 from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM, TIME_DIFFERS2_NS
 from .checksums import xxh64
 from .hashindex import ChunkIndex, ChunkIndexEntry
```
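
The `ItemInfo` import added here serves `list_chunkindex_hashes()` in the next hunk: over the RPC boundary a remote repository's `store_list()` yields plain tuples, so each entry is rewrapped before attribute access. An illustrative sketch (the field layout is an assumption for illustration, not necessarily borgstore's exact definition):

```python
from collections import namedtuple

# assumed field layout; see borgstore.store.ItemInfo for the real one
ItemInfo = namedtuple("ItemInfo", "name exists size")

raw = ("chunks.0f0f0f0f0f0f0f0f", True, 4096)  # what a remote store_list() might yield
info = ItemInfo(*raw)  # rewrap, so info.name etc. work again
assert info.name.removeprefix("chunks.") == "0f0f0f0f0f0f0f0f"
```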
```diff
@@ -663,63 +665,125 @@ def memorize_file(self, hashed_path, path_hash, st, chunks):
         )
 
 
-def load_chunks_hash(repository) -> bytes:
-    try:
-        hash = repository.store_load("cache/chunks_hash")
-        logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
-    except (Repository.ObjectNotFound, StoreObjectNotFound):
-        # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-        hash = b""
-        logger.debug("cache/chunks_hash missing!")
-    return hash
+def try_upgrade_to_b14(repository):
+    # TODO: remove this before 2.0.0 release
+    # we just delete any present chunk index cache here, it is invalid due to the
+    # refcount -> flags change we did and due to the different CHUNKINDEX_HASH_SEED.
+    for name in "chunks_hash", "chunks":
+        try:
+            repository.store_delete(f"cache/{name}")
+        except (Repository.ObjectNotFound, StoreObjectNotFound):
+            pass  # likely already upgraded
+
+
+def list_chunkindex_hashes(repository):
+    hashes = []
+    for info in repository.store_list("cache"):
+        info = ItemInfo(*info)  # RPC does not give namedtuple
+        if info.name.startswith("chunks."):
+            hash = info.name.removeprefix("chunks.")
+            hashes.append(hash)
+    hashes = sorted(hashes)
+    logger.debug(f"cached chunk indexes: {hashes}")
+    return hashes
+
+
+def delete_chunkindex_cache(repository):
+    hashes = list_chunkindex_hashes(repository)
+    for hash in hashes:
+        cache_name = f"cache/chunks.{hash}"
+        try:
+            repository.store_delete(cache_name)
+        except (Repository.ObjectNotFound, StoreObjectNotFound):
+            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+            pass
+    logger.debug(f"cached chunk indexes deleted: {hashes}")
 
 
 CHUNKINDEX_HASH_SEED = 2
 
 
-def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_write=False):
-    cached_hash = load_chunks_hash(repository)
+def write_chunkindex_to_repo_cache(
+    repository, chunks, *, clear=False, force_write=False, delete_other=False, delete_these=None
+):
+    cached_hashes = list_chunkindex_hashes(repository)
     with io.BytesIO() as f:
         chunks.write(f)
         data = f.getvalue()
     if clear:
         # if we don't need the in-memory chunks index anymore:
         chunks.clear()  # free memory, immediately
-    new_hash = xxh64(data, seed=CHUNKINDEX_HASH_SEED)
-    if force_write or new_hash != cached_hash:
-        # when an updated chunks index is stored into the cache, we also store its hash into the cache.
+    new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED))
+    if force_write or new_hash not in cached_hashes:
+        # when an updated chunks index is stored into the cache, we also store its hash as part of the name.
         # when a client is loading the chunks index from a cache, it has to compare its xxh64
-        # hash against cache/chunks_hash in the repository. if it is the same, the cache
-        # is valid. If it is different, the cache is either corrupted or out of date and
-        # has to be discarded.
-        # when some functionality is DELETING chunks from the repository, it has to either update
-        # both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
+        # hash against the hash in its name. if it is the same, the cache is valid.
+        # if it is different, the cache is either corrupted or out of date and has to be discarded.
+        # when some functionality is DELETING chunks from the repository, it has to delete
+        # all existing cache/chunks.* and maybe write a new, valid cache/chunks.<hash>,
         # so that all clients will discard any client-local chunks index caches.
-        logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
-        repository.store_store("cache/chunks", data)
-        repository.store_store("cache/chunks_hash", new_hash)
+        cache_name = f"cache/chunks.{new_hash}"
+        logger.debug(f"caching chunks index as {cache_name} in repository...")
+        repository.store_store(cache_name, data)
+        if delete_other:
+            delete_these = cached_hashes
+        elif delete_these:
+            pass
+        else:
+            delete_these = []
+        for hash in delete_these:
+            cache_name = f"cache/chunks.{hash}"
+            try:
+                repository.store_delete(cache_name)
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                pass
+        if delete_these:
+            logger.debug(f"cached chunk indexes deleted: {delete_these}")
     return new_hash
 
 
+def read_chunkindex_from_repo_cache(repository, hash):
+    cache_name = f"cache/chunks.{hash}"
+    logger.debug(f"trying to load {cache_name} from the repo...")
+    try:
+        chunks_data = repository.store_load(cache_name)
+    except (Repository.ObjectNotFound, StoreObjectNotFound):
+        # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+        logger.debug(f"{cache_name} not found in the repository.")
+    else:
+        if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == hex_to_bin(hash):
+            logger.debug(f"{cache_name} is valid.")
+            with io.BytesIO(chunks_data) as f:
+                chunks = ChunkIndex.read(f)
+            return chunks
+        else:
+            logger.debug(f"{cache_name} is invalid.")
+
+
 def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
-    chunks = None
-    # first, try to load a pre-computed and centrally cached chunks index:
+    try_upgrade_to_b14(repository)
+    # first, try to build a fresh, mostly complete chunk index from centrally cached chunk indexes:
     if not disable_caches:
-        wanted_hash = load_chunks_hash(repository)
-        logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
-        try:
-            chunks_data = repository.store_load("cache/chunks")
-        except (Repository.ObjectNotFound, StoreObjectNotFound):
-            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-            logger.debug("cache/chunks not found in the repository.")
-        else:
-            if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == wanted_hash:
-                logger.debug("cache/chunks is valid.")
-                with io.BytesIO(chunks_data) as f:
-                    chunks = ChunkIndex.read(f)
+        hashes = list_chunkindex_hashes(repository)
+        if hashes:  # we have at least one cached chunk index!
+            merged = 0
+            chunks = ChunkIndex()  # we'll merge all we find into this
+            for hash in hashes:
+                chunks_to_merge = read_chunkindex_from_repo_cache(repository, hash)
+                if chunks_to_merge is not None:
+                    logger.debug(f"cached chunk index {hash} gets merged...")
+                    for k, v in chunks_to_merge.items():
+                        chunks[k] = v
+                    merged += 1
+                    chunks_to_merge.clear()
+            if merged > 0:
+                if merged > 1 and cache_immediately:
+                    # immediately update cache/chunks, so we don't have to merge these again:
+                    write_chunkindex_to_repo_cache(
+                        repository, chunks, clear=False, force_write=True, delete_these=hashes
+                    )
                 return chunks
-            else:
-                logger.debug("cache/chunks is invalid.")
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
```
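
`build_chunkindex_from_repo()` now merges every readable cached index instead of loading exactly one: unreadable or invalid caches come back as `None` and are skipped, and if more than one index was merged, the combined result is written back immediately so the merge does not have to be repeated. A simplified sketch of the merge step, with plain dicts standing in for `ChunkIndex`:

```python
def merge_cached_indexes(indexes):
    """Merge all readable cached indexes; return None if none were usable."""
    merged = 0
    chunks = {}
    for idx in indexes:  # idx is None when that cache was missing or invalid
        if idx is not None:
            chunks.update(idx)  # same effect as the k/v copy loop in the diff
            merged += 1
    return chunks if merged else None

# two partial indexes, e.g. left behind by different clients:
a = {b"id1": (0, 100), b"id2": (0, 200)}
b = {b"id2": (0, 200), b"id3": (0, 300)}
assert merge_cached_indexes([a, None, b]) == {
    b"id1": (0, 100), b"id2": (0, 200), b"id3": (0, 300)
}
```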
```diff
@@ -741,7 +805,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
     logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
     if cache_immediately:
         # immediately update cache/chunks, so we only rarely have to do it the slow way:
-        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True)
+        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True)
     return chunks
 
 
```

src/borg/repository.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -385,7 +385,7 @@ def check_object(obj):
             # if we did a full pass in one go, we built a complete, uptodate ChunkIndex, cache it!
             from .cache import write_chunkindex_to_repo_cache
 
-            write_chunkindex_to_repo_cache(self, chunks, clear=True, force_write=True)
+            write_chunkindex_to_repo_cache(self, chunks, clear=True, force_write=True, delete_other=True)
         except StoreObjectNotFound:
             # it can be that there is no "data/" at all, then it crashes when iterating infos.
             pass
```
