Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 156 additions & 174 deletions scripts/datastream_updater.py
Original file line number Diff line number Diff line change
@@ -1,200 +1,182 @@
#!/usr/bin/env python

import base64
import datetime
import argparse
from lxml import etree as ET
from lxml.etree import QName
import base64
import os
import mimetypes
import logging

# Setting up basic logging
logging.basicConfig(level=logging.INFO)


def format_xml_element(element, level=0, indent=" "):
"""
Formats an XML element by adding appropriate spacing and indentation.

Args:
element (Element): The XML element to format.
level (int, optional): The current level of indentation. Defaults to 0.
indent (str, optional): The string used for indentation. Defaults to " ".
from datetime import datetime
import xml.etree.ElementTree as ET

Returns:
None
"""
spacing = "\n" + level * indent

if len(element):
if not element.text or not element.text.strip():
element.text = spacing + indent
if not element.tail or not element.tail.strip():
element.tail = spacing
for child in element:
format_xml_element(child, level + 1, indent)
else:
if level and (not element.tail or not element.tail.strip()):
element.tail = spacing
# Prefix -> namespace-URI map. Used in this file to register prefixes with ET,
# to resolve the `foxml:` prefix in find() lookups, and to build
# `{uri}localname` qualified tag names when creating elements.
NAMESPACES = {
'foxml': 'info:fedora/fedora-system:def/foxml#'
}

def register_namespaces():
    """Register every prefix/URI pair from NAMESPACES with ET.

    Walks the module-level NAMESPACES mapping and calls
    ET.register_namespace(prefix, uri) once per entry. Takes no arguments
    and returns None.
    """
    for ns_prefix in NAMESPACES:
        ET.register_namespace(ns_prefix, NAMESPACES[ns_prefix])

def compress_and_encode(file_path):
def update_foxml_datastream(input_path, output_path, dsid, content_file, label, mimetype, control_group):
"""
Compresses and encodes the binary data from the given file path.
Adds or replaces a datastream in a FOXML file with Base64 encoded content,
with precise indentation and multi-line formatting that preserves the original document's style.

Args:
file_path (str): The path to the file containing the binary data.

Returns:
tuple: A tuple containing the indented base64-encoded data and the original size of the binary data.
input_path (str): Path to the source FOXML file.
output_path (str): Path to save the modified FOXML file.
dsid (str): The ID of the datastream to add/update (e.g., 'OBJ', 'MODS').
content_file (str): Path to the file containing the new content.
label (str): The label for the new datastream version.
mimetype (str): The MIME type of the content file.
control_group (str): The control group for the datastream (e.g., 'M', 'X').
"""
with open(file_path, "rb") as f_in:
binary_data = f_in.read()
original_size = len(binary_data)
base64_data = base64.b64encode(binary_data)
base64_lines = [
base64_data[i : i + 80].decode("utf-8")
for i in range(0, len(base64_data), 80)
]
indented_base64 = "\n ".join(base64_lines)
return indented_base64, original_size


def register_namespaces(xml_path):
"""
Registers XML namespaces from the given XML file.

Args:
xml_path (str): The path to the XML file.
if not os.path.exists(content_file):
print(f"Error: Content file not found at '{content_file}'")
return

Raises:
Exception: If there is an error registering the namespaces.
"""
try:
namespaces = dict(
[node for _, node in ET.iterparse(xml_path, events=["start-ns"])]
)
for ns in namespaces:
ET.register_namespace(ns, namespaces[ns])
except Exception as e:
logging.error(f"Error registering namespaces: {e}")
raise


def add_datastream_version(
xml_path, dsid, base64_data, original_size, mimetype, label=None
):
"""
Adds a new version of a datastream to an XML file.
print(f"Reading content from '{content_file}'...")
with open(content_file, 'rb') as f:
binary_content_bytes = f.read()

encoded_content_string = base64.b64encode(binary_content_bytes).decode('ascii')
content_size = os.path.getsize(content_file)
print(f"Content read successfully. Original size: {content_size} bytes.")

Args:
xml_path (str): The path to the XML file.
dsid (str): The ID of the datastream.
base64_data (str): The base64-encoded content of the datastream.
original_size (int): The original size of the datastream in bytes.
mimetype (str): The MIME type of the datastream.
label (str, optional): The label for the datastream version. If not provided, a default label will be used.

Returns:
str: The XML string with the new datastream version added.

Raises:
ET.ParseError: If there is an error parsing the XML file.
Exception: If there is an error creating the XML string.
"""
register_namespaces()
try:
root = ET.parse(xml_path).getroot()
tree = ET.parse(input_path)
root = tree.getroot()
except ET.ParseError as e:
logging.exception(f"XML parsing error: {e}")
print(f"Error parsing XML file '{input_path}': {e}")
return

