-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathNotificationSseService.java
More file actions
88 lines (70 loc) · 2.49 KB
/
NotificationSseService.java
File metadata and controls
88 lines (70 loc) · 2.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.ongil.backend.domain.notification.service;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.ongil.backend.domain.notification.dto.response.NotificationResponse;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
public class NotificationSseService {
// 유저별 SSE 연결 관리
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
private static final Long SSE_TIMEOUT = 30 * 60 * 1000L; // 30분
// SSE 구독 생성
public SseEmitter subscribe(Long userId) {
// 기존 연결이 있으면 제거 (원자적 연산)
SseEmitter existingEmitter = emitters.remove(userId);
if (existingEmitter != null) {
existingEmitter.complete();
}
SseEmitter emitter = new SseEmitter(SSE_TIMEOUT);
emitters.put(userId, emitter);
// 연결 종료 시 제거 (value 지정으로 새로 등록된 emitter를 잘못 제거하는 race condition 방지)
emitter.onCompletion(() -> {
emitters.remove(userId, emitter);
log.info("SSE 연결 종료 - userId: {}", userId);
});
emitter.onTimeout(() -> {
emitters.remove(userId, emitter);
log.info("SSE 타임아웃 - userId: {}", userId);
});
emitter.onError(e -> {
emitters.remove(userId, emitter);
log.error("SSE 에러 - userId: {}", userId, e);
});
// 연결 직후 더미 이벤트 전송 (연결 확인용)
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("SSE 연결 성공"));
} catch (IOException e) {
emitters.remove(userId, emitter);
log.error("SSE 초기 이벤트 전송 실패 - userId: {}", userId, e);
}
log.info("SSE 연결 생성 - userId: {}", userId);
return emitter;
}
// 알림 전송
public void sendNotification(Long userId, NotificationResponse notification) {
SseEmitter emitter = emitters.get(userId);
if (emitter == null) {
log.debug("SSE 연결 없음 - userId: {}", userId);
return;
}
try {
emitter.send(SseEmitter.event()
.name("price-alert")
.data(notification));
log.info("SSE 알림 전송 성공 - userId: {}, notificationId: {}", userId, notification.getNotificationId());
} catch (IOException e) {
emitters.remove(userId, emitter);
log.error("SSE 알림 전송 실패 - userId: {}", userId, e);
}
}
// 현재 연결된 사용자 수 조회 (디버깅용)
public int getConnectedUserCount() {
return emitters.size();
}
}