Skip to content

Commit b1f5d24

Browse files
committed
Squashed commit of the following:
commit 362ccaa Author: Ashwin G <ashwin.govindarajulu@couchbase.com> Date: Tue Mar 31 14:27:05 2026 +0530 Fix bucket collision in SDKClient cache for multi-bucket loading Changed cache key from 'scope+collection' to 'bucket:scope:collection' to prevent wrong SDKClient instances being returned when multiple buckets share identical scope/collection names while using shared Cluster. Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com> commit ff05c4c Author: Ashwin G <ashwin.govindarajulu@couchbase.com> Date: Mon Mar 23 16:40:28 2026 +0530 Fix memory pressure in SimpleValue for concurrent doc_gen commit 159f6dd Author: Ashwin G <ashwin.govindarajulu@couchbase.com> Date: Tue Mar 17 16:14:29 2026 +0530 Adding Makefile commit 899451b Author: Ashwin G <ashwin.govindarajulu@couchbase.com> Date: Tue Mar 17 13:06:42 2026 +0530 Updating agentic md files for including SharedClusterManager improvements commit ae52264 Merge: 6e16b36 78f9120 Author: Ashwin <ashwintrojan+github@gmail.com> Date: Tue Mar 17 12:52:19 2026 +0530 Merge pull request #43 from couchbaselabs/error_fix Enable shared cluster environment recreation post-shutdown commit 78f9120 Author: Ashwin G <ashwin.govindarajulu@couchbase.com> Date: Tue Mar 17 12:15:31 2026 +0530 Enable shared cluster environment recreation post-shutdown - Track environment shutdown state with volatile flag for thread-safety - Fix initialization race condition preventing environment reuse after shutdown - Maintain proper ClusterEnvironment lifecycle for long-running workloads commit 6e16b36 Merge: 5b11afc c60fb6a Author: Ashwin <ashwintrojan+github@gmail.com> Date: Mon Mar 16 09:06:44 2026 +0530 Merge pull request #42 from dananjay-s/main Init shared env when trying to create shared cluster commit c60fb6a Author: Dananjay <dananjay.s@couchbase.com> Date: Sun Mar 15 10:38:33 2026 +0530 Init shared env when trying to create shared cluster commit 5b11afc Merge: bfb8d9c c5d4db0 Author: Ashwin <ashwintrojan+github@gmail.com> Date: Fri Mar 13 15:21:39 2026 +0530 Merge pull request #41 from couchbaselabs/shared_cluster_batched_coll_load_approach Improvements in Cluster connection creation and Couchbase loader tasks commit c5d4db0 Author: Ashwin G <ashwin.govindarajulu@couchbase.com> Date: Fri Mar 13 14:18:11 2026 +0530 More improvments - Decr. sleep to 5ms within submit_task() calls - Removed lock base SDKClient booking from pool commit 2f112d3 Author: Ashwin G <ashwin.govindarajulu@couchbase.com> Date: Fri Mar 13 13:04:50 2026 +0530 Test new enhancements
1 parent bfb8d9c commit b1f5d24

14 files changed

Lines changed: 658 additions & 118 deletions

File tree

