Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e0b79e9
Initial refactor to delete tars properly
forsyth2 Jan 3, 2026
8a4e74e
Fixes from Claude
forsyth2 Jan 3, 2026
43963ee
Further fixes
forsyth2 Jan 6, 2026
25b606b
Add progressive deletion tests
forsyth2 Jan 6, 2026
ed3248d
Pre-commit fixes
forsyth2 Mar 4, 2026
864abe2
Address code review comments
forsyth2 Mar 5, 2026
67e8a00
Minor performance improvements
forsyth2 Mar 5, 2026
e6f07e6
Add type annotation
forsyth2 Mar 5, 2026
d8cf919
Fix typo
forsyth2 Mar 5, 2026
cf05c53
Refactor add_files
forsyth2 Mar 6, 2026
42a8ea2
Refactor process_tar
forsyth2 Mar 7, 2026
f6fcb3f
Better structure file size logic
forsyth2 Mar 7, 2026
0c3cd7c
Clarify and improve Globus logic
forsyth2 Mar 7, 2026
5f32dec
Fix Python tests
forsyth2 Mar 9, 2026
f03160c
Perlmutter tests passing
forsyth2 Mar 9, 2026
221a1b6
Address simple code review comments
forsyth2 Mar 10, 2026
f7e0275
Address globus_finalize comment
forsyth2 Mar 10, 2026
c64d059
Make fix to pass follow_symlinks.sh
forsyth2 Mar 10, 2026
5a72742
Address simple code review comments
forsyth2 Mar 10, 2026
35a5a43
Address remaining comments
forsyth2 Mar 10, 2026
c408e5b
Improve batch status handling and comments
forsyth2 Mar 10, 2026
b2fe7c6
Remove pending_globus_batches check
forsyth2 Mar 10, 2026
ee135f6
Address comment on keep
forsyth2 Mar 10, 2026
75efad0
Update logger calls
forsyth2 Mar 23, 2026
985f94e
Rename tar table skip parameter
forsyth2 Mar 23, 2026
bb11a8d
Add comment explaining write method
forsyth2 Mar 23, 2026
2931d9e
Add comment explaining tars table
forsyth2 Mar 23, 2026
357b264
Update comments on tar submission
forsyth2 Mar 23, 2026
902df88
Reduce complexity of construct_tars
forsyth2 Mar 23, 2026
898e95e
Address comments
forsyth2 Mar 26, 2026
5a6c7f0
Add TaskStatus Enum
forsyth2 Mar 26, 2026
f757094
Improve handling of TransferData
forsyth2 Mar 30, 2026
d57c311
Address comments
forsyth2 Apr 3, 2026
3d8b908
Preliminary refactor of hpss_transfer()
forsyth2 Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,17 @@ test_globus_tar_deletion()
keep_flag=""
fi

zstash create ${blocking_flag} ${keep_flag} --hpss=${globus_path}/${case_name} --maxsize 128 zstash_demo 2>&1 | tee ${case_name}.log
# Use -v so debug logs show up.
zstash create ${blocking_flag} ${keep_flag} --hpss=${globus_path}/${case_name} --maxsize 128 -v zstash_demo 2>&1 | tee ${case_name}.log
if [ $? != 0 ]; then
echo "${case_name} failed. Check ${case_name}_create.log for details. Cannot continue."
echo "${case_name} failed. Check ${case_name}.log for details. Cannot continue."
return 1
fi
echo "${case_name} completed successfully. Checking ${case_name}.log now."
check_log_has "Creating new tar archive 000000.tar" ${case_name}.log || return 2

echo ""
echo "Checking directory status after 'zstash create' has completed. src should only have index.db. dst should have tar and index.db."
echo "Checking directory status after 'zstash create' has completed."
echo "Checking logs in current directory: ${PWD}"

echo ""
Expand Down Expand Up @@ -181,6 +182,98 @@ test_globus_tar_deletion()
return 0 # Success
}

