Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
import org.apache.commons.lang3.StringUtils;
import org.sunbird.common.Platform;
import org.sunbird.common.exception.ServerException;
Expand All @@ -13,171 +14,194 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class CassandraConnector {

/** Cassandra Session Map. */
private static Map<String, Session> sessionMap = new HashMap<String, Session>();
public class CassandraConnector {

/** Guard to prevent registering duplicate JVM shutdown hooks. */
private static boolean shutdownHookRegistered = false;
private static final Map<String, Session> sessionMap = new ConcurrentHashMap<>();
private static final Map<String, Cluster> clusterMap = new ConcurrentHashMap<>();
private static final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false);
private static final int MAX_STARTUP_RETRIES = 30;
private static final long RETRY_BASE_MS = 2_000L; // 2 s — cap doubles each attempt
private static final long RETRY_MAX_MS = 30_000L; // 30 s — cap ceiling

static {
if (Platform.getBoolean("service.db.cassandra.enabled", true))
prepareSession("lp", getConsistencyLevel("lp"));
prepareSessionWithRetry("lp", getConsistencyLevel("lp"));
}

/**
* Provide lp Session.
*
* @return lp session.
*/
public static Session getSession() {
return getSession("lp");
}

/**
* @param sessionKey
* @return
* Returns the active session for {@code sessionKey}.
* @param sessionKey one of "lp", "lpa", "sunbird", "platform-courses"
* @return an active Session
* @throws ServerException if the session cannot be established
*/
public static Session getSession(String sessionKey) {
Session session = sessionMap.containsKey(sessionKey) ? sessionMap.get(sessionKey) : null;
String key = sessionKey.toLowerCase();
Session session = sessionMap.get(key);
Cluster cluster = clusterMap.get(key);

if (null == session || session.isClosed()) {
ConsistencyLevel level = getConsistencyLevel(sessionKey);
prepareSession(sessionKey, level);
session = sessionMap.get(sessionKey);
if (session != null && !session.isClosed() && cluster != null && !cluster.isClosed()) {
return session;
}

synchronized (CassandraConnector.class) {
session = sessionMap.get(key);
cluster = clusterMap.get(key);

if (session == null || session.isClosed() || cluster == null || cluster.isClosed()) {
prepareSessionOnce(key, getConsistencyLevel(key));
session = sessionMap.get(key);
}
}
if (null == session)
throw new ServerException("ERR_INITIALISE_CASSANDRA_SESSION", "Error while initialising cassandra");

if (session == null)
throw new ServerException("ERR_INITIALISE_CASSANDRA_SESSION",
"Unable to obtain Cassandra session for key: " + sessionKey);
return session;
}

/**
*
* @param sessionKey
* @param level
* Closes all Cluster objects, which releases their sessions, connection pools,
* and driver-internal background threads. Each cluster is closed independently
* so a failure in one does not block the others.
*/
private static void prepareSession(String sessionKey, ConsistencyLevel level) {
List<String> connectionInfo = getConnectionInfo(sessionKey.toLowerCase());
List<InetSocketAddress> addressList = getSocketAddress(connectionInfo);
try {
if (null != level) {
sessionMap.put(sessionKey.toLowerCase(), Cluster.builder()
.addContactPointsWithPorts(addressList)
.withQueryOptions(new QueryOptions().setConsistencyLevel(level))
.withoutJMXReporting()
.withProtocolVersion(ProtocolVersion.V4)
.build().connect());
} else {
sessionMap.put(sessionKey.toLowerCase(), Cluster.builder()
.addContactPointsWithPorts(addressList)
.withoutJMXReporting()
.withProtocolVersion(ProtocolVersion.V4)
.build().connect());
public static void close() {
clusterMap.forEach((key, cluster) -> {
if (cluster != null && !cluster.isClosed()) {
try {
cluster.close();
} catch (Exception e) {
TelemetryManager.error(
"Error closing Cassandra cluster [" + key + "]: " + e.getMessage(), e);
}
}
});
sessionMap.clear();
clusterMap.clear();
}

if (!shutdownHookRegistered) {
registerShutdownHook();
shutdownHookRegistered = true;
// Startup retry loop — called only from the static initialiser.
private static void prepareSessionWithRetry(String sessionKey, ConsistencyLevel level) {
int attempt = 0;
long cap = RETRY_BASE_MS;

while (attempt < MAX_STARTUP_RETRIES) {
attempt++;
try {
prepareSession(sessionKey, level);
TelemetryManager.log(
"Cassandra session ready for [" + sessionKey + "] on attempt " + attempt);
return;
} catch (Exception e) {
TelemetryManager.error("Cassandra connect attempt " + attempt + "/"
+ MAX_STARTUP_RETRIES + " failed for [" + sessionKey + "]: "
+ e.getMessage(), e);

if (attempt < MAX_STARTUP_RETRIES) {
// Full jitter: sleep = random(0, min(cap, RETRY_MAX_MS))
long sleep = (long) (Math.random() * Math.min(cap, RETRY_MAX_MS));
try {
Thread.sleep(sleep);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
TelemetryManager.error(
"Cassandra startup retry interrupted for [" + sessionKey + "]", ie);
return;
}
cap = Math.min(cap * 2, RETRY_MAX_MS);
}
}
}
TelemetryManager.error("All " + MAX_STARTUP_RETRIES
+ " Cassandra startup connect attempts exhausted for [" + sessionKey + "]");
}

// Runtime reconnect — called only from inside the synchronized block in {@link #getSession}.
private static void prepareSessionOnce(String sessionKey, ConsistencyLevel level) {
try {
prepareSession(sessionKey, level);
TelemetryManager.log("Cassandra session re-established for [" + sessionKey + "]");
} catch (Exception e) {
TelemetryManager.error("Error! While Loading Cassandra Properties." + e.getMessage(), e);
TelemetryManager.error("Cassandra reconnect attempt failed for ["
+ sessionKey + "]: " + e.getMessage(), e);
}
}

/**
*
* @param sessionKey
* @return
* Creates a brand-new Cluster and Session for {@code sessionKey} and stores them
* in the maps. Closes any previous Cluster for the same key to free its resources.
* Throws if no contact point is reachable so callers can react appropriately.
*/
private static List<String> getConnectionInfo(String sessionKey) {
List<String> connectionInfo = null;
switch (sessionKey) {
case "lp":
connectionInfo = Arrays.asList(Platform.config.getString("cassandra.lp.connection").split(","));
break;
case "lpa":
connectionInfo = Arrays.asList(Platform.config.getString("cassandra.lpa.connection").split(","));
break;
case "sunbird":
connectionInfo = Arrays.asList(Platform.config.getString("cassandra.sunbird.connection").split(","));
break;
case "platform-courses":
connectionInfo = Arrays
.asList(Platform.config.getString("cassandra.connection.platform_courses").split(","));
break;
private static void prepareSession(String sessionKey, ConsistencyLevel level) {
List<InetSocketAddress> addressList = getSocketAddress(getConnectionInfo(sessionKey));

Cluster.Builder builder = Cluster.builder()
.addContactPointsWithPorts(addressList)
.withReconnectionPolicy(new ExponentialReconnectionPolicy(1_000L, 60_000L))
.withRetryPolicy(DefaultRetryPolicy.INSTANCE)
.withoutJMXReporting();

if (level != null)
builder.withQueryOptions(new QueryOptions().setConsistencyLevel(level));

Cluster cluster = builder.build();
Session session = cluster.connect();

Cluster oldCluster = clusterMap.put(sessionKey, cluster);
sessionMap.put(sessionKey, session);

if (oldCluster != null && !oldCluster.isClosed()) {
try { oldCluster.close(); } catch (Exception ignored) { /* best effort */ }
}
if (null == connectionInfo || connectionInfo.isEmpty())
connectionInfo = new ArrayList<>(Arrays.asList("localhost:9042"));

return connectionInfo;
if (shutdownHookRegistered.compareAndSet(false, true))
registerShutdownHook();
}

/**
*
* @param hosts
* @return
*/
private static List<InetSocketAddress> getSocketAddress(List<String> hosts) {
List<InetSocketAddress> connectionList = new ArrayList<>();
for (String connection : hosts) {
String[] conn = connection.split(":");
String host = conn[0];
int port = Integer.valueOf(conn[1]);
connectionList.add(new InetSocketAddress(host, port));
}
return connectionList;
private static void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
TelemetryManager.log("Shutting down Cassandra connector — closing all clusters");
CassandraConnector.close();
}));
}

/**
* Close all Cassandra sessions gracefully.
* Each session is closed individually so a failure in one does not
* prevent closing the others.
*/
public static void close() {
sessionMap.forEach((key, session) -> {
if (session != null && !session.isClosed()) {
try {
session.close();
} catch (Exception e) {
TelemetryManager.error("Error closing Cassandra session for key: " + key + " — " + e.getMessage(), e);
}
}
});
sessionMap.clear();
private static List<String> getConnectionInfo(String sessionKey) {
String configKey;
switch (sessionKey) {
case "lp": configKey = "cassandra.lp.connection"; break;
case "lpa": configKey = "cassandra.lpa.connection"; break;
case "sunbird": configKey = "cassandra.sunbird.connection"; break;
case "platform-courses": configKey = "cassandra.connection.platform_courses"; break;
default: configKey = null; break;
}
if (configKey != null && Platform.config.hasPath(configKey)) {
List<String> nodes = Arrays.asList(Platform.config.getString(configKey).split(","));
if (!nodes.isEmpty()) return nodes;
}
return new ArrayList<>(Collections.singletonList("localhost:9042"));
}

/**
* Register a single JVM shutdown hook to close all open Cassandra sessions.
* Protected by shutdownHookRegistered so it is called only once even when
* prepareSession() is invoked multiple times.
*/
private static void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
TelemetryManager.log("Shutting down Cassandra connector — closing all sessions");
CassandraConnector.close();
}
});
private static List<InetSocketAddress> getSocketAddress(List<String> hosts) {
List<InetSocketAddress> list = new ArrayList<>();
for (String conn : hosts) {
String[] parts = conn.trim().split(":");
list.add(new InetSocketAddress(parts[0].trim(), Integer.parseInt(parts[1].trim())));
}
return list;
}

/**
* This Method Returns the value of Consistency Level for Multi Node/DC
* Cassandra Cluster.
*
* @return ConsistencyLevel
*/
private static ConsistencyLevel getConsistencyLevel(String clusterName) {
String key = "cassandra." + clusterName + ".consistency.level";
String consistencyLevel = Platform.config.hasPath(key) ? Platform.config.getString(key) : null;
if (StringUtils.isNotBlank(consistencyLevel))
return ConsistencyLevel.valueOf(consistencyLevel.toUpperCase());
return null;
String key = "cassandra." + clusterName + ".consistency.level";
String value = Platform.config.hasPath(key) ? Platform.config.getString(key) : null;
return StringUtils.isNotBlank(value) ? ConsistencyLevel.valueOf(value.toUpperCase()) : null;
}

}
Loading
Loading