Skip to content

Commit 79f57c1

Browse files
ashwin2002nehal-mantri
authored andcommitted
More improvments
- Decr. sleep to 5ms within submit_task() calls - Removed lock base SDKClient booking from pool
1 parent c249558 commit 79f57c1

3 files changed

Lines changed: 33 additions & 33 deletions

File tree

src/main/java/RestServer/TaskRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ public ResponseEntity<Map<String, Object>> doc_load() {
702702
}
703703

704704
// Calculate effective number of workers needed
705-
int effectiveWorkers = Math.min(ws.workers,
705+
int effectiveWorkers = Math.min(ws.workers,
706706
(int)((totalDocsToProcess + docsPerWorker - 1) / docsPerWorker)); // ceil division
707707

708708
System.out.println("Smart worker counting: Total docs=" + totalDocsToProcess +

src/main/java/couchbase/sdk/SDKClientPool.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ public void create_clients(String bucket_name, Server server, int req_clients) t
8585
}
8686

8787
public SDKClient get_client_for_bucket(String bucket_name, String scope, String collection) {
88-
String cache_key = bucket_name + ":" + scope + ":" + collection;
89-
88+
String col_name = scope + collection;
89+
9090
// Check if client is already cached for this collection
91-
ClientInfo existing = clientCache.get(cache_key);
91+
ClientInfo existing = clientCache.get(col_name);
9292
if (existing != null) {
9393
existing.counter.incrementAndGet();
9494
return existing.client;
@@ -113,7 +113,7 @@ public SDKClient get_client_for_bucket(String bucket_name, String scope, String
113113
busyClients.computeIfAbsent(bucket_name, k -> new ConcurrentLinkedQueue<>()).add(client);
114114

115115
// Cache client reference with thread-safe counter
116-
clientCache.put(cache_key, new ClientInfo(client, new AtomicInteger(1)));
116+
clientCache.put(col_name, new ClientInfo(client, new AtomicInteger(1)));
117117

118118
return client;
119119
}
@@ -122,22 +122,22 @@ public void release_client(SDKClient client) {
122122
if (client == null || client.bucket == null) {
123123
return;
124124
}
125-
125+
126126
String bucket_key = client.bucket;
127-
String cache_key = bucket_key + ":" + client.scope + ":" + client.collection;
128-
127+
String col_name = client.scope + client.collection;
128+
129129
// Get cached client info
130-
ClientInfo info = clientCache.get(cache_key);
130+
ClientInfo info = clientCache.get(col_name);
131131
if (info == null) {
132132
return;
133133
}
134-
134+
135135
// Decrement counter atomically
136136
int newCount = info.counter.decrementAndGet();
137-
137+
138138
if (newCount == 0) {
139139
// Remove from cache atomically
140-
clientCache.remove(cache_key);
140+
clientCache.remove(col_name);
141141

142142
// Remove from busy pool and add to idle pool atomically
143143
ConcurrentLinkedQueue<SDKClient> busyPool = busyClients.get(bucket_key);

src/main/java/couchbase/sdk/SharedClusterManager.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@
2323
*/
2424
public class SharedClusterManager {
2525
private static Logger logger = LogManager.getLogger(SharedClusterManager.class);
26-
26+
2727
// Common KV connections setting for massively parallel collection loads
2828
// Increased from default 5 to 500 to support 5,000 collections loading in parallel
29-
private static final int DEFAULT_KV_CONNECTIONS = 500;
30-
29+
private static final int DEFAULT_KV_CONNECTIONS = 5;
30+
3131
// Shared ClusterEnvironment with optimized connection settings
3232
private static ClusterEnvironment sharedEnvironment;
33-
33+
3434
// Store cluster instances per server connection string
3535
private static ConcurrentHashMap<String, ClusterWrapper> clusterMap = new ConcurrentHashMap<>();
36-
36+
3737
// Initialize the shared environment once (lazy initialization)
3838
private static void initializeSharedEnvironment() {
3939
if (sharedEnvironment == null) {
@@ -51,7 +51,7 @@ private static void initializeSharedEnvironment() {
5151
}
5252
}
5353
}
54-
54+
5555
/**
5656
* Get or create a shared Cluster instance for the given server connection
5757
*/
@@ -72,27 +72,27 @@ public static synchronized Cluster getCluster(Server server) throws Authenticati
7272

7373
return wrapper.cluster;
7474
}
75-
75+
7676
/**
7777
* Release reference to the shared Cluster instance
7878
*/
7979
public static synchronized void releaseCluster(Server server) {
8080
String clusterKey = getClusterKey(server);
8181
ClusterWrapper wrapper = clusterMap.get(clusterKey);
82-
82+
8383
if (wrapper != null) {
8484
int refCount = wrapper.decrementRefCount();
85-
logger.debug("Released Cluster instance for server: " + server.ip +
85+
logger.debug("Released Cluster instance for server: " + server.ip +
8686
" (ref count: " + refCount + ")");
87-
87+
8888
if (refCount == 0) {
8989
logger.info("No more references, disconnecting Cluster for server: " + server.ip);
9090
wrapper.cluster.disconnect();
9191
clusterMap.remove(clusterKey);
9292
}
9393
}
9494
}
95-
95+
9696
/**
9797
* Shutdown all cluster instances and the shared environment
9898
*/
@@ -110,7 +110,7 @@ public static synchronized void shutdownAll() {
110110
logger.info("Shared Cluster Environment shutdown complete");
111111
}
112112
}
113-
113+
114114
private static Cluster createCluster(Server server) throws AuthenticationFailureException {
115115
ClusterOptions clusterOptions;
116116
try {
@@ -121,52 +121,52 @@ private static Cluster createCluster(Server server) throws AuthenticationFailure
121121
clusterOptions = ClusterOptions.clusterOptions(server.rest_username, server.rest_password)
122122
.environment(createNonTLSEnvironment());
123123
}
124-
124+
125125
Cluster cluster = Cluster.connect(server.ip, clusterOptions);
126126
logger.info("Cluster connection successful: " + server.ip);
127127
return cluster;
128128
} catch (AuthenticationFailureException e) {
129-
logger.error("Authentication failed for server: " + server.ip +
129+
logger.error("Authentication failed for server: " + server.ip +
130130
" with user: " + server.rest_username);
131131
throw e;
132132
} catch (Exception e) {
133133
logger.error("Failed to connect Cluster to server: " + server.ip, e);
134134
throw new RuntimeException("Cluster connection failed", e);
135135
}
136136
}
137-
137+
138138
private static ClusterEnvironment createNonTLSEnvironment() {
139139
return ClusterEnvironment.builder()
140140
.timeoutConfig(TimeoutConfig.builder().kvTimeout(Duration.ofSeconds(10)))
141141
.ioConfig(IoConfig.enableDnsSrv(true))
142142
.ioConfig(IoConfig.numKvConnections(DEFAULT_KV_CONNECTIONS))
143143
.build();
144144
}
145-
145+
146146
private static String getClusterKey(Server server) {
147147
return server.ip + ":" + server.memcached_port;
148148
}
149-
149+
150150
/**
151151
* Wrapper class to track reference count for shared Cluster instances
152152
*/
153153
private static class ClusterWrapper {
154154
Cluster cluster;
155155
AtomicInteger refCount;
156-
156+
157157
ClusterWrapper(Cluster cluster) {
158158
this.cluster = cluster;
159159
this.refCount = new AtomicInteger(1);
160160
}
161-
161+
162162
void incrementRefCount() {
163163
refCount.incrementAndGet();
164164
}
165-
165+
166166
int decrementRefCount() {
167167
return refCount.decrementAndGet();
168168
}
169-
169+
170170
int getRefCount() {
171171
return refCount.get();
172172
}

0 commit comments

Comments
 (0)