Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions vandockit/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ def _validate(path):
return (validator.get_summary_msg(), validator.has_errors())


def _convert(source_path, dest_path):
def _convert(source_path, dest_path, **kwargs):
try:
converter = PackageConverter(source_path)
converter.convert(dest_path)
converter.convert(dest_path, **kwargs)
except Exception as exception:
# Log and re-raise exceptions
logging.critical(exception)
Expand Down Expand Up @@ -145,7 +145,14 @@ def validate(path):
"source_path", type=click.Path(exists=True, file_okay=False, readable=True)
)
@click.argument("dest_path", type=click.Path())
def convert(source_path, dest_path):
@click.option(
"--zip",
"-z",
"zip_transfers",
is_flag=True,
help="Zip each AM standard transfer directory after conversion",
)
def convert(source_path, dest_path, zip_transfers):
"""
Convert the VanDocs transfer package at SOURCE_PATH to one Archivematica
standard transfer directory per container in DEST_PATH. Validates the
Expand All @@ -160,7 +167,9 @@ def convert(source_path, dest_path):
(summary_msg, has_errors) = _validate(source_path)

if not has_errors:
(summary_msg, has_errors) = _convert(source_path, dest_path)
(summary_msg, has_errors) = _convert(
source_path, dest_path, zip=zip_transfers
)

_print_summary(summary_msg, has_errors)

Expand Down
124 changes: 87 additions & 37 deletions vandockit/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,19 +135,20 @@ def create_subdirs(self, path, subdirs):

return path

def make_read_only(self, path):
    """
    Remove write permissions from the file or directory at path to prevent
    modification. If path is a directory, recursively remove write
    permissions from all of its contents.

    Files are set to mode 0o444 and directories to 0o555.
    """

    if not path.is_dir():
        path.chmod(0o444)
        return

    # Lock the contents first, then the directory itself
    for item in path.iterdir():
        self.make_read_only(item)

    path.chmod(0o555)


class PackageConverter(BaseConverter):
Expand Down Expand Up @@ -198,12 +199,12 @@ def get_summary_msg(self):

return msg

def convert(self, dest_path):
def convert(self, dest_path, **kwargs):
self.timer["start"] = time.time()
dest_path = Path(dest_path)

for container in self.get_containers():
container.write_am_std_transfer(dest_path)
container.write_am_std_transfer(dest_path, **kwargs)

if container.has_errors():
self.errors += container.errors
Expand All @@ -223,25 +224,26 @@ def get_log_prefix(self):
def get_am_transfer_name(self):
    """Return the Archivematica transfer name for this container, in the
    format [transfer_number]_[container_name]"""

    transfer_number = self.parent.get_transfer_number()

    return f"{transfer_number}_{self.name}"

def transfer_exists(self, dest_path):
    """Return True when dest_path already holds either a transfer directory
    or a zip file named after this container's transfer, False otherwise."""

    name = self.get_am_transfer_name()
    candidates = (dest_path / name, dest_path / f"{name}.zip")

    return any(candidate.exists() for candidate in candidates)

def create_am_transfer_dir(self, dest_path):
"""Create an Archivematica transfer directory using the name format
[transfer_number]_[container_name]"""

am_transfer_name = self.get_am_transfer_name()
am_transfer_dir = dest_path / am_transfer_name

if am_transfer_dir.exists():
msg = (
f'Transfer "{am_transfer_name}" already exists. Please move or'
+ " delete the existing transfer directory to create a new"
+ " transfer."
)

self.errors += 1
logging.error("\n".join(msg).format(am_transfer_name))

return

self.create_subdirs(dest_path, am_transfer_name)

