
Commit 822d874

Merge pull request #5 from esgf2-us/lukasz/dev
Error event stream and JSON-PATCH HTTP
2 parents 737f08c + 94633ec commit 822d874

9 files changed: +196 -25 lines


.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ repos:
     hooks:
       - id: black
         args: [--line-length=145]
-        language_version: python3.10
+        language_version: python3.13

   - repo: https://github.com/pycqa/flake8
     rev: 7.0.0

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@ package-mode = false
 boto3 = "^1.35.49"
 confluent-kafka = "^2.6.0"
 globus-sdk = "^3.46.0"
+jsonpatch = "^1.33"
 python = "^3.10"
 python-dotenv = "^1.0.1"

requirements.txt

Lines changed: 3 additions & 0 deletions
@@ -7,10 +7,13 @@ charset-normalizer==3.4.0
 click==8.1.8
 confluent-kafka==2.6.0
 cryptography==43.0.3
+dotenv==0.9.9
 globus-sdk==3.46.0
 idna==3.10
 isort==6.0.0
 jmespath==1.0.1
+jsonpatch==1.33.0
+jsonpointer==3.0.0
 mypy-extensions==1.0.0
 packaging==24.2
 pathspec==0.12.1
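The new jsonpatch dependency (declared above in pyproject.toml and requirements.txt, with jsonpointer pulled in alongside it) implements RFC 6902 JSON Patch. A minimal sketch of what the library provides, using a made-up document and patch for illustration only:

import jsonpatch

# Hypothetical document and patch list, for illustration only.
doc = {"id": "abc123", "properties": {"variable": "tas"}}
patch = [
    {"op": "add", "path": "/properties/retracted", "value": True},
    {"op": "remove", "path": "/properties/variable"},
]

# apply_patch returns a patched copy and leaves the original document untouched.
patched = jsonpatch.apply_patch(doc, patch)
assert patched == {"id": "abc123", "properties": {"retracted": True}}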

src/consumer.py

Lines changed: 3 additions & 7 deletions
@@ -21,9 +21,7 @@ def process_messages(self, messages):
                 data = json.loads(msg.value())
                 messages_data.append(data)
             except json.JSONDecodeError as e:
-                logging.error(
-                    f"Data deserialization error at offset {msg.offset()}: {e}."
-                )
+                logging.error(f"Data deserialization error at offset {msg.offset()}: {e}.")
                 return False
         return messages_data

@@ -39,16 +37,14 @@ def start(self):
             logging.info(f"Consumed {len(messages)} messages")
             first_msg = messages[0]
             offset = 0 if not first_msg.offset() else first_msg.offset()
-            self.seek_partition = TopicPartition(
-                first_msg.topic(), first_msg.partition(), offset
-            )
+            self.seek_partition = TopicPartition(first_msg.topic(), first_msg.partition(), offset)

             messages_data = self.process_messages(messages)
             if not messages_data:
                 self.consumer.seek(self.seek_partition)
                 continue

-            if not self.message_processor.ingest(messages_data):
+            if not self.message_processor.process_messages(messages_data):
                 self.consumer.seek(self.seek_partition)
                 continue
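The consumer now hands decoded batches to the processor's process_messages() instead of ingest(), keeping the existing rewind-on-failure behavior. A rough sketch of that pattern, assuming a configured confluent_kafka Consumer and a processor exposing process_messages() as in this PR; consume_batch and its parameters are illustrative, not part of the codebase:

import json
import logging

from confluent_kafka import Consumer, TopicPartition


def consume_batch(consumer: Consumer, processor, batch_size=10, timeout=1.0):
    messages = consumer.consume(batch_size, timeout=timeout)
    if not messages:
        return
    first_msg = messages[0]
    offset = 0 if not first_msg.offset() else first_msg.offset()
    seek_partition = TopicPartition(first_msg.topic(), first_msg.partition(), offset)

    try:
        messages_data = [json.loads(msg.value()) for msg in messages]
    except json.JSONDecodeError as e:
        logging.error(f"Data deserialization error: {e}")
        consumer.seek(seek_partition)
        return

    # If processing fails, rewind to the first offset of the batch so the
    # same messages are redelivered on the next consume() call.
    if not processor.process_messages(messages_data):
        consumer.seek(seek_partition)
        return
    consumer.commit()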

src/globus.py

Lines changed: 98 additions & 12 deletions
@@ -1,13 +1,13 @@
 import logging
 import time
+import jsonpatch

-from globus_sdk import (AccessTokenAuthorizer, ConfidentialAppAuthClient,
-                        SearchClient)
+from globus_sdk import AccessTokenAuthorizer, ConfidentialAppAuthClient, SearchClient
 from globus_sdk.scopes import SearchScopes


 class ConsumerSearchClient:
-    def __init__(self, credentials, search_index):
+    def __init__(self, credentials, search_index, error_producer):
         confidential_client = ConfidentialAppAuthClient(
             client_id=credentials.get("client_id"),
             client_secret=credentials.get("client_secret"),
@@ -19,10 +19,11 @@ def __init__(self, credentials, search_index):
         authorizer = AccessTokenAuthorizer(search_tokens.get("access_token"))
         self.search_client = SearchClient(authorizer=authorizer)
         self.esgf_index = search_index
+        self.error_producer = error_producer

-    def convert_assets(self, item):
+    def convert_assets(self, assets):
         converted_assets = []
-        for key, value in item.get("assets").items():
+        for key, value in assets.items():
             converted_assets.append({"name": key} | value)
         return converted_assets

@@ -45,12 +46,7 @@ def ingest(self, messages_data):
         gmeta = []
         for data in messages_data:
             item = data.get("data").get("payload").get("item")
-            assets = item.get("assets")
-            assets_list = []
-            for name, asset in assets.items():
-                asset["name"] = name
-                assets_list.append(asset)
-            item["assets"] = assets_list
+            item["assets"] = item.get("assets")
             gmeta.append(self.gmetaentry(item))

         gmetalist = {"ingest_type": "GMetaList", "ingest_data": {"gmeta": gmeta}}
@@ -70,5 +66,95 @@ def ingest(self, messages_data):
             time.sleep(1)
         return True

+    def post(self, message_data):
+        item = message_data.get("data").get("payload").get("item")
+        globus_response = self.search_client.get_subject(self.esgf_index, item.get("id"))
+        if globus_response.data:
+            logging.info(f"Item with ID {item.get('id')} already exists in the index.")
+            self.error_producer.produce(
+                topic="esgf-local.errors",
+                key=item.get("id"),
+                value=f"Item with ID {item.get('id')} already exists in the index.",
+            )
+            return None
+        item["assets"] = self.convert_assets(item.get("assets"))
+        gmeta_entry = self.gmetaentry(item)
+        return gmeta_entry
+
+    def json_patch(self, message_data):
+        payload = message_data.get("data").get("payload")
+        item_id = payload.get("item_id")
+        globus_response = self.search_client.get_subject(self.esgf_index, item_id)
+        if not globus_response.data:
+            logging.info(f"Item with ID {item_id} does not exist in the index.")
+            self.error_producer.produce(
+                topic="esgf-local.errors",
+                key=item_id,
+                value=f"Item with ID {item_id} does not exist in the index.",
+            )
+            return None
+        gmeta_entry = jsonpatch.apply_patch(
+            globus_response.data.get("content"), payload.get("patch")
+        )
+        return gmeta_entry
+
     def delete(self, subject):
-        self.search_client.delete_subject(self.esgf_index, subject)
+        globus_response = self.search_client.get_subject(self.esgf_index, subject)
+        if globus_response.data:
+            self.search_client.delete_subject(self.esgf_index, subject)
+            return True
+        logging.info(f"Item with ID {subject} does not exist in the index.")
+        self.error_producer.produce(
+            topic="esgf-local.errors",
+            key=subject,
+            value=f"Item with ID {subject} does not exist in the index.",
+        )
+        return None
+
+    def process_message(self, message_data):
+        try:
+            payload = message_data.get("data").get("payload")
+            method = payload.get("method")
+            if method == "POST":
+                return self.post(message_data)
+            if method == "PUT":
+                return self.put(message_data)
+            if method == "JSON_PATCH" or method == "PATCH":
+                return self.json_patch(message_data)
+            if method == "MERGE_PATCH":
+                return self.merge_patch(message_data)
+            return None
+        except Exception as e:
+            logging.error(f"Error processing message data: {e}")
+            self.error_producer.produce(
+                topic="esgf-local.errors",
+                key=message_data.get("data").get("payload").get("item").get("id"),
+                value=str(e),
+            )
+            return None
+
+    def process_messages(self, messages_data):
+        gmeta = []
+        for message_data in messages_data:
+            entry = self.process_message(message_data)
+            if entry:
+                gmeta.append(entry)
+        if not gmeta:
+            return False
+
+        gmetalist = {"ingest_type": "GMetaList", "ingest_data": {"gmeta": gmeta}}
+
+        r = self.search_client.ingest(self.esgf_index, gmetalist)
+        task_id = r.get("task_id")
+
+        while True:
+            r = self.search_client.get_task(task_id)
+            state = r.get("state")
+            if state == "SUCCESS":
+                return True
+            if state == "FAILED":
+                logging.error(f"Ingestion task {task_id} failed")
+                logging.error(r.text)
+                return False
+            time.sleep(1)
+        return True
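For reference, process_message() dispatches on the payload's method field; the PUT and MERGE_PATCH branches call put() and merge_patch(), which are not part of this commit. A hedged sketch of what a JSON_PATCH transaction might look like and of the patching step json_patch() performs once the subject is found in the index; the field values are illustrative, and only method, item_id, and patch appear in the code above:

import jsonpatch

# Illustrative JSON_PATCH payload; the concrete values are made up.
payload = {
    "method": "JSON_PATCH",
    "item_id": "example-item-id",
    "patch": [{"op": "replace", "path": "/properties/retracted", "value": True}],
}

# json_patch() fetches the stored subject, then applies the patch to its content.
stored_content = {"id": "example-item-id", "properties": {"retracted": False}}
gmeta_entry = jsonpatch.apply_patch(stored_content, payload["patch"])
# If the subject is missing, an error is published to the esgf-local.errors topic instead.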

src/main.py

Lines changed: 5 additions & 1 deletion
@@ -2,17 +2,21 @@

 from consumer import KafkaConsumerService
 from globus import ConsumerSearchClient
+from producer import KafkaProducer
 from settings.consumer import (event_stream, globus_search,
                                globus_search_client_credentials)
+from settings.producer import error_event_stream

 logging.basicConfig(
     format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
 )


 if __name__ == "__main__":
+    error_producer = KafkaProducer(config=error_event_stream.get("config"))
+
     message_processor = ConsumerSearchClient(
-        globus_search_client_credentials, globus_search.get("index")
+        globus_search_client_credentials, globus_search.get("index"), error_producer
     )

     consumer_service = KafkaConsumerService(

src/producer.py

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+import logging
+from abc import ABC, abstractmethod
+
+import attr
+from confluent_kafka import Producer
+
+# Setup logger
+logger = logging.getLogger(__name__)
+
+
+@attr.s
+class BaseProducer(ABC):
+    @abstractmethod
+    def produce(self, topic, message):
+        pass
+
+
+class StdoutProducer(BaseProducer):
+    def produce(self, topic, message):
+        logger.info(f"message: {message}")
+
+
+class KafkaProducer(BaseProducer):
+    def __init__(self, config):
+        self.producer = Producer(config)
+        logger.info("KafkaProducer initialized")
+
+    def produce(self, topic, key, value):
+        delivery_reports = []
+
+        def delivery_report(err, msg):
+            if err is not None:
+                logger.error(f"Delivery failed for message {msg.key()}: {err}")
+            else:
+                logger.info(
+                    f"Message {msg.key()} successfully delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
+                )
+            delivery_reports.append((err, msg))
+
+        self.producer.produce(
+            topic=topic, key=key, value=value, callback=delivery_report
+        )
+        self.producer.flush()
+        return delivery_reports
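A minimal usage sketch for the new producer classes, assuming a locally reachable broker; the config mirrors the "local" branch of src/settings/producer.py, and the key/value strings are made up:

from producer import KafkaProducer, StdoutProducer

producer = KafkaProducer(config={"bootstrap.servers": "host.docker.internal:9092"})
reports = producer.produce(
    topic="esgf-local.errors",
    key="example-item-id",
    value="Item with ID example-item-id already exists in the index.",
)

# StdoutProducer only logs and takes (topic, message) rather than (topic, key, value).
StdoutProducer().produce("esgf-local.errors", "dry-run error message")

Note that KafkaProducer.produce() calls flush() after every message, so each call blocks until its delivery report arrives.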

src/settings/consumer.py

Lines changed: 2 additions & 4 deletions
@@ -18,7 +18,7 @@
             "enable.auto.commit": False,
             "group.id": "westconsumer",
         },
-        "topics": ["esgf-local"],
+        "topics": ["esgf-local.transactions"],
     }
 else:
     event_stream = {
@@ -45,6 +45,4 @@
 }

 # ESGF2 Globus Search
-globus_search = {
-    "index": os.environ.get("GLOBUS_SEARCH_INDEX")
-}
+globus_search = {"index": os.environ.get("GLOBUS_SEARCH_INDEX")}

src/settings/producer.py

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+
+import logging
+import os
+import socket
+
+from dotenv import load_dotenv
+
+from src.utils import load_access_control_policy
+
+# Load the .env file
+load_dotenv()
+
+# Suppress some kafka message streams
+logger = logging.getLogger("kafka")
+logger.setLevel(logging.WARN)
+
+run_environment = os.environ.get("RUN_ENVIRONMENT", None)
+
+
+# Kafka connection details
+if run_environment == "local":
+    error_event_stream = {
+        "config": {
+            "bootstrap.servers": "host.docker.internal:9092",
+            "client.id": socket.gethostname(),
+        },
+        "topic": "esgf-local.errors",
+    }
+else:
+    error_event_stream = {
+        "config": {
+            "bootstrap.servers": os.environ.get("BOOTSTRAP_SERVERS"),
+            "security.protocol": "SASL_SSL",
+            "sasl.mechanisms": "PLAIN",
+            "sasl.username": os.environ.get("CONFLUENT_CLOUD_USERNAME"),
+            "sasl.password": os.environ.get("CONFLUENT_CLOUD_PASSWORD"),
+        },
+        "topic": os.environ.get("TOPIC", "esgf2.data-challenges.01.errors"),
+    }
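A short sketch of how these settings feed the error producer, following the wiring shown in src/main.py; the key and value passed to produce() are illustrative:

from producer import KafkaProducer
from settings.producer import error_event_stream

error_producer = KafkaProducer(config=error_event_stream.get("config"))
error_producer.produce(
    topic=error_event_stream.get("topic"),
    key="example-item-id",
    value="Example error message",
)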
