Skip to content

Commit 48e8665

Browse files
committed
(feat) Implement replica identity when creating partitions
It follows the parent table. Signed-off-by: Daniel Vérité <dverite@gmail.com>
1 parent bbe3df6 commit 48e8665

File tree

7 files changed

+115
-2
lines changed

7 files changed

+115
-2
lines changed

internal/infra/postgresql/partition.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,3 +140,55 @@ func (p Postgres) GetPartitionSettings(schema, table string) (strategy, key stri
140140

141141
return strategy, key, nil
142142
}
143+
144+
func (p Postgres) SetPartitionReplicaIdentity(schema, table, parent string) error {
145+
var replIdent string
146+
147+
// Get the replica identity of the parent
148+
query_ri := "SELECT relreplident::text from pg_class c JOIN pg_namespace n ON (n.oid=c.relnamespace) WHERE n.nspname=$1 AND c.relname=$2"
149+
err := p.conn.QueryRow(p.ctx, query_ri, schema, parent).Scan(&replIdent)
150+
if err != nil {
151+
return fmt.Errorf("failed to check pg_class.relreplident for parent table: %w", err)
152+
}
153+
154+
if replIdent == "f" { // replica identity = full
155+
query_f := fmt.Sprintf("ALTER TABLE %s.%s REPLICA IDENTITY FULL", schema, table)
156+
p.logger.Debug("Set identity full", "query", query_f)
157+
_, err = p.conn.Exec(p.ctx, query_f)
158+
if err != nil {
159+
return fmt.Errorf("failed to set replica identity: %w", err)
160+
}
161+
162+
} else if replIdent == "i" { // replica identity = specific index
163+
var indexName string
164+
/* This query finds the index that is a child of the (only) index
165+
in the parent table having "indisreplident"=true.
166+
"pg_inherits" holds the (parent-index, child-index) relationship. */
167+
query_idx := `
168+
SELECT c_idx_child.relname
169+
FROM pg_index i_parent JOIN pg_inherits inh ON (inh.inhparent=i_parent.indexrelid)
170+
JOIN pg_index i_child ON (i_child.indexrelid=inh.inhrelid)
171+
JOIN pg_class c_parent ON (c_parent.oid=i_parent.indrelid)
172+
JOIN pg_namespace ON (pg_namespace.oid=c_parent.relnamespace)
173+
JOIN pg_class c_idx_child ON (c_idx_child.oid=inh.inhrelid)
174+
JOIN pg_class c_child ON (c_child.oid=i_child.indrelid)
175+
WHERE pg_namespace.nspname= $1
176+
AND c_parent.relname = $2
177+
AND i_parent.indisreplident=true
178+
AND c_child.relname = $3
179+
`
180+
err = p.conn.QueryRow(p.ctx, query_idx, schema, parent, table).Scan(&indexName)
181+
if err != nil {
182+
return fmt.Errorf("failed to find the child index for the new partition: %w", err)
183+
}
184+
185+
query_alter := fmt.Sprintf("ALTER TABLE %s.%s REPLICA IDENTITY USING INDEX %s", schema, table, indexName)
186+
p.logger.Debug("Set replica identity", "schema", schema, "table", table, "index", indexName, "query", query_alter)
187+
188+
_, err := p.conn.Exec(p.ctx, query_alter)
189+
if err != nil {
190+
return fmt.Errorf("failed to set replica identity on the new partition: %w", err)
191+
}
192+
}
193+
return nil
194+
}

internal/infra/postgresql/table.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import "fmt"
44

