Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions src/main/java/com/igot/cb/cache/AccessSettingRuleCacheMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.igot.cb.cassandra.CassandraOperation;
import jakarta.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

Expand All @@ -30,8 +33,6 @@ public class AccessSettingRuleCacheMgr {
private final CassandraOperation cassandraOperation;
private Map<String, CachedAccessSettingRule> cachedAccessSettingRules = new ConcurrentHashMap<>();

private Cache<String, CachedAccessSettingRule> accessSettingsCache;


private final long LOCAL_CACHE_TTL = 3600000;

Expand All @@ -42,10 +43,6 @@ public class AccessSettingRuleCacheMgr {

@PostConstruct
public void initCache() {
accessSettingsCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(Duration.ofMinutes(ttlMinutes))
.build();
}


Expand Down Expand Up @@ -231,10 +228,35 @@ BitSet createBitSetForAttribute(Collection<Integer> attributeValues) {

public CachedAccessSettingRule getOrLoadAccessSettingRule(String courseId, String contextId) {
String cacheKey = courseId + "|" + contextId;
CachedAccessSettingRule cachedRule = accessSettingsCache.getIfPresent(cacheKey);
if (cachedRule != null) {
log.debug("Cache hit for rule key: {}", cacheKey);
return cachedRule;
// Use Redis for caching instead of in-memory cache
String cachedData = redisCacheMgr.getFromCache(cacheKey);
if (StringUtils.isNotBlank(cachedData)) {
try {
// Try to parse as Map first (since Redis stores the full object as JSON)
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> ruleMap = mapper.readValue(cachedData, new TypeReference<Map<String, Object>>() {});
String contextIdVal = (String) ruleMap.getOrDefault(Constants.CONTEXT_ID_KEY, ruleMap.get(Constants.CONTEXT_ID_KEY));
String contextIdTypeVal = (String) ruleMap.getOrDefault(Constants.CONTEXT_ID_TYPE, ruleMap.get(Constants.CONTEXT_ID_TYPE));
Object contextDataObj = ruleMap.get(Constants.CONTEXT_DATA_KEY);
String contextDataStr;
if (contextDataObj instanceof String) {
contextDataStr = (String) contextDataObj;
} else if (contextDataObj != null) {
contextDataStr = mapper.writeValueAsString(contextDataObj);
} else {
contextDataStr = "{}";
}
boolean isArchived = false;
if (ruleMap.containsKey(Constants.IS_ARCHIVED)) {
isArchived = Boolean.TRUE.equals(ruleMap.get(Constants.IS_ARCHIVED));
}
CachedAccessSettingRule cachedRule = new CachedAccessSettingRule(
contextIdVal, contextIdTypeVal, contextDataStr, isArchived);
log.debug("Cache hit for rule key: {} from Redis", cacheKey);
return cachedRule;
} catch (Exception e) {
log.error("Failed to parse cached rule from Redis for key {}: {}", cacheKey, e.getMessage(), e);
}
}
log.info("Cache miss for rule key: {}, loading from Cassandra...", cacheKey);
try {
Expand All @@ -253,22 +275,20 @@ public CachedAccessSettingRule getOrLoadAccessSettingRule(String courseId, Strin
log.warn("No access setting rule found in Cassandra for key: {}", cacheKey);
return null;
}
Map<String, Object> r = records.get(0);
Map<String, Object> ruleMap = records.get(0);
CachedAccessSettingRule loadedRule = new CachedAccessSettingRule(
(String) r.get(Constants.CONTEXT_ID_KEY),
(String) r.get(Constants.CONTEXT_ID_TYPE),
(String) r.get(Constants.CONTEXT_DATA_KEY),
false
(String) ruleMap.get(Constants.CONTEXT_ID_KEY),
(String) ruleMap.get(Constants.CONTEXT_ID_TYPE),
(String) ruleMap.get(Constants.CONTEXT_DATA_KEY),
(Boolean) ruleMap.get(Constants.IS_ARCHIVED_KEY)
);
try {
Map<String, Object> contextData = loadedRule.getContextData();
if (MapUtils.isNotEmpty(contextData)) {
processContextData(cacheKey, contextData);
}
accessSettingsCache.put(cacheKey, loadedRule);
log.info("Loaded and cached rule for key: {}", cacheKey);
// Store in Redis for future requests
String json = new ObjectMapper().writeValueAsString(ruleMap);
redisCacheMgr.putInCache(cacheKey, json, ttlMinutes); // TTL 1 hour, adjust as needed
log.info("Loaded and cached rule for key: {} in Redis", cacheKey);
} catch (Exception e) {
log.error("Error processing rule {}: {}", cacheKey, e.getMessage(), e);
log.error("Error serializing rule {}: {}", cacheKey, e.getMessage(), e);
}
return loadedRule;
} catch (Exception e) {
Expand Down
228 changes: 155 additions & 73 deletions src/main/java/com/igot/cb/cache/CbPlanCacheMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.igot.cb.cassandra.CassandraOperation;
Expand All @@ -35,29 +37,50 @@ public class CbPlanCacheMgr {
@Value("${cb.plan.caffine.cache.max.size:5000}")
private int maxCacheSize;

@Value("${cbplan.client.cache.ttl.seconds:3600}")
private int clientCacheTtlSeconds;

private final CassandraOperation cassandraOperation;
private Cache<String, List<Map<String, Object>>> cbPlanCache;
private final RedisCacheMgr redisCacheMgr;
private final ObjectMapper objectMapper = new ObjectMapper();
// Store only active plan IDs in cache, not full plan objects
private Cache<String, List<String>> cbPlanIdCache;

public CbPlanCacheMgr(CassandraOperation cassandraOperation, RedisCacheMgr redisCacheMgr) {
this.cassandraOperation = cassandraOperation;
this.redisCacheMgr = redisCacheMgr;
}

@PostConstruct
public void initCache() {
this.cbPlanCache = Caffeine.newBuilder()
this.cbPlanIdCache = Caffeine.newBuilder()
.maximumSize(maxCacheSize)
.expireAfterWrite(Duration.ofMinutes(ttlMinutes))
.build();
}

public CbPlanCacheMgr(CassandraOperation cassandraOperation) {
this.cassandraOperation = cassandraOperation;
}

private List<Map<String, Object>> getCbPlanForAll() {
String redisCacheKey = "all-lookup";
List<Map<String, Object>> allCbPlanList = cbPlanCache.getIfPresent(redisCacheKey);
if (allCbPlanList == null) {
log.info("No CB Plans for all orgs in Cache, reading from Cassandra");
/**
* Fetches only active plan IDs for all orgs and caches them.
*/
private List<String> getActivePlanIdsForAll() {
String cacheKey = "all-lookup";
// Use the same key for Redis and Caffeine
List<String> planIds = null;
String cached = redisCacheMgr.getFromCache(cacheKey);
if (cached != null) {
try {
planIds = objectMapper.readValue(cached, new TypeReference<List<String>>() {});
log.info("Cache hit for all orgs in Redis: Found {} plan IDs", planIds.size());
} catch (Exception e) {
log.warn("Failed to parse plan IDs from Redis for all orgs: {}", e.getMessage());
}
}
if (CollectionUtils.isEmpty(planIds)) {
planIds = null;
log.info("No CB Plan IDs for all orgs in Redis, reading from Cassandra");
Map<String, Object> propertiesMap = new HashMap<>();
propertiesMap.put(Constants.PLAN_YEAR, "ALL");
allCbPlanList = cassandraOperation.getRecordsByProperties(
List<Map<String, Object>> allCbPlanList = cassandraOperation.getRecordsByProperties(
Constants.KEYSPACE_SUNBIRD,
Constants.TABLE_CB_PLAN_V2_LOOKUP_BY_ALL_ORG,
propertiesMap,
Expand All @@ -66,88 +89,147 @@ private List<Map<String, Object>> getCbPlanForAll() {
if (allCbPlanList == null) {
allCbPlanList = new ArrayList<>();
}
allCbPlanList = allCbPlanList.stream()
.filter(plan -> Boolean.TRUE.equals(plan.get(Constants.IS_ACTIVE))).collect(Collectors.toList());
cbPlanCache.put(redisCacheKey, allCbPlanList);
planIds = allCbPlanList.stream()
.filter(plan -> Boolean.TRUE.equals(plan.get(Constants.IS_ACTIVE)))
.map(plan -> (String) plan.get(Constants.PLAN_ID))
.collect(Collectors.toList());
// Cache in Redis for all orgs
try {
redisCacheMgr.putInCache(cacheKey, objectMapper.writeValueAsString(planIds), clientCacheTtlSeconds);
} catch (Exception e) {
log.warn("Failed to cache plan IDs in Redis for all orgs: {}", e.getMessage());
}
// Optionally update Caffeine for legacy/local fallback
cbPlanIdCache.put(cacheKey, planIds);
} else {
log.info("Cache hit for all orgs: Found {} records", allCbPlanList.size());
log.info("Cache hit for all orgs: Found {} plan IDs", planIds.size());
}
return allCbPlanList;
return planIds;
}

private List<Map<String, Object>> getCbPlanForOrgId(String orgId) {
String redisCacheKey = orgId + "-lookup";
List<Map<String, Object>> cbPlanList = cbPlanCache.getIfPresent(redisCacheKey);

if (cbPlanList == null) {
log.info("No CB Plans for orgId in Cache: {}, reading from Cassandra", orgId);
Map<String, Object> propertiesMap = new HashMap<>();
propertiesMap.put(Constants.ORG_ID, orgId);
cbPlanList = cassandraOperation.getRecordsByProperties(
Constants.KEYSPACE_SUNBIRD,
Constants.TABLE_CB_PLAN_V2_LOOKUP_BY_ORG,
propertiesMap,
new ArrayList<>(),
null);
if (cbPlanList == null) {
cbPlanList = new ArrayList<>();
/**
* Fetches only active plan IDs for a specific org and caches them in Redis and Caffeine.
*/
private List<String> getActivePlanIdsForOrgId(String orgId) {
String redisKey = "cbplan_ids_" + orgId;
// Try Redis first
String cached = redisCacheMgr.getFromCache(redisKey);
if (cached != null) {
try {
List<String> planIds = objectMapper.readValue(cached, new TypeReference<List<String>>() {});
// Only use Redis, do not update local Caffeine cache
return planIds;
} catch (Exception e) {
log.warn("Failed to parse plan IDs from Redis for orgId {}: {}", orgId, e.getMessage());
}
cbPlanList = cbPlanList.stream()
.filter(plan -> Boolean.TRUE.equals(plan.get(Constants.IS_ACTIVE))).collect(Collectors.toList());
cbPlanCache.put(redisCacheKey, cbPlanList);
cbPlanList.addAll(getCbPlanForAll());
} else {
log.info("Cache hit for orgId: {}, Found {} records", orgId, cbPlanList.size());
}

return cbPlanList;
// Fallback to Cassandra
log.info("No CB Plan IDs for orgId in Redis: {}, reading from Cassandra", orgId);
Map<String, Object> propertiesMap = new HashMap<>();
propertiesMap.put(Constants.ORG_ID, orgId);
// Batchwise fetch using planIds list, similar to getCbPlansByPlanIdsInBatch
List<String> allPlanIds = new ArrayList<>();
List<String> orgPlanIds = new ArrayList<>();
// First, fetch all plan IDs for the org in one go (using offset/limit if needed)
// (Assume propertiesMap is set up with orgId)
List<Map<String, Object>> allLookupRows = new ArrayList<>();
int fetchBatchSize = planBatchSize;
int offset = 0;
boolean moreRecords = true;
while (moreRecords) {
Map<String, Object> batchProps = new HashMap<>(propertiesMap);
batchProps.put("offset", offset);
batchProps.put("limit", fetchBatchSize);
List<Map<String, Object>> cbPlanList = cassandraOperation.getRecordsByProperties(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please validate if this is fetching the proper impl

Constants.KEYSPACE_SUNBIRD,
Constants.TABLE_CB_PLAN_V2_LOOKUP_BY_ORG,
batchProps,
new ArrayList<>(),
null);
if (cbPlanList == null || cbPlanList.isEmpty()) {
moreRecords = false;
} else {
allLookupRows.addAll(cbPlanList);
offset += fetchBatchSize;
if (cbPlanList.size() < fetchBatchSize) {
moreRecords = false;
}
}
}
// Now, collect all plan IDs from the lookup rows
orgPlanIds = allLookupRows.stream()
.filter(plan -> Boolean.TRUE.equals(plan.get(Constants.IS_ACTIVE)))
.map(plan -> (String) plan.get(Constants.PLAN_ID))
.collect(Collectors.toList());
// Add 'all' org plans as well
allPlanIds.addAll(orgPlanIds);
allPlanIds.addAll(getActivePlanIdsForAll());
// Cache in Redis only
try {
redisCacheMgr.putInCache(redisKey, objectMapper.writeValueAsString(allPlanIds), clientCacheTtlSeconds);
} catch (Exception e) {
log.warn("Failed to cache plan IDs in Redis for orgId {}: {}", orgId, e.getMessage());
}
// Do not update Caffeine cache
return allPlanIds;
}

/**
* Public API: Returns full plan details for org, using only plan IDs from cache, fetching details in batch from Cassandra.
*/
public List<Map<String, Object>> getCbPlanForAllAndOrgId(String orgId, AtomicBoolean isCacheEnabled) {
List<Map<String, Object>> activeCbPlans = cbPlanCache.getIfPresent(orgId);
if (CollectionUtils.isNotEmpty(activeCbPlans)) {
log.info("Cache hit for orgId: {}, Found {} active CB Plans", orgId, activeCbPlans.size());
isCacheEnabled.set(true);
return activeCbPlans;
// Try Redis first for org-specific plan IDs
List<String> planIds = null;
String redisKey = "cbplan_ids_" + orgId;
String cached = redisCacheMgr.getFromCache(redisKey);
if (cached != null) {
try {
planIds = objectMapper.readValue(cached, new TypeReference<List<String>>() {});
log.info("Cache hit for orgId: {}, Found {} active CB Plan IDs in Redis", orgId, planIds.size());
isCacheEnabled.set(true);
} catch (Exception e) {
log.warn("Failed to parse plan IDs from Redis for orgId {}: {}", orgId, e.getMessage());
}
}
if (CollectionUtils.isEmpty(planIds)) {
planIds = getActivePlanIdsForOrgId(orgId);
if (planIds.isEmpty()) {
log.info("No CB Plan IDs found for orgId: {}", orgId);
return new ArrayList<>();
}
}
List<Map<String, Object>> cbPlanList = getCbPlanForOrgId(orgId);
// Fetch plan details in batch using only plan IDs (primary keys)
List<Map<String, Object>> cbPlanList = getCbPlansByPlanIdsInBatch(planIds);
if (cbPlanList.isEmpty()) {
log.info("No CB Plans found for orgId: {}", orgId);
cbPlanList = new ArrayList<>();
cbPlanCache.put(orgId, cbPlanList);
return cbPlanList;
log.info("No CB Plans found for orgId: {} after batch fetch", orgId);
return new ArrayList<>();
}
cbPlanList = cbPlanList.stream()
cbPlanList = cbPlanList.stream()
.filter(m -> m.get(Constants.END_DATE_REQUEST) != null)
.sorted(Comparator.comparing(
m -> (Instant) m.get(Constants.END_DATE_REQUEST),
Comparator.reverseOrder()
))
.collect(Collectors.toList());
List<String> planIds = cbPlanList.stream()
.map(plan -> (String) plan.get(Constants.PLAN_ID)).collect(Collectors.toList());
Map<String, Object> propertiesMap = new HashMap<>();
propertiesMap.put(Constants.PLAN_ID, planIds);
List<Map<String, Object>> existingCbPlans = cassandraOperation.getRecordsByProperties(
Constants.KEYSPACE_SUNBIRD,
Constants.TABLE_CB_PLAN_V2,
propertiesMap,
new ArrayList<>(),
null);
if (existingCbPlans == null) {
log.error("Failed to read cassandra for cb plan, for PlanIds: {}", planIds);
return new ArrayList<>();
}

activeCbPlans = existingCbPlans.stream()
.filter(plan -> Constants.LIVE.equalsIgnoreCase((String) plan.get(Constants.STATUS))).collect(Collectors.toList());
//TODO - Need to remove draftData (if available) and also contextData.accessControl
log.info("Found {} CB Plans for orgId: {}, active count: {}", existingCbPlans.size(), orgId, activeCbPlans.size());
cbPlanCache.put(orgId, activeCbPlans);
isCacheEnabled.set(true);
// Only return LIVE plans
List<Map<String, Object>> activeCbPlans = cbPlanList.stream()
.filter(plan -> Constants.LIVE.equalsIgnoreCase((String) plan.get(Constants.STATUS)))
.collect(Collectors.toList());
log.info("Found {} CB Plans for orgId: {}, active count: {}", cbPlanList.size(), orgId, activeCbPlans.size());
return activeCbPlans;
}

/**
* Public API: Returns plan IDs and TTL for org, for client-side caching.
*/
public Map<String, Object> getPlanIdsAndTtlForOrg(String orgId) {
List<String> planIds = getActivePlanIdsForOrgId(orgId);
Map<String, Object> result = new HashMap<>();
result.put("planIds", planIds);
result.put("cacheTtlSeconds", clientCacheTtlSeconds);
return result;
}

public List<Map<String, Object>> getCbPlansByPlanIdsInBatch(List<String> planIds) {
List<Map<String, Object>> allCbPlans = new ArrayList<>();

Expand Down
Loading