Skip to content

Commit bf4e5a8

Browse files
committed
fix: Prevent data duplication for replicated tables by enforcing CopyOnMaster strategy
- Always use the CopyOnMaster strategy for replicated tables, regardless of on-segment-threshold.
- Add IsReplicated field detection via gp_distribution_policy.policytype = 'r'.
- Ensure replicated-table identification propagates through the entire processing pipeline.
- Add version-specific SQL queries for GPDB 6+ compatibility.
- Include comprehensive tests for replicated-table migration scenarios.

Fixes #21: Data was duplicated when on-segment-threshold was negative and replicated tables incorrectly used the CopyOnSegment strategy instead of CopyOnMaster.
1 parent 0151888 commit bf4e5a8

File tree

9 files changed

+305
-43
lines changed

9 files changed

+305
-43
lines changed

copy/copy_command.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -607,13 +607,23 @@ func createTestCopyStrategy(strategy string, workerId int, srcSegs []utils.Segme
607607
// - Number of segments in source and destination clusters (srcSegs, destSegs)
608608
// - Database versions of source and destination (srcConn.Version, destConn.Version)
609609
// It returns an instance of a struct that implements the CopyCommand interface.
610-
func CreateCopyStrategy(numTuples int64, workerId int, srcSegs []utils.SegmentHostInfo, destSegs []utils.SegmentIpInfo, srcConn, destConn *dbconn.DBConn) CopyCommand {
610+
func CreateCopyStrategy(isReplicated bool,
611+
numTuples int64,
612+
workerId int,
613+
srcSegs []utils.SegmentHostInfo,
614+
destSegs []utils.SegmentIpInfo,
615+
srcConn, destConn *dbconn.DBConn) CopyCommand {
616+
compArg := "--compress-type snappy"
617+
if isReplicated {
618+
gplog.Debug("Using CopyOnMaster strategy for replicated table")
619+
return &CopyOnMaster{CopyBase: CopyBase{WorkerId: workerId, SrcSegmentsHostInfo: srcSegs, DestSegmentsIpInfo: destSegs, CompArg: compArg}}
620+
}
621+
611622
if strategy := os.Getenv("TEST_COPY_STRATEGY"); strategy != "" {
612623
gplog.Debug("Using test copy strategy: %s", strategy)
613624
return createTestCopyStrategy(strategy, workerId, srcSegs, destSegs)
614625
}
615626

616-
compArg := "--compress-type snappy"
617627
if numTuples <= int64(utils.MustGetFlagInt(option.ON_SEGMENT_THRESHOLD)) {
618628
if !utils.MustGetFlagBool(option.COMPRESSION) {
619629
compArg = "--no-compression"

copy/copy_manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,8 @@ func (tc *TableCopier) cleanupAfterCopy(isSkipped bool, inTxn bool, err error) {
182182
}
183183

184184
func (tc *TableCopier) copyData() error {
185-
command := CreateCopyStrategy(tc.srcTable.RelTuples,
185+
command := CreateCopyStrategy(tc.srcTable.IsReplicated,
186+
tc.srcTable.RelTuples,
186187
tc.workerID,
187188
tc.manager.srcSegmentsHostInfo,
188189
tc.manager.destSegmentsIpInfo,

copy/copy_metadata.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,10 @@ func (m *MetadataManager) fillTablePairChan(srcTables, destTables []option.Table
116116
for i, t := range srcTables {
117117
tablec <- option.TablePair{
118118
SrcTable: option.Table{
119-
Schema: t.Schema,
120-
Name: t.Name,
121-
RelTuples: t.RelTuples,
119+
Schema: t.Schema,
120+
Name: t.Name,
121+
RelTuples: t.RelTuples,
122+
IsReplicated: t.IsReplicated,
122123
},
123124
DestTable: option.Table{
124125
Schema: destTables[i].Schema,

copy/copy_operation.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ func (op *CopyOperation) handleFailure(donec chan struct{}, toErr error, fromErr
149149
gplog.Debug("[Worker %v] Failed to copy %v.%v to %v.%v : %v",
150150
op.connNum, op.srcTable.Schema, op.srcTable.Name,
151151
op.destTable.Schema, op.destTable.Name, toErr)
152-
time.Sleep(2 * time.Second)
153152
if fromErr != nil {
154153
<-donec
155154
return fromErr

copy/copy_query.go

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,18 @@ func (qm *QueryManager) GetAllDatabases(conn *dbconn.DBConn) ([]string, error) {
6262

6363
// GetUserTables retrieves user tables from the database
6464
// example output:
65-
// map[public.t1:{"Partition":0,"RelTuples":1000} public.t2:{"Partition":0,"RelTuples":2000}]
65+
// map[public.t1:{"Partition":0,"RelTuples":1000,"IsReplicated":false} public.t2:{"Partition":0,"RelTuples":2000,"IsReplicated":true}]
6666
func (tm *QueryManager) GetUserTables(conn *dbconn.DBConn) (map[string]option.TableStatistics, error) {
6767
var query string
6868

6969
if (conn.Version.IsGPDB() && conn.Version.AtLeast("7")) || conn.Version.IsCBDBFamily() {
7070
query = `
7171
SELECT
72-
quote_ident(n.nspname) AS schema, quote_ident(c.relname) as name, 0 as partition, cast(c.reltuples as bigint) AS relTuples
72+
quote_ident(n.nspname) AS schema,
73+
quote_ident(c.relname) as name,
74+
0 as partition,
75+
cast(c.reltuples as bigint) AS relTuples,
76+
CASE WHEN p.policytype = 'r' THEN true ELSE false END as isReplicated
7377
FROM
7478
pg_class c
7579
JOIN pg_namespace n ON (c.relnamespace=n.oid)
@@ -81,11 +85,15 @@ func (tm *QueryManager) GetUserTables(conn *dbconn.DBConn) (map[string]option.Ta
8185
AND c.relkind <> 'm'
8286
ORDER BY c.relpages DESC
8387
`
84-
} else {
88+
} else if conn.Version.IsGPDB() && conn.Version.AtLeast("6") {
8589
query = `
8690
SELECT t.* FROM (
8791
SELECT
88-
quote_ident(n.nspname) AS schema, quote_ident(c.relname) as name, 0 as partition, cast(c.reltuples as bigint) AS relTuples
92+
quote_ident(n.nspname) AS schema,
93+
quote_ident(c.relname) as name,
94+
0 as partition,
95+
cast(c.reltuples as bigint) AS relTuples,
96+
CASE WHEN p.policytype = 'r' THEN true ELSE false END as isReplicated
8997
FROM
9098
pg_class c
9199
JOIN pg_namespace n ON (c.relnamespace=n.oid)
@@ -99,7 +107,48 @@ func (tm *QueryManager) GetUserTables(conn *dbconn.DBConn) (map[string]option.Ta
99107
ORDER BY c.relpages DESC ) t
100108
UNION ALL
101109
SELECT
102-
quote_ident(n.nspname) AS schema, quote_ident(cparent.relname) AS name, 0 as partition, cast(cparent.reltuples as bigint) AS relTuples
110+
quote_ident(n.nspname) AS schema,
111+
quote_ident(cparent.relname) AS name,
112+
0 as partition,
113+
cast(cparent.reltuples as bigint) AS relTuples,
114+
false as isReplicated
115+
FROM pg_partition p
116+
JOIN pg_partition_rule r ON p.oid = r.paroid
117+
JOIN pg_class cparent ON cparent.oid = r.parchildrelid
118+
JOIN pg_namespace n ON cparent.relnamespace=n.oid
119+
JOIN (SELECT parrelid AS relid, max(parlevel) AS pl
120+
FROM pg_partition GROUP BY parrelid) AS levels ON p.parrelid = levels.relid
121+
WHERE
122+
r.parchildrelid != 0
123+
AND p.parlevel = levels.pl
124+
AND n.nspname NOT IN ('gpexpand', 'pg_bitmapindex', 'information_schema', 'gp_toolkit')
125+
AND n.nspname NOT LIKE 'pg_temp_%' AND cparent.relstorage NOT IN ('v', 'x', 'f')`
126+
} else {
127+
query = `
128+
SELECT t.* FROM (
129+
SELECT
130+
quote_ident(n.nspname) AS schema,
131+
quote_ident(c.relname) as name,
132+
0 as partition,
133+
cast(c.reltuples as bigint) AS relTuples,
134+
false as isReplicated
135+
FROM
136+
pg_class c
137+
JOIN pg_namespace n ON (c.relnamespace=n.oid)
138+
WHERE
139+
c.oid NOT IN ( SELECT parchildrelid as oid FROM pg_partition_rule )
140+
AND c.oid NOT IN ( SELECT parrelid as oid FROM pg_partition )
141+
AND n.nspname NOT IN ('gpexpand', 'pg_bitmapindex', 'information_schema', 'gp_toolkit')
142+
AND n.nspname NOT LIKE 'pg_temp_%' AND c.relstorage NOT IN ('v', 'x', 'f')
143+
AND c.relkind <> 'm'
144+
ORDER BY c.relpages DESC ) t
145+
UNION ALL
146+
SELECT
147+
quote_ident(n.nspname) AS schema,
148+
quote_ident(cparent.relname) AS name,
149+
0 as partition,
150+
cast(cparent.reltuples as bigint) AS relTuples,
151+
false as isReplicated
103152
FROM pg_partition p
104153
JOIN pg_partition_rule r ON p.oid = r.paroid
105154
JOIN pg_class cparent ON cparent.oid = r.parchildrelid
@@ -123,7 +172,7 @@ func (tm *QueryManager) GetUserTables(conn *dbconn.DBConn) (map[string]option.Ta
123172
results := make(map[string]option.TableStatistics)
124173
for _, t := range tables {
125174
k := t.Schema + "." + t.Name
126-
results[k] = option.TableStatistics{Partition: t.Partition, RelTuples: t.RelTuples}
175+
results[k] = option.TableStatistics{Partition: t.Partition, RelTuples: t.RelTuples, IsReplicated: t.IsReplicated}
127176
}
128177

129178
return results, nil

copy/copy_query_wrapper.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,11 @@ func (qw *QueryWrapper) excludeTables(includeTables, excludeTables map[string]op
6868
if !exists {
6969
sl := strings.Split(k, ".")
7070
results = append(results, option.Table{
71-
Schema: sl[0],
72-
Name: sl[1],
73-
Partition: v.Partition,
74-
RelTuples: v.RelTuples,
71+
Schema: sl[0],
72+
Name: sl[1],
73+
Partition: v.Partition,
74+
RelTuples: v.RelTuples,
75+
IsReplicated: v.IsReplicated,
7576
})
7677
}
7778
}
@@ -116,10 +117,11 @@ func (qw *QueryWrapper) redirectIncludeTables(tables []option.Table) []option.Ta
116117

117118
for _, v := range tables {
118119
results = append(results, option.Table{
119-
Schema: ds,
120-
Name: v.Name,
121-
Partition: v.Partition,
122-
RelTuples: v.RelTuples,
120+
Schema: ds,
121+
Name: v.Name,
122+
Partition: v.Partition,
123+
RelTuples: v.RelTuples,
124+
IsReplicated: v.IsReplicated,
123125
})
124126
}
125127

@@ -273,7 +275,11 @@ func (qw *QueryWrapper) FormUserTableMap(srcTables, destTables []option.Table) m
273275
result := make(map[string]string)
274276

275277
for i, t := range srcTables {
276-
result[destTables[i].Schema+"."+destTables[i].Name] = t.Schema + "." + t.Name + "." + strconv.FormatInt(t.RelTuples, 10)
278+
isReplicatedStr := "false"
279+
if t.IsReplicated {
280+
isReplicatedStr = "true"
281+
}
282+
result[destTables[i].Schema+"."+destTables[i].Name] = t.Schema + "." + t.Name + "." + strconv.FormatInt(t.RelTuples, 10) + "." + isReplicatedStr
277283
}
278284

279285
return result
@@ -391,7 +397,11 @@ func (qw *QueryWrapper) expandPartTables(conn *dbconn.DBConn, userTables map[str
391397

392398
stat, exists := userTables[fqn]
393399
if exists {
394-
expandMap[fqn] = option.TableStatistics{Partition: 0, RelTuples: stat.RelTuples}
400+
expandMap[fqn] = option.TableStatistics{
401+
Partition: 0,
402+
RelTuples: stat.RelTuples,
403+
IsReplicated: stat.IsReplicated,
404+
}
395405
} else {
396406
pendingCheckRels[fqn] = option.TableStatistics{Partition: 0, RelTuples: t.RelTuples}
397407
}
@@ -471,16 +481,18 @@ func (qw *QueryWrapper) excludeTablePair(srcTables, destTables, exclTables []opt
471481
sld := strings.Split(v, ".")
472482

473483
excludedSrcTabs = append(excludedSrcTabs, option.Table{
474-
Schema: sls[0],
475-
Name: sls[1],
476-
Partition: u.Partition,
477-
RelTuples: u.RelTuples,
484+
Schema: sls[0],
485+
Name: sls[1],
486+
Partition: u.Partition,
487+
RelTuples: u.RelTuples,
488+
IsReplicated: u.IsReplicated,
478489
})
479490
excludedDstTabs = append(excludedDstTabs, option.Table{
480-
Schema: sld[0],
481-
Name: sld[1],
482-
Partition: u.Partition,
483-
RelTuples: u.RelTuples,
491+
Schema: sld[0],
492+
Name: sld[1],
493+
Partition: u.Partition,
494+
RelTuples: u.RelTuples,
495+
IsReplicated: u.IsReplicated,
484496
})
485497

486498
gplog.Debug("mapping table from \"%v\" to \"%v\"", k, v)

0 commit comments

Comments
 (0)