Skip to content

Commit 4029146

Browse files
Sahil-tarentoKarthikeyan R
andauthored
4.8.35 cache optimization (#274)
* Optimisation fix (#273) * Optimization for cache --------- Co-authored-by: Karthikeyan R <karthikeyanr@TI-MAC-118.local>
1 parent e344b62 commit 4029146

File tree

10 files changed

+214
-143
lines changed

10 files changed

+214
-143
lines changed

src/main/java/com/igot/cb/cache/AccessSettingRuleCacheMgr.java

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.time.Duration;
44
import java.util.*;
55
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.atomic.AtomicInteger;
67
import java.util.stream.Collectors;
78

89
import com.github.benmanes.caffeine.cache.Cache;
@@ -40,10 +41,16 @@ public class AccessSettingRuleCacheMgr {
4041
@Value("${access.rule.ttl.minutes}")
4142
private int ttlMinutes;
4243

44+
@Value("${access.setting.rules.caffine.cache.max.size:5000}")
45+
private int maxCacheSize;
46+
47+
@Value("${access.settings.cache.batch.size:500}")
48+
private int accessSettingsCacheBatchSize;
49+
4350
@PostConstruct
4451
public void initCache() {
4552
accessSettingsCache = Caffeine.newBuilder()
46-
.maximumSize(1000)
53+
.maximumSize(maxCacheSize)
4754
.expireAfterWrite(Duration.ofMinutes(ttlMinutes))
4855
.build();
4956
}
@@ -109,41 +116,39 @@ private void loadAccessSettingRules() {
109116
Map.Entry::getKey,
110117
entry -> new CachedAccessSettingRule(entry.getValue())));
111118
} else {
112-
List<Map<String, Object>> accessSettingRuleMapList = cassandraOperation.getRecordsByProperties(
113-
Constants.KEYSPACE_SUNBIRD_COURSE, Constants.ACCESS_SETTINGS_RULES_TABLE_V2, null,
114-
null, null);
115-
cachedAccessSettingRules = accessSettingRuleMapList.stream()
116-
.map(record -> new CachedAccessSettingRule(
117-
(String) record.get("contextId"),
118-
(String) record.get("contextIdType"),
119-
(String) record.get("contextData"),
120-
false))
121-
.collect(Collectors.toMap(
122-
CachedAccessSettingRule::getCacheKey,
123-
rule -> rule));
124-
// Cache the rules in Redis
125-
for (CachedAccessSettingRule rule : cachedAccessSettingRules.values()) {
126-
127-
try {
128-
129-
Map<String, Object> contextData = rule.getContextData();
130-
if (contextData == null) {
131-
log.warn("No contextData found for rule: {}", rule.getCacheKey());
132-
continue;
133-
}
134-
135-
// Call the new method for processing
136-
processContextData(rule.getCacheKey(), contextData);
137-
cachedAccessSettingRules.put(rule.getCacheKey(), rule);
138-
139-
// Finally, push the raw contextData to Redis
140-
redisCacheMgr.setAccessSettingRuleCache(ACCESS_SETTINGS_CACHE_KEY, rule.getCacheKey(),
141-
contextData);
142-
143-
} catch (Exception e) {
144-
log.error("Error processing rule {}", rule.getCacheKey(), e);
145-
}
146-
}
119+
cachedAccessSettingRules = new ConcurrentHashMap<>();
120+
AtomicInteger processedCount = new AtomicInteger();
121+
cassandraOperation.forEachRecordByProperties(
122+
Constants.KEYSPACE_SUNBIRD_COURSE,
123+
Constants.ACCESS_SETTINGS_RULES_TABLE_V2,
124+
null,
125+
null,
126+
accessSettingsCacheBatchSize,
127+
null,
128+
record -> {
129+
try {
130+
CachedAccessSettingRule rule = new CachedAccessSettingRule(
131+
(String) record.get("contextId"),
132+
(String) record.get("contextIdType"),
133+
(String) record.get("contextData"),
134+
Boolean.TRUE.equals(record.get("isArchived")));
135+
Map<String, Object> contextData = rule.getContextData();
136+
if (contextData == null) {
137+
log.warn("No contextData found for rule: {}", rule.getCacheKey());
138+
return;
139+
}
140+
processContextData(rule.getCacheKey(), contextData);
141+
cachedAccessSettingRules.put(rule.getCacheKey(), rule);
142+
redisCacheMgr.setAccessSettingRuleCache(
143+
ACCESS_SETTINGS_CACHE_KEY,
144+
rule.getCacheKey(),
145+
contextData);
146+
processedCount.incrementAndGet();
147+
} catch (Exception e) {
148+
log.error("Error processing access setting rule record: {}", record, e);
149+
}
150+
});
151+
log.info("Processed {} access setting rules from Cassandra in paged mode", processedCount.get());
147152
}
148153
log.info("Access setting rules loaded into cache successfully. Number of rules loaded: {}",
149154
cachedAccessSettingRules.size());

src/main/java/com/igot/cb/cache/CbPlanCacheMgr.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,18 @@ public class CbPlanCacheMgr {
3737

3838
private final CassandraOperation cassandraOperation;
3939
private Cache<String, List<Map<String, Object>>> cbPlanCache;
40+
private Cache<String, Map<String, Object>> planIdCache;
4041

4142
@PostConstruct
4243
public void initCache() {
4344
this.cbPlanCache = Caffeine.newBuilder()
4445
.maximumSize(maxCacheSize)
4546
.expireAfterWrite(Duration.ofMinutes(ttlMinutes))
4647
.build();
48+
this.planIdCache = Caffeine.newBuilder()
49+
.maximumSize(maxCacheSize)
50+
.expireAfterWrite(Duration.ofMinutes(ttlMinutes))
51+
.build();
4752
}
4853

4954
public CbPlanCacheMgr(CassandraOperation cassandraOperation) {
@@ -141,6 +146,13 @@ public List<Map<String, Object>> getCbPlanForAllAndOrgId(String orgId, AtomicBoo
141146

142147
activeCbPlans = existingCbPlans.stream()
143148
.filter(plan -> Constants.LIVE.equalsIgnoreCase((String) plan.get(Constants.STATUS))).collect(Collectors.toList());
149+
// Prime the planIdCache
150+
for (Map<String, Object> plan : existingCbPlans) {
151+
String id = (String) plan.get(Constants.PLAN_ID);
152+
if (id != null) {
153+
planIdCache.put(id, plan);
154+
}
155+
}
144156
//TODO - Need to remove draftData (if available) and also contextData.accessControl
145157
log.info("Found {} CB Plans for orgId: {}, active count: {}", existingCbPlans.size(), orgId, activeCbPlans.size());
146158
cbPlanCache.put(orgId, activeCbPlans);
@@ -156,12 +168,26 @@ public List<Map<String, Object>> getCbPlansByPlanIdsInBatch(List<String> planIds
156168
return allCbPlans;
157169
}
158170

159-
log.info("Fetching CB Plan details for {} plan IDs in batches of 5", planIds.size());
171+
List<String> missingPlanIds = new ArrayList<>();
172+
for (String planId : planIds) {
173+
Map<String, Object> cachedPlan = planIdCache.getIfPresent(planId);
174+
if (cachedPlan != null) {
175+
allCbPlans.add(cachedPlan);
176+
} else {
177+
missingPlanIds.add(planId);
178+
}
179+
}
180+
181+
if (missingPlanIds.isEmpty()) {
182+
log.info("Full cache hit for {} plan IDs", planIds.size());
183+
return allCbPlans;
184+
}
160185

161-
// Process in batches of 5
186+
log.info("Fetching CB Plan details for {} missing plan IDs from Cassandra in batches of {}", missingPlanIds.size(), planBatchSize);
162187

163-
for (int i = 0; i < planIds.size(); i += planBatchSize) {
164-
List<String> batch = planIds.subList(i, Math.min(i + planBatchSize, planIds.size()));
188+
// Process in batches
189+
for (int i = 0; i < missingPlanIds.size(); i += planBatchSize) {
190+
List<String> batch = missingPlanIds.subList(i, Math.min(i + planBatchSize, missingPlanIds.size()));
165191

166192
Map<String, Object> propertiesMap = new HashMap<>();
167193
propertiesMap.put(Constants.PLAN_ID, batch);
@@ -177,6 +203,12 @@ public List<Map<String, Object>> getCbPlansByPlanIdsInBatch(List<String> planIds
177203

178204
if (CollectionUtils.isNotEmpty(batchResult)) {
179205
allCbPlans.addAll(batchResult);
206+
for (Map<String, Object> plan : batchResult) {
207+
String id = (String) plan.get(Constants.PLAN_ID);
208+
if (id != null) {
209+
planIdCache.put(id, plan);
210+
}
211+
}
180212
log.info("Fetched {} records for plan IDs batch: {}", batchResult.size(), batch);
181213
} else {
182214
log.warn("No records found for plan IDs batch: {}", batch);

src/main/java/com/igot/cb/cache/PromotionalContentRuleCacheMgr.java

Lines changed: 33 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.util.*;
1515
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.atomic.AtomicInteger;
1617

1718
/**
1819
* Cache manager for promotional content access rules.
@@ -76,10 +77,9 @@ public Collection<CachedAccessSettingRule> getAccessSettingRules() {
7677

7778
/**
7879
* Loads access rules from Cassandra into indexed cache.
79-
* Filters out archived records at database level.
80-
* Implements pagination to fetch all records in batches for better performance.
81-
* Uses configurable batch size for each query and max query size as upper limit.
82-
* Optimized with Java 17 parallel streams for multi-core CPU utilization.
80+
* Uses Cassandra driver paging to fetch rows in batches.
81+
* Applies configurable max query size as an upper bound for rows processed.
82+
* Filters out archived records during row processing.
8383
*/
8484
private void loadAccessSettingRules() {
8585
int batchSize = properties.getPromotionalContentCacheBatchSize();
@@ -89,64 +89,42 @@ private void loadAccessSettingRules() {
8989
batchSize, maxQuerySize);
9090
cacheMap.clear();
9191
try {
92-
List<Map<String, Object>> allRecords = new ArrayList<>();
93-
int totalFetched = 0;
94-
int pageNumber = 1;
95-
boolean hasMoreRecords = true;
96-
while (hasMoreRecords && totalFetched < maxQuerySize) {
97-
log.info("Fetching page {} with batch size {}", pageNumber, batchSize);
98-
List<Map<String, Object>> pageRecords = cassandraOperation.getRecordsByProperties(
99-
Constants.KEYSPACE_SUNBIRD_COURSE,
100-
Constants.PROMOTIONAL_CONTENT_RULES,
101-
null,
102-
null,
103-
batchSize);
104-
if (CollectionUtils.isEmpty(pageRecords)) {
105-
log.info("No more records found on page {}", pageNumber);
106-
hasMoreRecords = false;
107-
} else {
108-
allRecords.addAll(pageRecords);
109-
totalFetched += pageRecords.size();
110-
log.info("Fetched {} records on page {}, total so far: {}",
111-
pageRecords.size(), pageNumber, totalFetched);
92+
AtomicInteger scannedCount = new AtomicInteger();
93+
AtomicInteger archivedCount = new AtomicInteger();
11294

113-
if (pageRecords.size() < batchSize) {
114-
log.info("Received fewer records than batch size. Pagination complete.");
115-
hasMoreRecords = false;
116-
} else {
117-
pageNumber++;
118-
hasMoreRecords = false;
119-
log.warn("Cassandra query returned full batch size. There may be more records, " +
120-
"but pagination token is not supported. Consider increasing batch size.");
121-
}
122-
}
123-
}
124-
if (totalFetched >= maxQuerySize) {
125-
log.warn("Reached maximum query size limit of {}. There may be more records in the database. " +
126-
"Consider increasing promotional.content.cache.max.query.size", maxQuerySize);
127-
}
128-
if (allRecords.isEmpty()) {
129-
log.warn("No access setting rules found in database");
130-
return;
131-
}
132-
log.info("Total records fetched: {}, processing with parallel streams...", allRecords.size());
133-
allRecords.parallelStream()
134-
.filter(rec -> {
135-
Boolean isArchived = (Boolean) rec.get(Constants.IS_ARCHIVED_KEY);
136-
return !isArchived;
137-
})
138-
.map(rec -> new CachedAccessSettingRule(
139-
(String) rec.get("contextId"),
140-
(String) rec.get("contextIdType"),
141-
(String) rec.get("contextData"),
142-
false))
143-
.forEach(rule -> {
95+
cassandraOperation.forEachRecordByProperties(
96+
Constants.KEYSPACE_SUNBIRD_COURSE,
97+
Constants.PROMOTIONAL_CONTENT_RULES,
98+
null,
99+
null,
100+
batchSize,
101+
maxQuerySize,
102+
rec -> {
103+
scannedCount.incrementAndGet();
104+
if (Boolean.TRUE.equals(rec.get(Constants.IS_ARCHIVED_KEY))) {
105+
archivedCount.incrementAndGet();
106+
return;
107+
}
108+
CachedAccessSettingRule rule = new CachedAccessSettingRule(
109+
(String) rec.get("contextId"),
110+
(String) rec.get("contextIdType"),
111+
(String) rec.get("contextData"),
112+
false);
144113
processAndCacheRule(rule);
145114
cacheMap.put(rule.getCacheKey(), rule);
146115
});
116+
117+
if (scannedCount.get() == 0) {
118+
log.warn("No access setting rules found in database");
119+
return;
120+
}
121+
if (maxQuerySize > 0 && scannedCount.get() >= maxQuerySize) {
122+
log.warn("Reached maximum query size limit of {} while loading promotional rules", maxQuerySize);
123+
}
147124
long totalProcessed = cacheMap.size();
148125
log.info("Promotional Content rules loaded into cache successfully. Total rules loaded: {}",
149126
totalProcessed);
127+
log.info("Total scanned: {}, archived skipped: {}", scannedCount.get(), archivedCount.get());
150128
} catch (Exception e) {
151129
log.error("Failed to load Promotional Content rules into Cache. Exception: ", e);
152130
}

src/main/java/com/igot/cb/cassandra/CassandraOperation.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import java.util.List;
66
import java.util.Map;
7+
import java.util.function.Consumer;
78

89
/**
910
* @author Mahesh RV
@@ -52,4 +53,10 @@ Object insertRecord(
5253
String primaryKeyValue,
5354
Map<String, Object> compositeKey,
5455
Map<String, Object> otherFields);
56+
57+
void forEachRecordByProperties(String keyspaceName, String tableName,
58+
Map<String, Object> propertyMap, List<String> fields,
59+
Integer pageSize, Integer maxRows, Consumer<Map<String, Object>> rowConsumer);
60+
61+
5562
}

src/main/java/com/igot/cb/cassandra/CassandraOperationImpl.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.stereotype.Component;
2323

2424
import java.util.*;
25+
import java.util.function.Consumer;
2526
import java.util.stream.Collectors;
2627

2728

@@ -241,4 +242,32 @@ public Object insertRecord(
241242
return response;
242243
}
243244

245+
@Override
246+
public void forEachRecordByProperties(String keyspaceName, String tableName,
247+
Map<String, Object> propertyMap, List<String> fields,
248+
Integer pageSize, Integer maxRows,
249+
Consumer<Map<String, Object>> rowConsumer) {
250+
try {
251+
Select selectQuery = processQuery(keyspaceName, tableName, propertyMap, fields);
252+
SimpleStatementBuilder statementBuilder = SimpleStatement.builder(selectQuery.build());
253+
if (pageSize != null && pageSize > 0) {
254+
statementBuilder.setPageSize(pageSize);
255+
}
256+
ResultSet results = connectionManager.getSession(keyspaceName).execute(statementBuilder.build());
257+
Map<String, String> columnsMapping = CassandraUtil.fetchColumnsMapping(results);
258+
int rowsProcessed = 0;
259+
for (Row row : results) {
260+
if (maxRows != null && maxRows > 0 && rowsProcessed >= maxRows) {
261+
break;
262+
}
263+
Map<String, Object> rowMap = new HashMap<>();
264+
columnsMapping.forEach((key, value) -> rowMap.put(key, row.getObject(value)));
265+
rowConsumer.accept(rowMap);
266+
rowsProcessed++;
267+
}
268+
} catch (Exception e) {
269+
log.error("Error iterating records from {}: {}", tableName, e.getMessage(), e);
270+
}
271+
}
272+
244273
}

src/main/java/com/igot/cb/service/CourseAccessServiceImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,8 +525,9 @@ public ApiResponse getAssignedExternalCoursesForUser(Map<String,Object> request,
525525

526526
private List<String> getCoursesFromCacheOrServiceForExternalCourse(String partnerId) {
527527
try {
528-
List<String> cachedCourses = courseCategoryCache.get(partnerId);
529-
Long lastUpdated = cacheTimestamps.get(partnerId);
528+
String cacheKey = "access_settings_enabled_" + partnerId;
529+
List<String> cachedCourses = courseCategoryCache.get(cacheKey);
530+
Long lastUpdated = cacheTimestamps.get(cacheKey);
530531
boolean isCacheValid = lastUpdated != null &&
531532
(System.currentTimeMillis() - lastUpdated) < cacheTtlMs;
532533

@@ -538,8 +539,8 @@ private List<String> getCoursesFromCacheOrServiceForExternalCourse(String partne
538539
log.info("Cache miss or expired for category: {}, fetching from service", partnerId);
539540
List<String> fetchedCourses = fetchAccessSettingsEnabledCoursesForExternalCourses(partnerId);
540541
if (!fetchedCourses.isEmpty()) {
541-
courseCategoryCache.put("access_settings_enabled_" + partnerId, fetchedCourses);
542-
cacheTimestamps.put(partnerId, System.currentTimeMillis());
542+
courseCategoryCache.put(cacheKey, fetchedCourses);
543+
cacheTimestamps.put(cacheKey, System.currentTimeMillis());
543544
log.info("Cached {} course identifiers for partnerId {}", fetchedCourses.size(), partnerId);
544545
return fetchedCourses;
545546
} else {

src/main/resources/application.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,6 @@ cbpores.service.host=http://localhost:7001
115115
content.service.retire.url=/content/v4/retire
116116
content.retirement.notification.subject=Important: {0} Will Be Retired on {1}
117117

118+
access.setting.rules.caffine.cache.max.size=10000
119+
access.settings.cache.batch.size=500
120+

0 commit comments

Comments
 (0)