Skip to content

Commit

Permalink
- Support for --postpone-swap-tables-flag-file: while this file exi…
Browse files Browse the repository at this point in the history
…sts, final table swap does not take place, and the ghost table keeps being synchronized

- Fixed version printing
- `rowCopyCompleteFlag` is a hint that allows us to escape the infinite loop of rowcopy once we are sure we have reached the end
  • Loading branch information
Shlomi Noach committed May 17, 2016
1 parent 9a3c607 commit 879b2b4
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 7 deletions.
1 change: 1 addition & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type MigrationContext struct {
ThrottleFlagFile string
ThrottleAdditionalFlagFile string
MaxLoad map[string]int64
PostponeSwapTablesFlagFile string
SwapTablesTimeoutSeconds int64

Noop bool
Expand Down
8 changes: 8 additions & 0 deletions go/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package base

import (
"fmt"
"os"
"regexp"
"time"
)
Expand All @@ -23,3 +24,10 @@ func PrettifyDurationOutput(d time.Duration) string {
result = prettifyDurationRegexp.ReplaceAllString(result, "")
return result
}

func FileExists(fileName string) bool {
if _, err := os.Stat(fileName); err == nil {
return true
}
return false
}
4 changes: 3 additions & 1 deletion go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func main() {
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
flag.StringVar(&migrationContext.PostponeSwapTablesFlagFile, "postpone-swap-tables-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Swapping would be ready to perform the moment the file is deleted.")

maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
quiet := flag.Bool("quiet", false, "quiet")
verbose := flag.Bool("verbose", false, "verbose")
Expand Down Expand Up @@ -117,7 +119,7 @@ func main() {
log.Fatale(err)
}

log.Info("starting gh-ost %+v", AppVersion)
log.Infof("starting gh-ost %+v", AppVersion)

migrator := logic.NewMigrator()
err := migrator.Migrate()
Expand Down
47 changes: 41 additions & 6 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type Migrator struct {
voluntaryLockAcquired chan bool
panicAbort chan error

allEventsUpToLockProcessedFlag int64
rowCopyCompleteFlag int64
allEventsUpToLockProcessedInjectedFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
copyRowsQueue chan tableWriteFunc
Expand All @@ -66,7 +67,7 @@ func NewMigrator() *Migrator {
voluntaryLockAcquired: make(chan bool, 1),
panicAbort: make(chan error),

allEventsUpToLockProcessedFlag: 0,
allEventsUpToLockProcessedInjectedFlag: 0,

copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
Expand All @@ -93,13 +94,13 @@ func (this *Migrator) acceptSignals() {
func (this *Migrator) shouldThrottle() (result bool, reason string) {
// User-based throttle
if this.migrationContext.ThrottleFlagFile != "" {
if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil {
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
// Throttle file defined and exists!
return true, "flag-file"
}
}
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
if _, err := os.Stat(this.migrationContext.ThrottleAdditionalFlagFile); err == nil {
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
// 2nd Throttle file defined and exists!
return true, "flag-file"
}
Expand All @@ -109,7 +110,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
}
if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedFlag) == 0) {
if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) == 0) {
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
if err != nil {
return true, err.Error()
Expand Down Expand Up @@ -172,6 +173,21 @@ func (this *Migrator) throttle(onThrottled func()) {
}
}

// sleepWhileTrue sleeps indefinitely until the given function returns 'false'
// (or fails with error)
func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
for {
shouldSleep, err := operation()
if err != nil {
return err
}
if !shouldSleep {
return nil
}
time.Sleep(time.Second)
}
}

// retryOperation attempts up to `count` attempts at running given function,
// exiting as soon as it returns with non-error.
func (this *Migrator) retryOperation(operation func() error) (err error) {
Expand Down Expand Up @@ -205,6 +221,7 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
// consumers and drops any further incoming events that may be left hanging.
func (this *Migrator) consumeRowCopyComplete() {
<-this.rowCopyComplete
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
go func() {
for <-this.rowCopyComplete {
}
Expand Down Expand Up @@ -330,6 +347,20 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
log.Debugf("throttling before swapping tables")
})

this.sleepWhileTrue(
func() (bool, error) {
if this.migrationContext.PostponeSwapTablesFlagFile == "" {
return false, nil
}
if base.FileExists(this.migrationContext.PostponeSwapTablesFlagFile) {
// Throttle file defined and exists!
log.Debugf("Postponing final table swap as flag file exists: %+v", this.migrationContext.PostponeSwapTablesFlagFile)
return true, nil
}
return false, nil
},
)

if this.migrationContext.TestOnReplica {
return this.stopWritesAndCompleteMigrationOnReplica()
}
Expand Down Expand Up @@ -374,8 +405,8 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
return err
}
log.Debugf("Waiting for events up to lock")
atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
<-this.allEventsUpToLockProcessed
atomic.StoreInt64(&this.allEventsUpToLockProcessedFlag, 1)
log.Debugf("Done waiting for events up to lock")
this.printStatus()

Expand Down Expand Up @@ -687,6 +718,10 @@ func (this *Migrator) iterateChunks() error {
return terminateRowIteration(nil)
}
for {
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
// Done
return nil
}
copyRowsFunc := func() error {
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
if err != nil {
Expand Down

0 comments on commit 879b2b4

Please sign in to comment.