Skip to content

feat(mysql): spider替换流程 #10105 #10345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions dbm-ui/backend/db_meta/api/cluster/tendbcluster/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from backend.flow.utils.cc_manage import CcManage
from backend.flow.utils.mysql.mysql_module_operate import MysqlCCTopoOperator
from backend.flow.utils.spider.spider_act_dataclass import ShardInfo
from backend.flow.utils.spider.spider_bk_config import get_spider_version_and_charset


class TenDBClusterClusterHandler(ClusterHandler):
Expand Down Expand Up @@ -172,26 +173,30 @@ def add_spiders(
cls,
cluster_id: int,
creator: str,
spider_version: str,
add_spiders: list,
spider_role: Optional[TenDBClusterSpiderRole],
resource_spec: dict,
is_slave_cluster_create: bool,
new_slave_domain: str = None,
new_db_module_id: int = 0,
):
"""
对已有的集群添加spider的元信息
因为从集群添加的行为spider-slave扩容行为基本类似,所以这里作为一个公共方法,对域名处理根据不同单据类型做不同的处理
@param cluster_id: 待关联的集群id
@param creator: 提单的用户名称
@param spider_version: 待加入的spider版本号(包括小版本信息)
@param add_spiders: 待加入的spider机器信息
@param spider_role: 待加入spider的角色
@param resource_spec: 待加入spider的规格
@param is_slave_cluster_create: 代表这次是否是添加从集群
@param new_slave_domain: 如果是添加从集群场景,这里代表新的从域名信息
@param new_db_module_id: new_db_module_id代表新的模块ID,默认为0,代表延用cluster信息的模块ID做依据
"""
cluster = Cluster.objects.get(id=cluster_id)
db_module_id = new_db_module_id if new_db_module_id else cluster.db_module_id
spider_charset, spider_version = get_spider_version_and_charset(
bk_biz_id=cluster.bk_biz_id, db_module_id=db_module_id
)

# 录入机器
machines = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def mysql_proxy_upgrade_package(self, pkg_id: str) -> list:
f"{env.BKREPO_PROJECT}/{env.BKREPO_BUCKET}/{proxy_pkg.path}",
]

def spider_upgrade_package(self, pkg_id: str) -> list:
def spider_upgrade_package(self, pkg_id: int) -> list:
"""
spider 升级需要的安装包列表
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
CtlSwitchToSlaveKwargs,
DropSpiderRoutingKwargs,
)
from backend.flow.utils.spider.spider_bk_config import get_spider_version_and_charset
from backend.flow.utils.spider.spider_db_meta import SpiderDBMeta

"""
Expand All @@ -72,6 +73,8 @@ def add_spider_slaves_sub_flow(
parent_global_data: dict,
is_clone_user: bool = True,
slave_domain: str = None,
new_pkg_id: int = 0,
new_db_module_id: int = 0,
):
"""
定义对原有的TenDB cluster集群添加spider slave节点的公共子流程
Expand All @@ -82,7 +85,9 @@ def add_spider_slaves_sub_flow(
@param root_id: flow流程的root_id
@param parent_global_data: 本次子流程的对应上层流程的全局只读上下文
@param uid: 单据id
@param is_clone_user 是否克隆权限
@param is_clone_user 是否克隆权限, 区分一些单据场景。
@param new_pkg_id 如果是做升级部署,需要传新版本的介质包,默认为0,表示不升级部署
@param new_db_module_id 如果是做升级部署,需要传新的DB模块ID,默认为0,表示不升级部署
"""
tdbctl_pass = get_random_string(length=10)

Expand All @@ -99,7 +104,15 @@ def add_spider_slaves_sub_flow(
)[0]

parent_global_data["spider_ports"] = [tmp_spider.port]
parent_global_data["db_module_id"] = cluster.db_module_id
# 获取版本和字符集信息
parent_global_data["db_module_id"] = new_db_module_id if new_db_module_id else cluster.db_module_id
parent_global_data["spider_charset"], parent_global_data["spider_version"] = get_spider_version_and_charset(
bk_biz_id=cluster.bk_biz_id, db_module_id=parent_global_data["db_module_id"]
)
# spider slave 不安装备份程序,只解压
parent_global_data["untar_only"] = True

# 声明子流程
sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data)

# 拼接执行原子任务活动节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题
Expand All @@ -125,20 +138,21 @@ def add_spider_slaves_sub_flow(
)

