
Commit 6a95020

committed
[FEAT] Propagate account deletion across services: rename topics, add DLT logging, adjust some logic, update the yml file
1 parent 031259c commit 6a95020

File tree

8 files changed: +180 -85 lines changed

src/main/java/com/project/user/domain/application/usecase/DeleteAccountUseCase.java

Lines changed: 5 additions & 4 deletions
@@ -6,6 +6,7 @@
 import com.project.user.domain.domain.service.UserService;
 import com.project.user.global.exception.RestApiException;
 import com.project.user.global.security.TokenProvider;
+import com.project.user.domain.infra.kafka.ResilientSender;
 import jakarta.transaction.Transactional;
 import lombok.RequiredArgsConstructor;
 import org.mindrot.jbcrypt.BCrypt;
@@ -27,9 +28,8 @@ public class DeleteAccountUseCase {
 
     private final TokenProvider tokenProvider;
     private final UserService userService;
-    private final KafkaTemplate<String, String> kafka; // ★ Kafka producer
     private final ObjectMapper om;
-
+    private final ResilientSender sender;
     /**
      * Entry point of the user-deletion flow.
     * 1) Extract userNo from the access token and validate the user
@@ -63,10 +63,11 @@ public void execute(String accessToken, UserDeletionRequest request) {
         try {
             String payload = om.writeValueAsString(start);
             // partition by key=userNo → keeps events for the same user in order
-            kafka.send("user.deletion.start", String.valueOf(userNo), payload).get();
+            //kafka.send("user.start-delete.command", String.valueOf(userNo), payload).get();
+            sender.sendSync("user.start-delete.command", String.valueOf(userNo), payload);
         } catch (Exception e) {
             // produce failed → roll back the transaction (wrapped in a service-layer exception)
-            throw new RuntimeException("failed to publish user.deletion.start", e);
+            throw new RuntimeException("failed to publish user.start-delete.command", e);
         }
     }
 

src/main/java/com/project/user/domain/infra/kafka/DltLogger.java

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+package com.project.user.domain.infra.kafka;
+// Shared utility (optional): logs DLT headers nicely and increments a counter
+
+import io.micrometer.core.instrument.MeterRegistry;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class DltLogger {
+
+    private final MeterRegistry meter; // micrometer
+
+    public void logAndCount(
+            String dltTopic,
+            ConsumerRecord<String, String> rec,
+            @Header(name = KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) String oTopic,
+            @Header(name = KafkaHeaders.DLT_ORIGINAL_PARTITION, required = false) Integer oPart,
+            @Header(name = KafkaHeaders.DLT_ORIGINAL_OFFSET, required = false) Long oOffset,
+            @Header(name = KafkaHeaders.DLT_EXCEPTION_FQCN, required = false) String exClass,
+            @Header(name = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) String exMsg
+    ) {
+        // metric: DLT count
+        meter.counter("kafka.dlt.count",
+                        "dltTopic", dltTopic,
+                        "origTopic", String.valueOf(oTopic))
+                .increment();
+
+        log.error("[DLT] topic={}, origTopic={}, origPartition={}, origOffset={}, key={}, ts={}, exClass={}, exMsg={}, payload={}",
+                dltTopic,
+                oTopic, oPart, oOffset,
+                rec.key(),
+                rec.timestamp(),
+                exClass, exMsg,
+                rec.value()
+        );
+    }
+}

src/main/java/com/project/user/domain/infra/kafka/ResilientSender.java

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+package com.project.user.domain.infra.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+import io.github.resilience4j.retry.annotation.Retry;
+import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
+
+/**
+ * Kafka send wrapper:
+ * - .get() blocks until the broker confirms acks=all (throws on failure)
+ * - Resilience4j @Retry/@CircuitBreaker absorbs transient faults
+ *   (the fallback rethrows so the Outbox back-off can take over)
+ */
+@Component @RequiredArgsConstructor
+public class ResilientSender {
+    private final KafkaTemplate<String,String> kafka;
+
+    @Retry(name="kafkaSend")
+    @CircuitBreaker(name="kafkaSend", fallbackMethod = "sendFallback")
+    public void sendSync(String topic, String key, String payload) throws Exception {
+        kafka.send(topic, key, payload).get(); // synchronous send
+    }
+    @SuppressWarnings("unused")
+    public void sendFallback(String topic, String key, String payload, Throwable t) throws Exception {
+        throw (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
+    }
+}
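
Note (not part of the commit): the @Retry(name="kafkaSend") / @CircuitBreaker(name="kafkaSend") annotations are handled by Resilience4j's Spring aspects, and since Resilience4jConfig.java is deleted in this commit the "kafkaSend" instance settings presumably come from the resilience4j section of application.yml. The sketch below is illustrative only; it shows roughly how the two decorators compose (Retry wrapping the CircuitBreaker) using the programmatic API, with default instance settings as placeholders rather than the real "kafkaSend" configuration.

// Illustrative sketch only — not part of this commit. Placeholder defaults stand in
// for the real "kafkaSend" settings; the class and method names are hypothetical.
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.Callable;

class ResilientSendSketch {

    void sendSync(KafkaTemplate<String, String> kafka,
                  String topic, String key, String payload) throws Exception {
        Retry retry = Retry.ofDefaults("kafkaSend");                 // placeholder config
        CircuitBreaker breaker = CircuitBreaker.ofDefaults("kafkaSend");

        // Each attempt is recorded by the circuit breaker; the final failure propagates
        // to the caller, which is what lets the surrounding transaction roll back.
        Callable<Void> send = () -> {
            kafka.send(topic, key, payload).get();                   // block until the broker acks
            return null;
        };
        Retry.decorateCallable(retry, CircuitBreaker.decorateCallable(breaker, send)).call();
    }
}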

src/main/java/com/project/user/domain/infra/kafka/UserDltListeners.java

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+package com.project.user.domain.infra.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class UserDltListeners {
+
+    private final DltLogger dlt;
+
+    @KafkaListener(topics = "user.final-delete.command.dlt", groupId = "user-service")
+    public void onFinalDeleteCmdDlt(
+            ConsumerRecord<String,String> rec,
+            @Header(name = KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) String oTopic,
+            @Header(name = KafkaHeaders.DLT_ORIGINAL_PARTITION, required = false) Integer oPart,
+            @Header(name = KafkaHeaders.DLT_ORIGINAL_OFFSET, required = false) Long oOffset,
+            @Header(name = KafkaHeaders.DLT_EXCEPTION_FQCN, required = false) String exClass,
+            @Header(name = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) String exMsg
+    ){
+        dlt.logAndCount("user.final-delete.command.dlt", rec, oTopic, oPart, oOffset, exClass, exMsg);
+    }
+}

src/main/java/com/project/user/domain/infra/kafka/UserFinalDeleteListener.java

Lines changed: 53 additions & 52 deletions
@@ -10,7 +10,6 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.annotation.RetryableTopic;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.retry.annotation.Backoff;
 import org.springframework.stereotype.Component;
@@ -23,9 +22,8 @@ public class UserFinalDeleteListener {
 
     private final UserRepository userRepo;
     private final ProcessedEventRepository peRepo;
-    private final KafkaTemplate<String,String> kafka;
     private final ObjectMapper om;
-
+    private final ResilientSender sender;
     /** Idempotent begin marker: INSERT if missing → skip if already SUCCESS */
     private boolean tryBegin(String eventId, String type){
         if (!peRepo.existsById(eventId)) {
@@ -61,81 +59,84 @@ private void markError(String eventId, String msg){
 
     /**
      * Final-delete processing flow:
-     * - @RetryableTopic: up to 5 retries with exponential backoff → on failure the record moves to user.final.delete.dlt
+     * - @RetryableTopic: up to 5 retries with exponential backoff → on failure the record moves to user.final-delete.command.dlt
      * - InvalidPayloadException is excluded → straight to the DLT (no retries)
      * - receive → parse/validate (poison goes straight to the DLT) → idempotent begin → business logic (hard delete) → synchronous reply
     * - manual ACK (MANUAL_IMMEDIATE) commits the offset only after the DB commit → keeps DB and messaging consistent
     */
    @RetryableTopic(
            attempts = "5",
            backoff = @Backoff(delay = 1000, multiplier = 2.0),
-            autoCreateTopics = "true",
+            autoCreateTopics = "false",
            dltTopicSuffix = ".dlt",
-            exclude = { InvalidPayloadException.class } // ★ poison payloads go straight to the DLT
+            exclude = { InvalidPayloadException.class }
    )
    @KafkaListener(
-            topics = "user.final.delete",
+            topics = "user.final-delete.command",
            groupId = "user-service",
            containerFactory = "kafkaManualAckFactory"
    )
    @Transactional
    public void onFinalDelete(ConsumerRecord<String,String> rec, Acknowledgment ack) throws Exception {
 
-        // 0) parse + validate — poison (InvalidPayloadException) is logged and rethrown so it goes straight to the DLT
+        // 0) parse + validate — poison (InvalidPayloadException) goes straight to the DLT
        final JsonNode n;
        try {
            n = om.readTree(rec.value());
-            String eventId = n.path("eventId").asText("").trim();
-            long userNo = n.path("userNo").asLong(0L);
-            if (eventId.isEmpty()) throw new InvalidPayloadException("Missing or empty 'eventId'");
-            if (userNo <= 0L) throw new InvalidPayloadException("Invalid 'userNo'");
+        } catch (Exception e) {
+            log.error("[user-service] toxic payload -> DLT, value={}", rec.value());
+            throw new InvalidPayloadException("Invalid JSON: " + e.getMessage(), e);
+        }
 
-        // 1) idempotent handling (skip if already processed)
-        if (!tryBegin(eventId, "FINAL_DELETE")) {
-            ack.acknowledge();
-            return;
-        }
+        final String eventId = n.path("eventId").asText("").trim();
+        final long userNo = n.path("userNo").asLong(0L);
+        if (eventId.isEmpty()) throw new InvalidPayloadException("Missing or empty 'eventId'");
+        if (userNo <= 0L) throw new InvalidPayloadException("Invalid 'userNo'");
 
-        try {
-            // 2) actual hard delete (idempotent)
-            userRepo.deleteById(userNo);
+        // 1) idempotent handling (skip if already SUCCESS)
+        if (!tryBegin(eventId, "FINAL_DELETE")) {
+            ack.acknowledge();
+            return;
+        }
 
-            // 3) synchronous success reply (acks=all, .get())
-            var reply = om.createObjectNode()
-                    .put("eventId", eventId)
-                    .put("userNo", userNo)
-                    .put("status", "SUCCESS")
-                    .put("type", "FINAL_DELETE");
-            kafka.send("user.final.reply", String.valueOf(userNo), om.writeValueAsString(reply)).get();
+        // 2) actual hard delete (idempotent)
+        try {
+            userRepo.deleteById(userNo);
 
-            // operational observability: key/event/user
-            log.info("FINAL_DELETE ok key={}, eventId={}, userNo={}", rec.key(), eventId, userNo);
+            // 3) mark success (part of the DB commit)
+            markSuccess(eventId);
 
-            // 4) mark success
-            markSuccess(eventId);
+            // 4) send the reply and ACK only after the commit
+            final String replyPayload = om.createObjectNode()
+                    .put("eventId", eventId)
+                    .put("userNo", userNo)
+                    .put("status", "SUCCESS")
+                    .put("type", "FINAL_DELETE")
+                    .toString();
 
-            // 5) ACK only after the transaction commits (consistency guarantee)
-            org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization(
-                    new org.springframework.transaction.support.TransactionSynchronization() {
-                        @Override public void afterCommit() { ack.acknowledge(); }
+            org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization(
+                    new org.springframework.transaction.support.TransactionSynchronization() {
+                        @Override public void afterCommit() {
+                            try {
+                                // reply to the broker only once the commit is final (better consistency)
+                                sender.sendSync("user.final-delete.reply", String.valueOf(userNo), replyPayload);
+                                ack.acknowledge(); // commit the offset only if the send succeeds
+                                log.info("FINAL_DELETE ok key={}, eventId={}, userNo={}", rec.key(), eventId, userNo);
+                            } catch (Exception sendEx) {
+                                // send failed: no ACK → the offset stays uncommitted
+                                // the same record is reprocessed after a consumer restart/rebalance (independent of @RetryableTopic)
+                                log.error("FINAL_DELETE reply send failed (will be retried on re-consume). " +
+                                        "eventId={}, userNo={}, err={}", eventId, userNo, sendEx.toString());
+                            }
                        }
-            );
-        } catch (Exception ex) {
-            log.warn("FINAL_DELETE failed userNo={}, eventId={}, err={}", userNo, eventId, ex.toString());
-            markError(eventId, ex.getMessage());
-            throw ex; // retry → DLT after the max attempts
-        }
+                    }
+            );
 
-        } catch (InvalidPayloadException bad) {
-            // ★ poison payload → straight to the DLT without retries (exclude rule)
-            log.error("[user-service] toxic payload -> DLT, value={}", rec.value());
-            throw bad;
+        } catch (Exception ex) {
+            // business/technical exception → retry / DLT chain
+            log.warn("FINAL_DELETE failed userNo={}, eventId={}, err={}", userNo, eventId, ex.toString());
+            markError(eventId, ex.getMessage());
+            throw ex;
        }
    }
-
-    @KafkaListener(topics = "user.final.delete.dlt", groupId = "user-service")
-    public void onFinalDeleteDlt(String payload){
-        // DLT monitoring (alerting/dashboard hook)
-        log.error("[DLT][user.final.delete] {}", payload);
-    }
-}
+}
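
Note (not part of the commit): the tryBegin/markSuccess/markError helpers rely on a ProcessedEventRepository that is not shown in this diff. Purely for orientation, the idempotency table behind it might look something like the sketch below; the entity name, columns, and status values are assumptions inferred from how the listener uses the repository (existsById on an eventId, a SUCCESS state, an error message).

// Hypothetical sketch — the real ProcessedEvent entity is not part of this commit.
// Field names and statuses are inferred from tryBegin/markSuccess/markError usage.
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import org.springframework.data.jpa.repository.JpaRepository;

@Entity
@Table(name = "processed_event")
class ProcessedEvent {

    @Id
    private String eventId;              // consumer-side idempotency key

    private String type;                 // e.g. "FINAL_DELETE"

    @Enumerated(EnumType.STRING)
    private Status status;               // IN_PROGRESS → SUCCESS / ERROR

    private String errorMessage;         // filled by markError

    enum Status { IN_PROGRESS, SUCCESS, ERROR }
    // getters/setters omitted for brevity
}

interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, String> {
    // existsById(eventId) is what tryBegin uses to detect duplicates
}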

src/main/java/com/project/user/global/config/KafkaConfig.java

Lines changed: 5 additions & 5 deletions
@@ -24,9 +24,9 @@ public ProducerFactory<String,String> producerFactory(KafkaProperties props){
         Map<String,Object> cfg = new HashMap<>(props.buildProducerProperties());
         cfg.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         cfg.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        cfg.put(ProducerConfig.ACKS_CONFIG, "all"); // require acks from all brokers
-        cfg.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer
-        cfg.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // guarantee ordering
+        // cfg.put(ProducerConfig.ACKS_CONFIG, "all"); // require acks from all brokers
+        // cfg.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer
+        // cfg.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // guarantee ordering
         return new DefaultKafkaProducerFactory<>(cfg);
     }
     @Bean public KafkaTemplate<String,String> kafkaTemplate(ProducerFactory<String,String> pf){ return new KafkaTemplate<>(pf); }
@@ -37,7 +37,7 @@ public ConsumerFactory<String,String> consumerFactory(KafkaProperties props){
         cfg.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         cfg.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         cfg.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // manual commit (ACK)
-        cfg.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // production default: start from the latest offset
+        cfg.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // cloud production default: start from the latest offset
         cfg.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // batch size (consider per-record processing time)
         cfg.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // max processing interval (5 minutes)
         return new DefaultKafkaConsumerFactory<>(cfg);
@@ -58,7 +58,7 @@ public ConcurrentKafkaListenerContainerFactory<String,String> kafkaManualAckFact
         var f = new ConcurrentKafkaListenerContainerFactory<String,String>();
         f.setConsumerFactory(cf);
         f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // we ack manually
-        f.setConcurrency(1); // assumes a single partition (adjust if needed)
+        //f.setConcurrency(1); // assumes a single partition (adjust if needed)
         return f;
     }

src/main/java/com/project/user/global/config/Resilience4jConfig.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/main/resources/application.yml

Lines changed: 19 additions & 2 deletions
@@ -24,11 +24,28 @@ resilience4j:
 
 spring:
   kafka:
-    bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:9094} # default is the local external port
+    bootstrap-servers: pkc-gq2xn.asia-northeast3.gcp.confluent.cloud:9092
+    properties:
+      security.protocol: SASL_SSL
+      sasl.mechanism: PLAIN
+      sasl.jaas.config: >
+        org.apache.kafka.common.security.plain.PlainLoginModule required
+        username="AGQDVMNB6DVPXFX7"
+        password="cfltBaumbQnzwuPIRS3E8S1MNrfqrsb9LXzDvUBPBrP0YQRUTfBqdD2nOKmzUXFw";
+    producer:
+      acks: all
+      retries: 3
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
     consumer:
-      group-id: user-service
+      enable-auto-commit: false
+      auto-offset-reset: latest
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     admin:
+      auto-create: false
       fail-fast: true
+
   datasource:
     username: ${RDB_USERNAME}
     url: ${RDB_URL}
