Skip to content

Commit 3b38cd3

Browse files
committed
CMR-10542: Modifying subscription worker to get the granule record as umm_json to create the granule subscription notification
1 parent 831be8b commit 3b38cd3

5 files changed

Lines changed: 499 additions & 7 deletions

File tree

subscription/src/search.py

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
import os
2+
import json
3+
import requests
4+
from env_vars import Env_Vars
5+
from sys import stdout
6+
from typing import Dict
7+
from logger import logger
8+
import xml.etree.ElementTree as ET
9+
10+
class Search:
11+
"""Encapsulates Search API.
12+
This class needs the following environment variables set with an example value:
13+
For local development:
14+
SEARCH_URL=http://localhost:3003/
15+
16+
For AWS:
17+
ENVIRONMENT_NAME=sit
18+
CMR_SEARCH_PROTOCOL=https
19+
CMR_SEARCH_PORT=80
20+
CMR_SEARCH_HOST=<internal load balancer>
21+
CMR_SEARCH_RELATIVE_ROOT_URL=search
22+
TOKEN=<token>
23+
24+
Example Use of this class
25+
search = Search()
26+
response = search.get_concept('token', 'G1200484253-CMR_ONLY')
27+
The call is the same as 'curl -H "Authorization: <token>" https://cmr.sit.earthdata.nasa.gov/search/concepts/G1200484253-CMR_ONLY.umm_json'
28+
Return is either None (Null or Nil) (if check on response is false) or
29+
The concept in umm_json format}
30+
"""
31+
32+
def __init__(self):
33+
""" Sets up a class variable of url."""
34+
self.url = None
35+
self.public_search_url = None
36+
self.token = None
37+
38+
def get_url_from_parameter_store(self):
39+
"""This function returns the URL for the search service. For local development the full URL can be provided. Otherwise the
40+
environment name that is used for the parameter store prefix is obtained from an environment variable. This variable is used to
41+
get the parameter store ingest values to construct the search service URL."""
42+
43+
# Search URL is for local development
44+
search_url = os.getenv("SEARCH_URL")
45+
46+
if search_url:
47+
self.url = search_url
48+
logger.debug(f"Subscription Worker Search URL: {self.url}")
49+
return
50+
else:
51+
# This block gets the search URL from the AWS parameter store.
52+
environment_name = os.getenv("ENVIRONMENT_NAME")
53+
54+
if not environment_name:
55+
logger.error("ENVIRONMENT_NAME environment variable is not set")
56+
raise ValueError("ENVIRONMENT_NAME environment variable is not set")
57+
58+
# construct the search parameter names from the environment variable
59+
env_name = environment_name.lower()
60+
pre_fix = f"/{env_name}/ingest/"
61+
protocol_param_name = f"{pre_fix}CMR_SEARCH_PROTOCOL"
62+
port_param_name = f"{pre_fix}CMR_SEARCH_PORT"
63+
host_param_name = f"{pre_fix}CMR_SEARCH_HOST"
64+
context_param_name = f"{pre_fix}CMR_SEARCH_RELATIVE_ROOT_URL"
65+
66+
env_vars = Env_Vars()
67+
protocol = env_vars.get_env_var_from_parameter_store(parameter_name=protocol_param_name)
68+
port = env_vars.get_env_var_from_parameter_store(parameter_name=port_param_name)
69+
host = env_vars.get_env_var_from_parameter_store(parameter_name=host_param_name)
70+
context = env_vars.get_env_var_from_parameter_store(parameter_name=context_param_name)
71+
72+
# The context already contains the forward / so we don't need it here.
73+
self.url = f"{protocol}://{host}:{port}{context}"
74+
logger.debug(f"Subscription Worker Search URL: {self.url}")
75+
76+
def get_url(self):
77+
"""This function returns the search URL if it has already been constructed, otherwise it constructs the URL and then returns it."""
78+
if not self.url:
79+
self.get_url_from_parameter_store()
80+
return self.url
81+
82+
def get_public_search_url_from_parameter_store(self):
83+
"""This function returns the URL for the public search service. For local development the full URL can be provided. Otherwise the
84+
environment name that is used for the parameter store prefix is obtained from an environment variable. This variable is used to
85+
get the parameter store ingest values to construct the search service URL."""
86+
87+
# Search URL is for local development
88+
public_search_url = os.getenv("PUBLIC_SEARCH_URL")
89+
90+
if public_search_url:
91+
self.public_search_url = public_search_url
92+
logger.debug(f"Subscription Worker Search URL: {self.public_search_url}")
93+
return
94+
else:
95+
# This block gets the search URL from the AWS parameter store.
96+
environment_name = os.getenv("ENVIRONMENT_NAME")
97+
98+
if not environment_name:
99+
logger.error("ENVIRONMENT_NAME environment variable is not set")
100+
raise ValueError("ENVIRONMENT_NAME environment variable is not set")
101+
102+
# construct the search parameter names from the environment variable
103+
env_name = environment_name.lower()
104+
pre_fix = f"/{env_name}/search/"
105+
protocol_param_name = f"{pre_fix}CMR_SEARCH_PUBLIC_PROTOCOL"
106+
port_param_name = f"{pre_fix}CMR_SEARCH_PUBLIC_PORT"
107+
host_param_name = f"{pre_fix}CMR_SEARCH_PUBLIC_HOST"
108+
context_param_name = f"{pre_fix}CMR_SEARCH_RELATIVE_ROOT_URL"
109+
110+
env_vars = Env_Vars()
111+
protocol = env_vars.get_env_var_from_parameter_store(parameter_name=protocol_param_name)
112+
port = env_vars.get_env_var_from_parameter_store(parameter_name=port_param_name)
113+
host = env_vars.get_env_var_from_parameter_store(parameter_name=host_param_name)
114+
context = env_vars.get_env_var_from_parameter_store(parameter_name=context_param_name)
115+
116+
# The context already contains the forward / so we don't need it here.
117+
self.public_search_url = f"{protocol}://{host}:{port}{context}"
118+
logger.debug(f"Subscription Worker Public Search URL: {self.public_search_url}")
119+
120+
def get_public_search_url(self):
121+
"""This function returns the public search URL if it has already been constructed, otherwise it constructs the URL and then returns it."""
122+
if not self.public_search_url:
123+
self.get_public_search_url_from_parameter_store()
124+
return self.public_search_url
125+
126+
def get_token_from_parameter_store(self):
127+
"""This function returns the token for the search service. Otherwise the
128+
environment name that is used for the parameter store prefix is obtained from an environment variable. This variable is used to
129+
get the parameter store ingest values to construct the access control service URL."""
130+
131+
# token is for local development
132+
token = os.getenv("CMR_ECHO_SYSTEM_TOKEN")
133+
134+
if token:
135+
self.token = token
136+
return
137+
else:
138+
# This block gets the token from the AWS parameter store.
139+
environment_name = os.getenv("ENVIRONMENT_NAME")
140+
141+
if not environment_name:
142+
logger.error("ENVIRONMENT_NAME environment variable is not set")
143+
raise ValueError("ENVIRONMENT_NAME environment variable is not set")
144+
145+
# construct the the token parameter names from the environment variable
146+
env_name = environment_name.lower()
147+
token_name = f"/{env_name}/ingest/CMR_ECHO_SYSTEM_TOKEN"
148+
149+
env_vars = Env_Vars()
150+
self.token = env_vars.get_env_var_from_parameter_store(token_name)
151+
152+
def get_token(self):
153+
"""This function returns the token if it has already been constructed, otherwise it gets the token and then returns it."""
154+
if not self.token:
155+
self.get_token_from_parameter_store()
156+
return self.token
157+
158+
def get_concept(self, concept_id):
159+
"""This function calls search using a token, and a CMR concept id to get
160+
a granule by concept id in umm_json format."""
161+
162+
# Set the search concepts URL.
163+
url = f"{self.get_url()}/concepts/{concept_id}.umm_json"
164+
165+
# Set the headers
166+
headers = {
167+
"Authorization": self.get_token()
168+
}
169+
170+
# Make a GET request
171+
response = requests.get(url, headers=headers)
172+
173+
# Check if the request was successful
174+
if response.status_code == 200:
175+
# Request was successful
176+
data = response.text
177+
logger.debug(f"Subscription Worker got search response data: {data}")
178+
return data
179+
else:
180+
# Request failed
181+
logger.warning(f"Subscription Worker getting search concept using URL {url} failed with status code: {response.status_code}")
182+
183+
def get_producer_granule_id(self, metadata):
184+
"""
185+
Get the granule producer id from the metadata and create a string for the
186+
subscription notification message.
187+
"""
188+
identifiers = metadata["DataGranule"]['Identifiers'] #concept.get('DataGranule', {}).get('Identifiers', [])
189+
pgi = None
190+
for identifier in identifiers:
191+
if identifier.get('IdentifierType') == 'ProducerGranuleId':
192+
pgi = identifier.get('Identifier')
193+
break
194+
if pgi:
195+
return pgi
196+
else:
197+
return None
198+
199+
def create_notification_message_body(self, result_dict):
200+
"""Create the notification. Returns either a notification message json string or None.
201+
The notification should look like
202+
"{\"concept-id\": \"G1200484356-PROV\", \"granule-ur\": \"SWOT_L2_HR_PIXC_578_020_221L_20230710T223456_20230710T223506_PIA1_01\", \"producer-granule-id\": \"SWOT_L2_HR_PIXC_578_020_221L_20230710T223456_20230710T223506_PIA1_01.nc\", \"location\": \"http://localhost:3003/concepts/G1200484356-PROV/39\"}"
203+
"""
204+
print(f"Result dict: {result_dict}")
205+
concept_id = result_dict["concept_id"]
206+
revision_id = result_dict["revision_id"]
207+
metadata = result_dict["metadata"]
208+
209+
pgi = self.get_producer_granule_id(metadata)
210+
211+
search_root_url = self.get_public_search_url()
212+
location = f"{search_root_url}concepts/{concept_id}/{revision_id}"
213+
granule_ur = metadata["GranuleUR"] #concept.get('metadata', {}).get('GranuleUR', '')
214+
215+
message = {
216+
"concept-id": concept_id,
217+
"granule-ur": granule_ur,
218+
"location": location
219+
}
220+
221+
if pgi:
222+
message.update({"producer-granule-id": pgi})
223+
print(f"Message: {message}")
224+
return message
225+
226+
def process_result(self, search_result):
227+
"""The search results contain XML, but the metadata is in JSON.
228+
Parse out the XML to get the concept-id and revision-id and store it
229+
in a map. Store the JSON metadata also in the map and return the map."""
230+
# Split the input string into XML and JSON parts
231+
xml_part, json_part = search_result.split('</result>')
232+
233+
# Parse the XML
234+
root = ET.fromstring(xml_part + '</result>')
235+
concept_id = root.find('concept-id').text
236+
revision_id = root.find('revision-id').text
237+
238+
# Parse the JSON into dict.
239+
json_data = json.loads(json_part)
240+
241+
# Create a dictionary with the extracted information
242+
result = {
243+
'concept_id': concept_id,
244+
'revision_id': revision_id,
245+
'metadata': json_data
246+
}
247+
return result
248+
249+
def process_message(self, message):
250+
"""This function gets the Message value from
251+
a SQS message that contains: "{\"concept-id\": \"G1200484356-ERICH_PROV\"}"
252+
Get the concept-id value, search for the concepts metadata, and
253+
return a message string such as:
254+
"{\"concept-id\": \"G1200484356-ERICH_PROV\",
255+
\"granule-ur\": \"SWOT_L2_HR_PIXC_578_020_221L_20230710T223456_20230710T223506_PIA1_01\",
256+
\"producer-granule-id\": \"SWOT_L2_HR_PIXC_578_020_221L_20230710T223456_20230710T223506_PIA1_01.nc\",
257+
\"location\": \"http://localhost:3003/concepts/G1200484356-ERICH_PROV/39\"}"
258+
"""
259+
# Get the granule concept-id from the message.
260+
print(f"Search process_message The message type is: {type(message)}")
261+
message_dict = json.loads(message)
262+
print(f"Search process_message The message_dict type is: {type(message_dict)}")
263+
concept_id = message_dict["concept-id"]
264+
print(f"Search process_message concept_id: {concept_id}")
265+
# Get the concept from search
266+
result = self.get_concept(concept_id)
267+
print(f"Search process_message result: {result}")
268+
269+
# convert the result to a dict containing the concept-id, revision-id, and metadata.
270+
result_dict = self.process_result(result)
271+
print(f"Search process_message result_dict: {result_dict}")
272+
273+
# create and return json dict containing the concept-id, granule-ur, producer-granule-id, and the granule location.
274+
new_message = self.create_notification_message_body(result_dict)
275+
print (f"new_message: {new_message}")
276+
return new_message

