Skip to content

Commit 2a99ee8

Browse files
authored
Merge pull request #1200 from aimansharief/cassandra-reconnect
fix: Added cassandra reconnect functionality
2 parents 1648265 + dd5583e commit 2a99ee8

File tree

2 files changed

+159
-128
lines changed

2 files changed

+159
-128
lines changed

platform-core/cassandra-connector/src/main/java/org/sunbird/cassandra/CassandraConnector.java

Lines changed: 148 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import com.datastax.driver.core.Cluster;
44
import com.datastax.driver.core.ConsistencyLevel;
5-
import com.datastax.driver.core.ProtocolVersion;
65
import com.datastax.driver.core.QueryOptions;
76
import com.datastax.driver.core.Session;
7+
import com.datastax.driver.core.policies.DefaultRetryPolicy;
8+
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
89
import org.apache.commons.lang3.StringUtils;
910
import org.sunbird.common.Platform;
1011
import org.sunbird.common.exception.ServerException;
@@ -13,171 +14,194 @@
1314
import java.net.InetSocketAddress;
1415
import java.util.ArrayList;
1516
import java.util.Arrays;
16-
import java.util.HashMap;
17+
import java.util.Collections;
1718
import java.util.List;
1819
import java.util.Map;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922

20-
public class CassandraConnector {
2123

22-
/** Cassandra Session Map. */
23-
private static Map<String, Session> sessionMap = new HashMap<String, Session>();
24+
public class CassandraConnector {
2425

25-
/** Guard to prevent registering duplicate JVM shutdown hooks. */
26-
private static boolean shutdownHookRegistered = false;
26+
private static final Map<String, Session> sessionMap = new ConcurrentHashMap<>();
27+
private static final Map<String, Cluster> clusterMap = new ConcurrentHashMap<>();
28+
private static final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false);
29+
private static final int MAX_STARTUP_RETRIES = 30;
30+
private static final long RETRY_BASE_MS = 2_000L; // 2 s — cap doubles each attempt
31+
private static final long RETRY_MAX_MS = 30_000L; // 30 s — cap ceiling
2732

2833
static {
2934
if (Platform.getBoolean("service.db.cassandra.enabled", true))
30-
prepareSession("lp", getConsistencyLevel("lp"));
35+
prepareSessionWithRetry("lp", getConsistencyLevel("lp"));
3136
}
32-
33-
/**
34-
* Provide lp Session.
35-
*
36-
* @return lp session.
37-
*/
3837
public static Session getSession() {
3938
return getSession("lp");
4039
}
4140

4241
/**
43-
* @param sessionKey
44-
* @return
42+
* Returns the active session for {@code sessionKey}.
43+
* @param sessionKey one of "lp", "lpa", "sunbird", "platform-courses"
44+
* @return an active Session
45+
* @throws ServerException if the session cannot be established
4546
*/
4647
public static Session getSession(String sessionKey) {
47-
Session session = sessionMap.containsKey(sessionKey) ? sessionMap.get(sessionKey) : null;
48+
String key = sessionKey.toLowerCase();
49+
Session session = sessionMap.get(key);
50+
Cluster cluster = clusterMap.get(key);
4851

49-
if (null == session || session.isClosed()) {
50-
ConsistencyLevel level = getConsistencyLevel(sessionKey);
51-
prepareSession(sessionKey, level);
52-
session = sessionMap.get(sessionKey);
52+
if (session != null && !session.isClosed() && cluster != null && !cluster.isClosed()) {
53+
return session;
54+
}
55+
56+
synchronized (CassandraConnector.class) {
57+
session = sessionMap.get(key);
58+
cluster = clusterMap.get(key);
59+
60+
if (session == null || session.isClosed() || cluster == null || cluster.isClosed()) {
61+
prepareSessionOnce(key, getConsistencyLevel(key));
62+
session = sessionMap.get(key);
63+
}
5364
}
54-
if (null == session)
55-
throw new ServerException("ERR_INITIALISE_CASSANDRA_SESSION", "Error while initialising cassandra");
65+
66+
if (session == null)
67+
throw new ServerException("ERR_INITIALISE_CASSANDRA_SESSION",
68+
"Unable to obtain Cassandra session for key: " + sessionKey);
5669
return session;
5770
}
5871

5972
/**
60-
*
61-
* @param sessionKey
62-
* @param level
73+
* Closes all Cluster objects, which releases their sessions, connection pools,
74+
* and driver-internal background threads. Each cluster is closed independently
75+
* so a failure in one does not block the others.
6376
*/
64-
private static void prepareSession(String sessionKey, ConsistencyLevel level) {
65-
List<String> connectionInfo = getConnectionInfo(sessionKey.toLowerCase());
66-
List<InetSocketAddress> addressList = getSocketAddress(connectionInfo);
67-
try {
68-
if (null != level) {
69-
sessionMap.put(sessionKey.toLowerCase(), Cluster.builder()
70-
.addContactPointsWithPorts(addressList)
71-
.withQueryOptions(new QueryOptions().setConsistencyLevel(level))
72-
.withoutJMXReporting()
73-
.withProtocolVersion(ProtocolVersion.V4)
74-
.build().connect());
75-
} else {
76-
sessionMap.put(sessionKey.toLowerCase(), Cluster.builder()
77-
.addContactPointsWithPorts(addressList)
78-
.withoutJMXReporting()
79-
.withProtocolVersion(ProtocolVersion.V4)
80-
.build().connect());
77+
public static void close() {
78+
clusterMap.forEach((key, cluster) -> {
79+
if (cluster != null && !cluster.isClosed()) {
80+
try {
81+
cluster.close();
82+
} catch (Exception e) {
83+
TelemetryManager.error(
84+
"Error closing Cassandra cluster [" + key + "]: " + e.getMessage(), e);
85+
}
8186
}
87+
});
88+
sessionMap.clear();
89+
clusterMap.clear();
90+
}
8291

83-
if (!shutdownHookRegistered) {
84-
registerShutdownHook();
85-
shutdownHookRegistered = true;
92+
// Startup retry loop — called only from the static initialiser.
93+
private static void prepareSessionWithRetry(String sessionKey, ConsistencyLevel level) {
94+
int attempt = 0;
95+
long cap = RETRY_BASE_MS;
96+
97+
while (attempt < MAX_STARTUP_RETRIES) {
98+
attempt++;
99+
try {
100+
prepareSession(sessionKey, level);
101+
TelemetryManager.log(
102+
"Cassandra session ready for [" + sessionKey + "] on attempt " + attempt);
103+
return;
104+
} catch (Exception e) {
105+
TelemetryManager.error("Cassandra connect attempt " + attempt + "/"
106+
+ MAX_STARTUP_RETRIES + " failed for [" + sessionKey + "]: "
107+
+ e.getMessage(), e);
108+
109+
if (attempt < MAX_STARTUP_RETRIES) {
110+
// Full jitter: sleep = random(0, min(cap, RETRY_MAX_MS))
111+
long sleep = (long) (Math.random() * Math.min(cap, RETRY_MAX_MS));
112+
try {
113+
Thread.sleep(sleep);
114+
} catch (InterruptedException ie) {
115+
Thread.currentThread().interrupt();
116+
TelemetryManager.error(
117+
"Cassandra startup retry interrupted for [" + sessionKey + "]", ie);
118+
return;
119+
}
120+
cap = Math.min(cap * 2, RETRY_MAX_MS);
121+
}
86122
}
123+
}
124+
TelemetryManager.error("All " + MAX_STARTUP_RETRIES
125+
+ " Cassandra startup connect attempts exhausted for [" + sessionKey + "]");
126+
}
127+
128+
// Runtime reconnect — called only from inside the synchronized block in {@link #getSession}.
129+
private static void prepareSessionOnce(String sessionKey, ConsistencyLevel level) {
130+
try {
131+
prepareSession(sessionKey, level);
132+
TelemetryManager.log("Cassandra session re-established for [" + sessionKey + "]");
87133
} catch (Exception e) {
88-
TelemetryManager.error("Error! While Loading Cassandra Properties." + e.getMessage(), e);
134+
TelemetryManager.error("Cassandra reconnect attempt failed for ["
135+
+ sessionKey + "]: " + e.getMessage(), e);
89136
}
90137
}
91138

92139
/**
93-
*
94-
* @param sessionKey
95-
* @return
140+
* Creates a brand-new Cluster and Session for {@code sessionKey} and stores them
141+
* in the maps. Closes any previous Cluster for the same key to free its resources.
142+
* Throws if no contact point is reachable so callers can react appropriately.
96143
*/
97-
private static List<String> getConnectionInfo(String sessionKey) {
98-
List<String> connectionInfo = null;
99-
switch (sessionKey) {
100-
case "lp":
101-
connectionInfo = Arrays.asList(Platform.config.getString("cassandra.lp.connection").split(","));
102-
break;
103-
case "lpa":
104-
connectionInfo = Arrays.asList(Platform.config.getString("cassandra.lpa.connection").split(","));
105-
break;
106-
case "sunbird":
107-
connectionInfo = Arrays.asList(Platform.config.getString("cassandra.sunbird.connection").split(","));
108-
break;
109-
case "platform-courses":
110-
connectionInfo = Arrays
111-
.asList(Platform.config.getString("cassandra.connection.platform_courses").split(","));
112-
break;
144+
private static void prepareSession(String sessionKey, ConsistencyLevel level) {
145+
List<InetSocketAddress> addressList = getSocketAddress(getConnectionInfo(sessionKey));
146+
147+
Cluster.Builder builder = Cluster.builder()
148+
.addContactPointsWithPorts(addressList)
149+
.withReconnectionPolicy(new ExponentialReconnectionPolicy(1_000L, 60_000L))
150+
.withRetryPolicy(DefaultRetryPolicy.INSTANCE)
151+
.withoutJMXReporting();
152+
153+
if (level != null)
154+
builder.withQueryOptions(new QueryOptions().setConsistencyLevel(level));
155+
156+
Cluster cluster = builder.build();
157+
Session session = cluster.connect();
158+
159+
Cluster oldCluster = clusterMap.put(sessionKey, cluster);
160+
sessionMap.put(sessionKey, session);
161+
162+
if (oldCluster != null && !oldCluster.isClosed()) {
163+
try { oldCluster.close(); } catch (Exception ignored) { /* best effort */ }
113164
}
114-
if (null == connectionInfo || connectionInfo.isEmpty())
115-
connectionInfo = new ArrayList<>(Arrays.asList("localhost:9042"));
116165

117-
return connectionInfo;
166+
if (shutdownHookRegistered.compareAndSet(false, true))
167+
registerShutdownHook();
118168
}
119169

120-
/**
121-
*
122-
* @param hosts
123-
* @return
124-
*/
125-
private static List<InetSocketAddress> getSocketAddress(List<String> hosts) {
126-
List<InetSocketAddress> connectionList = new ArrayList<>();
127-
for (String connection : hosts) {
128-
String[] conn = connection.split(":");
129-
String host = conn[0];
130-
int port = Integer.valueOf(conn[1]);
131-
connectionList.add(new InetSocketAddress(host, port));
132-
}
133-
return connectionList;
170+
private static void registerShutdownHook() {
171+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
172+
TelemetryManager.log("Shutting down Cassandra connector — closing all clusters");
173+
CassandraConnector.close();
174+
}));
134175
}
135176

136-
/**
137-
* Close all Cassandra sessions gracefully.
138-
* Each session is closed individually so a failure in one does not
139-
* prevent closing the others.
140-
*/
141-
public static void close() {
142-
sessionMap.forEach((key, session) -> {
143-
if (session != null && !session.isClosed()) {
144-
try {
145-
session.close();
146-
} catch (Exception e) {
147-
TelemetryManager.error("Error closing Cassandra session for key: " + key + " — " + e.getMessage(), e);
148-
}
149-
}
150-
});
151-
sessionMap.clear();
177+
private static List<String> getConnectionInfo(String sessionKey) {
178+
String configKey;
179+
switch (sessionKey) {
180+
case "lp": configKey = "cassandra.lp.connection"; break;
181+
case "lpa": configKey = "cassandra.lpa.connection"; break;
182+
case "sunbird": configKey = "cassandra.sunbird.connection"; break;
183+
case "platform-courses": configKey = "cassandra.connection.platform_courses"; break;
184+
default: configKey = null; break;
185+
}
186+
if (configKey != null && Platform.config.hasPath(configKey)) {
187+
List<String> nodes = Arrays.asList(Platform.config.getString(configKey).split(","));
188+
if (!nodes.isEmpty()) return nodes;
189+
}
190+
return new ArrayList<>(Collections.singletonList("localhost:9042"));
152191
}
153192

154-
/**
155-
* Register a single JVM shutdown hook to close all open Cassandra sessions.
156-
* Protected by shutdownHookRegistered so it is called only once even when
157-
* prepareSession() is invoked multiple times.
158-
*/
159-
private static void registerShutdownHook() {
160-
Runtime.getRuntime().addShutdownHook(new Thread() {
161-
@Override
162-
public void run() {
163-
TelemetryManager.log("Shutting down Cassandra connector — closing all sessions");
164-
CassandraConnector.close();
165-
}
166-
});
193+
private static List<InetSocketAddress> getSocketAddress(List<String> hosts) {
194+
List<InetSocketAddress> list = new ArrayList<>();
195+
for (String conn : hosts) {
196+
String[] parts = conn.trim().split(":");
197+
list.add(new InetSocketAddress(parts[0].trim(), Integer.parseInt(parts[1].trim())));
198+
}
199+
return list;
167200
}
168201

169-
/**
170-
* This Method Returns the value of Consistency Level for Multi Node/DC
171-
* Cassandra Cluster.
172-
*
173-
* @return ConsistencyLevel
174-
*/
175202
private static ConsistencyLevel getConsistencyLevel(String clusterName) {
176-
String key = "cassandra." + clusterName + ".consistency.level";
177-
String consistencyLevel = Platform.config.hasPath(key) ? Platform.config.getString(key) : null;
178-
if (StringUtils.isNotBlank(consistencyLevel))
179-
return ConsistencyLevel.valueOf(consistencyLevel.toUpperCase());
180-
return null;
203+
String key = "cassandra." + clusterName + ".consistency.level";
204+
String value = Platform.config.hasPath(key) ? Platform.config.getString(key) : null;
205+
return StringUtils.isNotBlank(value) ? ConsistencyLevel.valueOf(value.toUpperCase()) : null;
181206
}
182-
183207
}

0 commit comments

Comments
 (0)