22import time
33import jsonpatch
44
5- from globus_sdk import AccessTokenAuthorizer , ConfidentialAppAuthClient , SearchClient
5+ from globus_sdk import ClientCredentialsAuthorizer , ConfidentialAppAuthClient , SearchClient
66from globus_sdk .scopes import SearchScopes
7+ from globus_sdk .services .search .errors import SearchAPIError
78
89
910class ConsumerSearchClient :
@@ -12,11 +13,10 @@ def __init__(self, credentials, search_index, error_producer):
1213 client_id = credentials .get ("client_id" ),
1314 client_secret = credentials .get ("client_secret" ),
1415 )
15- token_response = confidential_client .oauth2_client_credentials_tokens (
16- requested_scopes = SearchScopes .all ,
16+ authorizer = ClientCredentialsAuthorizer (
17+ confidential_client ,
18+ scopes = SearchScopes .all ,
1719 )
18- search_tokens = token_response .by_resource_server .get ("search.api.globus.org" )
19- authorizer = AccessTokenAuthorizer (search_tokens .get ("access_token" ))
2020 self .search_client = SearchClient (authorizer = authorizer )
2121 self .esgf_index = search_index
2222 self .error_producer = error_producer
@@ -68,18 +68,24 @@ def ingest(self, messages_data):
6868
def post(self, message_data):
    """Build the ingest entry for a new item, refusing to overwrite an existing one.

    The item is read from ``message_data["data"]["payload"]["item"]`` and its
    ``id`` is looked up in ``self.esgf_index`` through ``self.search_client``.

    Returns:
        The value of ``self.gmetaentry(item)`` when the lookup fails with
        HTTP 404 (the subject is not in the index yet); ``item["assets"]``
        is replaced by ``self.convert_assets(...)`` before the entry is built.
        ``None`` when the lookup succeeds; if the response carries data, an
        "already exists" error is also logged and sent to ``self.error_producer``.

    Raises:
        SearchAPIError: for any lookup failure other than HTTP 404.
    """
    item = message_data.get("data").get("payload").get("item")
    item_id = item.get("id")
    try:
        globus_response = self.search_client.get_subject(self.esgf_index, item_id)
    except SearchAPIError as e:
        if e.http_status != 404:
            # Only a 404 means "subject not found". Any other failure must
            # propagate: swallowing it would leave ``globus_response``
            # unbound and surface later as a misleading UnboundLocalError.
            raise
        item["assets"] = self.convert_assets(item.get("assets"))
        return self.gmetaentry(item)

    if globus_response.data:
        message = f"Item with ID {item_id} already exists in the index."
        logging.info(message)
        self.error_producer.produce(
            topic="esgf-local.errors",
            key=item_id,
            value=message,
        )
        print("Item already exists, returning None")
    # A successful lookup never yields a new entry, with or without data.
    return None
8389
8490 def json_patch (self , message_data ):
8591 payload = message_data .get ("data" ).get ("payload" )
@@ -93,9 +99,7 @@ def json_patch(self, message_data):
9399 value = f"Item with ID { item_id } does not exist in the index." ,
94100 )
95101 return None
96- gmeta_entry = jsonpatch .apply_patch (
97- globus_response .data .get ("content" ), payload .get ("patch" )
98- )
102+ gmeta_entry = jsonpatch .apply_patch (globus_response .data .get ("content" ), payload .get ("patch" ))
99103 return gmeta_entry
100104
101105 def delete (self , subject ):
@@ -115,6 +119,7 @@ def process_message(self, message_data):
115119 try :
116120 payload = message_data .get ("data" ).get ("payload" )
117121 method = payload .get ("method" )
122+ print (f"Processing message with method: { method } " )
118123 if method == "POST" :
119124 return self .post (message_data )
120125 if method == "PUT" :
@@ -140,12 +145,13 @@ def process_messages(self, messages_data):
140145 if entry :
141146 gmeta .append (entry )
142147 if not gmeta :
143- return False
148+ return True
144149
145150 gmetalist = {"ingest_type" : "GMetaList" , "ingest_data" : {"gmeta" : gmeta }}
146151
147152 r = self .search_client .ingest (self .esgf_index , gmetalist )
148153 task_id = r .get ("task_id" )
154+ print ("Ingested successfully, waiting for task to complete..." )
149155
150156 while True :
151157 r = self .search_client .get_task (task_id )
0 commit comments