From 582338ed10723fbe41cf12447a5a49c6733c8989 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?vsth=E5=AD=99=E5=A4=A9=E6=B5=A9?= <thsun@trip.com>
Date: Thu, 20 Apr 2023 22:15:50 +0800
Subject: [PATCH 1/4] fix issue 887

---
 go/base/context.go    |  2 ++
 go/cmd/gh-ost/main.go |  2 +-
 go/logic/applier.go   | 81 +++++++++++++++++++++++++++++++++++++++++--
 go/logic/migrator.go  | 27 +++++++++++++--
 4 files changed, 107 insertions(+), 5 deletions(-)

diff --git a/go/base/context.go b/go/base/context.go
index e3472f5bd..878316e6c 100644
--- a/go/base/context.go
+++ b/go/base/context.go
@@ -232,6 +232,8 @@ type MigrationContext struct {
 
 	recentBinlogCoordinates mysql.BinlogCoordinates
 
+	AllowSetupMetadataLockInstruments bool
+
 	Log Logger
 }
 
diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go
index 3daf24441..eedeee325 100644
--- a/go/cmd/gh-ost/main.go
+++ b/go/cmd/gh-ost/main.go
@@ -134,7 +134,7 @@ func main() {
 	flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook")
 
 	flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
-
+	flag.BoolVar(&migrationContext.AllowSetupMetadataLockInstruments, "allow-setup-metadata-lock-instruments", false, "validate rename session acquiring lock whether is original table before unlock tables in cut-over phase")
 	maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
 	criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits")
 	flag.Int64Var(&migrationContext.CriticalLoadIntervalMilliseconds, "critical-load-interval-millis", 0, "When 0, migration immediately bails out upon meeting critical-load. When non-zero, a second check is done after given interval, and migration only bails out if 2nd check still meets critical load")
diff --git a/go/logic/applier.go b/go/logic/applier.go
index 9554c59d0..3c19e64f2 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -416,6 +416,24 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
 	return this.WriteAndLogChangelog("state", value)
 }
 
+func (this *Applier) EnableMetadataLockInstrument() (err error) {
+	query := fmt.Sprintf(`select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`)
+	var enabled, timed string
+	if err = this.db.QueryRow(query).Scan(&enabled, &timed); err != nil {
+		return this.migrationContext.Log.Errorf("query performance_schema.setup_instruments with name wait/lock/metadata/sql/mdl error: %s", err)
+	}
+	if enabled == "YES" && timed == "YES" {
+		this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl already has been enabled")
+		return nil
+	}
+	this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl state: enabled %s, timed %s", enabled, timed)
+	if _, err = this.db.Exec(`UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME = 'wait/lock/metadata/sql/mdl'`); err != nil {
+		return this.migrationContext.Log.Errorf("enable instrument wait/lock/metadata/sql/mdl error: %s", err)
+	}
+	this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl enabled")
+	return nil
+}
+
 // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
 // This is done asynchronously
 func (this *Applier) InitiateHeartbeat() {
@@ -934,7 +952,7 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
 }
 
 // AtomicCutOverMagicLock
-func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
+func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, okToDropSentryTable <-chan bool, dropSentryTableDone chan<- bool) error {
 	tx, err := this.db.Begin()
 	if err != nil {
 		tableLocked <- err
@@ -1003,7 +1021,16 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
 
 	// The cut-over phase will proceed to apply remaining backlog onto ghost table,
 	// and issue RENAME. We wait here until told to proceed.
-	<-okToUnlockTable
+
+	// we should make sure that the whole lock tables duration(include wait channel cost) not higher than rename session timeout.
+	// receive timeout channel, Rename session already has timeout&quit
+	select {
+	case <-okToDropSentryTable:
+		this.migrationContext.Log.Infof("Receive drop magic table channel, drop table %s.%s now", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName()))
+	case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Now().Sub(this.migrationContext.LockTablesStartTime)):
+		this.migrationContext.Log.Warningf("Wait drop magic table channel timeout, drop table %s.%s forcefully now. Noteworthy, it is unreasonable, timeout was used for foolproof, but normally it should not be here.", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName()))
+	}
+
 	this.migrationContext.Log.Infof("Will now proceed to drop magic table and unlock tables")
 
 	// The magic table is here because we locked it. And we are the only ones allowed to drop it.
