Skip to content

Commit 58341fd

Browse files
authored
Merge pull request #35 from AllenInstitute/feature/make-check_paths_in_sync-multi-threaded
Add parallel processing for path sync checks in S3 utility
2 parents 616192c + f4187a6 commit 58341fd

File tree

2 files changed

+247
-107
lines changed
  • src/aibs_informatics_aws_utils
  • test/aibs_informatics_aws_utils

2 files changed

+247
-107
lines changed

src/aibs_informatics_aws_utils/s3.py

Lines changed: 156 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import math
55
import os
66
from collections import defaultdict
7+
from concurrent.futures import ThreadPoolExecutor, as_completed
78
from datetime import datetime, timezone
89
from enum import Enum
910
from functools import lru_cache
@@ -432,8 +433,12 @@ def is_object(s3_path: S3URI, **kwargs) -> bool:
432433
s3 = get_s3_client(**kwargs)
433434
try:
434435
s3.head_object(Bucket=s3_path.bucket, Key=s3_path.key)
435-
except ClientError:
436-
return False
436+
except ClientError as e:
437+
if client_error_code_check(e, "404", "NoSuchKey", "NotFound"):
438+
return False
439+
raise AWSError(
440+
f"Error checking existence of {s3_path}: {get_client_error_message(e)}"
441+
) from e
437442
return True
438443

439444

@@ -488,6 +493,37 @@ def is_folder(s3_path: S3URI, **kwargs) -> bool:
488493
)
489494

490495

def is_folder_placeholder_object(s3_path: S3URI, **kwargs) -> bool:
    """Determine whether an S3 path refers to a "folder placeholder" object.

    A "folder placeholder" object is a zero-byte S3 object whose key ends in
    a '/' character. Such objects are commonly created by tools to mimic
    folders in S3's flat namespace, and are typically ignored when
    considering the real contents of a "folder".

    Args:
        s3_path (S3URI): S3 URI to check.

    Returns:
        bool: True if the path names an existing zero-byte object whose key
            ends in '/', False otherwise (including when no such object
            exists).
    """
    # Keys without a trailing '/' can never be placeholders; skip the HEAD call.
    if not s3_path.has_folder_suffix():
        return False

    s3 = get_s3_client(**kwargs)
    try:
        response = s3.head_object(Bucket=s3_path.bucket, Key=s3_path.key)
    except ClientError as err:
        # A missing object is simply "not a placeholder"; any other failure
        # (permissions, throttling, ...) is surfaced as an AWSError.
        if client_error_code_check(err, "404", "NoSuchKey", "NotFound"):
            return False
        raise AWSError(
            f"Error checking existence of {s3_path}: {get_client_error_message(err)}"
        ) from err
    return response["ContentLength"] == 0
