Skip to content

Commit

Permalink
Feat(Chat): 채팅 메시지 발송 Redis Pub/Sub 구현
Browse files Browse the repository at this point in the history
  • Loading branch information
waterfogSW committed Dec 29, 2024
1 parent 2ca8b89 commit e23df26
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import com.threedays.domain.chat.entity.Message

interface ReceiveMessage {

fun invoke(command: Message)
fun invoke(command: Command)

data class Command(
val message: Message
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import com.threedays.domain.chat.entity.Message

interface MessageProducer {

fun produce(message: Message)
fun produceSendEvent(message: Message)

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
package com.threedays.application.chat.service

import com.threedays.application.chat.port.inbound.ReceiveMessage
import com.threedays.domain.chat.entity.Message
import com.threedays.domain.chat.repository.MessageRepository
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.springframework.stereotype.Service

@Service
class ReceiveMessageService(
private val messageRepository: MessageRepository,
) : ReceiveMessage {
class ReceiveMessageService() : ReceiveMessage {

override fun invoke(command: Message) {
messageRepository.save(command)
companion object {

private val logger = KotlinLogging.logger { }
}

private val scope = CoroutineScope(Dispatchers.IO)

override fun invoke(command: ReceiveMessage.Command) {
scope.launch(exceptionHandler) {
logger.info { "Received message: $command" }
// TODO Implement message processing
}
}

private val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
logger.error(throwable) { "An error occurred while processing the message" }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class SendMessageService(
Message.createCardMessage(channelId, sender, messageContent)
}

messageProducer.produce(message)
messageProducer.produceSendEvent(message)
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ data class Message(
}
}

fun getPartitionKey(): String {
return channelId.value.toString()
}

fun markAsRead(): Message {
require(status == Status.SENT) { "메시지가 전송된 상태에서만 읽음 처리가 가능합니다" }
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.threedays.redis.chat.adapter

import com.threedays.application.chat.port.outbound.MessageProducer
import com.threedays.domain.chat.entity.Message
import com.threedays.redis.chat.event.MessageRedisEvent
import com.threedays.redis.chat.event.MessageRedisEvent.Sent.Companion.toSentEvent
import io.github.oshai.kotlinlogging.KotlinLogging
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.stereotype.Component

@Component
class MessageProducerRedisAdapter(
private val redisTemplate: RedisTemplate<String, Any>
) : MessageProducer {

companion object {

private val logger = KotlinLogging.logger {}
}

override fun produceSendEvent(message: Message) {
logger.debug { "Producing message sent event to redis: $message" }

val messageSentEvent: MessageRedisEvent.Sent = message.toSentEvent()
redisTemplate.convertAndSend(messageSentEvent.getPartitionKey(), messageSentEvent)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.threedays.redis.chat.adapter

import com.fasterxml.jackson.databind.ObjectMapper
import com.threedays.application.chat.port.inbound.ReceiveMessage
import com.threedays.redis.chat.event.MessageRedisEvent
import io.github.oshai.kotlinlogging.KotlinLogging
import org.springframework.data.redis.connection.Message
import org.springframework.data.redis.connection.MessageListener
import org.springframework.stereotype.Component

@Component
class MessageSubscriberRedisAdapter(
private val receiveMessage: ReceiveMessage,
private val objectMapper: ObjectMapper,
) : MessageListener {

companion object {

private val logger = KotlinLogging.logger {}
}


override fun onMessage(
message: Message,
pattern: ByteArray?
) {
try {
val channel = String(message.channel)
val body = String(message.body)
logger.debug { "Received message from channel '$channel': $body" }

val event: MessageRedisEvent.Sent = objectMapper.readValue(body, MessageRedisEvent.Sent::class.java)
handleEvent(event)
} catch (e: Exception) {
logger.error(e) { "Failed to process Redis message" }
}
}

private fun handleEvent(event: MessageRedisEvent.Sent) {
logger.info { "Handling received event: $event" }
val command = ReceiveMessage.Command(event.message)
receiveMessage.invoke(command)
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.threedays.redis.chat.event

import com.threedays.domain.chat.entity.Message

sealed class MessageRedisEvent {

data class Sent(val message: Message) {
companion object {
fun Message.toSentEvent() = Sent(this)
}

fun getPartitionKey() = message.getPartitionKey()
}



}

0 comments on commit e23df26

Please sign in to comment.