2 changes: 1 addition & 1 deletion .github/workflows/pre-commit.yaml
@@ -12,7 +12,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-python@v4
         with:
-          python-version: '3.10'
+          python-version: '3.13'
       - run: python -m pip install pre-commit
         shell: bash
       - run: python -m pip freeze --local
8 changes: 2 additions & 6 deletions src/consumer.py
@@ -21,9 +21,7 @@ def process_messages(self, messages):
                 data = json.loads(msg.value())
                 messages_data.append(data)
             except json.JSONDecodeError as e:
-                logging.error(
-                    f"Data deserialization error at offset {msg.offset()}: {e}."
-                )
+                logging.error(f"Data deserialization error at offset {msg.offset()}: {e}.")
                 return False
         return messages_data
 
@@ -39,9 +37,7 @@ def start(self):
             logging.info(f"Consumed {len(messages)} messages")
             first_msg = messages[0]
             offset = 0 if not first_msg.offset() else first_msg.offset()
-            self.seek_partition = TopicPartition(
-                first_msg.topic(), first_msg.partition(), offset
-            )
+            self.seek_partition = TopicPartition(first_msg.topic(), first_msg.partition(), offset)
 
             messages_data = self.process_messages(messages)
             if not messages_data:
47 changes: 31 additions & 16 deletions src/globus.py
@@ -1,9 +1,12 @@
 import logging
 import time
 
 import jsonpatch
-from globus_sdk import (ClientCredentialsAuthorizer, ConfidentialAppAuthClient,
-                        SearchClient)
 
+from globus_sdk import (
+    ClientCredentialsAuthorizer,
+    ConfidentialAppAuthClient,
+    SearchClient,
+)
 from globus_sdk.scopes import SearchScopes
 from globus_sdk.services.search.errors import SearchAPIError
 
@@ -22,11 +25,25 @@ def __init__(self, credentials, search_index, error_producer):
         self.esgf_index = search_index
         self.error_producer = error_producer
 
-    def convert_assets(self, assets):
-        converted_assets = []
+    def normalize_assets(self, assets):
+        normalized_assets = []
         for key, value in assets.items():
-            converted_assets.append({"name": key} | value)
-        return converted_assets
+            normalized_assets.append({"name": key} | value)
+        for asset in normalized_assets:
+            if "alternate" in asset and asset.get("alternate"):
+                asset["alternate"] = self.normalize_assets(asset["alternate"])
+        return normalized_assets
 
+    def denormalize_assets(self, assets):
+        denormalized_assets = {}
+        for asset in assets:
+            name = asset.pop("name")
+            if "alternate" in asset:
+                asset["alternate"] = self.denormalize_assets(asset["alternate"])
+            else:
+                asset["alternate"] = {}
+            denormalized_assets[name] = asset
Copilot AI commented on Aug 19, 2025:

Using pop() modifies the original asset object, which could cause issues if the same asset is processed multiple times. Consider using asset.get("name") and creating a copy of the asset without the "name" key instead.

Suggested change:
-            denormalized_assets[name] = asset
+            name = asset.get("name")
+            asset_copy = {k: v for k, v in asset.items() if k != "name"}
+            if "alternate" in asset_copy:
+                asset_copy["alternate"] = self.denormalize_assets(asset_copy["alternate"])
+            else:
+                asset_copy["alternate"] = {}
+            denormalized_assets[name] = asset_copy
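A minimal, self-contained sketch of the concern (not code from this PR): the helpers and sample data below are hypothetical, skip the nested "alternate" recursion, and only illustrate why pop() makes reprocessing the same assets unsafe while a copy-based variant stays repeatable.

# Hypothetical illustration of the review comment above; not part of this change.
def denormalize_with_pop(assets):
    # Mirrors the pop()-based approach: each asset dict is mutated in place.
    denormalized = {}
    for asset in assets:
        name = asset.pop("name")  # removes "name" from the caller's dict
        asset.setdefault("alternate", {})
        denormalized[name] = asset
    return denormalized

def denormalize_with_copy(assets):
    # Suggested alternative: build a copy so the input list is left untouched.
    denormalized = {}
    for asset in assets:
        asset_copy = {k: v for k, v in asset.items() if k != "name"}
        asset_copy.setdefault("alternate", {})
        denormalized[asset["name"]] = asset_copy
    return denormalized

assets = [{"name": "data0001", "href": "https://example.invalid/data0001.nc"}]

denormalize_with_copy(assets)
denormalize_with_copy(assets)      # input unchanged, so a second pass is fine

denormalize_with_pop(assets)       # strips "name" from the input dict...
try:
    denormalize_with_pop(assets)   # ...so reprocessing the same list raises KeyError
except KeyError as missing:
    print(f"KeyError on reprocessing: {missing}")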
+        return denormalized_assets
 
     def gmetaentry(self, item):
         return {
@@ -70,12 +87,10 @@ def ingest(self, messages_data):
     def post(self, message_data):
         item = message_data.get("data").get("payload").get("item")
         try:
-            globus_response = self.search_client.get_subject(
-                self.esgf_index, item.get("id")
-            )
+            globus_response = self.search_client.get_subject(self.esgf_index, item.get("id"))
         except SearchAPIError as e:
             if e.http_status == 404:
-                item["assets"] = self.convert_assets(item.get("assets"))
+                item["assets"] = self.normalize_assets(item.get("assets"))
                 gmeta_entry = self.gmetaentry(item)
                 return gmeta_entry
 
@@ -102,9 +117,11 @@ def json_patch(self, message_data):
                     value=f"Item with ID {item_id} does not exist in the index.",
                 )
                 return None
-        gmeta_entry = jsonpatch.apply_patch(
-            globus_response.data.get("content"), payload.get("patch")
-        )
+        item = globus_response.data.get("entries")[0].get("content")
+        item["assets"] = self.denormalize_assets(item.get("assets"))
+        patched_item = jsonpatch.apply_patch(item, payload.get("patch").get("operations"))
+        patched_item["assets"] = self.normalize_assets(patched_item.get("assets"))
+        gmeta_entry = self.gmetaentry(patched_item)
         return gmeta_entry
 
     def delete(self, subject):
@@ -131,8 +148,6 @@ def process_message(self, message_data):
                 return self.put(message_data)
             if method == "JSON_PATCH" or method == "PATCH":
                 return self.json_patch(message_data)
-            if method == "MERGE_PATCH":
-                return self.merge_patch(message_data)
             return None
         except Exception as e:
             logging.error(f"Error processing message data: {e}")
11 changes: 3 additions & 8 deletions src/main.py
@@ -3,21 +3,16 @@
 from consumer import KafkaConsumerService
 from globus import ConsumerSearchClient
 from producer import KafkaProducer
-from settings.consumer import (event_stream, globus_search,
-                               globus_search_client_credentials)
+from settings.consumer import event_stream, globus_search, globus_search_client_credentials
 from settings.producer import error_event_stream
 
-logging.basicConfig(
-    format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
-)
+logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)
 
 
 if __name__ == "__main__":
     error_producer = KafkaProducer(config=error_event_stream.get("config"))
 
-    message_processor = ConsumerSearchClient(
-        globus_search_client_credentials, globus_search.get("index"), error_producer
-    )
+    message_processor = ConsumerSearchClient(globus_search_client_credentials, globus_search.get("index"), error_producer)
 
     consumer_service = KafkaConsumerService(
         event_stream.get("config"),
8 changes: 2 additions & 6 deletions src/producer.py
@@ -32,13 +32,9 @@ def delivery_report(err, msg):
             if err is not None:
                 logger.error(f"Delivery failed for message {msg.key()}: {err}")
             else:
-                logger.info(
-                    f"Message {msg.key()} successfully delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
-                )
+                logger.info(f"Message {msg.key()} successfully delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
             delivery_reports.append((err, msg))
 
-        self.producer.produce(
-            topic=topic, key=key, value=value, callback=delivery_report
-        )
+        self.producer.produce(topic=topic, key=key, value=value, callback=delivery_report)
         self.producer.flush()
         return delivery_reports
2 changes: 0 additions & 2 deletions src/settings/producer.py
@@ -4,8 +4,6 @@
 
 from dotenv import load_dotenv
 
-from src.utils import load_access_control_policy
-
 # Load the .env file
 load_dotenv()
 