-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathddl_event.go
More file actions
615 lines (532 loc) · 19 KB
/
ddl_event.go
File metadata and controls
615 lines (532 loc) · 19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"go.uber.org/zap"
)
const (
	// DDLEventVersion1 is the first (and currently only) wire version of
	// DDLEvent; Marshal/Unmarshal dispatch on it to pick the codec.
	DDLEventVersion1 = 1
)

// Compile-time assertion that *DDLEvent satisfies the Event interface.
var _ Event = &DDLEvent{}
// DDLEvent carries a single DDL from the event service to dispatchers and
// sinks. JSON-tagged fields travel in the JSON portion of the V1 payload;
// fields tagged `json:"-"` are either runtime-only or encoded separately by
// encodeV1 (DispatcherID, TableInfo, MultipleTableInfos).
type DDLEvent struct {
	// Version is the version of the DDLEvent struct.
	Version int `json:"version"`
	// DispatcherID is serialized as a separate binary segment by encodeV1,
	// not in the JSON body.
	DispatcherID common.DispatcherID `json:"-"`
	// Type is the type of the DDL.
	Type byte `json:"type"`
	// SchemaID is from upstream job.SchemaID
	SchemaID   int64  `json:"schema_id"`
	SchemaName string `json:"schema_name"`
	TableName  string `json:"table_name"`
	// TargetSchemaName and TargetTableName carry routed names for sink output paths.
	// They are runtime-only fields and are not serialized.
	TargetSchemaName string `json:"-"`
	TargetTableName  string `json:"-"`
	// the following two fields are just used for RenameTable,
	// they are the old schema/table name of the table
	ExtraSchemaName string `json:"extra_schema_name"`
	ExtraTableName  string `json:"extra_table_name"`
	// TargetExtraSchemaName and TargetExtraTableName carry routed old names for rename DDLs.
	// They are runtime-only fields and are not serialized.
	TargetExtraSchemaName string `json:"-"`
	TargetExtraTableName  string `json:"-"`
	// Query is the DDL statement text.
	Query string `json:"query"`
	// TableInfo is serialized as a separate binary segment by encodeV1.
	TableInfo  *common.TableInfo `json:"-"`
	StartTs    uint64            `json:"start_ts"`
	FinishedTs uint64            `json:"finished_ts"`
	// The seq of the event. It is set by event service.
	Seq uint64 `json:"seq"`
	// The epoch of the event. It is set by event service.
	Epoch uint64 `json:"epoch"`
	// MultipleTableInfos holds information for multiple versions of a table.
	// The first entry always represents the current table information.
	// Serialized as separate binary segments by encodeV1.
	MultipleTableInfos []*common.TableInfo `json:"-"`
	BlockedTables      *InfluencedTables   `json:"blocked_tables"`
	// BlockedTableNames is used by downstream adapters to get the names of tables that should block this DDL.
	// It is particularly used for querying the execution status of asynchronous DDLs (e.g., `ADD INDEX`)
	// that may be running on the table before this DDL.
	// This field will be set for most `InfluenceTypeNormal` DDLs, except for those creating new tables/schemas or dropping views.
	// It will be empty for other DDLs.
	// NOTE: For `RENAME TABLE` / `RENAME TABLES` DDLs, this will be set to the old table names.
	// For partition DDLs, this will be the parent table name.
	BlockedTableNames []SchemaTableName `json:"blocked_table_names"`
	NeedDroppedTables *InfluencedTables `json:"need_dropped_tables"`
	NeedAddedTables   []Table           `json:"need_added_tables"`
	// Only set when tables moves between databases
	UpdatedSchemas []SchemaIDChange `json:"updated_schemas"`
	// DDLs which may change table name:
	// Create Table
	// Create Tables
	// Drop Table
	// Rename Table
	// Rename Tables
	// Drop Schema
	// Recover Table
	TableNameChange *TableNameChange `json:"table_name_change"`
	TiDBOnly        bool             `json:"tidb_only"`
	BDRMode         string           `json:"bdr_mode"`
	Err             string           `json:"err"`
	// Call when event flush is completed
	PostTxnFlushed []func() `json:"-"`
	// eventSize is the size of the event in bytes. It is set when it's unmarshaled.
	eventSize int64 `json:"-"`
	// for simple protocol
	IsBootstrap bool `json:"-"`
	// NotSync is used to indicate whether the event should be synced to downstream.
	// If it is true, sink should not sync this event to downstream.
	// It is used for some special DDL events that do not need to be synced,
	// but only need to be sent to dispatcher to update some metadata.
	// For example, if a `TRUNCATE TABLE` DDL is filtered by event filter,
	// we don't need to sync it to downstream, but the DML events of the new truncated table
	// should be sent to downstream.
	// So we should send the `TRUNCATE TABLE` DDL event to dispatcher,
	// to ensure the new truncated table can be handled correctly.
	// If the DDL involves multiple tables, this field is not effective.
	// The multiple table DDL event will be handled by filtering querys and table infos.
	// NOTE: DDLEventVersion1 still marshals the struct with encoding/json.
	// We use json:"not_sync" as the canonical key and keep custom MarshalJSON/
	// UnmarshalJSON compatibility so both `not_sync` and legacy `NotSync`
	// are interoperable in mixed-version deployment.
	NotSync bool `json:"not_sync"`
}
// ddlEventJSONAlias is a method-free alias of DDLEvent: it drops the custom
// MarshalJSON/UnmarshalJSON methods so the struct can be encoded/decoded with
// the default rules (avoiding infinite recursion inside those methods).
type ddlEventJSONAlias DDLEvent

