Skip to content

Commit f8e92ff

Browse files
committed
KB-11374 :: DEV | BE | Service impl to consume the Kafka event and update the content metadata
2 parents 36aa8c9 + e7c6f7b commit f8e92ff

File tree

12 files changed

+722
-3
lines changed

12 files changed

+722
-3
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.igot.cb.common;
2+
3+
import lombok.Getter;
4+
import lombok.Setter;
5+
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.stereotype.Component;
7+
8+
@Getter
9+
@Setter
10+
@Component
11+
public class ServerProperties {
12+
13+
@Value("${learning_service_vm_base_url}")
14+
private String learningServiceVmBaseUrl;
15+
16+
@Value("${system.content.update.url}")
17+
private String systemUpdateAPI;
18+
19+
@Value("${vod.bucket.prefix}")
20+
private String vodBucketPrefix;
21+
22+
@Value("${vod.stream.url.prefix}")
23+
private String vodStreamUrlPrefix;
24+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.igot.cb.config;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig;
4+
import org.apache.kafka.common.serialization.StringDeserializer;
5+
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.Configuration;
8+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
9+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
10+
import org.springframework.kafka.core.ConsumerFactory;
11+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
12+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
13+
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
@Configuration
17+
public class ConsumerConfiguration {
18+
@Value("${spring.kafka.bootstrap.servers}")
19+
private String kafkabootstrapAddress;
20+
21+
@Value("${kakfa.offset.reset.value}")
22+
private String kafkaOffsetResetValue;
23+
24+
@Value("${kafka.max.poll.interval.ms}")
25+
private Integer kafkaMaxPollInterval;
26+
27+
@Value("${kafka.max.poll.records}")
28+
private Integer kafkaMaxPollRecords;
29+
30+
@Value("${kafka.auto.commit.interval.ms}")
31+
private Integer kafkaAutoCommitInterval;
32+
33+
@Bean
34+
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
35+
36+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
37+
factory.setConsumerFactory(consumerFactory());
38+
factory.setConcurrency(4);
39+
factory.getContainerProperties().setPollTimeout(3000);
40+
return factory;
41+
}
42+
43+
@Bean
44+
public ConsumerFactory<String, String> consumerFactory() {
45+
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
46+
47+
}
48+
49+
@Bean
50+
public Map<String, Object> consumerConfigs() {
51+
Map<String, Object> propsMap = new HashMap<>();
52+
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkabootstrapAddress);
53+
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
54+
propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");
55+
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaAutoCommitInterval);
56+
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
57+
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
58+
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
59+
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaOffsetResetValue);
60+
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaMaxPollInterval);
61+
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaMaxPollRecords);
62+
return propsMap;
63+
}
64+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.igot.cb.config;
2+
3+
import org.apache.kafka.clients.producer.ProducerConfig;
4+
import org.apache.kafka.common.serialization.StringSerializer;
5+
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.Configuration;
8+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
9+
import org.springframework.kafka.core.KafkaTemplate;
10+
import org.springframework.kafka.core.ProducerFactory;
11+
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
@Configuration
15+
public class ProducerConfiguration {
16+
@Value("${spring.kafka.bootstrap.servers}")
17+
private String kafkabootstrapAddress;
18+
19+
@Bean
20+
public ProducerFactory<String, String> producerFactory() {
21+
22+
Map<String, Object> config = new HashMap<>();
23+
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkabootstrapAddress);
24+
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
25+
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
26+
return new DefaultKafkaProducerFactory<>(config);
27+
}
28+
29+
@Bean
30+
public KafkaTemplate<String, String> kafkaTemplate() {
31+
return new KafkaTemplate<>(producerFactory());
32+
}
33+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package com.igot.cb.consumer;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.core.type.TypeReference;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.igot.cb.common.ServerProperties;
7+
import com.igot.cb.service.OutboundRequestHandlerServiceImpl;
8+
import com.igot.cb.util.Constants;
9+
import org.apache.commons.lang3.StringUtils;
10+
import org.apache.kafka.clients.consumer.ConsumerRecord;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import org.springframework.beans.factory.annotation.Autowired;
14+
import org.springframework.kafka.annotation.KafkaListener;
15+
import org.springframework.stereotype.Component;
16+
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
import java.util.concurrent.CompletableFuture;
20+
21+
@Component
22+
public class VideoOnDemandKafkaConsumer {
23+
private static final Logger logger = LoggerFactory.getLogger(VideoOnDemandKafkaConsumer.class);
24+
25+
@Autowired
26+
private ObjectMapper mapper;
27+
28+
@Autowired
29+
private ServerProperties serverProperties;
30+
31+
@Autowired
32+
private OutboundRequestHandlerServiceImpl outboundRequestHandlerService;
33+
34+
@KafkaListener(topics = "${spring.kafka.content.metadata.update.topic.name}", groupId = "${spring.kafka.content.metadata.update.consumer.group.id}")
35+
public void contentMetadataUpdateConsumerForVOD(ConsumerRecord<String, String> data) {
36+
logger.debug("Processing Kafka message from topic: {}", data.topic());
37+
38+
if (StringUtils.isBlank(data.value())) {
39+
logger.warn("Received empty message from Kafka topic: {}", data.topic());
40+
return;
41+
}
42+
43+
CompletableFuture.runAsync(() -> processMetadataUpdate(data.value()))
44+
.exceptionally(ex -> {
45+
logger.error("Failed to process metadata update asynchronously for topic: {}", data.topic(), ex);
46+
return null;
47+
});
48+
}
49+
50+
private void processMetadataUpdate(String messageValue) {
51+
try {
52+
Map<String, String> data = mapper.readValue(messageValue, new TypeReference<Map<String, String>>() {
53+
});
54+
55+
if (!isValidRequest(data)) {
56+
logger.warn("Invalid request data: missing identifier or streaming URL");
57+
return;
58+
}
59+
60+
String identifier = data.get(Constants.IDENTIFIER);
61+
String streamingUrl = data.get(Constants.STREAMING_URI);
62+
63+
updateContentMetadata(identifier, streamingUrl);
64+
65+
} catch (JsonProcessingException e) {
66+
logger.error("Failed to parse Kafka message: {}", messageValue, e);
67+
} catch (Exception e) {
68+
logger.error("Unexpected error processing metadata update for message: {}", messageValue, e);
69+
}
70+
}
71+
72+
private boolean isValidRequest(Map<String, String> data) {
73+
if (data == null || data.isEmpty()) {
74+
return false;
75+
}
76+
77+
String identifier = data.get(Constants.IDENTIFIER);
78+
String streamingUrl = data.get(Constants.STREAMING_URI);
79+
80+
return StringUtils.isNotBlank(identifier) && StringUtils.isNotBlank(streamingUrl);
81+
}
82+
83+
private void updateContentMetadata(String identifier, String streamingUrl) {
84+
logger.debug("Updating content metadata for identifier: {}", identifier);
85+
86+
try {
87+
String url = buildUpdateUrl(identifier);
88+
Map<String, String> headers = createHeaders();
89+
Map<String, Object> requestPayload = buildRequestPayload(streamingUrl);
90+
91+
Map<String, Object> response = outboundRequestHandlerService.fetchResultUsingPatch(url, requestPayload, headers);
92+
93+
handleUpdateResponse(identifier, response);
94+
95+
} catch (Exception e) {
96+
logger.error("Failed to update content metadata for identifier: {}", identifier, e);
97+
}
98+
}
99+
100+
private String buildUpdateUrl(String identifier) {
101+
return serverProperties.getLearningServiceVmBaseUrl() +
102+
serverProperties.getSystemUpdateAPI() +
103+
identifier;
104+
}
105+
106+
private Map<String, String> createHeaders() {
107+
Map<String, String> headers = new HashMap<>();
108+
headers.put(Constants.CONTENT_TYPE, Constants.APPLICATION_JSON);
109+
return headers;
110+
}
111+
112+
private Map<String, Object> buildRequestPayload(String streamingUrl) {
113+
String transformedUrl = streamingUrl.replaceAll(
114+
serverProperties.getVodBucketPrefix(),
115+
serverProperties.getVodStreamUrlPrefix()
116+
);
117+
118+
Map<String, Object> content = new HashMap<>();
119+
content.put(Constants.STREAMING_URL, transformedUrl);
120+
121+
Map<String, Object> contentRequest = new HashMap<>();
122+
contentRequest.put(Constants.CONTENT, content);
123+
124+
Map<String, Object> request = new HashMap<>();
125+
request.put(Constants.REQUEST, contentRequest);
126+
127+
return request;
128+
}
129+
130+
private void handleUpdateResponse(String identifier, Map<String, Object> response) {
131+
if (response == null || response.isEmpty()) {
132+
logger.warn("Received empty response for identifier: {}", identifier);
133+
return;
134+
}
135+
136+
String responseCode = (String) response.get(Constants.RESPONSE_CODE);
137+
138+
if (Constants.OK.equalsIgnoreCase(responseCode)) {
139+
logger.info("Successfully updated metadata for identifier: {}", identifier);
140+
} else {
141+
logger.warn("Failed to update metadata for identifier: {}, response code: {}", identifier, responseCode);
142+
}
143+
}
144+
}

