Skip to content

Commit 4c534fd

Browse files
authored
fix: SSE emitter 관리 로직 개선 및 구독 안정성 향상 (#275)
* refactor: SSE 이벤트에서 수동 JSON 직렬화 제거 (#257) * refactor: emitter 정리 로직 개선 (중복 호출 제거, 메서드 분리, key 파싱 안정화) (#257) - findAllEmitters() 중복 호출 제거로 오버헤드 감소 - emitter 제거 조건 메서드로 분리해 가독성 및 재사용성 향상 - emitterId 파싱 시 split → lastIndexOf로 변경하여 userId 내 '_' 포함 대응 * test: isExpiredEmitterEntry 단위 테스트 추가 (#257) - 오래된 emitter 제거 여부 확인 - 최신 emitter는 유지되는지 확인 - 잘못된 키 형식 및 파싱 실패 케이스 처리 * refactor: EmitterRepository 인터페이스 분리 및 emitter 정리 통합 테스트 추가 (#257) * feat: 로그아웃 시 해당 사용자의 SSE emitter 제거 기능 추가 (#257) * fix: SSE 구독 엔드포인트에 produces 헤더 명시 (#257)
1 parent 9bed88c commit 4c534fd

File tree

11 files changed

+196
-46
lines changed

11 files changed

+196
-46
lines changed

src/main/java/com/moa/moa_server/domain/auth/service/AuthService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.moa.moa_server.domain.auth.handler.AuthErrorCode;
88
import com.moa.moa_server.domain.auth.handler.AuthException;
99
import com.moa.moa_server.domain.auth.service.strategy.OAuthLoginStrategy;
10+
import com.moa.moa_server.domain.notification.repository.NotificationEmitterRepository;
1011
import com.moa.moa_server.domain.user.entity.User;
1112
import java.util.Map;
1213
import java.util.Set;
@@ -31,6 +32,7 @@ public class AuthService {
3132
private final Map<String, OAuthLoginStrategy> strategies;
3233
private final JwtTokenService jwtTokenService;
3334
private final RefreshTokenService refreshTokenService;
35+
private final NotificationEmitterRepository notificationEmitterRepository;
3436

3537
public LoginResult login(String provider, String code, String redirectUri) {
3638
if (!OAuth.ProviderCode.isSupported(provider)) {
@@ -69,6 +71,8 @@ public TokenRefreshResponse refreshAccessToken(String refreshToken) {
6971
}
7072

7173
public boolean logout(Long userId) {
74+
notificationEmitterRepository.deleteAllByUserId(userId);
75+
log.info("[AuthService#logout] userId={} 로그아웃 - SSE emitters 제거 완료", userId);
7276
return refreshTokenService.deleteRefreshTokenByUserId(
7377
userId); // true면 SUCCESS, false면 ALREADY_LOGGED_OUT
7478
}

src/main/java/com/moa/moa_server/domain/global/util/JsonUtil.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

src/main/java/com/moa/moa_server/domain/notification/application/sse/NotificationSseBroadcaster.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.moa.moa_server.domain.notification.application.sse;
22

3-
import static com.moa.moa_server.domain.global.util.JsonUtil.toJson;
4-
53
import com.moa.moa_server.domain.notification.dto.NotificationItem;
64
import com.moa.moa_server.domain.notification.entity.Notification;
75
import com.moa.moa_server.domain.notification.repository.NotificationEmitterRepository;
@@ -18,12 +16,11 @@ public class NotificationSseBroadcaster {
1816
private final NotificationEmitterRepository emitterRepository;
1917

2018
public void send(Notification notification) {
21-
String payload = toJson(NotificationItem.from(notification));
2219
SseEmitter.SseEventBuilder event =
2320
SseEmitter.event()
2421
.id(String.valueOf(notification.getId()))
2522
.name("notification")
26-
.data(payload);
23+
.data(NotificationItem.from(notification));
2724

2825
emitterRepository
2926
.findAllByUserId(notification.getUser().getId())
Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,49 @@
11
package com.moa.moa_server.domain.notification.application.sse;
22

33
import com.moa.moa_server.domain.notification.config.SseProperties;
4-
import com.moa.moa_server.domain.notification.repository.NotificationEmitterRepository;
4+
import com.moa.moa_server.domain.notification.repository.EmitterRepository;
5+
import java.util.Map;
56
import lombok.RequiredArgsConstructor;
67
import lombok.extern.slf4j.Slf4j;
78
import org.springframework.scheduling.annotation.Scheduled;
89
import org.springframework.stereotype.Component;
10+
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
911

1012
/** 주기적으로 오래된 SSE Emitter를 정리하는 컴포넌트. */
1113
@Slf4j
1214
@Component
1315
@RequiredArgsConstructor
1416
public class NotificationSseCleaner {
1517

16-
private final NotificationEmitterRepository emitterRepository;
18+
private final EmitterRepository emitterRepository;
1719
private final SseProperties sseProperties;
1820

1921
@Scheduled(fixedRateString = "#{@sseProperties.staleCleanInterval}")
2022
public void cleanUpStaleEmitters() {
2123
long now = System.currentTimeMillis();
2224
long threshold = sseProperties.getStaleThreshold();
2325

24-
int before = emitterRepository.findAllEmitters().size();
25-
26-
emitterRepository
27-
.findAllEmitters()
28-
.entrySet()
29-
.removeIf(
30-
entry -> {
31-
String key = entry.getKey(); // user_timestamp
32-
String[] parts = key.split("_");
33-
if (parts.length != 2) return true;
34-
35-
try {
36-
long createdAt = Long.parseLong(parts[1]);
37-
return (now - createdAt) > threshold;
38-
} catch (NumberFormatException e) {
39-
return true; // 잘못된 형식의 키도 정리
40-
}
41-
});
26+
Map<String, SseEmitter> allEmitters = emitterRepository.findAllEmitters();
27+
int before = allEmitters.size();
28+
29+
allEmitters.entrySet().removeIf(entry -> isExpiredEmitterEntry(entry, now, threshold));
4230

4331
int after = emitterRepository.findAllEmitters().size();
4432
if (before != after) {
4533
log.info("[NotificationSseCleaner#cleanUpStaleEmitters] 만료된 emitter 정리: {}개", before - after);
4634
}
4735
}
36+
37+
boolean isExpiredEmitterEntry(Map.Entry<String, SseEmitter> entry, long now, long threshold) {
38+
String key = entry.getKey(); // userId_timestamp
39+
int idx = key.lastIndexOf('_');
40+
if (idx == -1 || idx == key.length() - 1) return true;
41+
42+
try {
43+
long createdAt = Long.parseLong(key.substring(idx + 1));
44+
return (now - createdAt) > threshold;
45+
} catch (NumberFormatException e) {
46+
return true; // 잘못된 형식의 키도 정리
47+
}
48+
}
4849
}

src/main/java/com/moa/moa_server/domain/notification/application/sse/NotificationSseConnectionHelper.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.moa.moa_server.domain.notification.application.sse;
22

3-
import static com.moa.moa_server.domain.global.util.JsonUtil.toJson;
4-
53
import com.moa.moa_server.domain.notification.dto.NotificationItem;
64
import com.moa.moa_server.domain.notification.entity.Notification;
75
import com.moa.moa_server.domain.notification.repository.NotificationEmitterRepository;
@@ -39,12 +37,11 @@ public void sendLostEvents(Long userId, String lastEventId, SseEmitter emitter)
3937

4038
for (Notification notification : lostEvents) {
4139
try {
42-
String payload = toJson(NotificationItem.from(notification));
4340
SseEmitter.SseEventBuilder event =
4441
SseEmitter.event()
4542
.id(String.valueOf(notification.getId()))
4643
.name("notification")
47-
.data(payload);
44+
.data(NotificationItem.from(notification));
4845
emitter.send(event);
4946
} catch (IOException e) {
5047
log.warn("초기 SSE 이벤트 전송 실패. emitter는 유지", e); // 초기 전송 실패는 일시적일 수 있으므로 무시하고 연결 유지

src/main/java/com/moa/moa_server/domain/notification/controller/NotificationController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import jakarta.servlet.http.HttpServletRequest;
1212
import jakarta.servlet.http.HttpServletResponse;
1313
import lombok.RequiredArgsConstructor;
14+
import org.springframework.http.MediaType;
1415
import org.springframework.http.ResponseEntity;
1516
import org.springframework.security.core.annotation.AuthenticationPrincipal;
1617
import org.springframework.web.bind.annotation.*;
@@ -45,7 +46,7 @@ public ResponseEntity<ApiResponse<NotificationReadResponse>> readNotification(
4546
}
4647

4748
@Operation(summary = "알림 SSE", description = "알림 SSE 커넥션을 열고, SseEmitter를 반환합니다.")
48-
@GetMapping("/subscribe")
49+
@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
4950
public SseEmitter subscribe(
5051
@AuthenticationPrincipal Long userId,
5152
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId,
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.moa.moa_server.domain.notification.repository;
2+
3+
import java.util.Map;
4+
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
5+
6+
public interface EmitterRepository {
7+
void save(String id, SseEmitter emitter);
8+
9+
void deleteById(String id);
10+
11+
Map<String, SseEmitter> findAllEmitters();
12+
13+
public Map<String, SseEmitter> findAllByUserId(Long userId);
14+
}

src/main/java/com/moa/moa_server/domain/notification/repository/NotificationEmitterRepository.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
@Component
1414
@RequiredArgsConstructor
15-
public class NotificationEmitterRepository {
15+
public class NotificationEmitterRepository implements EmitterRepository {
1616

1717
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
1818

@@ -36,6 +36,10 @@ public Map<String, SseEmitter> findAllByUserId(Long userId) {
3636
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
3737
}
3838

39+
public void deleteAllByUserId(Long userId) {
40+
findAllByUserId(userId).keySet().forEach(this::deleteById);
41+
}
42+
3943
@Transactional(readOnly = true)
4044
public List<Notification> findCachedEventsAfter(Long userId, String lastEventId) {
4145
Long lastId = Long.parseLong(lastEventId);
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.moa.moa_server.domain.notification.application.sse;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.moa.moa_server.domain.notification.config.SseProperties;
6+
import com.moa.moa_server.domain.notification.repository.FakeEmitterRepository;
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.DisplayName;
9+
import org.junit.jupiter.api.Test;
10+
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
11+
12+
public class NotificationSseCleanerIntegrationTest {
13+
14+
FakeEmitterRepository fakeRepo;
15+
SseProperties props;
16+
NotificationSseCleaner cleaner;
17+
18+
@BeforeEach
19+
void setUp() {
20+
fakeRepo = new FakeEmitterRepository();
21+
22+
props = new SseProperties();
23+
props.setStaleThreshold(1000L); // 1초 넘으면 stale
24+
props.setStaleCleanInterval(5000L); // 무관
25+
26+
cleaner = new NotificationSseCleaner(fakeRepo, props);
27+
}
28+
29+
@Test
30+
@DisplayName("오래된 emitter는 cleanUpStaleEmitters 실행 시 제거됨")
31+
void testOldEmitterIsRemoved() {
32+
long now = System.currentTimeMillis();
33+
34+
String staleKey = "user1_" + (now - 5000);
35+
String validKey = "user2_" + now;
36+
37+
fakeRepo.save(staleKey, new SseEmitter());
38+
fakeRepo.save(validKey, new SseEmitter());
39+
40+
// when
41+
cleaner.cleanUpStaleEmitters();
42+
43+
// then
44+
assertThat(fakeRepo.contains(staleKey)).isFalse();
45+
assertThat(fakeRepo.contains(validKey)).isTrue();
46+
assertThat(fakeRepo.findAllEmitters()).hasSize(1);
47+
}
48+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.moa.moa_server.domain.notification.application.sse;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import java.util.AbstractMap;
6+
import java.util.Map;
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.DisplayName;
9+
import org.junit.jupiter.api.Test;
10+
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
11+
12+
/** NotificationSseCleaner의 emitter 제거 조건 판단 메서드(isExpiredEmitterEntry) 단위 테스트. */
13+
public class NotificationSseCleanerTest {
14+
15+
NotificationSseCleaner cleaner;
16+
17+
@BeforeEach
18+
void setUp() {
19+
cleaner = new NotificationSseCleaner(null, null); // emitterRepository, sseProperties 필요 없음
20+
}
21+
22+
@Test
23+
@DisplayName("오래된 emitter는 제거 (true 반환)")
24+
void testExpiredEmitterEntry_shouldReturnTrue_ifTimestampTooOld() {
25+
long now = System.currentTimeMillis();
26+
long threshold = 1000L;
27+
28+
String key = "123_" + (now - 2000); // 오래된 timestamp
29+
Map.Entry<String, SseEmitter> entry = new AbstractMap.SimpleEntry<>(key, new SseEmitter());
30+
31+
boolean result = cleaner.isExpiredEmitterEntry(entry, now, threshold);
32+
assertThat(result).isTrue();
33+
}
34+
35+
@Test
36+
@DisplayName("최신 emitter는 제거 대상 아님 (false 반환)")
37+
void testExpiredEmitterEntry_shouldReturnFalse_ifTimestampRecent() {
38+
long now = System.currentTimeMillis();
39+
long threshold = 5000L;
40+
41+
String key = "123_" + now; // 방금 생성됨
42+
Map.Entry<String, SseEmitter> entry = new AbstractMap.SimpleEntry<>(key, new SseEmitter());
43+
44+
boolean result = cleaner.isExpiredEmitterEntry(entry, now, threshold);
45+
assertThat(result).isFalse();
46+
}
47+
48+
@Test
49+
@DisplayName("잘못된 키 형식 또는 파싱 실패 시 제거 대상 (true 반환)")
50+
void testExpiredEmitterEntry_shouldReturnTrue_ifKeyMalformed() {
51+
Map.Entry<String, SseEmitter> entry1 =
52+
new AbstractMap.SimpleEntry<>("invalidKey", new SseEmitter());
53+
Map.Entry<String, SseEmitter> entry2 = new AbstractMap.SimpleEntry<>("123_", new SseEmitter());
54+
Map.Entry<String, SseEmitter> entry3 =
55+
new AbstractMap.SimpleEntry<>("123_456", new SseEmitter());
56+
57+
long now = System.currentTimeMillis();
58+
long threshold = 1000L;
59+
60+
assertThat(cleaner.isExpiredEmitterEntry(entry1, now, threshold)).isTrue();
61+
assertThat(cleaner.isExpiredEmitterEntry(entry2, now, threshold)).isTrue();
62+
assertThat(cleaner.isExpiredEmitterEntry(entry3, now, threshold)).isTrue();
63+
}
64+
}

0 commit comments

Comments
 (0)