Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public class CassandraConnector {
/** Cassandra Session Map. */
private static Map<String, Session> sessionMap = new HashMap<String, Session>();

/** Guard to prevent registering duplicate JVM shutdown hooks. */
private static boolean shutdownHookRegistered = false;

static {
if (Platform.getBoolean("service.db.cassandra.enabled", true))
prepareSession("lp", getConsistencyLevel("lp"));
Expand Down Expand Up @@ -77,9 +80,11 @@ private static void prepareSession(String sessionKey, ConsistencyLevel level) {
.build().connect());
}

registerShutdownHook();
if (!shutdownHookRegistered) {
registerShutdownHook();
shutdownHookRegistered = true;
}
} catch (Exception e) {
e.printStackTrace();
TelemetryManager.error("Error! While Loading Cassandra Properties." + e.getMessage(), e);
}
}
Expand Down Expand Up @@ -129,21 +134,33 @@ private static List<InetSocketAddress> getSocketAddress(List<String> hosts) {
}

/**
* Close connection with the cluster.
*
* Close all Cassandra sessions gracefully.
* Each session is closed individually so a failure in one does not
* prevent closing the others.
*/
public static void close() {
sessionMap.entrySet().stream().forEach(stream -> stream.getValue().close());
sessionMap.forEach((key, session) -> {
if (session != null && !session.isClosed()) {
try {
session.close();
} catch (Exception e) {
TelemetryManager.error("Error closing Cassandra session for key: " + key + " — " + e.getMessage(), e);
}
}
});
sessionMap.clear();
}

/**
* Register JVM shutdown hook to close cassandra open session.
* Register a single JVM shutdown hook to close all open Cassandra sessions.
* Protected by shutdownHookRegistered so it is called only once even when
* prepareSession() is invoked multiple times.
*/
private static void registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
TelemetryManager.log("Shutting down Cassandra connector session");
TelemetryManager.log("Shutting down Cassandra connector — closing all sessions");
CassandraConnector.close();
}
});
Expand Down