Skip to content

Commit

Permalink
Merge pull request #197 from TRIP-Side-Project/feature/#196-sse-notif…
Browse files Browse the repository at this point in the history
…ication-refactor

Refactor: SSE 알림 리팩토링
  • Loading branch information
kwondongwook authored Jan 4, 2024
2 parents aa2d216 + d5b7013 commit 1fa781f
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.web.client.HttpClientErrorException;

import java.util.List;
import java.util.Set;

@RequiredArgsConstructor
@Component
Expand Down Expand Up @@ -55,8 +56,8 @@ public void updateData()
List<CreateItemRequest> createItemRequests = naverApiService.toCreateItemRequest(shoppingItems);
for (CreateItemRequest createItemRequest : createItemRequests) {
Item item = itemService.createItem(createItemRequest);
notificationService.createNotification(item, createItemRequest.getTagNames());
sseEmitterMap.sendToAll("notification",new SseNotificationResponse(item.getId(), createItemRequest.getTagNames()));
Set<Long> memberIds = notificationService.createNotifications(item, createItemRequest.getTagNames());
sseEmitterMap.send(memberIds,"notification",new SseNotificationResponse(item.getId(), createItemRequest.getTagNames()));
/**
*
* 알림이 가져야할 데이터가 itemId, memberId
Expand Down
37 changes: 20 additions & 17 deletions src/main/java/com/api/trip/common/sse/emitter/SseEmitterMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -15,40 +16,42 @@
@Slf4j
public class SseEmitterMap {

private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
private final Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

public void put(String email, SseEmitter sseEmitter) {
sseEmitter.onCompletion(() -> remove(email));
public void put(Long memberId, SseEmitter sseEmitter) {
sseEmitter.onCompletion(() -> remove(memberId));
sseEmitter.onTimeout(sseEmitter::complete);
sseEmitterMap.put(email, sseEmitter);
log.info("connected with {}, the number of connections is {}", email, sseEmitterMap.size());
sseEmitterMap.put(memberId, sseEmitter);
log.info("connected with {}, the number of connections is {}", memberId, sseEmitterMap.size());
}

public void remove(String email) {
sseEmitterMap.remove(email);
log.info("disconnected with {}, the number of connections is {}", email, sseEmitterMap.size());
public void remove(Long memberId) {
sseEmitterMap.remove(memberId);
log.info("disconnected with {}, the number of connections is {}", memberId, sseEmitterMap.size());
}

public void send(String email, String eventName, Object eventData) {
SseEmitter sseEmitter = sseEmitterMap.get(email);
public void send(Long memberId, String eventName, Object eventData) {
SseEmitter sseEmitter = sseEmitterMap.get(memberId);
try {
sseEmitter.send(
event()
.name(eventName)
.data(eventData)
);
} catch (IOException | IllegalStateException e) {
remove(email);
remove(memberId);
}
}

public void sendToAll(String eventName, Object eventData) {
public void send(Collection<Long> memberIds, String eventName, Object eventData) {
SseEventBuilder sseEventBuilder = event().name(eventName).data(eventData);
sseEmitterMap.forEach((email, sseEmitter) -> {
try {
sseEmitter.send(sseEventBuilder);
} catch (IOException | IllegalStateException e) {
remove(email);
sseEmitterMap.forEach((memberId, sseEmitter) -> {
if (memberIds.contains(memberId)) {
try {
sseEmitter.send(sseEventBuilder);
} catch (IOException | IllegalStateException e) {
remove(memberId);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import com.api.trip.domain.member.model.Member;
import com.api.trip.domain.tag.model.Tag;
import com.api.trip.domain.tag.repository.TagRepository;
import com.api.trip.domain.tag.service.TagService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Service
@Slf4j
Expand Down Expand Up @@ -45,8 +46,8 @@ public List<String> getInterestTag(Member member) {
}

@Transactional(readOnly = true)
public List<Member> getMemberByTags(List<String> tagNames){
public Set<Member> getMembersByTagNames(List<String> tagNames) {
List<InterestTag> interestTags = interestTagRepository.findInterestTagsByTagNames(tagNames);
return interestTags.stream().map(InterestTag::getMember).toList();
return interestTags.stream().map(InterestTag::getMember).collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.api.trip.domain.notification.controller;

import com.api.trip.common.exception.CustomException;
import com.api.trip.common.exception.ErrorCode;
import com.api.trip.common.sse.emitter.SseEmitterMap;
import com.api.trip.domain.member.model.Member;
import com.api.trip.domain.member.repository.MemberRepository;
import com.api.trip.domain.notification.controller.dto.DeleteNotificationRequest;
import com.api.trip.domain.notification.controller.dto.GetMyNotificationsResponse;
import com.api.trip.domain.notification.controller.dto.ReadNotificationRequest;
Expand All @@ -19,15 +23,19 @@
@RequiredArgsConstructor
public class NotificationController {

private final MemberRepository memberRepository;
private final NotificationService notificationService;
private final SseEmitterMap sseEmitterMap;

@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> connect() {
String email = SecurityContextHolder.getContext().getAuthentication().getName();
Member member = memberRepository.findByEmail(email)
.orElseThrow(() -> new CustomException(ErrorCode.UNAUTHORIZED));

SseEmitter sseEmitter = new SseEmitter(3600000L);
sseEmitterMap.put(email, sseEmitter);
sseEmitterMap.send(email, "connect", LocalDateTime.now());
sseEmitterMap.put(member.getId(), sseEmitter);
sseEmitterMap.send(member.getId(), "connect", LocalDateTime.now());
return ResponseEntity.ok(sseEmitter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Service
@Transactional
Expand All @@ -27,8 +29,8 @@ public class NotificationService {
private final NotificationRepository notificationRepository;
private final InterestTagService interestTagService;

public void createNotification(Item item, List<String> tagNames) {
List<Member> receivers = interestTagService.getMemberByTags(tagNames);
public Set<Long> createNotifications(Item item, List<String> tagNames) {
Set<Member> receivers = interestTagService.getMembersByTagNames(tagNames);

receivers.forEach(member -> {
notificationRepository.save(
Expand All @@ -38,6 +40,8 @@ public void createNotification(Item item, List<String> tagNames) {
.build()
);
});

return receivers.stream().map(Member::getId).collect(Collectors.toSet());
}

@Transactional(readOnly = true)
Expand Down

0 comments on commit 1fa781f

Please sign in to comment.