Skip to content

Commit 73f3a8e

Browse files
authored
feat: support replicating TRUNCATE from postgres (#277)
1 parent 2be0e15 commit 73f3a8e

File tree

2 files changed

+59
-4
lines changed

2 files changed

+59
-4
lines changed

pgserver/logrepl/replication.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/apecloud/myduckserver/adapter"
2828
"github.com/apecloud/myduckserver/binlog"
29+
"github.com/apecloud/myduckserver/catalog"
2930
"github.com/apecloud/myduckserver/delta"
3031
"github.com/apecloud/myduckserver/pgtypes"
3132
"github.com/dolthub/go-mysql-server/sql"
@@ -814,8 +815,8 @@ func (r *LogicalReplicator) processMessage(
814815
// No old tuple provided; it means the key columns are unchanged.
815816
// It's fine not to append a delete event to the delta in this case.
816817
// However, the delta appender implements an optimization that
817-
// uses INSERT instead of UPSERT+DELETE when there is no deletion in a batch.
818-
// We need to enforce the use of UPSERT here because the deletion count is zero.
818+
// uses INSERT instead of UPSERT+DELETE or DELETE+INSERT when there is no deletion in a batch.
819+
// We need to enforce the latter code path here because the deletion count is zero.
819820
err = r.append(state, logicalMsg.RelationID, nil, binlog.DeleteRowEvent, binlog.UpdateRowEvent, true)
820821
}
821822
if err != nil {
@@ -836,9 +837,9 @@ func (r *LogicalReplicator) processMessage(
836837
if !state.processMessages {
837838
r.logger.Debugf("Received stale message, ignoring. Last written LSN: %s Message LSN: %s", state.lastWrittenLSN, xld.ServerWALEnd)
838839
return false, nil
839-
// Determine which columns to use based on OldTupleType
840840
}
841841

842+
// Determine which columns to use based on OldTupleType
842843
switch logicalMsg.OldTupleType {
843844
case pglogrepl.UpdateMessageTupleTypeKey:
844845
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, binlog.DeleteRowEvent, true)
@@ -858,7 +859,27 @@ func (r *LogicalReplicator) processMessage(
858859
state.inTxnStmtID += 1
859860

860861
case *pglogrepl.TruncateMessageV2:
861-
r.logger.Debugf("truncate for xid %d\n", logicalMsg.Xid)
862+
if !state.processMessages {
863+
r.logger.Debugf("Received stale message, ignoring. Last written LSN: %s Message LSN: %s", state.lastWrittenLSN, xld.ServerWALEnd)
864+
return false, nil
865+
}
866+
867+
r.logger.Debugf("Truncate message: xid %d\n", logicalMsg.Xid)
868+
869+
// Flush the delta buffer first
870+
r.flushDeltaBuffer(state, nil, nil, delta.DMLStmtFlushReason)
871+
872+
// Truncate the tables
873+
for _, relationID := range logicalMsg.RelationIDs {
874+
if err := r.truncate(state, relationID); err != nil {
875+
return false, err
876+
}
877+
}
878+
879+
state.dirtyTxn = true
880+
state.dirtyStream = true
881+
state.inTxnStmtID += 1
882+
862883
case *pglogrepl.TypeMessageV2:
863884
r.logger.Debugf("typeMessage for xid %d\n", logicalMsg.Xid)
864885
case *pglogrepl.OriginMessage:
@@ -1084,6 +1105,17 @@ func (r *LogicalReplicator) append(state *replicationState, relationID uint32, t
10841105
return nil
10851106
}
10861107

1108+
func (r *LogicalReplicator) truncate(state *replicationState, relationID uint32) error {
1109+
rel, ok := state.relations[relationID]
1110+
if !ok {
1111+
return fmt.Errorf("unknown relation ID %d", relationID)
1112+
}
1113+
1114+
r.logger.Debugf("Truncating table %s.%s\n", rel.Namespace, rel.RelationName)
1115+
_, err := adapter.ExecInTxn(state.replicaCtx, `TRUNCATE `+catalog.ConnectIdentifiersANSI(rel.Namespace, rel.RelationName))
1116+
return err
1117+
}
1118+
10871119
func tupleDataFormat(dataType uint8) int16 {
10881120
switch dataType {
10891121
case pglogrepl.TupleDataTypeBinary:

pgserver/logrepl/replication_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,29 @@ var replicationTests = []ReplicationTest{
610610
},
611611
},
612612
},
613+
{
614+
Name: "Truncate table",
615+
SetUpScript: []string{
616+
dropReplicationSlot,
617+
createReplicationSlot,
618+
startReplication,
619+
"/* replica */ drop table if exists public.test",
620+
"drop table if exists public.test",
621+
"CREATE TABLE public.test (id INT primary key, name varchar(10))",
622+
"INSERT INTO public.test VALUES (1, 'one'), (2, 'two'), (3, 'three')",
623+
"TRUNCATE TABLE public.test",
624+
"INSERT INTO public.test VALUES (4, 'four')",
625+
waitForCatchup,
626+
},
627+
Assertions: []ScriptTestAssertion{
628+
{
629+
Query: "/* replica */ SELECT * FROM public.test order by id",
630+
Expected: []sql.Row{
631+
{int32(4), "four"},
632+
},
633+
},
634+
},
635+
},
613636
}
614637

615638
func TestReplication(t *testing.T) {

0 commit comments

Comments
 (0)