Skip to content

Commit faf57d1

Browse files
committed
Addresses most PR comments
1 parent 815af19 commit faf57d1

File tree

2 files changed

+54
-56
lines changed
  • community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts

2 files changed

+54
-56
lines changed
Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python3
22

3-
# Copyright 2025 "Google LLC"
3+
# Copyright 2026 "Google LLC"
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.
@@ -28,40 +28,47 @@
2828

2929
def is_node_being_repaired(node):
3030
"""Check if a node is currently being repaired."""
31-
operations = get_operations()
31+
operations = _get_operations()
3232
return node in operations and operations[node]["status"] == "REPAIR_IN_PROGRESS"
3333

34-
def get_operations():
34+
def _get_operations():
3535
"""Get all repair operations from the file."""
3636
if not REPAIR_FILE.exists():
3737
return {}
3838
with open(REPAIR_FILE, 'r', encoding='utf-8') as f:
3939
try:
4040
return json.load(f)
4141
except json.JSONDecodeError:
42+
log.error(f"Failed to decode JSON from {REPAIR_FILE}, returning empty operations list.")
4243
return {}
4344

44-
def store_operations(operations):
45+
def _write_all_operations(operations):
4546
"""Store the operations to the file."""
46-
with open(REPAIR_FILE, 'w', encoding='utf-8') as f:
47-
fcntl.lockf(f, fcntl.LOCK_EX)
48-
try:
49-
json.dump(operations, f, indent=4)
50-
finally:
51-
fcntl.lockf(f, fcntl.LOCK_UN)
47+
try:
48+
with open(REPAIR_FILE, 'w', encoding='utf-8') as f:
49+
fcntl.lockf(f, fcntl.LOCK_EX)
50+
try:
51+
json.dump(operations, f, indent=4)
52+
return True
53+
finally:
54+
fcntl.lockf(f, fcntl.LOCK_UN)
55+
except (IOError, TypeError) as e:
56+
log.error(f"Failed to store repair operations to {REPAIR_FILE}: {e}")
57+
return False
5258

5359
def store_operation(node, operation_id, reason):
5460
"""Store a single repair operation."""
55-
operations = get_operations()
61+
operations = _get_operations()
5662
operations[node] = {
5763
"operation_id": operation_id,
5864
"reason": reason,
5965
"status": "REPAIR_IN_PROGRESS",
6066
"timestamp": datetime.now(timezone.utc).isoformat(),
6167
}
62-
store_operations(operations)
68+
if not _write_all_operations(operations):
69+
log.error(f"Failed to persist repair operation for node {node}.")
6370

64-
def call_rr_api(node, reason):
71+
def _call_rr_api(node, reason):
6572
"""Call the R&R API for a given node."""
6673
log.info(f"Calling R&R API for node {node} with reason {reason}")
6774
inst = lookup().instance(node)
@@ -71,66 +78,57 @@ def call_rr_api(node, reason):
7178
cmd = f"gcloud compute instances report-host-as-faulty {node} --async --disruption-schedule=IMMEDIATE --fault-reasons=behavior={reason},description='VM is managed by Slurm' --zone={inst.zone} --format=json"
7279
try:
7380
result = run(cmd)
81+
log.info(f"gcloud compute instances report-host-as-faulty stdout: {result.stdout.strip()}")
7482
op = json.loads(result.stdout)
7583
if isinstance(op, list):
7684
op = op[0]
7785
return op["name"]
78-
except subprocess.CalledProcessError as e:
79-
log.error(f"Failed to call R&R API for {node} due to command execution error: {e}")
80-
return None
81-
except json.JSONDecodeError as e:
82-
log.error(f"Failed to parse R&R API response for {node} due to JSON decode error: {e}")
86+
except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
87+
log.error(f"Failed to call or parse R&R API response for {node}: {e}")
8388
return None
8489
except Exception as e:
8590
log.error(f"An unexpected error occurred while calling R&R API for {node}: {e}")
8691
return None
8792

88-
def get_operation_status(operation_id):
93+
def _get_operation_status(operation_id):
8994
"""Get the status of a GCP operation."""
9095
cmd = f'gcloud compute operations list --filter="name={operation_id}" --format=json'
9196
try:
9297
result = run(cmd)
9398
operations_list = json.loads(result.stdout)
94-
if operations_list and len(operations_list) > 0:
99+
if operations_list:
95100
return operations_list[0]
96101