.agents/profiles/Architect.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@ graph TD
2424
elasticsearch[src/main/java/elasticsearch] -->|Defines Requirements| ARCH[The Architect]
2525
Mongo[src/main/java/mongo] -->|Defines Requirements| ARCH[The Architect]
2626
Utils-->|Defines Requirements| ARCH[The Architect]
27+
RestServer-->|Defines Requirements| ARCH[The Architect]
2728
LoaderJava[src/main/java/Loader.java] -->|Invokes| Couchbase
2829
MongoLoaderJava[src/main/java/MongoLoader.java] -->|Invokes| Mongo
2930
SIFTLoaderJava[src/main/java/SIFTLoader.java] -->|Invokes| elasticsearch
3031
RestServer-->|Utilizes| Couchbase
3132
RestServer-->|Utilizes| Mongo
3233
RestServer-->|Utilizes| Utils
34+
RestServer/SharedClusterManager -->|Manages| Couchbase/sdk
35+
RestServer/CollectionLoadBatcher -->|Coordinates| RestServer/TaskRequest
3336
Couchbase-->|Uses| Utils
37+
Couchbase/sdk/SDKClientPool -->|Uses| Couchbase/sdk/SharedClusterManager
3438
Mongo-->|Uses| Utils
3539
elasticsearch-->|Uses| Utils
3640
Utils-->|Utilized by| Couchbase
3741
Utils-->|Utilized by| Mongo
3842
Utils-->|Utilized by| elasticsearch
3943
Utils-->|Utilized by| RestServer
44+
Couchbase/sdk/SharedClusterManager -->|Optimizes| Cluster Connections
45+
RestServer/CollectionLoadBatcher -->|Optimizes| Multi-Collection Loads
4046
```

.agents/profiles/CBRestLoader.md

Lines changed: 89 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,23 @@ To generate high-performance, thread-safe, and efficient REST-based document loa
1111
graph TD
1212
RestApplication[src/main/java/RestServer/RestApplication.java] -->|Entry Point| RESTLOADER[The CBRestLoader]
1313
TaskRequest[src/main/java/RestServer/TaskRequest.java] -->|Business Logic| RESTLOADER
14+
CollectionLoadBatcher[src/main/java/RestServer/CollectionLoadBatcher.java] -->|Batch Processing| RESTLOADER
1415
RESTLOADER-->|Utilizes| Couchbase[src/main/java/couchbase]
15-
Couchbase-->|Utilizes| Utils[src/main/java/utils]
16+
Couchbase-->|Uses| Utils[src/main/java/utils]
17+
Couchbase/sdk/SDKClientPool -->|Uses| SharedClusterManager[src/main/java/couchbase/sdk/SharedClusterManager.java]
18+
SharedClusterManager -->|Manages| Cluster Instances
19+
CollectionLoadBatcher -->|Coordinates| TaskManager
1620
Utils-->|Utilized by| Couchbase
21+
Utils-->|Utilized by| RestServer
1722
```
1823

1924
### Logic & Constraints
2025
* **Step-Zero:** Always scan `./src/main/java/couchbase` and `./src/main/java/RestServer` to understand existing SDK and REST patterns before proposing new code.
26+
* **Component Selection:**
27+
- **Single Collection Workloads**: Use standard `SDKClientPool``SDKClient``Cluster` pattern
28+
- **Multi-Collection Workloads (100-1000 collections)**: Use `SharedClusterManager` + dynamic collection switching
29+
- **Massive Collection Loads (1000+ collections)**: Use `CollectionLoadBatcher` + `SharedClusterManager`
30+
- **High-Throughput Operations**: Leverage shared ClusterEnvironment with 500+ KV connections
2131
* **REST API Focus:** Modifications target Spring Boot REST endpoints (RestHandlers) and TaskRequest business logic for HTTP-based document loading.
2232
* **SDK Precision:** Default to the latest Couchbase SDK (v3.x) unless specified otherwise.
2333
* **N1QL Mastery:** Must prioritize Indexing strategies and GSI (Global Secondary Index) awareness when writing queries.
@@ -26,6 +36,37 @@ graph TD
2636
- Always include error handling for DocumentNotFound and CasMismatch.
2737
* **Tone:** Technical, efficiency-focused, and precise.
2838

39+
### Core Architecture Components
40+
41+
**SharedClusterManager** (`couchbase/sdk/SharedClusterManager.java`)
42+
- **Purpose**: Singleton pattern managing shared Cluster instances per server connection to avoid connection exhaustion
43+
- **Key Features**:
44+
- Shared ClusterEnvironment with optimized KV connections (default: 500 for massively parallel loads)
45+
- Thread-safe reference counting for Cluster instances
46+
- Automatic environment recreation post-shutdown for long-running workloads
47+
- Supports both TLS and non-TLS connections
48+
- **Usage Pattern**:
49+
```java
50+
Cluster cluster = SharedClusterManager.getCluster(server);
51+
// Perform operations
52+
SharedClusterManager.releaseCluster(server);
53+
```
54+
- **Performance Benefits**: Eliminates connection thrashing for multi-collection workloads, reduces memory overhead from per-collection Cluster instances
55+
56+
**CollectionLoadBatcher** (`RestServer/CollectionLoadBatcher.java`)
57+
- **Purpose**: Java-side batch processing for massive collection loads (thousands of collections)
58+
- **Key Features**:
59+
- Fixed batch size (default: 50) with concurrent processing
60+
- Thread-safe batch state tracking with progress monitoring
61+
- Prevents worker starvation and queue overhead
62+
- Integration with REST API via `submitToBatch()` endpoint
63+
- **Usage Pattern**:
64+
```java
65+
ResponseEntity<Map<String, Object>> result =
66+
CollectionLoadBatcher.submitToBatch(requestBody);
67+
```
68+
- **Performance Benefits**: Sequential Python calls become batched Java operations, maximizing throughput for massive collection loads
69+
2970
### Work flow of loading
3071
sequenceDiagram
3172
participant C as Client (REST)
@@ -55,9 +96,18 @@ sequenceDiagram
5596

