Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions internal/infra/postgresql/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ type PartitionResult struct {
}

func (p Postgres) IsPartitionAttached(schema, table string) (exists bool, err error) {
query := `SELECT EXISTS(
SELECT 1 FROM pg_inherits WHERE inhrelid = $1::regclass
)`
query := `SELECT EXISTS (SELECT 1 FROM pg_catalog.pg_inherits WHERE inhrelid = (SELECT c.oid
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relname = $2 AND c.relkind='r'))
`

err = p.conn.QueryRow(p.ctx, query, fmt.Sprintf("%s.%s", schema, table)).Scan(&exists)
err = p.conn.QueryRow(p.ctx, query, schema, table).Scan(&exists)
if err != nil {
return false, fmt.Errorf("failed to check partition attachment: %w", err)
}
Expand All @@ -37,7 +39,10 @@ func (p Postgres) IsPartitionAttached(schema, table string) (exists bool, err er
}

func (p Postgres) AttachPartition(schema, table, parent, lowerBound, upperBound string) error {
query := fmt.Sprintf("ALTER TABLE %s.%s ATTACH PARTITION %s.%s FOR VALUES FROM ('%s') TO ('%s')", schema, parent, schema, table, lowerBound, upperBound)
query := fmt.Sprintf("ALTER TABLE %s ATTACH PARTITION %s FOR VALUES FROM ('%s') TO ('%s')",
pgx.Identifier{schema, parent}.Sanitize(),
pgx.Identifier{schema, table}.Sanitize(),
lowerBound, upperBound)
p.logger.Debug("Attach partition", "query", query, "schema", schema, "table", table)

_, err := p.conn.Exec(p.ctx, query)
Expand All @@ -52,7 +57,10 @@ func (p Postgres) AttachPartition(schema, table, parent, lowerBound, upperBound
// The partition still exists as standalone table after detaching
// More info: https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-DETACH-PARTITION
func (p Postgres) DetachPartitionConcurrently(schema, table, parent string) error {
query := fmt.Sprintf(`ALTER TABLE %s.%s DETACH PARTITION %s.%s CONCURRENTLY`, schema, parent, schema, table)
query := fmt.Sprintf("ALTER TABLE %s DETACH PARTITION %s CONCURRENTLY",
pgx.Identifier{schema, parent}.Sanitize(),
pgx.Identifier{schema, table}.Sanitize())

p.logger.Debug("Detach partition", "schema", schema, "table", table, "query", query, "parent_table", parent)

_, err := p.conn.Exec(p.ctx, query)
Expand All @@ -67,7 +75,9 @@ func (p Postgres) DetachPartitionConcurrently(schema, table, parent string) erro
// It's required when a partition is in "detach pending" status.
// More info: https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-DETACH-PARTITION
func (p Postgres) FinalizePartitionDetach(schema, table, parent string) error {
query := fmt.Sprintf(`ALTER TABLE %s.%s DETACH PARTITION %s.%s FINALIZE`, schema, parent, schema, table)
query := fmt.Sprintf(`ALTER TABLE %s DETACH PARTITION %s FINALIZE`,
pgx.Identifier{schema, parent}.Sanitize(),
pgx.Identifier{schema, table}.Sanitize())
p.logger.Debug("finialize detach partition", "schema", schema, "table", table, "query", query, "parent_table", parent)

_, err := p.conn.Exec(p.ctx, query)
Expand All @@ -79,28 +89,33 @@ func (p Postgres) FinalizePartitionDetach(schema, table, parent string) error {
}

func (p Postgres) ListPartitions(schema, table string) (partitions []PartitionResult, err error) {
query := fmt.Sprintf(`
query := `
WITH parts as (
SELECT
relnamespace::regnamespace as schema,
c.oid::pg_catalog.regclass AS part_name,
n.nspname as schema,
c.relname AS part_name,
regexp_match(pg_get_expr(c.relpartbound, c.oid),
'FOR VALUES FROM \(''(.*)''\) TO \(''(.*)''\)') AS bounds
FROM
pg_catalog.pg_class c JOIN pg_catalog.pg_inherits i ON (c.oid = i.inhrelid)
WHERE i.inhparent = '%s.%s'::regclass
JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid)
WHERE i.inhparent = (SELECT c.oid
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relname = $2 AND c.relkind='p' -- parent
)
AND c.relkind='r'
)
SELECT
schema,
part_name as name,
'%s' as parentTable,
$2 as parentTable,
bounds[1]::text AS lowerBound,
bounds[2]::text AS upperBound
FROM parts
ORDER BY part_name;`, schema, table, table)
ORDER BY part_name`

rows, err := p.conn.Query(p.ctx, query)
rows, err := p.conn.Query(p.ctx, query, schema, table)
if err != nil {
return nil, fmt.Errorf("failed to list partitions: %w", err)
}
Expand All @@ -119,12 +134,15 @@ func (p Postgres) GetPartitionSettings(schema, table string) (strategy, key stri
// pg_get_partkeydef() is a system function returning the definition of a partitioning key
// It return a text string: <partitioningStrategy> (<partitioning key definition>)
// Example for RANGE (created_at)
query := fmt.Sprintf(`
query := `
SELECT regexp_match(partkeydef, '^(.*) \((.*)\)$')
FROM pg_catalog.pg_get_partkeydef('%s.%s'::regclass) as partkeydef
`, schema, table)
FROM pg_catalog.pg_get_partkeydef((SELECT c.oid
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relname = $2 AND c.relkind='p')) as partkeydef
`

err = p.conn.QueryRow(p.ctx, query).Scan(&partkeydef)
err = p.conn.QueryRow(p.ctx, query, schema, table).Scan(&partkeydef)
if err != nil {
p.logger.Warn("failed to get partitioning key", "error", err, "schema", schema, "table", table)

Expand Down Expand Up @@ -153,7 +171,8 @@ func (p Postgres) SetPartitionReplicaIdentity(schema, table, parent string) erro
}

if replIdent == "f" { // replica identity = full
queryFull := fmt.Sprintf("ALTER TABLE %s.%s REPLICA IDENTITY FULL", schema, table)
queryFull := fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY FULL",
pgx.Identifier{schema, table}.Sanitize())
p.logger.Debug("Set identity full", "query", queryFull)

_, err = p.conn.Exec(p.ctx, queryFull)
Expand Down Expand Up @@ -184,7 +203,9 @@ SELECT c_idx_child.relname
return fmt.Errorf("failed to find the child index for the new partition: %w", err)
}

queryAlter := fmt.Sprintf("ALTER TABLE %s.%s REPLICA IDENTITY USING INDEX %s", schema, table, indexName)
queryAlter := fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY USING INDEX %s",
pgx.Identifier{schema, table}.Sanitize(),
pgx.Identifier{indexName}.Sanitize())
p.logger.Debug("Set replica identity", "schema", schema, "table", table, "index", indexName, "query", queryAlter)

_, err := p.conn.Exec(p.ctx, queryAlter)
Expand Down
42 changes: 25 additions & 17 deletions internal/infra/postgresql/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/jackc/pgx/v5"
"github.com/pashagolub/pgxmock/v3"
"github.com/qonto/postgresql-partition-manager/internal/infra/postgresql"
"github.com/stretchr/testify/assert"
Expand All @@ -15,29 +16,29 @@ func generateTable(t *testing.T) (schema, table, fullQualifiedTable, parent stri

schema = "public"
table = "my_table"
fullQualifiedTable = fmt.Sprintf("%s.%s", schema, table)
fullQualifiedTable = pgx.Identifier{schema, table}.Sanitize()
parent = "my_parent_table"

return
}

func TestIsPartitionAttached(t *testing.T) {
schema, table, fullQualifiedTable, _ := generateTable(t)
schema, table, _, _ := generateTable(t)

mock, p := setupMock(t, pgxmock.QueryMatcherRegexp)
query := "SELECT EXISTS"

mock.ExpectQuery(query).WithArgs(fullQualifiedTable).WillReturnRows(mock.NewRows([]string{"EXISTS"}).AddRow(true))
mock.ExpectQuery(query).WithArgs(schema, table).WillReturnRows(mock.NewRows([]string{"EXISTS"}).AddRow(true))
exists, err := p.IsPartitionAttached(schema, table)
assert.Nil(t, err, "IsPartitionAttached should succeed")
assert.True(t, exists, "Table should be attached")

mock.ExpectQuery(query).WithArgs(fullQualifiedTable).WillReturnRows(mock.NewRows([]string{"EXISTS"}).AddRow(false))
mock.ExpectQuery(query).WithArgs(schema, table).WillReturnRows(mock.NewRows([]string{"EXISTS"}).AddRow(false))
exists, err = p.IsPartitionAttached(schema, table)
assert.Nil(t, err, "IsPartitionAttached should succeed")
assert.False(t, exists, "Table should not be attached")

mock.ExpectQuery(query).WithArgs(fullQualifiedTable).WillReturnError(ErrPostgreSQLConnectionFailure)
mock.ExpectQuery(query).WithArgs(schema, table).WillReturnError(ErrPostgreSQLConnectionFailure)
_, err = p.IsPartitionAttached(schema, table)
assert.Error(t, err, "IsPartitionAttached should fail")
}
Expand All @@ -48,7 +49,10 @@ func TestAttachPartition(t *testing.T) {
upperBound := "2024-01-31"

mock, p := setupMock(t, pgxmock.QueryMatcherEqual)
query := fmt.Sprintf(`ALTER TABLE %s.%s ATTACH PARTITION %s.%s FOR VALUES FROM ('%s') TO ('%s')`, schema, parent, schema, table, lowerBound, upperBound)
query := fmt.Sprintf(`ALTER TABLE %s ATTACH PARTITION %s FOR VALUES FROM ('%s') TO ('%s')`,
pgx.Identifier{schema, parent}.Sanitize(),
pgx.Identifier{schema, table}.Sanitize(),
lowerBound, upperBound)

mock.ExpectExec(query).WillReturnResult(pgxmock.NewResult("ALTER", 1))
err := p.AttachPartition(schema, table, parent, lowerBound, upperBound)
Expand All @@ -63,7 +67,9 @@ func TestDetachPartitionConcurrently(t *testing.T) {
schema, table, _, parent := generateTable(t)

mock, p := setupMock(t, pgxmock.QueryMatcherEqual)
query := fmt.Sprintf(`ALTER TABLE %s.%s DETACH PARTITION %s.%s CONCURRENTLY`, schema, parent, schema, table)
query := fmt.Sprintf(`ALTER TABLE %s DETACH PARTITION %s CONCURRENTLY`,
pgx.Identifier{schema, parent}.Sanitize(),
pgx.Identifier{schema, table}.Sanitize())

mock.ExpectExec(query).WillReturnResult(pgxmock.NewResult("ALTER", 1))
err := p.DetachPartitionConcurrently(schema, table, parent)
Expand All @@ -79,7 +85,9 @@ func TestFinalizePartitionDetach(t *testing.T) {

mock, p := setupMock(t, pgxmock.QueryMatcherEqual)

query := fmt.Sprintf(`ALTER TABLE %s.%s DETACH PARTITION %s.%s FINALIZE`, schema, parent, schema, table)
query := fmt.Sprintf(`ALTER TABLE %s DETACH PARTITION %s FINALIZE`,
pgx.Identifier{schema, parent}.Sanitize(),
pgx.Identifier{schema, table}.Sanitize())

mock.ExpectExec(query).WillReturnResult(pgxmock.NewResult("ALTER", 1))
err := p.FinalizePartitionDetach(schema, table, parent)
Expand All @@ -99,18 +107,18 @@ func TestGetPartitionSettings(t *testing.T) {

query := `SELECT regexp_match`

mock.ExpectQuery(query).WillReturnRows(mock.NewRows([]string{"partkeydef"}).AddRow([]string{expectedStrategy, expectedKey}))
mock.ExpectQuery(query).WithArgs(schema, table).WillReturnRows(mock.NewRows([]string{"partkeydef"}).AddRow([]string{expectedStrategy, expectedKey}))
strategy, key, err := p.GetPartitionSettings(schema, table)
assert.Nil(t, err, "GetPartitionSettings should succeed")
assert.Equal(t, strategy, expectedStrategy, "Strategy should match")
assert.Equal(t, key, expectedKey, "Key should match")

mock.ExpectQuery(query).WillReturnRows(mock.NewRows([]string{"partkeydef"}).AddRow([]string{}))
mock.ExpectQuery(query).WithArgs(schema, table).WillReturnRows(mock.NewRows([]string{"partkeydef"}).AddRow([]string{}))
_, _, err = p.GetPartitionSettings(schema, table)
assert.Error(t, err, "GetPartitionSettings should fail")
assert.ErrorIs(t, err, postgresql.ErrTableIsNotPartitioned)

mock.ExpectQuery(query).WillReturnError(ErrPostgreSQLConnectionFailure)
mock.ExpectQuery(query).WithArgs(schema, table).WillReturnError(ErrPostgreSQLConnectionFailure)
_, _, err = p.GetPartitionSettings(schema, table)
assert.Error(t, err, "GetPartitionSettings should fail")
}
Expand Down Expand Up @@ -142,17 +150,17 @@ func TestListPartitions(t *testing.T) {
for _, p := range expectedPartitions {
rows.AddRow(p.Schema, p.Name, p.ParentTable, p.LowerBound, p.UpperBound)
}
mock.ExpectQuery(query).WillReturnRows(rows)
result, err := p.ListPartitions(schema, table)
mock.ExpectQuery(query).WithArgs(schema, parent).WillReturnRows(rows)
result, err := p.ListPartitions(schema, parent)
assert.Nil(t, err, "ListPartitions should succeed")
assert.Equal(t, result, expectedPartitions, "Partitions should be match")

rows = mock.NewRows([]string{"invalidColumn"}).AddRow("invalidColumn")
mock.ExpectQuery(query).WillReturnRows(rows)
_, err = p.ListPartitions(schema, table)
mock.ExpectQuery(query).WithArgs(schema, parent).WillReturnRows(rows)
_, err = p.ListPartitions(schema, parent)
assert.Error(t, err, "ListPartitions should fail")

mock.ExpectQuery(query).WillReturnError(ErrPostgreSQLConnectionFailure)
_, err = p.ListPartitions(schema, table)
mock.ExpectQuery(query).WithArgs(schema, parent).WillReturnError(ErrPostgreSQLConnectionFailure)
_, err = p.ListPartitions(schema, parent)
assert.Error(t, err, "ListPartitions should fail")
}
10 changes: 8 additions & 2 deletions internal/infra/postgresql/table.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package postgresql

import "fmt"
import (
"fmt"

"github.com/jackc/pgx/v5"
)

func (p Postgres) CreateTableLikeTable(schema, table, parent string) error {
query := fmt.Sprintf("CREATE TABLE %s.%s (LIKE %s.%s INCLUDING ALL)", schema, table, schema, parent)
query := fmt.Sprintf("CREATE TABLE %s (LIKE %s INCLUDING ALL)",
pgx.Identifier{schema, table}.Sanitize(),
pgx.Identifier{schema, parent}.Sanitize())
p.logger.Debug("Create table", "schema", schema, "table", table, "query", query)

_, err := p.conn.Exec(p.ctx, query)
Expand Down
5 changes: 4 additions & 1 deletion internal/infra/postgresql/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/jackc/pgx/v5"
"github.com/pashagolub/pgxmock/v3"
"github.com/stretchr/testify/assert"
)
Expand All @@ -14,7 +15,9 @@ func TestCreateTableLikeTable(t *testing.T) {
table := "my_table"
parentTable := "parent_table"

query := fmt.Sprintf(`CREATE TABLE %s.%s (LIKE %s.%s INCLUDING ALL)`, schema, table, schema, parentTable)
query := fmt.Sprintf("CREATE TABLE %s (LIKE %s INCLUDING ALL)",
pgx.Identifier{schema, table}.Sanitize(),
pgx.Identifier{schema, parentTable}.Sanitize())

mock, p := setupMock(t, pgxmock.QueryMatcherEqual)

Expand Down
33 changes: 33 additions & 0 deletions scripts/bats/30_provisioning.bats
Original file line number Diff line number Diff line change
Expand Up @@ -474,5 +474,38 @@ EOF
run execute_sql_commands "$check_query"
assert_output "{i,i,i,i,f}"

rm "$CONFIGURATION_FILE"
}

@test "Test provisioning with special chars in the table name" {
local CONFIGURATION=$(cat << EOF
partitions:
unittest1:
schema: public
table: table's name
interval: daily
partitionKey: created_at
cleanupPolicy: detach
retention: 1
preProvisioned: 1
EOF
)
local CONFIGURATION_FILE=$(generate_configuration_file "${CONFIGURATION}")

create_partitioned_table "\"table's name\""

PPM_WORK_DATE="2025-02-01" run "$PPM_PROG" run provisioning -c ${CONFIGURATION_FILE}
assert_success

local expected=$(cat <<'EOF'
public|table's name_2025_01_31|2025-01-31|2025-02-01
public|table's name_2025_02_01|2025-02-01|2025-02-02
public|table's name_2025_02_02|2025-02-02|2025-02-03
EOF
)
run list_existing_partitions "public" "table's name"
assert_output "$expected"


rm "$CONFIGURATION_FILE"
}
8 changes: 4 additions & 4 deletions scripts/bats/test/libs/sql.bash
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ EOSQL

list_existing_partitions() {
# mandatory arguments
local PARENT_SCHEMA=$1
local PARENT_TABLE=$2
local PARENT_SCHEMA="$1"
local PARENT_TABLE="$2"

psql --tuples-only --no-align --quiet --dbname="$PPM_DATABASE" -v parent_schema=${PARENT_SCHEMA} -v parent_table=${PARENT_TABLE} <<'EOSQL'
psql --tuples-only --no-align --quiet --dbname="$PPM_DATABASE" -v parent_schema="${PARENT_SCHEMA}" -v parent_table="${PARENT_TABLE}" <<'EOSQL'
WITH parts as (
SELECT
relnamespace::regnamespace as schema,
c.oid::pg_catalog.regclass AS part_name,
c.relname AS part_name,
regexp_match(pg_get_expr(c.relpartbound, c.oid),
'FOR VALUES FROM \(''(.*)''\) TO \(''(.*)''\)') AS bounds
FROM
Expand Down
Loading