Commit: Schedule fix (#236)
* checkin

* Ensure backups and retention are only evaluated on the primary

* Cleanup

* Fixes

* Continue on failure

* More cleanup

* No need to monitor backups outside of the primary region

* Minor tweak
davissp14 authored Jul 9, 2024
1 parent cfb23e6 commit 6f8fc58
Showing 5 changed files with 278 additions and 65 deletions.
cmd/monitor/main.go (7 changes: 4 additions & 3 deletions)

@@ -47,7 +47,8 @@ func main() {
 		}
 	}()
 
-	if os.Getenv("S3_ARCHIVE_CONFIG") != "" {
+	// No need to monitor backups outside of the primary region.
+	if os.Getenv("S3_ARCHIVE_CONFIG") != "" && node.PrimaryRegion == node.RepMgr.Region {
 		store, err := state.NewStore()
 		if err != nil {
 			panic(fmt.Errorf("failed initialize cluster state store: %s", err))
@@ -63,10 +64,10 @@ func main() {
 		}
 
 		// Backup scheduler
-		go monitorBackupSchedule(ctx, barman)
+		go monitorBackupSchedule(ctx, node, barman)
 
 		// Backup retention monitor
-		go monitorBackupRetention(ctx, barman)
+		go monitorBackupRetention(ctx, node, barman)
 	}
 
 	// Readonly monitor
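The guard now requires both an S3 archive configuration and residency in the cluster's primary region before any backup monitors are started. A minimal sketch of that gating, with primaryRegion and nodeRegion as hypothetical stand-ins for node.PrimaryRegion and node.RepMgr.Region:

package main

import (
	"fmt"
	"os"
)

// backupMonitoringEnabled mirrors the gating above: backup monitors run only
// when S3 archiving is configured and this node is in the primary region.
// primaryRegion and nodeRegion are hypothetical stand-ins for the fields on
// the real flypg.Node type.
func backupMonitoringEnabled(primaryRegion, nodeRegion string) bool {
	return os.Getenv("S3_ARCHIVE_CONFIG") != "" && primaryRegion == nodeRegion
}

func main() {
	// A replica machine in "ord" with a primary region of "iad" skips
	// backup monitoring regardless of whether S3_ARCHIVE_CONFIG is set.
	fmt.Println(backupMonitoringEnabled("iad", "ord")) // false
}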
cmd/monitor/monitor_backup_retention.go (19 changes: 13 additions & 6 deletions)

@@ -8,24 +8,31 @@ import (
"github.com/fly-apps/postgres-flex/internal/flypg"
)

func monitorBackupRetention(ctx context.Context, barman *flypg.Barman) {
func monitorBackupRetention(ctx context.Context, node *flypg.Node, barman *flypg.Barman) {

ticker := time.NewTicker(defaultBackupRetentionEvalFrequency)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Println("Shutting down backup retention monitor")
log.Println("[WARN] Shutting down backup retention monitor...")
return
case <-ticker.C:
result, err := barman.WALArchiveDelete(ctx)
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("Backup retention failed with: %s", err)
log.Printf("[WARN] Failed to resolve primary when evaluating retention: %s", err)
continue
}

if !primary {
continue
}

if len(result) > 0 {
log.Printf("Backup retention response: %s", result)
if _, err := barman.WALArchiveDelete(ctx); err != nil {
log.Printf("[WARN] Failed to prune WAL Archive: %s", err)
}

}
}
}
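Rather than resolving the node's role once at startup, the retention monitor now re-checks primary status on every tick, so a runtime failover is picked up on the next interval and WAL pruning never runs from a replica. A condensed sketch of that evaluate-per-tick pattern, with the primary lookup and the pruning work abstracted as parameters (hypothetical helper, not the repo's API):

package main

import (
	"context"
	"log"
	"time"
)

// runOnPrimaryEveryTick re-evaluates primary status on each interval so a
// promotion or demotion takes effect without restarting the monitor.
// isPrimary and work stand in for the repmgr lookup and WAL pruning.
func runOnPrimaryEveryTick(ctx context.Context, interval time.Duration,
	isPrimary func(context.Context) (bool, error), work func(context.Context) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			primary, err := isPrimary(ctx)
			if err != nil {
				log.Printf("[WARN] primary check failed: %s", err)
				continue
			}
			if !primary {
				continue // never act from a replica
			}
			if err := work(ctx); err != nil {
				log.Printf("[WARN] work failed: %s", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	runOnPrimaryEveryTick(ctx, time.Second,
		func(context.Context) (bool, error) { return true, nil }, // always primary in this demo
		func(context.Context) error { log.Println("pruning WAL archive (stub)"); return nil },
	)
}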
cmd/monitor/monitor_backup_schedule.go (162 changes: 106 additions & 56 deletions)

@@ -9,114 +9,164 @@ import (
"github.com/fly-apps/postgres-flex/internal/flypg"
)

func monitorBackupSchedule(ctx context.Context, barman *flypg.Barman) {
const (
backupRetryInterval = time.Second * 30
)

func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.Barman) {
// Determine when the last backup was taken.
lastBackupTime, err := barman.LastCompletedBackup(ctx)
if err != nil {
log.Printf("Failed to resolve the last backup taken: %s", err)
log.Printf("[WARN] %s", err)
}

fullBackupSchedule := defaultFullBackupSchedule
// Calculate the next scheduled backup time.
nextScheduledBackup := calculateNextBackupTime(barman, lastBackupTime)

// Set the full backup schedule if it is defined in the configuration.
if barman.Settings.FullBackupFrequency != "" {
fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
switch {
case err != nil:
log.Printf("Failed to parse full backup frequency: %s", err)
default:
fullBackupSchedule = fullBackupDur
}
// Check to see if we are the Primary.
primary, err := isPrimary(ctx, node)
if err != nil {
log.Printf("[WARN] Failed to resolve primary status: %s", err)
}

// Ensure we have a least one backup before proceeding.
if lastBackupTime.IsZero() {
log.Println("No backups found! Performing the initial base backup.")
// Perform the initial base backup if we are the primary and either no backups have been taken
// or the next scheduled backup time is less than 0.
if primary {
if nextScheduledBackup < 0 {
err := performBaseBackup(ctx, barman, true)
switch {
case err != nil:
log.Printf("[WARN] Failed to perform initial base backup: %s", err)
log.Printf("[INFO] Retrying in 10 minutes...")
lastBackupTime = time.Now().Add(-backupFrequency(barman) + 10*time.Minute)
default:
log.Println("[INFO] Initial base backup completed successfully")
lastBackupTime = time.Now()
}

if err := performInitialBaseBackup(ctx, barman); err != nil {
log.Printf("Failed to perform the initial full backup: %s", err)
log.Printf("Backup scheduler will re-attempt in %s.", fullBackupSchedule)
// Recalculate the next scheduled backup time after the initial backup.
nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime)
}

lastBackupTime = time.Now()
}

log.Printf("Full backup schedule set to: %s", fullBackupSchedule)

// Calculate the time until the next backup is due.
timeUntilNextBackup := time.Until(lastBackupTime.Add(fullBackupSchedule))

// Perform backup immediately if the time until the next backup is negative.
if timeUntilNextBackup < 0 {
log.Println("Performing full backup now")
_, err := barman.Backup(ctx, false)
if err != nil {
log.Printf("Full backup failed with: %s", err)
}

timeUntilNextBackup = fullBackupSchedule
// Safety net in case ticker does not have a valid duration.
if nextScheduledBackup < 0 {
nextScheduledBackup = backupFrequency(barman)
}

log.Printf("Next full backup due in: %s", timeUntilNextBackup)
log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup)

ticker := time.NewTicker(timeUntilNextBackup)
// Monitor the backup schedule even if we are not the primary. This is to ensure backups will
// continue to be taken in the event of a failover.
ticker := time.NewTicker(nextScheduledBackup)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Println("Shutting down backup scheduler")
log.Println("[WARN] Shutting down backup schedule monitor...")
return
case <-ticker.C:
// Perform a backup while passively waiting for the checkpoint process to complete.
// This could actually take a while, so we should be prepared to wait.
log.Println("Performing full backup")
_, err := barman.Backup(ctx, false)
// Check to see if we are the Primary. This is necessary given failovers can occur at runtime.
primary, err := isPrimary(ctx, node)
if err != nil {
// TODO - Implement a backup-off strategy.
timeUntilNextBackup = time.Hour * 1
ticker.Reset(timeUntilNextBackup)
log.Printf("[WARN] Failed to resolve primary status: %s", err)
continue
}

log.Printf("Backup retention failed with: %s.", err)
log.Printf("Backup will be re-attempted in %s.", timeUntilNextBackup)
if !primary {
continue
}

lastBackupTime, err := barman.LastCompletedBackup(ctx)
if err != nil {
log.Printf("[WARN] Failed to determine when the last backup was taken: %s", err)
continue
}

log.Printf("Full backup completed successfully")
ticker.Reset(fullBackupSchedule)
// Recalculate the next scheduled backup time.
nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime)

// Perform a full backup if the next scheduled backup time is less than 0.
if nextScheduledBackup < 0 {
log.Println("[INFO] Performing full backup...")
if err := performBaseBackup(ctx, barman, false); err != nil {
log.Printf("[WARN] Failed to perform full backup: %v", err)
}

// TODO - We should consider retrying at a shorter interval in the event of a failure.
nextScheduledBackup = backupFrequency(barman)
}

log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup)

// Reset the ticker frequency in case the backup frequency has changed.
ticker.Reset(nextScheduledBackup)
}
}
}

func performInitialBaseBackup(ctx context.Context, barman *flypg.Barman) error {
func calculateNextBackupTime(barman *flypg.Barman, lastBackupTime time.Time) time.Duration {
// If there was no backup, return a negative duration to trigger an immediate backup.
if lastBackupTime.IsZero() {
return -1
}
return time.Until(lastBackupTime.Add(backupFrequency(barman)))
}
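calculateNextBackupTime folds two situations into one signal: a zero lastBackupTime yields the -1 sentinel, and a stale backup makes time.Until return a negative duration, so callers treat any result below zero as "back up now". A small worked example of that arithmetic, assuming a hypothetical 24-hour frequency:

package main

import (
	"fmt"
	"time"
)

func main() {
	// With a 24h frequency, a backup completed 30h ago is overdue:
	// time.Until(last.Add(24h)) is roughly -6h, which is < 0.
	last := time.Now().Add(-30 * time.Hour)
	due := time.Until(last.Add(24 * time.Hour))
	fmt.Println(due < 0) // true: the scheduler would back up immediately

	// A backup completed 2h ago is not due for roughly another 22h.
	recent := time.Now().Add(-2 * time.Hour)
	fmt.Println(time.Until(recent.Add(24 * time.Hour)) > 0) // true
}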

+func isPrimary(ctx context.Context, node *flypg.Node) (bool, error) {
+	conn, err := node.RepMgr.NewLocalConnection(ctx)
+	if err != nil {
+		return false, fmt.Errorf("failed to open local connection: %s", err)
+	}
+	defer func() { _ = conn.Close(ctx) }()
+
+	return node.RepMgr.IsPrimary(ctx, conn)
+}
+
+func backupFrequency(barman *flypg.Barman) time.Duration {
+	fullBackupSchedule := defaultFullBackupSchedule
+
+	// Set the full backup schedule if it is defined in the configuration.
+	if barman.Settings.FullBackupFrequency != "" {
+		fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
+		switch {
+		case err != nil:
+			log.Printf("[WARN] Failed to parse full backup frequency: %s", err)
+		default:
+			fullBackupSchedule = fullBackupDur
+		}
+	}
+
+	return fullBackupSchedule
+}
+
+func performBaseBackup(ctx context.Context, barman *flypg.Barman, immediateCheckpoint bool) error {
 	maxRetries := 10
 	retryCount := 0
 	for {
 		select {
 		case <-ctx.Done():
-			return nil
+			return ctx.Err()
 		default:
-			_, err := barman.Backup(ctx, true)
-			if err != nil {
-				log.Printf("Failed to perform the initial full backup: %s. Retrying in 30 seconds.", err)
+			if _, err := barman.Backup(ctx, immediateCheckpoint); err != nil {
+				log.Printf("[WARN] Failed to perform full backup: %s. Retrying in 30 seconds.", err)
 
 				// If we've exceeded the maximum number of retries, we should return an error.
 				if retryCount >= maxRetries {
-					return fmt.Errorf("failed to perform the initial full backup after %d retries", maxRetries)
+					return fmt.Errorf("failed to perform full backup after %d retries", maxRetries)
 				}
 
 				retryCount++
 
 				select {
 				case <-ctx.Done():
 					return ctx.Err()
-				case <-time.After(time.Second * 30):
+				case <-time.After(backupRetryInterval):
 					continue
 				}
 			}
 
-			log.Println("Initial full backup completed successfully")
 			return nil
 		}
 	}
 }
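performBaseBackup now returns ctx.Err() on cancellation instead of nil, and it bounds its retries at ten attempts spaced backupRetryInterval apart, re-checking the context while waiting between attempts. A rough sketch of the same bounded-retry shape, with attempt as a hypothetical stand-in for barman.Backup:

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"
)

// retryWithContext attempts an operation, sleeps between failures, and bails
// out early when the context is canceled, mirroring performBaseBackup's loop.
func retryWithContext(ctx context.Context, maxRetries int, interval time.Duration,
	attempt func(context.Context) error) error {
	for retries := 0; ; retries++ {
		if err := ctx.Err(); err != nil {
			return err // canceled or timed out before this attempt
		}
		err := attempt(ctx)
		if err == nil {
			return nil
		}
		if retries >= maxRetries {
			return fmt.Errorf("giving up after %d retries: %w", retries, err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err := retryWithContext(ctx, 10, time.Second, func(context.Context) error {
		return errors.New("backup failed") // always fails in this demo
	})
	log.Printf("result: %v", err)
}

Propagating ctx.Err() rather than returning nil lets the caller distinguish a canceled backup from a completed one, which matters now that the same helper serves both the initial and the scheduled backups.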