5697
### Performance Optimization Guidelines
5798
* **Multi-Collection Strategy**: Prefer bucket-level clients with dynamic collection switching over per-collection client instances. Workers should call `selectCollection()` dynamically per operation instead of creating dedicated clients per collection.
58-
* **Connection Scaling**: KV connections should scale based on: `num_workers × target_collections / connection_reuse_factor`. Default of 5 connections per SDKClient may be insufficient for high-concurrency multi-collection workloads.
99+
* **Shared Cluster Management**: Use `SharedClusterManager` for all multi-collection workloads. It provides:
100+
- Single Cluster instance per server connection to avoid connection exhaustion
101+
- Optimized KV connections (default: 500) for massively parallel collection loads
102+
- Thread-safe reference counting and automatic resource cleanup
103+
- Environment recreation capability for long-running workloads
104+
* **Connection Scaling**: KV connections should scale based on: `num_workers × target_collections / connection_reuse_factor`. Default of 5 connections per SDKClient may be insufficient for high-concurrency multi-collection workloads. SharedClusterManager defaults to 500 KV connections for large-scale loads.
59105
* **Thread Pool Sizing**: Set `num_workers` based on concurrent task throughput needs, not total collections. Example: 60 workers efficiently handle 5000 collections with proper batching, rather than allocating 20 workers per collection.
60-
* **Batch Processing**: For large-scale multi-collection loading, use batch processing to load collections in chunks (e.g., 60-100 collections per batch) to avoid client pool exhaustion.
106+
* **Batch Processing**: For large-scale multi-collection loading (1000+ collections), use `CollectionLoadBatcher` to:
107+
- Process collections in batches (default: 50 per batch)
108+
- Prevent worker starvation and reduce queue overhead
109+
- Monitor batch progress and completion status
110+
- Automatically start next batch after current completion
61111
* **Client Pool Optimization**: SDKClientPool should cache clients at bucket level and support dynamic scope/collection switching, not create separate client instances per (scope+collection) combination.
62112