97102
return None
98-
except subprocess.CalledProcessError as e:
99-
log.error(f"Failed to get operation status for {operation_id} due to command execution error: {e}")
100-
return None
101-
except json.JSONDecodeError as e:
102-
log.error(f"Failed to parse operation status for {operation_id} due to JSON decode error: {e}")
103+
except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
104+
log.error(f"Failed to get or parse operation status for {operation_id}: {e}")
103105
return None
104106
except Exception as e:
105107
log.error(f"An unexpected error occurred while getting operation status for {operation_id}: {e}")
106108
return None
107109

108110
def poll_operations():
109111
"""Poll the status of ongoing repair operations."""
110-
operations = get_operations()
112+
operations = _get_operations()
111113
if not operations:
112114
return
113115

114116
log.info("Polling repair operations")
115117
for node, op_details in operations.items():
116118
if op_details["status"] == "REPAIR_IN_PROGRESS":
117-
op_status = get_operation_status(op_details["operation_id"])
118-
if not op_status:
119+
gcp_op_status = _get_operation_status(op_details["operation_id"])
120+
if not gcp_op_status:
119121
continue
120122

121-
if op_status.get("status") == "DONE":
122-
if op_status.get("error"):
123-
log.error(f"Repair operation for {node} failed: {op_status['error']}")
123+
if gcp_op_status.get("status") == "DONE":
124+
if gcp_op_status.get("error"):
125+
log.error(f"Repair operation for {node} failed: {gcp_op_status['error']}")
124126
op_details["status"] = "FAILURE"
125127
run(f"{lookup().scontrol} update nodename={node} state=down reason='Repair failed'")
126128
else:
127129
log.info(f"Repair operation for {node} succeeded. Powering down the VM")
128130
run(f"{lookup().scontrol} update nodename={node} state=power_down reason='Repair succeeded'")
129131
op_details["status"] = "SUCCESS"
130-
elif op_details["status"] == "SUCCESS":
131-
inst = lookup().instance(node)
132-
if inst and inst.status == "RUNNING":
133-
log.info(f"Node {node} is back online.")
134-
op_details["status"] = "RECOVERED"
135132

136-
store_operations(operations)
133+
if not _write_all_operations(operations):
134+
log.error("Failed to persist updated repair operations state after polling.")

community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/tests/test_repair.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python3
22

3-
# Copyright 2025 "Google LLC"
3+
# Copyright 2026 "Google LLC"
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.
@@ -30,7 +30,7 @@ def setUp(self):
3030
# Reset the REPAIR_FILE path for each test
3131
repair.REPAIR_FILE = Path("/slurm/repair_operations.json")
3232

