Skip to content

Commit 424e93e

Browse files
stalepwillr3
authored andcommitted
Transient Work with CompletableFuture and crash recovery (PR #118)
Work is now a plain POJO — no longer an @entity or PanacheEntity. Work items exist only in memory, eliminating orphaned work rows and removing the work/work_values/work_active_nodes/work_source_nodes tables entirely. CompletableFuture upload tracking: - upload() returns CompletableFuture<Void> that completes when all work (including cascaded children) finishes - UploadTracker with AtomicInteger tracks pending work count per upload - Tracker association derived from Work.sourceValues (no duplicated state), supporting future cross-upload Work automatically - createTracked() accepts Set<Long> rootValueIds for multi-source - Double-decrement protection on retry path - UploadTracker.fail() poisons counter to prevent subsequent complete() Crash recovery: - UploadProcessingEntity tracks each upload (rootValueId, folderName) - On startup, incomplete uploads are re-triggered using all source nodes (not just top-level) to handle mid-cascade crashes - @priority ordering guarantees WorkService initializes before recovery - Value deduplication in execute() safely skips already-computed values Source value data kept in memory through work queue: - Avoids DB round-trip JSON deserialization (was #1 CPU hotspot) - Hibernate.isPropertyInitialized() for conditional lazy loading fallback FolderServiceTest (5 tests): - Recovery reprocesses incomplete upload - Recovery skips missing root value / missing folder - Recovery processes multiple independent nodes - Recovery completes when all values already computed Also: LoadLegacyRuns uses CompletableFuture.allOf() for batched uploads, FolderResource upload description fixed, unused code removed.
1 parent a914e87 commit 424e93e

16 files changed

Lines changed: 667 additions & 218 deletions

src/main/java/io/hyperfoil/tools/h5m/api/svc/FolderServiceInterface.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.nio.file.Path;
1010
import java.util.List;
1111
import java.util.Map;
12+
import java.util.concurrent.CompletableFuture;
1213

1314
/**
1415
* Service interface for managing Folders.
@@ -55,12 +56,15 @@ public interface FolderServiceInterface {
5556

5657
/**
5758
* Uploads data to a specific path within a folder.
59+
* Returns a CompletableFuture that completes when all processing
60+
* (including cascaded work) finishes for this upload.
5861
*
5962
* @param name The name of the folder.
6063
* @param path The path within the folder.
6164
* @param data The JSON data to upload.
65+
* @return A future that completes when all work for this upload is done.
6266
*/
63-
void upload(String name, String path, JsonNode data);
67+
CompletableFuture<Void> upload(String name, String path, JsonNode data);
6468

6569
/**
6670
* Recalculates the contents or state of a folder by its name.

src/main/java/io/hyperfoil/tools/h5m/cli/LoadLegacyRuns.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@
1515
import java.sql.PreparedStatement;
1616
import java.sql.ResultSet;
1717
import java.sql.Statement;
18+
import java.util.ArrayList;
1819
import java.util.HashMap;
20+
import java.util.List;
1921
import java.util.Map;
2022
import java.util.Scanner;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.TimeUnit;
2125
import java.util.concurrent.Callable;
2226

2327
@CommandLine.Command(name="load-legacy-runs")
@@ -112,27 +116,36 @@ public Integer call() throws Exception {
112116
int count = 0;
113117
int batchCount = 0;
114118
Scanner scanner = new Scanner(System.in);
119+
List<CompletableFuture<Void>> batchFutures = new ArrayList<>();
115120
try (ResultSet rs = ps.executeQuery()) {
116121
while(rs.next()){
117122
Long id = rs.getLong(1);
118123
System.out.println(name+" "+id);
119124
JsonNode data = mapper.readTree(rs.getCharacterStream("data"));
120-
folderService.upload(folder.name(),null,data);
125+
batchFutures.add(folderService.upload(folder.name(),null,data));
121126
count++;
122127
batchCount++;
123-
if(batch > 0 && batchCount > batch){
124-
System.out.println("waiting for batch to complete");
125-
while(!workService.isIdle()){
126-
Thread.sleep(10_000);
127-
}
128+
if(batch > 0 && batchCount >= batch){
129+
System.out.println("waiting for batch of " + batchCount + " to complete");
130+
CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0]))
131+
.orTimeout(10, TimeUnit.MINUTES)
132+
.join();
128133
System.out.println("batch complete");
134+
batchFutures.clear();
129135
if(pause){
130136
scanner.nextLine();
131137
}
132138
batchCount = 0;
133139
}
134140
}
135141
}
142+
// Wait for any remaining uploads
143+
if (!batchFutures.isEmpty()) {
144+
System.out.println("waiting for final " + batchFutures.size() + " uploads to complete");
145+
CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0]))
146+
.orTimeout(10, TimeUnit.MINUTES)
147+
.join();
148+
}
136149
System.out.println("loaded " + count + " runs");
137150
} finally {
138151
connection.setAutoCommit(true);
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.hyperfoil.tools.h5m.entity;
2+
3+
import io.quarkus.hibernate.orm.panache.PanacheEntity;
4+
import jakarta.persistence.*;
5+
import org.hibernate.annotations.CreationTimestamp;
6+
7+
import java.time.LocalDateTime;
8+
9+
/**
10+
* Tracks whether all work for a given upload has been processed.
11+
* Used for crash recovery: on startup, incomplete uploads are re-triggered.
12+
*/
13+
@Entity(name = "upload_processing")
14+
public class UploadProcessingEntity extends PanacheEntity {
15+
16+
@Column(name = "root_value_id", nullable = false, updatable = false)
17+
public long rootValueId;
18+
19+
@Column(name = "folder_name", nullable = false, updatable = false)
20+
public String folderName;
21+
22+
@Column(nullable = false)
23+
public boolean completed = false;
24+
25+
@CreationTimestamp
26+
@Column(updatable = false)
27+
public LocalDateTime createdAt;
28+
29+
public UploadProcessingEntity() {}
30+
31+
public UploadProcessingEntity(long rootValueId, String folderName) {
32+
this.rootValueId = rootValueId;
33+
this.folderName = folderName;
34+
}
35+
}

src/main/java/io/hyperfoil/tools/h5m/entity/work/Work.java

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,25 @@
44
import io.hyperfoil.tools.h5m.entity.ValueEntity;
55
import io.hyperfoil.tools.h5m.entity.node.RelativeDifference;
66
import io.hyperfoil.tools.h5m.svc.WorkService;
7-
import io.quarkus.hibernate.orm.panache.PanacheEntity;
87
import jakarta.enterprise.inject.spi.CDI;
9-
import jakarta.persistence.*;
10-
import org.hibernate.annotations.BatchSize;
118

129
import java.util.*;
1310
import java.util.stream.Collectors;
1411

1512

16-
@Entity(name = "work")
17-
@DiscriminatorColumn(name = "type")
18-
@DiscriminatorValue("node")
1913
//cross test comparison could use sourceNodes and not have an activeNode?
2014
//custom post nodegroup actions could have sourceNodes without activeNode
21-
public class Work extends PanacheEntity implements Runnable, Comparable<Work>{
22-
23-
@BatchSize(size=10)
24-
@ManyToMany(cascade = {CascadeType.PERSIST}, fetch = FetchType.LAZY)
25-
@JoinTable(
26-
name="work_values",
27-
joinColumns = @JoinColumn(name = "work_id"),
28-
inverseJoinColumns = @JoinColumn(name = "value_id")
29-
)
15+
public class Work implements Runnable, Comparable<Work>{
16+
3017
public List<ValueEntity> sourceValues;//multiple values could happen for cross test comparisons and
3118

32-
@BatchSize(size=10)
33-
@ManyToMany(cascade = {CascadeType.PERSIST}, fetch = FetchType.LAZY)
34-
@JoinTable(
35-
name="work_source_nodes",
36-
joinColumns = @JoinColumn(name = "work_id"),
37-
inverseJoinColumns = @JoinColumn(name = "node_id")
38-
)
3919
public List<NodeEntity> sourceNodes; //what is going to use a list of sources that are not already listed for the activeNode?
4020

4121
public int retryCount;
4222

43-
@BatchSize(size=10)
44-
@ManyToMany(cascade = {CascadeType.PERSIST}, fetch = FetchType.EAGER)
45-
@JoinTable(
46-
name="work_active_nodes",
47-
joinColumns = @JoinColumn(name = "work_id"),
48-
inverseJoinColumns = @JoinColumn(name = "node_id")
49-
)
5023
public Set<NodeEntity> activeNodes;
5124

52-
boolean cumulative = false;
53-
25+
public boolean cumulative = false;
5426

5527
public Work(){
5628
retryCount = 0;
@@ -187,7 +159,7 @@ public int compareTo(Work o) {
187159

188160
@Override
189161
public String toString() {
190-
return "Work<id="+id+" activeNodes="+activeNodes+
162+
return "Work<activeNodes="+activeNodes+
191163
" sourceNodes="+sourceNodes.stream().map(n->""+n.getId()).collect(Collectors.joining(","))+
192164
" sourceValues="+sourceValues.stream().map(v->""+v.getId()).collect(Collectors.joining(","))+
193165
" retry="+retryCount+
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package io.hyperfoil.tools.h5m.queue;
2+
3+
import io.quarkus.logging.Log;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
8+
/**
9+
* Tracks completion of all work items triggered by a single upload.
10+
* The counter is incremented when work items are created (including cascade)
11+
* and decremented when each work item finishes. The future completes
12+
* when all work reaches zero, or completes exceptionally on failure.
13+
*/
14+
public class UploadTracker {
15+
16+
private final long rootValueId;
17+
private final AtomicInteger pendingCount = new AtomicInteger(0);
18+
private final CompletableFuture<Void> future = new CompletableFuture<>();
19+
20+
public UploadTracker(long rootValueId) {
21+
this.rootValueId = rootValueId;
22+
}
23+
24+
public CompletableFuture<Void> getFuture() {
25+
return future;
26+
}
27+
28+
public void increment(int count) {
29+
pendingCount.addAndGet(count);
30+
}
31+
32+
public void decrement() {
33+
int remaining = pendingCount.decrementAndGet();
34+
Log.debugf("UploadTracker[%d]: decrement → %d remaining", rootValueId, remaining);
35+
if (remaining == 0) {
36+
future.complete(null);
37+
} else if (remaining < 0) {
38+
Log.warnf("UploadTracker[%d]: over-decremented to %d — possible accounting bug", rootValueId, remaining);
39+
}
40+
}
41+
42+
public void fail(Throwable t) {
43+
Log.errorf(t, "UploadTracker[%d]: work failed", rootValueId);
44+
// Set pending to a negative sentinel so subsequent decrements
45+
// cannot trigger future.complete(null) — the failure wins
46+
pendingCount.set(Integer.MIN_VALUE);
47+
future.completeExceptionally(t);
48+
}
49+
50+
@Override
51+
public String toString() {
52+
return "UploadTracker[rootValueId=" + rootValueId + ", pending=" + pendingCount.get() + "]";
53+
}
54+
}

src/main/java/io/hyperfoil/tools/h5m/queue/WorkQueue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ public Collection<Work> addWorks(Collection<Work> works){
188188
List<Work> acceptedWork = works.stream().filter(w -> {
189189
boolean has = hasWork(w);
190190
if (has) {
191-
log.warn("addWorks: REJECTED duplicate work id={} hash={} pending={} active={}",
192-
w.id, w.hashCode(), isPending(w), isActive(w));
191+
log.warn("addWorks: REJECTED duplicate work hash={} pending={} active={}",
192+
w.hashCode(), isPending(w), isActive(w));
193193
}
194194
return !has;
195195
}).peek(w-> {

src/main/java/io/hyperfoil/tools/h5m/rest/FolderResource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ public void upload(
8383
@QueryParam("path") @Parameter(description = "Path within the folder") String path,
8484
JsonNode data) {
8585
folderService.upload(name, path, data);
86+
// The upload() now returns a CompletableFuture, but the REST endpoint
87+
// doesn't need to wait for it — the caller can poll for results.
88+
// Future enhancement: return CompletionStage<Void> for async response.
8689
}
8790

8891
@POST

0 commit comments

Comments
 (0)