Skip to content

Commit 0b59313

Browse files
Black and isort
1 parent d8a0e4d commit 0b59313

File tree

4 files changed

+17
-8
lines changed

4 files changed

+17
-8
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ venv
66
dist
77
build
88
*.egg-info
9-
.env*
9+
.env*
10+
.pytest*

src/consumer.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ def process_messages(self, messages):
2121
data = json.loads(msg.value())
2222
messages_data.append(data)
2323
except json.JSONDecodeError as e:
24-
logging.error(f"Data deserialization error at offset {msg.offset()}: {e}.")
24+
logging.error(
25+
f"Data deserialization error at offset {msg.offset()}: {e}."
26+
)
2527
return False
2628
return messages_data
2729

@@ -37,7 +39,9 @@ def start(self):
3739
logging.info(f"Consumed {len(messages)} messages")
3840
first_msg = messages[0]
3941
offset = 0 if not first_msg.offset() else first_msg.offset()
40-
self.seek_partition = TopicPartition(first_msg.topic(), first_msg.partition(), offset)
42+
self.seek_partition = TopicPartition(
43+
first_msg.topic(), first_msg.partition(), offset
44+
)
4145

4246
messages_data = self.process_messages(messages)
4347
if not messages_data:

src/globus.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import logging
22
import time
3-
import jsonpatch
43

5-
from globus_sdk import ClientCredentialsAuthorizer, ConfidentialAppAuthClient, SearchClient
4+
import jsonpatch
5+
from globus_sdk import (ClientCredentialsAuthorizer, ConfidentialAppAuthClient,
6+
SearchClient)
67
from globus_sdk.scopes import SearchScopes
78
from globus_sdk.services.search.errors import SearchAPIError
89

@@ -69,7 +70,9 @@ def ingest(self, messages_data):
6970
def post(self, message_data):
7071
item = message_data.get("data").get("payload").get("item")
7172
try:
72-
globus_response = self.search_client.get_subject(self.esgf_index, item.get("id"))
73+
globus_response = self.search_client.get_subject(
74+
self.esgf_index, item.get("id")
75+
)
7376
except SearchAPIError as e:
7477
if e.http_status == 404:
7578
item["assets"] = self.convert_assets(item.get("assets"))
@@ -99,7 +102,9 @@ def json_patch(self, message_data):
99102
value=f"Item with ID {item_id} does not exist in the index.",
100103
)
101104
return None
102-
gmeta_entry = jsonpatch.apply_patch(globus_response.data.get("content"), payload.get("patch"))
105+
gmeta_entry = jsonpatch.apply_patch(
106+
globus_response.data.get("content"), payload.get("patch")
107+
)
103108
return gmeta_entry
104109

105110
def delete(self, subject):

src/settings/producer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
import logging
32
import os
43
import socket

0 commit comments

Comments
 (0)