Skip to content

Commit 05f5fde

Browse files
committed
Introduce manually managed partitions
The data used as a partition key is not always of the greatest quality. For instance, a timestamp column could contain some invalid dates referring to year -1. Ideally these values should be cleaned, but it is not always an option in a system where data is treated as immutable. One way to deal with this situation is to add a default partition that will be in charge of containing data that cannot be assigned to any of the partitions already defined. Another solution, when the range of invalid values is known, is to move this data inside one or several dedicated partitions. Both solutions involve the creation of partitions that cannot be managed by the partition manager, because their range break the policy defined for the other partitions. For this reason we propose to introduce a new configuration property: `manuallyManagedPartitions`. It indicates which partitions should be ignored by the partition manager when checking or cleaning up partitions tables. Signed-off-by: Yohan Legat <legat.yohan@gmail.com>
1 parent ba9075f commit 05f5fde

File tree

5 files changed

+173
-23
lines changed

5 files changed

+173
-23
lines changed

internal/infra/partition/configuration.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@ const (
1515
)
1616

1717
type Configuration struct {
18-
Schema string `mapstructure:"schema" validate:"required"`
19-
Table string `mapstructure:"table" validate:"required"`
20-
PartitionKey string `mapstructure:"partitionKey" validate:"required"`
21-
Interval Interval `mapstructure:"interval" validate:"required,oneof=daily weekly monthly quarterly yearly"`
22-
Retention int `mapstructure:"retention" validate:"required,gt=0"`
23-
PreProvisioned int `mapstructure:"preProvisioned" validate:"required,gt=0"`
24-
CleanupPolicy CleanupPolicy `mapstructure:"cleanupPolicy" validate:"required,oneof=drop detach"`
18+
Schema string `mapstructure:"schema" validate:"required"`
19+
Table string `mapstructure:"table" validate:"required"`
20+
PartitionKey string `mapstructure:"partitionKey" validate:"required"`
21+
Interval Interval `mapstructure:"interval" validate:"required,oneof=daily weekly monthly quarterly yearly"`
22+
Retention int `mapstructure:"retention" validate:"required,gt=0"`
23+
PreProvisioned int `mapstructure:"preProvisioned" validate:"required,gt=0"`
24+
CleanupPolicy CleanupPolicy `mapstructure:"cleanupPolicy" validate:"required,oneof=drop detach"`
25+
ManuallyManagedPartitions []string `mapstructure:"manuallyManagedPartitions"`
2526
}
2627