subscription/src/sns.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,17 @@ def create_topic(self, topic_name):
2525
def publish_message(topic, message):
2626
""" Publishes a message with attributes to the CMR external topic. Subscriptions
2727
can be filtered based on the message attributes. """
28+
print(f"publish_message message type: {type(message)}")
2829
message_body_str = message["Body"]
30+
print(f"publish_message body_str type: {type(message_body_str)}")
2931
message_body = json.loads(message_body_str)
32+
print(f"publish_message body type: {type(message_body)}")
3033
message_subject = message_body["Subject"]
34+
print(f"publish_message subject type: {type(message_subject)}")
3135
message_attributes = message_body["MessageAttributes"]
36+
print(f"publish_message attributes type: {type(message_attributes)}")
3237
message_message = message_body["Message"]
38+
print(f"publish_message messages type: {type(message_message)}")
3339
try:
3440
if message_attributes:
3541
att_dict = {}

subscription/src/subscription_worker.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from sns import Sns
77
from botocore.exceptions import ClientError
88
from access_control import AccessControl
9+
from search import Search
910
from logger import logger
1011

1112
AWS_REGION = os.getenv("AWS_REGION")
@@ -37,26 +38,46 @@ def delete_messages(sqs_client, queue_url, messages):
3738
receipt_handle = message['ReceiptHandle']
3839
delete_message(sqs_client=sqs_client, queue_url=queue_url, receipt_handle=receipt_handle)
3940

