
Commit c9ab700

Fix pre-commit config - calling different Python versions
1 parent dcb971b commit c9ab700

6 files changed (+10, -29 lines)

.github/workflows/pre-commit.yaml

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-python@v4
         with:
-          python-version: '3.10'
+          python-version: '3.13'
       - run: python -m pip install pre-commit
         shell: bash
       - run: python -m pip freeze --local
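The workflow job above now runs actions/setup-python with Python 3.13. For the hooks themselves to use the same interpreter locally and in CI, pre-commit also supports pinning the interpreter in the repository's .pre-commit-config.yaml through default_language_version. A minimal sketch, assuming a black hook purely for illustration (this repo's actual hook list is not shown in the commit):

# .pre-commit-config.yaml (illustrative sketch, not this repo's actual config)
default_language_version:
  python: python3.13  # keep in sync with the workflow's python-version

repos:
  - repo: https://github.com/psf/black  # hypothetical hook for illustration
    rev: 24.3.0
    hooks:
      - id: black

Keeping the two pins in sync avoids hook environments being built against one interpreter and invoked with another, which matches the mismatch this commit's title describes.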

src/consumer.py

Lines changed: 2 additions & 6 deletions
@@ -21,9 +21,7 @@ def process_messages(self, messages):
                 data = json.loads(msg.value())
                 messages_data.append(data)
             except json.JSONDecodeError as e:
-                logging.error(
-                    f"Data deserialization error at offset {msg.offset()}: {e}."
-                )
+                logging.error(f"Data deserialization error at offset {msg.offset()}: {e}.")
                 return False
         return messages_data

@@ -39,9 +37,7 @@ def start(self):
             logging.info(f"Consumed {len(messages)} messages")
             first_msg = messages[0]
             offset = 0 if not first_msg.offset() else first_msg.offset()
-            self.seek_partition = TopicPartition(
-                first_msg.topic(), first_msg.partition(), offset
-            )
+            self.seek_partition = TopicPartition(first_msg.topic(), first_msg.partition(), offset)

             messages_data = self.process_messages(messages)
             if not messages_data:
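For context on the change above: msg.offset() can legitimately return 0, which is falsy, hence the explicit guard before building the TopicPartition that the consumer presumably seeks back to when a batch fails. A minimal standalone sketch of that rewind pattern with confluent-kafka, where the broker address, group id, and topic are placeholders and error handling is simplified:

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "example-group",            # placeholder group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["example-topic"])       # placeholder topic

messages = consumer.consume(num_messages=100, timeout=5.0)
if messages:
    first_msg = messages[0]
    # offset() may return 0 or None; default to 0 in either case
    offset = 0 if not first_msg.offset() else first_msg.offset()
    seek_partition = TopicPartition(first_msg.topic(), first_msg.partition(), offset)
    # On a processing failure, rewind so the same batch is consumed again
    # (seek requires the partition to be currently assigned to this consumer):
    consumer.seek(seek_partition)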

src/globus.py

Lines changed: 2 additions & 6 deletions
@@ -87,9 +87,7 @@ def ingest(self, messages_data):
     def post(self, message_data):
         item = message_data.get("data").get("payload").get("item")
         try:
-            globus_response = self.search_client.get_subject(
-                self.esgf_index, item.get("id")
-            )
+            globus_response = self.search_client.get_subject(self.esgf_index, item.get("id"))
         except SearchAPIError as e:
             if e.http_status == 404:
                 item["assets"] = self.normalize_assets(item.get("assets"))

@@ -121,9 +119,7 @@ def json_patch(self, message_data):
             return None
         item = globus_response.data.get("entries")[0].get("content")
         item["assets"] = self.denormalize_assets(item.get("assets"))
-        patched_item = jsonpatch.apply_patch(
-            item, payload.get("patch").get("operations")
-        )
+        patched_item = jsonpatch.apply_patch(item, payload.get("patch").get("operations"))
         patched_item["assets"] = self.normalize_assets(patched_item.get("assets"))
         gmeta_entry = self.gmetaentry(patched_item)
         return gmeta_entry
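For reference, the json_patch path applies an RFC 6902 JSON Patch to the entry fetched from Globus Search. A self-contained sketch of the jsonpatch.apply_patch call in isolation, using a made-up document and operations list:

import jsonpatch

item = {
    "id": "dataset-001",  # made-up entry for illustration
    "assets": {"data": {"href": "https://example.org/old.nc"}},
}
operations = [
    {"op": "replace", "path": "/assets/data/href", "value": "https://example.org/new.nc"},
]

# apply_patch returns a patched copy by default; pass in_place=True to mutate.
patched_item = jsonpatch.apply_patch(item, operations)
assert patched_item["assets"]["data"]["href"] == "https://example.org/new.nc"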

src/main.py

Lines changed: 3 additions & 8 deletions
@@ -3,21 +3,16 @@
 from consumer import KafkaConsumerService
 from globus import ConsumerSearchClient
 from producer import KafkaProducer
-from settings.consumer import (event_stream, globus_search,
-                               globus_search_client_credentials)
+from settings.consumer import event_stream, globus_search, globus_search_client_credentials
 from settings.producer import error_event_stream

-logging.basicConfig(
-    format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
-)
+logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)


 if __name__ == "__main__":
     error_producer = KafkaProducer(config=error_event_stream.get("config"))

-    message_processor = ConsumerSearchClient(
-        globus_search_client_credentials, globus_search.get("index"), error_producer
-    )
+    message_processor = ConsumerSearchClient(globus_search_client_credentials, globus_search.get("index"), error_producer)

     consumer_service = KafkaConsumerService(
         event_stream.get("config"),

src/producer.py

Lines changed: 2 additions & 6 deletions
@@ -32,13 +32,9 @@ def delivery_report(err, msg):
             if err is not None:
                 logger.error(f"Delivery failed for message {msg.key()}: {err}")
             else:
-                logger.info(
-                    f"Message {msg.key()} successfully delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
-                )
+                logger.info(f"Message {msg.key()} successfully delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
             delivery_reports.append((err, msg))

-        self.producer.produce(
-            topic=topic, key=key, value=value, callback=delivery_report
-        )
+        self.producer.produce(topic=topic, key=key, value=value, callback=delivery_report)
         self.producer.flush()
         return delivery_reports
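The producer code above relies on confluent-kafka delivery callbacks, which fire only while poll() or flush() is running. A minimal standalone sketch of the same produce-and-collect-reports pattern, with a placeholder broker, topic, and payload:

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker
delivery_reports = []

def delivery_report(err, msg):
    # Invoked by the client once the broker acknowledges or rejects the message.
    delivery_reports.append((err, msg))

producer.produce(
    topic="example-errors",  # placeholder topic
    key=b"message-key",
    value=b'{"error": "example payload"}',
    callback=delivery_report,
)
producer.flush()  # blocks until outstanding messages are delivered and callbacks run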

src/settings/producer.py

Lines changed: 0 additions & 2 deletions
@@ -4,8 +4,6 @@

 from dotenv import load_dotenv

-from src.utils import load_access_control_policy
-
 # Load the .env file
 load_dotenv()