Skip to content

Commit 002c691

Browse files
committed
feat(linstor): add LinstorManager class
Signed-off-by: Ronan Abhamon <ronan.abhamon@vates.tech>
1 parent d7cd0b1 commit 002c691

File tree

1 file changed

+385
-0
lines changed

1 file changed

+385
-0
lines changed

scripts/linstor-repair

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright (C) 2026 Vates SAS
4+
#
5+
# This program is free software: you can redistribute it and/or modify
6+
# it under the terms of the GNU General Public License as published by
7+
# the Free Software Foundation, either version 3 of the License, or
8+
# (at your option) any later version.
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public License
15+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
17+
from typing import (
18+
Any,
19+
Callable,
20+
Dict,
21+
List,
22+
Optional,
23+
TypeVar
24+
)
25+
26+
# TODO: Remove
27+
import sys
28+
sys.path[0] = '/opt/xensource/sm/'
29+
30+
import argparse
31+
import linstor
32+
import time
33+
34+
from collections import defaultdict
35+
36+
# TODO: Remove
37+
from linstorvolumemanager import _get_controller_uri
38+
39+
T = TypeVar("T")
40+
41+
# ==============================================================================
42+
43+
def wait_for_condition(function: Callable[[], T], timeout: int, interval: int) -> Optional[T]:
    """Poll `function` until it returns a truthy value or `timeout` expires.

    :param function: Callable without arguments, evaluated on every attempt.
    :param timeout: Maximum time to wait, in seconds. When it is `<= 0` the
        function is called exactly once and its result is returned as is,
        whether truthy or not.
    :param interval: Pause between two attempts, in seconds.
    :return: The first truthy result of `function`, or `None` when the
        timeout expired before a truthy result was obtained.
    """
    if timeout <= 0:
        return function()

    # Use a monotonic clock: a wall-clock adjustment (NTP, manual change) must
    # neither shorten nor extend the waiting period.
    deadline = time.monotonic() + timeout
    while True:
        result = function()
        if result:
            return result

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None

        # Never sleep past the deadline, so that a large interval cannot make
        # the call last longer than the requested timeout.
        time.sleep(min(interval, remaining))
55+
56+
def logger(message: str) -> None:
    """Write `message` to the standard output, followed by a newline."""
    print(message)
58+
59+
# ==============================================================================
60+
61+
class LinstorManagerError(Exception):
    """Error raised by the LINSTOR manager classes.

    Every instance carries a numeric `code` (one of the `ERR_*` class
    constants) so that callers can react differently depending on the kind
    of failure, for example by re-raising network errors.
    """

    ERR_GENERIC = 0
    ERR_NETWORK = 1
    ERR_RESOURCE_DESTROY = 2
    ERR_RESOURCE_TOGGLE = 3
    ERR_RESOURCE_PROP_UPDATE = 4
    # Codes used by the fetch helpers of the manager. They must exist here,
    # otherwise raising the error fails with an AttributeError instead.
    ERR_STORAGE_POOL_FETCH = 5
    ERR_RESOURCE_GROUP_FETCH = 6
    ERR_RESOURCE_DEFINITION_FETCH = 7
    ERR_RESOURCE_FETCH = 8

    def __init__(self, message: str, code=ERR_GENERIC) -> None:
        """Build the error.

        :param message: Human readable description of the failure.
        :param code: One of the `ERR_*` constants, `ERR_GENERIC` by default.
        """
        super().__init__(message)
        self._code = code

    @property
    def code(self) -> int:
        """Return the error code given at construction time."""
        return self._code
