Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions app/commons/JedisFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,33 @@
public class JedisFactory {

private static JedisPool jedisPool;
private static boolean redisEnabled = AppConfig.getBoolean("redis.cache.enabled", false);
Comment thread
aimansharief marked this conversation as resolved.

private static int maxConnections = 128;
private static String host = "localhost";
private static int port = 6379;
private static int index = 0;

static {
if (AppConfig.config.hasPath("redis.host")) host = AppConfig.config.getString("redis.host");
if (AppConfig.config.hasPath("redis.port")) port = AppConfig.config.getInt("redis.port");
if (AppConfig.config.hasPath("redis.maxConnections")) maxConnections = AppConfig.config.getInt("redis.maxConnections");
if (AppConfig.config.hasPath("redis.dbIndex")) index = AppConfig.config.getInt("redis.dbIndex");
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxConnections);
config.setBlockWhenExhausted(true);
jedisPool = new JedisPool(config, host, port);
if (redisEnabled) {
if (AppConfig.config.hasPath("redis.host")) host = AppConfig.config.getString("redis.host");
if (AppConfig.config.hasPath("redis.port")) port = AppConfig.config.getInt("redis.port");
if (AppConfig.config.hasPath("redis.maxConnections")) maxConnections = AppConfig.config.getInt("redis.maxConnections");
if (AppConfig.config.hasPath("redis.dbIndex")) index = AppConfig.config.getInt("redis.dbIndex");
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxConnections);
config.setBlockWhenExhausted(true);
jedisPool = new JedisPool(config, host, port);
}
}

public static boolean isEnabled() {
return redisEnabled;
}

public static Jedis getRedisConncetion() {
if (!redisEnabled)
throw new ServerException(DialCodeEnum.ERR_CACHE_CONNECTION_ERROR.name(), "Redis cache is disabled.");
try {
Jedis jedis = jedisPool.getResource();
if (index > 0)
Expand All @@ -38,11 +47,12 @@ public static Jedis getRedisConncetion() {
}

public static void returnConnection(Jedis jedis) {
if (!redisEnabled) return;
try {
if (null != jedis)
jedisPool.returnResource(jedis);
} catch (Exception e) {
throw new ServerException(DialCodeEnum.ERR_CACHE_CONNECTION_ERROR.name(), e.getMessage());
}
}
}
}
39 changes: 39 additions & 0 deletions app/dbstore/SystemConfigStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
*/
package dbstore;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import commons.AppConfig;
import utils.CassandraConnector;
import utils.DialCodeEnum;

import java.util.HashMap;
Expand Down Expand Up @@ -38,4 +42,39 @@ public void setDialCodeIndex(double maxIndex) throws Exception {
data.put(DialCodeEnum.prop_value.name(), String.valueOf((int) maxIndex));
update(DialCodeEnum.prop_key.name(), DialCodeEnum.dialcode_max_index.name(), data, null);
}

/**
* Atomically increments the dialcode_max_index using a Cassandra
* Lightweight Transaction (compare-and-set). Retries on CAS conflict to
* safely support concurrent requests and multiple service instances when
* Redis is disabled.
*
* @return the new (incremented) index value
* @throws Exception if the row is missing or all retries are exhausted
*/
public Double getAndIncrementDialCodeIndex() throws Exception {
int maxRetries = 10;
for (int i = 0; i < maxRetries; i++) {
List<Row> rows = read(DialCodeEnum.prop_key.name(), DialCodeEnum.dialcode_max_index.name());
if (rows.isEmpty())
throw new Exception("dialcode_max_index not found in system_config");
Row row = rows.get(0);
String currentStr = row.getString(DialCodeEnum.prop_value.name());
double current = Double.valueOf(currentStr);
double next = current + 1;
String nextStr = String.valueOf((int) next);
Statement stmt = QueryBuilder.update(getKeyspace(), getTable())
.with(QueryBuilder.set(DialCodeEnum.prop_value.name(), nextStr))
.where(QueryBuilder.eq(DialCodeEnum.prop_key.name(), DialCodeEnum.dialcode_max_index.name()))
.onlyIf(QueryBuilder.eq(DialCodeEnum.prop_value.name(), currentStr));
ResultSet rs = CassandraConnector.getSession().execute(stmt);
Row applied = rs.one();
if (applied.getBool("[applied]")) {
return next;
}
// Another instance won the CAS race; re-read and retry.
}
throw new Exception("Failed to allocate DIAL code index after " + maxRetries
+ " retries due to high concurrency.");
}
}
36 changes: 21 additions & 15 deletions app/managers/HealthCheckManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package managers;

