diff --git a/backend/archiver/utils/datablocks.py b/backend/archiver/utils/datablocks.py
index b8514e4b..76c57d67 100644
--- a/backend/archiver/utils/datablocks.py
+++ b/backend/archiver/utils/datablocks.py
@@ -18,6 +18,15 @@ from archiver.flows.utils import DatasetError, SystemError, StoragePaths
 
 
+@log
+def get_all_files_relative(folder: Path) -> List[Path]:
+    relative_files = []
+    for dirpath, _, filenames in os.walk(folder):
+        for filename in filenames:
+            relative_files.append(Path(dirpath).joinpath(filename).relative_to(folder))
+    return relative_files
+
+
 @log
 def unpack_tarballs(src_folder: Path, dst_folder: Path):
     if not any(Path(src_folder).iterdir()):
@@ -36,7 +45,7 @@ def unpack_tarballs(src_folder: Path, dst_folder: Path):
 
 
 @dataclass
-class TarInfo:
+class ArchiveInfo:
     unpackedSize: int
     packedSize: int
     path: Path
@@ -60,31 +69,33 @@ def partition_files_flat(folder: Path, target_size_bytes: int) -> Generator[List
     part: List[Path] = []
     size = 0
     idx = 0
-    for file in folder.iterdir():
-        if size + file.stat().st_size > target_size_bytes:
-            yield part
-            part = []
-            size = 0
-            idx = idx + 1
-        part.append(file)
-        size = size + file.stat().st_size
+    for dirpath, dirnames, filenames in os.walk(folder):
+        for filename in filenames:
+            filepath = Path(os.path.join(dirpath, filename))
+            if size + os.path.getsize(filepath) > target_size_bytes:
+                yield part
+                part = []
+                size = 0
+                idx = idx + 1
+            part.append(filepath.relative_to(folder))
+            size = size + os.path.getsize(filepath)
 
     yield part
 
 
 @log
-def create_tarfiles_flat(
+def create_tarfiles(
     dataset_id: str,
     src_folder: Path,
     dst_folder: Path,
     target_size: int,
-) -> List[TarInfo]:
-    """Create datablocks, i.e. .tar.gz files, from files in a folder. Nested folders are not recursively. The created tar
-    files will be named according to the dataset they belong to.
+) -> List[ArchiveInfo]:
+    """Create datablocks, i.e. .tar.gz files, from all files in a folder. Folder structures are kept and symlinks are not resolved.
+    The created tar files will be named according to the dataset they belong to.
 
     Args:
         dataset_id (str): dataset identifier
-        src_folder (Path): source fodler to find files to create tars from
+        src_folder (Path): source folder to find files to create tars from
         dst_folder (Path): destination folder to write the tar files to
         target_size (int, optional): Target size of the tar file. This is the unpacked size of the files.
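
For orientation (not part of the patch): the new helper walks the tree without following symlinks and yields paths relative to the root, which is what later lets the tar arcnames mirror the folder structure. A minimal usage sketch, with a hypothetical layout:

```python
from pathlib import Path

from archiver.utils.datablocks import get_all_files_relative

# Hypothetical layout: /tmp/demo/subfolder1/subfolder2/img_0.png
files = get_all_files_relative(Path("/tmp/demo"))
# -> [Path("subfolder1/subfolder2/img_0.png"), ...]
# os.walk defaults to followlinks=False, so symlinked directories are not descended into.
```
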
@@ -93,22 +104,23 @@ def create_tarfiles_flat(
     """
 
     # TODO: corner case: target size < file size
-    tarballs: List[TarInfo] = []
-    tar_name = dataset_id.replace("/", "--")
+    tarballs: List[ArchiveInfo] = []
+    tar_name = dataset_id.replace("/", "-")
 
     if not any(Path(src_folder).iterdir()):
         raise SystemError(f"Empty folder {src_folder} found.")
 
     for files in partition_files_flat(src_folder, target_size):
-        current_tar_info = TarInfo(
+        current_tar_info = ArchiveInfo(
             unpackedSize=0,
             packedSize=0,
             path=Path(dst_folder / Path(f"{tar_name}_{len(tarballs)}.tar.gz")),
         )
         current_tarfile: tarfile.TarFile = tarfile.open(current_tar_info.path, "w")
-        for file in files:
-            current_tar_info.unpackedSize += file.stat().st_size
-            current_tarfile.add(name=file, arcname=file.name, recursive=False)
+        for relative_file_path in files:
+            full_path = src_folder.joinpath(relative_file_path)
+            current_tar_info.unpackedSize += full_path.stat().st_size
+            current_tarfile.add(name=full_path, arcname=relative_file_path)
 
         current_tarfile.close()
         current_tar_info.packedSize = current_tar_info.path.stat().st_size
         tarballs.append(current_tar_info)
@@ -233,7 +245,7 @@ def create_datablock_entries(
     dataset_id: str,
     folder: Path,
     origDataBlocks: List[OrigDataBlock],
-    tar_infos: List[TarInfo],
+    tar_infos: List[ArchiveInfo],
 ) -> List[DataBlock]:
     """Create datablock entries compliant with schema provided by scicat
 
@@ -505,20 +517,20 @@ def create_datablocks(
     datablocks_scratch_folder = StoragePaths.scratch_archival_datablocks_folder(dataset_id)
     datablocks_scratch_folder.mkdir(parents=True, exist_ok=True)
 
-    tar_infos = create_tarfiles_flat(
+    archive_infos = create_tarfiles(
         dataset_id=dataset_id,
         src_folder=raw_files_scratch_folder,
         dst_folder=datablocks_scratch_folder,
         target_size=Variables().ARCHIVER_TARGET_SIZE_MB * 1024 * 1024,
     )
 
-    getLogger().info(f"Created {len(tar_infos)} datablocks from {len(file_paths)} objects")
+    getLogger().info(f"Created {len(archive_infos)} datablocks from {len(file_paths)} objects")
 
     datablocks = create_datablock_entries(
         dataset_id,
         StoragePaths.scratch_archival_datablocks_folder(dataset_id),
         origDataBlocks,
-        tar_infos,
+        archive_infos,
     )
 
     uploaded_objects = upload_objects_to_s3(
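
A hedged usage sketch of the renamed entry point (paths and sizes below are examples only, not from the patch): archives are named after the dataset id with `/` mapped to `-`, and each `ArchiveInfo` records both the unpacked and packed sizes.

```python
from pathlib import Path

from archiver.utils.datablocks import create_tarfiles

infos = create_tarfiles(
    dataset_id="testprefix/11.111",          # -> archives named testprefix-11.111_0.tar.gz, ...
    src_folder=Path("/scratch/raw"),         # example path
    dst_folder=Path("/scratch/datablocks"),  # example path
    target_size=200 * 1024 * 1024,           # cap on unpacked bytes per archive
)
for info in infos:
    print(info.path, info.unpackedSize, info.packedSize)
```
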
diff --git a/backend/archiver/utils/tests/test_datablocks.py b/backend/archiver/utils/tests/test_datablocks.py
index ccf6d00c..c3774e8f 100644
--- a/backend/archiver/utils/tests/test_datablocks.py
+++ b/backend/archiver/utils/tests/test_datablocks.py
@@ -9,7 +9,7 @@ from unittest.mock import patch
 
 from archiver.flows.tests.helpers import mock_s3client
-from archiver.utils.datablocks import TarInfo
+from archiver.utils.datablocks import ArchiveInfo
 import archiver.utils.datablocks as datablock_operations
 from archiver.utils.model import OrigDataBlock, DataBlock, DataFile
 from archiver.flows.utils import StoragePaths, SystemError
@@ -20,14 +20,16 @@ def create_raw_files_fixture(storage_paths_fixture, num_raw_files, file_size_in_bytes):
-    folder: Path = StoragePaths.scratch_archival_raw_files_folder(test_dataset_id)
+    prefix_folder = StoragePaths.scratch_archival_raw_files_folder(test_dataset_id)
+    relative_subfolders = Path("subfolder1") / "subfolder2"
+    folder = prefix_folder / relative_subfolders
     folder.mkdir(parents=True, exist_ok=True)
     for n in range(num_raw_files):
         filename = folder / f"img_{n}.png"
         with open(filename, "wb") as fout:
             print(f"Creating file {filename}")
             fout.write(os.urandom(file_size_in_bytes))
-    return folder
+    return prefix_folder
 
 
 @pytest.fixture()
@@ -41,45 +43,46 @@ def dst_folder_fixture(storage_paths_fixture):
 
 
 @pytest.mark.parametrize(
-    "target_size,num_raw_files,file_size_in_bytes,expected_num_compressed_files",
+    "target_size_in_bytes,num_raw_files,single_file_size_in_bytes,expected_num_compressed_files",
     [
         (2.5 * FILE_SIZE, 10, FILE_SIZE, 5),  # total size > target size
         (10 * FILE_SIZE, 10, FILE_SIZE, 1),  # total size == target size
         (11 * FILE_SIZE, 10, FILE_SIZE, 1),  # total size < target size
     ],
 )
-def test_create_tarfiles_flat(
-    target_size: int,
+def test_create_archives(
+    target_size_in_bytes: int,
     num_raw_files,
-    file_size_in_bytes,
+    single_file_size_in_bytes,
     expected_num_compressed_files,
     dst_folder_fixture: Path,
     storage_paths_fixture,
 ):
-    raw_files_path = create_raw_files_fixture(storage_paths_fixture, num_raw_files, file_size_in_bytes)
+    raw_files_path = create_raw_files_fixture(storage_paths_fixture, num_raw_files, single_file_size_in_bytes)
 
-    tar_infos = datablock_operations.create_tarfiles_flat(
+    tar_infos = datablock_operations.create_tarfiles(
         str(test_dataset_id),
         raw_files_path,
         dst_folder_fixture,
-        target_size=target_size,
+        target_size=target_size_in_bytes,
     )
 
     for info in tar_infos:
-        assert info.unpackedSize >= file_size_in_bytes
-        assert info.unpackedSize <= num_raw_files / expected_num_compressed_files * file_size_in_bytes
-        assert info.packedSize >= file_size_in_bytes
-        assert info.packedSize <= target_size * 1.05  # a tarfile might be "a little" larger than
+        assert info.unpackedSize >= single_file_size_in_bytes
+        assert info.unpackedSize <= num_raw_files / expected_num_compressed_files * single_file_size_in_bytes
+        assert info.packedSize >= single_file_size_in_bytes
+        assert info.packedSize <= target_size_in_bytes * 1.05  # a tarfile might be "a little" larger than the target size
 
-    tars = [t for t in dst_folder_fixture.iterdir()]
-    assert expected_num_compressed_files == len(tars)
-    assert len(tar_infos) == len(tars)
+    archive_files = list(dst_folder_fixture.iterdir())
+    assert expected_num_compressed_files == len(archive_files)
+    assert len(tar_infos) == len(archive_files)
 
-    verify_tar_content(raw_files_path, dst_folder_fixture, tars)
+    verify_tar_content(raw_files_path, dst_folder_fixture, archive_files)
 
 
 def verify_tar_content(raw_file_folder, datablock_folder, tars):
-    expected_files = set([t.name for t in raw_file_folder.iterdir()])
+    relative_files = datablock_operations.get_all_files_relative(raw_file_folder)
+    expected_files = set(relative_files)
 
     num_expected_files = len(expected_files)
@@ -88,16 +91,16 @@ def verify_tar_content(raw_file_folder, datablock_folder, tars):
         tar: tarfile.TarFile = tarfile.open(Path(datablock_folder) / t)
         for f in tar.getnames():
             num_packed_files = num_packed_files + 1
-            expected_files.discard(f)
+            expected_files.discard(Path(f))
 
     assert num_packed_files == num_expected_files
     assert len(expected_files) == 0
 
 
 @pytest.fixture()
-def tar_infos_fixture(storage_paths_fixture) -> List[TarInfo]:
+def tar_infos_fixture(storage_paths_fixture) -> List[ArchiveInfo]:
     folder = create_raw_files_fixture(storage_paths_fixture, 10, 1 * MB)
-    files = list(folder.iterdir())
+    files = datablock_operations.get_all_files_relative(folder)
 
     tar_folder = StoragePaths.scratch_archival_datablocks_folder(test_dataset_id)
     tar_folder.mkdir(parents=True)
@@ -105,8 +108,8 @@ def tar_infos_fixture(storage_paths_fixture) -> List[ArchiveInfo]:
     assert len(files) > 2
 
     tar_infos = [
-        TarInfo(unpackedSize=0, packedSize=0, path=Path("")),
-        TarInfo(unpackedSize=0, packedSize=0, path=Path("")),
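The content check now compares arcnames as relative `Path`s rather than bare file names, so files in nested subfolders are verified too. A small sketch of the idea (the helper name and paths are illustrative, not from the patch):

```python
import tarfile
from pathlib import Path
from typing import Set


def archived_members(tar_path: Path) -> Set[Path]:
    """Collect a tar's arcnames as Paths, directly comparable
    against the output of get_all_files_relative()."""
    with tarfile.open(tar_path) as tar:
        return {Path(name) for name in tar.getnames()}
```
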
+        ArchiveInfo(unpackedSize=0, packedSize=0, path=Path("")),
+        ArchiveInfo(unpackedSize=0, packedSize=0, path=Path("")),
     ]
 
     tar1_path = tar_folder / "tar1.tar.gz"
@@ -115,11 +118,11 @@ def tar_infos_fixture(storage_paths_fixture) -> List[TarInfo]:
 
     if not tar1_path.exists():
         tar1 = tarfile.open(tar1_path, "w")
-        for f in files[:2]:
-            p = Path(folder) / f
+        for relative_path in files[:2]:
+            full_path = Path(folder) / relative_path
 
-            tar_infos[0].unpackedSize += p.stat().st_size
-            tar1.add(name=p, arcname=f.name, recursive=False)
+            tar_infos[0].unpackedSize += full_path.stat().st_size
+            tar1.add(name=full_path, arcname=relative_path, recursive=False)
 
         tar1.close()
 
@@ -130,10 +133,10 @@ def tar_infos_fixture(storage_paths_fixture) -> List[TarInfo]:
 
     if not tar2_path.exists():
         tar2 = tarfile.open(tar2_path, "w")
-        for f in files[2:]:
-            p = Path(folder) / f
-            tar_infos[1].unpackedSize += p.stat().st_size
-            tar2.add(name=p, arcname=f.name, recursive=False)
+        for relative_path in files[2:]:
+            full_path = Path(folder) / relative_path
+            tar_infos[1].unpackedSize += full_path.stat().st_size
+            tar2.add(name=full_path, arcname=relative_path, recursive=False)
 
         tar2.close()
         tar_infos[1].packedSize = tar2_path.stat().st_size
@@ -297,7 +300,7 @@ def create_files_in_scratch(dataset: str):
 
 def test_create_datablock_entries(
     storage_paths_fixture,
-    tar_infos_fixture: List[TarInfo],
+    tar_infos_fixture: List[ArchiveInfo],
     origDataBlocks_fixture: List[OrigDataBlock],
 ):
     folder = create_raw_files_fixture(storage_paths_fixture, 10, 1 * MB)
@@ -436,14 +439,21 @@ def test_create_datablocks(
     dataset_id = "testprefix/11.111"
     folder = create_raw_files_fixture(storage_paths_fixture, 10, 10 * MB)
 
-    for raw_file in folder.iterdir():
-        StoragePaths.scratch_archival_raw_files_folder(dataset_id).mkdir(parents=True, exist_ok=True)
-        shutil.copy(
-            raw_file,
-            StoragePaths.scratch_archival_raw_files_folder(dataset_id) / Path(raw_file).name,
-        )
+    for dirpath, _, filenames in os.walk(folder):
+        for filename in filenames:
+            full_path = Path(dirpath) / filename
+            relative_folder = Path(dirpath).relative_to(folder)
+            StoragePaths.scratch_archival_raw_files_folder(dataset_id).joinpath(relative_folder).mkdir(
+                parents=True, exist_ok=True
+            )
+
+            shutil.copy(
+                full_path,
+                StoragePaths.scratch_archival_raw_files_folder(dataset_id)
+                .joinpath(relative_folder)
+                .joinpath(filename),
+            )
 
-    # Act
     datablocks = datablock_operations.create_datablocks(
         mock_s3client(), dataset_id=dataset_id, origDataBlocks=origDataBlocks_fixture
     )
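
For reference (not part of the patch): the walk-and-copy loop in the test above recreates the nested structure by hand; a `shutil.copytree` call is an equivalent, more compact alternative. Paths here are examples only.

```python
import shutil
from pathlib import Path

src = Path("/scratch/raw/source")             # hypothetical source tree
dst = Path("/scratch/raw/testprefix/11.111")  # hypothetical destination
shutil.copytree(src, dst, dirs_exist_ok=True)  # dirs_exist_ok requires Python 3.8+
```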