75+
76+
# ------------------------------------------------------------------------------
77+
78+
class LinstorManagerBase(object):
    """Thin wrapper around a `linstor.Linstor` client with lazy (re)connection."""

    def __init__(self, uri = None) -> None:
        """Store the optional controller URI; no connection is opened here."""
        self._linstor = None
        self._uri = uri

    def connect(self) -> None:
        """Open the LINSTOR connection if there is none yet.

        The stored URI is used when there is one, otherwise the controller
        URI is looked up. Attempts are repeated every second for up to
        120 seconds.

        :raises LinstorManagerError: With code `ERR_NETWORK` when no controller
            URI was found or when the connection could not be established.
        """
        if self._linstor:
            return

        # URI used by the most recent attempt, kept for the error message.
        last_uri = None

        def try_connect() -> bool:
            nonlocal last_uri

            last_uri = self._uri or self.find_controller_uri(timeout=0)
            if not last_uri:
                return False

            try:
                client = linstor.Linstor(last_uri, timeout=5, keep_alive=True)
                client.connect()
            except (linstor.errors.LinstorNetworkError, linstor.errors.LinstorTimeoutError):
                # Controller not reachable yet: retry silently.
                return False
            except Exception as e:
                logger(f"Unable to connect to LINSTOR: `{e}`.")
                return False

            self._linstor = client
            self._uri = last_uri
            return True

        if wait_for_condition(try_connect, timeout=120, interval=1):
            return

        # Forget the URI so that a later call starts from a fresh lookup.
        self._uri = None
        if last_uri:
            message = f"Unable to connect to LINSTOR with URI: `{last_uri}`."
        else:
            message = "Unable to find controller uri..."
        raise LinstorManagerError(message, LinstorManagerError.ERR_NETWORK)

    @staticmethod
    def find_controller_uri(timeout: int = 30) -> Optional[str]:
        """Poll `_get_controller_uri` once per second until it yields a URI.

        :param timeout: Maximum time to wait in seconds; a value `<= 0`
            performs a single lookup.
        :return: The controller URI, or a falsy value when none was found.
        """
        return wait_for_condition(_get_controller_uri, timeout, interval=1)

    def _exec_query(self, query: Callable[[linstor.Linstor, T], T], *args, **kwargs) -> T:
        """Run `query(client, *args, **kwargs)`, reconnecting on network errors.

        :param query: Callable receiving the LINSTOR client as first argument.
        :return: Whatever `query` returns.
        :raises LinstorManagerError: When the connection cannot be
            (re)established, or when `query` raises anything other than a
            LINSTOR network/timeout error (the original exception is chained).
        """
        while True:
            # Deliberately outside of the `try`: a connection failure must
            # reach the caller unchanged.
            self.connect()
            try:
                return query(self._linstor, *args, **kwargs)
            except (linstor.errors.LinstorNetworkError, linstor.errors.LinstorTimeoutError):
                # Drop the broken client, the next iteration reconnects.
                self._linstor = None
                continue
            except Exception as e:
                raise LinstorManagerError(f"LINSTOR query exception: `{e}`.") from e

    @staticmethod
    def _filter_errors(result: List[Any]) -> List[linstor.responses.ApiCallResponse]:
        """Return the entries of `result` exposing `is_error()` that report an error."""
        errors = []
        for entry in result:
            if hasattr(entry, "is_error") and entry.is_error():
                errors.append(entry)
        return errors

    @classmethod
    def _get_error_str(cls, result: List[Any]) -> str:
        """Join the messages of all error entries of `result` with `", "`."""
        messages = [entry.message for entry in cls._filter_errors(result)]
        return ", ".join(messages)