src/main/java/com/igot/cb/service/OutboundRequestHandlerServiceImpl.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package com.igot.cb.service;
22

33
import java.lang.reflect.Type;
4+
import java.util.HashMap;
45
import java.util.Map;
56

7+
import org.apache.commons.collections.MapUtils;
68
import org.springframework.core.ParameterizedTypeReference;
7-
import org.springframework.http.HttpMethod;
8-
import org.springframework.http.ResponseEntity;
9+
import org.springframework.http.*;
910
import org.springframework.stereotype.Service;
11+
import org.springframework.util.CollectionUtils;
1012
import org.springframework.web.client.HttpClientErrorException;
1113
import org.springframework.web.client.RestTemplate;
1214

@@ -92,4 +94,36 @@ public <T> T fetchResultUsingExchange(String uri, ParameterizedTypeReference<T>
9294
}
9395
return null;
9496
}
97+
98+
public Map<String, Object> fetchResultUsingPatch(String uri, Object request, Map<String, String> headersValues) {
99+
Map<String, Object> response = null;
100+
try {
101+
HttpHeaders headers = new HttpHeaders();
102+
if (!CollectionUtils.isEmpty(headersValues)) {
103+
headersValues.forEach((k, v) -> headers.set(k, v));
104+
}
105+
headers.setContentType(MediaType.APPLICATION_JSON);
106+
HttpEntity<Object> entity = new HttpEntity<>(request, headers);
107+
if (log.isDebugEnabled()) {
108+
log.info(uri, request);
109+
}
110+
response = restTemplate.patchForObject(uri, entity, Map.class);
111+
if (log.isDebugEnabled()) {
112+
log.info(uri, response);
113+
}
114+
} catch (HttpClientErrorException e) {
115+
try {
116+
response = (new ObjectMapper()).readValue(e.getResponseBodyAsString(),
117+
new TypeReference<HashMap<String, Object>>() {
118+
});
119+
} catch (Exception e1) {
120+
}
121+
log.error("Error received: " + e.getResponseBodyAsString(), e);
122+
}
123+
if (response == null) {
124+
return MapUtils.EMPTY_MAP;
125+
}
126+
return response;
127+
}
128+
95129
}

