Skip to content

Commit

Permalink
Merge branch 'master' into meiji163/generated-col-error
Browse files Browse the repository at this point in the history
  • Loading branch information
meiji163 authored Oct 24, 2024
2 parents 1b863b0 + a834c00 commit 4960711
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 118 deletions.
54 changes: 46 additions & 8 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type Applier struct {
migrationContext *base.MigrationContext
finishedMigrating int64
name string

dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
Expand Down Expand Up @@ -106,6 +110,37 @@ func (this *Applier) InitDBConnections() (err error) {
return nil
}

func (this *Applier) prepareQueries() (err error) {
if this.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
this.migrationContext.DatabaseName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.OriginalTableColumns,
&this.migrationContext.UniqueKey.Columns,
); err != nil {
return err
}
if this.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder(
this.migrationContext.DatabaseName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.OriginalTableColumns,
this.migrationContext.SharedColumns,
this.migrationContext.MappedSharedColumns,
); err != nil {
return err
}
if this.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder(
this.migrationContext.DatabaseName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.OriginalTableColumns,
this.migrationContext.SharedColumns,
this.migrationContext.MappedSharedColumns,
&this.migrationContext.UniqueKey.Columns,
); err != nil {
return err
}
return nil
}

// validateAndReadGlobalVariables potentially reads server global variables, such as the time_zone and wait_timeout.
func (this *Applier) validateAndReadGlobalVariables() error {
query := `select /* gh-ost */ @@global.time_zone, @@global.wait_timeout`
Expand Down Expand Up @@ -631,6 +666,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.GetIteration() == 0,
this.migrationContext.IsTransactionalTable(),
// TODO: Don't hardcode this
strings.HasPrefix(this.migrationContext.ApplierMySQLVersion, "8."),
)
if err != nil {
return chunkSize, rowsAffected, duration, err
Expand Down Expand Up @@ -1135,35 +1172,36 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv

// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBuildResult {
switch dmlEvent.DML {
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
}
case binlog.UpdateDML:
{
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
results := make([]*dmlBuildResult, 0, 2)
dmlEvent.DML = binlog.DeleteDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
return results
}
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
query, sharedArgs, uniqueKeyArgs, err := this.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args := sqlutils.Args()
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return append(results, newDmlBuildResult(query, args, 0, err))
return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)}
}
}
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
}

// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
Expand Down
194 changes: 174 additions & 20 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
gosql "database/sql"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -100,6 +101,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
columnValues := sql.ToColumnValues([]interface{}{123456, 42})

migrationContext := base.NewMigrationContext()
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "test"
migrationContext.OriginalTableColumns = columns
migrationContext.SharedColumns = columns
Expand All @@ -110,6 +112,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
}

applier := NewApplier(migrationContext)
applier.prepareQueries()