151+
152+
# ------------------------------------------------------------------------------
153+
154+
class LinstorManager(LinstorManagerBase):
    """Repair-oriented operations on LINSTOR resources."""

    class ResourceReplicas(object):
        """Resource having fewer diskful replicas than its group requests."""

        def __init__(self, name: str, node_names: List[str], expected_count: int) -> None:
            self.name = name
            # Nodes currently holding a diskful replica of the resource.
            self.node_names = node_names
            # Place count configured on the resource group.
            self.expected_count = expected_count

    def resource_delete(self, resource_name: str, node_name: str) -> None:
        """Delete the resource `resource_name` from the node `node_name`.

        :raises LinstorManagerError: With code `ERR_RESOURCE_DESTROY` when
            LINSTOR reports at least one error.
        """
        result = self._exec_query(
            linstor.Linstor.resource_delete,
            node_name,
            resource_name
        )
        errors = self._filter_errors(result)
        if errors:
            error_str = self._get_error_str(errors)
            raise LinstorManagerError(
                f"Failed to destroy resource `{resource_name}` on node `{node_name}`: `{error_str}`.",
                LinstorManagerError.ERR_RESOURCE_DESTROY
            )

    def resource_toggle(self, resource_name: str, node_name: str, diskless: bool) -> None:
        """Toggle the resource on `node_name` to diskless or diskful.

        :param diskless: `True` to make the resource diskless, `False` to make
            it diskful.
        :raises LinstorManagerError: With code `ERR_RESOURCE_TOGGLE` when
            LINSTOR reports at least one error.
        """
        result = self._exec_query(
            linstor.Linstor.resource_toggle_disk,
            node_name,
            resource_name,
            diskless=diskless
        )
        errors = self._filter_errors(result)
        if errors:
            state = "Diskless" if diskless else "Diskful"
            error_str = self._get_error_str(errors)
            raise LinstorManagerError(
                f"Could not toggle resource `{resource_name}` on node `{node_name}` to {state}: `{error_str}`.",
                LinstorManagerError.ERR_RESOURCE_TOGGLE
            )

    def resource_remove_skip_disk_flag(self, resource_name: str, node_name: str) -> None:
        """Delete the `DrbdOptions/SkipDisk` property of the resource on `node_name`.

        :raises LinstorManagerError: With code `ERR_RESOURCE_PROP_UPDATE` when
            LINSTOR reports at least one error.
        """
        return self._resource_delete_properties(resource_name, node_name, ["DrbdOptions/SkipDisk"])

    def resources_create_missing_replicas(self) -> None:
        """Report every resource that lacks diskful replicas.

        TODO: Actually create the missing replicas. This method was declared
        without any body; until the creation logic is written it only logs
        what `resources_get_missing_replicas` finds, so that the command line
        option is usable as a diagnostic instead of breaking the script.
        """
        for replicas in self.resources_get_missing_replicas():
            logger(
                f"Resource `{replicas.name}` has {len(replicas.node_names)} diskful replica(s)"
                f" instead of {replicas.expected_count}: replica creation is not implemented yet."
            )

    def resources_get_missing_replicas(self, resource_group_name: Optional[str] = None) -> List[ResourceReplicas]:
        """List the resources having fewer diskful replicas than expected.

        The expected count of a resource is the place count of its resource
        group. Resource definitions without any deployed resource, or whose
        group was not fetched, are ignored.

        :param resource_group_name: Restrict the check to this resource group;
            all groups are checked when empty or `None`.
        :return: One `ResourceReplicas` per under-replicated resource, an
            empty list when there is none.
        """
        # The group filter must be a list of names: a bare string would be
        # iterated character by character.
        if not resource_group_name:
            group_filter = None
        elif isinstance(resource_group_name, str):
            group_filter = [resource_group_name]
        else:
            group_filter = list(resource_group_name)

        group_name_to_replication_count = {
            resource_group.name: resource_group.select_filter.place_count
            for resource_group in self._fetch_resource_groups(group_filter)
        }
        if not group_name_to_replication_count:
            return []

        name_to_resources = self._fetch_resources()
        name_to_resource_definitions = self._fetch_resource_definitions()

        resource_replicas = []
        for resource_definition in name_to_resource_definitions.values():
            replication_count = group_name_to_replication_count.get(resource_definition.resource_group_name)
            if replication_count is None:
                continue

            resources = name_to_resources.get(resource_definition.name)
            if not resources:
                continue

            diskful_nodes = [
                resource.node_name for resource in resources
                if linstor.consts.FLAG_DISKLESS not in resource.flags
            ]
            if len(diskful_nodes) < replication_count:
                resource_replicas.append(
                    self.ResourceReplicas(resource_definition.name, diskful_nodes, replication_count)
                )

        return resource_replicas

    def resources_remove_skip_disks(self) -> None:
        """Remove the skip disk flag of every resource where it is safe to do so.

        A flagged replica is first toggled to diskless (unless it already is),
        then its `DrbdOptions/SkipDisk` property is deleted. Failures are
        logged and the next replica is processed.

        :raises LinstorManagerError: Only for errors with code `ERR_NETWORK`.
        """
        name_to_resources = self._fetch_resources()

        for resources in name_to_resources.values():
            skip_disk_resources = {
                resource.node_name: resource
                for resource in resources
                if self._is_skip_disk_resource(resource)
            }
            if not skip_disk_resources:
                continue

            # Skip disk deletion is only permitted if all remaining volumes are in a stable state.
            if not all(
                self._is_online_volume(volume)
                for resource in resources
                if resource.node_name not in skip_disk_resources
                for volume in resource.volumes
            ):
                continue

            for resource in skip_disk_resources.values():
                try:
                    if linstor.consts.FLAG_DISKLESS not in resource.flags:
                        self.resource_toggle(resource.name, resource.node_name, True)
                    self.resource_remove_skip_disk_flag(resource.name, resource.node_name)
                except LinstorManagerError as e:
                    # Without a working connection there is no point in going on.
                    if e.code == LinstorManagerError.ERR_NETWORK:
                        raise
                    logger(
                        f"Failed to delete skip disk on resource `{resource.name}`"
                        f" on node `{resource.node_name}`: {e}"
                    )

    def _fetch_storage_pools(
        self, node_filter: Optional[List[str]] = None, name_filter: Optional[List[str]] = None
    ) -> List[linstor.responses.StoragePool]:
        """Fetch the storage pools, optionally filtered by node and pool names.

        :raises LinstorManagerError: When LINSTOR returns no data or something
            else than a storage pool list.
        """
        result = self._exec_query(
            linstor.Linstor.storage_pool_list, filter_by_nodes=node_filter, filter_by_stor_pools=name_filter
        )
        if not result:
            raise LinstorManagerError("Failed to fetch storage pools: no data.")
        result = result[0]

        if not isinstance(result, linstor.responses.StoragePoolListResponse):
            raise LinstorManagerError(
                f"Failed to fetch storage pools: `{result}`.",
                LinstorManagerError.ERR_STORAGE_POOL_FETCH
            )

        return result.storage_pools

    def _fetch_resource_groups(self, name_filter: Optional[List[str]] = None) -> List[linstor.responses.ResourceGroup]:
        """Fetch the resource groups, optionally filtered by name.

        :raises LinstorManagerError: With code `ERR_RESOURCE_GROUP_FETCH` when
            the LINSTOR call itself failed; other errors are re-raised as is.
        """
        # Only the raising variant `resource_group_list_raise` is used here, so
        # the LINSTOR API failure has to be recovered from the exception chain.
        try:
            return self._exec_query(
                linstor.Linstor.resource_group_list_raise, filter_by_resource_groups=name_filter
            ).resource_groups
        except Exception as e:
            cause = e.__cause__
            if not cause:
                # Not a wrapped query failure (e.g. a connection error).
                raise
            raise LinstorManagerError(  # pylint: disable = W0707
                f"Failed to fetch resource groups: `{cause}`.",
                LinstorManagerError.ERR_RESOURCE_GROUP_FETCH
            )

    def _fetch_resource_definitions(
        self, fetch_volumes = False, name_filter: Optional[List[str]] = None
    ) -> Dict[str, linstor.responses.ResourceDefinition]:
        """Fetch the resource definitions, indexed by resource name.

        :param fetch_volumes: Also query the volume definitions.
        :param name_filter: Restrict the query to these resource definitions.
        :raises LinstorManagerError: With code `ERR_RESOURCE_DEFINITION_FETCH`
            when LINSTOR returns no data or an unexpected response.
        """
        result = self._exec_query(
            linstor.Linstor.resource_dfn_list,
            query_volume_definitions=fetch_volumes,
            filter_by_resource_definitions=name_filter
        )
        if not result:
            raise LinstorManagerError(
                "Failed to fetch resource definitions: no data.",
                LinstorManagerError.ERR_RESOURCE_DEFINITION_FETCH
            )
        result = result[0]

        if not isinstance(result, linstor.responses.ResourceDefinitionResponse):
            raise LinstorManagerError(
                f"Failed to fetch resource definitions: `{result}`.",
                LinstorManagerError.ERR_RESOURCE_DEFINITION_FETCH
            )

        return {resource_definition.name: resource_definition for resource_definition in result.resource_definitions}

    def _fetch_resources(self) -> Dict[str, List[linstor.responses.Resource]]:
        """Fetch all resources, grouped by resource name (one entry per node).

        :raises LinstorManagerError: With code `ERR_RESOURCE_FETCH` when
            LINSTOR returns no data or an unexpected response.
        """
        result = self._exec_query(linstor.Linstor.resource_list)
        if not result:
            raise LinstorManagerError(
                "Failed to fetch resources: no data.",
                LinstorManagerError.ERR_RESOURCE_FETCH
            )
        result = result[0]

        if not isinstance(result, linstor.responses.ResourceResponse):
            raise LinstorManagerError(
                f"Failed to fetch resources: `{result}`.",
                LinstorManagerError.ERR_RESOURCE_FETCH
            )

        resources = defaultdict(list)
        for resource in result.resources:
            resources[resource.name].append(resource)
        return resources

    def _resource_delete_properties(self, resource_name: str, node_name: str, properties: List[str]) -> None:
        """Delete the given properties of the resource deployed on `node_name`.

        :raises LinstorManagerError: With code `ERR_RESOURCE_PROP_UPDATE` when
            LINSTOR reports at least one error.
        """
        result = self._exec_query(
            linstor.Linstor.resource_modify,
            node_name,
            resource_name,
            property_dict={},
            delete_props=properties
        )
        errors = self._filter_errors(result)
        if errors:
            error_str = self._get_error_str(errors)
            raise LinstorManagerError(
                f"Failed to destroy resource properties of `{resource_name}` on node `{node_name}`: `{error_str}`.",
                LinstorManagerError.ERR_RESOURCE_PROP_UPDATE
            )

    @staticmethod
    def _is_online_disk_state(disk_state: str) -> bool:
        """Tell whether `disk_state` is `UpToDate`, `Diskless` or starts with `SyncTarget`."""
        return disk_state in ("UpToDate", "Diskless") or disk_state.startswith("SyncTarget")

    @classmethod
    def _is_online_volume(cls, volume: linstor.responses.Volume) -> bool:
        """Apply `_is_online_disk_state` to the disk state of `volume`."""
        return cls._is_online_disk_state(volume.state.disk_state)

    @staticmethod
    def _is_skip_disk_resource(resource: linstor.responses.Resource) -> bool:
        """Tell whether the `DrbdOptions/SkipDisk` property of `resource` is `"True"`."""
        return resource.properties.get("DrbdOptions/SkipDisk") == "True"
