diff --git a/platform-core/cassandra-connector/src/main/java/org/sunbird/cassandra/CassandraConnector.java b/platform-core/cassandra-connector/src/main/java/org/sunbird/cassandra/CassandraConnector.java index ebbe9e47f..a13fefed7 100644 --- a/platform-core/cassandra-connector/src/main/java/org/sunbird/cassandra/CassandraConnector.java +++ b/platform-core/cassandra-connector/src/main/java/org/sunbird/cassandra/CassandraConnector.java @@ -2,9 +2,10 @@ import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.ProtocolVersion; import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.Session; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; import org.apache.commons.lang3.StringUtils; import org.sunbird.common.Platform; import org.sunbird.common.exception.ServerException; @@ -13,171 +14,194 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; -public class CassandraConnector { - /** Cassandra Session Map. */ - private static Map sessionMap = new HashMap(); +public class CassandraConnector { - /** Guard to prevent registering duplicate JVM shutdown hooks. */ - private static boolean shutdownHookRegistered = false; + private static final Map sessionMap = new ConcurrentHashMap<>(); + private static final Map clusterMap = new ConcurrentHashMap<>(); + private static final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false); + private static final int MAX_STARTUP_RETRIES = 30; + private static final long RETRY_BASE_MS = 2_000L; // 2 s — cap doubles each attempt + private static final long RETRY_MAX_MS = 30_000L; // 30 s — cap ceiling static { if (Platform.getBoolean("service.db.cassandra.enabled", true)) - prepareSession("lp", getConsistencyLevel("lp")); + prepareSessionWithRetry("lp", getConsistencyLevel("lp")); } - - /** - * Provide lp Session. - * - * @return lp session. - */ public static Session getSession() { return getSession("lp"); } /** - * @param sessionKey - * @return + * Returns the active session for {@code sessionKey}. + * @param sessionKey one of "lp", "lpa", "sunbird", "platform-courses" + * @return an active Session + * @throws ServerException if the session cannot be established */ public static Session getSession(String sessionKey) { - Session session = sessionMap.containsKey(sessionKey) ? sessionMap.get(sessionKey) : null; + String key = sessionKey.toLowerCase(); + Session session = sessionMap.get(key); + Cluster cluster = clusterMap.get(key); - if (null == session || session.isClosed()) { - ConsistencyLevel level = getConsistencyLevel(sessionKey); - prepareSession(sessionKey, level); - session = sessionMap.get(sessionKey); + if (session != null && !session.isClosed() && cluster != null && !cluster.isClosed()) { + return session; + } + + synchronized (CassandraConnector.class) { + session = sessionMap.get(key); + cluster = clusterMap.get(key); + + if (session == null || session.isClosed() || cluster == null || cluster.isClosed()) { + prepareSessionOnce(key, getConsistencyLevel(key)); + session = sessionMap.get(key); + } } - if (null == session) - throw new ServerException("ERR_INITIALISE_CASSANDRA_SESSION", "Error while initialising cassandra"); + + if (session == null) + throw new ServerException("ERR_INITIALISE_CASSANDRA_SESSION", + "Unable to obtain Cassandra session for key: " + sessionKey); return session; } /** - * - * @param sessionKey - * @param level + * Closes all Cluster objects, which releases their sessions, connection pools, + * and driver-internal background threads. Each cluster is closed independently + * so a failure in one does not block the others. */ - private static void prepareSession(String sessionKey, ConsistencyLevel level) { - List connectionInfo = getConnectionInfo(sessionKey.toLowerCase()); - List addressList = getSocketAddress(connectionInfo); - try { - if (null != level) { - sessionMap.put(sessionKey.toLowerCase(), Cluster.builder() - .addContactPointsWithPorts(addressList) - .withQueryOptions(new QueryOptions().setConsistencyLevel(level)) - .withoutJMXReporting() - .withProtocolVersion(ProtocolVersion.V4) - .build().connect()); - } else { - sessionMap.put(sessionKey.toLowerCase(), Cluster.builder() - .addContactPointsWithPorts(addressList) - .withoutJMXReporting() - .withProtocolVersion(ProtocolVersion.V4) - .build().connect()); + public static void close() { + clusterMap.forEach((key, cluster) -> { + if (cluster != null && !cluster.isClosed()) { + try { + cluster.close(); + } catch (Exception e) { + TelemetryManager.error( + "Error closing Cassandra cluster [" + key + "]: " + e.getMessage(), e); + } } + }); + sessionMap.clear(); + clusterMap.clear(); + } - if (!shutdownHookRegistered) { - registerShutdownHook(); - shutdownHookRegistered = true; + // Startup retry loop — called only from the static initialiser. + private static void prepareSessionWithRetry(String sessionKey, ConsistencyLevel level) { + int attempt = 0; + long cap = RETRY_BASE_MS; + + while (attempt < MAX_STARTUP_RETRIES) { + attempt++; + try { + prepareSession(sessionKey, level); + TelemetryManager.log( + "Cassandra session ready for [" + sessionKey + "] on attempt " + attempt); + return; + } catch (Exception e) { + TelemetryManager.error("Cassandra connect attempt " + attempt + "/" + + MAX_STARTUP_RETRIES + " failed for [" + sessionKey + "]: " + + e.getMessage(), e); + + if (attempt < MAX_STARTUP_RETRIES) { + // Full jitter: sleep = random(0, min(cap, RETRY_MAX_MS)) + long sleep = (long) (Math.random() * Math.min(cap, RETRY_MAX_MS)); + try { + Thread.sleep(sleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + TelemetryManager.error( + "Cassandra startup retry interrupted for [" + sessionKey + "]", ie); + return; + } + cap = Math.min(cap * 2, RETRY_MAX_MS); + } } + } + TelemetryManager.error("All " + MAX_STARTUP_RETRIES + + " Cassandra startup connect attempts exhausted for [" + sessionKey + "]"); + } + + // Runtime reconnect — called only from inside the synchronized block in {@link #getSession}. + private static void prepareSessionOnce(String sessionKey, ConsistencyLevel level) { + try { + prepareSession(sessionKey, level); + TelemetryManager.log("Cassandra session re-established for [" + sessionKey + "]"); } catch (Exception e) { - TelemetryManager.error("Error! While Loading Cassandra Properties." + e.getMessage(), e); + TelemetryManager.error("Cassandra reconnect attempt failed for [" + + sessionKey + "]: " + e.getMessage(), e); } } /** - * - * @param sessionKey - * @return + * Creates a brand-new Cluster and Session for {@code sessionKey} and stores them + * in the maps. Closes any previous Cluster for the same key to free its resources. + * Throws if no contact point is reachable so callers can react appropriately. */ - private static List getConnectionInfo(String sessionKey) { - List connectionInfo = null; - switch (sessionKey) { - case "lp": - connectionInfo = Arrays.asList(Platform.config.getString("cassandra.lp.connection").split(",")); - break; - case "lpa": - connectionInfo = Arrays.asList(Platform.config.getString("cassandra.lpa.connection").split(",")); - break; - case "sunbird": - connectionInfo = Arrays.asList(Platform.config.getString("cassandra.sunbird.connection").split(",")); - break; - case "platform-courses": - connectionInfo = Arrays - .asList(Platform.config.getString("cassandra.connection.platform_courses").split(",")); - break; + private static void prepareSession(String sessionKey, ConsistencyLevel level) { + List addressList = getSocketAddress(getConnectionInfo(sessionKey)); + + Cluster.Builder builder = Cluster.builder() + .addContactPointsWithPorts(addressList) + .withReconnectionPolicy(new ExponentialReconnectionPolicy(1_000L, 60_000L)) + .withRetryPolicy(DefaultRetryPolicy.INSTANCE) + .withoutJMXReporting(); + + if (level != null) + builder.withQueryOptions(new QueryOptions().setConsistencyLevel(level)); + + Cluster cluster = builder.build(); + Session session = cluster.connect(); + + Cluster oldCluster = clusterMap.put(sessionKey, cluster); + sessionMap.put(sessionKey, session); + + if (oldCluster != null && !oldCluster.isClosed()) { + try { oldCluster.close(); } catch (Exception ignored) { /* best effort */ } } - if (null == connectionInfo || connectionInfo.isEmpty()) - connectionInfo = new ArrayList<>(Arrays.asList("localhost:9042")); - return connectionInfo; + if (shutdownHookRegistered.compareAndSet(false, true)) + registerShutdownHook(); } - /** - * - * @param hosts - * @return - */ - private static List getSocketAddress(List hosts) { - List connectionList = new ArrayList<>(); - for (String connection : hosts) { - String[] conn = connection.split(":"); - String host = conn[0]; - int port = Integer.valueOf(conn[1]); - connectionList.add(new InetSocketAddress(host, port)); - } - return connectionList; + private static void registerShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + TelemetryManager.log("Shutting down Cassandra connector — closing all clusters"); + CassandraConnector.close(); + })); } - /** - * Close all Cassandra sessions gracefully. - * Each session is closed individually so a failure in one does not - * prevent closing the others. - */ - public static void close() { - sessionMap.forEach((key, session) -> { - if (session != null && !session.isClosed()) { - try { - session.close(); - } catch (Exception e) { - TelemetryManager.error("Error closing Cassandra session for key: " + key + " — " + e.getMessage(), e); - } - } - }); - sessionMap.clear(); + private static List getConnectionInfo(String sessionKey) { + String configKey; + switch (sessionKey) { + case "lp": configKey = "cassandra.lp.connection"; break; + case "lpa": configKey = "cassandra.lpa.connection"; break; + case "sunbird": configKey = "cassandra.sunbird.connection"; break; + case "platform-courses": configKey = "cassandra.connection.platform_courses"; break; + default: configKey = null; break; + } + if (configKey != null && Platform.config.hasPath(configKey)) { + List nodes = Arrays.asList(Platform.config.getString(configKey).split(",")); + if (!nodes.isEmpty()) return nodes; + } + return new ArrayList<>(Collections.singletonList("localhost:9042")); } - /** - * Register a single JVM shutdown hook to close all open Cassandra sessions. - * Protected by shutdownHookRegistered so it is called only once even when - * prepareSession() is invoked multiple times. - */ - private static void registerShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - TelemetryManager.log("Shutting down Cassandra connector — closing all sessions"); - CassandraConnector.close(); - } - }); + private static List getSocketAddress(List hosts) { + List list = new ArrayList<>(); + for (String conn : hosts) { + String[] parts = conn.trim().split(":"); + list.add(new InetSocketAddress(parts[0].trim(), Integer.parseInt(parts[1].trim()))); + } + return list; } - /** - * This Method Returns the value of Consistency Level for Multi Node/DC - * Cassandra Cluster. - * - * @return ConsistencyLevel - */ private static ConsistencyLevel getConsistencyLevel(String clusterName) { - String key = "cassandra." + clusterName + ".consistency.level"; - String consistencyLevel = Platform.config.hasPath(key) ? Platform.config.getString(key) : null; - if (StringUtils.isNotBlank(consistencyLevel)) - return ConsistencyLevel.valueOf(consistencyLevel.toUpperCase()); - return null; + String key = "cassandra." + clusterName + ".consistency.level"; + String value = Platform.config.hasPath(key) ? Platform.config.getString(key) : null; + return StringUtils.isNotBlank(value) ? ConsistencyLevel.valueOf(value.toUpperCase()) : null; } - } diff --git a/platform-core/cassandra-connector/src/main/java/org/sunbird/cassandra/CassandraStore.java b/platform-core/cassandra-connector/src/main/java/org/sunbird/cassandra/CassandraStore.java index 20e12fc3f..2bb79becf 100644 --- a/platform-core/cassandra-connector/src/main/java/org/sunbird/cassandra/CassandraStore.java +++ b/platform-core/cassandra-connector/src/main/java/org/sunbird/cassandra/CassandraStore.java @@ -14,6 +14,7 @@ import org.sunbird.cassandra.enums.CassandraParams; import org.sunbird.cassandra.store.Constants; import org.sunbird.common.exception.ServerException; +import org.sunbird.telemetry.logger.TelemetryManager; import java.util.Arrays; import java.util.Iterator; @@ -58,7 +59,7 @@ public void update(String identifier, Object idValue, Map reques objects[i] = idValue; executeQuery(updateQuery, objects); } catch (Exception e) { - e.printStackTrace(); + TelemetryManager.error("Error while updating record for id : " + idValue + " | " + e.getMessage(), e); throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), "Error while updating record for id : " + idValue, e); } @@ -73,6 +74,7 @@ public void delete(String identifier, Object idValue) { Delete.Where delete = QueryBuilder.delete().from(keySpace, table).where(eq(identifier, idValue)); CassandraConnector.getSession().execute(delete); } catch (Exception e) { + TelemetryManager.error("Error while deleting record for id : " + idValue + " | " + e.getMessage(), e); throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), "Error while deleting record for id : " + idValue, e); } @@ -91,6 +93,7 @@ public List read(String key, Object value) { ResultSet results = CassandraConnector.getSession().execute(selectQuery); return results.all(); } catch (Exception e) { + TelemetryManager.error("Error while fetching record for ID : " + value + " | " + e.getMessage(), e); throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), "Error while fetching record for ID : " + value, e); } @@ -110,6 +113,7 @@ protected List getRecordsByProperty(String propertyName, List prope ResultSet results = CassandraConnector.getSession().execute(selectQuery); return results.all(); } catch (Exception e) { + TelemetryManager.error("Error while fetching record for Property : " + propertyName + " | " + e.getMessage(), e); throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), "Error while fetching record for Property : " + propertyName, e); } @@ -136,6 +140,7 @@ protected List getRecordsByProperties(Map propertyMap) { ResultSet results = CassandraConnector.getSession().execute(selectQuery.allowFiltering()); return results.all(); } catch (Exception e) { + TelemetryManager.error("Error while fetching records | " + e.getMessage(), e); throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), "Error while fetching records", e); } @@ -153,6 +158,7 @@ protected List getPropertiesValueById(String identifier, String idValue, St ResultSet results = CassandraConnector.getSession().execute(boundStatement.bind(identifier)); return results.all(); } catch (Exception e) { + TelemetryManager.error("Error while fetching properties for ID : " + idValue + " | " + e.getMessage(), e); throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), "Error while fetching properties for ID : " + idValue, e); } @@ -164,6 +170,7 @@ protected List getAllRecords() { ResultSet results = CassandraConnector.getSession().execute(selectQuery); return results.all(); } catch (Exception e) { + TelemetryManager.error("Error while fetching all records | " + e.getMessage(), e); throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), "Error while fetching all records", e); } @@ -173,7 +180,7 @@ protected void upsertRecord(Map request) { try { if (null == request || request.isEmpty()) { throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), - "Invalid Identifier to read"); + "Invalid request to upsert."); } Session session = CassandraConnector.getSession(); String query = getPreparedStatementFrUpsert(request); @@ -183,8 +190,8 @@ protected void upsertRecord(Map request) { session.execute(boundStatement.bind(objects)); } catch (Exception e) { - throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), "Error while upsert record", - e); + TelemetryManager.error("Error while upsert record | " + e.getMessage(), e); + throw new ServerException(CassandraParams.ERR_SERVER_ERROR.name(), "Error while upsert record", e); } }