Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL sharding storage #205

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions pkg/providers/mysql/storage_sharding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package mysql

import (
"context"
"fmt"
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
)

// ShardTable splits table into one description per MySQL partition so the
// partitions can be loaded independently.
//
// Partition discovery is best-effort: a failure to resolve partitions is only
// logged as a warning, and whenever no partitions are found the original table
// description is returned as the single shard. The returned error is always nil.
func (s *Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error) {
	shards, resolveErr := s.resolvePartitions(ctx, table)
	if resolveErr != nil {
		// Do not fail the whole snapshot; fall back to an unsharded load below.
		logger.Log.Warnf("unable to load child tables: %v", resolveErr)
	}

	if len(shards) == 0 {
		return []abstract.TableDescription{table}, nil
	}
	return shards, nil
}

// resolvePartitions queries INFORMATION_SCHEMA.PARTITIONS for the given table
// and returns one TableDescription per named partition. Each description keeps
// the table's name and schema, carries a "PARTITION (<name>)" filter, and uses
// the reported TABLE_ROWS value as its row estimate.
//
// Rows with a NULL PARTITION_NAME are excluded in the query itself: that is how
// a non-partitioned table shows up in INFORMATION_SCHEMA.PARTITIONS, and
// scanning NULL into a string would otherwise fail for every plain table.
// A nil slice with a nil error therefore means "table is not partitioned".
//
// NOTE(review): sub-partitioned tables presumably report one row per
// sub-partition with a repeated PARTITION_NAME — confirm whether the result
// needs de-duplication.
func (s *Storage) resolvePartitions(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error) {
	rows, err := s.DB.QueryContext(
		ctx,
		`
SELECT PARTITION_NAME, TABLE_ROWS
FROM INFORMATION_SCHEMA.PARTITIONS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND PARTITION_NAME IS NOT NULL;
`,
		table.Schema,
		table.Name,
	)
	if err != nil {
		return nil, xerrors.Errorf("unable to resolve partitions: %w", err)
	}
	// Release the result set (and its connection) on every return path,
	// including the early return on a scan error.
	defer rows.Close()

	var res []abstract.TableDescription
	for rows.Next() {
		var partName string
		var tableRows int
		if err := rows.Scan(&partName, &tableRows); err != nil {
			return nil, xerrors.Errorf("unable to scan partition: %w", err)
		}
		res = append(res, abstract.TableDescription{
			Name:   table.Name,
			Schema: table.Schema,
			Filter: abstract.WhereStatement(fmt.Sprintf("PARTITION (%s)", partName)),
			EtaRow: uint64(tableRows),
			Offset: 0,
		})
	}
	// rows.Next returning false can also mean the iteration failed midway;
	// without this check a truncated partition list would be returned as success.
	if err := rows.Err(); err != nil {
		return nil, xerrors.Errorf("unable to read partitions: %w", err)
	}
	return res, nil
}
21 changes: 21 additions & 0 deletions pkg/providers/mysql/tests/sharding/dump/source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Fixture for the partition-sharding test: a table range-partitioned by the
-- year of order_date into four partitions.
CREATE TABLE orders (
id INT NOT NULL AUTO_INCREMENT,
order_date DATE NOT NULL,
customer_name VARCHAR(100),
amount DECIMAL(10,2),
PRIMARY KEY (id, order_date)
)
PARTITION BY RANGE (YEAR(order_date)) (
PARTITION p2022 VALUES LESS THAN (2023),
PARTITION p2023 VALUES LESS THAN (2024),
PARTITION p2024 VALUES LESS THAN (2025),
PARTITION p_future VALUES LESS THAN MAXVALUE
);

-- Six rows in total: two each for 2022, 2023 and 2024; no row has a year of
-- 2025 or later, so p_future stays empty.
INSERT INTO orders (order_date, customer_name, amount) VALUES
('2022-05-10', 'Alice', 150.00),
('2022-08-21', 'Bob', 200.00),
('2023-03-15', 'Charlie', 300.00),
('2023-11-05', 'David', 400.00),
('2024-01-10', 'Eve', 500.00),
('2024-07-18', 'Frank', 600.00);
40 changes: 40 additions & 0 deletions pkg/providers/mysql/tests/sharding/storage_sharding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package sharding

import (
"context"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/providers/mysql"
"github.com/doublecloud/transfer/pkg/providers/mysql/mysqlrecipe"
"github.com/stretchr/testify/require"
"testing"
)

// TestShardingByPartitions checks that ShardTable splits the partitioned
// "orders" table into one shard per partition and that loading every shard
// yields all rows of the table exactly once.
func TestShardingByPartitions(t *testing.T) {
	source := mysqlrecipe.RecipeMysqlSource()
	storage, err := mysql.NewStorage(source.ToStorageParams())
	require.NoError(t, err)

	ctx := context.Background()
	parts, err := storage.ShardTable(ctx, abstract.TableDescription{
		Name:   "orders",
		Schema: source.Database,
		Filter: "",
		EtaRow: 0,
		Offset: 0,
	})
	require.NoError(t, err)
	require.Len(t, parts, 4)

	// Count row events across all shards; non-row items are ignored.
	resRows := 0
	for _, part := range parts {
		require.NoError(
			t,
			storage.LoadTable(ctx, part, func(items []abstract.ChangeItem) error {
				for _, r := range items {
					if r.IsRowEvent() {
						resRows++
					}
				}
				return nil
			}),
		)
	}
	// require.Equal takes (expected, actual); the order matters for the
	// failure message.
	require.Equal(t, 6, resRows)
}
10 changes: 9 additions & 1 deletion pkg/providers/mysql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ func buildSelectQuery(table abstract.TableDescription, tableSchema []abstract.Co
)

if table.Filter != "" {
resultQuery += " WHERE " + string(table.Filter)
if IsPartition(table.Filter) {
resultQuery += string(table.Filter)
} else {
resultQuery += " WHERE " + string(table.Filter)
}
}
if table.Offset != 0 {
resultQuery += fmt.Sprintf(" OFFSET %d", table.Offset)
Expand All @@ -127,6 +131,10 @@ func buildSelectQuery(table abstract.TableDescription, tableSchema []abstract.Co
return resultQuery
}

// IsPartition reports whether filter starts with the literal "PARTITION".
// The check is a plain, case-sensitive prefix match on the filter text.
func IsPartition(filter abstract.WhereStatement) bool {
	const partitionPrefix = "PARTITION"
	raw := string(filter)
	return strings.HasPrefix(raw, partitionPrefix)
}

func MakeArrBacktickedColumnNames(tableSchema *[]abstract.ColSchema) []string {
colNames := make([]string, len(*tableSchema))
for idx, col := range *tableSchema {
Expand Down
Loading