363+
364+
# ==============================================================================
365+
366+
def main():
    """Parse the command line and run the requested repair actions.

    The actions run in a fixed order: skip disk removal, missing replica
    creation, then resource evacuation.

    :return: `0` on success, `1` when a `LinstorManagerError` interrupted
        the actions.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--uri", required=False)

    parser.add_argument("--create-missing-replicas", action="store_true")
    parser.add_argument("--remove-skip-disks", action="store_true")
    parser.add_argument("--evacuate-resources", action="store_true")

    args = parser.parse_args()

    linstor_manager = LinstorManager(args.uri)

    # Reject an unsupported action before touching the cluster, instead of
    # failing with an AttributeError after the other actions already ran.
    evacuate = getattr(linstor_manager, "resources_evacuate", None)
    if args.evacuate_resources and evacuate is None:
        parser.error("--evacuate-resources is not implemented yet")

    try:
        if args.remove_skip_disks:
            linstor_manager.resources_remove_skip_disks()
        if args.create_missing_replicas:
            linstor_manager.resources_create_missing_replicas()
        if args.evacuate_resources:
            evacuate()
    except LinstorManagerError as e:
        # Report expected failures as a message rather than as a traceback.
        logger(f"Error: {e}")
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())

0 commit comments

Comments
 (0)