Skip to content

Commit

Permalink
feat: add initial version of authorized WS chat
Browse files Browse the repository at this point in the history
# Conflicts:
#	cmd/server/router.go
  • Loading branch information
litsynp committed Jul 28, 2024
1 parent d516fe9 commit 12c0469
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 15 deletions.
39 changes: 24 additions & 15 deletions cmd/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (

"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"

"github.com/pet-sitter/pets-next-door-api/cmd/server/handler"
"github.com/pet-sitter/pets-next-door-api/internal/chat"
"github.com/pet-sitter/pets-next-door-api/internal/configs"
"github.com/pet-sitter/pets-next-door-api/internal/domain/auth"
s3infra "github.com/pet-sitter/pets-next-door-api/internal/infra/bucket"
"github.com/pet-sitter/pets-next-door-api/internal/infra/database"
kakaoinfra "github.com/pet-sitter/pets-next-door-api/internal/infra/kakao"
"github.com/pet-sitter/pets-next-door-api/internal/service"
"github.com/pet-sitter/pets-next-door-api/internal/wschat"
pndmiddleware "github.com/pet-sitter/pets-next-door-api/lib/middleware"
"github.com/rs/zerolog"
echoswagger "github.com/swaggo/echo-swagger"
Expand Down Expand Up @@ -56,7 +55,7 @@ func NewRouter(app *firebaseinfra.FirebaseApp) (*echo.Echo, error) {
breedService := service.NewBreedService(db)
sosPostService := service.NewSOSPostService(db)
conditionService := service.NewSOSConditionService(db)
chatService := service.NewChatService(db)
// chatService := service.NewChatService(db)

// Initialize handlers
authHandler := handler.NewAuthHandler(authService, kakaoinfra.NewKakaoDefaultClient())
Expand All @@ -66,14 +65,14 @@ func NewRouter(app *firebaseinfra.FirebaseApp) (*echo.Echo, error) {
sosPostHandler := handler.NewSOSPostHandler(*sosPostService, authService)
conditionHandler := handler.NewConditionHandler(*conditionService)

// InMemoryStateManager는 클라이언트와 채팅방의 상태를 메모리에 저장하고 관리합니다.
// 이 메서드는 단순하고 빠르며 테스트 목적으로 적합합니다.
// 전략 패턴을 사용하여 이 부분을 다른 상태 관리 구현체로 쉽게 교체할 수 있습니다.
stateManager := chat.NewInMemoryStateManager()
wsServer := chat.NewWebSocketServer(stateManager)
go wsServer.Run()
chat.InitializeWebSocketServer(ctx, wsServer, chatService)
chatHandler := handler.NewChatController(wsServer, stateManager, authService, *chatService)
// // InMemoryStateManager는 클라이언트와 채팅방의 상태를 메모리에 저장하고 관리합니다.
// // 이 메서드는 단순하고 빠르며 테스트 목적으로 적합합니다.
// // 전략 패턴을 사용하여 이 부분을 다른 상태 관리 구현체로 쉽게 교체할 수 있습니다.
// stateManager := chat.NewInMemoryStateManager()
// wsServer := chat.NewWebSocketServer(stateManager)
// go wsServer.Run()
// chat.InitializeWebSocketServer(ctx, wsServer, chatService)
// chatHandler := handler.NewChatController(wsServer, stateManager, authService, *chatService)

// RegisterChan middlewares
logger := zerolog.New(os.Stdout)
Expand All @@ -91,7 +90,7 @@ func NewRouter(app *firebaseinfra.FirebaseApp) (*echo.Echo, error) {
}))
e.Use(pndmiddleware.BuildAuthMiddleware(authService, auth.FirebaseAuthClientKey))

// RegisterChan routes
// Register routes
e.GET("/health", func(c echo.Context) error {
return c.JSON(http.StatusOK, map[string]string{"status": "ok"})
})
Expand Down Expand Up @@ -143,11 +142,21 @@ func NewRouter(app *firebaseinfra.FirebaseApp) (*echo.Echo, error) {
postAPIGroup.GET("/sos/conditions", conditionHandler.FindConditions)
}

// chatAPIGroup := apiRouteGroup.Group("/chat")
// {
// chatAPIGroup.GET("/ws", func(c echo.Context) error {
// return chatHandler.ServerWebsocket(c, c.Response().Writer, c.Request())
// })
// }

upgrader := wschat.NewDefaultUpgrader()
wsServerV2 := wschat.NewWSServer(upgrader, authService)

go wsServerV2.LoopOverClientMessages()

chatAPIGroup := apiRouteGroup.Group("/chat")
{
chatAPIGroup.GET("/ws", func(c echo.Context) error {
return chatHandler.ServerWebsocket(c, c.Response().Writer, c.Request())
})
chatAPIGroup.GET("/ws", wsServerV2.HandleConnections)
}

return e, nil
Expand Down
200 changes: 200 additions & 0 deletions internal/wschat/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package wschat

import (
"net/http"
"strconv"
"time"

"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/pet-sitter/pets-next-door-api/internal/service"
"github.com/rs/zerolog/log"
)

type WSServer struct {
// key: UserID, value: WSClient
clients map[int64]WSClient
broadcast chan MessageRequest
upgrader websocket.Upgrader

authService service.AuthService
}

func NewWSServer(
upgrader websocket.Upgrader,
authService service.AuthService,
) *WSServer {
return &WSServer{
clients: make(map[int64]WSClient),
broadcast: make(chan MessageRequest),
upgrader: upgrader,
authService: authService,
}
}

func NewDefaultUpgrader() websocket.Upgrader {
upgrader := websocket.Upgrader{
CheckOrigin: func(_ *http.Request) bool {
return true
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

return upgrader
}

// Server-side WebSocket handler
func (s *WSServer) HandleConnections(
c echo.Context,
) error {
log.Info().Msg("Handling connections")

foundUser, err2 := s.authService.VerifyAuthAndGetUser(c.Request().Context(), c.Request().Header.Get("Authorization"))
if err2 != nil {
return c.JSON(err2.StatusCode, err2)
}
userID := foundUser.ID

conn, err := s.upgrader.Upgrade(c.Response().Writer, c.Request(), nil)
defer func() {
err2 := conn.Close()
if err2 != nil {
log.Error().Err(err2).Msg("Failed to close connection")
}
delete(s.clients, userID)
}()

if err != nil {
log.Error().Err(err).Msg("Failed to upgrade connection")
return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": err.Error()})
}

client := NewWSClient(conn, userID)
s.clients[userID] = client

for {
var msgReq MessageRequest
err := conn.ReadJSON(&msgReq)
msgReq.Sender = Sender{ID: userID}
if err != nil {
log.Error().Err(err).Msg("Failed to read message")
delete(s.clients, userID)

return c.JSON(http.StatusInternalServerError, map[string]interface{}{"error": err.Error()})
}

s.broadcast <- msgReq
}
}

// Broadcast messages to all clients
func (s *WSServer) LoopOverClientMessages() {
log.Info().Msg("Looping over client messages")

for {
msgReq := <-s.broadcast

for _, client := range s.clients {
log.Info().Msg(
"Message from user: " +
strconv.Itoa(int(client.userID)) +
" to user: " + strconv.Itoa(int(msgReq.Sender.ID)))

// Filter messages from the same user
if client.userID == msgReq.Sender.ID {
continue
}

// TODO: Check if the message is for the room
msg := NewPlainMessageResponse(msgReq.Sender, msgReq.Room, msgReq.Message, time.Now())

if err := client.WriteJSON(msg); err != nil {
// No way but to close the connection
log.Error().Err(err).Msg("Failed to write message")
err := client.Close()
if err != nil {
log.Error().Err(err).Msg("Failed to close connection")
delete(s.clients, client.userID)
return
}
delete(s.clients, client.userID)
return
}
}
}
}

type WSClient struct {
conn *websocket.Conn
userID int64
}

func NewWSClient(
conn *websocket.Conn,
userID int64,
) WSClient {
return WSClient{conn, userID}
}

func (c *WSClient) WriteJSON(v interface{}) error {
return c.conn.WriteJSON(v)
}

func (c *WSClient) Close() error {
return c.conn.Close()
}

type MessageRequest struct {
Sender Sender `json:"user"`
Room Room `json:"room"`
MessageType string `json:"messageType"`
Media *Media `json:"media,omitempty"`
Message string `json:"message"`
}

type MessageResponse struct {
Sender Sender `json:"user"`
Room Room `json:"room"`
MessageType string `json:"messageType"`
Media *Media `json:"media,omitempty"`
Message string `json:"message"`
CreatedAt string `json:"createdAt"`
UpdatedAt string `json:"updatedAt"`
}

type Sender struct {
ID int64 `json:"id"`
}

type Room struct {
ID int64 `json:"id"`
}

type Media struct {
ID int64 `json:"id"`
MediaType string `json:"type"`
URL string `json:"url"`
}

func NewPlainMessageResponse(sender Sender, room Room, message string, now time.Time) MessageResponse {
return MessageResponse{
Sender: sender,
Room: room,
MessageType: "plain",
Message: message,
CreatedAt: now.Format(time.RFC3339),
UpdatedAt: now.Format(time.RFC3339),
}
}

func NewMediaMessageResponse(sender Sender, room Room, media *Media, now time.Time) MessageResponse {
return MessageResponse{
Sender: sender,
Room: room,
MessageType: "media",
Media: media,
CreatedAt: now.Format(time.RFC3339),
UpdatedAt: now.Format(time.RFC3339),
}
}

0 comments on commit 12c0469

Please sign in to comment.