@@ -17,275 +17,16 @@ import (
17
17
"fmt"
18
18
"net/http"
19
19
"path/filepath"
20
- "time"
21
20
22
21
"configcenter/src/common"
23
22
"configcenter/src/common/blog"
24
- "configcenter/src/common/mapstr"
25
23
"configcenter/src/common/metadata"
26
24
"configcenter/src/common/types"
27
25
"configcenter/src/common/util"
28
- "configcenter/src/common/version"
29
- "configcenter/src/common/watch"
30
- "configcenter/src/scene_server/admin_server/upgrader"
31
- "configcenter/src/source_controller/cacheservice/event"
32
- daltypes "configcenter/src/storage/dal/types"
33
- streamtypes "configcenter/src/storage/stream/types"
34
26
35
27
"github.com/emicklei/go-restful/v3"
36
- "go.mongodb.org/mongo-driver/bson"
37
28
)
38
29
39
- func (s * Service ) migrate (req * restful.Request , resp * restful.Response ) {
40
- rHeader := req .Request .Header
41
- rid := util .GetHTTPCCRequestID (rHeader )
42
- defErr := s .CCErr .CreateDefaultCCErrorIf (util .GetLanguage (rHeader ))
43
- ownerID := common .BKDefaultOwnerID
44
- updateCfg := & upgrader.Config {
45
- OwnerID : ownerID ,
46
- User : common .CCSystemOperatorUserName ,
47
- }
48
-
49
- if err := s .createWatchDBChainCollections (rid ); err != nil {
50
- blog .Errorf ("create watch db chain collections failed, err: %v, rid: %s" , err , rid )
51
- result := & metadata.RespError {
52
- Msg : defErr .Errorf (common .CCErrCommMigrateFailed , err .Error ()),
53
- }
54
- resp .WriteError (http .StatusInternalServerError , result )
55
- return
56
- }
57
-
58
- preVersion , finishedVersions , err := upgrader .Upgrade (s .ctx , s .db , s .cache , s .iam , updateCfg )
59
- if err != nil {
60
- blog .Errorf ("db upgrade failed, err: %+v, rid: %s" , err , rid )
61
- result := & metadata.RespError {
62
- Msg : defErr .Errorf (common .CCErrCommMigrateFailed , err .Error ()),
63
- }
64
- resp .WriteError (http .StatusInternalServerError , result )
65
- return
66
- }
67
-
68
- currentVersion := preVersion
69
- if len (finishedVersions ) > 0 {
70
- currentVersion = finishedVersions [len (finishedVersions )- 1 ]
71
- }
72
-
73
- result := MigrationResponse {
74
- BaseResp : metadata.BaseResp {
75
- Result : true ,
76
- Code : 0 ,
77
- ErrMsg : "" ,
78
- Permissions : nil ,
79
- },
80
- Data : "migrate success" ,
81
- PreVersion : preVersion ,
82
- CurrentVersion : currentVersion ,
83
- FinishedVersions : finishedVersions ,
84
- }
85
- resp .WriteEntity (result )
86
- }
87
-
88
- // dbChainTTLTime the ttl time seconds of the db event chain, used to set the ttl index of mongodb
89
- const dbChainTTLTime = 5 * 24 * 60 * 60
90
-
91
- func (s * Service ) createWatchDBChainCollections (rid string ) error {
92
- // create watch token table to store the last watch token info for every collections
93
- exists , err := s .watchDB .HasTable (s .ctx , common .BKTableNameWatchToken )
94
- if err != nil {
95
- blog .Errorf ("check if table %s exists failed, err: %v, rid: %s" , common .BKTableNameWatchToken , err , rid )
96
- return err
97
- }
98
-
99
- if ! exists {
100
- err = s .watchDB .CreateTable (s .ctx , common .BKTableNameWatchToken )
101
- if err != nil && ! s .watchDB .IsDuplicatedError (err ) {
102
- blog .Errorf ("create table %s failed, err: %v, rid: %s" , common .BKTableNameWatchToken , err , rid )
103
- return err
104
- }
105
- }
106
-
107
- // create watch chain node table and init the last token info as empty for all collections
108
- cursorTypes := watch .ListCursorTypes ()
109
- for _ , cursorType := range cursorTypes {
110
- key , err := event .GetResourceKeyWithCursorType (cursorType )
111
- if err != nil {
112
- blog .Errorf ("get resource key with cursor type %s failed, err: %v, rid: %s" , cursorType , err , rid )
113
- return err
114
- }
115
-
116
- exists , err := s .watchDB .HasTable (s .ctx , key .ChainCollection ())
117
- if err != nil {
118
- blog .Errorf ("check if table %s exists failed, err: %v, rid: %s" , key .ChainCollection (), err , rid )
119
- return err
120
- }
121
-
122
- if ! exists {
123
- err = s .watchDB .CreateTable (s .ctx , key .ChainCollection ())
124
- if err != nil && ! s .watchDB .IsDuplicatedError (err ) {
125
- blog .Errorf ("create table %s failed, err: %v, rid: %s" , key .ChainCollection (), err , rid )
126
- return err
127
- }
128
- }
129
-
130
- if err = s .createWatchIndexes (cursorType , key , rid ); err != nil {
131
- return err
132
- }
133
-
134
- if err = s .createWatchToken (key ); err != nil {
135
- return err
136
- }
137
- }
138
- return nil
139
- }
140
-
141
- func (s * Service ) createWatchIndexes (cursorType watch.CursorType , key event.Key , rid string ) error {
142
- indexes := []daltypes.Index {
143
- {Name : "index_id" , Keys : bson.D {{common .BKFieldID , - 1 }}, Background : true , Unique : true },
144
- {Name : "index_cursor" , Keys : bson.D {{common .BKCursorField , - 1 }}, Background : true , Unique : true },
145
- {Name : "index_cluster_time" , Keys : bson.D {{common .BKClusterTimeField , - 1 }}, Background : true ,
146
- ExpireAfterSeconds : dbChainTTLTime },
147
- }
148
-
149
- if cursorType == watch .ObjectBase || cursorType == watch .MainlineInstance || cursorType == watch .InstAsst {
150
- subResourceIndex := daltypes.Index {
151
- Name : "index_sub_resource" , Keys : bson.D {{common .BKSubResourceField , 1 }}, Background : true ,
152
- }
153
- indexes = append (indexes , subResourceIndex )
154
- }
155
-
156
- existIndexArr , err := s .watchDB .Table (key .ChainCollection ()).Indexes (s .ctx )
157
- if err != nil {
158
- blog .Errorf ("get exist indexes for table %s failed, err: %v, rid: %s" , key .ChainCollection (), err , rid )
159
- return err
160
- }
161
-
162
- existIdxMap := make (map [string ]bool )
163
- for _ , index := range existIndexArr {
164
- existIdxMap [index .Name ] = true
165
- }
166
-
167
- for _ , index := range indexes {
168
- if _ , exist := existIdxMap [index .Name ]; exist {
169
- continue
170
- }
171
-
172
- err = s .watchDB .Table (key .ChainCollection ()).CreateIndex (s .ctx , index )
173
- if err != nil && ! s .watchDB .IsDuplicatedError (err ) {
174
- blog .Errorf ("create indexes for table %s failed, err: %v, rid: %s" , key .ChainCollection (), err , rid )
175
- return err
176
- }
177
- }
178
- return nil
179
- }
180
-
181
- func (s * Service ) createWatchToken (key event.Key ) error {
182
- filter := map [string ]interface {}{
183
- "_id" : key .Collection (),
184
- }
185
-
186
- count , err := s .watchDB .Table (common .BKTableNameWatchToken ).Find (filter ).Count (s .ctx )
187
- if err != nil {
188
- blog .Errorf ("check if last watch token exists failed, err: %v, filter: %+v" , err , filter )
189
- return err
190
- }
191
-
192
- if count > 0 {
193
- return nil
194
- }
195
-
196
- if key .Collection () == event .HostIdentityKey .Collection () {
197
- // host identity's watch token is different with other identity.
198
- // only set coll is ok, the other fields is useless
199
- data := mapstr.MapStr {
200
- "_id" : key .Collection (),
201
- common .BKTableNameBaseHost : watch.LastChainNodeData {Coll : common .BKTableNameBaseHost },
202
- common .BKTableNameModuleHostConfig : watch.LastChainNodeData {Coll : common .BKTableNameModuleHostConfig },
203
- common .BKTableNameBaseProcess : watch.LastChainNodeData {Coll : common .BKTableNameBaseProcess },
204
- }
205
- if err = s .watchDB .Table (common .BKTableNameWatchToken ).Insert (s .ctx , data ); err != nil {
206
- blog .Errorf ("init last watch token failed, err: %v, data: %+v" , err , data )
207
- return err
208
- }
209
- return nil
210
- }
211
-
212
- if key .Collection () == event .BizSetRelationKey .Collection () {
213
- // biz set relation's watch token is generated in the same way with the host identity's watch token
214
- data := mapstr.MapStr {
215
- "_id" : key .Collection (),
216
- common .BKTableNameBaseApp : watch.LastChainNodeData {Coll : common .BKTableNameBaseApp },
217
- common .BKTableNameBaseBizSet : watch.LastChainNodeData {Coll : common .BKTableNameBaseBizSet },
218
- common .BKFieldID : 0 ,
219
- common .BKTokenField : "" ,
220
- }
221
- if err = s .watchDB .Table (common .BKTableNameWatchToken ).Insert (s .ctx , data ); err != nil {
222
- blog .Errorf ("init last biz set relation watch token failed, err: %v, data: %+v" , err , data )
223
- return err
224
- }
225
- return nil
226
- }
227
-
228
- data := watch.LastChainNodeData {
229
- Coll : key .Collection (),
230
- Token : "" ,
231
- StartAtTime : streamtypes.TimeStamp {
232
- Sec : uint32 (time .Now ().Unix ()),
233
- Nano : 0 ,
234
- },
235
- }
236
- if err = s .watchDB .Table (common .BKTableNameWatchToken ).Insert (s .ctx , data ); err != nil {
237
- blog .Errorf ("init last watch token failed, err: %v, data: %+v" , err , data )
238
- return err
239
- }
240
- return nil
241
- }
242
-
243
- func (s * Service ) migrateSpecifyVersion (req * restful.Request , resp * restful.Response ) {
244
- rHeader := req .Request .Header
245
- rid := util .GetHTTPCCRequestID (rHeader )
246
- defErr := s .CCErr .CreateDefaultCCErrorIf (util .GetLanguage (rHeader ))
247
- ownerID := common .BKDefaultOwnerID
248
- updateCfg := & upgrader.Config {
249
- OwnerID : ownerID ,
250
- User : common .CCSystemOperatorUserName ,
251
- }
252
-
253
- input := new (MigrateSpecifyVersionRequest )
254
- if err := json .NewDecoder (req .Request .Body ).Decode (input ); err != nil {
255
- blog .Errorf ("migrateSpecifyVersion failed, decode body err: %v, body:%+v,rid:%s" , err , req .Request .Body , rid )
256
- _ = resp .WriteError (http .StatusOK , & metadata.RespError {Msg : defErr .Error (common .CCErrCommJSONUnmarshalFailed )})
257
- return
258
- }
259
-
260
- if input .CommitID != version .CCGitHash {
261
- _ = resp .WriteError (http .StatusOK ,
262
- & metadata.RespError {Msg : defErr .Errorf (common .CCErrCommParamsInvalid , "commit_id" )})
263
- return
264
- }
265
-
266
- err := upgrader .UpgradeSpecifyVersion (s .ctx , s .db , s .cache , s .iam , updateCfg , input .Version )
267
- if err != nil {
268
- blog .Errorf ("db upgrade specify failed, err: %+v, rid: %s" , err , rid )
269
- result := & metadata.RespError {
270
- Msg : defErr .Errorf (common .CCErrCommMigrateFailed , err .Error ()),
271
- }
272
- resp .WriteError (http .StatusInternalServerError , result )
273
- return
274
- }
275
-
276
- result := MigrationResponse {
277
- BaseResp : metadata.BaseResp {
278
- Result : true ,
279
- Code : 0 ,
280
- ErrMsg : "" ,
281
- Permissions : nil ,
282
- },
283
- Data : "migrate success. version: " + input .Version ,
284
- }
285
- resp .WriteEntity (result )
286
-
287
- }
288
-
289
30
var allConfigNames = map [string ]bool {
290
31
"redis" : true ,
291
32
"mongodb" : true ,
@@ -350,19 +91,3 @@ func (s *Service) refreshConfig(req *restful.Request, resp *restful.Response) {
350
91
blog .Infof ("refresh config success, input:%#v" , input )
351
92
resp .WriteEntity (metadata .NewSuccessResp ("refresh config success" ))
352
93
}
353
-
354
- // MigrationResponse TODO
355
- type MigrationResponse struct {
356
- metadata.BaseResp `json:",inline"`
357
- Data interface {} `json:"data"`
358
- PreVersion string `json:"pre_version"`
359
- CurrentVersion string `json:"current_version"`
360
- FinishedVersions []string `json:"finished_migrations"`
361
- }
362
-
363
- // MigrateSpecifyVersionRequest TODO
364
- type MigrateSpecifyVersionRequest struct {
365
- CommitID string `json:"commit_id"`
366
- TimeStamp int64 `json:"time_stamp"`
367
- Version string `json:"version"`
368
- }
0 commit comments