// ddlEventJSONCompat is the encode-side wrapper: it emits the legacy
// `NotSync` key alongside the canonical `not_sync` key carried by the
// embedded alias.
type ddlEventJSONCompat struct {
	ddlEventJSONAlias
	NotSyncLegacy bool `json:"NotSync"`
}

// ddlEventJSONDecodeCompat is the decode-side wrapper: both keys are captured
// as optional pointers so UnmarshalJSON can tell which ones were present in
// the input and apply the documented precedence.
type ddlEventJSONDecodeCompat struct {
	*ddlEventJSONAlias
	NotSyncLegacy *bool `json:"NotSync"`
	NotSyncNew    *bool `json:"not_sync"`
}
// MarshalJSON encodes both new and legacy NotSync keys for mixed-version compatibility.
func (d DDLEvent) MarshalJSON() ([]byte, error) {
	// The alias type has no methods, so this does not recurse back into
	// MarshalJSON; NotSync is mirrored into the legacy `NotSync` key.
	compat := ddlEventJSONCompat{
		ddlEventJSONAlias: ddlEventJSONAlias(d),
		NotSyncLegacy:     d.NotSync,
	}
	return json.Marshal(compat)
}
// UnmarshalJSON accepts both `not_sync` and legacy `NotSync` keys.
// If both keys are present, `not_sync` takes precedence.
func (d *DDLEvent) UnmarshalJSON(data []byte) error {
	// Decode through the alias so the default rules fill d's fields directly.
	compat := ddlEventJSONDecodeCompat{ddlEventJSONAlias: (*ddlEventJSONAlias)(d)}
	if err := json.Unmarshal(data, &compat); err != nil {
		return err
	}
	switch {
	case compat.NotSyncNew != nil:
		// Canonical key wins when both are present.
		d.NotSync = *compat.NotSyncNew
	case compat.NotSyncLegacy != nil:
		d.NotSync = *compat.NotSyncLegacy
	}
	return nil
}
// String implements fmt.Stringer for logging.
// NOTE: it intentionally prints only the main fields; Epoch, NotSync, and the
// runtime-only Target* fields are not included in the output.
func (d *DDLEvent) String() string {
	return fmt.Sprintf("DDLEvent{Version: %d, DispatcherID: %s, Type: %d, SchemaID: %d, SchemaName: %s, TableName: %s, ExtraSchemaName: %s, ExtraTableName: %s, Query: %s, TableInfo: %v, StartTs: %d, FinishedTs: %d, Seq: %d, BlockedTables: %v, NeedDroppedTables: %v, NeedAddedTables: %v, UpdatedSchemas: %v, TableNameChange: %v, TiDBOnly: %t, BDRMode: %s, Err: %s, eventSize: %d}",
		d.Version, d.DispatcherID.String(), d.Type, d.SchemaID, d.SchemaName, d.TableName, d.ExtraSchemaName, d.ExtraTableName, d.Query, d.TableInfo, d.StartTs, d.FinishedTs, d.Seq, d.BlockedTables, d.NeedDroppedTables, d.NeedAddedTables, d.UpdatedSchemas, d.TableNameChange, d.TiDBOnly, d.BDRMode, d.Err, d.eventSize)
}
// GetType returns the event-kind tag (TypeDDLEvent) used by the event codec.
func (d *DDLEvent) GetType() int {
	return TypeDDLEvent
}

