Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
with:
filename: coverage.xml
badge: true
fail_below_min: true
fail_below_min: false
format: markdown
hide_branch_rate: false
hide_complexity: true
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ Partition object:

See the [full configuration file](configs/postgresql-partition-manager/postgresql-partition-manager.yaml).

## Work date
By default, the provisioning and cleanup evaluate what to do at the current date. For testing purposes, a different date can be set through the environment variable `PPM_WORK_DATE` (format: `YYYY-MM-DD`).

## Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Expand Down
17 changes: 16 additions & 1 deletion cmd/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/qonto/postgresql-partition-manager/internal/infra/config"
"github.com/qonto/postgresql-partition-manager/internal/infra/logger"
Expand All @@ -22,6 +23,7 @@ const (
PartitionsProvisioningFailedExitCode = 4
PartitionsCheckFailedExitCode = 5
PartitionsCleanupFailedExitCode = 6
InvalidDateExitCode = 7
)

var ErrUnsupportedPostgreSQLVersion = errors.New("unsupported PostgreSQL version")
Expand Down Expand Up @@ -120,7 +122,20 @@ func initCmd() *ppm.PPM {

db := postgresql.New(*log, conn)

client := ppm.New(context.TODO(), *log, db, config.Partitions)
workDate := time.Now()
stringDate, useExternalDate := os.LookupEnv("PPM_WORK_DATE")

if useExternalDate {
workDate, err = time.Parse(time.DateOnly, stringDate)
if err != nil {
log.Error("Could not parse PPM_WORK_DATE environment variable", "error", err)
os.Exit(InvalidDateExitCode)
}
}

log.Info("Work date", "work-date", workDate)

client := ppm.New(context.TODO(), *log, db, config.Partitions, workDate)

if err = client.CheckServerRequirements(); err != nil {
log.Error("Server is incompatible", "error", err)
Expand Down
53 changes: 53 additions & 0 deletions internal/infra/partition/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package partition

import (
"errors"
"fmt"
"log/slog"
"time"

"github.com/google/uuid"
Expand All @@ -22,6 +24,57 @@ var (
ErrUnsupportedUUIDVersion = errors.New("unsupported UUID version")
)

type PartitionRange struct {
LowerBound time.Time
UpperBound time.Time
}

// Bounds provides a concise way to create a PartitionRange
func Bounds(lBound, uBound time.Time) PartitionRange {
return PartitionRange{LowerBound: lBound, UpperBound: uBound}
}

func (r PartitionRange) String() string {
return fmt.Sprintf("[ %s , %s ]", r.LowerBound.Format("02-01-2006"), r.UpperBound.Format("02-01-2006"))
}

func (r PartitionRange) LogValue() slog.Value {
return slog.StringValue(r.String())
}

func (r PartitionRange) IsEmpty() bool {
/* IsEmpty() is true when
- either LowerBound.IsZero() is true and UpperBound.IsZero() is true
- either the bounds are set (non-zero) but equal
*/
return r.LowerBound.Equal(r.UpperBound)
}

func (r PartitionRange) IsEqual(r2 PartitionRange) bool {
return r.LowerBound.Equal(r2.LowerBound) && r.UpperBound.Equal(r2.UpperBound)
}

// Intersection returns the intersection between the intervals r1 and r2
func (r PartitionRange) Intersection(r2 PartitionRange) PartitionRange {
var res PartitionRange // initialized with {time.Time{}, time.Time{}}

if !(r2.LowerBound.After(r.UpperBound) || r.LowerBound.After(r2.UpperBound)) { // !empty intersection
if r.LowerBound.After(r2.LowerBound) {
res.LowerBound = r.LowerBound
} else {
res.LowerBound = r2.LowerBound
}

if r.UpperBound.Before(r2.UpperBound) {
res.UpperBound = r.UpperBound
} else {
res.UpperBound = r2.UpperBound
}
}

return res
}

func getDailyBounds(date time.Time) (lowerBound, upperBound time.Time) {
lowerBound = time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, date.UTC().Location())
upperBound = lowerBound.AddDate(0, 0, 1)
Expand Down
65 changes: 62 additions & 3 deletions pkg/ppm/checkpartition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"slices"
"sort"
"time"

"github.com/qonto/postgresql-partition-manager/internal/infra/partition"
Expand All @@ -16,6 +17,8 @@ var (
ErrPartitionKeyMismatch = errors.New("mismatch of partition keys between parameters and table")
ErrUnexpectedOrMissingPartitions = errors.New("unexpected or missing partitions")
ErrInvalidPartitionConfiguration = errors.New("at least one partition contains an invalid configuration")
ErrPartitionGap = errors.New("gap found in partitions")
ErrIncoherentBounds = errors.New("lower bound greater or equal than upper bound")
)

var SupportedPartitionKeyDataType = []postgresql.ColumnType{
Expand Down Expand Up @@ -178,9 +181,7 @@ func (p *PPM) ListPartitions(schema, table string) (partitions []partition.Parti
func (p *PPM) checkPartitionsConfiguration(config partition.Configuration) error {
partitionContainAnError := false

currentTime := time.Now()

expectedPartitions, err := getExpectedPartitions(config, currentTime)
expectedPartitions, err := getExpectedPartitions(config, p.workDate)
if err != nil {
return fmt.Errorf("could not generate expected partitions: %w", err)
}
Expand All @@ -190,6 +191,20 @@ func (p *PPM) checkPartitionsConfiguration(config partition.Configuration) error
return fmt.Errorf("could not list partitions: %w", err)
}

existingRange, err := p.getGlobalRange(foundPartitions)
if err != nil {
return fmt.Errorf("incorrect set of existing partitions: %w", err)
}

p.logger.Info("Existing range", "range", existingRange)

expectedRange, err := p.getGlobalRange(expectedPartitions)
if err != nil {
return fmt.Errorf("incorrect set of expected partitions: %w", err)
}

p.logger.Info("Expected range", "expected", expectedRange)

unexpected, missing, incorrectBound := p.comparePartitions(foundPartitions, expectedPartitions)

if len(unexpected) > 0 {
Expand All @@ -216,3 +231,47 @@ func (p *PPM) checkPartitionsConfiguration(config partition.Configuration) error

return nil
}

/* Return the lower/upper bound of all partitions combined. Any discontinuity is an error */
func (p *PPM) getGlobalRange(partitions []partition.Partition) (r partition.PartitionRange, err error) {
var minBound, maxBound time.Time

/* sort by lower bounds */
sort.Slice(partitions, func(i, j int) bool {
return partitions[i].LowerBound.Before(partitions[j].LowerBound)
})

/* check continuity */
for i, part := range partitions {
if i == 0 {
minBound = part.LowerBound
maxBound = part.UpperBound
} else {
if part.LowerBound.Before(minBound) {
minBound = part.LowerBound
}

if part.UpperBound.After(maxBound) {
maxBound = part.UpperBound
}
}

if i > 0 && (partitions[i-1].UpperBound != part.LowerBound) {
/* a gap has been detected between the ranges of consecutive partitions */
p.logger.Error("Partition Gap", "lower end", partitions[i-1].UpperBound, "upper end", part.LowerBound)

return partition.PartitionRange{LowerBound: minBound, UpperBound: maxBound}, ErrPartitionGap
}

if part.LowerBound.After(part.UpperBound) || part.LowerBound.Equal(part.UpperBound) {
/* the lower bound is greater or equal than
the upper bound: this should never happen
for existing partitions */
p.logger.Error("Partition Gap", "lower end", part.LowerBound, "upper end", part.UpperBound)

return partition.PartitionRange{LowerBound: minBound, UpperBound: maxBound}, ErrIncoherentBounds
}
}

return partition.PartitionRange{LowerBound: minBound, UpperBound: maxBound}, nil
}
6 changes: 3 additions & 3 deletions pkg/ppm/checkpartition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestCheckPartitions(t *testing.T) {
postgreSQLMock.On("ListPartitions", p.Schema, p.Table).Return(convertedTables, nil).Once()
}

checker := ppm.New(context.TODO(), *logger, postgreSQLMock, partitions)
checker := ppm.New(context.TODO(), *logger, postgreSQLMock, partitions, time.Now())
assert.NilError(t, checker.CheckPartitions(), "Partitions should succeed")
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func TestCheckMissingPartitions(t *testing.T) {
tables := partitionResultToPartition(t, tc.tables, boundDateFormat)
postgreSQLMock.On("ListPartitions", config.Schema, config.Table).Return(tables, nil).Once()

checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config})
checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config}, time.Now())
assert.Error(t, checker.CheckPartitions(), "at least one partition contains an invalid configuration")
})
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestUnsupportedPartitionsStrategy(t *testing.T) {
postgreSQLMock.On("GetColumnDataType", config.Schema, config.Table, config.PartitionKey).Return(postgresql.Date, nil).Once()
postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(tc.strategy), tc.key, nil).Once()

checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config})
checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config}, time.Now())
assert.Error(t, checker.CheckPartitions(), "at least one partition contains an invalid configuration")
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ppm/checkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestServerRequirements(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// Reset mock on every test case
logger, postgreSQLMock := setupMocks(t)
checker := ppm.New(context.TODO(), *logger, postgreSQLMock, nil)
checker := ppm.New(context.TODO(), *logger, postgreSQLMock, nil, time.Now())

postgreSQLMock.On("GetEngineVersion").Return(tc.serverVersion, nil).Once()
postgreSQLMock.On("GetServerTime").Return(tc.serverTime, nil).Once()
Expand Down
60 changes: 41 additions & 19 deletions pkg/ppm/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ppm
import (
"errors"
"fmt"
"time"

partition_pkg "github.com/qonto/postgresql-partition-manager/internal/infra/partition"
"github.com/qonto/postgresql-partition-manager/internal/infra/retry"
Expand All @@ -12,47 +11,70 @@ import (
var ErrPartitionCleanupFailed = errors.New("at least one partition could not be cleaned")

func (p PPM) CleanupPartitions() error {
currentTime := time.Now()
partitionContainAnError := false

for name, config := range p.partitions {
p.logger.Info("Cleaning partition", "partition", name)

expectedPartitions, err := getExpectedPartitions(config, currentTime)
// Existing
foundPartitions, err := p.ListPartitions(config.Schema, config.Table)
if err != nil {
return fmt.Errorf("could not generate expected partitions: %w", err)
return fmt.Errorf("could not list partitions: %w", err)
}

foundPartitions, err := p.ListPartitions(config.Schema, config.Table)
currentRange, err := p.getGlobalRange(foundPartitions)
if err != nil {
return fmt.Errorf("could not list partitions: %w", err)
return fmt.Errorf("could not evaluate existing ranges: %w", err)
}

unexpected, _, _ := p.comparePartitions(foundPartitions, expectedPartitions)
p.logger.Info("Current ", "c_range", currentRange.String())

for _, partition := range unexpected {
err := p.DetachPartition(partition)
if err != nil {
partitionContainAnError = true
// Expected
expectedPartitions, err := getExpectedPartitions(config, p.workDate)
if err != nil {
return fmt.Errorf("could not generate expected partitions: %w", err)
}

p.logger.Error("Failed to detach partition", "schema", partition.Schema, "table", partition.Name, "error", err)
expectedRange, err := p.getGlobalRange(expectedPartitions)
if err != nil {
return fmt.Errorf("could not evaluate ranges to create: %w", err)
}

continue
}
p.logger.Info("Expected", "e_range", expectedRange)

if expectedRange.IsEqual(currentRange) {
continue // nothing to do on this partition set
}

p.logger.Info("Partition detached", "schema", partition.Schema, "table", partition.Name, "parent_table", partition.ParentTable)
// Each partition whose bounds are entirely outside of expectedRange can be removed

if config.CleanupPolicy == partition_pkg.Drop {
err := p.DeletePartition(partition)
for _, part := range foundPartitions {
if !part.UpperBound.After(expectedRange.LowerBound) || !part.LowerBound.Before(expectedRange.UpperBound) {
p.logger.Info("No intersection", "remove-range", partition_pkg.Bounds(part.LowerBound, part.UpperBound))

err := p.DetachPartition(part)
if err != nil {
partitionContainAnError = true

p.logger.Error("Failed to delete partition", "schema", partition.Schema, "table", partition.Name, "error", err)
p.logger.Error("Failed to detach partition", "schema", part.Schema, "table", part.Name, "error", err)

continue
}

p.logger.Info("Partition deleted", "schema", partition.Schema, "table", partition.Name, "parent_table", partition.ParentTable)
p.logger.Info("Partition detached", "schema", part.Schema, "table", part.Name, "parent_table", part.ParentTable)

if config.CleanupPolicy == partition_pkg.Drop {
err := p.DeletePartition(part)
if err != nil {
partitionContainAnError = true

p.logger.Error("Failed to delete partition", "schema", part.Schema, "table", part.Name, "error", err)

continue
}

p.logger.Info("Partition deleted", "schema", part.Schema, "table", part.Name, "parent_table", part.ParentTable)
}
}
}
}
Expand Down
Loading
Loading