Skip to content

Commit

Permalink
feat: add chatroom message send
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelgamito committed Nov 26, 2024
1 parent 5d85cfe commit 1dbc328
Show file tree
Hide file tree
Showing 20 changed files with 545 additions and 36 deletions.
26 changes: 22 additions & 4 deletions cmd/modules/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,27 @@ package modules

import (
"go-live-chat/internal/handlers"
usecase "go-live-chat/internal/usecase/chatroom"
"go-live-chat/internal/handlers/ws"
"go-live-chat/internal/infraestructure/databases"
"go-live-chat/internal/usecase"
usecase_chatroom "go-live-chat/internal/usecase/chatroom"
usecase_conversation "go-live-chat/internal/usecase/conversation"
"go.uber.org/fx"
)

func ConversationModule() fx.Option {
return fx.Provide(
func(chatroomRepositorySearch usecase.ChatroomRepositorySearch,
conversationsRepository usecase.ConversationsRepository,
rdb *databases.RedisClient) map[string]ws.ConversationUseCase {
return map[string]ws.ConversationUseCase{
"chatroom": usecase_conversation.NewChatroomConversationUseCase(chatroomRepositorySearch, conversationsRepository, rdb),
"user": usecase_conversation.NewUserConversationUseCase(rdb),
}
},
)
}