// GetDispatcherID returns the dispatcher this event is addressed to.
func (d *DDLEvent) GetDispatcherID() common.DispatcherID {
	return d.DispatcherID
}

// GetStartTs returns the start timestamp of the DDL job.
func (d *DDLEvent) GetStartTs() common.Ts {
	return d.StartTs
}

// GetError converts the stored error string into an error value,
// returning nil when no error was recorded.
func (d *DDLEvent) GetError() error {
	if len(d.Err) == 0 {
		return nil
	}
	return errors.New(d.Err)
}

// GetCommitTs returns the finished (commit) timestamp of the DDL job.
func (d *DDLEvent) GetCommitTs() common.Ts {
	return d.FinishedTs
}

// PostFlush runs every registered post-flush callback in registration order.
func (d *DDLEvent) PostFlush() {
	for _, f := range d.PostTxnFlushed {
		f()
	}
}
// GetSchemaName returns the upstream schema name.
func (d *DDLEvent) GetSchemaName() string {
	return d.SchemaName
}

// GetSourceSchemaName returns the upstream schema name, ignoring any routing.
func (d *DDLEvent) GetSourceSchemaName() string {
	return d.SchemaName
}

// GetTableName returns the upstream table name.
func (d *DDLEvent) GetTableName() string {
	return d.TableName
}

// GetSourceTableName returns the upstream table name, ignoring any routing.
func (d *DDLEvent) GetSourceTableName() string {
	return d.TableName
}

// GetExtraSchemaName returns the old schema name (set for rename DDLs).
func (d *DDLEvent) GetExtraSchemaName() string {
	return d.ExtraSchemaName
}

// GetSourceExtraSchemaName returns the old schema name, ignoring any routing.
func (d *DDLEvent) GetSourceExtraSchemaName() string {
	return d.ExtraSchemaName
}

// GetExtraTableName returns the old table name (set for rename DDLs).
func (d *DDLEvent) GetExtraTableName() string {
	return d.ExtraTableName
}

// GetSourceExtraTableName returns the old table name, ignoring any routing.
func (d *DDLEvent) GetSourceExtraTableName() string {
	return d.ExtraTableName
}

// GetTargetSchemaName returns the routed schema name for sink output,
// falling back to the source schema name when no route is set.
func (d *DDLEvent) GetTargetSchemaName() string {
	if d.TargetSchemaName != "" {
		return d.TargetSchemaName
	}
	return d.SchemaName
}

// GetTargetTableName returns the routed table name for sink output,
// falling back to the source table name when no route is set.
func (d *DDLEvent) GetTargetTableName() string {
	if d.TargetTableName != "" {
		return d.TargetTableName
	}
	return d.TableName
}

// GetTargetExtraSchemaName returns the routed old schema name for rename DDLs,
// falling back to the unrouted old schema name when no route is set.
func (d *DDLEvent) GetTargetExtraSchemaName() string {
	if d.TargetExtraSchemaName != "" {
		return d.TargetExtraSchemaName
	}
	return d.ExtraSchemaName
}

