Skip to content

Commit d93ac9a

Browse files
authored
perf: avoid arrow IPC & unneeded steps when flushing delta (#247)
* perf: avoid unneeded steps when flushing delta * fix: implement workaround for RegisterView bug
1 parent eaafcc7 commit d93ac9a

File tree

8 files changed

+395
-123
lines changed

8 files changed

+395
-123
lines changed

adapter/adapter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ func GetConn(ctx *sql.Context) (*stdsql.Conn, error) {
2121
return ctx.Session.(ConnectionHolder).GetConn(ctx)
2222
}
2323

24+
func GetCatalogConn(ctx *sql.Context) (*stdsql.Conn, error) {
25+
return ctx.Session.(ConnectionHolder).GetCatalogConn(ctx)
26+
}
27+
2428
func CloseBackendConn(ctx *sql.Context) {
2529
ctx.Session.(ConnectionHolder).CloseBackendConn()
2630
}

binlogreplication/binlog_replica_applier.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1245,14 +1245,18 @@ func (a *binlogReplicaApplier) appendRowFormatChanges(
12451245
}
12461246

12471247
func (a *binlogReplicaApplier) flushDeltaBuffer(ctx *sql.Context, reason delta.FlushReason) error {
1248+
conn, err := adapter.GetCatalogConn(ctx)
1249+
if err != nil {
1250+
return err
1251+
}
12481252
tx, err := adapter.GetCatalogTxn(ctx, nil)
12491253
if err != nil {
12501254
return err
12511255
}
12521256

12531257
defer a.deltaBufSize.Store(0)
12541258

1255-
if err = a.tableWriterProvider.FlushDeltaBuffer(ctx, tx, reason); err != nil {
1259+
if err = a.tableWriterProvider.FlushDeltaBuffer(ctx, conn, tx, reason); err != nil {
12561260
ctx.GetLogger().Errorf("Failed to flush changelog: %v", err.Error())
12571261
MyBinlogReplicaController.setSqlError(sqlerror.ERUnknownError, err.Error())
12581262
}

binlogreplication/writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type TableWriterProvider interface {
5151
) (DeltaAppender, error)
5252

5353
// FlushDelta writes the accumulated changes to the database.
54-
FlushDeltaBuffer(ctx *sql.Context, tx *stdsql.Tx, reason delta.FlushReason) error
54+
FlushDeltaBuffer(ctx *sql.Context, conn *stdsql.Conn, tx *stdsql.Tx, reason delta.FlushReason) error
5555

5656
// DiscardDeltaBuffer discards the accumulated changes.
5757
DiscardDeltaBuffer(ctx *sql.Context)

0 commit comments

Comments
 (0)