# 阶段1 下发spider安装介质包
if parent_global_data.get("pkg_id"):
pkg_id = parent_global_data["pkg_id"]
if new_pkg_id:
# 代表升级部署,根据新的pkg_id下发介质包
sub_pipeline.add_act(
act_name=_("下发spider安装介质"),
act_component_code=TransFileComponent.code,
kwargs=asdict(
DownloadMediaKwargs(
bk_cloud_id=cluster.bk_cloud_id,
exec_ip=[ip_info["ip"] for ip_info in add_spider_slaves],
file_list=GetFileList(db_type=DBType.MySQL).spider_upgrade_package(pkg_id=pkg_id),
file_list=GetFileList(db_type=DBType.MySQL).spider_upgrade_package(pkg_id=new_pkg_id),
)
),
)
else:
# 根据继承的版本名称,或者介质包
sub_pipeline.add_act(
act_name=_("下发spider安装介质"),
act_component_code=TransFileComponent.code,
Expand Down Expand Up @@ -260,6 +274,8 @@ def add_spider_masters_sub_flow(
uid: str,
parent_global_data: dict,
is_add_spider_mnt: bool,
new_pkg_id: int = 0,
new_db_module_id: int = 0,
):
"""
定义对原有的TenDB cluster集群添加spider master节点的公共子流程
Expand All @@ -271,14 +287,24 @@ def add_spider_masters_sub_flow(
@param parent_global_data: 本次子流程的对应上层流程的全局只读上下文
@param is_add_spider_mnt: 表示这次添加spider 运维节点,如果是则True,不是则False
@param uid: 单据uid
@param new_pkg_id 如果是做升级部署,需要传新版本的介质包,默认为0,表示不升级部署
@param new_db_module_id 如果是做升级部署,需要传新的DB模块ID,默认为0,表示不升级部署
"""
tag = "mnt"
tdbctl_pass = get_random_string(length=10)

# 获取到集群对应的spider端口,作为这次的安装
parent_global_data["spider_ports"] = [cluster.proxyinstance_set.first().port]
parent_global_data["ctl_port"] = cluster.proxyinstance_set.first().admin_port
parent_global_data["db_module_id"] = cluster.db_module_id

# 获取版本和字符集信息
parent_global_data["db_module_id"] = new_db_module_id if new_db_module_id else cluster.db_module_id
parent_global_data["spider_charset"], parent_global_data["spider_version"] = get_spider_version_and_charset(
bk_biz_id=cluster.bk_biz_id, db_module_id=parent_global_data["db_module_id"]
)
parent_global_data["ctl_charset"] = parent_global_data["spider_charset"]

# 声明子流程
sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data)