// GetTargetExtraTableName returns the routed old table name for rename DDLs,
// falling back to the unrouted old table name when no route is set.
func (d *DDLEvent) GetTargetExtraTableName() string {
	if d.TargetExtraTableName != "" {
		return d.TargetExtraTableName
	}
	return d.ExtraTableName
}
// GetTableID returns the logic table ID of the event.
// it returns 0 when there is no tableinfo
func (d *DDLEvent) GetTableID() int64 {
	if d.TableInfo != nil {
		return d.TableInfo.TableName.TableID
	}
	return 0
}
// GetEvents splits a multi-table DDL (CREATE TABLES / RENAME TABLES) into one
// single-table DDLEvent per table so each can be handled independently; for
// every other DDL type it returns the event itself as a one-element slice.
// It panics (log.Panic) when the query cannot be split or the number of split
// queries does not match MultipleTableInfos — both indicate corrupt input.
//
// NOTE: split events carry only the sink-relevant fields (names, table info,
// query, timestamps); scheduling fields such as Seq, Epoch, and BlockedTables
// are intentionally not copied.
func (d *DDLEvent) GetEvents() []*DDLEvent {
	// Some ddl event may be multi-events, we need to split it into multiple messages.
	// Such as rename table test.table1 to test.table10, test.table2 to test.table20
	switch model.ActionType(d.Type) {
	case model.ActionCreateTables, model.ActionRenameTables:
		queries, err := SplitQueries(d.Query)
		if err != nil {
			log.Panic("split queries failed", zap.Error(err))
		}
		if len(queries) != len(d.MultipleTableInfos) {
			log.Panic("queries length should be equal to multipleTableInfos length", zap.String("query", d.Query), zap.Any("multipleTableInfos", d.MultipleTableInfos))
		}
		// Hoist the loop-invariant action check: each split event uses the
		// single-table variant of the original multi-table action.
		isRename := model.ActionType(d.Type) == model.ActionRenameTables
		t := model.ActionCreateTable
		if isRename {
			t = model.ActionRenameTable
		}
		events := make([]*DDLEvent, 0, len(d.MultipleTableInfos))
		for i, info := range d.MultipleTableInfos {
			event := &DDLEvent{
				Version:          d.Version,
				Type:             byte(t),
				SchemaName:       info.GetSchemaName(),
				TableName:        info.GetTableName(),
				TargetSchemaName: info.GetTargetSchemaName(),
				TargetTableName:  info.GetTargetTableName(),
				TableInfo:        info,
				Query:            queries[i],
				StartTs:          d.StartTs,
				FinishedTs:       d.FinishedTs,
			}
			if isRename {
				// For renames, the "extra" names are the pre-rename (old) names.
				event.ExtraSchemaName = d.TableNameChange.DropName[i].SchemaName
				event.ExtraTableName = d.TableNameChange.DropName[i].TableName
				targetExtraSchemaName, targetExtraTableName := extractRenameTargetExtraFromQuery(queries[i])
				event.TargetExtraSchemaName = targetExtraSchemaName
				event.TargetExtraTableName = targetExtraTableName
			}
			events = append(events, event)
		}
		return events
	}
	return []*DDLEvent{d}
}
// extractRenameTargetExtraFromQuery parses a split single-table RENAME TABLE
// statement and returns the schema and table name of its old (source) side,
// which become the routed TargetExtra* names of the split event.
// It panics (log.Panic) if the query does not parse as a rename statement with
// at least one table pair — the queries come from SplitQueries and are
// expected to be well-formed.
func extractRenameTargetExtraFromQuery(query string) (string, string) {
	stmt, err := parser.New().ParseOneStmt(query, "", "")
	if err != nil {
		log.Panic("parse split rename query failed", zap.String("query", query), zap.Error(err))
	}
	renameStmt, ok := stmt.(*ast.RenameTableStmt)
	if !ok || len(renameStmt.TableToTables) == 0 {
		log.Panic("unexpected split rename query", zap.String("query", query), zap.Any("stmt", stmt))
	}
	// A split query holds exactly one pair; .O preserves the original case.
	oldTable := renameStmt.TableToTables[0].OldTable
	return oldTable.Schema.O, oldTable.Name.O
}
// GetSeq returns the sequence number assigned by the event service.
func (d *DDLEvent) GetSeq() uint64 {
	return d.Seq
}

// GetEpoch returns the epoch assigned by the event service.
func (d *DDLEvent) GetEpoch() uint64 {
	return d.Epoch
}