@@ -1019,6 +1046,16 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
 		// We DO NOT return here because we must `UNLOCK TABLES`!
 	}
 
+	// Anyway, send dropSentryTableDone channel here
+	dropSentryTableDone <- true
+
+	// receive timeout channel, Rename session already has timeout&quit
+	select {
+	case <-okToUnlockTable:
+		this.migrationContext.Log.Infof("Receive unlock table channel, unlock tables now")
+	case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Now().Sub(this.migrationContext.LockTablesStartTime)):
+		this.migrationContext.Log.Warningf("Wait unlock table channel timeout, unlock tables forcefully now. Noteworthy, it is unreasonable, timeout was used for foolproof, but normally it should not be here.")
+	}
 	// Tables still locked
 	this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
 		sql.EscapeName(this.migrationContext.DatabaseName),
@@ -1079,6 +1116,46 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
 	return nil
 }
 
+// ValidateGhostTableLocked validate original table has been locked via wait/lock/metadata/sql/mdl instrument.
+// return error means the ghost table has not been locked by rename session which is not expected. Need kill rename session, unlock table try again later.
+func (this *Applier) ValidateGhostTableLocked(renameSessionId int64) (err error) {
+	var schema, object, state, info string
+	query := `select /*+ MAX_EXECUTION_TIME(300) */ ifnull(a.object_schema,''),a.object_name,b.PROCESSLIST_STATE,b.PROCESSLIST_INFO from performance_schema.metadata_locks a join performance_schema.threads b on a.owner_thread_id=b.thread_id where b.processlist_id=? and a.lock_status='PENDING';`
+	// Not strictly validate here
+	if err := this.db.QueryRow(query, renameSessionId).Scan(&schema, &object, &state, &info); err != nil {
+		if err == gosql.ErrNoRows {
+			this.migrationContext.Log.Warningf("query metadata locks returns %s, perhaps instrument wait/lock/metadata/sql/mdl not enabled, enable it via -allow-setup-metadata-lock-instruments", err)
+		}
+		this.migrationContext.Log.Warningf("Grabbing rename session acquire metadata lock error: %s, query:%s", err, query)
+	} else {
+		this.migrationContext.Log.Infof("Grabbing rename session acquire metadata lock is schema:%s, object:%s, state:%s, info: %s", schema, object, state, info)
+		if strings.ToLower(object) != strings.ToLower(this.migrationContext.OriginalTableName) {
+			return this.migrationContext.Log.Errorf("Expect rename session %d acquiring table metadata lock is %s.%s, but got %s.%s",
+				renameSessionId,
+				sql.EscapeName(this.migrationContext.DatabaseName),
+				sql.EscapeName(this.migrationContext.OriginalTableName),
+				sql.EscapeName(schema),
+				sql.EscapeName(object))
+		}
+	}
+	return nil
+}
+
+// KillRenameSession Kill rename session while ghost table not locked.
+// Check rename session id whether is an expect process before execute kill command.
+func (this *Applier) KillRenameSession(renameSessionId int64) (err error) {
+	if err := this.ExpectProcess(renameSessionId, "metadata lock", "rename"); err != nil {
+		return err
+	}
+	this.migrationContext.Log.Infof("Starting kill rename session %d", renameSessionId)
+	query := fmt.Sprintf(`kill /* gh-ost */ %d`, renameSessionId)
+	if _, err := this.db.Exec(query); err != nil {
+		return this.migrationContext.Log.Errorf("kill rename session %d error %s, anyway starting release original table now", renameSessionId, err)
+	}
+	this.migrationContext.Log.Infof("Kill rename session %d done", renameSessionId)
+	return nil
+}
+
 func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
 	query := fmt.Sprintf(`show global status like '%s'`, variableName)
 	if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index b4d0a9ae1..b7958c961 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -638,7 +638,10 @@ func (this *Migrator) atomicCutOver() (err error) {
 	defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
 
 	okToUnlockTable := make(chan bool, 4)
+	okToDropSentryTable := make(chan bool, 4)
+	dropSentryTableDone := make(chan bool, 2)
 	defer func() {
+		okToDropSentryTable <- true
 		okToUnlockTable <- true
 	}()
 
@@ -648,7 +651,7 @@ func (this *Migrator) atomicCutOver() (err error) {
 	tableLocked := make(chan error, 2)
 	tableUnlocked := make(chan error, 2)
 	go func() {
-		if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
+		if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, okToDropSentryTable, dropSentryTableDone); err != nil {
 			this.migrationContext.Log.Errore(err)
 		}
 	}()
@@ -674,6 +677,7 @@ func (this *Migrator) atomicCutOver() (err error) {
 		if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
 			// Abort! Release the lock
 			atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
+			okToDropSentryTable <- true
 			okToUnlockTable <- true
 		}
 	}()
