From d4bdfab53820c136e16b6a876b96db2142d68adb Mon Sep 17 00:00:00 2001 From: Yohan Legat Date: Wed, 1 Jan 2025 11:25:58 +0100 Subject: [PATCH] Introduce manually managed partitions The data used as a partition key is not always of the greatest quality. For instance, a timestamp column could contain some invalid dates referring to year -1. Ideally these values should be cleaned, but it is not always an option in a system where data is treated as immutable. One way to deal with this situation is to add a default partition that will be in charge of containing data that cannot be assigned to any of the partitions already defined. Another solution, when the range of invalid values is known, is to move this data inside one or several dedicated partitions. Both solutions involve the creation of partitions that cannot be managed by the partition manager, because their range break the policy defined for the other partitions. For this reason we propose to introduce a new configuration property: `manuallyManagedPartitions`. It indicates which partitions should be ignored by the partition manager when checking or cleaning up partitions tables. --- internal/infra/partition/configuration.go | 15 +-- pkg/ppm/checkpartition.go | 55 +++++++++-- pkg/ppm/checkpartition_test.go | 114 ++++++++++++++++++++-- pkg/ppm/cleanup.go | 2 +- pkg/ppm/export_test.go | 10 ++ 5 files changed, 173 insertions(+), 23 deletions(-) create mode 100644 pkg/ppm/export_test.go diff --git a/internal/infra/partition/configuration.go b/internal/infra/partition/configuration.go index 13caa77..bf2ab4a 100644 --- a/internal/infra/partition/configuration.go +++ b/internal/infra/partition/configuration.go @@ -15,13 +15,14 @@ const ( ) type Configuration struct { - Schema string `mapstructure:"schema" validate:"required"` - Table string `mapstructure:"table" validate:"required"` - PartitionKey string `mapstructure:"partitionKey" validate:"required"` - Interval Interval `mapstructure:"interval" validate:"required,oneof=daily weekly monthly quarterly yearly"` - Retention int `mapstructure:"retention" validate:"required,gt=0"` - PreProvisioned int `mapstructure:"preProvisioned" validate:"required,gt=0"` - CleanupPolicy CleanupPolicy `mapstructure:"cleanupPolicy" validate:"required,oneof=drop detach"` + Schema string `mapstructure:"schema" validate:"required"` + Table string `mapstructure:"table" validate:"required"` + PartitionKey string `mapstructure:"partitionKey" validate:"required"` + Interval Interval `mapstructure:"interval" validate:"required,oneof=daily weekly monthly quarterly yearly"` + Retention int `mapstructure:"retention" validate:"required,gt=0"` + PreProvisioned int `mapstructure:"preProvisioned" validate:"required,gt=0"` + CleanupPolicy CleanupPolicy `mapstructure:"cleanupPolicy" validate:"required,oneof=drop detach"` + ManuallyManagedPartitions []string `mapstructure:"manuallyManagedPartitions"` } func (p Configuration) GeneratePartition(forDate time.Time) (Partition, error) { diff --git a/pkg/ppm/checkpartition.go b/pkg/ppm/checkpartition.go index 563aa0b..b59f340 100644 --- a/pkg/ppm/checkpartition.go +++ b/pkg/ppm/checkpartition.go @@ -77,7 +77,17 @@ func (p *PPM) checkPartitionKey(config partition.Configuration) error { return fmt.Errorf("failed to get partition settings: %w", err) } - p.logger.Debug("Partition configuration found", "schema", config.Schema, "table", config.Table, "partition_key", config.PartitionKey, "partition_key_type", keyDataType, "partition_strategy", partitionStrategy) + p.logger.Debug("Partition configuration found", + "schema", + config.Schema, + "table", + config.Table, + "partition_key", + config.PartitionKey, + "partition_key_type", + keyDataType, + "partition_strategy", + partitionStrategy) if partitionKey != config.PartitionKey { p.logger.Warn("Partition key mismatch", "expected", config.PartitionKey, "current", partitionKey) @@ -108,7 +118,10 @@ func IsSupportedKeyDataType(dataType postgresql.ColumnType) bool { return slices.Contains(SupportedPartitionKeyDataType, dataType) } -func (p *PPM) comparePartitions(existingTables, expectedTables []partition.Partition) (unexpectedTables, missingTables, incorrectBounds []partition.Partition) { +func (p *PPM) comparePartitions(existingTables, + expectedTables []partition.Partition, + manuallyManagedPartitionNames []string, +) (unexpectedTables, missingTables, incorrectBounds []partition.Partition) { existing := make(map[string]partition.Partition) expectedAndExists := make(map[string]bool) @@ -124,13 +137,29 @@ func (p *PPM) comparePartitions(existingTables, expectedTables []partition.Parti if existing[t.Name].UpperBound != t.UpperBound { incorrectBound = true - p.logger.Warn("Incorrect upper partition bound", "schema", t.Schema, "table", t.Name, "current_bound", existing[t.Name].UpperBound, "expected_bound", t.UpperBound) + p.logger.Warn("Incorrect upper partition bound", + "schema", + t.Schema, + "table", + t.Name, + "current_bound", + existing[t.Name].UpperBound, + "expected_bound", + t.UpperBound) } if existing[t.Name].LowerBound != t.LowerBound { incorrectBound = true - p.logger.Warn("Incorrect lower partition bound", "schema", t.Schema, "table", t.Name, "current_bound", existing[t.Name].LowerBound, "expected_bound", t.LowerBound) + p.logger.Warn("Incorrect lower partition bound", + "schema", + t.Schema, + "table", + t.Name, + "current_bound", + existing[t.Name].LowerBound, + "expected_bound", + t.LowerBound) } if incorrectBound { @@ -143,8 +172,18 @@ func (p *PPM) comparePartitions(existingTables, expectedTables []partition.Parti for _, t := range existingTables { if _, found := expectedAndExists[t.Name]; !found { - // Only in existingTables and not in both - unexpectedTables = append(unexpectedTables, t) + isPartitionManuallyManaged := false + + for _, manuallyManagedPartition := range manuallyManagedPartitionNames { + if manuallyManagedPartition == t.Name { + isPartitionManuallyManaged = true + } + } + + if !isPartitionManuallyManaged { + // Only in existingTables and not in both + unexpectedTables = append(unexpectedTables, t) + } } } @@ -190,7 +229,9 @@ func (p *PPM) checkPartitionsConfiguration(config partition.Configuration) error return fmt.Errorf("could not list partitions: %w", err) } - unexpected, missing, incorrectBound := p.comparePartitions(foundPartitions, expectedPartitions) + unexpected, missing, incorrectBound := p.comparePartitions(foundPartitions, + expectedPartitions, + config.ManuallyManagedPartitions) if len(unexpected) > 0 { partitionContainAnError = true diff --git a/pkg/ppm/checkpartition_test.go b/pkg/ppm/checkpartition_test.go index 143c6c0..9081a02 100644 --- a/pkg/ppm/checkpartition_test.go +++ b/pkg/ppm/checkpartition_test.go @@ -101,7 +101,9 @@ func TestCheckPartitions(t *testing.T) { postgreSQLMock.On("GetColumnDataType", p.Schema, p.Table, p.PartitionKey).Return(postgresql.Date, nil).Once() - postgreSQLMock.On("GetPartitionSettings", p.Schema, p.Table).Return(string(partition.Range), p.PartitionKey, nil).Once() + postgreSQLMock.On("GetPartitionSettings", p.Schema, p.Table).Return(string(partition.Range), + p.PartitionKey, + nil).Once() convertedTables := partitionResultToPartition(t, tables, boundDateFormat) postgreSQLMock.On("ListPartitions", p.Schema, p.Table).Return(convertedTables, nil).Once() @@ -158,13 +160,21 @@ func TestCheckMissingPartitions(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { fmt.Println("tc.tables", tc.tables) - postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(partition.Range), config.PartitionKey, nil).Once() - postgreSQLMock.On("GetColumnDataType", config.Schema, config.Table, config.PartitionKey).Return(postgresql.Date, nil).Once() + postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(partition.Range), + config.PartitionKey, + nil).Once() + postgreSQLMock.On("GetColumnDataType", + config.Schema, + config.Table, + config.PartitionKey).Return(postgresql.Date, nil).Once() tables := partitionResultToPartition(t, tc.tables, boundDateFormat) postgreSQLMock.On("ListPartitions", config.Schema, config.Table).Return(tables, nil).Once() - checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config}) + checker := ppm.New(context.TODO(), + *logger, + postgreSQLMock, + map[string]partition.Configuration{"test": config}) assert.Error(t, checker.CheckPartitions(), "at least one partition contains an invalid configuration") }) } @@ -201,11 +211,99 @@ func TestUnsupportedPartitionsStrategy(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - postgreSQLMock.On("GetColumnDataType", config.Schema, config.Table, config.PartitionKey).Return(postgresql.Date, nil).Once() - postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(tc.strategy), tc.key, nil).Once() - - checker := ppm.New(context.TODO(), *logger, postgreSQLMock, map[string]partition.Configuration{"test": config}) + postgreSQLMock.On("GetColumnDataType", + config.Schema, + config.Table, + config.PartitionKey).Return(postgresql.Date, nil).Once() + postgreSQLMock.On("GetPartitionSettings", config.Schema, config.Table).Return(string(tc.strategy), + tc.key, + nil).Once() + + checker := ppm.New(context.TODO(), + *logger, + postgreSQLMock, + map[string]partition.Configuration{"test": config}) assert.Error(t, checker.CheckPartitions(), "at least one partition contains an invalid configuration") }) } } + +func TestPPM_comparePartitions(t *testing.T) { + p := partition.Partition{ + ParentTable: "ParentTable", + Schema: "Schema", + Name: "Name", + } + + type result struct { + unexpectedTables []partition.Partition + missingTables []partition.Partition + incorrectBounds []partition.Partition + } + + tests := []struct { + name string + existingTables []partition.Partition + expectedTables []partition.Partition + manuallyManagedPartitionNames []string + result result + }{ + { + name: "all existing is expected", + existingTables: []partition.Partition{ + p, + }, + expectedTables: []partition.Partition{ + p, + }, + result: result{}, + }, + { + name: "manually managed partition", + existingTables: []partition.Partition{ + p, + }, + manuallyManagedPartitionNames: []string{"Name"}, + result: result{}, + }, + { + name: "missing table", + expectedTables: []partition.Partition{ + p, + }, + result: result{ + missingTables: []partition.Partition{ + p, + }, + }, + }, + { + name: "unexpected table", + existingTables: []partition.Partition{ + p, + }, + result: result{ + unexpectedTables: []partition.Partition{ + p, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := ppm.PPM{} + gotUnexpectedTables, gotMissingTables, gotIncorrectBounds := ppm.ComparePartitions(&p, tt.existingTables, + tt.expectedTables, + tt.manuallyManagedPartitionNames) + assert.DeepEqual(t, + tt.result.unexpectedTables, + gotUnexpectedTables) + assert.DeepEqual(t, + tt.result.missingTables, + gotMissingTables) + assert.DeepEqual(t, + tt.result.incorrectBounds, + gotIncorrectBounds) + }) + } +} diff --git a/pkg/ppm/cleanup.go b/pkg/ppm/cleanup.go index 9c7f58d..235e452 100644 --- a/pkg/ppm/cleanup.go +++ b/pkg/ppm/cleanup.go @@ -28,7 +28,7 @@ func (p PPM) CleanupPartitions() error { return fmt.Errorf("could not list partitions: %w", err) } - unexpected, _, _ := p.comparePartitions(foundPartitions, expectedPartitions) + unexpected, _, _ := p.comparePartitions(foundPartitions, expectedPartitions, config.ManuallyManagedPartitions) for _, partition := range unexpected { err := p.DetachPartition(partition) diff --git a/pkg/ppm/export_test.go b/pkg/ppm/export_test.go new file mode 100644 index 0000000..2c62c32 --- /dev/null +++ b/pkg/ppm/export_test.go @@ -0,0 +1,10 @@ +package ppm + +import "github.com/qonto/postgresql-partition-manager/internal/infra/partition" + +func ComparePartitions(p *PPM, + existingTables, expectedTables []partition.Partition, + manuallyManagedPartitionNames []string, +) (unexpectedTables, missingTables, incorrectBounds []partition.Partition) { + return p.comparePartitions(existingTables, expectedTables, manuallyManagedPartitionNames) +}