Skip to content

Commit 321a7f5

Browse files
authored
Merge pull request #17 from Octa-Cloud/feat/mong-91/회원-탈퇴로-인한-서비스-전파-개발
Feat/mong 91/회원 탈퇴로 인한 서비스 전파 개발
2 parents 4d19600 + 6a95020 commit 321a7f5

File tree

12 files changed

+489
-7
lines changed

12 files changed

+489
-7
lines changed

build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ dependencies {
3333
// Kafka
3434
implementation 'org.springframework.kafka:spring-kafka'
3535

36+
//resilience4j
37+
implementation platform("io.github.resilience4j:resilience4j-bom:2.2.0") // 버전 한줄로 관리
38+
implementation "io.github.resilience4j:resilience4j-spring-boot3" // ★ 필수 (Registry/AutoConfig)
39+
implementation "org.springframework.boot:spring-boot-starter-aop" // 필수
40+
3641
// Swagger
3742
implementation 'org.springdoc:springdoc-openapi-starter-webmvc-ui:2.6.0'
3843

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,45 @@
11
package com.project.user.domain.application.usecase;
22

3+
import com.fasterxml.jackson.databind.ObjectMapper;
34
import com.project.user.domain.application.dto.request.UserDeletionRequest;
45
import com.project.user.domain.domain.entity.User;
56
import com.project.user.domain.domain.service.UserService;
67
import com.project.user.global.exception.RestApiException;
78
import com.project.user.global.security.TokenProvider;
9+
import com.project.user.domain.infra.kafka.ResilientSender;
810
import jakarta.transaction.Transactional;
911
import lombok.RequiredArgsConstructor;
1012
import org.mindrot.jbcrypt.BCrypt;
13+
import org.springframework.kafka.core.KafkaTemplate;
1114
import org.springframework.stereotype.Service;
1215

16+
import java.time.Duration;
17+
import java.util.UUID;
18+
19+
import static com.project.user.global.exception.code.status.AuthErrorStatus.*;
1320
import static com.project.user.global.exception.code.status.AuthErrorStatus.INVALID_ID_TOKEN;
1421
import static com.project.user.global.exception.code.status.AuthErrorStatus.INVALID_PASSWORD;
15-
22+
import lombok.extern.slf4j.Slf4j;
23+
@Slf4j
1624
@Service
1725
@Transactional
1826
@RequiredArgsConstructor
1927
public class DeleteAccountUseCase {
2028

2129
private final TokenProvider tokenProvider;
2230
private final UserService userService;
23-
31+
private final ObjectMapper om;
32+
private final ResilientSender sender;
33+
/**
34+
* 유저 삭제 흐름의 시작점.
35+
* 1) 액세스 토큰에서 userNo 추출 및 사용자 검증
36+
* 2) 토큰 TTL(초) 계산 → 오케스트레이터가 블랙리스트 TTL로 사용
37+
* 3) user.deletion.start 주제에 사가 시작 이벤트를 "동기 전송(get)"으로 발행
38+
* - acks=all + .get() 으로 브로커 수신을 확인(전송 멱등/안정성 강화)
39+
* 컨트롤러는 202 Accepted로 비동기 진행 알림이 자연스럽다.
40+
*/
2441
public void execute(String accessToken, UserDeletionRequest request) {
42+
log.debug("accessToken userId={}", tokenProvider.getId(accessToken));
2543
Long userNo = tokenProvider.getId(accessToken)
2644
.orElseThrow(() -> new RestApiException(INVALID_ID_TOKEN));
2745

@@ -31,8 +49,28 @@ public void execute(String accessToken, UserDeletionRequest request) {
3149
throw new RestApiException(INVALID_PASSWORD);
3250
}
3351

34-
// todo: Kafka - blacklist 전파
52+
Duration expiration = tokenProvider.getRemainingDuration(accessToken)
53+
.orElseThrow(() -> new RestApiException(INVALID_ACCESS_TOKEN));
54+
long ttlSec = Math.max(0, expiration.getSeconds());
3555

36-
userService.deleteUser(userNo);
56+
var start = new StartEvent(
57+
UUID.randomUUID().toString(), // ★ sagaId == correlationId
58+
userNo,
59+
accessToken,
60+
ttlSec
61+
);
62+
63+
try {
64+
String payload = om.writeValueAsString(start);
65+
// key=userNo 로 파티셔닝 → 동일 유저 이벤트 순서 보장
66+
//kafka.send("user.start-delete.command", String.valueOf(userNo), payload).get();
67+
sender.sendSync("user.start-delete.command", String.valueOf(userNo), payload);
68+
} catch (Exception e) {
69+
// 프로듀스 실패 → 트랜잭션 롤백(서비스 계층 예외로 래핑)
70+
throw new RuntimeException("failed to publish user.start-delete.command", e);
71+
}
3772
}
73+
74+
// 오케스트레이터가 소비하는 시작 이벤트 포맷
75+
public record StartEvent(String sagaId, Long userNo, String accessToken, long accessTokenTtlSec) {}
3876
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.project.user.domain.domain.entity;
2+
3+
import jakarta.persistence.*;
4+
import lombok.*;
5+
import java.time.Instant;
6+
7+
@Entity
8+
@Table(name="processed_event")
9+
@Getter @Setter @NoArgsConstructor @AllArgsConstructor @Builder
10+
public class ProcessedEvent {
11+
/**
12+
* eventId 를 PK로 사용 → 동일 이벤트 재처리 방지(멱등).
13+
* 상태 전이가 명확: IN_PROGRESS → SUCCESS | ERROR(+attempts++).
14+
*/
15+
@Id
16+
@Column(length=36)
17+
private String eventId;
18+
19+
@Column(nullable=false, length=16)
20+
private String type; // FINAL_DELETE
21+
22+
@Column(nullable=false, length=16)
23+
private String status; // IN_PROGRESS | SUCCESS | ERROR
24+
25+
@Builder.Default
26+
@Column(nullable=false)
27+
private Integer attempts = 1;
28+
29+
private String lastError;
30+
31+
@Builder.Default
32+
@Column(nullable=false)
33+
private Instant updatedAt = Instant.now();
34+
35+
// DB 기본값 보장(INSERT/UPDATE 시각)
36+
@PrePersist
37+
public void prePersist() {
38+
if (attempts == null) attempts = 1;
39+
if (updatedAt == null) updatedAt = Instant.now();
40+
}
41+
@PreUpdate
42+
public void preUpdate() {
43+
this.updatedAt = Instant.now();
44+
}
45+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.project.user.domain.domain.repository;
2+
3+
import com.project.user.domain.domain.entity.ProcessedEvent;
4+
import org.springframework.data.jpa.repository.JpaRepository;
5+
6+
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, String> {}

src/main/java/com/project/user/domain/domain/service/UserService.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.project.user.domain.domain.service;
22

3+
import com.project.user.domain.application.dto.request.ChangeNicknameRequest;
4+
import com.project.user.domain.application.dto.request.ChangePasswordRequest;
35
import com.project.user.domain.application.dto.request.SignUpRequest;
46
import com.project.user.domain.domain.entity.User;
57
import com.project.user.domain.domain.repository.UserRepository;
@@ -9,6 +11,9 @@
911
import org.springframework.stereotype.Service;
1012
import org.springframework.transaction.annotation.Transactional;
1113

14+
import java.util.Objects;
15+
16+
import static com.project.user.global.exception.code.status.GlobalErrorStatus.PASSWORD_NOT_MATCH;
1217
import static com.project.user.global.exception.code.status.GlobalErrorStatus._NOT_FOUND;
1318

1419
@Service
@@ -43,8 +48,23 @@ public User findById(Long userNo) {
4348
.orElseThrow(() -> new RestApiException(_NOT_FOUND));
4449
}
4550

51+
public User updateNickname(Long userNo, ChangeNicknameRequest request) {
52+
User user = findById(userNo);
53+
user.changeNickname(request.nickname());
54+
return user;
55+
}
56+
57+
public void updatePassword(Long userNo, ChangePasswordRequest request) {
58+
//비밀번호 == 확인용 비밀번호 검사, 문자열 값 자체 비교
59+
if(!Objects.equals(request.password(), request.checkPassword())){
60+
throw new RestApiException(PASSWORD_NOT_MATCH);
61+
}
62+
User user = findById(userNo);
63+
user.changePassword(BCrypt.hashpw(request.password(), BCrypt.gensalt(12)));
64+
}
4665
@Transactional
4766
public void deleteUser(Long userNo) {
48-
userRepository.deleteById(userNo);
67+
// 멱등: 이미 삭제돼 있어도 예외 없이 통과
68+
userRepository.findById(userNo).ifPresent(userRepository::delete);
4969
}
5070
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.project.user.domain.infra.kafka;
2+
// 공통 유틸(선택): 헤더를 예쁘게 로깅하고, 카운터 증가
3+
4+
import io.micrometer.core.instrument.MeterRegistry;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.apache.kafka.clients.consumer.ConsumerRecord;
8+
import org.springframework.kafka.support.KafkaHeaders;
9+
import org.springframework.messaging.handler.annotation.Header;
10+
import org.springframework.stereotype.Component;
11+
12+
@Slf4j
13+
@Component
14+
@RequiredArgsConstructor
15+
public class DltLogger {
16+
17+
private final MeterRegistry meter; // micrometer
18+
19+
public void logAndCount(
20+
String dltTopic,
21+
ConsumerRecord<String, String> rec,
22+
@Header(name = KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) String oTopic,
23+
@Header(name = KafkaHeaders.DLT_ORIGINAL_PARTITION, required = false) Integer oPart,
24+
@Header(name = KafkaHeaders.DLT_ORIGINAL_OFFSET, required = false) Long oOffset,
25+
@Header(name = KafkaHeaders.DLT_EXCEPTION_FQCN, required = false) String exClass,
26+
@Header(name = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) String exMsg
27+
) {
28+
// 메트릭: dlt 카운트
29+
meter.counter("kafka.dlt.count",
30+
"dltTopic", dltTopic,
31+
"origTopic", String.valueOf(oTopic))
32+
.increment();
33+
34+
log.error("[DLT] topic={}, origTopic={}, origPartition={}, origOffset={}, key={}, ts={}, exClass={}, exMsg={}, payload={}",
35+
dltTopic,
36+
oTopic, oPart, oOffset,
37+
rec.key(),
38+
rec.timestamp(),
39+
exClass, exMsg,
40+
rec.value()
41+
);
42+
}
43+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.project.user.domain.infra.kafka;
2+
3+
public class InvalidPayloadException extends RuntimeException {
4+
// ★ 파싱/검증 불가(독성)인 경우 던져서 @RetryableTopic exclude 로 즉시 DLT 이동
5+
public InvalidPayloadException(String message) { super(message); }
6+
public InvalidPayloadException(String message, Throwable cause) { super(message, cause); }
7+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.project.user.domain.infra.kafka;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.springframework.kafka.core.KafkaTemplate;
5+
import org.springframework.stereotype.Component;
6+
import io.github.resilience4j.retry.annotation.Retry;
7+
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
8+
9+
/**
10+
* 카프카 전송 래퍼:
11+
* - .get() 호출로 브로커 acks=all 까지 동기 확인(실패 시 예외 발생)
12+
* - Resilience4j @Retry/@CircuitBreaker 로 일시 장애 완충
13+
* (Fallback 은 재던져서 Outbox 백오프로 연결)
14+
*/
15+
@Component @RequiredArgsConstructor
16+
public class ResilientSender {
17+
private final KafkaTemplate<String,String> kafka;
18+
19+
@Retry(name="kafkaSend")
20+
@CircuitBreaker(name="kafkaSend", fallbackMethod = "sendFallback")
21+
public void sendSync(String topic, String key, String payload) throws Exception {
22+
kafka.send(topic, key, payload).get(); // 동기 전송
23+
}
24+
@SuppressWarnings("unused")
25+
public void sendFallback(String topic, String key, String payload, Throwable t) throws Exception {
26+
throw (t instanceof Exception) ? (Exception)t : new RuntimeException(t);
27+
}
28+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.project.user.domain.infra.kafka;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.springframework.kafka.annotation.KafkaListener;
6+
import org.springframework.kafka.support.KafkaHeaders;
7+
import org.springframework.messaging.handler.annotation.Header;
8+
import org.springframework.stereotype.Component;
9+
10+
@Component
11+
@RequiredArgsConstructor
12+
public class UserDltListeners {
13+
14+
private final DltLogger dlt;
15+
16+
@KafkaListener(topics = "user.final-delete.command.dlt", groupId = "user-service")
17+
public void onFinalDeleteCmdDlt(
18+
ConsumerRecord<String,String> rec,
19+
@Header(name = KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) String oTopic,
20+
@Header(name = KafkaHeaders.DLT_ORIGINAL_PARTITION, required = false) Integer oPart,
21+
@Header(name = KafkaHeaders.DLT_ORIGINAL_OFFSET, required = false) Long oOffset,
22+
@Header(name = KafkaHeaders.DLT_EXCEPTION_FQCN, required = false) String exClass,
23+
@Header(name = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) String exMsg
24+
){
25+
dlt.logAndCount("user.final-delete.command.dlt", rec, oTopic, oPart, oOffset, exClass, exMsg);
26+
}
27+
}

0 commit comments

Comments
 (0)