@@ -691,6 +695,7 @@ func (this *Migrator) atomicCutOver() (err error) {
 	// Wait for the RENAME to appear in PROCESSLIST
 	if err := this.retryOperation(waitForRename, true); err != nil {
 		// Abort! Release the lock
+		okToDropSentryTable <- true
 		okToUnlockTable <- true
 		return err
 	}
@@ -703,9 +708,20 @@ func (this *Migrator) atomicCutOver() (err error) {
 	}
 	this.migrationContext.Log.Infof("Connection holding lock on original table still exists")
 
+	okToDropSentryTable <- true
+	<-dropSentryTableDone
+
+	if err := this.applier.ValidateGhostTableLocked(renameSessionId); err != nil {
+		// Abort! Kill Rename session and release the lock
+		if err := this.applier.KillRenameSession(renameSessionId); err != nil {
+			this.migrationContext.Log.Errore(err)
+		}
+		okToUnlockTable <- true
+		return err
+	}
+
 	// Now that we've found the RENAME blocking, AND the locking connection still alive,
 	// we know it is safe to proceed to release the lock
-
 	okToUnlockTable <- true
 	// BAM! magic table dropped, original table lock is released
 	// -> RENAME released -> queries on original are unblocked.
@@ -1146,6 +1162,13 @@ func (this *Migrator) initiateApplier() error {
 		}
 	}
 	this.applier.WriteChangelogState(string(GhostTableMigrated))
+
+	if this.migrationContext.AllowSetupMetadataLockInstruments {
+		if err := this.applier.EnableMetadataLockInstrument(); err != nil {
+			this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out")
+			return err
+		}
+	}
 	go this.applier.InitiateHeartbeat()
 	return nil
 }

From 72980133db12280e59251395232a7dedfce31bd1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?vsth=E5=AD=99=E5=A4=A9=E6=B5=A9?= <thsun@trip.com>
Date: Fri, 21 Apr 2023 00:54:48 +0800
Subject: [PATCH 2/4] fix golangci-lint

---
 go/logic/applier.go | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/go/logic/applier.go b/go/logic/applier.go
index 3c19e64f2..9fae565ac 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -11,6 +11,7 @@ import (
 	"strings"
 	"sync/atomic"
 	"time"
+	"errors"
 
 	"github.com/github/gh-ost/go/base"
 	"github.com/github/gh-ost/go/binlog"
@@ -417,12 +418,12 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
 }
 
 func (this *Applier) EnableMetadataLockInstrument() (err error) {
-	query := fmt.Sprintf(`select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`)
+	query := `select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`
 	var enabled, timed string
 	if err = this.db.QueryRow(query).Scan(&enabled, &timed); err != nil {
 		return this.migrationContext.Log.Errorf("query performance_schema.setup_instruments with name wait/lock/metadata/sql/mdl error: %s", err)
 	}
-	if enabled == "YES" && timed == "YES" {
+	if strings.EqualFold(enabled, "YES") && strings.EqualFold(timed, "YES") {
 		this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl already has been enabled")
 		return nil
 	}
@@ -1027,7 +1028,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
 	select {
 	case <-okToDropSentryTable:
 		this.migrationContext.Log.Infof("Receive drop magic table channel, drop table %s.%s now", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName()))
-	case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Now().Sub(this.migrationContext.LockTablesStartTime)):
+	case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Since(this.migrationContext.LockTablesStartTime)):
 		this.migrationContext.Log.Warningf("Wait drop magic table channel timeout, drop table %s.%s forcefully now. Noteworthy, it is unreasonable, timeout was used for foolproof, but normally it should not be here.", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName()))
 	}
 
