Skip to content

Commit

Permalink
Merge pull request #169 from sonroyaalmerol/update-go
Browse files Browse the repository at this point in the history
ensure backup start and close handlers are thread safe
  • Loading branch information
sonroyaalmerol authored Feb 17, 2025
2 parents 39c90b4 + afade97 commit 71e5520
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 179 deletions.
8 changes: 5 additions & 3 deletions cmd/windows_updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/alexflint/go-filemutex"
"github.com/kardianos/service"
"github.com/sonroyaalmerol/pbs-plus/internal/agent"
"github.com/sonroyaalmerol/pbs-plus/internal/agent/controllers"
"github.com/sonroyaalmerol/pbs-plus/internal/syslog"
"golang.org/x/sys/windows"
)
Expand Down Expand Up @@ -115,8 +114,11 @@ func (u *UpdaterService) runUpdateCheck() {
}

func (u *UpdaterService) checkForActiveBackups() (bool, error) {
store := controllers.GetNFSSessionStore()
return store.HasSessions(), nil
store, err := agent.NewBackupStore()
if err != nil {
return true, err
}
return store.HasActiveBackups()
}

func (u *UpdaterService) checkForNewVersion() (string, error) {
Expand Down
124 changes: 0 additions & 124 deletions internal/agent/controllers/nfssession.go

This file was deleted.

104 changes: 80 additions & 24 deletions internal/agent/controllers/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package controllers
import (
"context"
"fmt"
"sync"

"github.com/sonroyaalmerol/pbs-plus/internal/agent"
"github.com/sonroyaalmerol/pbs-plus/internal/agent/nfs"
Expand All @@ -13,6 +14,12 @@ import (
"github.com/sonroyaalmerol/pbs-plus/internal/websockets"
)

var (
activeBackupsCtx map[string]context.Context
activeBackupsCtxCancel map[string]context.CancelFunc
activeBackupsMu sync.Mutex
)

func sendResponse(c *websockets.WSClient, msgType, content string) {
response := websockets.Message{
Type: "response-" + msgType,
Expand All @@ -36,42 +43,92 @@ func BackupStartHandler(c *websockets.WSClient) func(ctx context.Context, msg *w
drive := msg.Content
syslog.L.Infof("Received backup request for drive %s.", drive)

store := GetNFSSessionStore()
if err := store.Delete(drive); err != nil {
syslog.L.Errorf("Error cleaning up session store: %v", err)
store, err := agent.NewBackupStore()
if err != nil {
syslog.L.Errorf("backup store error: %v", err)
sendError(c, "backup_start", drive, err.Error())
return err
}

backupStatus := agent.GetBackupStatus()
backupStatus.StartBackup(drive)
defer backupStatus.EndBackup(drive)
if hasActive, err := store.HasActiveBackupForDrive(drive); hasActive || err != nil {
if err != nil {
syslog.L.Errorf("backup store error: %v", err)
sendError(c, "backup_start", drive, err.Error())
return err
}

syslog.L.Errorf("an attempt to backup drive %s was cancelled due to existing session", drive)
sendError(c, "backup_start", drive, "An existing backup for requested drive is currently running. Only one instance is allowed at a time.")
}

activeBackupsMu.Lock()

if activeBackupsCtx == nil {
activeBackupsCtx = make(map[string]context.Context)
}
if activeBackupsCtxCancel == nil {
activeBackupsCtxCancel = make(map[string]context.CancelFunc)
}

if cancel, ok := activeBackupsCtxCancel[drive]; ok {
cancel()
}

activeBackupsCtx[drive], activeBackupsCtxCancel[drive] = context.WithCancel(context.Background())

activeBackupsMu.Unlock()

err = store.StartBackup(drive)
if err != nil {
syslog.L.Errorf("backup store error: %v", err)
sendError(c, "backup_start", drive, err.Error())
return err
}

snapshot, err := snapshots.Snapshot(drive)
if err != nil {
syslog.L.Errorf("snapshot error: %v", err)
sendError(c, "backup_start", drive, err.Error())
_ = store.EndBackup(drive)
return err
}

nfsSession := nfs.NewNFSSession(context.Background(), snapshot, drive)
activeBackupsMu.Lock()
currentCtx := activeBackupsCtx[drive]
currentCtxCancel := activeBackupsCtxCancel[drive]
activeBackupsMu.Unlock()

go func() {
<-currentCtx.Done()
_ = store.EndBackup(drive)
snapshot.Close()
}()

nfsSession := nfs.NewNFSSession(currentCtx, snapshot, drive)
if nfsSession == nil {
syslog.L.Error("NFS session is nil.")
sendError(c, "backup_start", drive, "NFS session is nil.")
_ = store.EndBackup(drive)
snapshot.Close()
return fmt.Errorf("NFS session is nil.")
}

if err := store.Store(drive, nfsSession); err != nil {
syslog.L.Errorf("Error storing session: %v", err)
err = store.StartNFS(drive)
if err != nil {
syslog.L.Errorf("backup store error: %v", err)
sendError(c, "backup_start", drive, err.Error())
snapshot.Close()
return err
}

go func() {
defer func() {
if r := recover(); r != nil {
syslog.L.Errorf("Panic in NFS session for drive %s: %v", drive, r)
}
if err := store.Delete(drive); err != nil {
syslog.L.Errorf("Error cleaning up session store: %v", err)
}
backupStatus.EndBackup(drive)
_ = store.EndBackup(drive)
snapshot.Close()
currentCtxCancel()
}()
nfsSession.Serve()
}()
Expand All @@ -86,17 +143,16 @@ func BackupCloseHandler(c *websockets.WSClient) func(ctx context.Context, msg *w
drive := msg.Content
syslog.L.Infof("Received closure request for drive %s.", drive)

store := GetNFSSessionStore()
if err := store.Delete(drive); err != nil {
syslog.L.Errorf("Error cleaning up session store: %v", err)
sendError(c, "backup_close", drive, err.Error())
return err
}

backupStatus := agent.GetBackupStatus()
backupStatus.EndBackup(drive)
activeBackupsMu.Lock()
defer activeBackupsMu.Unlock()

sendResponse(c, "backup_close", drive)
return nil
if cancel, ok := activeBackupsCtxCancel[drive]; ok {
cancel()
sendResponse(c, "backup_close", drive)
return nil
} else {
sendError(c, "backup_close", drive, "No ongoing backup for drive")
return fmt.Errorf("No ongoing backup for drive %s", drive)
}
}
}
Loading

0 comments on commit 71e5520

Please sign in to comment.