Skip to content

Commit

Permalink
Merge pull request #58 from sonroyaalmerol/remove-passthrough-proxy
Browse files Browse the repository at this point in the history
Merge websockets with the golang broadcast channel implementation
  • Loading branch information
sonroyaalmerol authored Jan 12, 2025
2 parents c3ffad7 + b58545d commit 1992baa
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 217 deletions.
5 changes: 4 additions & 1 deletion cmd/pbs_plus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"flag"
"log"
"net/http"
Expand Down Expand Up @@ -32,7 +33,9 @@ func main() {
jobRun := flag.String("job", "", "Job ID to execute")
flag.Parse()

wsHub := websockets.NewServer()
wsHub := websockets.NewServer(context.Background())
go wsHub.Run()

storeInstance, err := store.Initialize(wsHub)
if err != nil {
s.Errorf("Failed to initialize store: %v", err)
Expand Down
9 changes: 4 additions & 5 deletions internal/proxy/controllers/plus/plus.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func MountHandler(storeInstance *store.Store) http.HandlerFunc {
agentDrive := string(agentDriveBytes)

if r.Method == http.MethodPost {
err := storeInstance.WSHub.SendCommand(targetHostname, websockets.Message{
err := storeInstance.WSHub.SendToClient(targetHostname, websockets.Message{
Type: "backup_start",
Content: agentDrive,
})
Expand All @@ -56,9 +56,8 @@ func MountHandler(storeInstance *store.Store) http.HandlerFunc {
return
}

listener := storeInstance.WSHub.Broadcast.Subscribe()
defer storeInstance.WSHub.Broadcast.CancelSubscription(listener)

listener, closeListener := storeInstance.WSHub.Subscribe()
defer closeListener()
respWait:
for {
select {
Expand All @@ -79,7 +78,7 @@ func MountHandler(storeInstance *store.Store) http.HandlerFunc {
}

if r.Method == http.MethodDelete {
_ = storeInstance.WSHub.SendCommand(targetHostname, websockets.Message{
_ = storeInstance.WSHub.SendToClient(targetHostname, websockets.Message{
Type: "backup_close",
Content: agentDrive,
})
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/controllers/plus/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ func WSHandler(storeInstance *store.Store) http.HandlerFunc {
return
}

storeInstance.WSHub.HandleClientConnection(w, r)
storeInstance.WSHub.ServeWS(w, r)
}
}
9 changes: 1 addition & 8 deletions internal/store/agent_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,5 @@ func (storeInstance *Store) AgentPing(agentTarget *Target) bool {
return false
}

storeInstance.WSHub.ClientsMux.RLock()
if client, ok := storeInstance.WSHub.Clients[agentHostname]; ok && client != nil {
storeInstance.WSHub.ClientsMux.RUnlock()
return true
}
storeInstance.WSHub.ClientsMux.RUnlock()

return false
return storeInstance.WSHub.IsClientConnected(agentHostname)
}
113 changes: 0 additions & 113 deletions internal/websockets/message_broadcast.go

This file was deleted.

Loading

0 comments on commit 1992baa

Please sign in to comment.