diff --git a/cmd/monitor/main.go b/cmd/monitor/main.go
index a11f48b7..be9edb42 100644
--- a/cmd/monitor/main.go
+++ b/cmd/monitor/main.go
@@ -47,7 +47,8 @@ func main() {
 		}
 	}()
 
-	if os.Getenv("S3_ARCHIVE_CONFIG") != "" {
+	// No need to monitor backups outside of the primary region.
+	if os.Getenv("S3_ARCHIVE_CONFIG") != "" && node.PrimaryRegion == node.RepMgr.Region {
 		store, err := state.NewStore()
 		if err != nil {
 			panic(fmt.Errorf("failed initialize cluster state store: %s", err))
@@ -63,10 +64,10 @@ func main() {
 		}
 
 		// Backup scheduler
-		go monitorBackupSchedule(ctx, barman)
+		go monitorBackupSchedule(ctx, node, barman)
 
 		// Backup retention monitor
-		go monitorBackupRetention(ctx, barman)
+		go monitorBackupRetention(ctx, node, barman)
 	}
 
 	// Readonly monitor
diff --git a/cmd/monitor/monitor_backup_retention.go b/cmd/monitor/monitor_backup_retention.go
index c2b5915c..f53dd34f 100644
--- a/cmd/monitor/monitor_backup_retention.go
+++ b/cmd/monitor/monitor_backup_retention.go
@@ -8,24 +8,31 @@ import (
 	"github.com/fly-apps/postgres-flex/internal/flypg"
 )
 
-func monitorBackupRetention(ctx context.Context, barman *flypg.Barman) {
+func monitorBackupRetention(ctx context.Context, node *flypg.Node, barman *flypg.Barman) {
+
 	ticker := time.NewTicker(defaultBackupRetentionEvalFrequency)
 	defer ticker.Stop()
 
 	for {
 		select {
 		case <-ctx.Done():
-			log.Println("Shutting down backup retention monitor")
+			log.Println("[WARN] Shutting down backup retention monitor...")
 			return
 		case <-ticker.C:
-			result, err := barman.WALArchiveDelete(ctx)
+			primary, err := isPrimary(ctx, node)
 			if err != nil {
-				log.Printf("Backup retention failed with: %s", err)
+				log.Printf("[WARN] Failed to resolve primary when evaluating retention: %s", err)
+				continue
+			}
+
+			if !primary {
+				continue
 			}
 
-			if len(result) > 0 {
-				log.Printf("Backup retention response: %s", result)
+			if _, err := barman.WALArchiveDelete(ctx); err != nil {
+				log.Printf("[WARN] Failed to prune WAL Archive: %s", err)
 			}
+
 		}
 	}
 }
diff --git a/cmd/monitor/monitor_backup_schedule.go b/cmd/monitor/monitor_backup_schedule.go
index f26b7a28..a16b24cb 100644
--- a/cmd/monitor/monitor_backup_schedule.go
+++ b/cmd/monitor/monitor_backup_schedule.go
@@ -9,101 +9,152 @@ import (
 	"github.com/fly-apps/postgres-flex/internal/flypg"
 )
 
-func monitorBackupSchedule(ctx context.Context, barman *flypg.Barman) {
+const (
+	backupRetryInterval = time.Second * 30
+)
+
+func monitorBackupSchedule(ctx context.Context, node *flypg.Node, barman *flypg.Barman) {
 	// Determine when the last backup was taken.
 	lastBackupTime, err := barman.LastCompletedBackup(ctx)
 	if err != nil {
-		log.Printf("Failed to resolve the last backup taken: %s", err)
+		log.Printf("[WARN] %s", err)
 	}
 
-	fullBackupSchedule := defaultFullBackupSchedule
+	// Calculate the next scheduled backup time.
+	nextScheduledBackup := calculateNextBackupTime(barman, lastBackupTime)
 
-	// Set the full backup schedule if it is defined in the configuration.
-	if barman.Settings.FullBackupFrequency != "" {
-		fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
-		switch {
-		case err != nil:
-			log.Printf("Failed to parse full backup frequency: %s", err)
-		default:
-			fullBackupSchedule = fullBackupDur
-		}
+	// Check to see if we are the Primary.
+	primary, err := isPrimary(ctx, node)
+	if err != nil {
+		log.Printf("[WARN] Failed to resolve primary status: %s", err)
 	}
 
-	// Ensure we have a least one backup before proceeding.
-	if lastBackupTime.IsZero() {
-		log.Println("No backups found! Performing the initial base backup.")
+	// Perform the initial base backup if we are the primary and either no backups have been taken
+	// or the next scheduled backup time is less than 0.
+	if primary {
+		if nextScheduledBackup < 0 {
+			err := performBaseBackup(ctx, barman, true)
+			switch {
+			case err != nil:
+				log.Printf("[WARN] Failed to perform initial base backup: %s", err)
+				log.Printf("[INFO] Retrying in 10 minutes...")
+				lastBackupTime = time.Now().Add(-backupFrequency(barman) + 10*time.Minute)
+			default:
+				log.Println("[INFO] Initial base backup completed successfully")
+				lastBackupTime = time.Now()
+			}
 
-		if err := performInitialBaseBackup(ctx, barman); err != nil {
-			log.Printf("Failed to perform the initial full backup: %s", err)
-			log.Printf("Backup scheduler will re-attempt in %s.", fullBackupSchedule)
+			// Recalculate the next scheduled backup time after the initial backup.
+			nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime)
 		}
-
-		lastBackupTime = time.Now()
 	}
 
-	log.Printf("Full backup schedule set to: %s", fullBackupSchedule)
-
-	// Calculate the time until the next backup is due.
-	timeUntilNextBackup := time.Until(lastBackupTime.Add(fullBackupSchedule))
-
-	// Perform backup immediately if the time until the next backup is negative.
-	if timeUntilNextBackup < 0 {
-		log.Println("Performing full backup now")
-		_, err := barman.Backup(ctx, false)
-		if err != nil {
-			log.Printf("Full backup failed with: %s", err)
-		}
-
-		timeUntilNextBackup = fullBackupSchedule
+	// Safety net in case ticker does not have a valid duration.
+	if nextScheduledBackup < 0 {
+		nextScheduledBackup = backupFrequency(barman)
 	}
 
-	log.Printf("Next full backup due in: %s", timeUntilNextBackup)
+	log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup)
 
-	ticker := time.NewTicker(timeUntilNextBackup)
+	// Monitor the backup schedule even if we are not the primary. This is to ensure backups will
+	// continue to be taken in the event of a failover.
+	ticker := time.NewTicker(nextScheduledBackup)
 	defer ticker.Stop()
 
 	for {
 		select {
 		case <-ctx.Done():
-			log.Println("Shutting down backup scheduler")
+			log.Println("[WARN] Shutting down backup schedule monitor...")
 			return
 		case <-ticker.C:
-			// Perform a backup while passively waiting for the checkpoint process to complete.
-			// This could actually take a while, so we should be prepared to wait.
-			log.Println("Performing full backup")
-			_, err := barman.Backup(ctx, false)
+			// Check to see if we are the Primary. This is necessary given failovers can occur at runtime.
+			primary, err := isPrimary(ctx, node)
 			if err != nil {
-				// TODO - Implement a backup-off strategy.
-				timeUntilNextBackup = time.Hour * 1
-				ticker.Reset(timeUntilNextBackup)
+				log.Printf("[WARN] Failed to resolve primary status: %s", err)
+				continue
+			}
 
-				log.Printf("Backup retention failed with: %s.", err)
-				log.Printf("Backup will be re-attempted in %s.", timeUntilNextBackup)
+			if !primary {
+				continue
+			}
 
+			lastBackupTime, err := barman.LastCompletedBackup(ctx)
+			if err != nil {
+				log.Printf("[WARN] Failed to determine when the last backup was taken: %s", err)
 				continue
 			}
 
-			log.Printf("Full backup completed successfully")
-			ticker.Reset(fullBackupSchedule)
+			// Recalculate the next scheduled backup time.
+			nextScheduledBackup = calculateNextBackupTime(barman, lastBackupTime)
+
+			// Perform a full backup if the next scheduled backup time is less than 0.
+			if nextScheduledBackup < 0 {
+				log.Println("[INFO] Performing full backup...")
+				if err := performBaseBackup(ctx, barman, false); err != nil {
+					log.Printf("[WARN] Failed to perform full backup: %v", err)
+				}
+
+				// TODO - We should consider retrying at a shorter interval in the event of a failure.
+				nextScheduledBackup = backupFrequency(barman)
+			}
+
+			log.Printf("[INFO] Next full backup due in: %s", nextScheduledBackup)
+
+			// Reset the ticker frequency in case the backup frequency has changed.
+			ticker.Reset(nextScheduledBackup)
 		}
 	}
 }
 
-func performInitialBaseBackup(ctx context.Context, barman *flypg.Barman) error {
+func calculateNextBackupTime(barman *flypg.Barman, lastBackupTime time.Time) time.Duration {
+	// If there was no backup, return a negative duration to trigger an immediate backup.
+	if lastBackupTime.IsZero() {
+		return -1
+	}
+	return time.Until(lastBackupTime.Add(backupFrequency(barman)))
+}
+
+func isPrimary(ctx context.Context, node *flypg.Node) (bool, error) {
+	conn, err := node.RepMgr.NewLocalConnection(ctx)
+	if err != nil {
+		return false, fmt.Errorf("failed to open local connection: %s", err)
+	}
+	defer func() { _ = conn.Close(ctx) }()
+
+	return node.RepMgr.IsPrimary(ctx, conn)
+}
+
+func backupFrequency(barman *flypg.Barman) time.Duration {
+	fullBackupSchedule := defaultFullBackupSchedule
+
+	// Set the full backup schedule if it is defined in the configuration.
+	if barman.Settings.FullBackupFrequency != "" {
+		fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
+		switch {
+		case err != nil:
+			log.Printf("[WARN] Failed to parse full backup frequency: %s", err)
+		default:
+			fullBackupSchedule = fullBackupDur
+		}
+	}
+
+	return fullBackupSchedule
+}
+
+func performBaseBackup(ctx context.Context, barman *flypg.Barman, immediateCheckpoint bool) error {
 	maxRetries := 10
 	retryCount := 0
 	for {
 		select {
 		case <-ctx.Done():
-			return nil
+			return ctx.Err()
 		default:
-			_, err := barman.Backup(ctx, true)
-			if err != nil {
-				log.Printf("Failed to perform the initial full backup: %s. Retrying in 30 seconds.", err)
+			if _, err := barman.Backup(ctx, immediateCheckpoint); err != nil {
+				log.Printf("[WARN] Failed to perform full backup: %s. Retrying in 30 seconds.", err)
 
 				// If we've exceeded the maximum number of retries, we should return an error.
 				if retryCount >= maxRetries {
-					return fmt.Errorf("failed to perform the initial full backup after %d retries", maxRetries)
+					return fmt.Errorf("failed to perform full backup after %d retries", maxRetries)
 				}
 
 				retryCount++
@@ -111,12 +162,11 @@ func performInitialBaseBackup(ctx context.Context, barman *flypg.Barman) error {
 				select {
 				case <-ctx.Done():
 					return ctx.Err()
-				case <-time.After(time.Second * 30):
+				case <-time.After(backupRetryInterval):
 					continue
 				}
 			}
 
-			log.Println("Initial full backup completed successfully")
 			return nil
 		}
 	}
diff --git a/cmd/monitor/monitor_backup_schedule_test.go b/cmd/monitor/monitor_backup_schedule_test.go
new file mode 100644
index 00000000..c303fceb
--- /dev/null
+++ b/cmd/monitor/monitor_backup_schedule_test.go
@@ -0,0 +1,146 @@
+package main
+
+import (
+	"fmt"
+	"math"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/fly-apps/postgres-flex/internal/flypg"
+	"github.com/fly-apps/postgres-flex/internal/flypg/state"
+)
+
+const (
+	testBarmanConfigDir = "./test_results/barman"
+	pgTestDirectory     = "./test_results/"
+)
+
+func TestBackupFrequency(t *testing.T) {
+	if err := setup(t); err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	setDefaultEnv(t)
+
+	store, _ := state.NewStore()
+
+	barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if err := barman.LoadConfig(testBarmanConfigDir); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	frequency := backupFrequency(barman)
+	expected := time.Hour * 24
+	if frequency != expected {
+		t.Fatalf("expected frequency to be %s, but got %s", expected, frequency)
+	}
+}
+
+func TestCalculateNextBackupTime(t *testing.T) {
+	if err := setup(t); err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	setDefaultEnv(t)
+
+	store, _ := state.NewStore()
+	barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if err := barman.LoadConfig(testBarmanConfigDir); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	t.Run("no backups", func(t *testing.T) {
+		nextBackupTime := calculateNextBackupTime(barman, time.Time{})
+
+		expected := 0.0
+		val := math.Round(nextBackupTime.Hours())
+		if expected != val {
+			t.Fatalf("expected next backup time duration to be %f, but got %f", expected, val)
+		}
+	})
+
+	t.Run("backup-reset", func(t *testing.T) {
+		nextBackupTime := calculateNextBackupTime(barman, time.Now())
+
+		expected := 24.0
+		val := math.Round(nextBackupTime.Hours())
+		if val != expected {
+			t.Fatalf("expected next backup duration to be %f, but got %f", expected, val)
+		}
+	})
+
+	t.Run("backup-delay", func(t *testing.T) {
+		delay := time.Now().Add(-backupFrequency(barman) + time.Minute*30)
+
+		nextBackupTime := calculateNextBackupTime(barman, delay)
+
+		expected := 30.0
+		val := math.Round(nextBackupTime.Minutes())
+		if val != expected {
+			t.Fatalf("expected next backup duration to be %f, but got %f", expected, val)
+		}
+	})
+
+	t.Run("recent backup", func(t *testing.T) {
+		lastBackup := time.Now().Add(-1 * time.Hour)
+
+		nextBackupTime := calculateNextBackupTime(barman, lastBackup)
+
+		expected := 23.0
+		val := math.Round(nextBackupTime.Hours())
+		if val != expected {
+			t.Fatalf("expected next backup time duration to be %f, but got %f", expected, val)
+		}
+	})
+
+	t.Run("old backup", func(t *testing.T) {
+		lastBackup := time.Now().Add(-25 * time.Hour)
+
+		nextBackupTime := calculateNextBackupTime(barman, lastBackup)
+
+		expected := -1.0
+		val := math.Round(nextBackupTime.Hours())
+		if val != expected {
+			t.Fatalf("expected next backup time duration to be %f, but got %f", expected, val)
+		}
+	})
+
+}
+
+func setDefaultEnv(t *testing.T) {
+	t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory")
+	t.Setenv("FLY_APP_NAME", "postgres-flex")
+}
+
+func setup(t *testing.T) error {
+	t.Setenv("FLY_VM_MEMORY_MB", fmt.Sprint(256*(1024*1024)))
+	t.Setenv("UNIT_TESTING", "true")
+
+	if _, err := os.Stat(pgTestDirectory); err != nil {
+		if os.IsNotExist(err) {
+			if err := os.Mkdir(pgTestDirectory, 0750); err != nil {
+				return err
+			}
+		} else {
+			return err
+		}
+	}
+	return nil
+}
+
+func cleanup() {
+	if err := os.RemoveAll(pgTestDirectory); err != nil {
+		fmt.Printf("failed to remove testing dir: %s\n", err)
+	}
+}
diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go
index 0db72eeb..46ceab8c 100644
--- a/internal/flypg/repmgr.go
+++ b/internal/flypg/repmgr.go
@@ -382,6 +382,15 @@ func (*RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error)
 	return &member, nil
 }
 
+func (r *RepMgr) IsPrimary(ctx context.Context, pg *pgx.Conn) (bool, error) {
+	member, err := r.Member(ctx, pg)
+	if err != nil {
+		return false, err
+	}
+
+	return member.Role == PrimaryRoleName, nil
+}
+
 func (r *RepMgr) VotingMembers(ctx context.Context, conn *pgx.Conn) ([]Member, error) {
 	members, err := r.Members(ctx, conn)
 	if err != nil {