10
10
# Merkle bookkeeping fields that must be excluded from hashing, so that
# writing a hash back into an object does not change that object's hash.
MERKLE_FIELDS = {"merkle:object_hash", "merkle:hash_method", "merkle:root"}


def remove_merkle_fields(data: Any) -> Any:
    """
    Recursively removes Merkle fields from a nested JSON-like structure.

    Parameters:
    - data: A dict, list, tuple, or scalar value. Dicts and sequences are
      walked recursively; any other value is returned unchanged.

    Returns:
    - A new structure in which every dict key listed in MERKLE_FIELDS has been
      dropped at every nesting level. The input is never mutated. Tuples come
      back as lists, which is how json.dumps would serialize them anyway.
    """
    if isinstance(data, dict):
        return {
            key: remove_merkle_fields(value)
            for key, value in data.items()
            if key not in MERKLE_FIELDS
        }
    # Tuples are treated like lists: json.dumps emits both as arrays, so a
    # Merkle field nested inside a tuple must be stripped as well.
    if isinstance(data, (list, tuple)):
        return [remove_merkle_fields(element) for element in data]
    return data
13
28
def compute_merkle_object_hash(
    stac_object: Dict[str, Any],
    hash_method: Dict[str, Any]
) -> str:
    """
    Computes the merkle:object_hash for a STAC object (Catalog, Collection, or Item).

    Parameters:
    - stac_object (dict): The STAC object JSON content.
    - hash_method (dict): The hash method details from merkle:hash_method.
      'fields' (default ['*']) selects the top-level keys to hash; ['*'] or
      ['all'] means the whole object. 'function' (default 'sha256') names the
      hash algorithm; case and '-' separators are tolerated (e.g. 'SHA-256',
      'sha3-256').

    Returns:
    - str: The computed Merkle object hash as a hexadecimal string.

    Raises:
    - ValueError: If the requested hash function is not a fixed-length
      algorithm that hashlib guarantees to provide.
    """
    fields = hash_method.get('fields', ['*'])
    if fields == ['*'] or fields == ['all']:
        # Exclude Merkle fields from all levels
        data_to_hash = remove_merkle_fields(stac_object)
    else:
        # Include only specified fields that are present, then remove Merkle fields
        selected_data = {field: stac_object[field] for field in fields if field in stac_object}
        data_to_hash = remove_merkle_fields(selected_data)

    # Serialize the data to a canonical JSON string (sorted keys, no
    # whitespace) so that equal content always yields the same bytes.
    json_str = json.dumps(data_to_hash, sort_keys=True, separators=(',', ':'))

    # Resolve the hash function. 'sha-256' must map to 'sha256', while
    # 'sha3-256' must map to 'sha3_256', so both spellings are tried.
    requested_name = str(hash_method.get('function', 'sha256')).strip().lower()
    hash_function_name = requested_name.replace('-', '')
    candidate_names = (hash_function_name, requested_name.replace('-', '_'))
    # Only accept real, fixed-length digest algorithms. A plain getattr() on
    # hashlib would also match helpers such as 'new' or 'pbkdf2_hmac', and the
    # 'shake_*' algorithms need an explicit length for hexdigest().
    algorithm = next(
        (
            name
            for name in candidate_names
            if name in hashlib.algorithms_guaranteed and not name.startswith('shake')
        ),
        None,
    )
    if algorithm is None:
        raise ValueError(f"Unsupported hash function: {hash_function_name}")

    # Compute the hash
    merkle_object_hash = hashlib.new(algorithm, json_str.encode('utf-8')).hexdigest()
    return merkle_object_hash
@@ -120,13 +120,11 @@ def process_item(item_path: Path, hash_method: Dict[str, Any]) -> str:
120
120
item_json = json .load (f )
121
121
122
122
# Compute merkle:object_hash
123
- own_hash = compute_merkle_object_hash (item_json , hash_method , is_item = True )
123
+ own_hash = compute_merkle_object_hash (item_json , hash_method )
124
124
125
- # Add Merkle fields to 'properties'
125
+ # Add merkle:object_hash to 'properties'
126
126
properties = item_json .setdefault ('properties' , {})
127
127
properties ['merkle:object_hash' ] = own_hash
128
- # Remove the following line to avoid adding 'merkle:hash_method' to Items
129
- # properties['merkle:hash_method'] = hash_method
130
128
131
129
# Ensure the Merkle extension is listed
132
130
item_json .setdefault ('stac_extensions' , [])
@@ -163,10 +161,7 @@ def process_collection(collection_path: Path, parent_hash_method: Dict[str, Any]
163
161
collection_json = json .load (f )
164
162
165
163
# Determine the hash_method to use
166
- if 'merkle:hash_method' in collection_json :
167
- hash_method = collection_json ['merkle:hash_method' ]
168
- else :
169
- hash_method = parent_hash_method
164
+ hash_method = collection_json .get ('merkle:hash_method' , parent_hash_method )
170
165
171
166
if not hash_method :
172
167
raise ValueError (f"Hash method not specified for { collection_path } " )
@@ -182,7 +177,7 @@ def process_collection(collection_path: Path, parent_hash_method: Dict[str, Any]
182
177
item_hashes .append (item_hash )
183
178
184
179
# Compute merkle:object_hash
185
- own_hash = compute_merkle_object_hash (collection_json , hash_method , is_item = False )
180
+ own_hash = compute_merkle_object_hash (collection_json , hash_method )
186
181
collection_json ['merkle:object_hash' ] = own_hash
187
182
item_hashes .append (own_hash )
188
183
@@ -231,7 +226,7 @@ def process_catalog(catalog_path: Path) -> str:
231
226
'function' : 'sha256' ,
232
227
'fields' : ['*' ],
233
228
'ordering' : 'ascending' ,
234
- 'description' : 'Computed by including merkle:object_hash values in ascending order and building the Merkle tree.'
229
+ 'description' : 'Computed by excluding Merkle fields and including merkle:object_hash values in ascending order to build the Merkle tree.'
235
230
}
236
231
237
232
# Process collections in the collections folder
@@ -254,7 +249,7 @@ def process_catalog(catalog_path: Path) -> str:
254
249
click .echo (f"collection.json not found in { collection_dir } " , err = True )
255
250
256
251
# Compute merkle:object_hash
257
- own_hash = compute_merkle_object_hash (catalog_json , hash_method , is_item = False )
252
+ own_hash = compute_merkle_object_hash (catalog_json , hash_method )
258
253
catalog_json ['merkle:object_hash' ] = own_hash
259
254
collection_hashes .append (own_hash )
260
255
@@ -283,25 +278,3 @@ def process_catalog(catalog_path: Path) -> str:
283
278
except Exception as e :
284
279
click .echo (f"Error processing Catalog { catalog_path } : { e } " , err = True )
285
280
return ''
286
-
287
- @click .command ()
288
- @click .argument ('catalog_path' , type = click .Path (exists = True , file_okay = True , dir_okay = False ))
289
- def main (catalog_path ):
290
- """
291
- Computes and adds Merkle info to each STAC object in the catalog.
292
-
293
- CATALOG_PATH is the path to your root catalog.json file.
294
- """
295
- catalog_path = Path (catalog_path ).resolve ()
296
-
297
- if not catalog_path .exists ():
298
- click .echo (f"Catalog file does not exist: { catalog_path } " , err = True )
299
- return
300
-
301
- # Process the root catalog
302
- process_catalog (catalog_path )
303
-
304
- click .echo ("Merkle info computation and addition completed." )
305
-
306
- if __name__ == '__main__' :
307
- main ()
0 commit comments