-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add initial version of authorized WS chat
- Loading branch information
Showing
2 changed files
with
195 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
package wschat | ||
|
||
import ( | ||
"net/http" | ||
"time" | ||
|
||
"github.com/gorilla/websocket" | ||
"github.com/labstack/echo/v4" | ||
"github.com/pet-sitter/pets-next-door-api/internal/service" | ||
"github.com/rs/zerolog/log" | ||
) | ||
|
||
type WSServer struct { | ||
// key: UserID, value: WSClient | ||
clients map[int64]WSClient | ||
broadcast chan Message | ||
upgrader websocket.Upgrader | ||
|
||
authService service.AuthService | ||
} | ||
|
||
type WSClient struct { | ||
conn *websocket.Conn | ||
userID int64 | ||
} | ||
|
||
func NewWSClient( | ||
conn *websocket.Conn, | ||
userID int64, | ||
) WSClient { | ||
return WSClient{conn, userID} | ||
} | ||
|
||
func (c *WSClient) WriteJSON(v interface{}) error { | ||
return c.conn.WriteJSON(v) | ||
} | ||
|
||
func (c *WSClient) Close() error { | ||
return c.conn.Close() | ||
} | ||
|
||
type Message struct { | ||
Sender Sender `json:"user"` | ||
Room Room `json:"room"` | ||
MessageType string `json:"messageType"` | ||
Media *Media `json:"media,omitempty"` | ||
Message string `json:"message"` | ||
CreatedAt string `json:"createdAt"` | ||
UpdatedAt string `json:"updatedAt"` | ||
} | ||
|
||
type Sender struct { | ||
ID int64 `json:"id"` | ||
} | ||
|
||
type Room struct { | ||
ID int64 `json:"id"` | ||
} | ||
|
||
type Media struct { | ||
ID int64 `json:"id"` | ||
MediaType string `json:"type"` | ||
URL string `json:"url"` | ||
} | ||
|
||
func NewPlainMessage(sender Sender, Room Room, message string, now time.Time) Message { | ||
return Message{ | ||
Sender: sender, | ||
Room: Room, | ||
MessageType: "plain", | ||
Message: message, | ||
CreatedAt: now.Format(time.RFC3339), | ||
UpdatedAt: now.Format(time.RFC3339), | ||
} | ||
} | ||
|
||
func NewMediaMessage(sender Sender, Room Room, media *Media, now time.Time) Message { | ||
return Message{ | ||
Sender: sender, | ||
Room: Room, | ||
MessageType: "media", | ||
Media: media, | ||
CreatedAt: now.Format(time.RFC3339), | ||
UpdatedAt: now.Format(time.RFC3339), | ||
} | ||
} | ||
|
||
func NewWSServer( | ||
upgrader websocket.Upgrader, | ||
authService service.AuthService, | ||
) *WSServer { | ||
return &WSServer{ | ||
clients: make(map[int64]WSClient), | ||
broadcast: make(chan Message), | ||
upgrader: upgrader, | ||
authService: authService, | ||
} | ||
} | ||
|
||
func NewDefaultUpgrader() websocket.Upgrader { | ||
upgrader := websocket.Upgrader{ | ||
CheckOrigin: func(r *http.Request) bool { | ||
return true | ||
}, | ||
ReadBufferSize: 1024, | ||
WriteBufferSize: 1024, | ||
} | ||
|
||
return upgrader | ||
} | ||
|
||
// Server-side WebSocket handler | ||
func (s *WSServer) HandleConnections( | ||
c echo.Context, | ||
) error { | ||
log.Info().Msg("Handling connections") | ||
|
||
foundUser, err2 := s.authService.VerifyAuthAndGetUser(c.Request().Context(), c.Request().Header.Get("Authorization")) | ||
if err2 != nil { | ||
return c.JSON(err2.StatusCode, err2) | ||
} | ||
userID := foundUser.ID | ||
|
||
conn, err := s.upgrader.Upgrade(c.Response().Writer, c.Request(), nil) | ||
defer conn.Close() | ||
if err != nil { | ||
log.Error().Err(err).Msg("Failed to upgrade connection") | ||
return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": err.Error()}) | ||
} | ||
client := NewWSClient(conn, userID) | ||
|
||
s.clients[userID] = client | ||
|
||
for { | ||
var msg Message | ||
err := conn.ReadJSON(&msg) | ||
if err != nil { | ||
log.Error().Err(err).Msg("Failed to read message") | ||
delete(s.clients, userID) | ||
return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": err.Error()}) | ||
} | ||
|
||
s.broadcast <- msg | ||
} | ||
} | ||
|
||
// Broadcast messages to all clients | ||
func (s *WSServer) LoopOverClientMessages() { | ||
log.Info().Msg("Looping over client messages") | ||
|
||
for { | ||
msg := <-s.broadcast | ||
|
||
for _, client := range s.clients { | ||
// Filter messages from the same user | ||
if client.userID == msg.Sender.ID { | ||
return | ||
} | ||
|
||
// TODO: Check if the message is for the room | ||
msg.Sender.ID = client.userID | ||
|
||
if err := client.WriteJSON(msg); err != nil { | ||
// No way but to close the connection | ||
log.Error().Err(err).Msg("Failed to write message") | ||
client.Close() | ||
delete(s.clients, client.userID) | ||
return | ||
} | ||
} | ||
} | ||
} |