2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -4,7 +4,7 @@ repos:
hooks:
- id: black
args: [--line-length=145]
language_version: python3.10
language_version: python3.13

- repo: https://github.com/pycqa/flake8
rev: 7.0.0
1 change: 1 addition & 0 deletions pyproject.toml
@@ -13,6 +13,7 @@ package-mode = false
boto3 = "^1.35.49"
confluent-kafka = "^2.6.0"
globus-sdk = "^3.46.0"
jsonpatch = "^1.33"
python = "^3.10"
python-dotenv = "^1.0.1"

3 changes: 3 additions & 0 deletions requirements.txt
@@ -7,10 +7,13 @@ charset-normalizer==3.4.0
click==8.1.8
confluent-kafka==2.6.0
cryptography==43.0.3
dotenv==0.9.9
globus-sdk==3.46.0
idna==3.10
isort==6.0.0
jmespath==1.0.1
jsonpatch==1.33.0
jsonpointer==3.0.0
mypy-extensions==1.0.0
packaging==24.2
pathspec==0.12.1
10 changes: 3 additions & 7 deletions src/consumer.py
@@ -21,9 +21,7 @@ def process_messages(self, messages):
data = json.loads(msg.value())
messages_data.append(data)
except json.JSONDecodeError as e:
logging.error(
f"Data deserialization error at offset {msg.offset()}: {e}."
)
logging.error(f"Data deserialization error at offset {msg.offset()}: {e}.")
return False
return messages_data

@@ -39,16 +37,14 @@ def start(self):
logging.info(f"Consumed {len(messages)} messages")
first_msg = messages[0]
offset = 0 if not first_msg.offset() else first_msg.offset()
self.seek_partition = TopicPartition(
first_msg.topic(), first_msg.partition(), offset
)
self.seek_partition = TopicPartition(first_msg.topic(), first_msg.partition(), offset)

messages_data = self.process_messages(messages)
if not messages_data:
self.consumer.seek(self.seek_partition)
continue

if not self.message_processor.ingest(messages_data):
if not self.message_processor.process_messages(messages_data):
self.consumer.seek(self.seek_partition)
continue

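A minimal standalone sketch of the seek-on-failure pattern used in `start()` above: when a batch fails to deserialize or ingest, the consumer seeks back to the first message's offset so the whole batch is re-consumed rather than dropped. The broker address, topic, group id, and `process()` stub are placeholders, not project settings.

```python
# Illustrative only: broker, topic, group id, and process() are made-up placeholders.
from confluent_kafka import Consumer, TopicPartition


def process(batch):
    # Stand-in for ConsumerSearchClient.process_messages(); return False to force a replay.
    return all(msg.error() is None for msg in batch)


consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "enable.auto.commit": False,
})
consumer.subscribe(["example-topic"])

while True:
    messages = consumer.consume(num_messages=10, timeout=5.0)
    if not messages:
        continue
    first = messages[0]
    # Fall back to offset 0 when the first message reports no offset.
    offset = first.offset() or 0
    seek_partition = TopicPartition(first.topic(), first.partition(), offset)

    if not process(messages):
        consumer.seek(seek_partition)  # rewind so the batch is consumed again
        continue
    consumer.commit()  # commit only after the batch was processed successfully
```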
110 changes: 98 additions & 12 deletions src/globus.py
@@ -1,13 +1,13 @@
import logging
import time
import jsonpatch

from globus_sdk import (AccessTokenAuthorizer, ConfidentialAppAuthClient,
SearchClient)
from globus_sdk import AccessTokenAuthorizer, ConfidentialAppAuthClient, SearchClient
from globus_sdk.scopes import SearchScopes


class ConsumerSearchClient:
def __init__(self, credentials, search_index):
def __init__(self, credentials, search_index, error_producer):
confidential_client = ConfidentialAppAuthClient(
client_id=credentials.get("client_id"),
client_secret=credentials.get("client_secret"),
@@ -19,10 +19,11 @@ def __init__(self, credentials, search_index):
authorizer = AccessTokenAuthorizer(search_tokens.get("access_token"))
self.search_client = SearchClient(authorizer=authorizer)
self.esgf_index = search_index
self.error_producer = error_producer

def convert_assets(self, item):
def convert_assets(self, assets):
converted_assets = []
for key, value in item.get("assets").items():
for key, value in assets.items():
converted_assets.append({"name": key} | value)
return converted_assets

@@ -45,12 +46,7 @@ def ingest(self, messages_data):
gmeta = []
for data in messages_data:
item = data.get("data").get("payload").get("item")
assets = item.get("assets")
assets_list = []
for name, asset in assets.items():
asset["name"] = name
assets_list.append(asset)
item["assets"] = assets_list
item["assets"] = item.get("assets")
gmeta.append(self.gmetaentry(item))

gmetalist = {"ingest_type": "GMetaList", "ingest_data": {"gmeta": gmeta}}
@@ -70,5 +66,95 @@ def ingest(self, messages_data):
time.sleep(1)
return True

def post(self, message_data):
item = message_data.get("data").get("payload").get("item")
globus_response = self.search_client.get_subject(self.esgf_index, item.get("id"))
if globus_response.data:
logging.info(f"Item with ID {item.get('id')} already exists in the index.")
self.error_producer.produce(
topic="esgf-local.errors",
key=item.get("id"),
value=f"Item with ID {item.get('id')} already exists in the index.",
)
return None
item["assets"] = self.convert_assets(item.get("assets"))
gmeta_entry = self.gmetaentry(item)
return gmeta_entry

def json_patch(self, message_data):
payload = message_data.get("data").get("payload")
item_id = payload.get("item_id")
globus_response = self.search_client.get_subject(self.esgf_index, item_id)
if not globus_response.data:
logging.info(f"Item with ID {item_id} does not exist in the index.")
self.error_producer.produce(
topic="esgf-local.errors",
key=item_id,
value=f"Item with ID {item_id} does not exist in the index.",
)
return None
gmeta_entry = jsonpatch.apply_patch(
globus_response.data.get("content"), payload.get("patch")
)
return gmeta_entry

def delete(self, subject):
self.search_client.delete_subject(self.esgf_index, subject)
globus_response = self.search_client.get_subject(self.esgf_index, subject)
if globus_response.data:
self.search_client.delete_subject(self.esgf_index, subject)
return True
logging.info(f"Item with ID {subject} does not exist in the index.")
self.error_producer.produce(
topic="esgf-local.errors",
key=subject,
value=f"Item with ID {subject} does not exist in the index.",
)
return None

def process_message(self, message_data):
try:
payload = message_data.get("data").get("payload")
method = payload.get("method")
if method == "POST":
return self.post(message_data)
if method == "PUT":
return self.put(message_data)
if method == "JSON_PATCH" or method == "PATCH":
return self.json_patch(message_data)
if method == "MERGE_PATCH":
return self.merge_patch(message_data)
return None
except Exception as e:
logging.error(f"Error processing message data: {e}")
self.error_producer.produce(
topic="esgf-local.errors",
key=message_data.get("data").get("payload").get("item").get("id"),
value=str(e),
)
return None

def process_messages(self, messages_data):
gmeta = []
for message_data in messages_data:
entry = self.process_message(message_data)
if entry:
gmeta.append(entry)
if not gmeta:
return False

gmetalist = {"ingest_type": "GMetaList", "ingest_data": {"gmeta": gmeta}}

r = self.search_client.ingest(self.esgf_index, gmetalist)
task_id = r.get("task_id")

while True:
r = self.search_client.get_task(task_id)
state = r.get("state")
if state == "SUCCESS":
return True
if state == "FAILED":
logging.error(f"Ingestion task {task_id} failed")
logging.error(r.text)
return False
time.sleep(1)
return True
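The new `json_patch` path fetches the stored document from the Globus Search index and applies an RFC 6902 patch to it with `jsonpatch.apply_patch`; the result is then wrapped in a GMetaEntry and batched into the GMetaList that `process_messages` ingests. A small self-contained sketch of the patch step, using an invented document and patch rather than real ESGF records:

```python
import jsonpatch

# Invented document and patch, shown only to illustrate jsonpatch.apply_patch semantics.
existing = {"id": "example-item", "properties": {"version": 1}}
patch = [
    {"op": "replace", "path": "/properties/version", "value": 2},
    {"op": "add", "path": "/properties/retracted", "value": False},
]

# Returns a new dict by default; pass in_place=True to mutate the original.
updated = jsonpatch.apply_patch(existing, patch)
print(updated["properties"])  # {'version': 2, 'retracted': False}
```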
6 changes: 5 additions & 1 deletion src/main.py
@@ -2,17 +2,21 @@

from consumer import KafkaConsumerService
from globus import ConsumerSearchClient
from producer import KafkaProducer
from settings.consumer import (event_stream, globus_search,
globus_search_client_credentials)
from settings.producer import error_event_stream

logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
)


if __name__ == "__main__":
error_producer = KafkaProducer(config=error_event_stream.get("config"))

message_processor = ConsumerSearchClient(
globus_search_client_credentials, globus_search.get("index")
globus_search_client_credentials, globus_search.get("index"), error_producer
)

consumer_service = KafkaConsumerService(
44 changes: 44 additions & 0 deletions src/producer.py
@@ -0,0 +1,44 @@
import logging
from abc import ABC, abstractmethod

import attr
from confluent_kafka import Producer

# Setup logger
logger = logging.getLogger(__name__)


@attr.s
class BaseProducer(ABC):
@abstractmethod
def produce(self, topic, message):
pass


class StdoutProducer(BaseProducer):
def produce(self, topic, message):
logger.info(f"message: {message}")


class KafkaProducer(BaseProducer):
def __init__(self, config):
self.producer = Producer(config)
logger.info("KafkaProducer initialized")

def produce(self, topic, key, value):
delivery_reports = []

def delivery_report(err, msg):
if err is not None:
logger.error(f"Delivery failed for message {msg.key()}: {err}")
else:
logger.info(
f"Message {msg.key()} successfully delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
)
delivery_reports.append((err, msg))

self.producer.produce(
topic=topic, key=key, value=value, callback=delivery_report
)
self.producer.flush()
return delivery_reports
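A hedged usage sketch for the `KafkaProducer` above, mirroring how `ConsumerSearchClient` publishes error reports in globus.py; the broker address and the key/value strings are placeholders. Because `produce()` calls `flush()`, the returned delivery reports are complete by the time it returns.

```python
# Placeholder broker config; real settings come from settings/producer.py.
from producer import KafkaProducer

error_producer = KafkaProducer(config={"bootstrap.servers": "localhost:9092"})
reports = error_producer.produce(
    topic="esgf-local.errors",
    key="example-item-id",
    value="Item with ID example-item-id already exists in the index.",
)
for err, msg in reports:
    if err is not None:
        # Delivery failed; msg.key() and msg.topic() identify the record.
        print(f"Failed to deliver {msg.key()} to {msg.topic()}: {err}")
```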
6 changes: 2 additions & 4 deletions src/settings/consumer.py
@@ -18,7 +18,7 @@
"enable.auto.commit": False,
"group.id": "westconsumer",
},
"topics": ["esgf-local"],
"topics": ["esgf-local.transactions"],
}
else:
event_stream = {
@@ -45,6 +45,4 @@
}

# ESGF2 Globus Search
globus_search = {
"index": os.environ.get("GLOBUS_SEARCH_INDEX")
}
globus_search = {"index": os.environ.get("GLOBUS_SEARCH_INDEX")}
39 changes: 39 additions & 0 deletions src/settings/producer.py
@@ -0,0 +1,39 @@

import logging
import os
import socket

from dotenv import load_dotenv

from src.utils import load_access_control_policy

# Load the .env file
load_dotenv()

# Suppress some kafka message streams
logger = logging.getLogger("kafka")
logger.setLevel(logging.WARN)

run_environment = os.environ.get("RUN_ENVIRONMENT", None)


# Kafka connection details
if run_environment == "local":
error_event_stream = {
"config": {
"bootstrap.servers": "host.docker.internal:9092",
"client.id": socket.gethostname(),
},
"topic": "esgf-local.errors",
}
else:
error_event_stream = {
"config": {
"bootstrap.servers": os.environ.get("BOOTSTRAP_SERVERS"),
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": os.environ.get("CONFLUENT_CLOUD_USERNAME"),
"sasl.password": os.environ.get("CONFLUENT_CLOUD_PASSWORD"),
},
"topic": os.environ.get("TOPIC", "esgf2.data-challenges.01.errors"),
}
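For the non-local branch, the module expects Confluent Cloud credentials in the environment (normally supplied via a .env file loaded by python-dotenv). A hypothetical set of values, expressed through os.environ purely for illustration; none of these are real endpoints or secrets.

```python
# Hypothetical environment for the non-local branch of settings/producer.py.
import os

os.environ["RUN_ENVIRONMENT"] = "production"  # anything other than "local"
os.environ["BOOTSTRAP_SERVERS"] = "pkc-xxxxx.region.provider.confluent.cloud:9092"  # placeholder
os.environ["CONFLUENT_CLOUD_USERNAME"] = "<api-key>"
os.environ["CONFLUENT_CLOUD_PASSWORD"] = "<api-secret>"
os.environ["TOPIC"] = "esgf2.data-challenges.01.errors"  # optional; this is the default
```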