Skip to content

Commit

Permalink
refactor : heartbeat설정 추가
Browse files Browse the repository at this point in the history
  • Loading branch information
HandmadeCloud committed Jan 30, 2024
1 parent a046851 commit 314cd8d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@ConfigurationPropertiesScan("com.programmers.bucketback.global.config.security.jwt")
@EnableScheduling
public class BucketBackApplication {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.programmers.bucketback.domains.sse;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

Expand All @@ -12,18 +15,37 @@
@Component
public class SseEmitters {

private static final Long DEFAULT_TIMEOUT = 25 * 1000L; //25초
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
private static final Long DEFAULT_TIMEOUT = 60 * 1000L; // 60초
private final Map<Long, Map<Long, SseEmitter>> emitters = new ConcurrentHashMap<>();

public SseEmitter add(final Long receiverId) {
SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
this.emitters.put(receiverId, sseEmitter);
UUID uuid = UUID.randomUUID();

if (emitters.get(receiverId) == null) {
emitters.put(receiverId, new ConcurrentHashMap<>());
}

Map<Long, SseEmitter> longSseEmitterMap = this.emitters.get(receiverId);
longSseEmitterMap.put(uuid.getMostSignificantBits(), sseEmitter);

sseEmitter.onTimeout(sseEmitter::complete);
sseEmitter.onCompletion(() -> this.emitters.remove(receiverId));
sseEmitter.onError((e) -> {
log.error("error with emitter");
this.emitters.remove(receiverId);
});
sseEmitter.onError((e) -> this.emitters.remove(receiverId));
sseEmitter.onCompletion(
() -> {
log.error("SSE 연결이 종료되었습니다. userId : {}", receiverId);
Map<Long, SseEmitter> emitterMap = this.emitters.get(receiverId);
emitterMap.remove(uuid.getMostSignificantBits());
}
);
sseEmitter.onError(
(e) -> {
log.error("SSE 연결이 종료되었습니다. userId : {}", receiverId);
Map<Long, SseEmitter> emitterMap = this.emitters.get(receiverId);
emitterMap.remove(uuid.getMostSignificantBits());
}
);

return sseEmitter;
}
Expand All @@ -32,17 +54,33 @@ public void send(
final Long receiverId,
final Object data
) {
SseEmitter sseEmitter = emitters.get(receiverId);
if (sseEmitter == null) {
Map<Long, SseEmitter> sseEmitterMap = emitters.get(receiverId);
if (sseEmitterMap == null) {
log.warn("SSE를 구독하지 않은 유저입니다. userId : {}", receiverId);
return;
}
sseEmitterMap.forEach(
(id, emitter) -> {
try {
emitter.send(data);
log.info("알람을 보냈습니다. userId : {}", receiverId);
} catch (Exception e) {
log.warn("알람을 보내는 과정에서 오류가 발생했습니다.");
}
}
);
}

try {
sseEmitter.send(data);
log.info("알람을 보냈습니다. userId : {}", receiverId);
} catch (Exception e) {
log.warn("알람을 보내는 과정에서 오류가 발생했습니다.");
}
@Scheduled(fixedRate = 45 * 1000) // 45초 간격
public void sendHeartbeat() {
emitters.forEach((id, sseEmitterMap) -> {
sseEmitterMap.forEach((emitterId, emitter) -> {
try {
emitter.send(SseEmitter.event().comment("heartbeat"));
log.info("heartbeat을 보냈습니다. userId : {}", id);
} catch (IOException e) {
emitter.complete();
}
});
});
}
}

0 comments on commit 314cd8d

Please sign in to comment.