This module contains common logic used by both S1 and S2 fetching modules,
reducing code duplication and providing a consistent interface.

Product-level deduplication:
Files are named with the Copernicus product ID embedded: {product_id}__{safe_name}.ext
This allows us to detect already-downloaded products regardless of which query found them.
Two different bboxes that return the same Copernicus tile will share the download.
"""
611
712import json
13+ import zipfile
814from pathlib import Path
915from typing import TYPE_CHECKING , Any , Callable , Dict , List , Optional
1016
1117if TYPE_CHECKING :
1218 from .client import CopernicusClient
1319
1420
def find_product_on_disk(
    cache_dir: Path,
    satellite_subdir: str,
    product_id: str,
) -> Optional[Path]:
    """Check if a Copernicus product is already downloaded by scanning filenames for its ID.

    Files are named as {product_id}__{safe_name}.ext, so we glob for {product_id}__*
    to find any existing download regardless of which query originally fetched it.

    Args:
        cache_dir: Root cache directory (e.g. data/cache/copernicus)
        satellite_subdir: Subdirectory for the satellite type ("s1" or "s2")
        product_id: Copernicus product UUID (e.g. "a8dd0899-7a3b-4e4b-9b3a-5e7f1234abcd")

    Returns:
        Path to the existing file if found and non-empty, None otherwise
    """
    subdir = cache_dir / satellite_subdir
    if not subdir.exists():
        return None

    # sorted() makes the chosen file deterministic when several extensions
    # exist for one product; raw glob order is filesystem-dependent.
    # glob() only yields entries that exist, so no extra exists() check needed.
    for match in sorted(subdir.glob(f"{product_id}__*")):
        # A zero-byte file is an aborted download; keep looking.
        if match.stat().st_size == 0:
            continue
        # For zip files, verify the archive isn't truncated/corrupted.
        # Lowercase the suffix so ".ZIP" archives are validated too.
        if match.suffix.lower() == ".zip" and not _is_valid_zip(match):
            print(f"⚠️ Corrupted zip detected, removing: {match.name}")
            match.unlink()
            continue
        return match
    return None
53+
54+
55+ def _is_valid_zip (path : Path ) -> bool :
56+ """Quick integrity check for a zip file.
57+
58+ Reads the central directory (at the end of the file) and runs CRC checks
59+ on all entries. A truncated download will fail here because the central
60+ directory is written last.
61+
62+ Args:
63+ path: Path to the zip file
64+
65+ Returns:
66+ True if the zip is structurally valid, False otherwise
67+ """
68+ try :
69+ with zipfile .ZipFile (path , "r" ) as zf :
70+ bad = zf .testzip ()
71+ return bad is None
72+ except (zipfile .BadZipFile , OSError ):
73+ return False
74+
75+
1576def check_cache (
1677 cache_file : Path ,
1778) -> Optional [List [Path ]]:
@@ -143,6 +204,10 @@ def process_products(
143204) -> List [Path ]:
144205 """Process products by downloading or creating metadata.
145206
207+ Uses product-level deduplication: before downloading, checks if the product
208+ (identified by its Copernicus UUID) already exists on disk from a previous
209+ query. This prevents re-downloading the same tile when the bbox shifts slightly.
210+
146211 Args:
147212 client: CopernicusClient instance
148213 products: List of products to process
@@ -156,12 +221,25 @@ def process_products(
156221 List of paths to downloaded/created files
157222 """
158223 downloaded_paths : List [Path ] = []
224+ sat_subdir = "s1" if "1" in satellite else "s2"
225+ skipped = 0
159226
160227 if download_data :
161228 print (f"\n 📥 DOWNLOADING { satellite } IMAGERY" )
162229 print ("=" * 45 )
163230
164231 for i , product in enumerate (products , 1 ):
232+ product_id = product .get ("Id" , "" )
233+
234+ # Product-level dedup: check if this product ID is already on disk
235+ if product_id :
236+ existing = find_product_on_disk (client .cache_dir , sat_subdir , product_id )
237+ if existing :
238+ print (f"\n ⏭️ Product { i } /{ len (products )} already on disk: { existing .name } " )
239+ downloaded_paths .append (existing )
240+ skipped += 1
241+ continue
242+
165243 print (f"\n 🛰️ Downloading product { i } /{ len (products )} " )
166244
167245 downloaded_file = download_func (client , product , i - 1 , ** kwargs )
@@ -175,8 +253,22 @@ def process_products(
175253 print ("=" * 35 )
176254
177255 for i , product in enumerate (products ):
256+ product_id = product .get ("Id" , "" )
257+
258+ # Product-level dedup for metadata files too
259+ if product_id :
260+ existing = find_product_on_disk (client .cache_dir , sat_subdir , product_id )
261+ if existing :
262+ print (f"⏭️ Metadata for product already on disk: { existing .name } " )
263+ downloaded_paths .append (existing )
264+ skipped += 1
265+ continue
266+
178267 metadata_file = metadata_func (client , product , i , ** kwargs )
179268 if metadata_file :
180269 downloaded_paths .append (metadata_file )
181270
271+ if skipped :
272+ print (f"\n 🎯 Skipped { skipped } /{ len (products )} products (already downloaded)" )
273+
182274 return downloaded_paths
0 commit comments