src/main/java/com/igot/cb/util/Constants.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,9 @@ public class Constants {
407407
public static final String ATTRIBUTE_NAME = "attributeName";
408408
public static final String VALUES = "values";
409409
public static final String VALUE = "value";
410-
410+
public static final String STREAMING_URL = "streamingUrl";
411+
public static final String APPLICATION_JSON = "application/json";
412+
public static final String STREAMING_URI = "streamUri";
411413

412414
private Constants() {
413415
}

src/main/resources/application.properties

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,19 @@ elasticsearch.port=9201
5555
elasticsearch.username=
5656
elasticsearch.password=
5757
cb.plan.v2.index=cb_plan_v2
58+
59+
learning_service_vm_base_url=http://localhost:8080/learning-service
60+
system.content.update.url=/system/v3/content/update/
61+
spring.kafka.content.metadata.update.topic.name=dev.vod.content.meta.update
62+
spring.kafka.content.metadata.update.consumer.group.id=vodContentMetaUpdateConsumerGroup
63+
vod.bucket.prefix=gs://igotuatvideostream/
64+
vod.stream.url.prefix=https://portal.uat.karmayogibharat.net/stream-store/
65+
66+
67+
#--------------------------kafka Server
68+
spring.kafka.bootstrap.servers=localhost:9092
69+
#-----------------kafka properties
70+
kakfa.offset.reset.value=latest
71+
kafka.max.poll.interval.ms=15000
72+
kafka.max.poll.records=100
73+
kafka.auto.commit.interval.ms=10000

0 commit comments

Comments
 (0)