@@ -1053,7 +1054,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
 	select {
 	case <-okToUnlockTable:
 		this.migrationContext.Log.Infof("Receive unlock table channel, unlock tables now")
-	case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Now().Sub(this.migrationContext.LockTablesStartTime)):
+	case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Since(this.migrationContext.LockTablesStartTime)):
 		this.migrationContext.Log.Warningf("Wait unlock table channel timeout, unlock tables forcefully now. Noteworthy, it is unreasonable, timeout was used for foolproof, but normally it should not be here.")
 	}
 	// Tables still locked
@@ -1123,7 +1124,7 @@ func (this *Applier) ValidateGhostTableLocked(renameSessionId int64) (err error)
 	query := `select /*+ MAX_EXECUTION_TIME(300) */ ifnull(a.object_schema,''),a.object_name,b.PROCESSLIST_STATE,b.PROCESSLIST_INFO from performance_schema.metadata_locks a join performance_schema.threads b on a.owner_thread_id=b.thread_id where b.processlist_id=? and a.lock_status='PENDING';`
 	// Not strictly validate here
 	if err := this.db.QueryRow(query, renameSessionId).Scan(&schema, &object, &state, &info); err != nil {
-		if err == gosql.ErrNoRows {
+		if errors.Is(err, gosql.ErrNoRows) {
 			this.migrationContext.Log.Warningf("query metadata locks returns %s, perhaps instrument wait/lock/metadata/sql/mdl not enabled, enable it via -allow-setup-metadata-lock-instruments", err)
 		}
 		this.migrationContext.Log.Warningf("Grabbing rename session acquire metadata lock error: %s, query:%s", err, query)

From 11540b453b850aadbecd90dfd145d8297535d45a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?vsth=E5=AD=99=E5=A4=A9=E6=B5=A9?= <thsun@trip.com>
Date: Fri, 21 Apr 2023 00:58:57 +0800
Subject: [PATCH 3/4] fix golangci-lint

---
 go/logic/applier.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/go/logic/applier.go b/go/logic/applier.go
index 9fae565ac..cdfed1580 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -7,11 +7,11 @@ package logic
 
 import (
 	gosql "database/sql"
+	"errors"
 	"fmt"
 	"strings"
 	"sync/atomic"
 	"time"
-	"errors"
 
 	"github.com/github/gh-ost/go/base"
 	"github.com/github/gh-ost/go/binlog"
@@ -1130,7 +1130,7 @@ func (this *Applier) ValidateGhostTableLocked(renameSessionId int64) (err error)
 		this.migrationContext.Log.Warningf("Grabbing rename session acquire metadata lock error: %s, query:%s", err, query)
 	} else {
 		this.migrationContext.Log.Infof("Grabbing rename session acquire metadata lock is schema:%s, object:%s, state:%s, info: %s", schema, object, state, info)
-		if strings.ToLower(object) != strings.ToLower(this.migrationContext.OriginalTableName) {
+		if !strings.EqualFold(strings.ToLower(object), strings.ToLower(this.migrationContext.OriginalTableName)) {
 			return this.migrationContext.Log.Errorf("Expect rename session %d acquiring table metadata lock is %s.%s, but got %s.%s",
 				renameSessionId,
 				sql.EscapeName(this.migrationContext.DatabaseName),

From 7fc7d60ca1cf1ae86cf9d254e081a12d71b1b527 Mon Sep 17 00:00:00 2001
From: Tim Vaillancourt <tim@timvaillancourt.com>
Date: Fri, 8 Dec 2023 23:54:23 +0100
Subject: [PATCH 4/4] Make go fmt happy

---
 go/base/context.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go/base/context.go b/go/base/context.go
index 83226adc4..5ae9b9dad 100644
--- a/go/base/context.go
+++ b/go/base/context.go
@@ -233,7 +233,7 @@ type MigrationContext struct {
 	recentBinlogCoordinates mysql.BinlogCoordinates
 
 	AllowSetupMetadataLockInstruments bool
-	BinlogSyncerMaxReconnectAttempts int
+	BinlogSyncerMaxReconnectAttempts  int
 
 	Log Logger
 }