33-
@patch('repair.get_operations')
33+
@patch('repair._get_operations')
3434
def test_is_node_being_repaired(self, mock_get_operations):
3535
mock_get_operations.return_value = {
3636
"node-1": {"status": "REPAIR_IN_PROGRESS"},
@@ -43,26 +43,26 @@ def test_is_node_being_repaired(self, mock_get_operations):
4343
@patch('builtins.open', new_callable=mock_open, read_data='{"node-1": {"status": "REPAIR_IN_PROGRESS"}}')
4444
@patch('pathlib.Path.exists', return_value=True)
4545
def test_get_operations_success(self, mock_exists, mock_open_file):
46-
ops = repair.get_operations()
46+
ops = repair._get_operations()
4747
self.assertEqual(ops, {"node-1": {"status": "REPAIR_IN_PROGRESS"}})
4848
mock_open_file.assert_called_with(repair.REPAIR_FILE, 'r', encoding='utf-8')
4949

5050
@patch('builtins.open', new_callable=mock_open, read_data='invalid json')
5151
@patch('pathlib.Path.exists', return_value=True)
5252
def test_get_operations_json_decode_error(self, mock_exists, mock_open_file):
53-
ops = repair.get_operations()
53+
ops = repair._get_operations()
5454
self.assertEqual(ops, {})
5555

5656
@patch('pathlib.Path.exists', return_value=False)
5757
def test_get_operations_file_not_found(self, mock_exists):
58-
ops = repair.get_operations()
58+
ops = repair._get_operations()
5959
self.assertEqual(ops, {})
6060

6161
@patch('builtins.open', new_callable=mock_open)
6262
@patch('fcntl.lockf')
63-
def test_store_operations(self, mock_lockf, mock_open_file):
63+
def test_write_all_operations(self, mock_lockf, mock_open_file):
6464
operations = {"node-1": {"status": "SUCCESS"}}
65-
repair.store_operations(operations)
65+
repair._write_all_operations(operations)
6666
mock_open_file.assert_called_with(repair.REPAIR_FILE, 'w', encoding='utf-8')
6767
handle = mock_open_file()
6868
expected_json_string = json.dumps(operations, indent=4)
@@ -72,10 +72,10 @@ def test_store_operations(self, mock_lockf, mock_open_file):
7272
mock_lockf.assert_any_call(handle, fcntl.LOCK_EX)
7373
mock_lockf.assert_any_call(handle, fcntl.LOCK_UN)
7474

75-
@patch('repair.get_operations', return_value={})
76-
@patch('repair.store_operations')
75+
@patch('repair._get_operations', return_value={})
76+
@patch('repair._write_all_operations')
7777
@patch('repair.datetime')
78-
def test_store_operation(self, mock_datetime, mock_store_operations, mock_get_operations):
78+
def test_store_operation(self, mock_datetime, mock_write_all_operations, mock_get_operations):
7979
mock_now = datetime(2025, 1, 1, tzinfo=timezone.utc)
8080
mock_datetime.now.return_value = mock_now
8181

@@ -89,7 +89,7 @@ def test_store_operation(self, mock_datetime, mock_store_operations, mock_get_op
8989
"timestamp": mock_now.isoformat(),
9090
}
9191
}
92-
mock_store_operations.assert_called_with(expected_operations)
92+
mock_write_all_operations.assert_called_with(expected_operations)
9393

9494
@patch('repair.lookup')
9595
@patch('repair.run')
@@ -104,14 +104,14 @@ def test_call_rr_api_success(self, mock_run, mock_lookup):
104104
returncode=0
105105
)
106106

107-
op_id = repair.call_rr_api("node-1", "XID")
107+
op_id = repair._call_rr_api("node-1", "XID")
108108
self.assertEqual(op_id, "op-123")
109109
mock_run.assert_called_once()
110110

111111
@patch('repair.lookup')
112112
def test_call_rr_api_instance_not_found(self, mock_lookup):
113113
mock_lookup.return_value.instance.return_value = None
114-
op_id = repair.call_rr_api("node-1", "XID")
114+
op_id = repair._call_rr_api("node-1", "XID")
115115
self.assertIsNone(op_id)
116116

117117
@patch('repair.lookup')
@@ -120,24 +120,24 @@ def test_call_rr_api_run_error(self, mock_run, mock_lookup):
120120
mock_instance = MagicMock()
121121
mock_instance.zone = "us-central1-a"
122122
mock_lookup.return_value.instance.return_value = mock_instance
123-
op_id = repair.call_rr_api("node-1", "XID")
123+
op_id = repair._call_rr_api("node-1", "XID")
124124
self.assertIsNone(op_id)
125125

126126
@patch('repair.run')
127127
def test_get_operation_status_success(self, mock_run):
128128
mock_run.return_value.stdout = '[{"status": "DONE"}]'
129-
status = repair.get_operation_status("op-123")
129+
status = repair._get_operation_status("op-123")
130130
self.assertEqual(status, {"status": "DONE"})
131131

132132
@patch('repair.run')
133133
def test_get_operation_status_empty_list(self, mock_run):
134134
mock_run.return_value.stdout = '[]'
135-
status = repair.get_operation_status("op-123")
135+
status = repair._get_operation_status("op-123")
136136
self.assertIsNone(status)
137137

138-
@patch('repair.get_operations')
139-
@patch('repair.store_operations')
140-
@patch('repair.get_operation_status')
138+
@patch('repair._get_operations')
139+
@patch('repair._write_all_operations')
140+
@patch('repair._get_operation_status')
141141
@patch('repair.lookup')
142142
@patch('repair.run')
143143
def test_poll_operations(self, mock_run, mock_lookup, mock_get_op_status, mock_store_ops, mock_get_ops):

0 commit comments

Comments
 (0)