Comment on lines +17 to 50
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring this function due to complexity.

The function signature and initial file handling logic are correct, but static analysis (Pylint) reports that this function exceeds the configured limits for arguments (7, limit 5), local variables (34, limit 15), and statements (69, limit 50), which makes it difficult to maintain.

Consider breaking this large function into smaller, focused functions:

  • File reading and encoding logic
  • XML parsing and datastream lookup
  • Datastream creation/update logic
  • XML output writing

This would improve readability, testability, and maintainability.

🧰 Tools
🪛 Flake8 (7.2.0)

[error] 17-17: expected 2 blank lines, found 1

(E302)

🪛 Pylint (3.3.7)

[refactor] 17-17: Too many arguments (7/5)

(R0913)


[refactor] 17-17: Too many positional arguments (7/5)

(R0917)


[refactor] 17-17: Too many local variables (34/15)

(R0914)


[refactor] 17-17: Too many statements (69/50)

(R0915)

🤖 Prompt for AI Agents
In scripts/datastream_updater.py around lines 17 to 50, the
update_foxml_datastream function is too complex with excessive arguments, local
variables, and statements. Refactor by splitting it into smaller functions: one
for reading and base64 encoding the content file, another for parsing the XML
and locating the datastream, a third for creating or updating the datastream
element, and a final one for writing the modified XML output. This modular
approach will reduce complexity and improve readability and maintainability.

nsmap = {
"foxml": "info:fedora/fedora-system:def/foxml#",
"xsi": "http://www.w3.org/2001/XMLSchema-instance",
"audit": "info:fedora/fedora-system:def/audit#",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"fedora": "info:fedora/fedora-system:def/relations-external#",
"fedora-model": "info:fedora/fedora-system:def/model#",
"islandora": "http://islandora.ca/ontology/relsext#",
}

# Have to use qualified names when creating an element.
ds_version_tag = QName(nsmap["foxml"], "datastreamVersion")
binary_content_tag = QName(nsmap["foxml"], "binaryContent")

datastream = root.find(f".//foxml:datastream[@ID='{dsid}']", namespaces=nsmap)
datastream_xpath = f"./foxml:datastream[@ID='{dsid}']"
datastream = root.find(datastream_xpath, NAMESPACES)

datastream_indent = ' '
version_indent = datastream_indent + ' '
content_indent = version_indent + ' '
base64_indent = ' ' * 14

if datastream is None:
logging.warning(f"Datastream with ID of {dsid} does not exist.")
return
print(f"Datastream with ID '{dsid}' not found. Creating a new one.")
datastream = ET.SubElement(root, f"{{{NAMESPACES['foxml']}}}datastream", {
'ID': dsid, 'STATE': 'A', 'CONTROL_GROUP': control_group, 'VERSIONABLE': 'true'
})
if len(root) > 1:
prev_sibling = root[-2]
datastream.tail = prev_sibling.tail
prev_sibling.tail = '\n' + datastream_indent
else:
root.text = '\n' + datastream_indent
datastream.tail = '\n'

datastream.text = '\n' + version_indent
last_version = None
version_num = 0

if label is None:
datastream_version = datastream.find(
".//foxml:datastreamVersion[last()]", namespaces=nsmap
)
label = (
datastream_version.get("LABEL")
if datastream_version is not None
else "default_label"
)

