2323 */
2424public class SharedClusterManager {
2525 private static Logger logger = LogManager .getLogger (SharedClusterManager .class );
26-
26+
2727 // Common KV connections setting for massively parallel collection loads
2828 // Increased from default 5 to 500 to support 5,000 collections loading in parallel
29- private static final int DEFAULT_KV_CONNECTIONS = 5 ;
30-
29+ private static final int DEFAULT_KV_CONNECTIONS = 500 ;
30+
3131 // Shared ClusterEnvironment with optimized connection settings
3232 private static ClusterEnvironment sharedEnvironment ;
33-
34- // Track whether environment has been shutdown
35- private static volatile boolean environmentShutdown = false ;
36-
37- private static final Object environmentLock = new Object ();
38-
33+
3934 // Store cluster instances per server connection string
4035 private static ConcurrentHashMap <String , ClusterWrapper > clusterMap = new ConcurrentHashMap <>();
41-
42- // Initialize the shared environment (lazy initialization with recreation )
36+
37+ // Initialize the shared environment once (lazy initialization)
4338 private static void initializeSharedEnvironment () {
44- if (sharedEnvironment == null || environmentShutdown ) {
45- synchronized (environmentLock ) {
46- // Double-check under lock
47- if (sharedEnvironment == null || environmentShutdown ) {
48- try {
49- if (sharedEnvironment != null && environmentShutdown ) {
50- logger .info ("Shared Cluster Environment was shutdown, recreating" );
51- }
52-
53- sharedEnvironment = ClusterEnvironment .builder ()
54- .timeoutConfig (TimeoutConfig .builder ().kvTimeout (Duration .ofSeconds (10 )))
55- .securityConfig (SecurityConfig .enableTls (true )
56- .trustManagerFactory (InsecureTrustManagerFactory .INSTANCE ))
57- .ioConfig (IoConfig .enableDnsSrv (true ))
58- .ioConfig (IoConfig .numKvConnections (DEFAULT_KV_CONNECTIONS ))
59- .build ();
60- environmentShutdown = false ;
61- logger .info ("Shared Cluster Environment initialized with " + DEFAULT_KV_CONNECTIONS + " KV connections for massively parallel collection loads" );
62- } catch (Exception e ) {
63- logger .error ("Failed to initialize shared Cluster Environment" , e );
64- }
65- }
39+ if (sharedEnvironment == null ) {
40+ try {
41+ sharedEnvironment = ClusterEnvironment .builder ()
42+ .timeoutConfig (TimeoutConfig .builder ().kvTimeout (Duration .ofSeconds (10 )))
43+ .securityConfig (SecurityConfig .enableTls (true )
44+ .trustManagerFactory (InsecureTrustManagerFactory .INSTANCE ))
45+ .ioConfig (IoConfig .enableDnsSrv (true ))
46+ .ioConfig (IoConfig .numKvConnections (DEFAULT_KV_CONNECTIONS ))
47+ .build ();
48+ logger .info ("Shared Cluster Environment initialized with " + DEFAULT_KV_CONNECTIONS + " KV connections for massively parallel collection loads" );
49+ } catch (Exception e ) {
50+ logger .error ("Failed to initialize shared Cluster Environment" , e );
6651 }
6752 }
6853 }
69-
54+
7055 /**
7156 * Get or create a shared Cluster instance for the given server connection
7257 */
7358 public static synchronized Cluster getCluster (Server server ) throws AuthenticationFailureException {
74- initializeSharedEnvironment ();
7559 String clusterKey = getClusterKey (server );
7660
7761 ClusterWrapper wrapper = clusterMap .get (clusterKey );
@@ -88,27 +72,27 @@ public static synchronized Cluster getCluster(Server server) throws Authenticati
8872
8973 return wrapper .cluster ;
9074 }
91-
75+
9276 /**
9377 * Release reference to the shared Cluster instance
9478 */
9579 public static synchronized void releaseCluster (Server server ) {
9680 String clusterKey = getClusterKey (server );
9781 ClusterWrapper wrapper = clusterMap .get (clusterKey );
98-
82+
9983 if (wrapper != null ) {
10084 int refCount = wrapper .decrementRefCount ();
101- logger .debug ("Released Cluster instance for server: " + server .ip +
85+ logger .debug ("Released Cluster instance for server: " + server .ip +
10286 " (ref count: " + refCount + ")" );
103-
87+
10488 if (refCount == 0 ) {
10589 logger .info ("No more references, disconnecting Cluster for server: " + server .ip );
10690 wrapper .cluster .disconnect ();
10791 clusterMap .remove (clusterKey );
10892 }
10993 }
11094 }
111-
95+
11296 /**
11397 * Shutdown all cluster instances and the shared environment
11498 */
@@ -123,11 +107,10 @@ public static synchronized void shutdownAll() {
123107
124108 if (sharedEnvironment != null ) {
125109 sharedEnvironment .shutdown ();
126- environmentShutdown = true ;
127110 logger .info ("Shared Cluster Environment shutdown complete" );
128111 }
129112 }
130-
113+
131114 private static Cluster createCluster (Server server ) throws AuthenticationFailureException {
132115 ClusterOptions clusterOptions ;
133116 try {
@@ -138,52 +121,52 @@ private static Cluster createCluster(Server server) throws AuthenticationFailure
138121 clusterOptions = ClusterOptions .clusterOptions (server .rest_username , server .rest_password )
139122 .environment (createNonTLSEnvironment ());
140123 }
141-
124+
142125 Cluster cluster = Cluster .connect (server .ip , clusterOptions );
143126 logger .info ("Cluster connection successful: " + server .ip );
144127 return cluster ;
145128 } catch (AuthenticationFailureException e ) {
146- logger .error ("Authentication failed for server: " + server .ip +
129+ logger .error ("Authentication failed for server: " + server .ip +
147130 " with user: " + server .rest_username );
148131 throw e ;
149132 } catch (Exception e ) {
150133 logger .error ("Failed to connect Cluster to server: " + server .ip , e );
151134 throw new RuntimeException ("Cluster connection failed" , e );
152135 }
153136 }
154-
137+
155138 private static ClusterEnvironment createNonTLSEnvironment () {
156139 return ClusterEnvironment .builder ()
157140 .timeoutConfig (TimeoutConfig .builder ().kvTimeout (Duration .ofSeconds (10 )))
158141 .ioConfig (IoConfig .enableDnsSrv (true ))
159142 .ioConfig (IoConfig .numKvConnections (DEFAULT_KV_CONNECTIONS ))
160143 .build ();
161144 }
162-
145+
163146 private static String getClusterKey (Server server ) {
164147 return server .ip + ":" + server .memcached_port ;
165148 }
166-
149+
167150 /**
168151 * Wrapper class to track reference count for shared Cluster instances
169152 */
170153 private static class ClusterWrapper {
171154 Cluster cluster ;
172155 AtomicInteger refCount ;
173-
156+
174157 ClusterWrapper (Cluster cluster ) {
175158 this .cluster = cluster ;
176159 this .refCount = new AtomicInteger (1 );
177160 }
178-
161+
179162 void incrementRefCount () {
180163 refCount .incrementAndGet ();
181164 }
182-
165+
183166 int decrementRefCount () {
184167 return refCount .decrementAndGet ();
185168 }
186-
169+
187170 int getRefCount () {
188171 return refCount .get ();
189172 }
0 commit comments