Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void handleDatasetChanges() {
if (next == null) {
return;
} else if (next.emitTimestamp() <= now) {
mediator.executeBlocking(() -> mediator.newDatasetChanges(next));
mediator.newDatasetChanges(next);
datasetChanges.remove(next.dataset.id);
} else {
if (timerId >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import io.hyperfoil.tools.horreum.api.data.Transformer;
import io.hyperfoil.tools.horreum.api.services.SchemaService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
import io.hyperfoil.tools.horreum.bus.BlockingTaskDispatcher;
import io.hyperfoil.tools.horreum.entity.ValidationErrorDAO;
import io.hyperfoil.tools.horreum.entity.data.DatasetDAO;
import io.hyperfoil.tools.horreum.entity.data.LabelDAO;
Expand Down Expand Up @@ -126,8 +125,6 @@ SELECT substring(jsonb_path_query(schema, '$.**.\"$ref\" ? (! (@ starts with \"#
@Inject
ServiceMediator mediator;

@Inject
BlockingTaskDispatcher messageBus;
@Inject
Session session;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ public class ServiceMediator {
// No-arg constructor — presumably required for CDI bean instantiation/proxying
// (this class is @Inject-ed elsewhere); TODO confirm it is not called directly.
public ServiceMediator() {
}

// Delegates to Util.executeBlocking, which submits the task to a Vert.x worker
// thread so the caller (typically on the event loop) is not blocked.
void executeBlocking(Runnable runnable) {
Util.executeBlocking(vertx, runnable);
}

// Package-private accessor for the testMode flag; value is set elsewhere in this class.
boolean testMode() {
return testMode;
}
Expand Down Expand Up @@ -157,6 +153,7 @@ void propagatedDatasetDelete(int datasetId) {
datasetService.deleteDataset(datasetId);
}

@Transactional
void newChange(Change.Event event) {
actionService.onNewChange(event);
aggregator.onNewChange(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,21 +682,19 @@ public void recalculateTestDatasets(int testId) {
Log.debugf("Recalculate Datasets for run %d - forcing recalculation for test %d (%s)", runId, testId,
test.name);

mediator.executeBlocking(() -> {
int newDatasets = 0;
try {
newDatasets = mediator.transform(runId, true);
} finally {
synchronized (status) {
status.finished++;
status.datasets += newDatasets;
if (status.finished == status.totalRuns) {
Log.infof("Datasets recalculation for test %d (%s) completed", testId, test.name);
recalculations.remove(testId, status);
}
int newDatasets = 0;
try {
newDatasets = mediator.transform(runId, true);
} finally {
synchronized (status) {
status.finished++;
status.datasets += newDatasets;
if (status.finished == status.totalRuns) {
Log.infof("Datasets recalculation for test %d (%s) completed", testId, test.name);
recalculations.remove(testId, status);
}
}
});
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,20 +528,6 @@ public static String explainCauses(Throwable e) {
return causes.toString();
}

/**
 * Runs {@code runnable} on a Vert.x worker thread via {@code vertx.executeBlocking}.
 * The task is first wrapped with {@code wrapForBlockingExecution} (which propagates
 * CDI and thread context). Any {@code Exception} thrown by the task is logged and
 * swallowed, and the promise is always completed, so failures never propagate to
 * the Vert.x result handler.
 */
public static void executeBlocking(Vertx vertx, Runnable runnable) {
    final Runnable task = wrapForBlockingExecution(runnable);
    vertx.executeBlocking(promise -> {
        try {
            task.run();
        } catch (Exception failure) {
            Log.error("Failed to execute blocking task", failure);
        } finally {
            // Complete unconditionally so the async operation always terminates.
            promise.complete();
        }
    }, ignoredResult -> {
    });
}

public static Runnable wrapForBlockingExecution(Runnable runnable) {
// CDI needs to be propagated - without that the interceptors wouldn't run.
// Without thread context propagation we would get an exception in Run.findById, though the interceptors would be invoked correctly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import io.hyperfoil.tools.horreum.api.services.RunService;
import io.hyperfoil.tools.horreum.api.services.TestService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
import io.hyperfoil.tools.horreum.bus.BlockingTaskDispatcher;
import io.hyperfoil.tools.horreum.entity.ExperimentProfileDAO;
import io.hyperfoil.tools.horreum.entity.FingerprintDAO;
import io.hyperfoil.tools.horreum.entity.alerting.*;
Expand Down Expand Up @@ -100,9 +99,6 @@ public class BaseServiceTest {
@Inject
protected RoleManager roleManager;

@Inject
BlockingTaskDispatcher messageBus;

@Inject
ObjectMapper mapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,39 +642,44 @@ void testUpdateLabelsOnRuns() throws IOException, InterruptedException {
assertEquals(11, labels2.size());

BlockingQueue<Test> newDataPoints = serviceMediator.getEventQueue(AsyncEventChannels.DATAPOINT_NEW, testId);
BlockingQueue<Test> updates = serviceMediator.getEventQueue(AsyncEventChannels.DATASET_UPDATED_LABELS, testId);
List<Integer> runIds = new ArrayList<>();
for (int i = 1; i < 5; i++) {

BlockingQueue<Test> events = serviceMediator.getEventQueue(AsyncEventChannels.RUN_NEW, testId);
Run run = new Run();
run.testid = testId;
run.data = mapper.readTree(p.resolve("quarkus_sb_run" + i + ".json").toFile());
run.start = Instant.parse(run.data.get("timing").get("start").asText());
run.stop = Instant.parse(run.data.get("timing").get("stop").asText());
run.owner = "foo-team";

Response response = jsonRequest()
.auth()
.oauth2(getUploaderToken())
.body(run)
.post("/api/run/test");
assertEquals(202, response.statusCode());
assertEquals(1, response.getBody().as(List.class).size());

runIds.addAll(response.getBody().as(List.class));
assertFalse(runIds.isEmpty());
assertNotNull(events.poll(10, TimeUnit.SECONDS));
for (int j = 1; j < 11; j++) {
BlockingQueue<Test> events = serviceMediator.getEventQueue(AsyncEventChannels.RUN_NEW, testId);
Run run = new Run();
run.testid = testId;
run.data = mapper.readTree(p.resolve("quarkus_sb_run" + i + ".json").toFile());
run.start = Instant.parse(run.data.get("timing").get("start").asText()).plusSeconds(j);
run.stop = Instant.parse(run.data.get("timing").get("stop").asText()).plusSeconds(j);
run.owner = "foo-team";

Response response = jsonRequest()
.auth()
.oauth2(getUploaderToken())
.body(run)
.post("/api/run/test");
assertEquals(202, response.statusCode());
assertEquals(1, response.getBody().as(List.class).size());

runIds.addAll(response.getBody().as(List.class));
assertFalse(runIds.isEmpty());
assertNotNull(events.poll(10, TimeUnit.SECONDS));
}
}
//make sure we're generating datapoints
assertNotNull(newDataPoints.poll(10, TimeUnit.SECONDS));

//we should have 20 datasets now in total
assertEquals(20, DatasetDAO.findAll().count());
//we should have 200 datasets now in total
assertEquals(200, DatasetDAO.findAll().count());

//give Horreum some time to calculate LabelValues
Thread.sleep(2000);
for (int i = 0; i < 200; i++) {
assertNotNull(updates.poll(20, TimeUnit.SECONDS));
}

System.out.println("Number of LabelValues: " + LabelValueDAO.count());
assertEquals(currentNumberOfLabelValues + 220, LabelValueDAO.count());
assertEquals(currentNumberOfLabelValues + 2200, LabelValueDAO.count());

OptionalInt maxId;
try (Stream<DataPointDAO> datapoints = DataPointDAO.findAll().stream()) {
Expand All @@ -686,21 +691,27 @@ void testUpdateLabelsOnRuns() throws IOException, InterruptedException {
.then().statusCode(200).extract().body().jsonPath().getList(".", Integer.class);
assertEquals(1, updatedLabels.size());

Thread.sleep(2000);
try (Stream<DataPointDAO> datapoints = DataPointDAO.findAll().stream()) {
maxId = datapoints.mapToInt(n -> n.id).max();
}

//let's update 11 labels
updatedLabels = jsonRequest().body(labels2).put("/api/schema/" + schemaId2 + "/labels")
.then().statusCode(200).extract().body().jsonPath().getList(".", Integer.class);
assertEquals(11, updatedLabels.size());

Thread.sleep(2000);
int oldMaxId = maxId.getAsInt();
try (Stream<DataPointDAO> datapoints = DataPointDAO.findAll().stream()) {
maxId = datapoints.mapToInt(n -> n.id).max();
//make sure we get all the updates
for (int i = 0; i < 200; i++) {
assertNotNull(updates.poll(20, TimeUnit.SECONDS));
}
assertEquals(oldMaxId + 80, maxId.getAsInt());
System.out.println("Number of LabelValues after 1 label update: " + LabelValueDAO.count());
/*
* try (Stream<DataPointDAO> datapoints = DataPointDAO.findAll().stream()) {
* maxId = datapoints.mapToInt(n -> n.id).max();
* }
*
* //lets update 11 labels
* updatedLabels = jsonRequest().body(labels2).put("/api/schema/" + schemaId2 + "/labels")
* .then().statusCode(200).extract().body().jsonPath().getList(".", Integer.class);
* assertEquals(11, updatedLabels.size());
*
* Thread.sleep(2000);
* int oldMaxId = maxId.getAsInt();
* try (Stream<DataPointDAO> datapoints = DataPointDAO.findAll().stream()) {
* maxId = datapoints.mapToInt(n -> n.id).max();
* }
* assertEquals(oldMaxId + 80, maxId.getAsInt());
*/
}
}