// ClearPostFlushFunc drops all registered post-flush callbacks while keeping
// the slice's backing capacity for reuse.
func (d *DDLEvent) ClearPostFlushFunc() {
	d.PostTxnFlushed = d.PostTxnFlushed[:0]
}

// AddPostFlushFunc appends f to run after previously registered callbacks.
func (d *DDLEvent) AddPostFlushFunc(f func()) {
	d.PostTxnFlushed = append(d.PostTxnFlushed, f)
}

// PushFrontFlushFunc prepends f so it runs before previously registered
// callbacks.
func (d *DDLEvent) PushFrontFlushFunc(f func()) {
	d.PostTxnFlushed = append([]func(){f}, d.PostTxnFlushed...)
}
// GetBlockedTables returns the tables that should block this DDL.
// Receiver renamed from `e` to `d` for consistency with the type's other
// methods (staticcheck ST1016).
func (d *DDLEvent) GetBlockedTables() *InfluencedTables {
	return d.BlockedTables
}

// GetBlockedTableNames returns the names of tables that should block this DDL
// (see the BlockedTableNames field for the exact semantics).
func (d *DDLEvent) GetBlockedTableNames() []SchemaTableName {
	return d.BlockedTableNames
}

// GetNeedDroppedTables returns the tables that must be dropped for this DDL.
func (d *DDLEvent) GetNeedDroppedTables() *InfluencedTables {
	return d.NeedDroppedTables
}

// GetNeedAddedTables returns the tables that must be added for this DDL.
func (d *DDLEvent) GetNeedAddedTables() []Table {
	return d.NeedAddedTables
}

// GetUpdatedSchemas returns the schema-ID changes caused by moving tables
// between databases.
func (d *DDLEvent) GetUpdatedSchemas() []SchemaIDChange {
	return d.UpdatedSchemas
}

// GetDDLQuery returns the DDL statement text; a nil receiver is logged as an
// error and yields "".
func (d *DDLEvent) GetDDLQuery() string {
	if d == nil {
		log.Error("DDLEvent is nil, should not happened in production env", zap.Any("event", d))
		return ""
	}
	return d.Query
}

// GetDDLSchemaName returns the routed schema name, or "" on a nil receiver.
func (d *DDLEvent) GetDDLSchemaName() string {
	if d == nil {
		return ""
	}
	return d.GetTargetSchemaName()
}

// GetDDLType returns the DDL action type of the event.
func (d *DDLEvent) GetDDLType() model.ActionType {
	return model.ActionType(d.Type)
}
// Marshal serializes the event as the unified event header followed by a
// version-specific payload. Unknown versions are rejected with an error.
func (d *DDLEvent) Marshal() ([]byte, error) {
	// Only V1 exists today; guard first so the happy path stays flat.
	if d.Version != DDLEventVersion1 {
		return nil, fmt.Errorf("unsupported DDLEvent version: %d", d.Version)
	}
	payload, err := d.encodeV1()
	if err != nil {
		return nil, err
	}
	return MarshalEventWithHeader(TypeDDLEvent, d.Version, payload)
}
// Unmarshal validates the unified event header, records the wire version,
// and decodes the payload with the matching version codec.
func (d *DDLEvent) Unmarshal(data []byte) error {
	payload, version, err := ValidateAndExtractPayload(data, TypeDDLEvent)
	if err != nil {
		return err
	}
	// The version is stored even when decoding fails below, matching the
	// header that was actually received.
	d.Version = version
	if version != DDLEventVersion1 {
		return fmt.Errorf("unsupported DDLEvent version: %d", version)
	}
	return d.decodeV1(payload)
}
// appendSizedSegment appends seg to dst followed by seg's length as a
// big-endian uint64 trailer — the data|size layout that decodeV1 parses
// from the end of the buffer.
func appendSizedSegment(dst, seg []byte) []byte {
	dst = append(dst, seg...)
	var size [8]byte
	binary.BigEndian.PutUint64(size[:], uint64(len(seg)))
	return append(dst, size[:]...)
}

