Skip to content

Commit

Permalink
Introduce manually managed partitions
Browse files Browse the repository at this point in the history
The data used as a partition key is not always of the greatest quality. For instance, a timestamp column could contain some invalid dates referring to year -1. Ideally these values should be cleaned, but it is not always an option in a system where data is treated as immutable.

One way to deal with this situation is to add a default partition that will be in charge of containing data that cannot be assigned to any of the partitions already defined. Another solution, when the range of invalid values is known, is to move this data inside one or several dedicated partitions.

Both solutions involve the creation of partitions that cannot be managed by the partition manager, because their range break the policy defined for the other partitions.

For this reason we propose to introduce a new configuration property: `manuallyManagedPartitions`. It indicates which partitions should be ignored by the partition manager when checking or cleaning up partitions tables.

Signed-off-by: Yohan Legat <[email protected]>
  • Loading branch information
ylegat committed Jan 1, 2025
1 parent ba9075f commit afe42f3
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 23 deletions.
15 changes: 8 additions & 7 deletions internal/infra/partition/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ const (
)

type Configuration struct {
Schema string `mapstructure:"schema" validate:"required"`
Table string `mapstructure:"table" validate:"required"`
PartitionKey string `mapstructure:"partitionKey" validate:"required"`
Interval Interval `mapstructure:"interval" validate:"required,oneof=daily weekly monthly quarterly yearly"`
Retention int `mapstructure:"retention" validate:"required,gt=0"`
PreProvisioned int `mapstructure:"preProvisioned" validate:"required,gt=0"`
CleanupPolicy CleanupPolicy `mapstructure:"cleanupPolicy" validate:"required,oneof=drop detach"`
Schema string `mapstructure:"schema" validate:"required"`
Table string `mapstructure:"table" validate:"required"`
PartitionKey string `mapstructure:"partitionKey" validate:"required"`
Interval Interval `mapstructure:"interval" validate:"required,oneof=daily weekly monthly quarterly yearly"`
Retention int `mapstructure:"retention" validate:"required,gt=0"`
PreProvisioned int `mapstructure:"preProvisioned" validate:"required,gt=0"`
CleanupPolicy CleanupPolicy `mapstructure:"cleanupPolicy" validate:"required,oneof=drop detach"`
ManuallyManagedPartitions []string `mapstructure:"manuallyManagedPartitions"`
}

func (p Configuration) GeneratePartition(forDate time.Time) (Partition, error) {
Expand Down
55 changes: 48 additions & 7 deletions pkg/ppm/checkpartition.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,17 @@ func (p *PPM) checkPartitionKey(config partition.Configuration) error {
return fmt.Errorf("failed to get partition settings: %w", err)
}

p.logger.Debug("Partition configuration found", "schema", config.Schema, "table", config.Table, "partition_key", config.PartitionKey, "partition_key_type", keyDataType, "partition_strategy", partitionStrategy)
p.logger.Debug("Partition configuration found",
"schema",
config.Schema,
"table",
config.Table,
"partition_key",
config.PartitionKey,
"partition_key_type",
keyDataType,
"partition_strategy",
partitionStrategy)

if partitionKey != config.PartitionKey {
p.logger.Warn("Partition key mismatch", "expected", config.PartitionKey, "current", partitionKey)
Expand Down Expand Up @@ -108,7 +118,10 @@ func IsSupportedKeyDataType(dataType postgresql.ColumnType) bool {
return slices.Contains(SupportedPartitionKeyDataType, dataType)
}

func (p *PPM) comparePartitions(existingTables, expectedTables []partition.Partition) (unexpectedTables, missingTables, incorrectBounds []partition.Partition) {
func (p *PPM) comparePartitions(existingTables,
expectedTables []partition.Partition,
manuallyManagedPartitionNames []string,
) (unexpectedTables, missingTables, incorrectBounds []partition.Partition) {
existing := make(map[string]partition.Partition)
expectedAndExists := make(map[string]bool)

Expand All @@ -124,13 +137,29 @@ func (p *PPM) comparePartitions(existingTables, expectedTables []partition.Parti
if existing[t.Name].UpperBound != t.UpperBound {
incorrectBound = true

p.logger.Warn("Incorrect upper partition bound", "schema", t.Schema, "table", t.Name, "current_bound", existing[t.Name].UpperBound, "expected_bound", t.UpperBound)
p.logger.Warn("Incorrect upper partition bound",
"schema",
t.Schema,
"table",
t.Name,
"current_bound",
existing[t.Name].UpperBound,
"expected_bound",
t.UpperBound)
}

if existing[t.Name].LowerBound != t.LowerBound {
incorrectBound = true

p.logger.Warn("Incorrect lower partition bound", "schema", t.Schema, "table", t.Name, "current_bound", existing[t.Name].LowerBound, "expected_bound", t.LowerBound)
p.logger.Warn("Incorrect lower partition bound",
"schema",
t.Schema,
"table",
t.Name,
"current_bound",
existing[t.Name].LowerBound,
"expected_bound",
t.LowerBound)
}

if incorrectBound {
Expand All @@ -143,8 +172,18 @@ func (p *PPM) comparePartitions(existingTables, expectedTables []partition.Parti

for _, t := range existingTables {
if _, found := expectedAndExists[t.Name]; !found {
// Only in existingTables and not in both
unexpectedTables = append(unexpectedTables, t)
isPartitionManuallyManaged := false

for _, manuallyManagedPartition := range manuallyManagedPartitionNames {
if manuallyManagedPartition == t.Name {
isPartitionManuallyManaged = true
}
}

if !isPartitionManuallyManaged {
// Only in existingTables and not in both
unexpectedTables = append(unexpectedTables, t)
}
}
}

Expand Down Expand Up @@ -190,7 +229,9 @@ func (p *PPM) checkPartitionsConfiguration(config partition.Configuration) error
return fmt.Errorf("could not list partitions: %w", err)
}

unexpected, missing, incorrectBound := p.comparePartitions(foundPartitions, expectedPartitions)
unexpected, missing, incorrectBound := p.comparePartitions(foundPartitions,
expectedPartitions,
config.ManuallyManagedPartitions)

if len(unexpected) > 0 {
partitionContainAnError = true
Expand Down
114 changes: 106 additions & 8 deletions pkg/ppm/checkpartition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func TestCheckPartitions(t *testing.T) {

postgreSQLMock.On("GetColumnDataType", p.Schema, p.Table, p.PartitionKey).Return(postgresql.Date, nil).Once()

postgreSQLMock.On("GetPartitionSettings", p.Schema, p.Table).Return(string(partition.Range), p.PartitionKey, nil).Once()
postgreSQLMock.On("GetPartitionSettings", p.Schema, p.Table).Return(string(partition.Range),
p.PartitionKey,
nil).Once()

convertedTables := partitionResultToPartition(t, tables, boundDateFormat)
postgreSQLMock.On("ListPartitions", p.Schema, p.Table).Return(convertedTables, nil).Once()
Expand Down Expand Up @@ -158,13 +160,21 @@ func TestCheckMissingPartitions(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fmt.Println("tc.tables", tc.tables)
postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(partition.Range), config.PartitionKey, nil).Once()
postgreSQLMock.On("GetColumnDataType", config.Schema, config.Table, config.PartitionKey).Return(postgresql.Date, nil).Once()
postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(partition.Range),
config.PartitionKey,
nil).Once()
postgreSQLMock.On("GetColumnDataType",
config.Schema,
config.Table,
config.PartitionKey).Return(postgresql.Date, nil).Once()

tables := partitionResultToPartition(t, tc.tables, boundDateFormat)
postgreSQLMock.On("ListPartitions", config.Schema, config.Table).Return(tables, nil).Once()

checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config})
checker := ppm.New(context.TODO(),
*logger,
postgreSQLMock,
map[string]partition.Configuration{"test": config})
assert.Error(t, checker.CheckPartitions(), "at least one partition contains an invalid configuration")
})
}
Expand Down Expand Up @@ -201,11 +211,99 @@ func TestUnsupportedPartitionsStrategy(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
postgreSQLMock.On("GetColumnDataType", config.Schema, config.Table, config.PartitionKey).Return(postgresql.Date, nil).Once()
postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(tc.strategy), tc.key, nil).Once()

checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config})
postgreSQLMock.On("GetColumnDataType",
config.Schema,
config.Table,
config.PartitionKey).Return(postgresql.Date, nil).Once()
postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(tc.strategy),
tc.key,
nil).Once()