55
func (p Postgres) CreateTableLikeTable(schema, table, parent string) error {
66
query := fmt.Sprintf("CREATE TABLE %s.%s (LIKE %s.%s INCLUDING ALL)", schema, table, schema, parent)
7-
p.logger.Debug("Create table", "query", schema, "table", table, "query", query)
7+
p.logger.Debug("Create table", "schema", schema, "table", table, "query", query)
88

99
_, err := p.conn.Exec(p.ctx, query)
1010
if err != nil {

pkg/ppm/mocks/PostgreSQLClient.go

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/ppm/ppm.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type PostgreSQLClient interface {
2424
DropTable(schema, table string) error
2525
DetachPartitionConcurrently(schema, table, parent string) error
2626
FinalizePartitionDetach(schema, table, parent string) error
27+
SetPartitionReplicaIdentity(schema, table, parent string) error
2728
}
2829

2930
type PPM struct {

pkg/ppm/provisioning.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,13 @@ func (p PPM) CreatePartition(partitionConfiguration partition.Configuration, par
167167
return fmt.Errorf("fail to attach partition: %w", err)
168168
}
169169

170+
err = p.db.SetPartitionReplicaIdentity(partition.Schema, partition.Name, partition.ParentTable)
171+
if err != nil {
172+
p.logger.Warn("failed to set replica identity", "error", err, "schema", partition.Schema, "table", partition.Name, "attempt", attempt, "max_retries", maxRetries)
173+
174+
return fmt.Errorf("fail to set replica identity: %w", err)
175+
}
176+
170177
return nil
171178
})
172179
if err != nil {

scripts/bats/30_provisioning.bats

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ setup() {
1919
ppm_setup
2020
}
2121

22+
2223
@test "Test that provisioning succeed on up-to-date partitioning" {
2324
local TABLE=$(generate_table_name)
2425
local INTERVAL=daily
@@ -434,5 +435,44 @@ EOF
434435
assert_output "$expected_2"
435436

436437
rm "$CONFIGURATION_FILE"
438+
}
439+
440+
441+
@test "Test setting Replica Identity on new partitions" {
442+
local TABLE=$(generate_table_name)
443+
create_partitioned_table ${TABLE}
444+
445+
local CONFIGURATION=$(basic_configuration "$TABLE" monthly created_at 1 2)
446+
local CONFIGURATION_FILE=$(generate_configuration_file "${CONFIGURATION}")
447+
448+
execute_sql "CREATE UNIQUE INDEX idx_ri_test ON \"${TABLE}\"(created_at, id);"
449+
execute_sql "ALTER TABLE \"${TABLE}\" REPLICA IDENTITY USING INDEX idx_ri_test;"
450+
451+
PPM_WORK_DATE="2026-01-01" run "$PPM_PROG" run provisioning -c ${CONFIGURATION_FILE}
452+
assert_success
453+
assert_output --partial "All partitions are correctly provisioned"
454+
455+
# The 4 partitions should all have the replica identity set to "i"
456+
local check_query="SELECT array_agg(relreplident ORDER BY relname) FROM pg_class WHERE oid in
457+
(SELECT inhrelid from pg_inherits where inhparent='\"$TABLE\"'::regclass);"
458+
459+
run execute_sql_commands "$check_query"
460+
assert_output "{i,i,i,i}"
461+
462+
# Switch the parent replica identity to "full". Afterwards it must be applied to
463+
# newly created partitions
464+
execute_sql "ALTER TABLE \"${TABLE}\" REPLICA IDENTITY FULL;"
437465

466+
# Increment the provisioning by one month and run the test again
467+
yq eval ".partitions.unittest.preProvisioned = 3" -i ${CONFIGURATION_FILE}
468+
PPM_WORK_DATE="2026-01-01" run "$PPM_PROG" run provisioning -c ${CONFIGURATION_FILE}
469+
assert_success
470+
assert_output --partial "All partitions are correctly provisioned"
471+
472+
# Now the most recent partition should have its replica identity to full
473+
# and the others to indexes
474+
run execute_sql_commands "$check_query"
475+
assert_output "{i,i,i,i,f}"
476+
477+
rm "$CONFIGURATION_FILE"
438478
}

scripts/bats/test/libs/seeds.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ create_table_from_template() {
2828

2929
read -r -d '' QUERY <<EOQ ||
3030
CREATE TABLE ${TABLE} (
31-
id BIGSERIAL,
31+
id BIGSERIAL NOT NULL,
3232
temperature INT,
3333
created_at DATE NOT NULL
3434
) PARTITION BY RANGE (created_at);

0 commit comments

Comments
 (0)