# Test that tar files are deleted from the source PROGRESSIVELY -- i.e. as each
# Globus transfer completes -- rather than only at the end of the run.
#
# Args:
#   $1 path_to_repo  -- path to the zstash repo checkout
#   $2 dst_endpoint  -- name of the destination Globus endpoint
#   $3 dst_dir       -- destination directory on that endpoint
#   $4 blocking_str  -- "blocking" or "non-blocking"
#
# Exit codes: 0 -- success, 1 -- zstash failed, 2 -- grep check failed
test_globus_progressive_deletion()
{
    local path_to_repo=$1
    local dst_endpoint=$2
    local dst_dir=$3
    local blocking_str=$4

    src_dir=${path_to_repo}/tests/utils/globus_tar_deletion
    rm -rf ${src_dir}
    mkdir -p ${src_dir}
    dst_endpoint_uuid=$(get_endpoint ${dst_endpoint})
    globus_path=globus://${dst_endpoint_uuid}/${dst_dir}

    case_name=${blocking_str}_progressive_deletion
    echo "Running test_globus_progressive_deletion on case=${case_name}"
    echo "Exit codes: 0 -- success, 1 -- zstash failed, 2 -- grep check failed"

    setup ${case_name} "${src_dir}"

    # Create files totaling >2 GB to trigger multiple tars with maxsize=1 GB
    # Each file is ~700 MB, so we'll get 3 tars
    echo "Creating large test files (this may take a minute)..."
    dd if=/dev/zero of=zstash_demo/file1.dat bs=1M count=700 2>/dev/null # 700 MB
    dd if=/dev/zero of=zstash_demo/file2.dat bs=1M count=700 2>/dev/null # 700 MB
    dd if=/dev/zero of=zstash_demo/file3.dat bs=1M count=700 2>/dev/null # 700 MB
    echo "✓ Test files created"

    if [ "$blocking_str" == "non-blocking" ]; then
        blocking_flag="--non-blocking"
    else
        blocking_flag=""
    fi

    # Run with maxsize=1 GB to create multiple tars
    echo "Running zstash create (this may take several minutes due to file size and transfers)..."
    zstash create ${blocking_flag} --hpss=${globus_path}/${case_name} --maxsize 1 -v zstash_demo 2>&1 | tee ${case_name}.log
    # BUG FIX: after a pipeline, $? is the exit status of the LAST command
    # (tee, which essentially always succeeds), so a zstash failure was being
    # silently swallowed. PIPESTATUS[0] is zstash's own exit status.
    if [ ${PIPESTATUS[0]} != 0 ]; then
        echo "${case_name} failed."
        return 1
    fi

    # Check that multiple tar files were created
    # (grep -c exits non-zero on no match, but the count of 0 is still captured)
    tar_count=$(grep -c "Creating new tar archive" ${case_name}.log)
    if [ ${tar_count} -lt 2 ]; then
        echo "Expected at least 2 tar archives to be created, found ${tar_count}"
        return 2
    fi
    echo "✓ Created ${tar_count} tar archives"

    # Check that files were deleted progressively
    deletion_count=$(grep -c "Deleting .* files from successful transfer" ${case_name}.log)

    if [ "$blocking_str" == "blocking" ]; then
        # In blocking mode, we should see deletion after each tar transfer
        # (the final tar's deletion may be logged differently, hence tar_count - 1).
        if [ ${deletion_count} -lt $((tar_count - 1)) ]; then
            echo "Expected at least $((tar_count - 1)) deletion events in blocking mode, found ${deletion_count}"
            return 2
        fi
        echo "✓ Files deleted progressively (${deletion_count} deletion events)"
    else
        # In non-blocking mode, deletions happen when we check status
        if [ ${deletion_count} -lt 1 ]; then
            echo "Expected at least 1 deletion event in non-blocking mode, found ${deletion_count}"
            return 2
        fi
        echo "✓ Files deleted (${deletion_count} deletion events in non-blocking mode)"
    fi

    # Verify that NO tar files remain in source after completion.
    # Note the "No such file" guard: when the glob matches nothing, ls prints an
    # error that itself contains ".tar", which must not count as a leftover tar.
    echo "Checking that no tar files remain in source"
    ls ${src_dir}/${case_name}/zstash_demo/zstash/*.tar 2>&1 | tee ls_tar_check.log
    if grep -q "\.tar" ls_tar_check.log && ! grep -q "No such file" ls_tar_check.log; then
        echo "Found tar files that should have been deleted!"
        return 2
    fi
    echo "✓ All tar files successfully deleted from source"

    # Verify tar files exist in destination.
    # In non-blocking mode the last transfer may still be in flight, so wait
    # for the destination directory to appear before counting.
    if [ "$blocking_str" == "non-blocking" ]; then
        wait_for_directory "${dst_dir}/${case_name}" || return 1
    fi

    dst_tar_count=$(ls ${dst_dir}/${case_name}/*.tar 2>/dev/null | wc -l)
    if [ ${dst_tar_count} -ne ${tar_count} ]; then
        echo "Expected ${tar_count} tar files in destination, found ${dst_tar_count}"
        return 2
    fi
    echo "✓ All ${tar_count} tar files present in destination"

    return 0
}

# Follow these directions #####################################################

# Example usage:
Expand Down Expand Up @@ -232,7 +325,14 @@ run_test_with_tracking() {
echo "Running: ${test_name}"
echo "=========================================="

if test_globus_tar_deletion "${args[@]}"; then
# Determine which test function to call based on test name
if [[ "${test_name}" == *"progressive"* ]]; then
test_func=test_globus_progressive_deletion
else
test_func=test_globus_tar_deletion
fi

if ${test_func} "${args[@]}"; then
# Print test result in the output block AND at the end
echo "✓ ${test_name} PASSED"
test_results+=("✓ ${test_name} PASSED") # Uses Global variable
Expand All @@ -252,15 +352,29 @@ tests_passed=0
tests_failed=0
test_results=() # Global variable to hold test results

echo "Primary tests: single authentication code tests for each endpoint"
echo "Primary tests: basic functionality tests"
echo "If a test hangs, check if https://app.globus.org/activity reports any errors on your transfers."

# Run all tests independently
# Run basic tests
# These check that AT THE END of the run,
# we either still have the files (keep) or the files are deleted (non-keep).
run_test_with_tracking "blocking_non-keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "blocking" "non-keep" || true
run_test_with_tracking "non-blocking_non-keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "non-blocking" "non-keep" || true
run_test_with_tracking "blocking_keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "blocking" "keep" || true
run_test_with_tracking "non-blocking_keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "non-blocking" "keep" || true

echo ""
echo "Progressive deletion tests: verify files are deleted as transfers complete"
echo "WARNING: These tests create ~2GB of data and will take several minutes"

# Run progressive deletion tests
# These check that DURING the run,
# files are deleted after successful transfers (non-keep only).
# Blocking -- get files, transfer files, delete at src, start next transfer.
# Non-blocking -- get files, transfer files, get next set of files, transfer those files, check if previous transfer is done (and if so, delete at src).
run_test_with_tracking "blocking_progressive_deletion" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "blocking" || true
run_test_with_tracking "non-blocking_progressive_deletion" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "non-blocking" || true

# Print summary
echo ""
echo "=========================================="
Expand Down
34 changes: 0 additions & 34 deletions tests/unit/test_optimized_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,40 +563,6 @@ def test_time_tolerance_check(self):
assert is_within_tolerance == should_match


class TestBackwardCompatibility:
    """Regression tests: the legacy public API must keep working unchanged."""

    def test_get_files_to_archive_still_works(self, tmp_path):
        """The legacy get_files_to_archive entry point still returns str paths."""
        from zstash.utils import get_files_to_archive

        (tmp_path / "file.txt").write_text("content")
        os.chdir(tmp_path)

        archived = get_files_to_archive("cache", None, None)

        # Legacy contract: a non-empty list whose items are all plain strings.
        assert isinstance(archived, list)
        assert len(archived) > 0
        for entry in archived:
            assert isinstance(entry, str)

    def test_output_format_matches_original(self, tmp_path):
        """Paths from the stats-based helper match the legacy normalization."""
        from zstash.utils import get_files_to_archive

        nested = tmp_path / "subdir"
        nested.mkdir()
        (nested / "file.txt").write_text("content")
        os.chdir(tmp_path)

        old_paths = get_files_to_archive("cache", None, None)
        stats_paths = list(get_files_to_archive_with_stats("cache", None, None).keys())

        # Both code paths must yield the same file list in the same order.
        assert old_paths == stats_paths


@pytest.fixture
def mock_database():
"""Fixture providing a mock database cursor."""
Expand Down
86 changes: 42 additions & 44 deletions zstash/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@
import os.path
import sqlite3
import sys
from typing import Any, List, Tuple
from datetime import datetime
from typing import Any, Dict, List, Tuple

from six.moves.urllib.parse import urlparse

from .globus import globus_activate, globus_finalize
from .hpss import hpss_put
from .hpss_utils import add_files
from .hpss_utils import DevOptions, construct_tars
from .settings import DEFAULT_CACHE, config, get_db_filename, logger
from .transfer_tracking import TransferManager
from .utils import (
create_tars_table,
get_files_to_archive,
get_files_to_archive_with_stats,
run_command,
tars_table_exists,
ts_utc,
Expand Down Expand Up @@ -52,12 +54,13 @@ def create():
logger.error(input_path_error_str)
raise NotADirectoryError(input_path_error_str)

transfer_manager: TransferManager = TransferManager()
if hpss != "none":
url = urlparse(hpss)
if url.scheme == "globus":
# identify globus endpoints
logger.debug(f"{ts_utc()}:Calling globus_activate(hpss)")
globus_activate(hpss)
transfer_manager.globus_config = globus_activate(hpss)
else:
# config.hpss is not "none", so we need to
# create target HPSS directory
Expand Down Expand Up @@ -88,14 +91,21 @@ def create():

# Create and set up the database
logger.debug(f"{ts_utc()}: Calling create_database()")
failures: List[str] = create_database(cache, args)
failures: List[str] = create_database(cache, args, transfer_manager)

# Transfer to HPSS. Always keep a local copy.
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True)
hpss_put(
hpss,
get_db_filename(cache),
cache,
transfer_manager,
keep=args.keep,
is_index=True,
)

logger.debug(f"{ts_utc()}: calling globus_finalize()")
globus_finalize(non_blocking=args.non_blocking)
globus_finalize(transfer_manager, args.keep)

if len(failures) > 0:
# List the failures
Expand Down Expand Up @@ -204,7 +214,9 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
return cache, args


def create_database(cache: str, args: argparse.Namespace) -> List[str]:
def create_database(
cache: str, args: argparse.Namespace, transfer_manager: TransferManager
) -> List[str]:
# Create new database
logger.debug(f"{ts_utc()}:Creating index database")
if os.path.exists(get_db_filename(cache)):
Expand Down Expand Up @@ -260,44 +272,30 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
cur.execute("insert into config values (?,?)", (attr, value))
con.commit()

files: List[str] = get_files_to_archive(cache, args.include, args.exclude)
file_stats: Dict[str, Tuple[int, datetime]] = get_files_to_archive_with_stats(
cache, args.include, args.exclude
)

failures: List[str]
if args.follow_symlinks:
try:
# Add files to archive
failures = add_files(
cur,
con,
-1,
files,
cache,
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
error_on_duplicate_tar=args.error_on_duplicate_tar,
overwrite_duplicate_tars=args.overwrite_duplicate_tars,
force_database_corruption=args.for_developers_force_database_corruption,
)
except FileNotFoundError:
raise Exception("Archive creation failed due to broken symlink.")
else:
# Add files to archive
failures = add_files(
cur,
con,
-1,
files,
cache,
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
error_on_duplicate_tar=args.error_on_duplicate_tar,
overwrite_duplicate_tars=args.overwrite_duplicate_tars,
force_database_corruption=args.for_developers_force_database_corruption,
)
dev_options: DevOptions = DevOptions(
error_on_duplicate_tar=args.error_on_duplicate_tar,
overwrite_duplicate_tars=args.overwrite_duplicate_tars,
force_database_corruption=args.for_developers_force_database_corruption,
)
# Add files to archive
failures = construct_tars(
cur,
con,
-1,
file_stats,
cache,
args.keep,
args.follow_symlinks,
dev_options,
transfer_manager,
skip_tars_table=args.no_tars_md5,
non_blocking=args.non_blocking,
)

# Close database
con.commit()
Expand Down
Loading
Loading