2728
func (p Configuration) GeneratePartition(forDate time.Time) (Partition, error) {

pkg/ppm/checkpartition.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,17 @@ func (p *PPM) checkPartitionKey(config partition.Configuration) error {
7777
return fmt.Errorf("failed to get partition settings: %w", err)
7878
}
7979

80-
p.logger.Debug("Partition configuration found", "schema", config.Schema, "table", config.Table, "partition_key", config.PartitionKey, "partition_key_type", keyDataType, "partition_strategy", partitionStrategy)
80+
p.logger.Debug("Partition configuration found",
81+
"schema",
82+
config.Schema,
83+
"table",
84+
config.Table,
85+
"partition_key",
86+
config.PartitionKey,
87+
"partition_key_type",
88+
keyDataType,
89+
"partition_strategy",
90+
partitionStrategy)
8191

8292
if partitionKey != config.PartitionKey {
8393
p.logger.Warn("Partition key mismatch", "expected", config.PartitionKey, "current", partitionKey)
@@ -108,7 +118,10 @@ func IsSupportedKeyDataType(dataType postgresql.ColumnType) bool {
108118
return slices.Contains(SupportedPartitionKeyDataType, dataType)
109119
}
110120

111-
func (p *PPM) comparePartitions(existingTables, expectedTables []partition.Partition) (unexpectedTables, missingTables, incorrectBounds []partition.Partition) {
121+
func (p *PPM) comparePartitions(existingTables,
122+
expectedTables []partition.Partition,
123+
manuallyManagedPartitionNames []string,
124+
) (unexpectedTables, missingTables, incorrectBounds []partition.Partition) {
112125
existing := make(map[string]partition.Partition)
113126
expectedAndExists := make(map[string]bool)
114127

@@ -124,13 +137,29 @@ func (p *PPM) comparePartitions(existingTables, expectedTables []partition.Parti
124137
if existing[t.Name].UpperBound != t.UpperBound {
125138
incorrectBound = true
126139

127-
p.logger.Warn("Incorrect upper partition bound", "schema", t.Schema, "table", t.Name, "current_bound", existing[t.Name].UpperBound, "expected_bound", t.UpperBound)
140+
p.logger.Warn("Incorrect upper partition bound",
141+
"schema",
142+
t.Schema,
143+
"table",
144+
t.Name,
145+
"current_bound",
146+
existing[t.Name].UpperBound,
147+
"expected_bound",
148+
t.UpperBound)
128149
}
129150

130151
if existing[t.Name].LowerBound != t.LowerBound {
131152
incorrectBound = true
132153

133-
p.logger.Warn("Incorrect lower partition bound", "schema", t.Schema, "table", t.Name, "current_bound", existing[t.Name].LowerBound, "expected_bound", t.LowerBound)
154+
p.logger.Warn("Incorrect lower partition bound",
155+
"schema",
156+
t.Schema,
157+
"table",
158+
t.Name,
159+
"current_bound",
160+
existing[t.Name].LowerBound,
161+
"expected_bound",
162+
t.LowerBound)
134163
}
135164

136165
if incorrectBound {
@@ -143,8 +172,18 @@ func (p *PPM) comparePartitions(existingTables, expectedTables []partition.Parti
143172

144173
for _, t := range existingTables {
145174
if _, found := expectedAndExists[t.Name]; !found {
146-
// Only in existingTables and not in both
147-
unexpectedTables = append(unexpectedTables, t)
175+
isPartitionManuallyManaged := false
176+
177+
for _, manuallyManagedPartition := range manuallyManagedPartitionNames {
178+
if manuallyManagedPartition == t.Name {
179+
isPartitionManuallyManaged = true
180+
}
181+
}
182+
183+
if !isPartitionManuallyManaged {
184+
// Only in existingTables and not in both
185+
unexpectedTables = append(unexpectedTables, t)
186+
}
148187
}
149188
}
150189

@@ -190,7 +229,9 @@ func (p *PPM) checkPartitionsConfiguration(config partition.Configuration) error
190229
return fmt.Errorf("could not list partitions: %w", err)
191230
}
192231

193-
unexpected, missing, incorrectBound := p.comparePartitions(foundPartitions, expectedPartitions)
232+
unexpected, missing, incorrectBound := p.comparePartitions(foundPartitions,
233+
expectedPartitions,
234+
config.ManuallyManagedPartitions)
194235

195236
if len(unexpected) > 0 {
196237
partitionContainAnError = true

pkg/ppm/checkpartition_test.go

Lines changed: 106 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ func TestCheckPartitions(t *testing.T) {
101101

102102
postgreSQLMock.On("GetColumnDataType", p.Schema, p.Table, p.PartitionKey).Return(postgresql.Date, nil).Once()
103103

104-
postgreSQLMock.On("GetPartitionSettings", p.Schema, p.Table).Return(string(partition.Range), p.PartitionKey, nil).Once()
104+
postgreSQLMock.On("GetPartitionSettings", p.Schema, p.Table).Return(string(partition.Range),
105+
p.PartitionKey,
106+
nil).Once()
105107

106108
convertedTables := partitionResultToPartition(t, tables, boundDateFormat)
107109
postgreSQLMock.On("ListPartitions", p.Schema, p.Table).Return(convertedTables, nil).Once()
@@ -158,13 +160,21 @@ func TestCheckMissingPartitions(t *testing.T) {
158160
for _, tc := range testCases {
159161
t.Run(tc.name, func(t *testing.T) {
160162
fmt.Println("tc.tables", tc.tables)
161-
postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(partition.Range), config.PartitionKey, nil).Once()
162-
postgreSQLMock.On("GetColumnDataType", config.Schema, config.Table, config.PartitionKey).Return(postgresql.Date, nil).Once()
163+
postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(partition.Range),
164+
config.PartitionKey,
165+
nil).Once()
166+
postgreSQLMock.On("GetColumnDataType",
167+
config.Schema,
168+
config.Table,
169+
config.PartitionKey).Return(postgresql.Date, nil).Once()
163170

164171
tables := partitionResultToPartition(t, tc.tables, boundDateFormat)
165172
postgreSQLMock.On("ListPartitions", config.Schema, config.Table).Return(tables, nil).Once()
166173

167-
checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config})
174+
checker := ppm.New(context.TODO(),
175+
*logger,
176+
postgreSQLMock,
177+
map[string]partition.Configuration{"test": config})
168178
assert.Error(t, checker.CheckPartitions(), "at least one partition contains an invalid configuration")
169179
})
170180
}
@@ -201,11 +211,99 @@ func TestUnsupportedPartitionsStrategy(t *testing.T) {
201211

202212
for _, tc := range testCases {
203213
t.Run(tc.name, func(t *testing.T) {
204-
postgreSQLMock.On("GetColumnDataType", config.Schema, config.Table, config.PartitionKey).Return(postgresql.Date, nil).Once()
205-
postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(tc.strategy), tc.key, nil).Once()
206-
207-
checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config})
214+
postgreSQLMock.On("GetColumnDataType",
215+
config.Schema,
216+
config.Table,
217+
config.PartitionKey).Return(postgresql.Date, nil).Once()
218+
postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(tc.strategy),
219+
tc.key,
220+
nil).Once()
221+
222+
checker := ppm.New(context.TODO(),
223+
*logger,
224+
postgreSQLMock,
225+
map[string]partition.Configuration{"test": config})
208226
assert.Error(t, checker.CheckPartitions(), "at least one partition contains an invalid configuration")
209227
})
210228
}
211229
}
230+
231+
func TestPPM_comparePartitions(t *testing.T) {
232+
p := partition.Partition{
233+
ParentTable: "ParentTable",
234+
Schema: "Schema",
235+
Name: "Name",
236+
}
237+
238+
type result struct {
239+
unexpectedTables []partition.Partition
240+
missingTables []partition.Partition
241+
incorrectBounds []partition.Partition
242+
}
243+
244+
tests := []struct {
245+
name string
246+
existingTables []partition.Partition
247+
expectedTables []partition.Partition
248+
manuallyManagedPartitionNames []string
249+
result result
250+
}{
251+
{
252+
name: "all existing is expected",
253+
existingTables: []partition.Partition{
254+
p,
255+
},
256+
expectedTables: []partition.Partition{
257+
p,
258+
},
259+
result: result{},
260+
},
261+
{
262+
name: "manually managed partition",
263+
existingTables: []partition.Partition{
264+
p,
265+
},
266+
manuallyManagedPartitionNames: []string{"Name"},
267+
result: result{},
268+
},
269+
{
270+
name: "missing table",
271+
expectedTables: []partition.Partition{
272+
p,
273+
},
274+
result: result{
275+
missingTables: []partition.Partition{
276+
p,
277+
},
278+
},
279+
},
280+
{
281+
name: "unexpected table",
282+
existingTables: []partition.Partition{
283+
p,
284+
},
285+
result: result{
286+
unexpectedTables: []partition.Partition{
287+
p,
288+
},
289+
},
290+
},
291+
}
292+
for _, tt := range tests {
293+
t.Run(tt.name, func(t *testing.T) {
294+
p := ppm.PPM{}
295+
gotUnexpectedTables, gotMissingTables, gotIncorrectBounds := ppm.ComparePartitions(&p, tt.existingTables,
296+
tt.expectedTables,
297+
tt.manuallyManagedPartitionNames)
298+
assert.DeepEqual(t,
299+
tt.result.unexpectedTables,
300+
gotUnexpectedTables)
301+
assert.DeepEqual(t,
302+
tt.result.missingTables,
303+
gotMissingTables)
304+
assert.DeepEqual(t,
305+
tt.result.incorrectBounds,
306+
gotIncorrectBounds)
307+
})
308+
}
309+
}

pkg/ppm/cleanup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (p PPM) CleanupPartitions() error {
2828
return fmt.Errorf("could not list partitions: %w", err)
2929
}
3030

31-
unexpected, _, _ := p.comparePartitions(foundPartitions, expectedPartitions)
31+
unexpected, _, _ := p.comparePartitions(foundPartitions, expectedPartitions, config.ManuallyManagedPartitions)
3232

3333
for _, partition := range unexpected {
3434
err := p.DetachPartition(partition)

pkg/ppm/export_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package ppm
2+
3+
import "github.com/qonto/postgresql-partition-manager/internal/infra/partition"
4+
5+
func ComparePartitions(p *PPM,
6+
existingTables, expectedTables []partition.Partition,
7+
manuallyManagedPartitionNames []string,
8+
) (unexpectedTables, missingTables, incorrectBounds []partition.Partition) {
9+
return p.comparePartitions(existingTables, expectedTables, manuallyManagedPartitionNames)
10+
}

0 commit comments

Comments
 (0)