Skip to content

Commit 08f507b

Browse files
committed
Modify Node::repair method for nodetool cluster repair
Since scylladb/scylladb#22905, nodetool repair repairs only vnode keyspaces. To repair tablet keyspaces, you need to use nodetool cluster repair. Modify the Node::repair method accordingly. Node::repair now takes the repair parameters as arguments and creates an object of the newly added RepairOptions class to serialize them. The old way of passing repair options may still be used to repair vnode keyspaces.
1 parent 306843b commit 08f507b

File tree

2 files changed

+190
-4
lines changed

2 files changed

+190
-4
lines changed

ccmlib/cluster.py

+7
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,13 @@ def drain(self):
616616
def repair(self):
    """Run a repair across the cluster.

    Always issues ``nodetool repair`` through the cluster-level ``nodetool``
    helper. If the ``nodetool help`` output advertises the ``cluster``
    command group, ``nodetool cluster repair`` is additionally issued on the
    first running node only.
    """
    self.nodetool("repair")

    help_text, _ = self.nodetool('help')
    # NOTE(review): the marker checked here (" cluster:") differs from the
    # one used by Node.cluster_command_available (" cluster ") - confirm
    # which form the help output really contains.
    if " cluster:" not in help_text:
        return

    # One running node is enough: the command is issued once, not per node.
    first_running = next(
        (candidate for candidate in list(self.nodes.values()) if candidate.is_running()),
        None,
    )
    if first_running is not None:
        first_running.nodetool("cluster repair")
625+
619626
def cleanup(self):
620627
self.nodetool("cleanup")
621628

ccmlib/node.py

+183-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
# ccm node
22

33

4+
from enum import Enum
45
import errno
56
import glob
67
import itertools
78
import os
9+
from pathlib import Path
810
import re
911
import shutil
1012
import signal
@@ -16,8 +18,9 @@
1618
from datetime import datetime
1719
import locale
1820
from collections import defaultdict, namedtuple
19-
from typing import TYPE_CHECKING
21+
from typing import TYPE_CHECKING, Any, Dict, Optional
2022

23+
import requests
2124
from ruamel.yaml import YAML
2225

2326
from ccmlib import common
@@ -85,6 +88,124 @@ def __decode(self, value):
8588
_active_tasks_pattern = re.compile(r'\s*([\w-]+)\s+\w+\s+(?P<ks>\w+)\s+(?P<cf>\w+)\s+\d+\s+\d+\s+\w+\s+\d+\.\d+%')
8689

8790