logging.info(
Expand Down Expand Up @@ -427,25 +429,73 @@ def copy_desc_md_files(self, am_transfer_dir):

self.copy_files(dmd_files, subdoc_dir)

def write_am_std_transfer(self, dest_path):
am_transfer_dir = self.create_am_transfer_dir(dest_path)
def zip_dir(self, path):
    """
    Zip the directory at path and return the zip file's path. The original
    directory is deleted after the zip file is created.

    The archive is created beside the directory as "<path>.zip" and stores
    the directory under its own name (path.name) relative to path.parent.

    Raises OSError, after logging a critical message, if the zip file can't
    be created or the original directory can't be deleted.
    """

    try:
        zip_path = shutil.make_archive(
            # base_name is a file name minus its extension; hand it over as
            # a plain string rather than relying on Path being accepted
            str(path),
            "zip",
            path.parent,
            path.name,
        )
    except OSError:
        logging.critical(
            self.get_log_prefix()
            + f"Couldn't create zip file '{path.name}.zip' from dir"
            f" '{path}'"
        )

        # Halt script
        raise

    # Delete the unzipped transfer directory after creating the zip file
    try:
        shutil.rmtree(path)
    except OSError:
        logging.critical(
            self.get_log_prefix() + f"Couldn't delete '{path}'"
        )

        # Halt script
        raise

    return Path(zip_path)

def write_am_std_transfer(self, dest_path, **kwargs):
    """
    Write this container to dest_path as an Archivematica standard transfer
    and make the result read-only.

    Keyword options:
        zip: when true, the finished transfer directory is passed to
            zip_dir() and the zip file's path replaces the directory path.

    Returns the path of the transfer (directory or zip file). Returns None,
    after incrementing self.errors and logging an error, when
    transfer_exists() reports that the transfer is already in dest_path.
    """

    # Skip this container if the transfer already exists
    if self.transfer_exists(dest_path):
        self.errors += 1
        logging.error(
            f'Transfer "{self.get_am_transfer_name()}" already exists.'
            + " Please move or delete the existing transfer to create a"
            + " new transfer."
        )

        return None

    transfer_path = self.create_am_transfer_dir(dest_path)

    self.copy_submission_docs(transfer_path)
    self.write_location_file(transfer_path)
    self.write_am_checksum_file(transfer_path)

    # 2022-03-10: At CVA's request disable the creation of metadata.csv
    # because the current VanDocs data doesn't map accurately to Dublin
    # Core
    #
    # self.write_am_metadata(transfer_path)

    self.copy_preservation_objects(transfer_path)
    self.copy_desc_md_files(transfer_path)

    # Zip the transfer directory if the --zip option was specified.
    if kwargs.get("zip", False):
        transfer_path = self.zip_dir(transfer_path)

    # Lock whatever was produced: the directory tree or the single zip file
    self.make_read_only(transfer_path)

    return transfer_path
40 changes: 32 additions & 8 deletions vandockit/tests/test_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import csv
import xml.etree.ElementTree as ET
from pathlib import Path

import pytest

Expand Down Expand Up @@ -161,15 +162,14 @@ def test_create_subdirs_os_error(self, dest_dir, vd_base_converter):
with pytest.raises(OSError):
vd_base_converter.create_subdirs(dest_dir, "spam")

def test_make_read_only(self, dest_dir, vd_base_converter):
def test_make_read_only_dir(self, dest_dir, vd_base_converter):
test_dir = dest_dir / "a_dir"
test_dir.mkdir()
test_subdir = test_dir / "a_subdir"
test_subdir.mkdir()
test_file = test_dir / "a_file.txt"
test_file.touch()

# Remove write permissions for directory and files
vd_base_converter.make_read_only(test_dir)

# Can't add a new file
Expand All @@ -181,6 +181,15 @@ def test_make_read_only(self, dest_dir, vd_base_converter):
with pytest.raises(PermissionError):
test_file.write_text("foo")

def test_make_read_only_file(self, dest_dir, vd_base_converter):
    """make_read_only() on a single file blocks later writes to it."""
    target = dest_dir / "a_file.txt"
    target.touch()

    vd_base_converter.make_read_only(target)

    # Writing to the file must now be refused
    with pytest.raises(PermissionError):
        target.write_text("foo")


class TestPackageConverter:
def test_get_submission_docs(self, vd_package_converter):
Expand Down Expand Up @@ -285,25 +294,27 @@ def test_get_am_transfer_name(

assert check_name == vd_container_converter.get_am_transfer_name()

def test_transfer_exists(
    self, dest_dir, vd_container_converter, test_container_data
):
    """transfer_exists() is False for an empty destination and True once a
    transfer directory or zip file with the transfer's name is present."""
    assert vd_container_converter.transfer_exists(dest_dir) is False

    check_path = dest_dir / "{}_{}".format(
        test_container_data["transfer_number"], test_container_data["name"]
    )
    check_path.mkdir()

    assert vd_container_converter.transfer_exists(dest_dir) is True

    # A zip file with the transfer's name counts as an existing transfer too
    check_path.rmdir()
    assert vd_container_converter.transfer_exists(dest_dir) is False

    (dest_dir / f"{check_path.name}.zip").touch()

    assert vd_container_converter.transfer_exists(dest_dir) is True

def test_create_am_transfer_dir(
    self, dest_dir, vd_container_converter, test_container_data
):
    """create_am_transfer_dir() creates a directory named
    [transfer_number]_[container_name] inside the destination."""
    vd_container_converter.create_am_transfer_dir(dest_dir)
    check_path = dest_dir / "{}_{}".format(
        test_container_data["transfer_number"], test_container_data["name"]
    )

    assert check_path.exists() and check_path.is_dir()

def test_copy_submission_docs(
self,
Expand Down Expand Up @@ -471,3 +482,16 @@ def test_copy_desc_md_files(
assert {
doc["md_filename"] for doc in test_container_data["documents"]
} == {i.name for i in sd_dir.glob("*_Metadata.xml")}

def test_zip_dir(self, dest_dir, vd_container_converter):
    """zip_dir() leaves a non-empty "<dir>.zip" file and removes the
    directory it was made from."""
    transfer_dir = vd_container_converter.create_am_transfer_dir(dest_dir)
    archive = vd_container_converter.zip_dir(transfer_dir)

    # The archive sits beside the original directory, named after it
    assert archive == Path(f"{transfer_dir}.zip")
    assert archive.is_file()
    assert archive.stat().st_size > 0

    # The unzipped directory is gone
    assert not transfer_dir.exists()
Loading