Skip to content

Commit fc26061

Browse files
committed
2 parents 7ed5512 + 7df091e commit fc26061

File tree

7 files changed

+234
-2
lines changed

7 files changed

+234
-2
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ dependencies {
3232
// Apache HttpClient5
3333
implementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
3434

35+
// Kafka
36+
implementation "org.springframework.kafka:spring-kafka"
37+
3538
// Lombok
3639
compileOnly 'org.projectlombok:lombok'
3740
annotationProcessor 'org.projectlombok:lombok'
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.project.auth.domain.infra.kafka;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.springframework.kafka.annotation.KafkaListener;
6+
import org.springframework.kafka.support.KafkaHeaders;
7+
import org.springframework.messaging.handler.annotation.Header;
8+
import org.springframework.stereotype.Component;
9+
10+
@Component
11+
@RequiredArgsConstructor
12+
public class AuthDltListeners {
13+
14+
private final DltLogger dlt;
15+
16+
@KafkaListener(topics = "auth.token-delete.command.dlt", groupId = "auth-service")
17+
public void onTokenDeleteCmdDlt(
18+
ConsumerRecord<String,String> rec,
19+
@Header(name = KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) String oTopic,
20+
@Header(name = KafkaHeaders.DLT_ORIGINAL_PARTITION, required = false) Integer oPart,
21+
@Header(name = KafkaHeaders.DLT_ORIGINAL_OFFSET, required = false) Long oOffset,
22+
@Header(name = KafkaHeaders.DLT_EXCEPTION_FQCN, required = false) String exClass,
23+
@Header(name = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) String exMsg
24+
){
25+
dlt.logAndCount("auth.token-delete.command.dlt", rec, oTopic, oPart, oOffset, exClass, exMsg);
26+
}
27+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// com.project.auth.domain.infra.kafka.AuthRevokeListener.java
2+
package com.project.auth.domain.infra.kafka;
3+
4+
import com.fasterxml.jackson.databind.JsonNode;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.project.auth.domain.domain.service.TokenBlacklistService;
7+
import com.project.auth.domain.domain.service.RefreshTokenService;
8+
import lombok.RequiredArgsConstructor;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.apache.kafka.clients.consumer.ConsumerRecord;
11+
import org.springframework.kafka.annotation.KafkaListener;
12+
import org.springframework.kafka.annotation.RetryableTopic;
13+
import org.springframework.kafka.support.Acknowledgment;
14+
import org.springframework.retry.annotation.Backoff;
15+
import org.springframework.stereotype.Component;
16+
17+
import java.time.Duration;
18+
19+
@Slf4j
20+
@Component
21+
@RequiredArgsConstructor
22+
public class AuthRevokeListener {
23+
24+
private final ObjectMapper om;
25+
private final TokenBlacklistService blacklist;
26+
private final RefreshTokenService refreshTokens;
27+
28+
@RetryableTopic(
29+
attempts = "3",
30+
backoff = @Backoff(delay = 1000, multiplier = 2.0),
31+
autoCreateTopics = "false",
32+
dltTopicSuffix = ".dlt",
33+
exclude = { InvalidPayloadException.class } // 독성 → 즉시 DLT
34+
)
35+
@KafkaListener(
36+
topics = "auth.token-delete.command",
37+
groupId = "auth-service",
38+
containerFactory = "kafkaManualAckFactory"
39+
)
40+
public void onRevoke(ConsumerRecord<String, String> rec, Acknowledgment ack) throws Exception {
41+
final JsonNode n;
42+
try {
43+
n = om.readTree(rec.value());
44+
} catch (Exception e) {
45+
log.error("[auth] toxic payload -> DLT, value={}", rec.value());
46+
throw new InvalidPayloadException("Invalid JSON: " + e.getMessage(), e);
47+
}
48+
49+
long userNo = n.path("userNo").asLong(0L);
50+
String accessToken = n.path("accessToken").asText(null);
51+
long ttlSec = n.path("blacklistSeconds").asLong(0L);
52+
53+
if (userNo <= 0L) {
54+
log.error("[auth] toxic payload -> DLT (invalid userNo), value={}", rec.value());
55+
throw new InvalidPayloadException("Missing/invalid userNo");
56+
}
57+
58+
// 1) 액세스 토큰 블랙리스트: 남은 TTL이 있을 때만
59+
if (accessToken != null && !accessToken.isBlank() && ttlSec > 0) {
60+
blacklist.blacklist(accessToken, Duration.ofSeconds(ttlSec));
61+
log.info("[auth] access token blacklisted userNo={}, ttlSec={}", userNo, ttlSec);
62+
} else {
63+
log.info("[auth] skip access-token blacklist (tokenMissingOrTtl<=0) userNo={}, ttlSec={}, tokenPresent={}",
64+
userNo, ttlSec, accessToken != null && !accessToken.isBlank());
65+
}
66+
67+
// 2) 리프레시 토큰은 ‘항상’ 제거 (재발급 경로 차단)
68+
refreshTokens.deleteRefreshToken(userNo);
69+
70+
// 3) 오프셋 커밋
71+
ack.acknowledge();
72+
}
73+
74+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.project.auth.domain.infra.kafka;
2+
3+
// 공통 유틸(선택): 헤더를 예쁘게 로깅하고, 카운터 증가
4+
5+
import io.micrometer.core.instrument.MeterRegistry;
6+
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.apache.kafka.clients.consumer.ConsumerRecord;
9+
import org.springframework.kafka.support.KafkaHeaders;
10+
import org.springframework.messaging.handler.annotation.Header;
11+
import org.springframework.stereotype.Component;
12+
13+
@Slf4j
14+
@Component
15+
@RequiredArgsConstructor
16+
public class DltLogger {
17+
18+
private final MeterRegistry meter; // micrometer
19+
20+
public void logAndCount(
21+
String dltTopic,
22+
ConsumerRecord<String, String> rec,
23+
@Header(name = KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) String oTopic,
24+
@Header(name = KafkaHeaders.DLT_ORIGINAL_PARTITION, required = false) Integer oPart,
25+
@Header(name = KafkaHeaders.DLT_ORIGINAL_OFFSET, required = false) Long oOffset,
26+
@Header(name = KafkaHeaders.DLT_EXCEPTION_FQCN, required = false) String exClass,
27+
@Header(name = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) String exMsg
28+
) {
29+
// 메트릭: dlt 카운트
30+
meter.counter("kafka.dlt.count",
31+
"dltTopic", dltTopic,
32+
"origTopic", String.valueOf(oTopic))
33+
.increment();
34+
35+
log.error("[DLT] topic={}, origTopic={}, origPartition={}, origOffset={}, key={}, ts={}, exClass={}, exMsg={}, payload={}",
36+
dltTopic,
37+
oTopic, oPart, oOffset,
38+
rec.key(),
39+
rec.timestamp(),
40+
exClass, exMsg,
41+
rec.value()
42+
);
43+
}
44+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.project.auth.domain.infra.kafka;
2+
3+
/**
 * Unchecked exception for structurally invalid ("poison") message payloads.
 * Excluded from retry in {@code @RetryableTopic} configuration so that such
 * records are routed straight to the dead-letter topic instead of being retried.
 */
public class InvalidPayloadException extends RuntimeException {

    /** @param msg description of why the payload is invalid */
    public InvalidPayloadException(String msg) {
        super(msg);
    }

    /**
     * @param msg   description of why the payload is invalid
     * @param cause the underlying parse/validation failure
     */
    public InvalidPayloadException(String msg, Throwable cause) {
        super(msg, cause);
    }
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// com.project.auth.global.config.KafkaConfig.java
2+
package com.project.auth.global.config;
3+
4+
import org.apache.kafka.clients.consumer.ConsumerConfig;
5+
import org.apache.kafka.common.serialization.StringDeserializer;
6+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
7+
import org.springframework.context.annotation.Bean;
8+
import org.springframework.context.annotation.Configuration;
9+
import org.springframework.kafka.annotation.EnableKafka;
10+
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
11+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
12+
import org.springframework.kafka.core.ConsumerFactory;
13+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
14+
import org.springframework.kafka.listener.ContainerProperties;
15+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
16+
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
20+
@Configuration
21+
@EnableKafka
22+
@EnableKafkaRetryTopic // @RetryableTopic 활성화
23+
public class KafkaConfig {
24+
25+
@Bean
26+
public ConsumerFactory<String, String> consumerFactory(KafkaProperties props) {
27+
Map<String, Object> cfg = new HashMap<>(props.buildConsumerProperties());
28+
cfg.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
29+
cfg.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
30+
cfg.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
31+
return new DefaultKafkaConsumerFactory<>(cfg);
32+
}
33+
34+
@Bean(name = "kafkaManualAckFactory")
35+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckFactory(
36+
ConsumerFactory<String, String> cf) {
37+
var f = new ConcurrentKafkaListenerContainerFactory<String, String>();
38+
f.setConsumerFactory(cf);
39+
f.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
40+
//f.setConcurrency(1);
41+
return f;
42+
}
43+
44+
// @RetryableTopic의 백오프 스케줄링용
45+
@Bean
46+
public org.springframework.scheduling.TaskScheduler taskScheduler() {
47+
ThreadPoolTaskScheduler t = new ThreadPoolTaskScheduler();
48+
t.setPoolSize(2);
49+
t.setThreadNamePrefix("auth-sched-");
50+
t.initialize();
51+
return t;
52+
}
53+
}

src/main/resources/application.yml

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,31 @@ server:
22
port: 8080
33

44
spring:
5+
kafka:
6+
bootstrap-servers: pkc-gq2xn.asia-northeast3.gcp.confluent.cloud:9092
7+
properties:
8+
security.protocol: SASL_SSL
9+
sasl.mechanism: PLAIN
10+
sasl.jaas.config: >
11+
org.apache.kafka.common.security.plain.PlainLoginModule required
12+
username="${KAFKA_API_KEY}"
password="${KAFKA_API_SECRET}";
14+
producer:
15+
acks: all
16+
retries: 3
17+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
18+
value-serializer: org.apache.kafka.common.serialization.StringSerializer
19+
consumer:
20+
enable-auto-commit: false
21+
auto-offset-reset: latest
22+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
23+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
24+
max-poll-records: 100
25+
max-poll-interval-ms: 300000 # 5분
26+
admin:
27+
auto-create: false
28+
fail-fast: true
29+
530
data:
631
redis:
732
host: ${SPRING_DATA_REDIS_HOST:localhost}
@@ -80,7 +105,7 @@ logging:
80105
org.springframework: DEBUG
81106
org.springframework.web: DEBUG
82107
org.springframework.boot: DEBUG
83-
com.project.user: DEBUG
108+
com.project.auth: DEBUG
84109
org.hibernate.SQL: DEBUG
85110
org.hibernate.type.descriptor.sql: TRACE
86111

@@ -109,7 +134,7 @@ management:
109134
enabled: true
110135
metrics:
111136
tags:
112-
application: user-service
137+
application: auth-service
113138
distribution:
114139
percentiles-histogram:
115140
http.server.requests: true

0 commit comments

Comments
 (0)