Skip to content

Commit

Permalink
Merge pull request #113 from sonroyaalmerol/cleanup-ws-handler
Browse files Browse the repository at this point in the history
add cleanup functions on handler registration
  • Loading branch information
sonroyaalmerol authored Jan 31, 2025
2 parents cb916db + 1bb73a9 commit 16c8c0a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 26 deletions.
2 changes: 1 addition & 1 deletion internal/backend/mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func Mount(storeInstance *store.Store, target *types.Target) (*AgentMount, error
// Mount using NFS
mountArgs := []string{
"-t", "nfs",
"-o", fmt.Sprintf("port=%s,mountport=%s,vers=3,ro,tcp,noacl,actimeo=3600,noatime", agentPort, agentPort),
"-o", fmt.Sprintf("port=%s,mountport=%s,vers=3,ro,tcp,noacl,nocto,actimeo=3600,rsize=1048576,lookupcache=positive,noatime", agentPort, agentPort),
fmt.Sprintf("%s:/", agentHost),
agentMount.Path,
}
Expand Down
58 changes: 39 additions & 19 deletions internal/proxy/controllers/plus/plus.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func MountHandler(storeInstance *store.Store) http.HandlerFunc {
agentDrive := string(agentDriveBytes)

if r.Method == http.MethodPost {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Send initial message
err := storeInstance.WSHub.SendToClient(targetHostname, websockets.Message{
Type: "backup_start",
Content: agentDrive,
Expand All @@ -54,34 +59,49 @@ func MountHandler(storeInstance *store.Store) http.HandlerFunc {
return
}

// Set up synchronization
var mu sync.Mutex
var cond = sync.NewCond(&mu)
var resp *websockets.Message
var wg sync.WaitGroup
wg.Add(1)
storeInstance.WSHub.RegisterHandler("response-backup_start", func(ctx context.Context, msg *websockets.Message) error {

// Register handler and ensure cleanup
cleanup := storeInstance.WSHub.RegisterHandler("response-backup_start", func(handlerCtx context.Context, msg *websockets.Message) error {
if msg.Content == "Acknowledged: "+agentDrive {
mu.Lock()
resp = msg
wg.Done()
cond.Signal()
mu.Unlock()
}

return nil
})

go func() {
time.Sleep(time.Second * 10)
wg.Done()
}()

wg.Wait()

if resp == nil {
http.Error(w, fmt.Sprintf("MountHandler: Failed to receive backup acknowledgement from target -> %v", err), http.StatusInternalServerError)
return
defer cleanup()

// Wait for response or timeout
mu.Lock()
for resp == nil {
done := make(chan struct{})
go func() {
cond.Wait()
close(done)
}()

select {
case <-done:
// Response received
case <-ctx.Done():
mu.Unlock()
http.Error(w, "MountHandler: Timeout waiting for backup acknowledgement from target", http.StatusInternalServerError)
return
}
}
mu.Unlock()

// Handle successful response
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "true"})

return
if err := json.NewEncoder(w).Encode(map[string]string{"status": "true"}); err != nil {
http.Error(w, fmt.Sprintf("MountHandler: Failed to encode response -> %v", err), http.StatusInternalServerError)
return
}
}

if r.Method == http.MethodDelete {
Expand Down
19 changes: 17 additions & 2 deletions internal/websockets/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,27 @@ func (c *WSClient) Send(ctx context.Context, msg Message) error {
return nil
}

func (c *WSClient) RegisterHandler(msgType string, handler MessageHandler) {
func (c *WSClient) RegisterHandler(msgType string, handler MessageHandler) UnregisterFunc {
c.handlerMu.Lock()
defer c.handlerMu.Unlock()
c.handlers[msgType] = handler
c.handlerMu.Unlock()

syslog.L.Infof("Registered message handler | type=%s client_id=%s",
msgType, c.config.ClientID)

return func() {
c.handlerMu.Lock()
defer c.handlerMu.Unlock()

if _, exists := c.handlers[msgType]; exists {
delete(c.handlers, msgType)
syslog.L.Infof("Unregistered message handler | type=%s client_id=%s",
msgType, c.config.ClientID)
} else {
syslog.L.Warnf("Attempted to unregister non-existent handler | type=%s client_id=%s",
msgType, c.config.ClientID)
}
}
}

func (c *WSClient) Close() error {
Expand Down
38 changes: 34 additions & 4 deletions internal/websockets/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,44 @@ func NewServer(ctx context.Context, opts ...ServerOption) *Server {
return s
}

func (s *Server) RegisterHandler(msgType string, handler MessageHandler) {
type UnregisterFunc func()

func (s *Server) RegisterHandler(msgType string, handler MessageHandler) UnregisterFunc {
s.handlerMu.Lock()
currentCount := len(s.handlers[msgType])
s.handlers[msgType] = append(s.handlers[msgType], handler)
currentHandlers := s.handlers[msgType]
handlerIndex := len(currentHandlers)
s.handlers[msgType] = append(currentHandlers, handler)
s.handlerMu.Unlock()

syslog.L.Infof("Registered message handler | type=%s handler_count=%d",
msgType, currentCount+1)
msgType, handlerIndex+1)

return func() {
s.handlerMu.Lock()
defer s.handlerMu.Unlock()

handlers := s.handlers[msgType]
if handlerIndex >= len(handlers) {
syslog.L.Warnf("Handler already unregistered | type=%s handler_index=%d",
msgType, handlerIndex)
return
}

newHandlers := make([]MessageHandler, 0, len(handlers)-1)
newHandlers = append(newHandlers, handlers[:handlerIndex]...)
if handlerIndex+1 < len(handlers) {
newHandlers = append(newHandlers, handlers[handlerIndex+1:]...)
}

if len(newHandlers) == 0 {
delete(s.handlers, msgType)
} else {
s.handlers[msgType] = newHandlers
}

syslog.L.Infof("Unregistered message handler | type=%s remaining_handlers=%d",
msgType, len(newHandlers))
}
}

func (s *Server) handleMessage(msg *Message) {
Expand Down

0 comments on commit 16c8c0a

Please sign in to comment.