feat(datablocks): create tar files keeping the folder structure of nested folders #184

Open · wants to merge 2 commits into base: main
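In short: the archiver now walks the source folder recursively and adds each file to the tarball with an arcname equal to its path relative to the source folder, so nested directories are reproduced when the archive is unpacked. A minimal, self-contained sketch of that tarfile behaviour (illustrative names and paths only, not the PR's actual helper):

import os
import tarfile
from pathlib import Path

def tar_keeping_structure(src_folder: Path, tar_path: Path) -> None:
    # Pack every file below src_folder; arcname is the path relative to src_folder,
    # so e.g. "subfolder1/subfolder2/img_0.png" is restored as-is on extraction.
    with tarfile.open(tar_path, "w:gz") as tar:
        for dirpath, _dirnames, filenames in os.walk(src_folder):
            for filename in filenames:
                full_path = Path(dirpath) / filename
                tar.add(full_path, arcname=full_path.relative_to(src_folder))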
60 changes: 36 additions & 24 deletions backend/archiver/utils/datablocks.py
@@ -18,6 +18,15 @@
from archiver.flows.utils import DatasetError, SystemError, StoragePaths


@log
def get_all_files_relative(folder: Path) -> List[Path]:
    """Return every file below `folder` (recursively) as a path relative to `folder`."""
    relative_files = []
    for dirpath, _dirnames, filenames in os.walk(folder):
        for filename in filenames:
            relative_files.append(Path(dirpath).joinpath(filename).relative_to(folder))
    return relative_files


@log
def unpack_tarballs(src_folder: Path, dst_folder: Path):
if not any(Path(src_folder).iterdir()):
@@ -36,7 +45,7 @@ def unpack_tarballs(src_folder: Path, dst_folder: Path):


@dataclass
class TarInfo:
class ArchiveInfo:
unpackedSize: int
packedSize: int
path: Path
@@ -60,31 +69,33 @@ def partition_files_flat(folder: Path, target_size_bytes: int) -> Generator[List
part: List[Path] = []
size = 0
idx = 0
for file in folder.iterdir():
if size + file.stat().st_size > target_size_bytes:
yield part
part = []
size = 0
idx = idx + 1
part.append(file)
size = size + file.stat().st_size
for dirpath, dirnames, filenames in os.walk(folder):
for filename in filenames:
filepath = Path(os.path.join(dirpath, filename))
if size + os.path.getsize(filepath) > target_size_bytes:
yield part
part = []
size = 0
idx = idx + 1
part.append(filepath.relative_to(folder))
size = size + os.path.getsize(filepath)

yield part


@log
def create_tarfiles_flat(
def create_tarfiles(
dataset_id: str,
src_folder: Path,
dst_folder: Path,
target_size: int,
) -> List[TarInfo]:
"""Create datablocks, i.e. .tar.gz files, from files in a folder. Nested folders are not recursively. The created tar
files will be named according to the dataset they belong to.
) -> List[ArchiveInfo]:
"""Create datablocks, i.e. .tar.gz files, from all files in a folder. Folder structures are kept and symlnks not resolved.
The created tar files will be named according to the dataset they belong to.

Args:
dataset_id (str): dataset identifier
src_folder (Path): source fodler to find files to create tars from
src_folder (Path): source folder to find files to create tars from
dst_folder (Path): destination folder to write the tar files to
target_size (int, optional): Target size of the tar file. This is the unpacked size of the files.

@@ -93,22 +104,23 @@ def create_tarfiles_flat(
"""

# TODO: corner case: target size < file size
tarballs: List[TarInfo] = []
tar_name = dataset_id.replace("/", "--")
tarballs: List[ArchiveInfo] = []
tar_name = dataset_id.replace("/", "-")

if not any(Path(src_folder).iterdir()):
raise SystemError(f"Empty folder {src_folder} found.")

for files in partition_files_flat(src_folder, target_size):
current_tar_info = TarInfo(
current_tar_info = ArchiveInfo(
unpackedSize=0,
packedSize=0,
path=Path(dst_folder / Path(f"{tar_name}_{len(tarballs)}.tar.gz")),
)
current_tarfile: tarfile.TarFile = tarfile.open(current_tar_info.path, "w")
for file in files:
current_tar_info.unpackedSize += file.stat().st_size
current_tarfile.add(name=file, arcname=file.name, recursive=False)
for relative_file_path in files:
full_path = src_folder.joinpath(relative_file_path)
current_tar_info.unpackedSize += full_path.stat().st_size
current_tarfile.add(name=full_path, arcname=relative_file_path)
current_tarfile.close()
current_tar_info.packedSize = current_tar_info.path.stat().st_size
tarballs.append(current_tar_info)
@@ -233,7 +245,7 @@ def create_datablock_entries(
dataset_id: str,
folder: Path,
origDataBlocks: List[OrigDataBlock],
tar_infos: List[TarInfo],
tar_infos: List[ArchiveInfo],
) -> List[DataBlock]:
"""Create datablock entries compliant with schema provided by scicat

@@ -502,20 +514,20 @@ def create_datablocks(
datablocks_scratch_folder = StoragePaths.scratch_archival_datablocks_folder(dataset_id)
datablocks_scratch_folder.mkdir(parents=True, exist_ok=True)

tar_infos = create_tarfiles_flat(
archive_infos = create_tarfiles(
dataset_id=dataset_id,
src_folder=raw_files_scratch_folder,
dst_folder=datablocks_scratch_folder,
target_size=Variables().ARCHIVER_TARGET_SIZE_MB * 1024 * 1024,
)

getLogger().info(f"Created {len(tar_infos)} datablocks from {len(file_paths)} objects")
getLogger().info(f"Created {len(archive_infos)} datablocks from {len(file_paths)} objects")

datablocks = create_datablock_entries(
dataset_id,
StoragePaths.scratch_archival_datablocks_folder(dataset_id),
origDataBlocks,
tar_infos,
archive_infos,
)

uploaded_objects = upload_objects_to_s3(
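For orientation, a hedged usage sketch of the reworked helpers (module path as imported in the tests below; the folders and dataset id here are made up):

from pathlib import Path

import archiver.utils.datablocks as datablock_operations

src_folder = Path("/tmp/scratch/raw")        # assumed to contain nested subfolders with files
dst_folder = Path("/tmp/scratch/datablocks")
dst_folder.mkdir(parents=True, exist_ok=True)

# All files below src_folder as paths relative to src_folder, nested folders included.
relative_files = datablock_operations.get_all_files_relative(src_folder)

# Pack them into .tar.gz files of roughly the given unpacked size; each returned
# ArchiveInfo carries the unpackedSize, packedSize and path of one tarball.
archive_infos = datablock_operations.create_tarfiles(
    dataset_id="some/dataset-id",
    src_folder=src_folder,
    dst_folder=dst_folder,
    target_size=2 * 1024 * 1024,
)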
90 changes: 50 additions & 40 deletions backend/archiver/utils/tests/test_datablocks.py
@@ -9,7 +9,7 @@
from unittest.mock import patch

from archiver.flows.tests.helpers import mock_s3client
from archiver.utils.datablocks import TarInfo
from archiver.utils.datablocks import ArchiveInfo
import archiver.utils.datablocks as datablock_operations
from archiver.utils.model import OrigDataBlock, DataBlock, DataFile
from archiver.flows.utils import StoragePaths, SystemError
@@ -20,14 +20,16 @@


def create_raw_files_fixture(storage_paths_fixture, num_raw_files, file_size_in_bytes):
folder: Path = StoragePaths.scratch_archival_raw_files_folder(test_dataset_id)
prefix_folder = StoragePaths.scratch_archival_raw_files_folder(test_dataset_id)
relative_subfolders = Path("subfolder1") / "subfolder2"
folder = prefix_folder / relative_subfolders
folder.mkdir(parents=True, exist_ok=True)
for n in range(num_raw_files):
filename = folder / f"img_{n}.png"
with open(filename, "wb") as fout:
print(f"Creating file {filename}")
fout.write(os.urandom(file_size_in_bytes))
return folder
return prefix_folder


@pytest.fixture()
@@ -41,45 +43,46 @@ def dst_folder_fixture(storage_paths_fixture):


@pytest.mark.parametrize(
"target_size,num_raw_files,file_size_in_bytes,expected_num_compressed_files",
"target_size_in_bytes,num_raw_files,single_file_size_in_bytes,expected_num_compressed_files",
[
(2.5 * FILE_SIZE, 10, FILE_SIZE, 5), # total size > target size
(10 * FILE_SIZE, 10, FILE_SIZE, 1), # total size == target size
(11 * FILE_SIZE, 10, FILE_SIZE, 1), # total size < target size
],
)
def test_create_tarfiles_flat(
target_size: int,
def test_create_archives(
target_size_in_bytes: int,
num_raw_files,
file_size_in_bytes,
single_file_size_in_bytes,
expected_num_compressed_files,
dst_folder_fixture: Path,
storage_paths_fixture,
):
raw_files_path = create_raw_files_fixture(storage_paths_fixture, num_raw_files, file_size_in_bytes)
raw_files_path = create_raw_files_fixture(storage_paths_fixture, num_raw_files, single_file_size_in_bytes)

tar_infos = datablock_operations.create_tarfiles_flat(
tar_infos = datablock_operations.create_tarfiles(
str(test_dataset_id),
raw_files_path,
dst_folder_fixture,
target_size=target_size,
target_size=target_size_in_bytes,
)

for info in tar_infos:
assert info.unpackedSize >= file_size_in_bytes
assert info.unpackedSize <= num_raw_files / expected_num_compressed_files * file_size_in_bytes
assert info.packedSize >= file_size_in_bytes
assert info.packedSize <= target_size * 1.05 # a tarfile might be "a little" larger than
assert info.unpackedSize >= single_file_size_in_bytes
assert info.unpackedSize <= num_raw_files / expected_num_compressed_files * single_file_size_in_bytes
assert info.packedSize >= single_file_size_in_bytes
assert info.packedSize <= target_size_in_bytes * 1.05  # a tarfile might be "a little" larger than the target size

tars = [t for t in dst_folder_fixture.iterdir()]
assert expected_num_compressed_files == len(tars)
assert len(tar_infos) == len(tars)
archive_files = [os.path.join(dst_folder_fixture, t) for t in dst_folder_fixture.iterdir()]
assert expected_num_compressed_files == len(archive_files)
assert len(tar_infos) == len(archive_files)

verify_tar_content(raw_files_path, dst_folder_fixture, tars)
verify_tar_content(raw_files_path, dst_folder_fixture, archive_files)


def verify_tar_content(raw_file_folder, datablock_folder, tars):
expected_files = set([t.name for t in raw_file_folder.iterdir()])
expected_files = set(datablock_operations.get_all_files_relative(raw_file_folder))

num_expected_files = len(expected_files)

@@ -88,25 +91,25 @@ def verify_tar_content(raw_file_folder, datablock_folder, tars):
tar: tarfile.TarFile = tarfile.open(Path(datablock_folder) / t)
for f in tar.getnames():
num_packed_files = num_packed_files + 1
expected_files.discard(f)
expected_files.discard(Path(f))

assert num_packed_files == num_expected_files
assert len(expected_files) == 0


@pytest.fixture()
def tar_infos_fixture(storage_paths_fixture) -> List[TarInfo]:
def tar_infos_fixture(storage_paths_fixture) -> List[ArchiveInfo]:
folder = create_raw_files_fixture(storage_paths_fixture, 10, 1 * MB)
files = list(folder.iterdir())
files = datablock_operations.get_all_files_relative(folder)

tar_folder = StoragePaths.scratch_archival_datablocks_folder(test_dataset_id)
tar_folder.mkdir(parents=True)

assert len(files) > 2

tar_infos = [
TarInfo(unpackedSize=0, packedSize=0, path=Path("")),
TarInfo(unpackedSize=0, packedSize=0, path=Path("")),
ArchiveInfo(unpackedSize=0, packedSize=0, path=Path("")),
ArchiveInfo(unpackedSize=0, packedSize=0, path=Path("")),
]

tar1_path = tar_folder / "tar1.tar.gz"
@@ -115,11 +118,11 @@ def tar_infos_fixture(storage_paths_fixture) -> List[TarInfo]:

if not tar1_path.exists():
tar1 = tarfile.open(tar1_path, "w")
for f in files[:2]:
p = Path(folder) / f
for relative_path in files[:2]:
full_path = Path(folder) / relative_path

tar_infos[0].unpackedSize += p.stat().st_size
tar1.add(name=p, arcname=f.name, recursive=False)
tar_infos[0].unpackedSize += full_path.stat().st_size
tar1.add(name=full_path, arcname=relative_path, recursive=False)

tar1.close()

@@ -130,10 +133,10 @@

if not tar2_path.exists():
tar2 = tarfile.open(tar2_path, "w")
for f in files[2:]:
p = Path(folder) / f
tar_infos[1].unpackedSize += p.stat().st_size
tar2.add(name=p, arcname=f.name, recursive=False)
for relative_path in files[2:]:
full_path = Path(folder) / relative_path
tar_infos[1].unpackedSize += full_path.stat().st_size
tar2.add(name=full_path, arcname=relative_path, recursive=False)
tar2.close()

tar_infos[1].packedSize = tar2_path.stat().st_size
@@ -297,7 +300,7 @@ def create_files_in_scratch(dataset: str):

def test_create_datablock_entries(
storage_paths_fixture,
tar_infos_fixture: List[TarInfo],
tar_infos_fixture: List[ArchiveInfo],
origDataBlocks_fixture: List[OrigDataBlock],
):
folder = create_raw_files_fixture(storage_paths_fixture, 10, 1 * MB)
@@ -436,14 +439,21 @@ def test_create_datablocks(
dataset_id = "testprefix/11.111"
folder = create_raw_files_fixture(storage_paths_fixture, 10, 10 * MB)

for raw_file in folder.iterdir():
StoragePaths.scratch_archival_raw_files_folder(dataset_id).mkdir(parents=True, exist_ok=True)
shutil.copy(
raw_file,
StoragePaths.scratch_archival_raw_files_folder(dataset_id) / Path(raw_file).name,
)
for dirpath, _, filenames in os.walk(folder):
    for filename in filenames:
        full_path = Path(dirpath) / filename
        relative_folder = Path(dirpath).relative_to(folder)
        StoragePaths.scratch_archival_raw_files_folder(dataset_id).joinpath(relative_folder).mkdir(
            parents=True, exist_ok=True
        )

        shutil.copy(
            full_path,
            StoragePaths.scratch_archival_raw_files_folder(dataset_id)
            .joinpath(relative_folder)
            .joinpath(filename),
        )

# Act
datablocks = datablock_operations.create_datablocks(
mock_s3client(), dataset_id=dataset_id, origDataBlocks=origDataBlocks_fixture
)
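As a cross-check of the parametrized cases in test_create_archives above: the packing is greedy, so a new tarball is started whenever the next file would push the current part over the target size; with 10 files of FILE_SIZE and a target of 2.5 * FILE_SIZE that means two files per tarball and five tarballs in total. A small standalone sketch of that arithmetic (illustrative helper name, not taken from the code):

def expected_num_tarballs(num_files: int, file_size: int, target_size: float) -> int:
    # Greedy partitioning as in partition_files_flat: start a new part when
    # adding the next file would exceed the target size.
    parts, current_size = 1, 0
    for _ in range(num_files):
        if current_size + file_size > target_size:
            parts += 1
            current_size = 0
        current_size += file_size
    return parts

FILE_SIZE = 1024
assert expected_num_tarballs(10, FILE_SIZE, 2.5 * FILE_SIZE) == 5   # total size > target size
assert expected_num_tarballs(10, FILE_SIZE, 10 * FILE_SIZE) == 1    # total size == target size
assert expected_num_tarballs(10, FILE_SIZE, 11 * FILE_SIZE) == 1    # total size < target size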