Commit 15682de

Merge branch 'main' of github.com:esgf2-us/west-consumer
2 parents 1f2e149 + 25bb891

File tree: 6 files changed, +39 -39 lines changed


.github/workflows/pre-commit.yaml

Lines changed: 1 addition & 1 deletion
```diff
@@ -12,7 +12,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-python@v4
         with:
-          python-version: '3.10'
+          python-version: '3.13'
       - run: python -m pip install pre-commit
         shell: bash
      - run: python -m pip freeze --local
```

src/consumer.py

Lines changed: 2 additions & 6 deletions
```diff
@@ -21,9 +21,7 @@ def process_messages(self, messages):
                 data = json.loads(msg.value())
                 messages_data.append(data)
             except json.JSONDecodeError as e:
-                logging.error(
-                    f"Data deserialization error at offset {msg.offset()}: {e}."
-                )
+                logging.error(f"Data deserialization error at offset {msg.offset()}: {e}.")
                 return False
         return messages_data

@@ -39,9 +37,7 @@ def start(self):
             logging.info(f"Consumed {len(messages)} messages")
             first_msg = messages[0]
             offset = 0 if not first_msg.offset() else first_msg.offset()
-            self.seek_partition = TopicPartition(
-                first_msg.topic(), first_msg.partition(), offset
-            )
+            self.seek_partition = TopicPartition(first_msg.topic(), first_msg.partition(), offset)

             messages_data = self.process_messages(messages)
             if not messages_data:
```
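
The seek bookkeeping records where the batch began so the consumer can rewind and replay it if processing fails. A hypothetical sketch of that pattern with `confluent_kafka` (the broker, topic, and retry wiring are assumptions; this diff does not show how `seek_partition` is later used):

```python
# Hypothetical illustration of the seek-and-retry pattern; the broker,
# topic, and process() stand-in are assumptions, not code from this repo.
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker
    "group.id": "west-consumer",
    "enable.auto.commit": False,
})
consumer.subscribe(["esgf-events"])  # assumed topic name

def process(batch):
    """Stand-in for KafkaConsumerService.process_messages()."""
    for msg in batch:
        print(msg.offset(), msg.value())

messages = consumer.consume(num_messages=10, timeout=5.0)
if messages:
    first_msg = messages[0]
    # offset() can be falsy (0 or None), hence the guard mirrored from the diff.
    offset = 0 if not first_msg.offset() else first_msg.offset()
    seek_partition = TopicPartition(first_msg.topic(), first_msg.partition(), offset)
    try:
        process(messages)
        consumer.commit()
    except Exception:
        # seek() requires the partition to still be assigned to this consumer;
        # rewinding means the batch is delivered again on the next consume().
        consumer.seek(seek_partition)
```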

src/globus.py

Lines changed: 31 additions & 16 deletions
```diff
@@ -1,9 +1,12 @@
 import logging
 import time
-
 import jsonpatch
-from globus_sdk import (ClientCredentialsAuthorizer, ConfidentialAppAuthClient,
-                        SearchClient)
+
+from globus_sdk import (
+    ClientCredentialsAuthorizer,
+    ConfidentialAppAuthClient,
+    SearchClient,
+)
 from globus_sdk.scopes import SearchScopes
 from globus_sdk.services.search.errors import SearchAPIError

@@ -22,11 +25,25 @@ def __init__(self, credentials, search_index, error_producer):
         self.esgf_index = search_index
         self.error_producer = error_producer

-    def convert_assets(self, assets):
-        converted_assets = []
+    def normalize_assets(self, assets):
+        normalized_assets = []
         for key, value in assets.items():
-            converted_assets.append({"name": key} | value)
-        return converted_assets
+            normalized_assets.append({"name": key} | value)
+        for asset in normalized_assets:
+            if "alternate" in asset and asset.get("alternate"):
+                asset["alternate"] = self.normalize_assets(asset["alternate"])
+        return normalized_assets
+
+    def denormalize_assets(self, assets):
+        denormalized_assets = {}
+        for asset in assets:
+            name = asset.pop("name")
+            if "alternate" in asset:
+                asset["alternate"] = self.denormalize_assets(asset["alternate"])
+            else:
+                asset["alternate"] = {}
+            denormalized_assets[name] = asset
+        return denormalized_assets

     def gmetaentry(self, item):
         return {
```
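
The new `normalize_assets`/`denormalize_assets` pair converts the assets mapping (keyed by asset name) into a list of named dicts and back, recursing into any `alternate` assets. A standalone round-trip on an invented sample payload, as a sketch; note the round trip is not a strict identity, since a missing `alternate` comes back as an empty dict, and the dict-union operator `|` needs Python 3.9+:

```python
# Standalone versions of the two helpers above, run on an invented sample;
# the real methods live on ConsumerSearchClient.
def normalize_assets(assets):
    """Mapping keyed by name -> list of dicts carrying a "name" field."""
    normalized = [{"name": key} | value for key, value in assets.items()]
    for asset in normalized:
        if asset.get("alternate"):
            asset["alternate"] = normalize_assets(asset["alternate"])
    return normalized

def denormalize_assets(assets):
    """Inverse: list of named dicts -> mapping keyed by the popped "name"."""
    denormalized = {}
    for asset in assets:
        name = asset.pop("name")
        asset["alternate"] = (
            denormalize_assets(asset["alternate"]) if "alternate" in asset else {}
        )
        denormalized[name] = asset
    return denormalized

sample = {"data0001": {"href": "https://example.org/d.nc", "type": "application/netcdf"}}
as_list = normalize_assets(sample)
# [{'name': 'data0001', 'href': 'https://example.org/d.nc', 'type': 'application/netcdf'}]
assert denormalize_assets(as_list) == {
    "data0001": {
        "href": "https://example.org/d.nc",
        "type": "application/netcdf",
        "alternate": {},  # injected for assets that had no alternates
    },
}
```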
```diff
@@ -70,12 +87,10 @@ def ingest(self, messages_data):
     def post(self, message_data):
         item = message_data.get("data").get("payload").get("item")
         try:
-            globus_response = self.search_client.get_subject(
-                self.esgf_index, item.get("id")
-            )
+            globus_response = self.search_client.get_subject(self.esgf_index, item.get("id"))
         except SearchAPIError as e:
             if e.http_status == 404:
-                item["assets"] = self.convert_assets(item.get("assets"))
+                item["assets"] = self.normalize_assets(item.get("assets"))
                 gmeta_entry = self.gmetaentry(item)
                 return gmeta_entry

@@ -102,9 +117,11 @@ def json_patch(self, message_data):
                 value=f"Item with ID {item_id} does not exist in the index.",
             )
             return None
-        gmeta_entry = jsonpatch.apply_patch(
-            globus_response.data.get("content"), payload.get("patch")
-        )
+        item = globus_response.data.get("entries")[0].get("content")
+        item["assets"] = self.denormalize_assets(item.get("assets"))
+        patched_item = jsonpatch.apply_patch(item, payload.get("patch").get("operations"))
+        patched_item["assets"] = self.normalize_assets(patched_item.get("assets"))
+        gmeta_entry = self.gmetaentry(patched_item)
         return gmeta_entry

     def delete(self, subject):
```
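
The rewritten patch path fetches the stored entry, converts its assets back to mapping form (presumably so RFC 6902 paths can address assets by name), applies the operations, and re-normalizes before rebuilding the GMeta entry. A minimal sketch of the `jsonpatch.apply_patch` step on invented data:

```python
# Minimal sketch of the patch step above using the jsonpatch library;
# the item and operations are invented for illustration.
import jsonpatch

item = {
    "id": "dataset-001",
    "assets": {"data0001": {"href": "https://example.org/old.nc"}},
}
operations = [
    {
        "op": "replace",
        "path": "/assets/data0001/href",  # addresses the asset by name
        "value": "https://example.org/new.nc",
    },
]
# apply_patch returns a patched copy; pass in_place=True to mutate instead.
patched = jsonpatch.apply_patch(item, operations)
print(patched["assets"]["data0001"]["href"])  # https://example.org/new.nc
```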
```diff
@@ -131,8 +148,6 @@ def process_message(self, message_data):
                 return self.put(message_data)
             if method == "JSON_PATCH" or method == "PATCH":
                 return self.json_patch(message_data)
-            if method == "MERGE_PATCH":
-                return self.merge_patch(message_data)
             return None
         except Exception as e:
             logging.error(f"Error processing message data: {e}")
```

src/main.py

Lines changed: 3 additions & 8 deletions
```diff
@@ -3,21 +3,16 @@
 from consumer import KafkaConsumerService
 from globus import ConsumerSearchClient
 from producer import KafkaProducer
-from settings.consumer import (event_stream, globus_search,
-                               globus_search_client_credentials)
+from settings.consumer import event_stream, globus_search, globus_search_client_credentials
 from settings.producer import error_event_stream

-logging.basicConfig(
-    format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
-)
+logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)


 if __name__ == "__main__":
     error_producer = KafkaProducer(config=error_event_stream.get("config"))

-    message_processor = ConsumerSearchClient(
-        globus_search_client_credentials, globus_search.get("index"), error_producer
-    )
+    message_processor = ConsumerSearchClient(globus_search_client_credentials, globus_search.get("index"), error_producer)

     consumer_service = KafkaConsumerService(
         event_stream.get("config"),
```

src/producer.py

Lines changed: 2 additions & 6 deletions
```diff
@@ -32,13 +32,9 @@ def delivery_report(err, msg):
             if err is not None:
                 logger.error(f"Delivery failed for message {msg.key()}: {err}")
             else:
-                logger.info(
-                    f"Message {msg.key()} successfully delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
-                )
+                logger.info(f"Message {msg.key()} successfully delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
             delivery_reports.append((err, msg))

-        self.producer.produce(
-            topic=topic, key=key, value=value, callback=delivery_report
-        )
+        self.producer.produce(topic=topic, key=key, value=value, callback=delivery_report)
         self.producer.flush()
         return delivery_reports
```
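
For reference, a standalone sketch of the delivery-callback pattern retained above, using `confluent_kafka` directly; the broker address, topic, key, and value are invented:

```python
# Minimal sketch of the produce-with-callback pattern; broker, topic, key,
# and value are assumptions, not configuration from this repo.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
reports = []

def delivery_report(err, msg):
    # Invoked once per message, from within poll()/flush().
    if err is not None:
        print(f"Delivery failed for message {msg.key()}: {err}")
    else:
        print(f"Message {msg.key()} delivered to {msg.topic()} "
              f"[{msg.partition()}] at offset {msg.offset()}")
    reports.append((err, msg))

producer.produce(topic="esgf-errors", key=b"item-id", value=b"{}",
                 callback=delivery_report)
producer.flush()  # blocks until the callback has fired for every queued message
```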

src/settings/producer.py

Lines changed: 0 additions & 2 deletions
```diff
@@ -4,8 +4,6 @@

 from dotenv import load_dotenv

-from src.utils import load_access_control_policy
-
 # Load the .env file
 load_dotenv()

```