import commons.JedisFactory;
import commons.dto.Response;
import commons.exception.ResponseCode;
import telemetry.TelemetryManager;
Expand Down Expand Up @@ -39,20 +40,22 @@ public Response getAllServiceHealth(){

allChecks.add(check);
check = new HashMap<>();
try{
status = IHealthCheckManager.checkRedisHealth();
if(!status){
overAllHealth=false;
if (JedisFactory.isEnabled()) {
try {
status = IHealthCheckManager.checkRedisHealth();
if (!status) {
overAllHealth = false;
}
check = generateCheck("redis cache", check, status);
TelemetryManager.log("redis cache " + CONNECTION_SUCCESS);
} catch (Exception e) {
check = generateCheck("redis cache", check, status);
TelemetryManager.error("redis cache " + CONNECTION_FAIL, e);
}
check = generateCheck("redis cache",check,status);
TelemetryManager.log("redis cache "+CONNECTION_SUCCESS);
}catch (Exception e){
check = generateCheck("redis cache",check,status);
TelemetryManager.error("redis cache "+CONNECTION_FAIL,e);
allChecks.add(check);
check = new HashMap<>();
}
Comment thread
aimansharief marked this conversation as resolved.

allChecks.add(check);
check = new HashMap<>();
try{
status = IHealthCheckManager.checkCassandraHealth();
check = generateCheck("cassandra db",check,status);
Expand All @@ -66,11 +69,14 @@ public Response getAllServiceHealth(){
}
allChecks.add(check);

check = generateCheck("DIAL Max Index", check, dialMaxIndexHealth);
if (!dialMaxIndexHealth) {
overAllHealth = false;
if (JedisFactory.isEnabled()) {
check = new HashMap<>();
check = generateCheck("DIAL Max Index", check, dialMaxIndexHealth);
if (!dialMaxIndexHealth) {
overAllHealth = false;
}
allChecks.add(check);
}
allChecks.add(check);

Response response = OK("checks",allChecks);
if(!overAllHealth)
Expand Down
1 change: 1 addition & 0 deletions app/managers/IHealthCheckManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public interface IHealthCheckManager {
Response getAllServiceHealth();

static boolean checkRedisHealth(){
if (!JedisFactory.isEnabled()) return true;
try {
Jedis jedis = JedisFactory.getRedisConncetion();
jedis.close();
Expand Down
13 changes: 12 additions & 1 deletion app/utils/DialCodeGenerator.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils;

import commons.AppConfig;
import commons.JedisFactory;
import dbstore.DialCodeStore;
import dbstore.SystemConfigStore;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -52,7 +53,12 @@ public Map<Double, String> generate(double count, String channel, String publish
}
}
}
setMaxIndex(lastIndex);
// When Redis is enabled the counter lives in Redis; sync it back to Cassandra.
// When Redis is disabled each index was already committed atomically via CAS,
// so a plain write here would be redundant and unsafe under concurrency.
if (JedisFactory.isEnabled()) {
setMaxIndex(lastIndex);
}
return codes;
}

Expand All @@ -75,6 +81,11 @@ private void setMaxIndex(double maxIndex) throws Exception {
* @throws Exception
*/
private Double getMaxIndex(Double masterDBIndex) throws Exception {
if (!JedisFactory.isEnabled()) {
// Use a Cassandra LWT-based atomic increment so that concurrent
// requests and multiple service instances cannot allocate the same index.
return systemConfigStore.getAndIncrementDialCodeIndex();
}
double index = RedisStoreUtil.getNodePropertyIncVal("domain", "dialcode", "max_index");
if (index < masterDBIndex) {
String message = "Redis doesn't have the latest max index. Please set max index in redis as : " + masterDBIndex + " to enable the service.";
Expand Down
8 changes: 4 additions & 4 deletions app/utils/RedisStoreUtil.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package utils;

import commons.JedisFactory;
import commons.exception.ServerException;
import redis.clients.jedis.Jedis;

Expand All @@ -10,9 +11,8 @@ public class RedisStoreUtil {
private static final String KEY_SEPARATOR = ":";

public static void saveNodeProperty(String graphId, String objectId, String nodeProperty, String propValue) {

Jedis jedis =
getRedisConncetion();
if (!JedisFactory.isEnabled()) return;
Jedis jedis = getRedisConncetion();
try {
String redisKey = getNodePropertyKey(graphId, objectId, nodeProperty);
jedis.set(redisKey, propValue);
Expand All @@ -24,7 +24,7 @@ public static void saveNodeProperty(String graphId, String objectId, String node
}

public static Double getNodePropertyIncVal(String graphId, String objectId, String nodeProperty) {

if (!JedisFactory.isEnabled()) return null;
Jedis jedis = getRedisConncetion();
try {
String redisKey = getNodePropertyKey(graphId, objectId, nodeProperty);
Expand Down
1 change: 1 addition & 0 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ logger.application=DEBUG

# Cache-Manager Configuration
cache.type="redis"
redis.cache.enabled=false
Comment thread
aimansharief marked this conversation as resolved.

#search.es_conn_info="10.6.0.11:9200"
search.es_conn_info="localhost:9200"
Expand Down
31 changes: 31 additions & 0 deletions test/manager/DialCodeManagerImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.regex.Pattern;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -395,6 +396,36 @@ public void validateAdopterContext () throws Exception {
Assert.assertEquals("CLIENT_ERROR", response.getResponseCode().toString());
}

/**
* When Redis is disabled (default in tests), two sequential generation
* requests must produce non-overlapping DIAL codes, proving that the
* Cassandra LWT-based index allocator does not re-use indices.
*/
@Test
public void generateDialCodesProduceUniqueIndicesWhenRedisDisabledTest() throws Exception {
String channelId = "channelTest";

String req1 = "{\"count\":3,\"publisher\": \"mock_pub01\",\"batchCode\":\"test_idx_batch1\"}";
Response resp1 = dialCodeMgr.generateDialCode(getRequestMap(req1), channelId);
assertEquals("OK", resp1.getResponseCode().toString());
@SuppressWarnings("unchecked")
Collection<String> codes1 = (Collection<String>) resp1.getResult().get("dialcodes");
assertEquals(3, codes1.size());

String req2 = "{\"count\":3,\"publisher\": \"mock_pub01\",\"batchCode\":\"test_idx_batch2\"}";
Response resp2 = dialCodeMgr.generateDialCode(getRequestMap(req2), channelId);
assertEquals("OK", resp2.getResponseCode().toString());
@SuppressWarnings("unchecked")
Collection<String> codes2 = (Collection<String>) resp2.getResult().get("dialcodes");
assertEquals(3, codes2.size());

Set<String> firstBatch = new HashSet<>(codes1);
for (String code : codes2) {
assertFalse("Index re-use detected: code " + code + " was allocated in both batches",
firstBatch.contains(code));
}
}

@Test
public void generateDialCodeExpectValidUniqueDialCodes() throws Exception {
String dialCodeGenReq = "{\"count\":900}";
Expand Down
31 changes: 31 additions & 0 deletions test/manager/HealthCheckManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.Map;

import static org.junit.Assert.*;

public class HealthCheckManagerTest extends CassandraTestSetup{
Expand All @@ -35,6 +38,34 @@ public void getAllServicesHealthTest() throws Exception{

}

/**
* When Redis is disabled (default in tests), the health response must contain
* elasticsearch and cassandra db checks but must NOT contain redis cache or
* DIAL Max Index checks.
*/
@Test
public void getAllServicesHealthWithRedisDisabledTest() throws Exception {
// redis.cache.enabled defaults to false in the test config
Response response = healthCheckManager.getAllServiceHealth();
@SuppressWarnings("unchecked")
List<Map<String, Object>> checks = (List<Map<String, Object>>) response.getResult().get("checks");
assertNotNull("checks list must be present in health response", checks);

boolean hasEs = false, hasCassandra = false, hasRedis = false, hasDialMaxIndex = false;
for (Map<String, Object> check : checks) {
String name = (String) check.get("name");
if ("elasticsearch".equals(name)) hasEs = true;
if ("cassandra db".equals(name)) hasCassandra = true;
if ("redis cache".equals(name)) hasRedis = true;
if ("DIAL Max Index".equals(name)) hasDialMaxIndex = true;
}

assertTrue("elasticsearch check must be present", hasEs);
assertTrue("cassandra db check must be present", hasCassandra);
assertFalse("redis cache check must not be present when Redis is disabled", hasRedis);
assertFalse("DIAL Max Index check must not be present when Redis is disabled", hasDialMaxIndex);
}

@Test
public void checkRedisHealth() throws Exception{
boolean actualHealth = IHealthCheckManager.checkRedisHealth();
Expand Down
1 change: 1 addition & 0 deletions test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ logger.application=DEBUG

# Cache-Manager Configuration
cache.type="redis"
redis.cache.enabled=false

#search.es_conn_info="10.6.0.11:9200"
search.es_conn_info="localhost:9200"
Expand Down
Loading