Skip to content

Commit

Permalink
use channels instead
Browse files Browse the repository at this point in the history
  • Loading branch information
Son Roy Almerol committed Jan 31, 2025
1 parent 1bb73a9 commit 2f1f958
Showing 1 changed file with 16 additions and 37 deletions.
53 changes: 16 additions & 37 deletions internal/proxy/controllers/plus/plus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/sonroyaalmerol/pbs-plus/internal/store"
Expand Down Expand Up @@ -45,10 +44,19 @@ func MountHandler(storeInstance *store.Store) http.HandlerFunc {
agentDrive := string(agentDriveBytes)

if r.Method == http.MethodPost {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Create response channel and register handler
respChan := make(chan *websockets.Message, 1)
cleanup := storeInstance.WSHub.RegisterHandler("response-backup_start", func(handlerCtx context.Context, msg *websockets.Message) error {
if msg.Content == "Acknowledged: "+agentDrive {
respChan <- msg
}
return nil
})
defer cleanup()

// Send initial message
err := storeInstance.WSHub.SendToClient(targetHostname, websockets.Message{
Type: "backup_start",
Expand All @@ -59,42 +67,13 @@ func MountHandler(storeInstance *store.Store) http.HandlerFunc {
return
}

// Set up synchronization
var mu sync.Mutex
var cond = sync.NewCond(&mu)
var resp *websockets.Message

// Register handler and ensure cleanup
cleanup := storeInstance.WSHub.RegisterHandler("response-backup_start", func(handlerCtx context.Context, msg *websockets.Message) error {
if msg.Content == "Acknowledged: "+agentDrive {
mu.Lock()
resp = msg
cond.Signal()
mu.Unlock()
}
return nil
})
defer cleanup()

// Wait for response or timeout
mu.Lock()
for resp == nil {
done := make(chan struct{})
go func() {
cond.Wait()
close(done)
}()

select {
case <-done:
// Response received
case <-ctx.Done():
mu.Unlock()
http.Error(w, "MountHandler: Timeout waiting for backup acknowledgement from target", http.StatusInternalServerError)
return
}
// Wait for either response or timeout
select {
case <-respChan:
case <-ctx.Done():
http.Error(w, "MountHandler: Timeout waiting for backup acknowledgement from target", http.StatusInternalServerError)
return
}
mu.Unlock()

// Handle successful response
w.Header().Set("Content-Type", "application/json")
Expand Down

0 comments on commit 2f1f958

Please sign in to comment.