@@ -3,7 +3,6 @@ package kafka
 import (
 	"fmt"
 	"os"
-	"strings"
 	"time"
 
 	"dbm-services/bigdata/db-tools/dbactuator/pkg/components"
@@ -45,9 +44,14 @@ func (d *DecomBrokerComp) Init() (err error) {
 // DoReplaceBrokers TODO
 func (d *DecomBrokerComp) DoReplaceBrokers() (err error) {
 
-	const SleepInterval = 300 * time.Second
+	// Fetch the ZooKeeper address from the Kafka config file
+	zkHost, zkPath, err := kafkautil.GetZookeeperConnect(cst.KafkaConfigFile)
+	if err != nil {
+		logger.Error("can't get zookeeper.connect: %s", err)
+		return err
+	}
+	logger.Info("zkHost, zkPath: %s, %s", zkHost, zkPath)
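+	// Note: zookeeper.connect in server.properties typically has the form
+	// "host1:2181,host2:2181/chroot"; zkHost is assumed here to be the host
+	// list and zkPath the optional chroot parsed out by GetZookeeperConnect.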
 
-	zkHost := d.Params.ZookeeperIP + ":2181"
 	oldBrokers := d.Params.ExcludeBrokers
 	newBrokers := d.Params.NewBrokers
 
@@ -69,53 +73,50 @@ func (d *DecomBrokerComp) DoReplaceBrokers() (err error) {
 	}
 	logger.Info("newBrokerIds: %v", newBrokerIds)
 
-	for i, broker := range oldBrokers {
-		oldBrokerID, err := kafkautil.GetBrokerIDByHost(conn, broker)
-		logger.Info("oldBrokerId: [%s]", oldBrokerID)
+	var oldBrokerIds []string
+	for _, broker := range oldBrokers {
+		id, err := kafkautil.GetBrokerIDByHost(conn, broker)
 		if err != nil {
 			logger.Error("can't get %s broker id, %v", broker, err)
 			return err
 		}
-		topicJSON, err := kafkautil.GenReplaceReassignmentJSON(oldBrokerID, newBrokerIds[i], zkHost)
-		if err != nil {
-			logger.Error("GenReassignmentJson failed", err)
-			return err
-		}
-		logger.Info("topicJson, %s", topicJSON)
-		// /data/kafkaenv/host.json
-		jsonFile := fmt.Sprintf("%s/%s.json", cst.DefaultKafkaEnv, broker)
-		logger.Info("jsonfile: %s", jsonFile)
-		if err = os.WriteFile(jsonFile, []byte(topicJSON), 0644); err != nil {
-			logger.Error("write %s failed, %v", jsonFile, err)
-			return err
-		}
-		if !strings.Contains(topicJSON, "topic") {
-			logger.Info("no data to migrate")
-			continue
-		}
-		// do
-		if err = kafkautil.DoReassignPartitions(zkHost, jsonFile); err != nil {
-			logger.Error("DoReassignPartitions failed, %v", err)
-			return err
-		}
-		for {
-
-			out, err := kafkautil.CheckReassignPartitions(zkHost, jsonFile)
-			if err != nil {
-				logger.Error("CheckReassignPartitions failed %v", err)
-				return err
-			}
+		oldBrokerIds = append(oldBrokerIds, id)
+	}
+	logger.Info("oldBrokerIds: %v", oldBrokerIds)
 
-			if len(out) == 0 {
-				logger.Info("data migration finished")
-				break
-			}
+	// Fetch the topics and write them to a JSON file
+	b, err := kafkautil.WriteTopicJSON(zkHost)
+	if err != nil {
+		return err
+	}
+	if len(b) == 0 {
+		logger.Error("topic is empty, please check")
+		return fmt.Errorf("topic is empty")
+	}
+	logger.Info("Creating topic.json file")
+	topicJSONFile := fmt.Sprintf("%s/topic.json", cst.DefaultKafkaEnv)
+	if err = os.WriteFile(topicJSONFile, b, 0644); err != nil {
+		logger.Error("write %s failed, %s", topicJSONFile, err)
+		return err
+	}
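+	// topic.json is assumed to use kafka-reassign-partitions.sh's
+	// --topics-to-move-json-file format, e.g.
+	// {"version":1,"topics":[{"topic":"t1"},{"topic":"t2"}]}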
 
-			time.Sleep(SleepInterval)
-		}
-		logger.Info("broker [%s] migration finished", broker)
+	// Generate the partition replica reassignment plan and write it to a JSON file
+	logger.Info("Creating plan.json file")
+	err = kafkautil.GenReplaceReassignmentJSON(conn, zkHost, oldBrokerIds, newBrokerIds)
+	if err != nil {
+		logger.Error("Create plan.json failed %s", err)
+		return err
+	}
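+	// The plan is assumed to remap every replica on an old broker ID to the
+	// corresponding new broker ID, in kafka-reassign-partitions.sh's
+	// --reassignment-json-file format, e.g.
+	// {"version":1,"partitions":[{"topic":"t1","partition":0,"replicas":[1003]}]}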
 
+	// Execute the partition replica reassignment
+	logger.Info("Execute the plan")
+	planJSONFile := cst.PlanJSONFile
+	err = kafkautil.DoReassignPartitions(zkHost, planJSONFile)
+	if err != nil {
+		logger.Error("Execute partitions reassignment failed %s", err)
+		return err
 	}
+	logger.Info("Execute partitions reassignment end")
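+	// DoReassignPartitions presumably shells out to kafka-reassign-partitions.sh
+	// --execute; completion is polled separately by DoPartitionCheck below.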
 
 	return nil
 }
@@ -185,8 +186,8 @@ func (d *DecomBrokerComp) DoDecomBrokers() (err error) {
 // DoPartitionCheck checks the status of the Kafka partition reassignment.
 // It re-checks until all partitions are migrated or the retry limit is reached.
 func (d *DecomBrokerComp) DoPartitionCheck() (err error) {
-	// Maximum number of retries: 288
-	const MaxRetry = 288
+	// Maximum number of retries: 864
+	const MaxRetry = 864
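+	// 864 retries × 100s sleep = 86400s, the same 24h overall budget as the
+	// previous 288 × 300s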
 	count := 0                               // initialize the retry counter
 	zkHost := d.Params.ZookeeperIP + ":2181" // build the ZooKeeper connection string
 	jsonFile := cst.PlanJSONFile             // the reassignment plan file
@@ -223,8 +224,8 @@ func (d *DecomBrokerComp) DoPartitionCheck() (err error) {
 		logger.Error("checking the data migration timed out, you may retry")
 		return fmt.Errorf("checking the scale-out status timed out, you may retry")
 	}
-	// Check again after waiting 5 minutes
-	time.Sleep(300 * time.Second)
+	// Check again after waiting 100 seconds
+	time.Sleep(100 * time.Second)
 }
 
 // Log message after the migration completes
@@ -255,7 +256,7 @@ func (d *DecomBrokerComp) DoEmptyCheck() (err error) {
 		return err
 	}
 	if !empty {
-		errMsg := fmt.Errorf("The broker is not empty. ")
+		errMsg := fmt.Errorf("the broker is not empty")
 		return errMsg
 	}
 	return nil