63113
### Architecture Anti-Patterns
@@ -75,30 +125,53 @@ Client → TaskManager → WorkLoadGenerate → SDKClientPool → Specific Colle
75125
```
76126
Suitable for: Single collection workloads with static configuration.
77127

78-
**Multi-Collection Optimized (Recommended):**
128+
**Multi-Collection Optimized (SharedClusterManager):**
79129
```
80-
Client → TaskManager → WorkLoadTasks → SDKClientPool (Bucket-Level)
81-
130+
Client → TaskManager → WorkLoadTasks → SDKClientPool → SharedClusterManager
131+
132+
Single Cluster per Server
133+
82134
Dynamic Collection Switching per Worker
83-
84-
Worker cycles through multiple collections
135+
136+
Worker cycles through collections
85137
```
86-
Suitable for: Large-scale multi-collection loading (hundreds/thousands of collections).
138+
Suitable for: Large-scale multi-collection loading (hundreds/thousands) with optimized connection management.
87139

88-
**Batched Multi-Collection:**
140+
**Batched Multi-Collection (CollectionLoadBatcher):**
89141
```
90-
Client → TaskManager → BatchManager → WorkLoadGenerate (per batch)
91-
92-
60 workers load 60 collections concurrently
93-
94-
Next batch starts after completion
142+
Client → CollectionLoadBatcher → (Batch 1: 50 collections)
143+
→ WorkLoadGenerate per collection
144+
→ Progress Tracking
145+
→ (Batch 2: 50 collections) after completion
95146
```
96-
Suitable for: Very large collections (1000+) with controlled resource usage.
147+
Suitable for: Very large number of collections (1000+) where Python sequential calls would cause worker starvation. Uses SharedClusterManager internally for connection optimization.
97148

98149
### Key Performance Metrics to Monitor
150+
* **SharedClusterManager Metrics**:
151+
- Cluster reference count and reuse rate
152+
- KV connection utilization vs capacity (default: 500)
153+
- Environment shutdown/recreation events
154+
- Per-server cluster instance count
155+
* **CollectionLoadBatcher Metrics**:
156+
- Active batch count and batch progress percentage
157+
- Collections loaded per batch vs batch size (default: 50)
158+
- Batch completion rate and queue depth
159+
- Batch processor thread pool utilization
99160
* **Connection Pool Utilization**: Monitor KV connection count vs capacity
100161
* **Client Pool Efficiency**: Track client reuse rate vs new client creation
101162
* **Thread Wait Time**: Measure worker idle time waiting for tasks vs clients
102163
* **Task Queue Depth**: Monitor pending tasks in TaskManager
103164
* **Collection Throughput**: Track collections loaded per time unit
104165
* **Document Success Rate**: Monitor failedMutations and retry patterns
166+
167+
### Hard Constraints Integration
168+
* **SharedClusterManager**: Must use `SharedClusterManager.getCluster(server)` and `releaseCluster(server)` for all multi-collection operations. Never create standalone Cluster instances for large-scale workloads.
169+
* **Environment Lifecycle**: Must follow proper ClusterEnvironment lifecycle - use shared environment with automatic recreation capability, never manually manage environment shutdown/reactivation.
170+
* **Batch Processing Threshold**: For workloads with >100 collections, use `CollectionLoadBatcher.submitToBatch()` instead of direct REST calls to prevent worker starvation.
171+
* **Thread Safety**: SharedClusterManager uses synchronized methods and volatile shutdown flag - ensure thread-safe access patterns when dealing with reference counting and environment state.
172+
* **Error Handling**: Always handle `AuthenticationFailureException` and cluster connection errors with proper logging and retries in both SharedClusterManager and CollectionLoadBatcher.
173+
174+
### Build Verification
175+
```
176+
mvn clean compile package
177+
```

.agents/profiles/MongoCoder.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,8 @@ graph TD
2525
- Always include error handling for DocumentNotFound and DuplicateKey errors.
2626
- Ensure proper connection pooling and MongoClient management.
2727
* **Tone:** Technical, efficiency-focused, and precise.
28+
29+
### Build Verification
30+
```
31+
mvn clean compile package
32+
```

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ This project uses specialized AI agents to maintain code quality and architectur
1111
### Orchestration Logic
1212
* **If** the user asks for thread, doc_key. document generator related code -> **Handoff to:** `The Architect`.
1313
* **If** the user asks for Couchbase Sirius or REST based loader related code → **Handoff to:** `The CBRestLoader`.
14+
* **If** the user asks for batch processing, shared cluster management, or massive collection load optimization → **Handoff to:** `The CBRestLoader` with focus on `SharedClusterManager` and `CollectionLoadBatcher`.
1415
* **If** the user asks for Couchbase command line loader related code → **Handoff to:** `The CBCmdlineLoader`.
1516
* **If** the user asks for a Mongo related code → **Handoff to:** `The MongoCoder`.
1617

Makefile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
.PHONY: all rest_server
2+
3+
all:
4+
mvn clean compile package
5+
6+
rest_server: all
7+
java -cp ./target/magmadocloader/magmadocloader.jar RestServer.RestApplication --server.port=8080 --server.name="sirius_java_rest_loader"
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package RestServer;
2+
3+
import java.util.ArrayList;
4+
import java.util.HashMap;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.TimeUnit;
11+
12+
import org.apache.log4j.LogManager;
13+
import org.apache.log4j.Logger;
14+
15+
import org.springframework.http.HttpStatus;
16+
import org.springframework.http.ResponseEntity;
17+
18+
/**
19+
* CollectionLoadBatcher implements Java-side batching for massive collection loads.
20+
* When Python calls doc_load() sequentially for many collections, this batches them
21+
* to prevent worker starvation and queue overhead.
22+
*/
23+
public class CollectionLoadBatcher {
24+
static Logger logger = LogManager.getLogger(CollectionLoadBatcher.class);
25+
26+
private static final int BATCH_SIZE = 50; // Process 50 collections concurrently
27+
private static ExecutorService batchExecutor;
28+
private static Map<String, BatchState> batchStates = new ConcurrentHashMap<>();
29+
private static Object batchLock = new Object();
30+
31+
static {
32+
batchExecutor = Executors.newFixedThreadPool(5); // 5 concurrent batch processors
33+
logger.info("CollectionLoadBatcher initialized with batch size: " + BATCH_SIZE);
34+
}
35+
36+
public static class BatchState {
37+
String batchId;
38+
List<String> tasknames = new ArrayList<>();
39+
int totalCollections;
40+
int completedCollections;
41+
long startTime;
42+
43+
public BatchState(String batchId, int totalCollections) {
44+
this.batchId = batchId;
45+
this.totalCollections = totalCollections;
46+
this.completedCollections = 0;
47+
this.startTime = System.currentTimeMillis();
48+
}
49+
50+
public synchronized void addTask(String taskname) {
51+
tasknames.add(taskname);
52+
completedCollections++;
53+
}
54+
55+
public synchronized boolean isComplete() {
56+
return completedCollections >= totalCollections;
57+
}
58+
59+
public synchronized double getProgress() {
60+
return (double)completedCollections / totalCollections;
61+
}
62+
}
63+
64+
/**
65+
* Submit a collection load request to the batch processor
66+
*/
67+
public static ResponseEntity<Map<String, Object>> submitToBatch(Map<String, Object> requestBody) {
68+
try {
69+
TaskRequest taskRequest = TaskRequest.fromJson(requestBody.toString());
70+
71+
// Get current batch or create new one
72+
String batchId = getCurrentBatchId();
73+
BatchState batchState = batchStates.computeIfAbsent(batchId, k ->
74+
new BatchState(batchId, BATCH_SIZE));
75+
76+
// Process the doc_load normally
77+
ResponseEntity<Map<String, Object>> result = taskRequest.doc_load();
78+
79+
// Add to batch
80+
batchState.addTask(result.getBody().get("tasks").toString());
81+
82+
// Check if batch is complete and start next batch
83+
if (batchState.isComplete()) {
84+
logger.info("Batch " + batchId + " complete (" + batchState.totalCollections + " collections)");
85+
batchStates.remove(batchId);
86+
87+
// Start processing next batch if there are pending loads
88+
startNextBatch();
89+
}
90+
91+
return result;
92+
93+
} catch (Exception e) {
94+
Map<String, Object> body = new HashMap<>();
95+
body.put("error", "Batch processing failed: " + e.getMessage());
96+
body.put("status", false);
97+
return new ResponseEntity<>(body, HttpStatus.INTERNAL_SERVER_ERROR);
98+
}
99+
}
100+
101+
private static synchronized String getCurrentBatchId() {
102+
// Find current batch with capacity
103+
for (Map.Entry<String, BatchState> entry : batchStates.entrySet()) {
104+
if (!entry.getValue().isComplete()) {
105+
return entry.getKey();
106+
}
107+
}
108+
109+
// Create new batch ID
110+
return "batch_" + System.currentTimeMillis();
111+
}
112+
113+
private static void startNextBatch() {
114+
// Could implement proactive batch starting if needed
115+
logger.debug("Ready for next batch of collection loads");
116+
}
117+
118+
public static void shutdown() {
119+
if (batchExecutor != null) {
120+
batchExecutor.shutdownNow();
121+
logger.info("CollectionLoadBatcher shutdown complete");
122+
}
123+
}
124+
125+
public static Map<String, Object> getStats() {
126+
Map<String, Object> stats = new HashMap<>();
127+
stats.put("active_batches", batchStates.size());
128+
stats.put("total_capacity", BATCH_SIZE);
129+
130+
List<String> batchProgress = new ArrayList<>();
131+
for (Map.Entry<String, BatchState> entry : batchStates.entrySet()) {
132+
BatchState state = entry.getValue();
133+
batchProgress.add(String.format("%s: %.1f%% (%d/%d)",
134+
entry.getKey(),
135+
state.getProgress() * 100,
136+
state.completedCollections,
137+
state.totalCollections));
138+
}
139+
stats.put("batch_progress", batchProgress);
140+
141+
return stats;
142+
}
143+
}

0 commit comments

Comments
 (0)