// encodeV1 produces the V1 payload. Layout (sizes trail their data so
// decodeV1 can walk backwards from the end):
// restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipletableInfosDataSize
// Note: version is now handled in the header by Marshal(), not here
func (t DDLEvent) encodeV1() ([]byte, error) {
	// JSON body first; the custom MarshalJSON emits both NotSync keys.
	data, err := json.Marshal(t)
	if err != nil {
		return nil, err
	}
	data = appendSizedSegment(data, t.DispatcherID.Marshal())
	if t.TableInfo != nil {
		tableInfoData, err := t.TableInfo.Marshal()
		if err != nil {
			return nil, err
		}
		data = appendSizedSegment(data, tableInfoData)
	} else {
		// A zero-length segment marks a missing TableInfo.
		data = appendSizedSegment(data, nil)
	}
	for _, info := range t.MultipleTableInfos {
		tableInfoData, err := info.Marshal()
		if err != nil {
			return nil, err
		}
		data = appendSizedSegment(data, tableInfoData)
	}
	// The final trailer is the COUNT of multiple table infos, not a byte size.
	var count [8]byte
	binary.BigEndian.PutUint64(count[:], uint64(len(t.MultipleTableInfos)))
	return append(data, count[:]...), nil
}
// decodeV1 parses the V1 payload produced by encodeV1. Because every size
// field TRAILS its data, parsing walks backwards from the end of the buffer:
// restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipleTableInfosDataSize
// Each size is bounds-checked against the remaining bytes so corrupt input
// yields an error instead of a panic.
func (t *DDLEvent) decodeV1(data []byte) error {
	t.eventSize = int64(len(data))
	// `end` is the exclusive upper bound of the not-yet-consumed prefix.
	end := len(data)
	if end < 8 {
		return fmt.Errorf("invalid DDLEvent data: length %d is too short", len(data))
	}
	// Last 8 bytes: the COUNT of multiple table infos (not a byte size).
	multipleTableInfoCount := binary.BigEndian.Uint64(data[end-8 : end])
	// Each entry needs at least an 8-byte size trailer, so the count cannot
	// exceed the remaining bytes divided by 8.
	if multipleTableInfoCount > uint64((end-8)/8) {
		return fmt.Errorf("invalid DDLEvent data: too many multiple table infos, count=%d", multipleTableInfoCount)
	}
	end -= 8
	t.MultipleTableInfos = t.MultipleTableInfos[:0]
	if multipleTableInfoCount > 0 {
		multipleTableInfos := make([]*common.TableInfo, int(multipleTableInfoCount))
		// Entries are read back-to-front, so fill the slice from the end to
		// restore the original order.
		for i := int(multipleTableInfoCount) - 1; i >= 0; i-- {
			if end < 8 {
				return fmt.Errorf("invalid DDLEvent data: missing table info size for multiple table infos")
			}
			tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end])
			if tableInfoDataSize > uint64(end-8) {
				return fmt.Errorf("invalid DDLEvent data: invalid multiple table info size=%d", tableInfoDataSize)
			}
			tableInfoData := data[end-8-int(tableInfoDataSize) : end-8]
			info, err := common.UnmarshalJSONToTableInfo(tableInfoData)
			if err != nil {
				return err
			}
			multipleTableInfos[i] = info
			end -= 8 + int(tableInfoDataSize)
		}
		t.MultipleTableInfos = append(t.MultipleTableInfos, multipleTableInfos...)
	}
	if end < 8 {
		return fmt.Errorf("invalid DDLEvent data: missing tableInfoDataSize")
	}
	tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end])
	if tableInfoDataSize > uint64(end-8) {
		return fmt.Errorf("invalid DDLEvent data: invalid table info size=%d", tableInfoDataSize)
	}
	var err error
	t.TableInfo = nil
	// A zero size means encodeV1 had no TableInfo (see the nil branch there).
	if tableInfoDataSize > 0 {
		tableInfoData := data[end-8-int(tableInfoDataSize) : end-8]
		info, err := common.UnmarshalJSONToTableInfo(tableInfoData)
		if err != nil {
			return err
		}
		t.TableInfo = info
	}
	end -= 8 + int(tableInfoDataSize)
	if end < 8 {
		return fmt.Errorf("invalid DDLEvent data: missing dispatcherIDDataSize")
	}
	dispatcherIDDatSize := binary.BigEndian.Uint64(data[end-8 : end])
	if dispatcherIDDatSize > uint64(end-8) {
		return fmt.Errorf("invalid DDLEvent data: invalid dispatcher ID size=%d", dispatcherIDDatSize)
	}
	dispatcherIDData := data[end-8-int(dispatcherIDDatSize) : end-8]
	err = t.DispatcherID.Unmarshal(dispatcherIDData)
	if err != nil {
		return err
	}
	// Whatever precedes the dispatcher-ID segment is the JSON body; fields
	// tagged json:"-" (TableInfo, MultipleTableInfos, ...) are untouched by
	// this Unmarshal, so the binary segments decoded above survive.
	restDataEnd := end - 8 - int(dispatcherIDDatSize)
	err = json.Unmarshal(data[:restDataEnd], t)
	if err != nil {
		return err
	}
	// Rebuild derived/unexported state that JSON decoding does not restore.
	for _, info := range t.MultipleTableInfos {
		info.InitPrivateFields()
	}
	if t.TableInfo != nil {
		t.TableInfo.InitPrivateFields()
	}
	return nil
}
// GetSize returns the serialized payload size in bytes; it is only non-zero
// for events that went through decodeV1.
func (t *DDLEvent) GetSize() int64 {
	return t.eventSize
}

