Commit 2a1809b

feat(linstor): parallelize SR.scan/cleanup (vhd-util calls)
- Simplify scan logic by removing XAPI calls.
- Create one XAPI session / LINSTOR connection in each worker thread.

Signed-off-by: Ronan Abhamon <ronan.abhamon@vates.tech>
1 parent f67bd89 commit 2a1809b
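The pattern behind the commit, in a nutshell: run the per-VDI vhd-util queries from a `ThreadPoolExecutor`, and give every worker thread its own XAPI session plus LINSTOR connection via `threading.local`, created lazily on first use. The sketch below is a minimal, self-contained illustration of that pattern only; the class, function, and connection names are hypothetical and not part of the commit.

```python
# Minimal sketch of the parallelization pattern (illustrative names only).
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()  # one slot per worker thread


class FakeConnection:
    """Stand-in for an XAPI session + LINSTOR connection pair."""
    def __init__(self):
        self.thread = threading.current_thread().name

    def get_vhd_info(self, uuid):
        # A real implementation would call vhd-util here.
        return uuid, self.thread


def _connection():
    # Created at most once per worker thread, then reused for every
    # VDI that thread processes.
    if getattr(_local, 'conn', None) is None:
        _local.conn = FakeConnection()
    return _local.conn


def scan(uuids):
    def worker(uuid):
        return _connection().get_vhd_info(uuid)

    with ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(worker, uuids))


if __name__ == '__main__':
    for uuid, thread in scan(['vdi-%d' % i for i in range(8)]):
        print(uuid, 'handled by', thread)
```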

File tree

5 files changed: +174 -65 lines changed


drivers/LinstorSR.py

Lines changed: 54 additions & 33 deletions
@@ -20,7 +20,7 @@

 try:
     from linstorjournaler import LinstorJournaler
-    from linstorvhdutil import LinstorVhdUtil
+    from linstorvhdutil import LinstorVhdUtil, MultiLinstorVhdUtil
     from linstorvolumemanager import get_controller_uri
     from linstorvolumemanager import get_controller_node_name
     from linstorvolumemanager import LinstorVolumeManager
@@ -372,6 +372,7 @@ def load(self, sr_uuid) -> None:
         self._vdis_loaded = False
         self._all_volume_info_cache = None
         self._all_volume_metadata_cache = None
+        self._multi_vhdutil = None

         # To remove in python 3.10.
         # Use directly @staticmethod instead.
@@ -1092,8 +1093,10 @@ def _load_vdis_ex(self):
             else:
                 introduce = True

-        # 4. Now check all volume info.
+        # 4. Now process all volume info.
         vdi_to_snaps = {}
+        vdi_uuids = []
+
         for vdi_uuid, volume_info in all_volume_info.items():
             if vdi_uuid.startswith(cleanup.SR.TMP_RENAME_PREFIX):
                 continue
@@ -1202,14 +1205,30 @@ def _load_vdis_ex(self):
                         vdi_to_snaps[snap_uuid] = [vdi_uuid]

             # 4.b. Add the VDI in the list.
+            vdi_uuids.append(vdi_uuid)
+
+        # 5. Create VDIs.
+        self._multi_vhdutil = MultiLinstorVhdUtil(self._linstor.uri, self._group_name)
+
+        def load_vdi(vdi_uuid, vhdutil_instance):
             vdi = self.vdi(vdi_uuid)
-            self.vdis[vdi_uuid] = vdi

             if USE_KEY_HASH and vdi.vdi_type == vhdutil.VDI_TYPE_VHD:
-                vdi.sm_config_override['key_hash'] = self._vhdutil.get_key_hash(vdi_uuid)
+                vdi.sm_config_override['key_hash'] = vhdutil_instance.get_key_hash(vdi_uuid)

-            # 4.c. Update CBT status of disks either just added
-            # or already in XAPI.
+            return vdi
+
+        try:
+            self.vdis = {vdi.uuid: vdi for vdi in self._multi_vhdutil.run(load_vdi, vdi_uuids)}
+        finally:
+            multi_vhdutil = self._multi_vhdutil
+            self._multi_vhdutil = None
+            del multi_vhdutil
+
+        # 6. Update CBT status of disks either just added
+        # or already in XAPI.
+        for vdi in self.vdis.values():
+            volume_metadata = volumes_metadata.get(vdi.uuid)
             cbt_uuid = volume_metadata.get(CBTLOG_TAG)
             if cbt_uuid in cbt_vdis:
                 vdi_ref = xenapi.VDI.get_by_uuid(vdi_uuid)
@@ -1220,7 +1239,7 @@ def _load_vdis_ex(self):
                 self.vdis[vdi_uuid].cbt_enabled = True
                 cbt_vdis.remove(cbt_uuid)

-        # 5. Now set the snapshot statuses correctly in XAPI.
+        # 7. Now set the snapshot statuses correctly in XAPI.
         for src_uuid in vdi_to_snaps:
             try:
                 src_ref = xenapi.VDI.get_by_uuid(src_uuid)
@@ -1239,7 +1258,7 @@ def _load_vdis_ex(self):
         # TODO: Check correctly how to use CBT.
         # Update cbt_enabled on the right VDI, check LVM/FileSR code.

-        # 6. If we have items remaining in this list,
+        # 8. If we have items remaining in this list,
         # they are cbt_metadata VDI that XAPI doesn't know about.
         # Add them to self.vdis and they'll get added to the DB.
         for cbt_uuid in cbt_vdis:
@@ -1248,10 +1267,10 @@ def _load_vdis_ex(self):
             new_vdi.cbt_enabled = True
             self.vdis[cbt_uuid] = new_vdi

-        # 7. Update virtual allocation, build geneology and remove useless VDIs
+        # 9. Update virtual allocation, build geneology and remove useless VDIs
         self.virtual_allocation = 0

-        # 8. Build geneology.
+        # 10. Build geneology.
         geneology = {}

         for vdi_uuid, vdi in self.vdis.items():
@@ -1265,7 +1284,7 @@ def _load_vdis_ex(self):
             if not vdi.hidden:
                 self.virtual_allocation += vdi.size

-        # 9. Remove all hidden leaf nodes to avoid introducing records that
+        # 11. Remove all hidden leaf nodes to avoid introducing records that
         # will be GC'ed.
         for vdi_uuid in list(self.vdis.keys()):
             if vdi_uuid not in geneology and self.vdis[vdi_uuid].hidden:
@@ -2096,13 +2115,15 @@ def _load_this(self):
         volume_metadata = None
         if self.sr._all_volume_metadata_cache:
             volume_metadata = self.sr._all_volume_metadata_cache.get(self.uuid)
-        if volume_metadata is None:
+            assert volume_metadata
+        else:
             volume_metadata = self._linstor.get_volume_metadata(self.uuid)

         volume_info = None
         if self.sr._all_volume_info_cache:
             volume_info = self.sr._all_volume_info_cache.get(self.uuid)
-        if volume_info is None:
+            assert volume_info
+        else:
             volume_info = self._linstor.get_volume_info(self.uuid)

         # Contains the max physical size used on a disk.
@@ -2119,7 +2140,13 @@ def _load_this(self):
             self.size = volume_info.virtual_size
             self.parent = ''
         else:
-            vhd_info = self.sr._vhdutil.get_vhd_info(self.uuid)
+            if self.sr._multi_vhdutil:
+                vhdutil_instance = self.sr._multi_vhdutil.local_vhdutil
+            else:
+                vhdutil_instance = self.sr._vhdutil
+
+            vhd_info = vhdutil_instance.get_vhd_info(self.uuid)
+
             self.hidden = vhd_info.hidden
             self.size = vhd_info.sizeVirt
             self.parent = vhd_info.parentUuid
@@ -2223,32 +2250,26 @@ def _determine_type_and_path(self):
         Determine whether this is a RAW or a VHD VDI.
         """

-        # 1. Check vdi_ref and vdi_type in config.
-        try:
-            vdi_ref = self.session.xenapi.VDI.get_by_uuid(self.uuid)
-            if vdi_ref:
-                sm_config = self.session.xenapi.VDI.get_sm_config(vdi_ref)
-                vdi_type = sm_config.get('vdi_type')
-                if vdi_type:
-                    # Update parent fields.
-                    self.vdi_type = vdi_type
-                    self.sm_config_override = sm_config
-                    self._update_device_name(
-                        self._linstor.get_volume_name(self.uuid)
-                    )
-                    return
-        except Exception:
-            pass
+        if self.sr._all_volume_metadata_cache:
+            # We are currently loading all volumes.
+            volume_metadata = self.sr._all_volume_metadata_cache.get(self.uuid)
+            if not volume_metadata:
+                raise xs_errors.XenError(
+                    'VDIUnavailable',
+                    opterr='failed to get metadata'
+                )
+        else:
+            # Simple load.
+            volume_metadata = self._linstor.get_volume_metadata(self.uuid)

-        # 2. Otherwise use the LINSTOR volume manager directly.
-        # It's probably a new VDI created via snapshot.
-        volume_metadata = self._linstor.get_volume_metadata(self.uuid)
+        # Set type and path.
         self.vdi_type = volume_metadata.get(VDI_TYPE_TAG)
         if not self.vdi_type:
             raise xs_errors.XenError(
                 'VDIUnavailable',
                 opterr='failed to get vdi_type in metadata'
             )
+
         self._update_device_name(self._linstor.get_volume_name(self.uuid))

     def _update_device_name(self, device_name):

drivers/cleanup.py

Lines changed: 27 additions & 28 deletions
@@ -45,6 +45,7 @@
 import lock
 import blktap2
 import xs_errors
+
 from refcounter import RefCounter
 from ipc import IPCFlag
 from lvmanager import LVActivator
@@ -54,7 +55,7 @@

 try:
     from linstorjournaler import LinstorJournaler
-    from linstorvhdutil import LinstorVhdUtil
+    from linstorvhdutil import LinstorVhdUtil, MultiLinstorVhdUtil
     from linstorvolumemanager import get_controller_uri
     from linstorvolumemanager import LinstorVolumeManager
     from linstorvolumemanager import LinstorVolumeManagerError
@@ -3501,12 +3502,18 @@ def _scan(self, force):
                 raise util.SMException('Scan error')

     def _load_vdi_info(self):
+        all_volume_info = self._linstor.get_volumes_with_info()
+        volumes_metadata = self._linstor.get_volumes_with_metadata()
+
         all_vdi_info = {}
+        pending_vdi_uuids = []

-        # TODO: Ensure metadata contains the right info.
+        def handle_fail(vdi_uuid, e):
+            Util.log(f" [VDI {vdi_uuid}: failed to load VDI info]: {e}")
+            info = vhdutil.VHDInfo(vdi_uuid)
+            info.error = 1
+            return info

-        all_volume_info = self._linstor.get_volumes_with_info()
-        volumes_metadata = self._linstor.get_volumes_with_metadata()
         for vdi_uuid, volume_info in all_volume_info.items():
             try:
                 volume_metadata = volumes_metadata[vdi_uuid]
@@ -3534,34 +3541,26 @@ def _load_vdi_info(self):
                     continue

                 vdi_type = volume_metadata.get(VDI_TYPE_TAG)
-                volume_name = self._linstor.get_volume_name(vdi_uuid)
-                if volume_name.startswith(LINSTOR_PERSISTENT_PREFIX):
-                    # Always RAW!
-                    info = None
-                elif vdi_type == vhdutil.VDI_TYPE_VHD:
-                    info = self._vhdutil.get_vhd_info(vdi_uuid)
+                if vdi_type == vhdutil.VDI_TYPE_VHD:
+                    pending_vdi_uuids.append(vdi_uuid)
                 else:
-                    # Ensure it's not a VHD...
-                    try:
-                        info = self._vhdutil.get_vhd_info(vdi_uuid)
-                    except:
-                        try:
-                            self._vhdutil.force_repair(
-                                self._linstor.get_device_path(vdi_uuid)
-                            )
-                            info = self._vhdutil.get_vhd_info(vdi_uuid)
-                        except:
-                            info = None
+                    all_vdi_info[vdi_uuid] = None
+            except Exception as e:
+                all_vdi_info[vdi_uuid] = handle_fail(vdi_uuid, e)

+        multi_vhdutil = MultiLinstorVhdUtil(self._linstor.uri, self._linstor.group_name)
+
+        def load_info(vdi_uuid, vhdutil_instance):
+            try:
+                return vhdutil_instance.get_vhd_info(vdi_uuid)
             except Exception as e:
-                Util.log(
-                    ' [VDI {}: failed to load VDI info]: {}'
-                    .format(vdi_uuid, e)
-                )
-                info = vhdutil.VHDInfo(vdi_uuid)
-                info.error = 1
+                return handle_fail(vdi_uuid, e)

-            all_vdi_info[vdi_uuid] = info
+        try:
+            for vdiInfo in multi_vhdutil.run(load_info, pending_vdi_uuids):
+                all_vdi_info[vdiInfo.uuid] = vdiInfo
+        finally:
+            del multi_vhdutil

         return all_vdi_info
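The reworked `_load_vdi_info` above follows a partition-then-resolve shape: non-VHD volumes are answered inline, VHD volumes are queued in `pending_vdi_uuids` and probed in parallel, and failures degrade to an error marker via `handle_fail` instead of aborting the scan. Below is a minimal, self-contained sketch of that shape only; the callables are placeholders, not the commit's real `get_vhd_info()` or `handle_fail()`.

```python
# Illustrative sketch of the partition-then-resolve shape (placeholder names).
from concurrent.futures import ThreadPoolExecutor


def load_all(uuids, needs_probe, probe, error_marker):
    results = {}
    pending = []
    for uuid in uuids:
        if needs_probe(uuid):
            pending.append(uuid)      # resolved in parallel below
        else:
            results[uuid] = None      # e.g. RAW volumes: no vhd-util call needed

    def worker(uuid):
        try:
            return uuid, probe(uuid)
        except Exception as e:        # a single bad VDI must not abort the scan
            return uuid, error_marker(uuid, e)

    with ThreadPoolExecutor() as executor:
        results.update(executor.map(worker, pending))
    return results


if __name__ == '__main__':
    print(load_all(
        ['a', 'b', 'c'],
        needs_probe=lambda u: u != 'b',
        probe=lambda u: 'vhd-info(%s)' % u,
        error_marker=lambda u, e: 'error(%s)' % u,
    ))
```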

drivers/linstorvhdutil.py

Lines changed: 72 additions & 0 deletions
@@ -18,10 +18,13 @@

 from linstorjournaler import LinstorJournaler
 from linstorvolumemanager import LinstorVolumeManager
+
+from concurrent.futures import ThreadPoolExecutor
 import base64
 import errno
 import json
 import socket
+import threading
 import time
 import util
 import vhdutil
@@ -597,3 +600,72 @@ def _zeroize(path, size):
             'EIO',
             opterr='Failed to zero out VHD footer {}'.format(path)
         )
+
+class MultiLinstorVhdUtil:
+    class ExecutorData(threading.local):
+        def __init__(self):
+            self._clear()
+
+        def _clear(self):
+            self.session = None
+            self.linstor = None
+            self.vhdutil = None
+
+    class Load:
+        def __init__(self, session):
+            self.session = session
+
+        def cleanup(self):
+            if self.session:
+                self.session.xenapi.session.logout()
+                self.session = None
+
+    def __init__(self, uri, group_name) -> None:
+        self._uri = uri
+        self._group_name = group_name
+        self._loads: self.Load = []
+        self._executor_data = self.ExecutorData()
+
+    def __del__(self):
+        self._cleanup()
+
+    def run(self, cb, vdi_uuids):
+        def wrapper(cb, vdi_uuid):
+            if not self._executor_data.session:
+                self._init_executor_thread()
+            return cb(vdi_uuid, self._executor_data.vhdutil)
+
+        with ThreadPoolExecutor() as executor:
+            return executor.map(lambda vdi_uuid: wrapper(cb, vdi_uuid), vdi_uuids)
+
+    @property
+    def local_vhdutil(self):
+        return self._executor_data.vhdutil
+
+    def _init_executor_thread(self):
+        session = util.get_localAPI_session()
+        load = self.Load(session)
+        try:
+            linstor = LinstorVolumeManager(
+                self._uri,
+                self._group_name,
+                repair=False,
+                logger=util.SMlog
+            )
+            self._executor_data.linstor = linstor
+            self._executor_data.vhdutil = LinstorVhdUtil(session, linstor)
+            self._executor_data.session = session
+        except:
+            self._executor_data._clear()
+            load.cleanup()
+            raise
+
+        self._loads.append(load)
+
+    def _cleanup(self):
+        for load in self._loads:
+            try:
+                load.cleanup()
+            except Exception as e:
+                util.SMlog(f"Failed to clean load executor: {e}")
+        self._loads.clear()
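A usage sketch for the new class, mirroring the call sites added in LinstorSR.py and cleanup.py above. It is illustrative only: `linstor` stands in for an existing `LinstorVolumeManager` and `vdi_uuids` for the list of VHD UUIDs to inspect.

```python
# Illustrative usage of MultiLinstorVhdUtil, modelled on the diffs above.
def load_all_vhd_info(linstor, vdi_uuids):
    multi_vhdutil = MultiLinstorVhdUtil(linstor.uri, linstor.group_name)

    def load_info(vdi_uuid, vhdutil_instance):
        # vhdutil_instance is the LinstorVhdUtil bound to the current worker
        # thread's private XAPI session and LINSTOR connection.
        return vhdutil_instance.get_vhd_info(vdi_uuid)

    try:
        return {info.uuid: info for info in multi_vhdutil.run(load_info, vdi_uuids)}
    finally:
        # Dropping the last reference runs __del__, which logs out every
        # per-thread XAPI session opened by the workers.
        del multi_vhdutil
```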

drivers/linstorvolumemanager.py

Lines changed: 7 additions & 4 deletions
@@ -90,9 +90,7 @@ def get_all_volume_openers(resource_name, volume):
     volume = str(volume)
     openers = {}

-    # Make sure this call never stucks because this function can be called
-    # during HA init and in this case we can wait forever.
-    session = util.timeout_call(10, util.get_localAPI_session)
+    session = util.get_localAPI_session()

     hosts = session.xenapi.host.get_all_records()
     for host_ref, host_record in hosts.items():
@@ -257,7 +255,7 @@ class LinstorVolumeManager(object):
     """

     __slots__ = (
-        '_linstor', '_logger', '_redundancy',
+        '_linstor', '_uri', '_logger', '_redundancy',
         '_base_group_name', '_group_name', '_ha_group_name',
         '_volumes', '_storage_pools', '_storage_pools_time',
         '_kv_cache', '_resource_cache', '_volume_info_cache',
@@ -359,6 +357,7 @@ def __init__(
         :param int attempt_count: Number of attempts to join the controller.
         """

+        self._uri = uri
         self._linstor = self._create_linstor_instance(
             uri, attempt_count=attempt_count
         )
@@ -402,6 +401,10 @@
         self._resources_info_cache = None
         self._build_volumes(repair=repair)

+    @property
+    def uri(self) -> str:
+        return self._uri
+
     @property
     def group_name(self):
         """
