
Commit 51a90c8

added logging and error handling
1 parent 8950aa0 commit 51a90c8

2 files changed (+28, -8 lines)


dypxFlow.py (13 additions, 4 deletions)
@@ -29,7 +29,7 @@
 logger.remove()
 logger.add(sys.stderr, format=logger_format)
 
-__version__ = '1.0.6'
+__version__ = '1.0.7'
 
 DISPLAY_TITLE = r"""
 _ _ ______ _
@@ -166,6 +166,7 @@ def main(options: Namespace, inputdir: Path, outputdir: Path):
     df_clean = df.fillna('')
     l_job = create_query(df_clean)
     d_df = []
+    pipeline_errors = False
     if int(options.thread):
         with concurrent.futures.ThreadPoolExecutor(max_workers=int(options.maxThreads)) as executor:
             results: Iterator = executor.map(lambda t: register_and_anonymize(options, t, options.wait), l_job)
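
For context, the threaded branch above fans jobs out with executor.map; a minimal standalone sketch of that dispatch pattern (the worker function and job list below are placeholders, not from this repo):

import concurrent.futures

def worker(job: dict) -> dict:
    # stand-in for register_and_anonymize(options, job, wait)
    return {"status": "done", "id": job["id"]}

jobs = [{"id": 1}, {"id": 2}, {"id": 3}]
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results lazily, in input order, as workers finish
    for result in executor.map(worker, jobs):
        print(result["status"])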
@@ -174,15 +175,20 @@ def main(options: Namespace, inputdir: Path, outputdir: Path):
             # executor.shutdown(wait=True)
     else:
         for d_job in l_job:
-            status = register_and_anonymize(options, d_job)
+            response = register_and_anonymize(options, d_job)
             row = d_job["raw"]
             row.update(d_job["push"])
-            row["status"] = status
+            row["status"] = response['status']
             d_df.append(row)
+            if response.get('error'):
+                pipeline_errors = True
 
     csv_file = os.path.join(options.outputdir, input_file.name)
     df = pd.DataFrame(d_df)
     df.to_csv(csv_file, index=False)
+    if pipeline_errors:
+        LOG(f"ERROR while running pipelines.")
+        sys.exit(1)
 
 
 if __name__ == '__main__':
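
The net effect of the new pipeline_errors flag is that one failed job makes the whole run exit nonzero. A minimal sketch of the pattern, assuming each job returns a dict that may carry an 'error' key (shape inferred from this diff):

import sys

def run_all(jobs, run_one) -> None:
    had_errors = False
    for job in jobs:
        response = run_one(job)       # e.g. register_and_anonymize(options, job)
        if response.get("error"):     # a single failure taints the whole run
            had_errors = True
    if had_errors:
        sys.exit(1)                   # nonzero exit so schedulers and CI notice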
@@ -192,6 +198,7 @@ def register_and_anonymize(options: Namespace, d_job: dict, wait: bool = False):
     1) Search through PACS for series and register in CUBE
     2) Run anonymize and push workflow on the registered series
     """
+    resp = {}
     d_job["pull"] = {
         "url": options.PACSurl,
         "pacs": options.PACSname
@@ -202,7 +209,9 @@ def register_and_anonymize(options: Namespace, d_job: dict, wait: bool = False):
         d_ret = cube_con.anonymize(d_job, options.pluginInstanceID)
     else:
         d_ret = d_job["push"]
-    return d_ret['status']
+
+
+    return d_ret
 
 
 def health_check(options) -> bool:
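
register_and_anonymize now hands back the whole response dict instead of the bare d_ret['status'], so callers can see both the status and any error detail. A hedged caller-side sketch (key names assumed from the diff above):

from loguru import logger

def record_result(response: dict, row: dict) -> dict:
    # 'response' is the dict returned by register_and_anonymize
    row["status"] = response["status"]   # same value the old code returned bare
    if response.get("error"):            # new: error detail travels with the status
        logger.error(f"Job failed: {response['error']}")
    return row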

pipeline.py (15 additions, 4 deletions)
@@ -76,9 +76,19 @@ def __init__(self, client):
         stop=stop_after_attempt(5),
         reraise=True
     )
-    def make_request(self, method, endpoint, **kwargs):
+    def make_request(self, method: str, endpoint: str, **kwargs):
         url = f"{self.api_base}{endpoint}"
-        response = requests.request(method, url, headers=self.headers, auth=self.auth, timeout=5, **kwargs)
+        response = requests.request(method, url, headers=self.headers, auth=self.auth, timeout=30, **kwargs)
+        response.raise_for_status()
+
+        try:
+            return response.json().get("collection", {}).get("items", [])
+        except ValueError:
+            return response.text
+
+    def post_request(self, endpoint: str, **kwargs):
+        url = f"{self.api_base}{endpoint}"
+        response = requests.request("POST", url, headers=self.headers, auth=self.auth, timeout=30, **kwargs)
         response.raise_for_status()
 
         try:
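
The context lines show make_request is wrapped in a tenacity retry (stop_after_attempt(5), reraise=True); splitting POSTs into an un-retried post_request plausibly avoids re-submitting non-idempotent workflow creations on a timeout. A self-contained sketch of that split, with illustrative class and attribute names:

import requests
from tenacity import retry, stop_after_attempt

class ApiClient:
    def __init__(self, api_base: str, auth: tuple):
        self.api_base = api_base
        self.auth = auth
        self.headers = {"Accept": "application/json"}

    @retry(stop=stop_after_attempt(5), reraise=True)  # GETs are safe to retry
    def make_request(self, method: str, endpoint: str, **kwargs):
        response = requests.request(method, f"{self.api_base}{endpoint}",
                                    headers=self.headers, auth=self.auth,
                                    timeout=30, **kwargs)
        response.raise_for_status()
        return response

    def post_request(self, endpoint: str, **kwargs):  # no retry: POST may not be idempotent
        response = requests.request("POST", f"{self.api_base}{endpoint}",
                                    headers=self.headers, auth=self.auth,
                                    timeout=30, **kwargs)
        response.raise_for_status()
        return response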
@@ -93,6 +103,7 @@ def get_pipeline_id(self, name: str) -> int:
         """Fetch pipeline ID by name."""
         logger.info(f"Fetching ID for pipeline: {name}")
         response = self.make_request("GET", f"/pipelines/search/?name={name}")
+
         for item in response:
             for field in item.get("data", []):
                 if field.get("name") == "id":
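
make_request now returns the items list from what looks like CUBE's Collection+JSON envelope, which is why get_pipeline_id can loop straight over response. A tiny worked example of that extraction (the data is made up):

# Shape of one item as parsed by make_request (illustrative values):
items = [{"data": [{"name": "id", "value": 7},
                   {"name": "name", "value": "anonymize-and-push"}]}]

for item in items:
    for field in item.get("data", []):
        if field.get("name") == "id":
            print(field.get("value"))   # -> 7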
@@ -113,7 +124,7 @@ def post_workflow(self, pipeline_id: int, previous_id: int, params: list[dict]):
             "previous_plugin_inst_id": previous_id,
             "nodes_info": json.dumps(params)
         }
-        return self.make_request("POST", f"/pipelines/{pipeline_id}/workflows/", json=payload)
+        return self.post_request( f"/pipelines/{pipeline_id}/workflows/", json=payload)
 
     def run_pipeline(self, pipeline_name: str, previous_inst: int, pipeline_params: dict):
         """
@@ -129,7 +140,7 @@ def run_pipeline(self, pipeline_name: str, previous_inst: int, pipeline_params:
             nodes_info = compute_workflow_nodes_info(default_params, include_all_defaults=True)
             updated_params = update_plugin_parameters(nodes_info, pipeline_params)
             workflow = self.post_workflow(pipeline_id=pipeline_id, previous_id=previous_inst, params=updated_params)
-            logger.info(f"Workflow posted successfully: {workflow}")
+            logger.info(f"Workflow posted successfully")
             return {"status": "Pipeline running"}
         except Exception as ex:
             logger.error(f"Running pipeline failed due to: {ex}")
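
A hypothetical call site for the updated run_pipeline; the client construction, pipeline name, and parameter payload below are illustrative, not from this commit:

client = ApiClient(...)                    # however this class is constructed
result = client.run_pipeline(
    pipeline_name="anonymize and push",    # looked up via get_pipeline_id
    previous_inst=42,                      # plugin instance the workflow hangs off
    pipeline_params={"some-plugin": {"someParam": "value"}},
)
print(result)                              # {"status": "Pipeline running"} on success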