// IsPaused reports whether the event is paused; DDL events never are.
func (t *DDLEvent) IsPaused() bool {
	return false
}
// CloneForRouting creates a shallow copy of the DDLEvent that can safely be mutated
// for table-route purposes without affecting the original event.
//
// The clone shares most read-only fields with the original. Slice fields that can be
// replaced independently downstream are copied so routing can update them without
// mutating shared state. Returns nil for a nil receiver.
func (d *DDLEvent) CloneForRouting() *DDLEvent {
	if d == nil {
		return nil
	}
	// Create shallow copy
	clone := *d
	// PostTxnFlushed needs its own backing array to prevent potential races.
	// Currently, DDL events arrive with nil PostTxnFlushed (callbacks are added
	// downstream by basic_dispatcher.go), so append(nil, f) naturally creates a
	// fresh slice. However, we make an explicit copy here for future-proofing:
	// if any code path later adds callbacks before cloning, sharing the backing
	// array could cause nondeterministic callback visibility or data races.
	if d.PostTxnFlushed != nil {
		clone.PostTxnFlushed = make([]func(), len(d.PostTxnFlushed))
		copy(clone.PostTxnFlushed, d.PostTxnFlushed)
	}
	// MultipleTableInfos needs a new slice so each dispatcher can independently
	// apply routing to its elements without affecting others
	if d.MultipleTableInfos != nil {
		clone.MultipleTableInfos = make([]*common.TableInfo, len(d.MultipleTableInfos))
		copy(clone.MultipleTableInfos, d.MultipleTableInfos)
	}
	// BlockedTableNames is value-copied too so routing can rewrite names.
	// NOTE(review): the append-nil idiom yields a nil clone for an empty
	// (non-nil) input slice, unlike the make+copy used above — looks
	// intentional/harmless, but worth confirming if nil-vs-empty matters.
	if d.BlockedTableNames != nil {
		clone.BlockedTableNames = append([]SchemaTableName(nil), d.BlockedTableNames...)
	}
	return &clone
}
// Len returns the number of entries this event contributes; a DDL event
// always counts as exactly one.
func (t *DDLEvent) Len() int32 {
	return 1
}
// DB identifies a database schema by its upstream ID and name.
type DB struct {
	SchemaID   int64
	SchemaName string
}

// TableNameChange will record each ddl change of the table name.
// Each TableNameChange is related to a ddl event
type TableNameChange struct {
	// AddName lists the table names added by the DDL.
	AddName []SchemaTableName
	// DropName lists the table names removed by the DDL; for RENAME TABLES
	// these are the old names (see GetEvents).
	DropName []SchemaTableName
	// DropDatabaseName is set when the DDL drops a whole database.
	DropDatabaseName string
}