t.Run("delete", func(t *testing.T) {
binlogEvent := &binlog.BinlogDMLEvent{
Expand Down Expand Up @@ -199,6 +202,30 @@ type ApplierTestSuite struct {
mysqlContainer testcontainers.Container
}

func (suite *ApplierTestSuite) getConnectionConfig(ctx context.Context) (*mysql.ConnectionConfig, error) {
host, err := suite.mysqlContainer.ContainerIP(ctx)
if err != nil {
return nil, err
}

config := mysql.NewConnectionConfig()
config.Key.Hostname = host
config.Key.Port = 3306
config.User = "root"
config.Password = "root-password"

return config, nil
}

func (suite *ApplierTestSuite) getDb(ctx context.Context) (*gosql.DB, error) {
host, err := suite.mysqlContainer.ContainerIP(ctx)
if err != nil {
return nil, err
}

return gosql.Open("mysql", "root:root-password@tcp("+host+":3306)/test")
}

func (suite *ApplierTestSuite) SetupSuite() {
ctx := context.Background()
req := testcontainers.ContainerRequest{
Expand Down Expand Up @@ -229,7 +256,7 @@ func (suite *ApplierTestSuite) SetupTest() {
suite.Require().NoError(err)
suite.Require().Equalf(0, rc, "failed to created database: expected exit code 0, got %d", rc)

rc, _, err = suite.mysqlContainer.Exec(ctx, []string{"mysql", "-uroot", "-proot-password", "-e", "CREATE TABLE test.testing (id INT, item_id INT);"})
rc, _, err = suite.mysqlContainer.Exec(ctx, []string{"mysql", "-uroot", "-proot-password", "-e", "CREATE TABLE test.testing (id INT, item_id INT, PRIMARY KEY (id));"})
suite.Require().NoError(err)
suite.Require().Equalf(0, rc, "failed to created table: expected exit code 0, got %d", rc)
}
Expand All @@ -245,15 +272,11 @@ func (suite *ApplierTestSuite) TearDownTest() {
func (suite *ApplierTestSuite) TestInitDBConnections() {
ctx := context.Background()

host, err := suite.mysqlContainer.ContainerIP(ctx)
connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig()
migrationContext.ApplierConnectionConfig.Key.Hostname = host
migrationContext.ApplierConnectionConfig.Key.Port = 3306
migrationContext.ApplierConnectionConfig.User = "root"
migrationContext.ApplierConnectionConfig.Password = "root-password"
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.SetConnectionConfig("innodb")
Expand All @@ -274,24 +297,25 @@ func (suite *ApplierTestSuite) TestInitDBConnections() {
func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
ctx := context.Background()

host, err := suite.mysqlContainer.ContainerIP(ctx)
connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig()
migrationContext.ApplierConnectionConfig.Key.Hostname = host
migrationContext.ApplierConnectionConfig.Key.Port = 3306
migrationContext.ApplierConnectionConfig.User = "root"
migrationContext.ApplierConnectionConfig.Password = "root-password"
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.SetConnectionConfig("innodb")

migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "primary_key",
Columns: *sql.NewColumnList([]string{"id"}),
}

applier := NewApplier(migrationContext)
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
Expand All @@ -313,7 +337,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
suite.Require().NoError(err)

// Check that the row was inserted
db, err := gosql.Open("mysql", "root:root-password@tcp("+host+":3306)/test")
db, err := suite.getDb(ctx)
suite.Require().NoError(err)
defer db.Close()

Expand All @@ -340,15 +364,11 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
ctx := context.Background()

host, err := suite.mysqlContainer.ContainerIP(ctx)
connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig()
migrationContext.ApplierConnectionConfig.Key.Hostname = host
migrationContext.ApplierConnectionConfig.Key.Port = 3306
migrationContext.ApplierConnectionConfig.User = "root"
migrationContext.ApplierConnectionConfig.Password = "root-password"
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.SetConnectionConfig("innodb")
Expand All @@ -367,6 +387,140 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
suite.Require().NoError(err)
}

func (suite *ApplierTestSuite) TestApplyIterationInsertQuery() {
ctx := context.Background()

connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.ChunkSize = 10
migrationContext.SetConnectionConfig("innodb")

db, err := suite.getDb(ctx)
suite.Require().NoError(err)
defer db.Close()

_, err = db.Exec("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))")
suite.Require().NoError(err)

// Insert some test values
for i := 1; i <= 10; i++ {
_, err = db.Exec("INSERT INTO test.testing (id, item_id) VALUES (?, ?)", i, i)
suite.Require().NoError(err)
}

migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id"}),
}

migrationContext.MigrationIterationRangeMinValues = sql.ToColumnValues([]interface{}{1})
migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{10})

applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
suite.Require().NoError(err)

chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery()
suite.Require().NoError(err)

suite.Require().Equal(migrationContext.ChunkSize, chunkSize)
suite.Require().Equal(int64(10), rowsAffected)
suite.Require().Greater(duration, time.Duration(0))

// Check that the rows were inserted
rows, err := db.Query("SELECT * FROM test._testing_gho")
suite.Require().NoError(err)
defer rows.Close()

var count, id, item_id int
for rows.Next() {
err = rows.Scan(&id, &item_id)
suite.Require().NoError(err)
count += 1
}
suite.Require().NoError(rows.Err())

suite.Require().Equal(10, count)
}

func (suite *ApplierTestSuite) TestApplyIterationInsertQueryFailsFastWhenSelectingLockedRows() {
ctx := context.Background()

connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.ChunkSize = 10
migrationContext.TableEngine = "innodb"
migrationContext.SetConnectionConfig("innodb")

db, err := suite.getDb(ctx)
suite.Require().NoError(err)
defer db.Close()

_, err = db.Exec("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))")
suite.Require().NoError(err)

// Insert some test values
for i := 1; i <= 10; i++ {
_, err = db.Exec("INSERT INTO test.testing (id, item_id) VALUES (?, ?)", i, i)
suite.Require().NoError(err)
}

migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id"}),
}

migrationContext.MigrationIterationRangeMinValues = sql.ToColumnValues([]interface{}{1})
migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{10})

applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
suite.Require().NoError(err)

// Lock one of the rows
tx, err := db.Begin()
suite.Require().NoError(err)
defer func() {
suite.Require().NoError(tx.Rollback())
}()

_, err = tx.Exec("SELECT * FROM test.testing WHERE id = 5 FOR UPDATE")
suite.Require().NoError(err)

chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery()
suite.Require().Error(err)
suite.Require().EqualError(err, "Error 3572 (HY000): Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.")

suite.Require().Equal(migrationContext.ChunkSize, chunkSize)
suite.Require().Equal(int64(0), rowsAffected)
suite.Require().Equal(time.Duration(0), duration)

// Check that the no rows were inserted
var count int
err = db.QueryRow("SELECT COUNT(*) FROM test._testing_gho").Scan(&count)
suite.Require().NoError(err)

suite.Require().Equal(0, count)
}

func TestApplier(t *testing.T) {
suite.Run(t, new(ApplierTestSuite))
}
4 changes: 4 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ func (this *Migrator) Migrate() (err error) {
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
return err
}
// We can prepare some of the queries on the applier
if err := this.applier.prepareQueries(); err != nil {
return err
}
// Validation complete! We're good to execute this migration
if err := this.hooksExecutor.onValidated(); err != nil {
return err
Expand Down
Loading

0 comments on commit 4960711

Please sign in to comment.