WIP: Icechunk opener #1135
Conversation
I will automatically update this comment whenever this PR is modified
User jbusecke does not have permission to run integration tests. A maintainer must perform a security review of the code changes in this pull request and re-run the failed integration tests jobs, if the code is deemed safe.
earthaccess/__init__.py (outdated)

```python
)
from .auth import Auth
from .dmrpp_zarr import open_virtual_dataset, open_virtual_mfdataset
from .icechunk import _open_icechunk_from_url
```
Why is there an underscore prefix? This does not match up with the entry you added to `__all__`.
Ah thanks for catching this @chuckwondo. My intention here was to hack most of this together and develop it further after an initial test (with lots of hardcoded bits) passes.
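For anyone skimming: the mismatch flagged here is easy to reproduce in isolation. This is a hedged sketch using a fake module (not the real earthaccess package) — a star-import fails whenever `__all__` advertises a name the module doesn't actually bind:

```python
import sys
import types

# Fake stand-in for earthaccess/__init__.py: the module binds the
# underscore-prefixed name, but __all__ advertises the bare name.
mod = types.ModuleType("fake_earthaccess")
mod._open_icechunk_from_url = lambda url: f"opened {url}"
mod.__all__ = ["open_icechunk_from_url"]  # mismatch: this attribute doesn't exist
sys.modules["fake_earthaccess"] = mod

star_import_error = None
ns = {}
try:
    # Star imports consult __all__ and getattr() each listed name,
    # so a name that isn't bound raises AttributeError here.
    exec("from fake_earthaccess import *", ns)
except AttributeError as err:
    star_import_error = err
print(star_import_error)
```

So either the underscore prefix or the `__all__` entry has to change for the public API to import cleanly.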
earthaccess/icechunk.py (outdated)

```python
# TODO: Figure out how to ensure authentication here.


def _get_daac_provider_from_url(url: str) -> str:
```
The return type is annotated incorrectly.
earthaccess/icechunk.py (outdated, new file)

```python
from datetime import datetime
from typing import Dict, List, Optional
```
Suggested change (remove this line):

```python
from typing import Dict, List, Optional
```
Co-authored-by: Chuck Daniels <[email protected]>
Thank you so much for looking into this @chuckwondo.
Follow-up on a Slack discussion I had with @betolink and Ryan Abbott. The main thing I need for this functionality is a mapping from bucket (+prefix) to a credentials endpoint. With the help of Claude I created a little script that crawls CMR to find this mapping and, crucially, to check whether the mapping between each bucket and credentials endpoint is unique:

**Crawl Script v0**

```python
#!/usr/bin/env python3
"""
Async CMR Query - Map S3 Buckets to Auth Endpoints
Crawls NASA CMR API and maps buckets (no prefix) to S3 credentials endpoints.
Warns about buckets with conflicting endpoints.
"""
import asyncio
import json
from collections import defaultdict
from typing import Dict, Set, Tuple

import aiohttp


async def fetch_page(
    session: aiohttp.ClientSession,
    base_url: str,
    page_num: int,
    page_size: int,
    cloud_hosted: bool = True,
) -> Tuple[int, list, int]:
    """
    Fetch a single page from CMR API.

    Returns:
        Tuple of (page_num, items, total_hits)
    """
    params = {"page_size": page_size, "page_num": page_num}
    if cloud_hosted:
        params["cloud_hosted"] = "true"
    try:
        async with session.get(
            base_url, params=params, timeout=aiohttp.ClientTimeout(total=30)
        ) as response:
            response.raise_for_status()
            data = await response.json()
            items = data.get("items", [])
            total_hits = int(response.headers.get("CMR-Hits", 0))
            return page_num, items, total_hits
    except Exception as e:
        print(f"  ✗ Error fetching page {page_num}: {e}")
        return page_num, [], 0


async def query_cmr_async(
    base_url: str = "https://cmr.earthdata.nasa.gov/search/collections.umm_json",
    cloud_hosted: bool = True,
    max_pages: int = 100,
    page_size: int = 100,
    concurrent_requests: int = 10,
) -> Dict[str, Dict]:
    """
    Asynchronously query CMR API and collect DirectDistributionInformation.

    Args:
        base_url: CMR API endpoint
        cloud_hosted: Filter for cloud-hosted collections
        max_pages: Maximum number of pages to fetch
        page_size: Results per page
        concurrent_requests: Number of concurrent requests

    Returns:
        Dictionary mapping concept_id to DirectDistributionInformation
    """
    print("Starting async CMR query...")
    print(f"  Max pages: {max_pages}")
    print(f"  Page size: {page_size}")
    print(f"  Concurrent requests: {concurrent_requests}")
    print()
    results = {}
    async with aiohttp.ClientSession() as session:
        # First, fetch page 1 to get total hits
        _, items, total_hits = await fetch_page(session, base_url, 1, page_size, cloud_hosted)
        if items:
            for item in items:
                concept_id = item.get("meta", {}).get("concept-id")
                if concept_id:
                    direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
                    if direct_dist_info:
                        results[concept_id] = direct_dist_info
        print(f"Total collections available: {total_hits}")
        total_pages = min(max_pages, (total_hits + page_size - 1) // page_size)
        print(f"Will fetch {total_pages} page(s)\n")
        if total_pages <= 1:
            return results
        # Fetch remaining pages concurrently
        tasks = []
        for page_num in range(2, total_pages + 1):
            task = fetch_page(session, base_url, page_num, page_size, cloud_hosted)
            tasks.append(task)
            # Process in batches to limit concurrency
            if len(tasks) >= concurrent_requests:
                batch_results = await asyncio.gather(*tasks)
                for page_num, items, _ in batch_results:
                    if items:
                        print(f"  ✓ Page {page_num}: {len(items)} collections")
                        for item in items:
                            concept_id = item.get("meta", {}).get("concept-id")
                            if concept_id:
                                direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
                                if direct_dist_info:
                                    results[concept_id] = direct_dist_info
                tasks = []
        # Process remaining tasks
        if tasks:
            batch_results = await asyncio.gather(*tasks)
            for page_num, items, _ in batch_results:
                if items:
                    print(f"  ✓ Page {page_num}: {len(items)} collections")
                    for item in items:
                        concept_id = item.get("meta", {}).get("concept-id")
                        if concept_id:
                            direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
                            if direct_dist_info:
                                results[concept_id] = direct_dist_info
    print(f"\n✓ Collected {len(results)} collections with DirectDistributionInformation\n")
    return results


def extract_bucket(s3_path: str) -> str:
    """Extract just the bucket name from S3 path."""
    if s3_path.startswith("s3://"):
        s3_path = s3_path[5:]
    bucket = s3_path.split("/")[0]
    return bucket


def create_bucket_mapping(results: Dict[str, Dict]) -> Tuple[Dict[str, str], Dict[str, Set[str]]]:
    """
    Create mapping from bucket to endpoint.

    Returns:
        Tuple of (bucket_to_endpoint, bucket_conflicts)
        where bucket_conflicts contains buckets with multiple endpoints
    """
    print("Processing bucket mappings...")
    # Track all endpoints seen for each bucket
    bucket_endpoints = defaultdict(set)
    for concept_id, info in results.items():
        endpoint = info.get("S3CredentialsAPIEndpoint")
        if not endpoint:
            continue
        s3_paths = info.get("S3BucketAndObjectPrefixNames", [])
        for s3_path in s3_paths:
            bucket = extract_bucket(s3_path)
            bucket_endpoints[bucket].add(endpoint)
    # Create final mapping (using first endpoint alphabetically for conflicts)
    bucket_to_endpoint = {}
    bucket_conflicts = {}
    for bucket, endpoints in bucket_endpoints.items():
        if len(endpoints) > 1:
            # Conflict detected
            bucket_conflicts[bucket] = endpoints
            # Use first endpoint alphabetically
            bucket_to_endpoint[bucket] = sorted(endpoints)[0]
        else:
            bucket_to_endpoint[bucket] = next(iter(endpoints))
    print(f"✓ Mapped {len(bucket_to_endpoint)} unique buckets to endpoints")
    print(f"⚠ Found {len(bucket_conflicts)} bucket(s) with conflicting endpoints\n")
    return bucket_to_endpoint, bucket_conflicts


def print_conflicts(conflicts: Dict[str, Set[str]]):
    """Print warning about buckets with multiple endpoints."""
    if not conflicts:
        print("=" * 80)
        print("✓ NO CONFLICTS - All buckets have consistent endpoints")
        print("=" * 80)
        return
    print("=" * 80)
    print("⚠ WARNING: BUCKETS WITH MULTIPLE ENDPOINTS")
    print("=" * 80)
    print(f"\nFound {len(conflicts)} bucket(s) with conflicting endpoints:\n")
    for bucket, endpoints in sorted(conflicts.items()):
        print(f"Bucket: {bucket}")
        for endpoint in sorted(endpoints):
            print(f"  - {endpoint}")
        print()


def print_summary(mapping: Dict[str, str], conflicts: Dict[str, Set[str]]):
    """Print summary statistics."""
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    unique_endpoints = len(set(mapping.values()))
    print(f"Total unique buckets: {len(mapping)}")
    print(f"Unique endpoints: {unique_endpoints}")
    print(f"Buckets with conflicts: {len(conflicts)}")
    # Group by endpoint
    endpoint_groups = defaultdict(list)
    for bucket, endpoint in mapping.items():
        endpoint_groups[endpoint].append(bucket)
    print("\nBuckets per endpoint:")
    for endpoint, buckets in sorted(endpoint_groups.items(), key=lambda x: len(x[1]), reverse=True):
        print(f"  {endpoint}")
        print(f"    → {len(buckets)} bucket(s)")


async def main():
    """Main execution."""
    print("\n" + "=" * 80)
    print("NASA CMR ASYNC S3 BUCKET TO ENDPOINT MAPPER")
    print("=" * 80 + "\n")
    # Configuration
    MAX_PAGES = 10000  # Adjust this to crawl more/fewer pages
    PAGE_SIZE = 100  # Max is 2000, but 100 is more stable
    CONCURRENT_REQUESTS = 10  # Number of simultaneous requests
    # Step 1: Query CMR asynchronously
    results = await query_cmr_async(
        max_pages=MAX_PAGES,
        page_size=PAGE_SIZE,
        concurrent_requests=CONCURRENT_REQUESTS,
    )
    if not results:
        print("No results collected. Exiting.")
        return
    # Step 2: Create bucket mapping and detect conflicts
    mapping, conflicts = create_bucket_mapping(results)
    # Step 3: Print conflicts
    print_conflicts(conflicts)
    # Step 4: Print summary
    print()
    print_summary(mapping, conflicts)
    # Step 5: Save outputs
    print(f"\n{'=' * 80}")
    print("SAVING RESULTS")
    print("=" * 80)
    with open("bucket_to_endpoint.json", "w") as f:
        json.dump(mapping, f, indent=2, sort_keys=True)
    print("✓ Saved bucket_to_endpoint.json")
    if conflicts:
        conflicts_serializable = {k: list(v) for k, v in conflicts.items()}
        with open("bucket_conflicts.json", "w") as f:
            json.dump(conflicts_serializable, f, indent=2, sort_keys=True)
        print("✓ Saved bucket_conflicts.json")
    with open("cmr_raw_results.json", "w") as f:
        json.dump(results, f, indent=2)
    print("✓ Saved cmr_raw_results.json")
    print(f"\n{'=' * 80}")
    print("COMPLETE!")
    print("=" * 80 + "\n")


if __name__ == "__main__":
    asyncio.run(main())
```

What I get as a result is that the mapping is mostly unique:

```json
{
  "TestBucket": "www.testexample.com",
  "asdc-prod-protected": "https://data.asdc.earthdata.nasa.gov/s3credentials",
  "asf-cumulus-prod-alos2-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-aria-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-browse": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-opera-browse": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-product": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-products": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-seasat-products": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-grd-7d1b4348": "https://sentinel1.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-ocn-1e29d408": "https://sentinel1.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-raw-98779950": "https://sentinel1.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-slc-7b420b89": "https://sentinel1.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-xml-8cf7476b": "https://sentinel1.asf.alaska.edu/s3credentials",
  "csda-cumulus-prod-protected-5047": "https://data.csdap.earthdata.nasa.gov/s3credentials",
  "gesdisc-cumulus-prod-protected": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
  "gesdisc-cumulus-prod-protectedAqua_AIRS_Level2": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
  "ghrcw-protected": "https://data.ghrc.earthdata.nasa.gov/s3credentials",
  "ghrcwuat-protected": "https://data.ghrc.uat.earthdata.nasa.gov/s3credentials",
  "lp-prod-protected": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
  "lp-prod-public": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
  "lp-protected": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
  "lp-public": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
  "lp-sit-protected": "https://data.lpdaac.sit.earthdatacloud.nasa.gov/s3credentials",
  "lp-sit-public": "https://data.lpdaac.sit.earthdatacloud.nasa.gov/s3credentials",
  "nsidc-cumulus-prod-protected": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
  "nsidc-cumulus-prod-public": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
  "ob-cumulus-prod-public": "https://obdaac-tea.earthdatacloud.nasa.gov/s3credentials",
  "ob-cumulus-sit-public": "https://obdaac-tea.sit.earthdatacloud.nasa.gov/s3credentials",
  "ob-cumulus-uat-public": "https://obdaac-tea.uat.earthdatacloud.nasa.gov/s3credentials",
  "ornl-cumulus-prod-protected": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
  "ornl-cumulus-prod-public": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
  "podaac-ops-cumulus-docs": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
  "podaac-ops-cumulus-protected": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
  "podaac-ops-cumulus-public": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
  "podaac-swot-ops-cumulus-protected": "https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
  "podaac-swot-ops-cumulus-public": "https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
  "prod-lads": "https://data.laadsdaac.earthdatacloud.nasa.gov/s3credentials"
}
```

with a few exceptions:

```json
{
  "asf-cumulus-prod-opera-browse": [
    "https://cumulus.asf.alaska.edu/s3credentials",
    "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials"
  ],
  "asf-cumulus-prod-opera-products": [
    "https://cumulus.asf.alaska.edu/s3credentials",
    "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials"
  ]
}
```

I'll keep digging to see if the bucket + first "folder" is in fact unique!
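For context on how this mapping would get consumed downstream: given a granule URL, look up the deepest matching bucket/prefix key first and fall back toward the bare bucket. This is a hedged sketch — `lookup_credentials_endpoint` and the mini-mapping are illustrative names, not code in this PR:

```python
from typing import Dict, Optional


def lookup_credentials_endpoint(url: str, mapping: Dict[str, str]) -> Optional[str]:
    """Return the S3 credentials endpoint for an s3:// URL, longest key first."""
    path = url[5:] if url.startswith("s3://") else url
    parts = path.split("/")
    # Walk from the most specific key (bucket/prefix/...) down to the bare bucket.
    for depth in range(len(parts), 0, -1):
        key = "/".join(parts[:depth])
        if key in mapping:
            return mapping[key]
    return None


# Illustrative excerpt of the crawled mapping
demo_mapping = {
    "podaac-ops-cumulus-protected": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
    "asf-cumulus-prod-opera-browse/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
}

print(lookup_credentials_endpoint("s3://podaac-ops-cumulus-protected/some/granule.nc", demo_mapping))
```

Matching longest-key-first means a bucket-level entry acts as a default while prefix-level entries override it, which is exactly the shape the crawl produces.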
OK, I think this might just be sufficient for now.

**Crawl Script v1**

```python
#!/usr/bin/env python3
"""
Async CMR Query - Map S3 Buckets to Auth Endpoints
Crawls NASA CMR API and maps bucket/prefix keys to S3 credentials endpoints.
Warns about bucket/prefix keys with conflicting endpoints.
Note: In S3, there are no actual "folders" - only keys with delimiters (/).
What looks like a folder structure is just part of the object key.
"""
import asyncio
import json
from collections import defaultdict
from typing import Dict, Set, Tuple

import aiohttp


async def fetch_page(
    session: aiohttp.ClientSession,
    base_url: str,
    page_num: int,
    page_size: int,
    cloud_hosted: bool = True,
) -> Tuple[int, list, int]:
    """
    Fetch a single page from CMR API.

    Returns:
        Tuple of (page_num, items, total_hits)
    """
    params = {"page_size": page_size, "page_num": page_num}
    if cloud_hosted:
        params["cloud_hosted"] = "true"
    try:
        async with session.get(
            base_url, params=params, timeout=aiohttp.ClientTimeout(total=30)
        ) as response:
            response.raise_for_status()
            data = await response.json()
            items = data.get("items", [])
            total_hits = int(response.headers.get("CMR-Hits", 0))
            return page_num, items, total_hits
    except Exception as e:
        print(f"  ✗ Error fetching page {page_num}: {e}")
        return page_num, [], 0


async def query_cmr_async(
    base_url: str = "https://cmr.earthdata.nasa.gov/search/collections.umm_json",
    cloud_hosted: bool = True,
    max_pages: int = 100,
    page_size: int = 100,
    concurrent_requests: int = 10,
) -> Dict[str, Dict]:
    """
    Asynchronously query CMR API and collect DirectDistributionInformation.

    Args:
        base_url: CMR API endpoint
        cloud_hosted: Filter for cloud-hosted collections
        max_pages: Maximum number of pages to fetch
        page_size: Results per page
        concurrent_requests: Number of concurrent requests

    Returns:
        Dictionary mapping concept_id to DirectDistributionInformation
    """
    print("Starting async CMR query...")
    print(f"  Max pages: {max_pages}")
    print(f"  Page size: {page_size}")
    print(f"  Concurrent requests: {concurrent_requests}")
    print()
    results = {}
    async with aiohttp.ClientSession() as session:
        # First, fetch page 1 to get total hits
        _, items, total_hits = await fetch_page(session, base_url, 1, page_size, cloud_hosted)
        if items:
            for item in items:
                concept_id = item.get("meta", {}).get("concept-id")
                if concept_id:
                    direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
                    if direct_dist_info:
                        results[concept_id] = direct_dist_info
        print(f"Total collections available: {total_hits}")
        total_pages = min(max_pages, (total_hits + page_size - 1) // page_size)
        print(f"Will fetch {total_pages} page(s)\n")
        if total_pages <= 1:
            return results
        # Fetch remaining pages concurrently
        tasks = []
        for page_num in range(2, total_pages + 1):
            task = fetch_page(session, base_url, page_num, page_size, cloud_hosted)
            tasks.append(task)
            # Process in batches to limit concurrency
            if len(tasks) >= concurrent_requests:
                batch_results = await asyncio.gather(*tasks)
                for page_num, items, _ in batch_results:
                    if items:
                        print(f"  ✓ Page {page_num}: {len(items)} collections")
                        for item in items:
                            concept_id = item.get("meta", {}).get("concept-id")
                            if concept_id:
                                direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
                                if direct_dist_info:
                                    results[concept_id] = direct_dist_info
                tasks = []
        # Process remaining tasks
        if tasks:
            batch_results = await asyncio.gather(*tasks)
            for page_num, items, _ in batch_results:
                if items:
                    print(f"  ✓ Page {page_num}: {len(items)} collections")
                    for item in items:
                        concept_id = item.get("meta", {}).get("concept-id")
                        if concept_id:
                            direct_dist_info = item.get("umm", {}).get("DirectDistributionInformation")
                            if direct_dist_info:
                                results[concept_id] = direct_dist_info
    print(f"\n✓ Collected {len(results)} collections with DirectDistributionInformation\n")
    return results


def extract_bucket_prefix_key(s3_path: str, prefix_depth: int = 0) -> str:
    """
    Extract bucket and prefix up to specified depth from S3 path.

    In S3, there are no actual folders - only object keys with '/' delimiters.
    This function extracts the bucket and the first N prefix components.

    Args:
        s3_path: S3 path like 's3://bucket/prefix/component1/component2'
        prefix_depth: How many prefix components to include (0 = bucket only)

    Returns:
        String like 'bucket' (depth=0) or 'bucket/prefix/component1' (depth=2)

    Examples:
        extract_bucket_prefix_key('s3://my-bucket/data/2024/file.txt', 0) -> 'my-bucket'
        extract_bucket_prefix_key('s3://my-bucket/data/2024/file.txt', 1) -> 'my-bucket/data'
        extract_bucket_prefix_key('s3://my-bucket/data/2024/file.txt', 2) -> 'my-bucket/data/2024'
    """
    if s3_path.startswith("s3://"):
        s3_path = s3_path[5:]
    parts = s3_path.split("/")
    bucket = parts[0]
    if prefix_depth == 0:
        return bucket
    # Include bucket + prefix_depth components
    # Handle case where path doesn't have enough components
    end_idx = min(1 + prefix_depth, len(parts))
    key = "/".join(parts[:end_idx])
    return key


def create_bucket_key_mapping_recursive(
    results: Dict[str, Dict],
    max_depth: int = 5,
) -> Tuple[Dict[str, str], Dict[str, Set[str]]]:
    """
    Create mapping from bucket/prefix key to endpoint.
    Recursively increases depth only for keys that have conflicts.

    Strategy:
        1. Start at depth 0 (bucket only)
        2. For any key with multiple endpoints, increase depth by 1
        3. Repeat until no conflicts or max_depth reached

    Args:
        results: Dictionary of DirectDistributionInformation
        max_depth: Maximum prefix depth to try

    Returns:
        Tuple of (key_to_endpoint, remaining_conflicts)
    """
    print("Building bucket/prefix mapping with recursive conflict resolution...")
    print(f"Maximum depth: {max_depth}\n")
    # First, collect all S3 paths and their endpoints
    path_endpoints = []  # List of (s3_path, endpoint)
    for concept_id, info in results.items():
        endpoint = info.get("S3CredentialsAPIEndpoint")
        if not endpoint:
            continue
        s3_paths = info.get("S3BucketAndObjectPrefixNames", [])
        for s3_path in s3_paths:
            path_endpoints.append((s3_path, endpoint))
    print(f"Total S3 paths to process: {len(path_endpoints)}\n")
    # Track final mapping and paths that still need processing
    final_mapping = {}
    paths_to_process = path_endpoints  # Start with all paths at depth 0
    for depth in range(max_depth + 1):
        if not paths_to_process:
            break
        print(f"Processing depth {depth}...")
        # Build mapping at current depth for paths still being processed
        key_endpoints = defaultdict(set)
        key_original_paths = defaultdict(list)  # Track which original paths map to each key
        for s3_path, endpoint in paths_to_process:
            key = extract_bucket_prefix_key(s3_path, depth)
            key_endpoints[key].add(endpoint)
            key_original_paths[key].append((s3_path, endpoint))
        # Separate unique keys from conflicting keys
        paths_still_conflicting = []
        resolved_count = 0
        conflict_count = 0
        for key, endpoints in key_endpoints.items():
            if len(endpoints) == 1:
                # No conflict at this depth - add to final mapping
                final_mapping[key] = next(iter(endpoints))
                resolved_count += 1
            else:
                # Still conflicting - need to go deeper
                if depth < max_depth:
                    # Add these paths back for processing at next depth
                    paths_still_conflicting.extend(key_original_paths[key])
                    conflict_count += 1
                else:
                    # Max depth reached, pick first endpoint alphabetically
                    final_mapping[key] = sorted(endpoints)[0]
                    resolved_count += 1
                    conflict_count += 1
        print(f"  Resolved: {resolved_count} unique keys")
        print(f"  Conflicts: {conflict_count} keys")
        if depth < max_depth and paths_still_conflicting:
            print(f"  → Moving {len(paths_still_conflicting)} paths to depth {depth + 1}")
        paths_to_process = paths_still_conflicting
    # Find any remaining conflicts (shouldn't happen unless max_depth reached)
    remaining_conflicts = {}
    key_endpoints_final = defaultdict(set)
    for key, endpoint in final_mapping.items():
        key_endpoints_final[key].add(endpoint)
    # Re-scan to find actual conflicts in final mapping
    # (can happen if max_depth is reached)
    for concept_id, info in results.items():
        endpoint = info.get("S3CredentialsAPIEndpoint")
        if not endpoint:
            continue
        s3_paths = info.get("S3BucketAndObjectPrefixNames", [])
        for s3_path in s3_paths:
            # Find which key this path mapped to
            for depth in range(max_depth + 1):
                key = extract_bucket_prefix_key(s3_path, depth)
                if key in final_mapping:
                    if final_mapping[key] != endpoint:
                        if key not in remaining_conflicts:
                            remaining_conflicts[key] = set()
                        remaining_conflicts[key].add(endpoint)
                        remaining_conflicts[key].add(final_mapping[key])
                    break
    print(f"\n✓ Final mapping has {len(final_mapping)} unique bucket/prefix keys")
    print(f"⚠ Unresolved conflicts: {len(remaining_conflicts)} keys\n")
    return final_mapping, remaining_conflicts


def print_conflicts(conflicts: Dict[str, Set[str]]):
    """Print warning about bucket/prefix keys with multiple endpoints."""
    if not conflicts:
        print("=" * 80)
        print("✓ NO CONFLICTS - All bucket/prefix keys have unique endpoints")
        print("=" * 80)
        return
    print("=" * 80)
    print("⚠ WARNING: BUCKET/PREFIX KEYS WITH MULTIPLE ENDPOINTS")
    print("=" * 80)
    print(f"\nFound {len(conflicts)} key(s) with unresolved conflicts:\n")
    print("(These conflicts could not be resolved even at maximum depth)\n")
    for key, endpoints in sorted(conflicts.items()):
        depth = key.count("/")
        print(f"Key: {key} (depth={depth})")
        for endpoint in sorted(endpoints):
            print(f"  - {endpoint}")
        print()


def print_summary(mapping: Dict[str, str], conflicts: Dict[str, Set[str]]):
    """Print summary statistics."""
    print("=" * 80)
    print("SUMMARY")
    print("=" * 80)
    unique_endpoints = len(set(mapping.values()))
    print(f"Total unique bucket/prefix keys: {len(mapping)}")
    print(f"Unique endpoints: {unique_endpoints}")
    print(f"Unresolved conflicts: {len(conflicts)}")
    # Show depth distribution
    depth_counts = defaultdict(int)
    for key in mapping.keys():
        depth = key.count("/")
        depth_counts[depth] += 1
    print("\nDepth distribution:")
    for depth in sorted(depth_counts.keys()):
        print(f"  Depth {depth}: {depth_counts[depth]} keys")
    # Group by endpoint
    endpoint_groups = defaultdict(list)
    for key, endpoint in mapping.items():
        endpoint_groups[endpoint].append(key)
    print("\nKeys per endpoint:")
    for endpoint, keys in sorted(endpoint_groups.items(), key=lambda x: len(x[1]), reverse=True):
        print(f"  {endpoint}")
        print(f"    → {len(keys)} key(s)")
        # Show a few examples with their depths
        if len(keys) <= 3:
            for key in sorted(keys):
                depth = key.count("/")
                print(f"      - {key} (depth={depth})")
        else:
            for key in sorted(keys)[:3]:
                depth = key.count("/")
                print(f"      - {key} (depth={depth})")
            print(f"      ... and {len(keys) - 3} more")


async def main():
    """Main execution."""
    print("\n" + "=" * 80)
    print("NASA CMR ASYNC S3 BUCKET/PREFIX TO ENDPOINT MAPPER")
    print("(Recursive Conflict Resolution)")
    print("=" * 80 + "\n")
    # Configuration
    MAX_PAGES = 1000  # Adjust this to crawl more/fewer pages
    PAGE_SIZE = 100  # Max is 2000, but 100 is more stable
    CONCURRENT_REQUESTS = 10  # Number of simultaneous requests
    MAX_DEPTH = 5  # Maximum prefix depth to try for conflict resolution
    print("Configuration:")
    print(f"  Max depth for conflict resolution: {MAX_DEPTH}")
    print("  Strategy: Start at depth 0, increase depth only for conflicts")
    print()
    # Step 1: Query CMR asynchronously
    results = await query_cmr_async(
        max_pages=MAX_PAGES,
        page_size=PAGE_SIZE,
        concurrent_requests=CONCURRENT_REQUESTS,
    )
    if not results:
        print("No results collected. Exiting.")
        return
    # Step 2: Create bucket/prefix mapping with recursive conflict resolution
    mapping, conflicts = create_bucket_key_mapping_recursive(results, max_depth=MAX_DEPTH)
    # Step 3: Print conflicts
    print_conflicts(conflicts)
    # Step 4: Print summary
    print()
    print_summary(mapping, conflicts)
    # Step 5: Save outputs
    print(f"\n{'=' * 80}")
    print("SAVING RESULTS")
    print("=" * 80)
    with open("bucket_to_endpoint.json", "w") as f:
        json.dump(mapping, f, indent=2, sort_keys=True)
    print("✓ Saved bucket_to_endpoint.json")
    if conflicts:
        conflicts_serializable = {k: list(v) for k, v in conflicts.items()}
        with open("bucket_conflicts.json", "w") as f:
            json.dump(conflicts_serializable, f, indent=2, sort_keys=True)
        print("✓ Saved bucket_conflicts.json")
    with open("cmr_raw_results.json", "w") as f:
        json.dump(results, f, indent=2)
    print("✓ Saved cmr_raw_results.json")
    print(f"\n{'=' * 80}")
    print("COMPLETE!")
    print("=" * 80 + "\n")


if __name__ == "__main__":
    asyncio.run(main())
```

gives:

```json
{
  "TestBucket": "www.testexample.com",
  "asdc-prod-protected": "https://data.asdc.earthdata.nasa.gov/s3credentials",
  "asf-cumulus-prod-alos2-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-aria-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-browse": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-opera-browse/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-browse/OPERA_L2_RTC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-browse/OPERA_L4_TROPO-ZENITH_V1": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-opera-product": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1_STATIC": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-products/OPERA_L2_RTC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-products/OPERA_L2_RTC-S1_STATIC": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-products/OPERA_L4_TROPO-ZENITH_V1": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-seasat-products": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-grd-7d1b4348": "https://sentinel1.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-ocn-1e29d408": "https://sentinel1.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-raw-98779950": "https://sentinel1.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-slc-7b420b89": "https://sentinel1.asf.alaska.edu/s3credentials",
  "asf-ngap2w-p-s1-xml-8cf7476b": "https://sentinel1.asf.alaska.edu/s3credentials",
  "csda-cumulus-prod-protected-5047": "https://data.csdap.earthdata.nasa.gov/s3credentials",
  "gesdisc-cumulus-prod-protected": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
  "gesdisc-cumulus-prod-protectedAqua_AIRS_Level2": "https://data.gesdisc.earthdata.nasa.gov/s3credentials",
  "ghrcw-protected": "https://data.ghrc.earthdata.nasa.gov/s3credentials",
  "ghrcwuat-protected": "https://data.ghrc.uat.earthdata.nasa.gov/s3credentials",
  "lp-prod-protected": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
  "lp-prod-public": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
  "lp-protected": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
  "lp-public": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
  "lp-sit-protected": "https://data.lpdaac.sit.earthdatacloud.nasa.gov/s3credentials",
  "lp-sit-public": "https://data.lpdaac.sit.earthdatacloud.nasa.gov/s3credentials",
  "nsidc-cumulus-prod-protected": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
  "nsidc-cumulus-prod-public": "https://data.nsidc.earthdatacloud.nasa.gov/s3credentials",
  "ob-cumulus-prod-public": "https://obdaac-tea.earthdatacloud.nasa.gov/s3credentials",
  "ob-cumulus-sit-public": "https://obdaac-tea.sit.earthdatacloud.nasa.gov/s3credentials",
  "ob-cumulus-uat-public": "https://obdaac-tea.uat.earthdatacloud.nasa.gov/s3credentials",
  "ornl-cumulus-prod-protected": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
  "ornl-cumulus-prod-public": "https://data.ornldaac.earthdata.nasa.gov/s3credentials",
  "podaac-ops-cumulus-docs": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
  "podaac-ops-cumulus-protected": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
  "podaac-ops-cumulus-public": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
  "podaac-swot-ops-cumulus-protected": "https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
  "podaac-swot-ops-cumulus-public": "https://archive.swot.podaac.earthdata.nasa.gov/s3credentials",
  "prod-lads": "https://data.laadsdaac.earthdatacloud.nasa.gov/s3credentials"
}
```

I am inclined to just commit this mapping to the repo and add the script so we can update it quickly if things change? Or is this a really crappy idea?
This probably makes the most sense for now.

These both should work identically, but it'd be worth prioritizing the
Also, these ones:

```json
{
  "asf-cumulus-prod-opera-browse/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-browse/OPERA_L2_RTC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-browse/OPERA_L4_TROPO-ZENITH_V1": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1_STATIC": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-products/OPERA_L2_RTC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
  "asf-cumulus-prod-opera-products/OPERA_L2_RTC-S1_STATIC": "https://cumulus.asf.alaska.edu/s3credentials"
}
```

All have a prefix included with the bucket name and could just be:

```json
{
  "asf-cumulus-prod-opera-browse": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials",
  "asf-cumulus-prod-opera-products": "https://cumulus.asf.earthdatacloud.nasa.gov/s3credentials"
}
```
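A more conservative variant of that collapse could be automated: drop the prefix component only when every prefixed key under a bucket already points at the same endpoint, and keep the full keys for genuine conflicts. Rough sketch (the function name is made up, not part of this PR):

```python
from collections import defaultdict
from typing import Dict


def collapse_prefixed_keys(mapping: Dict[str, str]) -> Dict[str, str]:
    """Collapse 'bucket/prefix' keys to bare 'bucket' keys whenever every
    entry under that bucket shares one endpoint; otherwise keep full keys."""
    endpoints_by_bucket = defaultdict(set)
    for key, endpoint in mapping.items():
        endpoints_by_bucket[key.split("/")[0]].add(endpoint)
    collapsed: Dict[str, str] = {}
    for key, endpoint in mapping.items():
        bucket = key.split("/")[0]
        if len(endpoints_by_bucket[bucket]) == 1:
            collapsed[bucket] = endpoint  # unambiguous: one endpoint per bucket
        else:
            collapsed[key] = endpoint  # conflicting endpoints: keep the prefix
    return collapsed


# Illustrative excerpt: both prefixed keys share one endpoint, so they collapse.
demo = {
    "asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
    "asf-cumulus-prod-opera-products/OPERA_L2_RTC-S1": "https://cumulus.asf.alaska.edu/s3credentials",
    "asf-cumulus-prod-seasat-products": "https://cumulus.asf.alaska.edu/s3credentials",
}
print(collapse_prefixed_keys(demo))
```

Manually forcing a single endpoint per bucket (as suggested above) would still need the DAAC-side confirmation that both endpoints work identically.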
Ah, that is super helpful. @jhkennedy, will you be at the hack on Tuesday by any chance?
Another question for the DAAC folks here: is there any way to add a dummy Icechunk store to any of the EDL-authenticated buckets? I think that might be the easiest way to test the top-level functionality here.

I am revising the structure of the code quite a bit at the moment. My plan is to support two main use cases:

1. **"Full EDL" case** - the Icechunk store and any virtual chunks (if present) are within EDL buckets:

```python
from earthaccess.icechunk import open_icechunk_from_url

url = 's3://some-edl-bucket/pointing/to/ic/store'  # how to get this url will be solved by different logic
store = open_icechunk_from_url(url)
```

Simple as that, but at this point this is a non-existent use case AFAICT?

2. **"Virtual EDL Chunks"** - the Icechunk store is wherever, but all the virtual chunks point to one or more EDL buckets:

```python
import icechunk as ic
from earthaccess.icechunk import get_virtual_chunk_credentials

storage = ...  # configure your custom icechunk storage
vchunk_credentials = get_virtual_chunk_credentials(storage)
repo = ic.Repository.open(storage=storage, authorize_virtual_chunk_access=vchunk_credentials)
```

This is not quite as automatic, but will actually help a lot of current use cases I think. It is also quite a LOT shorter than what I have to do here, for example. I think this would even enable more 'frankenstein-ish' cases, where an Icechunk repo points to some EDL and some non-EDL buckets (to be tested).
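One building block for use case 2, independent of the icechunk API specifics (which I haven't pinned down yet), would be resolving each bucket a repo's virtual chunks reference to its credentials endpoint via the crawled mapping. Hedged sketch — `endpoints_for_virtual_chunks` and the excerpted mapping are hypothetical:

```python
from typing import Dict, Iterable

# Excerpt of the crawled bucket -> endpoint mapping; the real thing would
# ship with the package (e.g. as a committed JSON file).
BUCKET_TO_ENDPOINT = {
    "podaac-ops-cumulus-protected": "https://archive.podaac.earthdata.nasa.gov/s3credentials",
    "lp-prod-protected": "https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials",
}


def endpoints_for_virtual_chunks(buckets: Iterable[str]) -> Dict[str, str]:
    """Map each referenced bucket to its credentials endpoint,
    failing early for buckets we cannot authenticate against."""
    resolved = {}
    for bucket in buckets:
        try:
            resolved[bucket] = BUCKET_TO_ENDPOINT[bucket]
        except KeyError:
            raise ValueError(f"No known EDL credentials endpoint for bucket {bucket!r}")
    return resolved
```

The output of this would then be fed into whatever per-bucket credential objects icechunk's `authorize_virtual_chunk_access` expects.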
@betolink could you assign me to this PR so we can track it easier on the devseed side?
Sorry @jbusecke, I'm out of office this week. Thanks @chuckwondo for the quick response!
This is a first start towards building an icechunk opener for earthaccess (see #1132 for context).

This PR depends on #1154.

This is still very rough and might change a lot.

This implements a very minimalist test and opener function in `earthaccess.icechunk._open_icechunk_from_url`. There are a ton of TODOs and questions, but let me try to point out the most pressing ones.

Pull Request (PR) draft checklist - click to expand
- Please review our contributing documentation before getting started.
- Populate a descriptive PR title, such as "Add testing details to the contributor section of the README". Example PRs: #763
- Populate the body of the pull request with, for example, `closes #1`. See GitHub docs - Linking a pull request to an issue.
- Update `CHANGELOG.md` with details about your change in a section titled `## Unreleased`. If such a section does not exist, please create one. Follow Common Changelog for your additions. Example PRs: #763
- Update `README.md` with details of changes to the `earthaccess` interface, if any. Consider new environment variables, function names, decorators, etc.
- Click the "Ready for review" button at the bottom of the "Conversation" tab in GitHub once these requirements are fulfilled. Don't worry if you see any test failures in GitHub at this point!
Pull Request (PR) merge checklist - click to expand

Please do your best to complete these requirements! If you need help with any of these requirements, you can ping the @nsidc/earthaccess-support team in a comment and we will help you out!

- Comment on this Pull Request containing "pre-commit.ci autofix" to automate this.
📚 Documentation preview 📚: https://earthaccess--1135.org.readthedocs.build/en/1135/