91+
class RepairOptions:
    """Collect repair parameters and render them as nodetool argument strings.

    The options common to both repair flavours (keyspace, tables, hosts,
    datacenters) are kept on this object. Flavour-specific flags are delegated
    to ``VnodeRepairOptions`` (for ``nodetool repair``) and
    ``TabletRepairOptions`` (for ``nodetool cluster repair``).
    """

    class KeyspaceReplication(Enum):
        """Which kind of keyspaces a repair should cover."""

        # Plain string values: a trailing comma here (`"tablets",`) would turn
        # the member value into a 1-tuple and make it inconsistent with ALL.
        TABLETS = "tablets"
        VNODES = "vnodes"
        ALL = "all"

    class TabletRepairOptions:
        """Flags that only apply to a tablet (cluster) repair."""

        def __init__(self, tablet_tokens: Optional[list[str]] = None):
            self.tablet_tokens = tablet_tokens

        def get_options(self):
            """Return the tablet-only flags as a space-separated string ("" if none)."""
            options = []
            if self.tablet_tokens:
                options.append("--tablet-tokens")
                options.append(",".join(self.tablet_tokens))

            return " ".join(options)

    class VnodeRepairOptions:
        """Flags that only apply to a vnode repair."""

        def __init__(self, dc_parallel: bool = False, end_token: Optional[int] = None, full: bool = False, local: bool = False,
                     ignore_unreplicated_keyspaces: bool = False, job_threads: Optional[int] = None, partitioner_range: bool = False,
                     pull: bool = False, sequential: bool = False, start_token: Optional[int] = None, trace: bool = False):
            self.dc_parallel = dc_parallel
            self.end_token = end_token
            self.full = full
            self.local = local
            self.ignore_unreplicated_keyspaces = ignore_unreplicated_keyspaces
            self.job_threads = job_threads
            self.partitioner_range = partitioner_range
            self.pull = pull
            self.sequential = sequential
            self.start_token = start_token
            self.trace = trace

        def get_options(self):
            """Return the vnode-only flags as a space-separated string ("" if none)."""
            options = []
            if self.dc_parallel:
                options.append("--dc-parallel")

            # Compare against None: 0 is a legitimate token and must not be dropped.
            if self.end_token is not None:
                options.append("--end-token")
                options.append(f"{self.end_token}")

            if self.full:
                options.append("--full")

            if self.local:
                options.append("--in-local-dc")

            if self.ignore_unreplicated_keyspaces:
                options.append("--ignore-unreplicated-keyspaces")

            # Truthiness kept on purpose: a thread count of 0 (or None) emits no flag.
            if self.job_threads:
                options.append("--job-threads")
                options.append(f"{self.job_threads}")

            if self.partitioner_range:
                options.append("--partitioner-range")

            if self.pull:
                options.append("--pull")

            if self.sequential:
                options.append("--sequential")

            # Compare against None: 0 is a legitimate token and must not be dropped.
            if self.start_token is not None:
                options.append("--start-token")
                options.append(f"{self.start_token}")

            if self.trace:
                options.append("--trace")

            return " ".join(options)

    def __init__(self, replication: KeyspaceReplication = KeyspaceReplication.ALL, keyspace: Optional[str] = None, tables: Optional[list[str]] = None,
                 hosts: Optional[list[str]] = None, dcs: Optional[list[str]] = None, dc_parallel: bool = False, end_token: Optional[int] = None,
                 full: bool = False, local: bool = False, ignore_unreplicated_keyspaces: bool = False, job_threads: Optional[int] = None,
                 partitioner_range: bool = False, pull: bool = False, sequential: bool = False, start_token: Optional[int] = None, trace: bool = False,
                 tablet_tokens: Optional[list[str]] = None):
        self.replication = replication
        self.keyspace = keyspace
        self.tables = tables
        self.hosts = hosts
        self.dcs = dcs
        # Keyword arguments keep the long parameter list from silently
        # shifting if either signature is ever reordered.
        self.vnode_repair_options = self.VnodeRepairOptions(
            dc_parallel=dc_parallel, end_token=end_token, full=full, local=local,
            ignore_unreplicated_keyspaces=ignore_unreplicated_keyspaces, job_threads=job_threads,
            partitioner_range=partitioner_range, pull=pull, sequential=sequential,
            start_token=start_token, trace=trace)
        self.tablets_repair_options = self.TabletRepairOptions(tablet_tokens)

    def _get_repair_options(self):
        """Return the arguments shared by both repair flavours."""
        options = []
        if self.keyspace:
            options.append(self.keyspace)
            # Tables are positional arguments following the keyspace, so they
            # are only emitted when a keyspace is given.
            if self.tables:
                options.append(" ".join(self.tables))

        if self.hosts:
            options.append("--in-hosts")
            options.append(",".join(self.hosts))

        if self.dcs:
            options.append("--in-dc")
            options.append(",".join(self.dcs))

        return " ".join(options)

    def get_tablet_repair_options(self):
        """Return the full argument string for a tablet (cluster) repair.

        The two parts are joined with a single space, so the result may carry
        leading or trailing whitespace when either part is empty.
        """
        return self._get_repair_options() + " " + self.tablets_repair_options.get_options()

    def get_vnode_repair_options(self):
        """Return the full argument string for a vnode repair.

        The two parts are joined with a single space, so the result may carry
        leading or trailing whitespace when either part is empty.
        """
        return self._get_repair_options() + " " + self.vnode_repair_options.get_options()

    def repair_tablets(self):
        """Return True when the requested replication includes tablet keyspaces."""
        return self.replication in [self.KeyspaceReplication.TABLETS, self.KeyspaceReplication.ALL]

    def repair_vnodes(self):
        """Return True when the requested replication includes vnode keyspaces."""
        return self.replication in [self.KeyspaceReplication.VNODES, self.KeyspaceReplication.ALL]
207+
208+
88209
class Node(object):
89210

