Skip to content

Commit 5a19a4b

Browse files
authored
localbackend: move to ingestctrl and rename package (#68382)
ref #68346
1 parent 8c17ce1 commit 5a19a4b

91 files changed

Lines changed: 305 additions & 285 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ mock_import: mockgen
556556
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter > br/pkg/mock/backend.go
557557
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/lightning/common ChunkFlushStatus > br/pkg/mock/common.go
558558
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
559-
tools/bin/mockgen -package mocklocal github.com/pingcap/tidb/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher,StoreHelper > br/pkg/mock/mocklocal/local.go
559+
tools/bin/mockgen -package mocklocal github.com/pingcap/tidb/pkg/ingestor/ingestctrl DiskUsage,TiKVModeSwitcher,StoreHelper > br/pkg/mock/mocklocal/local.go
560560
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go
561561
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/dxf/framework/taskexecutor TaskTable,TaskExecutor,Extension > pkg/dxf/framework/mock/task_executor_mock.go
562562
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/dxf/framework/scheduler Scheduler,CleanUpRoutine,TaskManager > pkg/dxf/framework/mock/scheduler_mock.go

br/pkg/mock/mocklocal/local.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

build/nogo_config.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@
423423
"pkg/lightning/mydump/": "ignore mydump code",
424424
"pkg/lightning/checkpoints/checkpoints.go": "ignore checkpoints code",
425425
"pkg/lightning/checkpoints/glue_checkpoint.go": "ignore glue_checkpoint code",
426-
"pkg/lightning/backend/local/": "ignore local code",
426+
"pkg/ingestor/ingestctrl/": "ignore local code",
427427
"pkg/lightning/backend/tidb/tidb.go": "ignore tidb code",
428428
"pkg/lightning/backend/tidb/tidb_test.go": "ignore tidb code",
429429
"pkg/lightning/checkpoints/checkpoints_test.go": "ignore checkpoints code",
@@ -657,7 +657,7 @@
657657
"pkg/util/chunk": "ignore util/chunk",
658658
"pkg/lightning/mydump/": "more than 50",
659659
"lightning/pkg/importer/": "more than 50",
660-
"pkg/lightning/backend/local/": "more than 50",
660+
"pkg/ingestor/ingestctrl/": "more than 50",
661661
"br/pkg/restore/": "more than 50",
662662
"br/pkg/storage/": "more than 50",
663663
"pkg/ddl/tests/partition/": "more than 50"
@@ -1065,8 +1065,8 @@
10651065
"br/pkg/streamhelper/advancer_cliext.go": "github.com/golang/protobuf deprecated",
10661066
"pkg/lightning/checkpoints/checkpoints.go": "cfg.TikvImporter.Addr is deprecated",
10671067
"pkg/lightning/checkpoints/glue_checkpoint.go": "cfg.TikvImporter.Addr is deprecated",
1068-
"pkg/lightning/backend/local/local.go": "grpc Compressor/Decompressor is deprecated",
1069-
"pkg/lightning/backend/local/compress.go": "grpc Compressor/Decompressor is deprecated",
1068+
"pkg/ingestor/ingestctrl/local.go": "grpc Compressor/Decompressor is deprecated",
1069+
"pkg/ingestor/ingestctrl/compress.go": "grpc Compressor/Decompressor is deprecated",
10701070
"pkg/domain/infosync/resource_manager_client.go": "github.com/golang/protobuf deprecated"
10711071
},
10721072
"only_files": {

lightning/pkg/importer/BUILD.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ go_library(
3333
"//pkg/config",
3434
"//pkg/ddl",
3535
"//pkg/errno",
36+
"//pkg/ingestor/ingestctrl",
3637
"//pkg/keyspace",
3738
"//pkg/kv",
3839
"//pkg/lightning/backend",
3940
"//pkg/lightning/backend/encode",
4041
"//pkg/lightning/backend/kv",
41-
"//pkg/lightning/backend/local",
4242
"//pkg/lightning/backend/tidb",
4343
"//pkg/lightning/common",
4444
"//pkg/lightning/config",
@@ -139,11 +139,11 @@ go_test(
139139
"//pkg/config/kerneltype",
140140
"//pkg/ddl",
141141
"//pkg/errno",
142+
"//pkg/ingestor/ingestctrl",
142143
"//pkg/kv",
143144
"//pkg/lightning/backend",
144145
"//pkg/lightning/backend/encode",
145146
"//pkg/lightning/backend/kv",
146-
"//pkg/lightning/backend/local",
147147
"//pkg/lightning/backend/tidb",
148148
"//pkg/lightning/common",
149149
"//pkg/lightning/config",

lightning/pkg/importer/checksum_helper.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ import (
1919

2020
"github.com/pingcap/errors"
2121
"github.com/pingcap/tidb/br/pkg/pdutil"
22+
"github.com/pingcap/tidb/pkg/ingestor/ingestctrl"
2223
"github.com/pingcap/tidb/pkg/kv"
23-
"github.com/pingcap/tidb/pkg/lightning/backend/local"
2424
"github.com/pingcap/tidb/pkg/lightning/common"
2525
"github.com/pingcap/tidb/pkg/lightning/config"
2626
"github.com/pingcap/tidb/pkg/lightning/importdef"
@@ -31,7 +31,7 @@ import (
3131
)
3232

3333
// NewChecksumManager creates a new checksum manager.
34-
func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (local.ChecksumManager, error) {
34+
func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (ingestctrl.ChecksumManager, error) {
3535
// if we don't need checksum, just return nil
3636
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.PostRestore.Checksum == config.OpLevelOff {
3737
return nil, nil
@@ -43,30 +43,30 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
4343
}
4444

4545
// for v4.0.0 or upper, we can use the gc ttl api
46-
var manager local.ChecksumManager
46+
var manager ingestctrl.ChecksumManager
4747
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
4848
backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db)
4949
// only set backoff weight when it's smaller than default value
50-
if err == nil && backoffWeight >= local.DefaultBackoffWeight {
50+
if err == nil && backoffWeight >= ingestctrl.DefaultBackoffWeight {
5151
logutil.Logger(ctx).Info("get tidb_backoff_weight", zap.Int("backoff_weight", backoffWeight))
5252
} else {
53-
logutil.Logger(ctx).Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", local.DefaultBackoffWeight))
54-
backoffWeight = local.DefaultBackoffWeight
53+
logutil.Logger(ctx).Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", ingestctrl.DefaultBackoffWeight))
54+
backoffWeight = ingestctrl.DefaultBackoffWeight
5555
}
5656

57-
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, rc.taskType)
57+
manager = ingestctrl.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, rc.taskType)
5858
} else {
59-
manager = local.NewTiDBChecksumExecutor(rc.db)
59+
manager = ingestctrl.NewTiDBChecksumExecutor(rc.db)
6060
}
6161

6262
return manager, nil
6363
}
6464

6565
// DoChecksum do checksum for tables.
6666
// table should be in <db>.<table>, format. e.g. foo.bar
67-
func DoChecksum(ctx context.Context, table *importdef.TableInfo) (*local.RemoteChecksum, error) {
67+
func DoChecksum(ctx context.Context, table *importdef.TableInfo) (*ingestctrl.RemoteChecksum, error) {
6868
var err error
69-
manager, ok := ctx.Value(&checksumManagerKey).(local.ChecksumManager)
69+
manager, ok := ctx.Value(&checksumManagerKey).(ingestctrl.ChecksumManager)
7070
if !ok {
7171
return nil, errors.New("No gcLifeTimeManager found in context, check context initialization")
7272
}

lightning/pkg/importer/get_pre_info.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ import (
3030
ropts "github.com/pingcap/tidb/lightning/pkg/importer/opts"
3131
"github.com/pingcap/tidb/pkg/ddl"
3232
"github.com/pingcap/tidb/pkg/errno"
33+
"github.com/pingcap/tidb/pkg/ingestor/ingestctrl"
3334
"github.com/pingcap/tidb/pkg/lightning/backend"
3435
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
3536
"github.com/pingcap/tidb/pkg/lightning/backend/kv"
36-
"github.com/pingcap/tidb/pkg/lightning/backend/local"
3737
"github.com/pingcap/tidb/pkg/lightning/backend/tidb"
3838
"github.com/pingcap/tidb/pkg/lightning/common"
3939
"github.com/pingcap/tidb/pkg/lightning/config"
@@ -145,7 +145,7 @@ func NewTargetInfoGetterImpl(
145145
case config.BackendTiDB:
146146
backendTargetInfoGetter = tidb.NewTargetInfoGetter(targetDB)
147147
case config.BackendLocal:
148-
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, pdHTTPCli)
148+
backendTargetInfoGetter = ingestctrl.NewTargetInfoGetter(tls, targetDB, pdHTTPCli)
149149
default:
150150
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
151151
}
@@ -299,7 +299,7 @@ func NewPreImportInfoGetter(
299299
case config.BackendTiDB:
300300
encBuilder = tidb.NewEncodingBuilder()
301301
case config.BackendLocal:
302-
encBuilder = local.NewEncodingBuilder(context.Background())
302+
encBuilder = ingestctrl.NewEncodingBuilder(context.Background())
303303
default:
304304
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
305305
}

lightning/pkg/importer/import.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ import (
4040
"github.com/pingcap/tidb/lightning/pkg/errormanager"
4141
"github.com/pingcap/tidb/lightning/pkg/progress"
4242
tidbconfig "github.com/pingcap/tidb/pkg/config"
43+
"github.com/pingcap/tidb/pkg/ingestor/ingestctrl"
4344
"github.com/pingcap/tidb/pkg/keyspace"
4445
tidbkv "github.com/pingcap/tidb/pkg/kv"
4546
"github.com/pingcap/tidb/pkg/lightning/backend"
4647
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
47-
"github.com/pingcap/tidb/pkg/lightning/backend/local"
4848
"github.com/pingcap/tidb/pkg/lightning/backend/tidb"
4949
"github.com/pingcap/tidb/pkg/lightning/common"
5050
"github.com/pingcap/tidb/pkg/lightning/config"
@@ -240,7 +240,7 @@ type Controller struct {
240240
preInfoGetter PreImportInfoGetter
241241
precheckItemBuilder *PrecheckItemBuilder
242242
encBuilder encode.EncodingBuilder
243-
tikvModeSwitcher local.TiKVModeSwitcher
243+
tikvModeSwitcher ingestctrl.TiKVModeSwitcher
244244

245245
keyspaceName string
246246
resourceGroupName string
@@ -369,19 +369,19 @@ func NewImportControllerWithPauser(
369369
encodingBuilder = tidb.NewEncodingBuilder()
370370
backendObj = tidb.NewTiDBBackend(ctx, db, cfg, errorMgr)
371371
case config.BackendLocal:
372-
var rLimit local.RlimT
373-
rLimit, err = local.GetSystemRLimit()
372+
var rLimit ingestctrl.RlimT
373+
rLimit, err = ingestctrl.GetSystemRLimit()
374374
if err != nil {
375375
return nil, err
376376
}
377-
maxOpenFiles := int(rLimit / local.RlimT(cfg.App.TableConcurrency))
377+
maxOpenFiles := int(rLimit / ingestctrl.RlimT(cfg.App.TableConcurrency))
378378
// check overflow
379379
if maxOpenFiles < 0 {
380380
maxOpenFiles = math.MaxInt32
381381
}
382382

383383
addrs := strings.Split(cfg.TiDB.PdAddr, ",")
384-
pdCli, err = pd.NewClientWithContext(ctx, componentName, addrs, tls.ToPDSecurityOption(), local.PDClientOptions()...)
384+
pdCli, err = pd.NewClientWithContext(ctx, componentName, addrs, tls.ToPDSecurityOption(), ingestctrl.PDClientOptions()...)
385385
if err != nil {
386386
return nil, errors.Trace(err)
387387
}
@@ -402,14 +402,14 @@ func NewImportControllerWithPauser(
402402
}
403403

404404
// simple wraps PD client, so no need to close it separately.
405-
pdCliForTiKV, err := local.NewCodecPDClient(pdCli, p.KeyspaceName)
405+
pdCliForTiKV, err := ingestctrl.NewCodecPDClient(pdCli, p.KeyspaceName)
406406
if err != nil {
407407
return nil, common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs()
408408
}
409409

410410
initGlobalConfig(tls.ToTiKVSecurityConfig())
411411

412-
encodingBuilder = local.NewEncodingBuilder(ctx)
412+
encodingBuilder = ingestctrl.NewEncodingBuilder(ctx)
413413

414414
// get resource group name.
415415
exec := common.SQLWithRetry{
@@ -445,8 +445,8 @@ func NewImportControllerWithPauser(
445445
if isRaftKV2 {
446446
raftKV2SwitchModeDuration = cfg.Cron.SwitchMode.Duration
447447
}
448-
backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName, p.TaskType, raftKV2SwitchModeDuration)
449-
backendObj, err = local.NewBackend(ctx, tls, backendConfig, pdCliForTiKV)
448+
backendConfig := ingestctrl.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName, p.TaskType, raftKV2SwitchModeDuration)
449+
backendObj, err = ingestctrl.NewBackend(ctx, tls, backendConfig, pdCliForTiKV)
450450
if err != nil {
451451
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
452452
}
@@ -479,7 +479,7 @@ func NewImportControllerWithPauser(
479479

480480
var wrapper backend.TargetInfoGetter
481481
if cfg.TikvImporter.Backend == config.BackendLocal {
482-
wrapper = local.NewTargetInfoGetter(tls, db, pdHTTPCli)
482+
wrapper = ingestctrl.NewTargetInfoGetter(tls, db, pdHTTPCli)
483483
} else {
484484
wrapper = tidb.NewTargetInfoGetter(db)
485485
}
@@ -544,7 +544,7 @@ func NewImportControllerWithPauser(
544544
preInfoGetter: preInfoGetter,
545545
precheckItemBuilder: preCheckBuilder,
546546
encBuilder: encodingBuilder,
547-
tikvModeSwitcher: local.NewTiKVModeSwitcher(tls.TLSConfig(), pdHTTPCli, logutil.Logger(ctx)),
547+
tikvModeSwitcher: ingestctrl.NewTiKVModeSwitcher(tls.TLSConfig(), pdHTTPCli, logutil.Logger(ctx)),
548548

549549
keyspaceName: p.KeyspaceName,
550550
resourceGroupName: p.ResourceGroupName,
@@ -760,7 +760,7 @@ func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error
760760
for tableName, engineIDs := range targetTables {
761761
for _, engineID := range engineIDs {
762762
_, eID := backend.MakeUUID(tableName, int64(engineID))
763-
file := local.Engine{UUID: eID}
763+
file := ingestctrl.Engine{UUID: eID}
764764
err := file.Exist(dir)
765765
if err != nil {
766766
logutil.Logger(ctx).Error("can't find local file",
@@ -1705,7 +1705,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) {
17051705
return
17061706
}
17071707

1708-
localBackend := rc.backend.(*local.Backend)
1708+
localBackend := rc.backend.(*ingestctrl.Backend)
17091709
go func() {
17101710
// locker is assigned when we detect the disk quota is exceeded.
17111711
// before the disk quota is confirmed exceeded, we keep the diskQuotaLock
@@ -1733,7 +1733,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) {
17331733
}
17341734

17351735
quota := int64(rc.cfg.TikvImporter.DiskQuota)
1736-
largeEngines, inProgressLargeEngines, totalDiskSize, totalMemSize := local.CheckDiskQuota(localBackend, quota)
1736+
largeEngines, inProgressLargeEngines, totalDiskSize, totalMemSize := ingestctrl.CheckDiskQuota(localBackend, quota)
17371737
if m, ok := metric.FromContext(ctx); ok {
17381738
m.LocalStorageUsageBytesGauge.WithLabelValues("disk").Set(float64(totalDiskSize))
17391739
m.LocalStorageUsageBytesGauge.WithLabelValues("mem").Set(float64(totalMemSize))

lightning/pkg/importer/meta_manager_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import (
2424
"github.com/DATA-DOG/go-sqlmock"
2525
"github.com/pingcap/errors"
2626
"github.com/pingcap/tidb/pkg/ddl"
27+
"github.com/pingcap/tidb/pkg/ingestor/ingestctrl"
2728
"github.com/pingcap/tidb/pkg/kv"
28-
"github.com/pingcap/tidb/pkg/lightning/backend/local"
2929
"github.com/pingcap/tidb/pkg/lightning/common"
3030
"github.com/pingcap/tidb/pkg/lightning/importdef"
3131
"github.com/pingcap/tidb/pkg/lightning/log"
@@ -361,7 +361,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int
361361
s.mockDB.ExpectExec("\\QUPDATE `test`.`table_meta` SET total_kvs_base = ?, total_bytes_base = ?, checksum_base = ?, status = ? WHERE table_id = ? AND task_id = ?\\E").
362362
WithArgs(checksum.SumKVS(), checksum.SumSize(), checksum.Sum(), metaStatusRestoreStarted.String(), int64(1), int64(1)).
363363
WillReturnResult(sqlmock.NewResult(int64(0), int64(1)))
364-
s.checksumMgr.checksum = local.RemoteChecksum{
364+
s.checksumMgr.checksum = ingestctrl.RemoteChecksum{
365365
TotalBytes: checksum.SumSize(),
366366
TotalKVs: checksum.SumKVS(),
367367
Checksum: checksum.Sum(),
@@ -438,13 +438,13 @@ func TestCheckTasksExclusively(t *testing.T) {
438438
}
439439

440440
type testChecksumMgr struct {
441-
checksum local.RemoteChecksum
441+
checksum ingestctrl.RemoteChecksum
442442
callCnt int
443443
}
444444

445-
var _ local.ChecksumManager = (*testChecksumMgr)(nil)
445+
var _ ingestctrl.ChecksumManager = (*testChecksumMgr)(nil)
446446

447-
func (t *testChecksumMgr) Checksum(ctx context.Context, tableInfo *importdef.TableInfo) (*local.RemoteChecksum, error) {
447+
func (t *testChecksumMgr) Checksum(ctx context.Context, tableInfo *importdef.TableInfo) (*ingestctrl.RemoteChecksum, error) {
448448
t.callCnt++
449449
return &t.checksum, nil
450450
}

0 commit comments

Comments
 (0)