525+
526+
491527
def get_s3_path_collection_stats(*s3_paths: S3URI, **kwargs) -> Mapping[S3URI, S3PathStats]:
492528
return dict(
493529
zip(
@@ -1020,21 +1056,14 @@ def should_sync(
10201056
Args:
10211057
source_path (Union[Path, S3URI]): source path
10221058
destination_path (Union[Path, S3URI]): destination to transfer to
1023-
size_only (bool, optional): Limits content comparison to just size and date (no ETag).
1024-
Defaults to False.
1025-
1026-
Raises:
1027-
ValueError: if the source path does not exist.
1028-
1029-
Returns:
1030-
bool: True if a transfer is necessary, False, otherwise
1059+
size_only (bool, optional): Limits content comparison to just size and date (no ETag). Defaults to False.
10311060
"""
10321061
source_last_modified: datetime
10331062
source_size_bytes: int
1034-
source_hash: Optional[Callable[[], str]] = None
1063+
source_hash: Callable[[], Optional[str]]
10351064
dest_last_modified: Optional[datetime] = None
10361065
dest_size_bytes: Optional[int] = None
1037-
dest_hash: Optional[Callable[[], str]] = None
1066+
dest_hash: Callable[[], Optional[str]]
10381067
multipart_chunk_size_bytes: Optional[int] = None
10391068
multipart_threshold_bytes: Optional[int] = None
10401069

@@ -1045,21 +1074,23 @@ def should_sync(
10451074
multipart_chunk_size_bytes, multipart_threshold_bytes = determine_multipart_attributes(
10461075
destination_path, **kwargs
10471076
)
1048-
if not size_only:
10491077

1050-
def dest_hash():
1051-
return dest_s3_object.e_tag
1078+
def dest_hash() -> Optional[str]:
1079+
return dest_s3_object.e_tag if not size_only else None
10521080
elif isinstance(destination_path, Path) and destination_path.exists():
10531081
dest_local_path = destination_path
10541082
local_stats = dest_local_path.stat()
10551083
dest_last_modified = datetime.fromtimestamp(local_stats.st_mtime, tz=timezone.utc)
10561084
dest_size_bytes = local_stats.st_size
1057-
if not size_only:
10581085

1059-
def dest_hash():
1060-
return get_local_etag(
1086+
def dest_hash() -> Optional[str]:
1087+
return (
1088+
get_local_etag(
10611089
dest_local_path, multipart_chunk_size_bytes, multipart_threshold_bytes
10621090
)
1091+
if not size_only
1092+
else None
1093+
)
10631094
else:
10641095
return True
10651096

@@ -1070,21 +1101,23 @@ def dest_hash():
10701101
multipart_chunk_size_bytes, multipart_threshold_bytes = determine_multipart_attributes(
10711102
source_path, **kwargs
10721103
)
1073-
if not size_only:
10741104

1075-
def source_hash():
1076-
return src_s3_object.e_tag
1105+
def source_hash() -> Optional[str]:
1106+
return src_s3_object.e_tag if not size_only else None
10771107
elif isinstance(source_path, Path) and source_path.exists():
10781108
src_local_path = source_path
10791109
local_stats = src_local_path.stat()
10801110
source_last_modified = datetime.fromtimestamp(local_stats.st_mtime, tz=timezone.utc)
10811111
source_size_bytes = local_stats.st_size
1082-
if not size_only:
10831112

1084-
def source_hash():
1085-
return get_local_etag(
1113+
def source_hash() -> Optional[str]:
1114+
return (
1115+
get_local_etag(
10861116
src_local_path, multipart_chunk_size_bytes, multipart_threshold_bytes
10871117
)
1118+
if not size_only
1119+
else None
1120+
)
10881121
else:
10891122
raise ValueError(
10901123
f"Cannot transfer, source path {source_path} does not exist! "
@@ -1100,7 +1133,7 @@ def source_hash():
11001133
return True
11011134
if source_last_modified.replace(microsecond=0) > dest_last_modified.replace(microsecond=0):
11021135
return True
1103-
if not size_only and source_hash and dest_hash and source_hash() != dest_hash():
1136+
if not size_only and source_hash() != dest_hash():
11041137
return True
11051138
return False
11061139

@@ -1109,6 +1142,9 @@ def check_paths_in_sync(
11091142
source_path: Union[Path, S3URI],
11101143
destination_path: Union[Path, S3URI],
11111144
size_only: bool = False,
1145+
ignore_folder_placeholder_objects: bool = True,
1146+
allow_subset: bool = False,
1147+
max_workers: Optional[int] = None,
11121148
**kwargs,
11131149
) -> bool:
11141150
"""Checks whether source and destination paths are in sync.
@@ -1118,62 +1154,117 @@ def check_paths_in_sync(
11181154
destination_path (Union[Path, S3URI]): destination path
11191155
size_only (bool, optional): Limits content comparison to just size and date
11201156
(no checksum/ETag). Defaults to False.
1157+
ignore_folder_placeholder_objects (bool, optional): Whether to ignore S3 folder
1158+
placeholder objects (zero-byte objects with keys ending in '/'). Defaults to True.
1159+
allow_subset (bool, optional): Whether to allow source path to be a subset of
1160+
destination path. Defaults to False.
1161+
max_workers (Optional[int], optional): Number of worker threads to use for
1162+
parallel comparison. Defaults to None (ThreadPoolExecutor default).
11211163
11221164
Raises:
11231165
ValueError: if the source path does not exist.
11241166
11251167
Returns:
11261168
bool: True if paths are in sync, False, otherwise
11271169
"""
1128-
source_paths: Union[List[Path], List[S3URI]] = (
1129-
(
1130-
[Path(p) for p in sorted(find_paths(source_path, include_dirs=False))]
1131-
if source_path.is_dir()
1132-
else [source_path]
1133-
)
1134-
if isinstance(source_path, Path)
1135-
else (
1136-
sorted(list_s3_paths(source_path, **kwargs)) # type: ignore[arg-type]
1137-
if not is_object(source_path)
1138-
else [source_path] # type: ignore
1139-
)
1140-
)
1141-
destination_paths: Union[List[Path], List[S3URI]] = (
1142-
(
1143-
list(map(Path, sorted(find_paths(destination_path, include_dirs=False))))
1144-
if destination_path.is_dir()
1145-
else [destination_path]
1146-
)
1147-
if isinstance(destination_path, Path)
1148-
else (
1149-
sorted(list_s3_paths(destination_path, **kwargs)) # type: ignore[arg-type]
1150-
if not is_object(destination_path)
1151-
else [destination_path] # type: ignore
1152-
)
1153-
)
1170+
1171+
def _resolve_paths(path: Union[Path, S3URI]) -> List[Union[Path, S3URI]]:
1172+
if isinstance(path, Path):
1173+
if path.is_dir():
1174+
return list(map(Path, sorted(find_paths(path, include_dirs=False))))
1175+
else:
1176+
return [path]
1177+
else:
1178+
if is_object(path, **kwargs) and not is_folder_placeholder_object(path, **kwargs):
1179+
return [path]
1180+
else:
1181+
return [
1182+
_
1183+
for _ in sorted(list_s3_paths(path, **kwargs))
1184+
if (
1185+
not ignore_folder_placeholder_objects
1186+
or not is_folder_placeholder_object(_, **kwargs)
1187+
)
1188+
]
1189+
1190+
def _find_relative_path(full_path: Union[Path, S3URI], root_path: Union[Path, S3URI]) -> str:
1191+
if isinstance(full_path, Path) and isinstance(root_path, Path):
1192+
if full_path == root_path:
1193+
return ""
1194+
# Adding the leading "/" to ensure we return the leading "/" in the relative path for
1195+
# files under a folder. This is to be consistent with S3URI behavior.
1196+
relative_path = "/" + strip_path_root(full_path, root_path)
1197+
return relative_path
1198+
elif isinstance(full_path, S3URI) and isinstance(root_path, S3URI):
1199+
if full_path == root_path:
1200+
return ""
1201+
# Stripping the "/" to ensure we return the leading "/" in the relative path for
1202+
# objects under a folder. This means that if we have:
1203+
# root_path = `s3://bucket/folder/`
1204+
# or root_path = `s3://bucket/folder`,
1205+
# full_path = `s3://bucket/folder/subfolder/object.txt`
1206+
# The relative path returned will be: `/subfolder/object.txt`
1207+
1208+
relative_path = full_path.removeprefix(root_path.rstrip("/"))
1209+
return relative_path
1210+
else:
1211+
raise ValueError("Mismatched path types between full_path and root_path")
1212+
1213+
source_paths = _resolve_paths(source_path)
1214+
destination_paths = _resolve_paths(destination_path)
1215+
11541216
if len(source_paths) == 0:
11551217
raise ValueError(f"Source path {source_path} does not exist")
1156-
if len(source_paths) != len(destination_paths):
1218+
1219+
stripped_source_path_to_path_map: Dict[str, Union[Path, S3URI]] = {
1220+
_find_relative_path(sp, source_path): sp for sp in source_paths
1221+
}
1222+
1223+
stripped_destination_path_to_path_map: Dict[str, Union[Path, S3URI]] = {
1224+
_find_relative_path(dp, destination_path): dp for dp in destination_paths
1225+
}
1226+
1227+
missing_destination_paths = set(stripped_source_path_to_path_map.keys()).difference(
1228+
stripped_destination_path_to_path_map.keys()
1229+
)
1230+
if missing_destination_paths:
11571231
logger.info(
1158-
"Source and destination paths have different number of paths. "
1159-
f"Source path {source_path} has {len(source_paths)} paths, "
1160-
f"destination path {destination_path} has {len(destination_paths)} paths"
1232+
"The following source paths are missing in the destination path: "
1233+
f"{missing_destination_paths}"
11611234
)
11621235
return False
1163-
for sp, dp in zip(source_paths, destination_paths):
1164-
rsp = strip_path_root(str(sp).removeprefix("s3:"), str(source_path).removeprefix("s3:"))
1165-
rdp = strip_path_root(
1166-
str(dp).removeprefix("s3:"), str(destination_path).removeprefix("s3:")
1236+
if not allow_subset:
1237+
extra_destination_paths = set(stripped_destination_path_to_path_map.keys()).difference(
1238+
stripped_source_path_to_path_map.keys()
11671239
)
1168-
if rsp != rdp:
1240+
if extra_destination_paths:
11691241
logger.info(
1170-
f"Source path {sp} (relative={rsp}) does not match "
1171-
f"destination path {dp} (relative={rdp})"
1242+
"The following destination paths are extra compared to the source path: "
1243+
f"{extra_destination_paths}"
11721244
)
11731245
return False
1174-
if should_sync(source_path=sp, destination_path=dp, size_only=size_only, **kwargs): # type: ignore[arg-type] # mypy complains but sp/dp are Path|S3Path
1175-
logger.info(f"Source path {sp} content does not match destination path {dp}")
1176-
return False
1246+
1247+
# Run comparisons in parallel
1248+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
1249+
future_to_pair = {
1250+
executor.submit(
1251+
should_sync,
1252+
(sp := stripped_source_path_to_path_map[relative_path]),
1253+
(dp := stripped_destination_path_to_path_map[relative_path]),
1254+
size_only,
1255+
**kwargs,
1256+
): (sp, dp)
1257+
for relative_path in stripped_source_path_to_path_map
1258+
}
1259+
1260+
for future in as_completed(future_to_pair):
1261+
not_ok = future.result()
1262+
if not_ok:
1263+
# Cancel any still-pending futures; we already know it's not in sync.
1264+
for f in future_to_pair:
1265+
f.cancel()
1266+
return False
1267+
11771268
return True
11781269

11791270

0 commit comments

Comments
 (0)