Commit 2e3387e

fix(TaskRequest): retain completed tasks to prevent false stop_task failures
Introduce a `completed_tasks` map alongside `loader_tasks` so that tasks are never silently dropped after completion. Previously, calling `get_task_result()` removed the task from `loader_tasks`, causing a subsequent `stop_task()` call to return `{"status": false, "error": "Task X does not exists"}`, which was logged as CRITICAL in TAF even when the load succeeded.

Changes:
- `get_task_result()`: moves the task from `loader_tasks` to `completed_tasks` instead of discarding it; also checks `completed_tasks` via `getOrDefault` so it works when `stop_task()` was called first
- `stop_task()`: moves the task from `loader_tasks` to `completed_tasks` after `stop_load()`; falls back to a `completed_tasks` lookup and returns true when the task was already consumed by `get_task_result()`
- `reset_task_manager()`: clears `completed_tasks` alongside `loader_tasks` to prevent stale task-name collisions across test runs
1 parent 5ca14fe commit 2e3387e
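
The behaviour described in the commit message can be modelled in isolation. The following is a minimal sketch, not the project's code: `TaskRegistrySketch`, `docLoad`, `retire`, and the `String` placeholder for `WorkLoadGenerate` are hypothetical names introduced for illustration, and `reset()` clears the maps rather than replacing them as `reset_task_manager()` does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the two task maps in TaskRequest. */
public class TaskRegistrySketch {
    // A String stands in for WorkLoadGenerate; only presence in a map matters here.
    private final Map<String, String> loaderTasks = new ConcurrentHashMap<>();
    private final Map<String, String> completedTasks = new ConcurrentHashMap<>();

    /** Models doc_load: register the task as active. */
    public void docLoad(String taskName) {
        loaderTasks.put(taskName, "task:" + taskName);
    }

    /** Moves an active task to the completed map, or finds it there if already moved. */
    private String retire(String taskName) {
        String task = loaderTasks.remove(taskName);
        if (task != null) {
            completedTasks.put(taskName, task);
            return task;
        }
        return completedTasks.get(taskName);
    }

    /** Models get_task_result: true if the task is known in either map. */
    public boolean getTaskResult(String taskName) {
        return retire(taskName) != null;
    }

    /** Models stop_task: true if the task is active or was already completed. */
    public boolean stopTask(String taskName) {
        return retire(taskName) != null;
    }

    /** Models reset_task_manager: forget all tasks so names can be reused. */
    public void reset() {
        loaderTasks.clear();
        completedTasks.clear();
    }

    public static void main(String[] args) {
        TaskRegistrySketch registry = new TaskRegistrySketch();

        // Path A: result fetched first, then stop.
        registry.docLoad("Task_A");
        System.out.println(registry.getTaskResult("Task_A")); // true
        System.out.println(registry.stopTask("Task_A"));      // true

        // Path B: stop first, then result.
        registry.docLoad("Task_B");
        System.out.println(registry.stopTask("Task_B"));      // true
        System.out.println(registry.getTaskResult("Task_B")); // true

        // After a reset the old names are unknown again.
        registry.reset();
        System.out.println(registry.stopTask("Task_A"));      // false
    }
}
```

In this model both call orders succeed because neither endpoint discards the task; only `reset()` makes a task name unknown again.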

3 files changed

Lines changed: 62 additions & 1 deletion

.agents/profiles/CBRestLoader.md

Lines changed: 52 additions & 0 deletions
@@ -94,6 +94,58 @@ sequenceDiagram
 
 C->>TM: /get_task_result
 
+### Task Lifecycle & Cleanup Flow
+
+`loader_tasks` and `completed_tasks` are the two `ConcurrentHashMap`s in `TaskRequest` that track task state.
+Tasks are added to `loader_tasks` on `doc_load`, then transition based on which completion path is taken.
+
+**Path A: `get_task_result()` called before `stop_task()`**
+```
+doc_load ──► loader_tasks.put(taskName, task)
+
+submit_task ──► taskManager.submit(task) ──► task runs ──► "Task_X is completed!" (log)
+
+get_task_result()
+├─► taskManager.getTaskResult(task) (waits for future)
+├─► loader_tasks.remove(taskName)
+└─► completed_tasks.put(taskName, task)
+
+stop_task()
+├─► loader_tasks.get(taskName) → null
+├─► completed_tasks.containsKey(taskName) → true
+└─► status: true ✓
+```
+
+**Path B: `stop_task()` called before `get_task_result()`**
+```
+doc_load ──► loader_tasks.put(taskName, task)
+
+submit_task ──► taskManager.submit(task) ──► task runs
+
+stop_task()
+├─► task.stop_load() (sets flag, run loop exits)
+├─► loader_tasks.remove(taskName)
+└─► completed_tasks.put(taskName, task)
+
+get_task_result()
+├─► loader_tasks.getOrDefault(taskName, completed_tasks.get(taskName))
+│ (finds task in completed_tasks)
+├─► taskManager.getTaskResult(task)
+└─► failures + status returned ✓
+```
+
+**Reset**
+```
+reset_task_manager()
+├─► loader_tasks = new ConcurrentHashMap()
+└─► completed_tasks = new ConcurrentHashMap() (prevents stale name collisions across test runs)
+```
+
+**Why `completed_tasks` exists:**
+Without it, `stop_task` called after `get_task_result` (or vice versa) hits a missing key in `loader_tasks`
+and returns `{"status": false, "error": "Task X does not exists"}`, which is logged as CRITICAL in TAF
+even though the task completed successfully.
+
 ### Performance Optimization Guidelines
 * **Multi-Collection Strategy**: Prefer bucket-level clients with dynamic collection switching over per-collection client instances. Workers should call `selectCollection()` dynamically per operation instead of creating dedicated clients per collection.
 * **Shared Cluster Management**: Use `SharedClusterManager` for all multi-collection workloads. It provides:

CLAUDE.md

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+READ @AGENTS.md

src/main/java/RestServer/TaskRequest.java

Lines changed: 9 additions & 1 deletion
@@ -46,6 +46,7 @@ public class TaskRequest {
     static ArrayList<Server> known_servers = new ArrayList<Server>();
     static Object lock_obj = new Object();
     static private ConcurrentHashMap<String, WorkLoadGenerate> loader_tasks = new ConcurrentHashMap<String, WorkLoadGenerate>();
+    static private ConcurrentHashMap<String, WorkLoadGenerate> completed_tasks = new ConcurrentHashMap<String, WorkLoadGenerate>();
     static private ConcurrentHashMap<String, mongo.loadgen.WorkLoadGenerate> mongo_loader_tasks = new ConcurrentHashMap<String, mongo.loadgen.WorkLoadGenerate>();
 
     // Consumed by init_task_manager()
@@ -495,6 +496,7 @@ public ResponseEntity<Map<String, Object>> reset_task_manager() {
         this.shutdown_taskmanager();
         this.init_taskmanager();
         TaskRequest.loader_tasks = new ConcurrentHashMap<String, WorkLoadGenerate>();
+        TaskRequest.completed_tasks = new ConcurrentHashMap<String, WorkLoadGenerate>();
         body.put("status", true);
         return new ResponseEntity<>(body, HttpStatus.OK);
     }
@@ -563,7 +565,8 @@ public ResponseEntity<Map<String, Object>> get_task_result_mongo() {
 
     public ResponseEntity<Map<String, Object>> get_task_result() {
         Map<String, Object> body = new HashMap<>();
-        WorkLoadGenerate task = TaskRequest.loader_tasks.get(this.taskName);
+        WorkLoadGenerate task = TaskRequest.loader_tasks.getOrDefault(this.taskName,
+                TaskRequest.completed_tasks.get(this.taskName));
         if (task != null) {
             Map<String, Object> failures = new HashMap<>();
             boolean okay = false;
@@ -573,6 +576,7 @@ public ResponseEntity<Map<String, Object>> get_task_result() {
                 body.put("error", "Exception during getTaskResult: " + e.toString());
             }
             TaskRequest.loader_tasks.remove(this.taskName);
+            TaskRequest.completed_tasks.put(this.taskName, task);
             for (HashMap.Entry<String, List<Result>> optype : task.failedMutations.entrySet()) {
                 optype.getValue().forEach(
                         (failed_result) -> {
@@ -602,6 +606,10 @@ public ResponseEntity<Map<String, Object>> stop_task() {
         WorkLoadGenerate task = TaskRequest.loader_tasks.get(this.taskName);
         if (task != null) {
             task.stop_load();
+            TaskRequest.loader_tasks.remove(this.taskName);
+            TaskRequest.completed_tasks.put(this.taskName, task);
+            body.put("status", true);
+        } else if (TaskRequest.completed_tasks.containsKey(this.taskName)) {
             body.put("status", true);
         } else {
             body.put("error", "Task " + this.taskName + " does not exists");
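
The fallback branch in `stop_task()` checks `completed_tasks` with `containsKey`. That choice matters on `ConcurrentHashMap`, where the similarly named `contains(Object)` is a legacy method that tests values, not keys. The sketch below illustrates the difference; `ContainsKeyCheck`, its two helper methods, and the `String` placeholder value are hypothetical and not part of the project.

```java
import java.util.concurrent.ConcurrentHashMap;

public class ContainsKeyCheck {
    /** Key-based lookup, the kind of check the stop_task() fallback performs. */
    public static boolean knownByKey(ConcurrentHashMap<String, String> tasks, String taskName) {
        return tasks.containsKey(taskName);
    }

    /** Legacy value-based lookup; behaves like containsValue(). */
    public static boolean knownByLegacyContains(ConcurrentHashMap<String, String> tasks, String taskName) {
        return tasks.contains(taskName);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> completed = new ConcurrentHashMap<>();
        // The value is a placeholder for the real task object.
        completed.put("Task_X", "placeholder-task-object");

        System.out.println(knownByKey(completed, "Task_X"));            // true
        System.out.println(knownByLegacyContains(completed, "Task_X")); // false
    }
}
```

With task names as keys, only the key-based check reports a completed task as known.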