40-
def process_messages(sns_client, topic, messages, access_control):
41+
#def ensure_json_object(python_dict):
42+
# Serialize to JSON string
43+
# json_string = json.dumps(python_dict)
44+
45+
# Deserialize back to Python object
46+
# json_object = json.loads(json_string)
47+
48+
# return json_object
49+
50+
def process_messages(sns_client, topic, messages, access_control, search):
4151
""" Processes a list of messages that was received from a queue. Check to see if ACLs pass for the granule.
4252
If the checks pass then send the notification. """
4353

4454
for message in messages.get("Messages", []):
4555
try:
56+
print(f"The message type is: {type(message)}")
4657
message_body = json.loads(message["Body"])
58+
59+
print(f"The message_body type is: {type(message_body)}")
4760
message_attributes = message_body["MessageAttributes"]
4861
logger.debug(f"Subscription worker: Received message including attributes: {message_body}")
4962

63+
print(f"The message_attributes type is: {type(message_attributes)}")
5064
subscriber = message_attributes['subscriber']['Value']
5165
collection_concept_id = message_attributes['collection-concept-id']['Value']
5266

5367
if(access_control.has_read_permission(subscriber, collection_concept_id)):
5468
logger.debug(f"Subscription worker: {subscriber} has permission to receive granule notifications for {collection_concept_id}")
69+
print(f"The message_body['Message'] type is: {type(message_body['Message'])}")
70+
message_msg = search.process_message(message_body['Message'])
71+
print(f"The message_msg type is: {type(message_msg)}")
72+
message_body['Message'] = json.loads(message_msg)
73+
print(f"The message_body type is: {type(message_body)}")
74+
message['Body'] = message_body
75+
print(f"The message type is: {type(message)}")
5576
sns_client.publish_message(topic, message)
5677
else:
5778
logger.info(f"Subscription worker: {subscriber} does not have read permission to receive notifications for {collection_concept_id}.")
5879
except Exception as e:
59-
logger.error(f"Subscription worker: There is a problem process messages {message}. {e}")
80+
logger.error(f"Subscription worker: There is a problem in process messages {message}. {e}")
6081

