Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void handleDatasetChanges() {
if (next == null) {
return;
} else if (next.emitTimestamp() <= now) {
mediator.executeBlocking(() -> mediator.newDatasetChanges(next));
mediator.newDatasetChanges(next);
datasetChanges.remove(next.dataset.id);
} else {
if (timerId >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import io.hyperfoil.tools.horreum.api.data.Transformer;
import io.hyperfoil.tools.horreum.api.services.SchemaService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
import io.hyperfoil.tools.horreum.bus.BlockingTaskDispatcher;
import io.hyperfoil.tools.horreum.entity.ValidationErrorDAO;
import io.hyperfoil.tools.horreum.entity.data.DatasetDAO;
import io.hyperfoil.tools.horreum.entity.data.LabelDAO;
Expand Down Expand Up @@ -126,8 +125,6 @@ SELECT substring(jsonb_path_query(schema, '$.**.\"$ref\" ? (! (@ starts with \"#
@Inject
ServiceMediator mediator;

@Inject
BlockingTaskDispatcher messageBus;
@Inject
Session session;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.hyperfoil.tools.horreum.svc;

import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -97,6 +99,10 @@ public class ServiceMediator {
@Channel("dataset-event-out")
Emitter<Dataset.EventNew> dataSetEmitter;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("change-detection-event-out")
Emitter<ChangeDetectionEvent> changeDetectionEmitter;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("run-recalc-out")
Emitter<Integer> runEmitter;
Expand All @@ -117,10 +123,6 @@ public class ServiceMediator {
public ServiceMediator() {
}

// Runs the given task on a Vert.x worker thread via Util.executeBlocking,
// which wraps it for CDI/request-context propagation. Fire-and-forget:
// failures are logged by the helper, not reported to the caller.
void executeBlocking(Runnable runnable) {
Util.executeBlocking(vertx, runnable);
}

// Returns the testMode flag for this mediator instance.
boolean testMode() {
return testMode;
}
Expand Down Expand Up @@ -157,6 +159,7 @@ void propagatedDatasetDelete(int datasetId) {
datasetService.deleteDataset(datasetId);
}

@Transactional
void newChange(Change.Event event) {
actionService.onNewChange(event);
aggregator.onNewChange(event);
Expand Down Expand Up @@ -240,6 +243,22 @@ public void queueRunUpload(String start, String stop, String test, String owner,
runUploadEmitter.send(upload);
}

/**
 * AMQP consumer for the "change-detection-event" channel: runs change detection
 * for the test/dataset/variables carried by the event, on the dedicated
 * "horreum.change-detection.pool" worker pool.
 */
@Incoming("change-detection-event-in")
@Blocking("horreum.change-detection.pool") // the default `ordered = true` ensures messages with the same groupID are executed sequentially
@ActivateRequestContext
public void processChangeDetectionEvent(ChangeDetectionEvent event) {
// NOTE(review): the trailing `true, true` are positional boolean flags — confirm
// they map to the intended parameters of AlertingService.runChangeDetection.
alertingService.runChangeDetection(
event.testId, event.datasetId, event.variableIds, event.timestamp, event.notification, true, true);
}

/**
 * Publishes a {@link ChangeDetectionEvent} to the "change-detection-event" AMQP
 * channel. The AMQP group id is set to the event's test id, so events for the
 * same test share a group and are consumed sequentially by the ordered consumer.
 * Runs outside any surrounding transaction (TxType.NOT_SUPPORTED).
 */
@Transactional(Transactional.TxType.NOT_SUPPORTED)
void queueChangeDetectionEvent(ChangeDetectionEvent event) {
    // Group messages by test id; the ordered consumer processes each group serially.
    String groupId = event.testId.toString();
    OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
            .withGroupId(groupId)
            .build();
    changeDetectionEmitter.send(Message.of(event).addMetadata(metadata));
}

// Forwards a "datapoints processed" event to the experiment service, which
// reacts to the newly created datapoints.
void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) {
experimentService.onDatapointsCreated(event);
}
Expand Down Expand Up @@ -349,4 +368,7 @@ public RunUpload(String start, String stop, String test, String owner,
}
}

/**
 * Payload for the change-detection AMQP channel: identifies the test, dataset
 * and variables whose change detection should run, and the reference timestamp.
 * `notification` presumably controls whether resulting changes trigger
 * notifications — confirm against the runChangeDetection consumer.
 */
public record ChangeDetectionEvent(
Integer testId, Integer datasetId, Collection<Integer> variableIds, Instant timestamp, boolean notification) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -707,21 +707,19 @@ public void recalculateTestDatasets(int testId) {
Log.debugf("Recalculate Datasets for run %d - forcing recalculation for test %d (%s)", runId, testId,
test.name);

mediator.executeBlocking(() -> {
int newDatasets = 0;
try {
newDatasets = mediator.transform(runId, true);
} finally {
synchronized (status) {
status.finished++;
status.datasets += newDatasets;
if (status.finished == status.totalRuns) {
Log.infof("Datasets recalculation for test %d (%s) completed", testId, test.name);
recalculations.remove(testId, status);
}
int newDatasets = 0;
try {
newDatasets = mediator.transform(runId, true);
} finally {
synchronized (status) {
status.finished++;
status.datasets += newDatasets;
if (status.finished == status.totalRuns) {
Log.infof("Datasets recalculation for test %d (%s) completed", testId, test.name);
recalculations.remove(testId, status);
}
}
});
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,20 +528,6 @@ public static String explainCauses(Throwable e) {
return causes.toString();
}

// Runs `runnable` on a Vert.x worker thread. The task is first wrapped by
// wrapForBlockingExecution (CDI interceptor / thread-context propagation — see
// that method's comments); any exception is logged and swallowed, and the
// promise always completes so the worker is released. Fire-and-forget: the
// completion handler is intentionally a no-op and callers get no failure signal.
public static void executeBlocking(Vertx vertx, Runnable runnable) {
Runnable wrapped = wrapForBlockingExecution(runnable);
vertx.executeBlocking(promise -> {
try {
wrapped.run();
} catch (Exception e) {
// Best-effort: log and continue; do not propagate to the event loop.
Log.error("Failed to execute blocking task", e);
} finally {
promise.complete();
}
}, result -> {
// result intentionally ignored (fire-and-forget)
});
}

public static Runnable wrapForBlockingExecution(Runnable runnable) {
// CDI needs to be propagated - without that the interceptors wouldn't run.
// Without thread context propagation we would get an exception in Run.findById, though the interceptors would be invoked correctly.
Expand Down
16 changes: 16 additions & 0 deletions horreum-backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ quarkus.datasource.jdbc.initial-size=3

# thread pool sizes
smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=10
smallrye.messaging.worker.horreum.change-detection.pool.max-concurrency=5
smallrye.messaging.worker.horreum.run.pool.max-concurrency=6
smallrye.messaging.worker.horreum.schema.pool.max-concurrency=5

Expand All @@ -55,6 +56,21 @@ mp.messaging.outgoing.dataset-event-out.container-id=horreum-broker
mp.messaging.outgoing.dataset-event-out.link-name=dataset-event
mp.messaging.outgoing.dataset-event-out.failure-strategy=modified-failed

# change-detection-event incoming
mp.messaging.incoming.change-detection-event-in.connector=smallrye-amqp
mp.messaging.incoming.change-detection-event-in.address=change-detection-event
mp.messaging.incoming.change-detection-event-in.durable=true
mp.messaging.incoming.change-detection-event-in.container-id=horreum-broker
mp.messaging.incoming.change-detection-event-in.link-name=change-detection-event
mp.messaging.incoming.change-detection-event-in.failure-strategy=modified-failed
# change-detection-event outgoing
mp.messaging.outgoing.change-detection-event-out.connector=smallrye-amqp
mp.messaging.outgoing.change-detection-event-out.address=change-detection-event
mp.messaging.outgoing.change-detection-event-out.durable=true
mp.messaging.outgoing.change-detection-event-out.container-id=horreum-broker
mp.messaging.outgoing.change-detection-event-out.link-name=change-detection-event
mp.messaging.outgoing.change-detection-event-out.failure-strategy=modified-failed

# re-calc incoming
mp.messaging.incoming.run-recalc-in.connector=smallrye-amqp
mp.messaging.incoming.run-recalc-in.address=run-recalc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void testEdvisiveModelAnalyze(TestInfo info) throws Exception {
int run10 = uploadRun(ts + 4, ts + 4, runWithValue(10, schema), test.name);
assertValue(datapointQueue, 10);

Change.Event changeEvent1 = changeQueue.poll(10, TimeUnit.SECONDS);
Change.Event changeEvent1 = changeQueue.poll(40, TimeUnit.SECONDS);
assertNotNull(changeEvent1);

testSerialization(changeEvent1, Change.Event.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void testChangeDetection(TestInfo info) throws InterruptedException {
int run4 = uploadRun(ts + 3, ts + 3, runWithValue(2, schema), test.name);
assertValue(datapointQueue, 2);

assertNull(changeQueue.poll(50, TimeUnit.MILLISECONDS));
assertNull(changeQueue.poll(500, TimeUnit.MILLISECONDS));

uploadRun(ts + 4, ts + 4, runWithValue(3, schema), test.name);
assertValue(datapointQueue, 3);
Expand Down Expand Up @@ -186,7 +186,7 @@ public void testChangeDetection(TestInfo info) throws InterruptedException {
assertValue(datapointQueue, 1.5);

// mean of previous values is 1.5, now the min is 1.5 => no change
assertNull(changeQueue.poll(50, TimeUnit.MILLISECONDS));
assertNull(changeQueue.poll(500, TimeUnit.MILLISECONDS));

uploadRun(ts + 6, ts + 6, runWithValue(2, schema), test.name);
assertValue(datapointQueue, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import io.hyperfoil.tools.horreum.api.services.RunService;
import io.hyperfoil.tools.horreum.api.services.TestService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
import io.hyperfoil.tools.horreum.bus.BlockingTaskDispatcher;
import io.hyperfoil.tools.horreum.entity.ExperimentProfileDAO;
import io.hyperfoil.tools.horreum.entity.FingerprintDAO;
import io.hyperfoil.tools.horreum.entity.alerting.*;
Expand Down Expand Up @@ -100,9 +99,6 @@ public class BaseServiceTest {
@Inject
protected RoleManager roleManager;

@Inject
BlockingTaskDispatcher messageBus;

@Inject
ObjectMapper mapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.hyperfoil.tools.horreum.mapper.VariableMapper;
import io.hyperfoil.tools.horreum.server.CloseMe;
import io.hyperfoil.tools.horreum.test.*;
import io.quarkus.logging.Log;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
Expand Down Expand Up @@ -642,39 +643,44 @@ void testUpdateLabelsOnRuns() throws IOException, InterruptedException {
assertEquals(11, labels2.size());

BlockingQueue<Test> newDataPoints = serviceMediator.getEventQueue(AsyncEventChannels.DATAPOINT_NEW, testId);
BlockingQueue<Test> updates = serviceMediator.getEventQueue(AsyncEventChannels.DATASET_UPDATED_LABELS, testId);
List<Integer> runIds = new ArrayList<>();
for (int i = 1; i < 5; i++) {

BlockingQueue<Test> events = serviceMediator.getEventQueue(AsyncEventChannels.RUN_NEW, testId);
Run run = new Run();
run.testid = testId;
run.data = mapper.readTree(p.resolve("quarkus_sb_run" + i + ".json").toFile());
run.start = Instant.parse(run.data.get("timing").get("start").asText());
run.stop = Instant.parse(run.data.get("timing").get("stop").asText());
run.owner = "foo-team";

Response response = jsonRequest()
.auth()
.oauth2(getUploaderToken())
.body(run)
.post("/api/run/test");
assertEquals(202, response.statusCode());
assertEquals(1, response.getBody().as(List.class).size());

runIds.addAll(response.getBody().as(List.class));
assertFalse(runIds.isEmpty());
assertNotNull(events.poll(10, TimeUnit.SECONDS));
for (int j = 1; j < 11; j++) {
BlockingQueue<Test> events = serviceMediator.getEventQueue(AsyncEventChannels.RUN_NEW, testId);
Run run = new Run();
run.testid = testId;
run.data = mapper.readTree(p.resolve("quarkus_sb_run" + i + ".json").toFile());
run.start = Instant.parse(run.data.get("timing").get("start").asText()).plusSeconds(j);
run.stop = Instant.parse(run.data.get("timing").get("stop").asText()).plusSeconds(j);
run.owner = "foo-team";

Response response = jsonRequest()
.auth()
.oauth2(getUploaderToken())
.body(run)
.post("/api/run/test");
assertEquals(202, response.statusCode());
assertEquals(1, response.getBody().as(List.class).size());

runIds.addAll(response.getBody().as(List.class));
assertFalse(runIds.isEmpty());
assertNotNull(events.poll(10, TimeUnit.SECONDS));
}
}
//make sure we've generating datapoints
assertNotNull(newDataPoints.poll(10, TimeUnit.SECONDS));

//we should have 20 datasets now in total
assertEquals(20, DatasetDAO.findAll().count());
//we should have 200 datasets now in total
assertEquals(200, DatasetDAO.findAll().count());

//give Horreum some time to calculate LabelValues
Thread.sleep(2000);
System.out.println("Number of LabelValues: " + LabelValueDAO.count());
assertEquals(currentNumberOfLabelValues + 220, LabelValueDAO.count());
for (int i = 0; i < 200; i++) {
assertNotNull(updates.poll(20, TimeUnit.SECONDS));
}

Log.info("Number of LabelValues: " + LabelValueDAO.count());
assertEquals(currentNumberOfLabelValues + 2200, LabelValueDAO.count());

OptionalInt maxId;
try (Stream<DataPointDAO> datapoints = DataPointDAO.findAll().stream()) {
Expand All @@ -686,21 +692,21 @@ void testUpdateLabelsOnRuns() throws IOException, InterruptedException {
.then().statusCode(200).extract().body().jsonPath().getList(".", Integer.class);
assertEquals(1, updatedLabels.size());

Thread.sleep(2000);
try (Stream<DataPointDAO> datapoints = DataPointDAO.findAll().stream()) {
maxId = datapoints.mapToInt(n -> n.id).max();
//make sure we get all the updates
for (int i = 0; i < 200; i++) {
assertNotNull(updates.poll(20, TimeUnit.SECONDS));
}

Log.info("Number of LabelValues after 1 label update: " + LabelValueDAO.count());
maxId = DataPointDAO.<DataPointDAO> streamAll().mapToInt(dp -> dp.id).max();

//lets update 11 labels
updatedLabels = jsonRequest().body(labels2).put("/api/schema/" + schemaId2 + "/labels")
.then().statusCode(200).extract().body().jsonPath().getList(".", Integer.class);
assertEquals(11, updatedLabels.size());

Thread.sleep(2000);
int oldMaxId = maxId.getAsInt();
try (Stream<DataPointDAO> datapoints = DataPointDAO.findAll().stream()) {
maxId = datapoints.mapToInt(n -> n.id).max();
}
assertEquals(oldMaxId + 80, maxId.getAsInt());
Log.info("Waiting after labels update");
Thread.sleep(20 * 1000);
assertEquals(maxId.getAsInt() + 800, DataPointDAO.<DataPointDAO> streamAll().mapToInt(dp -> dp.id).max().getAsInt());
}
}