checker := ppm.New(context.TODO(),
*logger,
postgreSQLMock,
map[string]partition.Configuration{"test": config})
assert.Error(t, checker.CheckPartitions(), "at least one partition contains an invalid configuration")
})
}
}

func TestPPM_comparePartitions(t *testing.T) {
p := partition.Partition{
ParentTable: "ParentTable",
Schema: "Schema",
Name: "Name",
}

type result struct {
unexpectedTables []partition.Partition
missingTables []partition.Partition
incorrectBounds []partition.Partition
}

tests := []struct {
name string
existingTables []partition.Partition
expectedTables []partition.Partition
manuallyManagedPartitionNames []string
result result
}{
{
name: "all existing is expected",
existingTables: []partition.Partition{
p,
},
expectedTables: []partition.Partition{
p,
},
result: result{},
},
{
name: "manually managed partition",
existingTables: []partition.Partition{
p,
},
manuallyManagedPartitionNames: []string{"Name"},
result: result{},
},
{
name: "missing table",
expectedTables: []partition.Partition{
p,
},
result: result{
missingTables: []partition.Partition{
p,
},
},
},
{
name: "unexpected table",
existingTables: []partition.Partition{
p,
},
result: result{
unexpectedTables: []partition.Partition{
p,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := ppm.PPM{}
gotUnexpectedTables, gotMissingTables, gotIncorrectBounds := ppm.ComparePartitions(&p, tt.existingTables,
tt.expectedTables,
tt.manuallyManagedPartitionNames)
assert.DeepEqual(t,
tt.result.unexpectedTables,
gotUnexpectedTables)
assert.DeepEqual(t,
tt.result.missingTables,
gotMissingTables)
assert.DeepEqual(t,
tt.result.incorrectBounds,
gotIncorrectBounds)
})
}
}
2 changes: 1 addition & 1 deletion pkg/ppm/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (p PPM) CleanupPartitions() error {
return fmt.Errorf("could not list partitions: %w", err)
}

unexpected, _, _ := p.comparePartitions(foundPartitions, expectedPartitions)
unexpected, _, _ := p.comparePartitions(foundPartitions, expectedPartitions, config.ManuallyManagedPartitions)

for _, partition := range unexpected {
err := p.DetachPartition(partition)
Expand Down
10 changes: 10 additions & 0 deletions pkg/ppm/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ppm

import "github.com/qonto/postgresql-partition-manager/internal/infra/partition"

func ComparePartitions(p *PPM,
existingTables, expectedTables []partition.Partition,
manuallyManagedPartitionNames []string,
) (unexpectedTables, missingTables, incorrectBounds []partition.Partition) {
return p.comparePartitions(existingTables, expectedTables, manuallyManagedPartitionNames)
}

0 comments on commit afe42f3

Please sign in to comment.