6182

6283
def poll_queue(running):
@@ -69,18 +90,19 @@ def poll_queue(running):
6990
topic = sns_client.create_topic(SNS_NAME)
7091

7192
access_control = AccessControl()
93+
search = Search()
7294
while running.value:
7395
try:
7496
# Poll the SQS
7597
messages = receive_message(sqs_client=sqs_client, queue_url=QUEUE_URL)
7698

7799
if messages:
78-
process_messages(sns_client=sns_client, topic=topic, messages=messages, access_control=access_control)
100+
process_messages(sns_client=sns_client, topic=topic, messages=messages, access_control=access_control, search=search)
79101
delete_messages(sqs_client=sqs_client, queue_url=QUEUE_URL, messages=messages)
80102

81103
dl_messages = receive_message(sqs_client=sqs_client, queue_url=DEAD_LETTER_QUEUE_URL)
82104
if dl_messages:
83-
process_messages(sns_client=sns_client, topic=topic, messages=dl_messages, access_control=access_control)
105+
process_messages(sns_client=sns_client, topic=topic, messages=dl_messages, access_control=access_control, search=search)
84106
delete_messages(sqs_client=sqs_client, queue_url=DEAD_LETTER_QUEUE_URL, messages=dl_messages)
85107

86108
except Exception as e:

0 commit comments

Comments
 (0)