Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 97 additions & 24 deletions geotessera/registry_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ def process_grid_directory(args):
# Read SHA256 file
sha256_file = os.path.join(grid_path, "SHA256")
if not os.path.exists(sha256_file):
# If there are .npy files but no SHA256, log warning and skip
# If there are .npy files but no SHA256, return warning and skip
if has_npy_files:
logger.warning(f"Skipping directory without SHA256 file: {sha256_file}")
return ("WARNING", "missing SHA256 file", grid_path)
return None # Skip this directory

# Parse hashes from SHA256 file
Expand Down Expand Up @@ -159,30 +159,18 @@ def process_grid_directory(args):

# Skip directories with incomplete npy/scales files (one but not the other)
if embedding_exists and not scales_exists:
logger.warning(
f"Skipping directory with incomplete files (missing scales): {grid_path}"
)
return None
return ("WARNING", "missing scales", grid_path)
if scales_exists and not embedding_exists:
logger.warning(
f"Skipping directory with incomplete files (missing embedding): {grid_path}"
)
return None
return ("WARNING", "missing embedding", grid_path)
if not embedding_exists and not scales_exists:
# Both missing - skip silently (probably not a tile directory)
return None

# Check for hashes in SHA256 file
if embedding_hash is None:
logger.warning(
f"Skipping directory - no hash found for embedding in SHA256 file: {sha256_file}"
)
return None
return ("WARNING", "no hash for embedding in SHA256 file", grid_path)
if scales_hash is None:
logger.warning(
f"Skipping directory - no hash found for scales in SHA256 file: {sha256_file}"
)
return None
return ("WARNING", "no hash for scales in SHA256 file", grid_path)

# Get file stats
embedding_stat = os.stat(embedding_path)
Expand Down Expand Up @@ -216,7 +204,7 @@ def iterate_tessera_tiles(
base_dir: str,
callback: Callable[[TileInfo], Any],
progress_callback: Optional[Callable] = None,
) -> List[Any]:
) -> tuple[List[Any], List[tuple[str, str]]]:
"""
Single-pass iterator through Tessera embedding filesystem structure.

Expand All @@ -234,7 +222,9 @@ def iterate_tessera_tiles(
progress_callback: Optional progress reporting function(current, total, status)

Returns:
List of results from callback calls (None results are filtered out)
Tuple of (results, warnings) where:
- results: List of results from callback calls (None results are filtered out)
- warnings: List of (reason, path) tuples for skipped directories

Raises:
FileNotFoundError: Missing SHA256 files or embedding/scales files
Expand All @@ -246,6 +236,7 @@ def iterate_tessera_tiles(
raise FileNotFoundError(f"Embeddings directory not found: {repr_dir}")

results = []
warnings = []
processed_dirs = 0
total_dirs = 0

Expand All @@ -260,7 +251,7 @@ def iterate_tessera_tiles(

if total_dirs == 0:
# No grid directories at all
return results # Return empty list instead of raising error
return results, warnings # Return empty lists instead of raising error

# Get number of CPU cores for parallel processing
num_cores = multiprocessing.cpu_count()
Expand Down Expand Up @@ -317,6 +308,10 @@ def iterate_tessera_tiles(
_, grid_name, error_msg = tile_info_or_error
grid_path = os.path.join(year_path, grid_name)
raise RuntimeError(f"Error processing {grid_path}: {error_msg}")
elif tile_info_or_error[0] == "WARNING":
_, reason, path = tile_info_or_error
warnings.append((reason, path))
continue

# Skip None results (empty/skipped directories)
if tile_info_or_error is None:
Expand All @@ -336,7 +331,7 @@ def iterate_tessera_tiles(
f"Unexpected error in parallel processing: {e}"
) from e

return results
return results, warnings


def calculate_sha256(file_path):
Expand Down Expand Up @@ -668,7 +663,7 @@ def progress_callback(current, total, status):

try:
# Use the fast iterator - reads SHA256 files, no hash calculation
records = iterate_tessera_tiles(
records, warnings = iterate_tessera_tiles(
base_dir, collect_tile_data, progress_callback=progress_callback
)

Expand Down Expand Up @@ -757,6 +752,45 @@ def progress_callback(current, total, status):
)
)

# Display warning summary if there are any skipped directories
if warnings:
console.print()
console.print(
f"[yellow]⚠ Skipped {len(warnings)} director{'y' if len(warnings) == 1 else 'ies'} with incomplete files:[/yellow]"
)

# Group warnings by reason for better readability
from collections import defaultdict
warnings_by_reason = defaultdict(list)
for reason, path in warnings:
warnings_by_reason[reason].append(path)

for reason, paths in sorted(warnings_by_reason.items()):
console.print(f"\n[dim] {reason.capitalize()}:[/dim]")
for path in sorted(paths):
console.print(f" {path}")

# Write missing files to separate text files
output_dir = Path(output_path).parent

# Write missing embeddings
missing_embeddings = warnings_by_reason.get("missing embedding", [])
if missing_embeddings:
missing_embeddings_file = output_dir / "missing_embeddings.txt"
with open(missing_embeddings_file, "w") as f:
for path in sorted(missing_embeddings):
f.write(f"{path}\n")
console.print(f"\n[dim]Written {len(missing_embeddings)} paths to {missing_embeddings_file}[/dim]")

# Write missing scales
missing_scales = warnings_by_reason.get("missing scales", [])
if missing_scales:
missing_scales_file = output_dir / "missing_scales.txt"
with open(missing_scales_file, "w") as f:
for path in sorted(missing_scales):
f.write(f"{path}\n")
console.print(f"[dim]Written {len(missing_scales)} paths to {missing_scales_file}[/dim]")

return True


Expand Down Expand Up @@ -831,7 +865,7 @@ def progress_callback(current, total, status):

try:
# Run validation
iterate_tessera_tiles(
_, warnings = iterate_tessera_tiles(
base_dir, validate_tile, progress_callback=progress_callback
)

Expand All @@ -858,6 +892,45 @@ def progress_callback(current, total, status):
)
)

# Display warning summary if there are any skipped directories
if warnings:
console.print()
console.print(
f"[yellow]⚠ Skipped {len(warnings)} director{'y' if len(warnings) == 1 else 'ies'} with incomplete files:[/yellow]"
)

# Group warnings by reason for better readability
from collections import defaultdict
warnings_by_reason = defaultdict(list)
for reason, path in warnings:
warnings_by_reason[reason].append(path)

for reason, paths in sorted(warnings_by_reason.items()):
console.print(f"\n[dim] {reason.capitalize()}:[/dim]")
for path in sorted(paths):
console.print(f" {path}")

# Write missing files to separate text files
output_dir = Path(base_dir)

# Write missing embeddings
missing_embeddings = warnings_by_reason.get("missing embedding", [])
if missing_embeddings:
missing_embeddings_file = output_dir / "missing_embeddings.txt"
with open(missing_embeddings_file, "w") as f:
for path in sorted(missing_embeddings):
f.write(f"{path}\n")
console.print(f"\n[dim]Written {len(missing_embeddings)} paths to {missing_embeddings_file}[/dim]")

# Write missing scales
missing_scales = warnings_by_reason.get("missing scales", [])
if missing_scales:
missing_scales_file = output_dir / "missing_scales.txt"
with open(missing_scales_file, "w") as f:
for path in sorted(missing_scales):
f.write(f"{path}\n")
console.print(f"[dim]Written {len(missing_scales)} paths to {missing_scales_file}[/dim]")

return 0


Expand Down
Loading