# 拼接执行原子任务活动节点需要的通用的私有参数结构体, 减少代码重复率,但引用时注意内部参数值传递的问题
Expand All @@ -303,16 +329,15 @@ def add_spider_masters_sub_flow(
)
)
# 阶段1 下发spider安装介质包
if parent_global_data.get("pkg_id"):
pkg_id = parent_global_data["pkg_id"]
if new_pkg_id:
sub_pipeline.add_act(
act_name=_("下发spider安装介质"),
act_component_code=TransFileComponent.code,
kwargs=asdict(
DownloadMediaKwargs(
bk_cloud_id=cluster.bk_cloud_id,
exec_ip=[ip_info["ip"] for ip_info in add_spider_masters],
file_list=GetFileList(db_type=DBType.MySQL).spider_upgrade_package(pkg_id=pkg_id),
file_list=GetFileList(db_type=DBType.MySQL).spider_upgrade_package(pkg_id=new_pkg_id),
)
),
)
Expand Down Expand Up @@ -344,17 +369,11 @@ def add_spider_masters_sub_flow(
acts_list = []
for spider in get_spider_master_incr(cluster, add_spider_masters):
exec_act_kwargs.exec_ip = spider["ip"]
if parent_global_data.get("pkg_id"):
exec_act_kwargs.cluster = {
"immutable_domain": cluster.immute_domain,
"auto_incr_value": spider["incr_number"],
"pkg_id": parent_global_data["pkg_id"],
}
else:
exec_act_kwargs.cluster = {
"immutable_domain": cluster.immute_domain,
"auto_incr_value": spider["incr_number"],
}
exec_act_kwargs.cluster = {
"immutable_domain": cluster.immute_domain,
"auto_incr_value": spider["incr_number"],
"pkg_id": new_pkg_id,
}
exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.get_install_spider_payload.__name__
acts_list.append(
{
Expand Down Expand Up @@ -797,21 +816,14 @@ def reduce_ctls_routing(root_id: str, parent_global_data: dict, cluster: Cluster

if reduce_ctl_primary:
# 选择新节点作为primary,过滤待回收的节点
all_ctl = cluster.proxyinstance_set.filter(
tendbclusterspiderext__spider_role=TenDBClusterSpiderRole.SPIDER_MASTER
)

# 因为ctl集群是采用GTID+半同步数据同步,所以理论上选择任意一个从节点作为主,数据不会丢失
new_ctl_primary = all_ctl.exclude(machine__ip__in=[ip_info.machine.ip for ip_info in reduce_ctls]).first()

sub_pipeline.add_act(
act_name=_("切换ctl中控集群"),
act_component_code=CtlSwitchToSlaveComponent.code,
kwargs=asdict(
CtlSwitchToSlaveKwargs(
cluster_id=cluster.id,
reduce_ctl_primary=reduce_ctl_primary,
new_ctl_primary=f"{new_ctl_primary.machine.ip}{IP_PORT_DIVIDER}{new_ctl_primary.admin_port}",
reduce_ctl_secondary_list=reduce_ctl_secondary_list,
)
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from backend.flow.plugins.components.collections.spider.spider_db_meta import SpiderDBMetaComponent
from backend.flow.utils.mysql.mysql_act_dataclass import DBMetaOPKwargs
from backend.flow.utils.mysql.mysql_context_dataclass import SystemInfoContext
from backend.flow.utils.spider.spider_bk_config import get_spider_version_and_charset
from backend.flow.utils.spider.spider_db_meta import SpiderDBMeta

logger = logging.getLogger("flow")
Expand Down Expand Up @@ -67,14 +66,7 @@ def add_spider_mnt(self):
raise ClusterNotExistException(
cluster_id=info["cluster_id"], bk_biz_id=int(self.data["bk_biz_id"]), message=_("集群不存在")
)
# 通过bk—config获取版本号和字符集信息
# 获取的是业务默认配置,不一定是集群当前配置
spider_charset, spider_version = get_spider_version_and_charset(
bk_biz_id=cluster.bk_biz_id, db_module_id=cluster.db_module_id
)
# 补充这次单据需要的隐形参数,spider版本以及字符集
sub_flow_context["spider_charset"] = spider_charset
sub_flow_context["spider_version"] = spider_version

# 启动子流程
sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context))
# 阶段1 根据场景执行添加spider-mnt子流程
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from backend.flow.plugins.components.collections.spider.spider_db_meta import SpiderDBMetaComponent
from backend.flow.utils.mysql.mysql_act_dataclass import DBMetaOPKwargs
from backend.flow.utils.mysql.mysql_context_dataclass import SystemInfoContext
from backend.flow.utils.spider.spider_bk_config import get_spider_version_and_charset
from backend.flow.utils.spider.spider_db_meta import SpiderDBMeta

logger = logging.getLogger("flow")
Expand Down Expand Up @@ -57,53 +56,67 @@ def add_spider_nodes(self):
pipeline = Builder(root_id=self.root_id, data=self.data)
sub_pipelines = []
for info in self.data["infos"]:
# 拼接子流程需要全局参数
sub_flow_context = copy.deepcopy(self.data)
sub_flow_context.pop("infos")

# 获取对应集群相关对象
try:
cluster = Cluster.objects.get(id=info["cluster_id"], bk_biz_id=int(self.data["bk_biz_id"]))
except Cluster.DoesNotExist:
raise ClusterNotExistException(
cluster_id=info["cluster_id"], bk_biz_id=int(self.data["bk_biz_id"]), message=_("集群不存在")
sub_pipelines.append(
self.add_spider_nodes_with_cluster(
cluster_id=info["cluster_id"],
add_spider_role=info["add_spider_role"],
add_spider_hosts=info["spider_ip_list"],
)

# 根据集群去bk-config获取对应spider版本和字符集
spider_charset, spider_version = get_spider_version_and_charset(
bk_biz_id=cluster.bk_biz_id, db_module_id=cluster.db_module_id
)

# 拼接子流程的全局参数
sub_flow_context.update(info)
pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)
pipeline.run_pipeline(init_trans_data_class=SystemInfoContext())

# 补充这次单据需要的隐形参数,spider版本以及字符集
sub_flow_context["spider_charset"] = spider_charset
sub_flow_context["spider_version"] = spider_version
def add_spider_nodes_with_cluster(
self,
cluster_id: int,
add_spider_role: TenDBClusterSpiderRole,
add_spider_hosts: list,
new_db_module_id: int = 0,
new_pkg_id: int = 0,
):
"""
定义添加节点的子流程
"""

if info["add_spider_role"] == TenDBClusterSpiderRole.SPIDER_MASTER:
# 获取对应集群相关对象
try:
cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=int(self.data["bk_biz_id"]))
except Cluster.DoesNotExist:
raise ClusterNotExistException(
cluster_id=cluster_id, bk_biz_id=int(self.data["bk_biz_id"]), message=_("集群不存在")
)

# 加入spider-master 子流程
sub_flow_context["ctl_charset"] = spider_charset
sub_pipelines.append(self.add_spider_master_notes(sub_flow_context, cluster))
# 补充这次单据需要的隐形参数,spider版本以及字符集
sub_flow_context = {
"uid": self.data["uid"],
"bk_biz_id": cluster.bk_biz_id,
"cluster_id": cluster.id,
"created_by": self.data["created_by"],
"ticket_type": self.data["ticket_type"],
"spider_ip_list": add_spider_hosts,
"new_db_module_id": new_db_module_id,
}

elif info["add_spider_role"] == TenDBClusterSpiderRole.SPIDER_SLAVE:
if add_spider_role == TenDBClusterSpiderRole.SPIDER_MASTER:

# 加入spider-slave 子流程
sub_pipelines.append(self.add_spider_slave_notes(sub_flow_context, cluster))
# 加入spider-master 子流程
return self.add_spider_master_notes(sub_flow_context, cluster, new_db_module_id, new_pkg_id)

else:
# 理论上不会出现,出现就中断这次流程构造
raise NormalSpiderFlowException(
message=_("[{}]This type of role addition is not supported".format(info["add_spider_role"]))
)
if not sub_pipelines:
raise NormalSpiderFlowException(message=_("build spider-add-nodes-pipeline failed"))
elif add_spider_role == TenDBClusterSpiderRole.SPIDER_SLAVE:

pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)
pipeline.run_pipeline(init_trans_data_class=SystemInfoContext())
# 加入spider-slave 子流程
return self.add_spider_slave_notes(sub_flow_context, cluster, new_db_module_id, new_pkg_id)

else:
# 理论上不会出现,出现就中断这次流程构造
raise NormalSpiderFlowException(
message=_("[{}]This type of role addition is not supported".format(add_spider_role))
)

def add_spider_master_notes(self, sub_flow_context: dict, cluster: Cluster):
def add_spider_master_notes(
self, sub_flow_context: dict, cluster: Cluster, new_db_module_id: int = 0, new_pkg_id: int = 0
):
"""
定义spider master集群部署子流程
目前产品形态 spider专属一套集群,所以流程只支持spider单机单实例安装
Expand All @@ -121,6 +134,8 @@ def add_spider_master_notes(self, sub_flow_context: dict, cluster: Cluster):
uid=sub_flow_context["uid"],
parent_global_data=sub_flow_context,
is_add_spider_mnt=False,
new_db_module_id=new_db_module_id,
new_pkg_id=new_pkg_id,
)
)

Expand All @@ -144,13 +159,13 @@ def add_spider_master_notes(self, sub_flow_context: dict, cluster: Cluster):
)
return sub_pipeline.build_sub_process(sub_name=_("[{}]添加spider-master节点流程".format(cluster.name)))

def add_spider_slave_notes(self, sub_flow_context: dict, cluster: Cluster):
def add_spider_slave_notes(
self, sub_flow_context: dict, cluster: Cluster, new_db_module_id: int = 0, new_pkg_id: int = 0
):
"""
添加spider-slave节点的子流程流程逻辑
必须集群存在从集群,才能添加
"""
# spider slave 不安装备份程序,只解压
sub_flow_context["untar_only"] = True

sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context))

Expand All @@ -162,6 +177,8 @@ def add_spider_slave_notes(self, sub_flow_context: dict, cluster: Cluster):
root_id=self.root_id,
uid=sub_flow_context["uid"],
parent_global_data=copy.deepcopy(sub_flow_context),
new_db_module_id=new_db_module_id,
new_pkg_id=new_pkg_id,
)
)
# 阶段2 变更db_meta数据
Expand Down
Loading