new_id = "{}.{}".format(
dsid, len(datastream.findall(".//foxml:datastreamVersion", namespaces=nsmap))
)
datastream_version = ET.SubElement(
datastream,
ds_version_tag,
{
"ID": new_id,
"LABEL": label,
"MIMETYPE": mimetype,
"SIZE": str(original_size),
},
else:
print(f"Found existing datastream with ID '{dsid}'. Adding a new version.")
versions = datastream.findall(f"{{{NAMESPACES['foxml']}}}datastreamVersion", NAMESPACES)
last_version = versions[-1] if versions else None
version_num = len(versions)
if last_version is not None:
last_version.tail = '\n' + version_indent
else:
datastream.text = '\n' + version_indent

new_version_id = f"{dsid}.{version_num}"
now = datetime.utcnow()
main_part = now.strftime('%Y-%m-%dT%H:%M:%S')
milliseconds = f'{now.microsecond // 1000:03d}'
created_timestamp = f'{main_part}.{milliseconds}Z'

if not mimetype:
mimetype, _ = mimetypes.guess_type(content_file)
mimetype = mimetype or 'application/octet-stream'
print(f"Guessed MIME type: '{mimetype}'")

if not label:
label = f"{dsid} datastream"

ds_version_attrs = {
'ID': new_version_id, 'LABEL': label, 'CREATED': created_timestamp,
'MIMETYPE': mimetype, 'SIZE': str(content_size)
}
ds_version = ET.SubElement(datastream, f"{{{NAMESPACES['foxml']}}}datastreamVersion", ds_version_attrs)

ds_version.text = '\n' + content_indent
ds_version.tail = '\n' + datastream_indent

binary_content_element = ET.SubElement(ds_version, f"{{{NAMESPACES['foxml']}}}binaryContent")

LINE_WIDTH = 76
chunks = [encoded_content_string[i:i + LINE_WIDTH] for i in range(0, len(encoded_content_string), LINE_WIDTH)]

binary_content_element.text = (
f"\n{base64_indent}" +
f"\n{base64_indent}".join(chunks) +
f"\n{content_indent}"
)

dt = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
datastream_version.set("CREATED", dt)

binary_content = ET.SubElement(datastream_version, binary_content_tag)
binary_content.text = "\n " + base64_data + "\n "
binary_content_element.tail = '\n' + version_indent

try:
ET.indent(root, space=" ")
format_xml_element(root)
xml_string = ET.tostring(
root, encoding="utf-8", method="xml", xml_declaration=True
)
except Exception as e:
logging.exception(f"Error creating XML string: {e}")
raise

return xml_string


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--xml", help="path to the XML file to modify", required=True)
parser.add_argument("--dsid", help="ID of the datastream to modify", required=True)
tree.write(output_path, encoding='UTF-8', xml_declaration=True)
print(f"Successfully created new version '{new_version_id}'.")
print(f"Modified FOXML file saved to '{output_path}'")
except IOError as e:
print(f"Error writing to output file '{output_path}': {e}")


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Add or update a datastream in a FOXML file with Base64 encoded content.',
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
'-i', '--input-foxml',
required=True,
help='Path to the input FOXML file.'
)
parser.add_argument(
'-o', '--output-foxml',
required=True,
help='Path to save the modified output FOXML file.'
)
parser.add_argument(
'--dsid',
required=True,
help='The ID for the datastream (e.g., "OBJ", "MODS", "FULL_TEXT").'
)
parser.add_argument(
"--content",
help="path to the binary content to add as a new datastreamVersion",
'-f', '--file',
required=True,
dest='content_file',
help='Path to the file to be used as the new datastream content.'
)
parser.add_argument(
'--label',
default=None,
help='A human-readable label for the new datastream version. \n(default: "[dsid] datastream")'
)
parser.add_argument(
'--mimetype',
default=None,
help='The MIME type of the content file (e.g., "application/pdf").\n(default: auto-detected or "application/octet-stream")'
)
parser.add_argument(
'--control-group',
default='M',
choices=['M', 'X', 'R', 'E'],
help='The control group for the datastream. \'M\' (Managed) is typical for binary content. \n(default: M)'
)
parser.add_argument("--label", help="label of the new datastream version")
parser.add_argument("--output", help="path to the output XML file", required=True)
args = parser.parse_args()

try:
mimetype, _ = mimetypes.guess_type(args.content)
mimetype = mimetype or "application/octet-stream"

base64_data, original_size = compress_and_encode(args.content)
register_namespaces(args.xml)
updated_xml = add_datastream_version(
args.xml, args.dsid, base64_data, original_size, mimetype, args.label
)

if updated_xml:
with open(args.output, "w") as f_out:
f_out.write(updated_xml.decode("utf-8"))
except Exception as e:
logging.exception(f"Error in script execution: {e}")
update_foxml_datastream(
input_path=args.input_foxml,
output_path=args.output_foxml,
dsid=args.dsid,
content_file=args.content_file,
label=args.label,
mimetype=args.mimetype,
control_group=args.control_group
)