var (
handlersFactory = fx.Provide(
handlers.NewChatRoomHandler,
Expand All @@ -14,20 +31,21 @@ var (
handlers.NewActuatorHandler,
handlers.NewChatWebSocketHandler,
fx.Annotate(
usecase.NewCreateChatroomUseCase,
usecase_chatroom.NewCreateChatroomUseCase,
fx.As(new(handlers.CreateChatroomUseCase)),
),
fx.Annotate(
usecase.NewRetrieveChatroom,
usecase_chatroom.NewRetrieveChatroom,
fx.As(new(handlers.RetrieveChatroomUseCase)),
),
fx.Annotate(
usecase.NewUserManagementChatroomUseCase,
usecase_chatroom.NewUserManagementChatroomUseCase,
fx.As(new(handlers.UserManagementChatroomUseCase)),
),
)
HandlersModule = fx.Options(
handlersFactory,
ConversationModule(),
handlers.ModuleChatWebSocketHandler,
handlers.ModuleUserManagementHandler,
handlers.ModuleMessageHandler,
Expand Down
1 change: 1 addition & 0 deletions cmd/modules/repositories.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
var (
repositoriesFactory = fx.Provide(
repositories.NewChatroomRepository,
repositories.NewConversationsRepository,
)

RepositoriesModule = fx.Options(repositoriesFactory)
Expand Down
4 changes: 4 additions & 0 deletions cmd/modules/usecases.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ var (
fx.As(new(usecase.ChatroomRepositoryCreate)),
fx.As(new(usecase.ChatroomRepositorySearch)),
),
fx.Annotate(
repositories.NewConversationsRepository,
fx.As(new(usecase.ConversationsRepository)),
),
)
UseCaseModule = fx.Options(useCasesFactory)
)
25 changes: 22 additions & 3 deletions internal/handlers/chat_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,28 @@ package handlers

import (
"github.com/gorilla/websocket"
"go-live-chat/internal/handlers/dto"
"go-live-chat/internal/handlers/ws"
"go-live-chat/internal/infraestructure/databases"
"go-live-chat/internal/misc"
"go.uber.org/fx"
"log"
"net/http"
)

type ChatWebSocketHandler struct {
redisClient *databases.RedisClient
useCases map[string]ws.ConversationUseCase
}

func NewChatWebSocketHandler(redisClient *databases.RedisClient) *ChatWebSocketHandler {
func NewChatWebSocketHandler(
redisClient *databases.RedisClient,
useCases map[string]ws.ConversationUseCase,
) *ChatWebSocketHandler {

return &ChatWebSocketHandler{
redisClient: redisClient,
useCases: useCases,
}
}

Expand All @@ -36,14 +44,24 @@ func (h *ChatWebSocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

user := r.Header.Get("X-User-ID")
if user == "" {
http.Error(w, "Missing User ID", http.StatusUnauthorized)
errResp := dto.ErrorResponse{
Body: dto.ErrorBodyDTO{
Messages: []string{"Missing User ID"},
},
}
misc.WriteJSONResponse(w, http.StatusUnauthorized, errResp)
return
}

conn, err := upgrade.Upgrade(w, r, nil)
if err != nil {
log.Println("Error upgrading connection:", err)
http.Error(w, "Could not open WebSocket connection", http.StatusBadRequest)
errResp := dto.ErrorResponse{
Body: dto.ErrorBodyDTO{
Messages: []string{"Could not open WebSocket connection"},
},
}
misc.WriteJSONResponse(w, http.StatusUnauthorized, errResp)
return
}
log.Println("WebSocket connection established")
Expand All @@ -52,6 +70,7 @@ func (h *ChatWebSocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
Conn: conn,
Channel: user,
Rdb: h.redisClient.NotifyClientsRedis,
UseCase: h.useCases,
}

defer client.Close()
Expand Down
1 change: 1 addition & 0 deletions internal/handlers/chatroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (c *ChatRoomHandler) chatRoomDetails(w http.ResponseWriter, r *http.Request

if errResp != nil {
misc.WriteJSONResponse(w, errResp.StatusCode, errResp.Messages)
return
}

misc.WriteJSONResponse(w, http.StatusOK, dto.GetChatroomResponse(res))
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/chatroom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestCreateChatroom_UseCaseError(t *testing.T) {
createRequest := dto.CreateChatRoomRequestDTO{
Name: "Test Room",
}
mockUseCase.On("Execute", createRequest.ToModel(), mock.Anything).Return("", errors.New("use case error"))
mockUseCase.On("FindMembers", createRequest.ToModel(), mock.Anything).Return("", errors.New("use case error"))

body, _ := json.Marshal(createRequest)
req := httptest.NewRequest(http.MethodPost, "/api/chatrooms", bytes.NewReader(body))
Expand Down
4 changes: 4 additions & 0 deletions internal/handlers/dto/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ type (
ErrorBodyDTO struct {
Messages []string `json:"messages"`
}

ErrorBodyWsDTO struct {
Messages []string `json:"error"`
}
)

func (e *ErrorResponse) Error() string {
Expand Down
50 changes: 28 additions & 22 deletions internal/handlers/ws/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,49 @@ package ws
import (
"context"
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"github.com/redis/go-redis/v9"
"go-live-chat/internal/handlers/dto"
"slices"
"log"
"sync"
)

type ConversationWSUseCase struct {
Conn *websocket.Conn
Channel string
Rdb *redis.Client
Rdb RedisClient
Mu sync.Mutex
UseCase map[string]ConversationUseCase
}

var allowedTypes = []string{"user"}
func (c *ConversationWSUseCase) getUseCase(scenario string) ConversationUseCase {

uc, exists := c.UseCase[scenario]
if !exists {
return nil
}

return uc
}

func (c *ConversationWSUseCase) ListenAndForward(ctx context.Context) {
pubsub := c.Rdb.Subscribe(ctx, c.Channel)

defer func(pubsub *redis.PubSub) {
err := pubsub.Close()
if err != nil {
fmt.Println("pubsub close:", err)
log.Println("pubsub close:", err)
}
}(pubsub)

fmt.Printf("Client subscribed to Redis channel: %s\n", c.Channel)
log.Printf("Client subscribed to Redis channel: %s\n", c.Channel)

for msg := range pubsub.Channel() {
c.Mu.Lock()
err := c.Conn.WriteMessage(websocket.TextMessage, []byte(msg.Payload))
c.Mu.Unlock()
if err != nil {
fmt.Println("Error writing to WebSocket:", err)
log.Println("Error writing to WebSocket:", err)
break
}
}
Expand All @@ -50,25 +58,23 @@ func (c *ConversationWSUseCase) PublishFromWebSocket(ctx context.Context) {
err := c.Conn.ReadJSON(&message)
if err != nil {
if websocket.IsUnexpectedCloseError(err) {
fmt.Printf("WebSocket unexpectedly closed: %v\n", err)
log.Printf("WebSocket unexpectedly closed: %v\n", err)
}

}

if !slices.Contains(allowedTypes, message.Type) {
_ = c.Conn.WriteMessage(websocket.TextMessage, []byte("Not Allowed Type"))
}

if jsonData, err := json.Marshal(message); err == nil {
err = c.Rdb.Publish(ctx, message.Destination, jsonData).Err()
if err != nil {
fmt.Printf("Failed to publish to Redis channel %s: %v\n", message.Destination, err)
break
uc := c.getUseCase(message.Type)
if uc == nil {
errResp := dto.ErrorBodyDTO{
Messages: []string{"Not Allowed Type"},
}

fmt.Printf("Message published to channel %s: %s\n", message.Destination, message)
jsonData, _ := json.Marshal(errResp)
_ = c.Conn.WriteMessage(websocket.TextMessage, jsonData)
} else {
fmt.Printf("Failed to publish to Redis channel %s: %v\n", message.Destination, err)
members, _ := uc.FindMembers(message.Destination, ctx)
messagesPrepared := uc.PrepareMessage(members, message.Message, message.Type)
_ = uc.StoreMessage(messagesPrepared, ctx)
_ = uc.PublishMessage(messagesPrepared, ctx)
}

}
Expand All @@ -79,8 +85,8 @@ func (c *ConversationWSUseCase) Close() {
defer c.Mu.Unlock()
err := c.Conn.Close()
if err != nil {
fmt.Println("pubsub close:", err)
log.Println("pubsub close:", err)
return
}
fmt.Printf("Client disconnected from channel: %s\n", c.Channel)
log.Printf("Client disconnected from channel: %s\n", c.Channel)
}
26 changes: 26 additions & 0 deletions internal/handlers/ws/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ws

import (
"context"
"github.com/redis/go-redis/v9"
"go-live-chat/internal/model"
)

type ConversationUseCase interface {
FindMembers(input interface{}, ctx context.Context) ([]model.Member, *model.Error)
StoreMessage(messages []model.Message, ctx context.Context) *model.Error
PublishMessage(messages []model.Message, ctx context.Context) *model.Error
PrepareMessage(
members []model.Member,
message string,
from string) []model.Message
}

type RetrieveChatroomUseCase interface {
ExecuteById(id string, ctx context.Context) (*model.Chatroom, *model.Error)
}

type RedisClient interface {
Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
Subscribe(ctx context.Context, channels ...string) *redis.PubSub
}
10 changes: 9 additions & 1 deletion internal/infraestructure/databases/redis.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package databases

import (
"context"
"github.com/redis/go-redis/v9"
"go-live-chat/internal/configs"
)

type RedisClientInterface interface {
Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
Ping(ctx context.Context) *redis.StatusCmd
Process(ctx context.Context, cmd redis.Cmder) error
Subscribe(ctx context.Context, channels ...string) *redis.PubSub
}

type RedisClient struct {
NotifyClientsRedis *redis.Client
NotifyClientsRedis RedisClientInterface
}

func NewRedisClient(config *configs.Config) *RedisClient {
Expand Down
11 changes: 11 additions & 0 deletions internal/model/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package model

import "go.mongodb.org/mongo-driver/bson/primitive"

type Message struct {
Id primitive.ObjectID `json:"id" bson:"_id"`
From string `json:"from" bson:"from"`
To string `json:"to" bson:"to"`
Content string `json:"content" bson:"content"`
Type string `json:"type" bson:"type"`
}
5 changes: 0 additions & 5 deletions internal/repositories/chatroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"go-live-chat/internal/configs"
"go-live-chat/internal/infraestructure/databases"
"go-live-chat/internal/infraestructure/wrappers"
"go-live-chat/internal/model"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
Expand All @@ -13,10 +12,6 @@ import (
"time"
)

type MongoClientInterface interface {
Database(name string, opts ...*options.DatabaseOptions) wrappers.MongoDatabaseInterface
}

type ChatroomRepository struct {
client MongoClientInterface
config *configs.Config
Expand Down
32 changes: 32 additions & 0 deletions internal/repositories/conversations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package repositories

import (
"go-live-chat/internal/configs"
"go-live-chat/internal/infraestructure/databases"
"go-live-chat/internal/model"
)

type ConversationsRepository struct {
client MongoClientInterface
config *configs.Config
}

func NewConversationsRepository(client *databases.MongoDBConnections, config *configs.Config) *ConversationsRepository {
return &ConversationsRepository{client: client.OpenChat, config: config}
}

func (c *ConversationsRepository) RetrieveChatroomHistory(chatroomId string, page int, pageSize int) ([]model.Message, error) {

return nil, nil
}

func (c *ConversationsRepository) SaveMessageToUser(message model.Message) error {
return nil
}

func (c *ConversationsRepository) BatchSaveMessage(messages []model.Message) error {
return nil
}
func (c *ConversationsRepository) RetrieveLatestMessageFromUser(userId string) (*model.Message, error) {
return nil, nil
}
10 changes: 10 additions & 0 deletions internal/repositories/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package repositories

import (
"go-live-chat/internal/infraestructure/wrappers"
"go.mongodb.org/mongo-driver/mongo/options"
)

type MongoClientInterface interface {
Database(name string, opts ...*options.DatabaseOptions) wrappers.MongoDatabaseInterface
}
Loading

0 comments on commit 1dbc328

Please sign in to comment.