From 2f1f95807eff005530fa4b0a1063a38214e663cf Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 31 Jan 2025 15:17:17 -0500 Subject: [PATCH] use channels instead --- internal/proxy/controllers/plus/plus.go | 53 ++++++++----------------- 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/internal/proxy/controllers/plus/plus.go b/internal/proxy/controllers/plus/plus.go index 86cb4af..0dcec92 100644 --- a/internal/proxy/controllers/plus/plus.go +++ b/internal/proxy/controllers/plus/plus.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "net/http" - "sync" "time" "github.com/sonroyaalmerol/pbs-plus/internal/store" @@ -45,10 +44,19 @@ func MountHandler(storeInstance *store.Store) http.HandlerFunc { agentDrive := string(agentDriveBytes) if r.Method == http.MethodPost { - // Create context with timeout ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + // Create response channel and register handler + respChan := make(chan *websockets.Message, 1) + cleanup := storeInstance.WSHub.RegisterHandler("response-backup_start", func(handlerCtx context.Context, msg *websockets.Message) error { + if msg.Content == "Acknowledged: "+agentDrive { + respChan <- msg + } + return nil + }) + defer cleanup() + // Send initial message err := storeInstance.WSHub.SendToClient(targetHostname, websockets.Message{ Type: "backup_start", @@ -59,42 +67,13 @@ func MountHandler(storeInstance *store.Store) http.HandlerFunc { return } - // Set up synchronization - var mu sync.Mutex - var cond = sync.NewCond(&mu) - var resp *websockets.Message - - // Register handler and ensure cleanup - cleanup := storeInstance.WSHub.RegisterHandler("response-backup_start", func(handlerCtx context.Context, msg *websockets.Message) error { - if msg.Content == "Acknowledged: "+agentDrive { - mu.Lock() - resp = msg - cond.Signal() - mu.Unlock() - } - return nil - }) - defer cleanup() - - // Wait for response or timeout - mu.Lock() - for resp == nil { - done := make(chan struct{}) - go func() { - cond.Wait() - close(done) - }() - - select { - case <-done: - // Response received - case <-ctx.Done(): - mu.Unlock() - http.Error(w, "MountHandler: Timeout waiting for backup acknowledgement from target", http.StatusInternalServerError) - return - } + // Wait for either response or timeout + select { + case <-respChan: + case <-ctx.Done(): + http.Error(w, "MountHandler: Timeout waiting for backup acknowledgement from target", http.StatusInternalServerError) + return } - mu.Unlock() // Handle successful response w.Header().Set("Content-Type", "application/json")