diff --git a/fsspec/utils.py b/fsspec/utils.py index c7e3e3ddc..ee9c5f28d 100644 --- a/fsspec/utils.py +++ b/fsspec/utils.py @@ -627,3 +627,101 @@ def atomic_write(path: str, mode: str = "wb"): raise else: os.replace(fn, path) + + +def rsync( + source, + destination, + source_fs: AbstractFileSystem, + target_fs: AbstractFileSystem, + delete_missing=False, + source_field="size", + dest_field="size", + update_cond="different", + inst_kwargs=None, +) -> None: + """Sync files between two directory trees + + (experimental) + + Parameters + ---------- + source: str + Root of the directory tree to take files from. This must be a directory, but + do not include any terminating "/" character + destination: str + Root path to copy into. The contents of this location should be + identical to the contents of ``source`` when done. This will be made a + directory, and the terminal "/" should not be included. + delete_missing: bool + If there are paths in the destination that don't exist in the + source and this is True, delete them. Otherwise, leave them alone. + source_field: str | callable + If ``update_field`` is "different", this is the key in the info + of source files to consider for difference. Maybe a function of the + info dict. + dest_field: str | callable + If ``update_field`` is "different", this is the key in the info + of destination files to consider for difference. May be a function of + the info dict. + update_cond: "different"|"always"|"never" + If "always", every file is copied, regardless of whether it exists in + the destination. If "never", files that exist in the destination are + not copied again. If "different" (default), only copy if the info + fields given by ``source_field`` and ``dest_field`` (usually "size") + are different. Other comparisons may be added in the future. + inst_kwargs: dict|None + If ``fs`` is None, use this set of keyword arguments to make a + GenericFileSystem instance + fs: GenericFileSystem|None + Instance to use if explicitly given. The instance defines how to + to make downstream file system instances from paths. + """ + logger = logging.getLogger("fsspec.rsync") + source = source_fs._strip_protocol(source) + destination = target_fs._strip_protocol(destination) + allfiles = source_fs.find(source, withdirs=True, detail=True) + if not source_fs.isdir(source): + raise ValueError("Can only rsync on a directory") + otherfiles = target_fs.find(destination, withdirs=True, detail=True) + dirs = [ + a + for a, v in allfiles.items() + if v["type"] == "directory" and a.replace(source, destination) not in otherfiles + ] + logger.debug(f"{len(dirs)} directories to create") + if dirs: + for dirn in dirs: + target_fs.makedirs(dirn.replace(source, destination), exist_ok=True) + allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"} + logger.debug(f"{len(allfiles)} files to consider for copy") + to_delete = [ + o + for o, v in otherfiles.items() + if o.replace(destination, source) not in allfiles and v["type"] == "file" + ] + for k, v in allfiles.copy().items(): + otherfile = k.replace(source, destination) + if otherfile in otherfiles: + if update_cond == "always": + allfiles[k] = otherfile + elif update_cond == "different": + inf1 = source_field(v) if callable(source_field) else v[source_field] + v2 = otherfiles[otherfile] + inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field] + if inf1 != inf2: + # details mismatch, make copy + allfiles[k] = otherfile + else: + # details match, don't copy + allfiles.pop(k) + else: + # file not in target yet + allfiles[k] = otherfile + logger.debug(f"{len(allfiles)} files to copy") + if allfiles: + source_files, target_files = zip(*allfiles.items()) + source_fs.cp(source_files, target_files, **kwargs) + logger.debug(f"{len(to_delete)} files to delete") + if delete_missing: + target_fs.rm(to_delete)