90211
"""
@@ -1418,11 +1539,69 @@ def drain(self, block_on_log=False):
14181539
if block_on_log:
14191540
self.watch_log_for("DRAINED", from_mark=mark)
14201541

1421-
def repair(self, options=[], **kwargs):
1422-
args = ["repair"] + options
1423-
cmd = ' '.join(args)
1542+
@property
def scylla_yaml(self) -> Dict[str, Any]:
    """Load and return this node's Scylla configuration file.

    The file is re-read from the node's conf directory on every access.
    """
    conf_file = Path(self.get_conf_dir()) / common.SCYLLA_CONF
    parser = YAML()
    return parser.load(conf_file)
1545+
1546+
@property
def api_port(self) -> int:
    """Return the ``api_port`` value from the node's Scylla config.

    Falls back to 10000 when the key is absent from the configuration.
    """
    config = self.scylla_yaml
    return config.get('api_port', 10000)
1549+
1550+
def cluster_command_available(self):
    """Report whether this node's nodetool advertises the ``cluster`` command.

    Runs ``nodetool help`` and looks for the word ``cluster`` surrounded by
    spaces in its standard output.
    """
    help_text, _ = self.nodetool('help')
    marker = " cluster "
    return marker in help_text
1553+
1554+
def run_repair(self, options, **kwargs):
    """Run ``nodetool repair`` with the given argument string.

    ``options`` is appended after ``repair`` separated by one space; any
    keyword arguments are forwarded to ``self.nodetool`` unchanged.
    """
    return self.nodetool(" ".join(("repair", options)), **kwargs)
1557+
1558+
def run_cluster_repair(self, options, **kwargs):
    """Run ``nodetool cluster repair`` with the given argument string.

    ``options`` is appended after ``cluster repair`` separated by one space;
    any keyword arguments are forwarded to ``self.nodetool`` unchanged.
    """
    return self.nodetool(" ".join(("cluster repair", options)), **kwargs)
14251561

1562+
def repair(self, options=None, replication: RepairOptions.KeyspaceReplication = RepairOptions.KeyspaceReplication.ALL, keyspace: Optional[str] = None,
           tables: Optional[list[str]] = None, hosts: Optional[list[str]] = None, dcs: Optional[list[str]] = None, dc_parallel: bool = False,
           end_token: Optional[int] = None, full: bool = False, local: bool = False, ignore_unreplicated_keyspaces: bool = False,
           job_threads: Optional[int] = None, partitioner_range: bool = False, pull: bool = False, sequential: bool = False, start_token: Optional[int] = None,
           trace: bool = False, tablet_tokens: Optional[list[str]] = None, **kwargs):
    """Repair this node's data with ``nodetool repair`` and/or ``nodetool cluster repair``.

    Two calling conventions are supported:

    * Legacy: pass ``options`` as a non-empty list of raw nodetool arguments.
      They are joined with spaces and run as a plain ``nodetool repair``;
      every other repair parameter is ignored.
    * Structured: leave ``options`` empty and pass the individual parameters.
      They are serialized through ``RepairOptions``.

    In structured mode the command is chosen as follows:

    * nodetool has no ``cluster`` command: a single ``nodetool repair``.
    * ``keyspace`` given: the node's REST API is asked for the vnode
      keyspaces. If the request succeeds and the keyspace is not in the
      answer, ``nodetool cluster repair`` is run; otherwise
      ``nodetool repair``. ``replication`` is not consulted in this case.
    * no ``keyspace``: ``nodetool repair`` and/or ``nodetool cluster repair``
      are run according to ``replication``.

    Extra ``kwargs`` are forwarded to every nodetool invocation.

    Returns:
        Legacy mode: whatever ``self.nodetool`` returns.
        Structured mode: a pair ``(outs, errs)`` of tuples with one entry per
        nodetool invocation (each invocation is expected to return a
        two-item result).

    Raises:
        requests.RequestException: if the REST query for vnode keyspaces
            fails or times out.
    """
    if options:
        # Legacy path: raw arguments are passed straight to `nodetool repair`.
        args = ["repair"] + options
        return self.nodetool(' '.join(args), **kwargs)

    repair_options = RepairOptions(
        replication=replication, keyspace=keyspace, tables=tables, hosts=hosts, dcs=dcs,
        dc_parallel=dc_parallel, end_token=end_token, full=full, local=local,
        ignore_unreplicated_keyspaces=ignore_unreplicated_keyspaces, job_threads=job_threads,
        partitioner_range=partitioner_range, pull=pull, sequential=sequential,
        start_token=start_token, trace=trace, tablet_tokens=tablet_tokens)

    # Each check spawns `nodetool help`, so ask only once.
    cluster_repair_supported = self.cluster_command_available()
    outs_and_errs = []

    if not cluster_repair_supported:
        # Without the `cluster` command only the plain repair exists.
        outs_and_errs.append(self.run_repair(repair_options.get_vnode_repair_options(), **kwargs))
    elif repair_options.keyspace:
        url = f"http://{self.address()}:{self.api_port}/storage_service/keyspaces"
        # Bound the request so an unresponsive API cannot hang the caller forever.
        resp = requests.get(url=url, params={"replication": "vnodes"}, timeout=60)

        keyspace_uses_tablets = (resp.status_code == requests.codes.ok
                                 and repair_options.keyspace not in resp.json())
        if keyspace_uses_tablets:
            outs_and_errs.append(self.run_cluster_repair(repair_options.get_tablet_repair_options(), **kwargs))
        else:
            # Either a vnode keyspace, or the lookup failed: fall back to plain repair.
            outs_and_errs.append(self.run_repair(repair_options.get_vnode_repair_options(), **kwargs))
    else:
        if repair_options.repair_vnodes():
            outs_and_errs.append(self.run_repair(repair_options.get_vnode_repair_options(), **kwargs))

        if repair_options.repair_tablets():
            outs_and_errs.append(self.run_cluster_repair(repair_options.get_tablet_repair_options(), **kwargs))

    outs, errs = zip(*outs_and_errs)
    return outs, errs
1604+
14261605
def move(self, new_token):
14271606
self.nodetool("move " + str(new_token))
14281607

0 commit comments

Comments
 (0)