From e23df269d24676f5cf5f7101c8bbea2ce12070a1 Mon Sep 17 00:00:00 2001 From: San Kim Date: Sun, 29 Dec 2024 14:35:47 +0900 Subject: [PATCH] =?UTF-8?q?Feat(Chat):=20=EC=B1=84=ED=8C=85=20=EB=A9=94?= =?UTF-8?q?=EC=8B=9C=EC=A7=80=20=EB=B0=9C=EC=86=A1=20Redis=20Pub/Sub=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chat/port/inbound/ReceiveMessage.kt | 6 ++- .../chat/port/outbound/MessageProducer.kt | 2 +- .../chat/service/ReceiveMessageService.kt | 28 ++++++++--- .../chat/service/SendMessageService.kt | 2 +- .../threedays/domain/chat/entity/Message.kt | 3 ++ .../redis/chat/MessageProducerRedisAdapter.kt | 21 --------- .../adapter/MessageProducerRedisAdapter.kt | 28 +++++++++++ .../adapter/MessageSubscriberRedisAdapter.kt | 46 +++++++++++++++++++ .../redis/chat/event/MessageRedisEvent.kt | 17 +++++++ 9 files changed, 122 insertions(+), 31 deletions(-) delete mode 100644 infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/MessageProducerRedisAdapter.kt create mode 100644 infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/adapter/MessageProducerRedisAdapter.kt create mode 100644 infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/adapter/MessageSubscriberRedisAdapter.kt create mode 100644 infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/event/MessageRedisEvent.kt diff --git a/application/src/main/kotlin/com/threedays/application/chat/port/inbound/ReceiveMessage.kt b/application/src/main/kotlin/com/threedays/application/chat/port/inbound/ReceiveMessage.kt index ed78121..781b859 100644 --- a/application/src/main/kotlin/com/threedays/application/chat/port/inbound/ReceiveMessage.kt +++ b/application/src/main/kotlin/com/threedays/application/chat/port/inbound/ReceiveMessage.kt @@ -4,6 +4,10 @@ import com.threedays.domain.chat.entity.Message interface ReceiveMessage { - fun invoke(command: Message) + fun invoke(command: Command) + + data class Command( + val message: Message + ) } diff --git a/application/src/main/kotlin/com/threedays/application/chat/port/outbound/MessageProducer.kt b/application/src/main/kotlin/com/threedays/application/chat/port/outbound/MessageProducer.kt index 9163a95..305c84a 100644 --- a/application/src/main/kotlin/com/threedays/application/chat/port/outbound/MessageProducer.kt +++ b/application/src/main/kotlin/com/threedays/application/chat/port/outbound/MessageProducer.kt @@ -4,6 +4,6 @@ import com.threedays.domain.chat.entity.Message interface MessageProducer { - fun produce(message: Message) + fun produceSendEvent(message: Message) } diff --git a/application/src/main/kotlin/com/threedays/application/chat/service/ReceiveMessageService.kt b/application/src/main/kotlin/com/threedays/application/chat/service/ReceiveMessageService.kt index eca45ff..0ffdf70 100644 --- a/application/src/main/kotlin/com/threedays/application/chat/service/ReceiveMessageService.kt +++ b/application/src/main/kotlin/com/threedays/application/chat/service/ReceiveMessageService.kt @@ -1,18 +1,32 @@ package com.threedays.application.chat.service import com.threedays.application.chat.port.inbound.ReceiveMessage -import com.threedays.domain.chat.entity.Message -import com.threedays.domain.chat.repository.MessageRepository +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.springframework.stereotype.Service @Service -class ReceiveMessageService( - private val messageRepository: MessageRepository, -) : ReceiveMessage { +class ReceiveMessageService() : ReceiveMessage { - override fun invoke(command: Message) { - messageRepository.save(command) + companion object { + + private val logger = KotlinLogging.logger { } } + private val scope = CoroutineScope(Dispatchers.IO) + + override fun invoke(command: ReceiveMessage.Command) { + scope.launch(exceptionHandler) { + logger.info { "Received message: $command" } + // TODO Implement message processing + } + } + + private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> + logger.error(throwable) { "An error occurred while processing the message" } + } } diff --git a/application/src/main/kotlin/com/threedays/application/chat/service/SendMessageService.kt b/application/src/main/kotlin/com/threedays/application/chat/service/SendMessageService.kt index 86ee253..368b41e 100644 --- a/application/src/main/kotlin/com/threedays/application/chat/service/SendMessageService.kt +++ b/application/src/main/kotlin/com/threedays/application/chat/service/SendMessageService.kt @@ -26,7 +26,7 @@ class SendMessageService( Message.createCardMessage(channelId, sender, messageContent) } - messageProducer.produce(message) + messageProducer.produceSendEvent(message) } diff --git a/domain/src/main/kotlin/com/threedays/domain/chat/entity/Message.kt b/domain/src/main/kotlin/com/threedays/domain/chat/entity/Message.kt index 8e980e3..38fb09a 100644 --- a/domain/src/main/kotlin/com/threedays/domain/chat/entity/Message.kt +++ b/domain/src/main/kotlin/com/threedays/domain/chat/entity/Message.kt @@ -70,6 +70,9 @@ data class Message( } } + fun getPartitionKey(): String { + return channelId.value.toString() + } fun markAsRead(): Message { require(status == Status.SENT) { "메시지가 전송된 상태에서만 읽음 처리가 가능합니다" } diff --git a/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/MessageProducerRedisAdapter.kt b/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/MessageProducerRedisAdapter.kt deleted file mode 100644 index fa00d7c..0000000 --- a/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/MessageProducerRedisAdapter.kt +++ /dev/null @@ -1,21 +0,0 @@ -package com.threedays.redis.chat - -import com.threedays.application.chat.port.outbound.MessageProducer -import com.threedays.domain.chat.entity.Message -import io.github.oshai.kotlinlogging.KotlinLogging -import org.springframework.stereotype.Component - -@Component -class MessageProducerRedisAdapter() : MessageProducer { - - companion object { - - private val logger = KotlinLogging.logger {} - } - - override fun produce(message: Message) { - // TODO: Implement this method - logger.info { "Producing message: $message" } - } - -} diff --git a/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/adapter/MessageProducerRedisAdapter.kt b/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/adapter/MessageProducerRedisAdapter.kt new file mode 100644 index 0000000..f38c2ab --- /dev/null +++ b/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/adapter/MessageProducerRedisAdapter.kt @@ -0,0 +1,28 @@ +package com.threedays.redis.chat.adapter + +import com.threedays.application.chat.port.outbound.MessageProducer +import com.threedays.domain.chat.entity.Message +import com.threedays.redis.chat.event.MessageRedisEvent +import com.threedays.redis.chat.event.MessageRedisEvent.Sent.Companion.toSentEvent +import io.github.oshai.kotlinlogging.KotlinLogging +import org.springframework.data.redis.core.RedisTemplate +import org.springframework.stereotype.Component + +@Component +class MessageProducerRedisAdapter( + private val redisTemplate: RedisTemplate +) : MessageProducer { + + companion object { + + private val logger = KotlinLogging.logger {} + } + + override fun produceSendEvent(message: Message) { + logger.debug { "Producing message sent event to redis: $message" } + + val messageSentEvent: MessageRedisEvent.Sent = message.toSentEvent() + redisTemplate.convertAndSend(messageSentEvent.getPartitionKey(), messageSentEvent) + } + +} diff --git a/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/adapter/MessageSubscriberRedisAdapter.kt b/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/adapter/MessageSubscriberRedisAdapter.kt new file mode 100644 index 0000000..765ebd8 --- /dev/null +++ b/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/adapter/MessageSubscriberRedisAdapter.kt @@ -0,0 +1,46 @@ +package com.threedays.redis.chat.adapter + +import com.fasterxml.jackson.databind.ObjectMapper +import com.threedays.application.chat.port.inbound.ReceiveMessage +import com.threedays.redis.chat.event.MessageRedisEvent +import io.github.oshai.kotlinlogging.KotlinLogging +import org.springframework.data.redis.connection.Message +import org.springframework.data.redis.connection.MessageListener +import org.springframework.stereotype.Component + +@Component +class MessageSubscriberRedisAdapter( + private val receiveMessage: ReceiveMessage, + private val objectMapper: ObjectMapper, +) : MessageListener { + + companion object { + + private val logger = KotlinLogging.logger {} + } + + + override fun onMessage( + message: Message, + pattern: ByteArray? + ) { + try { + val channel = String(message.channel) + val body = String(message.body) + logger.debug { "Received message from channel '$channel': $body" } + + val event: MessageRedisEvent.Sent = objectMapper.readValue(body, MessageRedisEvent.Sent::class.java) + handleEvent(event) + } catch (e: Exception) { + logger.error(e) { "Failed to process Redis message" } + } + } + + private fun handleEvent(event: MessageRedisEvent.Sent) { + logger.info { "Handling received event: $event" } + val command = ReceiveMessage.Command(event.message) + receiveMessage.invoke(command) + } + + +} diff --git a/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/event/MessageRedisEvent.kt b/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/event/MessageRedisEvent.kt new file mode 100644 index 0000000..ba8bc08 --- /dev/null +++ b/infrastructure/redis/src/main/kotlin/com/threedays/redis/chat/event/MessageRedisEvent.kt @@ -0,0 +1,17 @@ +package com.threedays.redis.chat.event + +import com.threedays.domain.chat.entity.Message + +sealed class MessageRedisEvent { + + data class Sent(val message: Message) { + companion object { + fun Message.toSentEvent() = Sent(this) + } + + fun getPartitionKey() = message.getPartitionKey() + } + + + +}