Skip to content

Commit be21a2b

Browse files
committed
Merge branch '7.8' of github.com:gchq/stroom
2 parents b21716b + fae5422 commit be21a2b

File tree

18 files changed

+402
-127
lines changed

18 files changed

+402
-127
lines changed

Diff for: stroom-analytics/stroom-analytics-impl/src/main/java/stroom/analytics/impl/AbstractScheduledQueryExecutor.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ abstract class AbstractScheduledQueryExecutor<T extends AbstractAnalyticRuleDoc>
6969
private final NodeInfo nodeInfo;
7070
private final SecurityContext securityContext;
7171
private final ExecutionScheduleDao executionScheduleDao;
72-
private final DuplicateCheckDirs duplicateCheckDirs;
7372
private final Provider<DocRefInfoService> docRefInfoServiceProvider;
7473
private final String processType;
7574

@@ -79,7 +78,6 @@ abstract class AbstractScheduledQueryExecutor<T extends AbstractAnalyticRuleDoc>
7978
final NodeInfo nodeInfo,
8079
final SecurityContext securityContext,
8180
final ExecutionScheduleDao executionScheduleDao,
82-
final DuplicateCheckDirs duplicateCheckDirs,
8381
final Provider<DocRefInfoService> docRefInfoServiceProvider,
8482
final String processType) {
8583
this.executorProvider = executorProvider;
@@ -88,7 +86,6 @@ abstract class AbstractScheduledQueryExecutor<T extends AbstractAnalyticRuleDoc>
8886
this.nodeInfo = nodeInfo;
8987
this.securityContext = securityContext;
9088
this.executionScheduleDao = executionScheduleDao;
91-
this.duplicateCheckDirs = duplicateCheckDirs;
9289
this.docRefInfoServiceProvider = docRefInfoServiceProvider;
9390
this.processType = processType;
9491
}
@@ -99,9 +96,6 @@ public void exec() {
9996
try {
10097
info(() -> "Starting scheduled " + processType + " processing");
10198

102-
// Start by finding a set of UUIDs for existing rule checking stores.
103-
final List<String> duplicateStoreDirs = duplicateCheckDirs.getAnalyticRuleUUIDList();
104-
10599
// Load rules.
106100
final List<T> docs = loadScheduledRules();
107101

@@ -122,8 +116,7 @@ public void exec() {
122116
// Join.
123117
workQueue.join();
124118

125-
// Delete unused duplicate stores.
126-
duplicateCheckDirs.deleteUnused(duplicateStoreDirs, docs);
119+
postExecuteTidyUp(docs);
127120

128121
info(() ->
129122
LogUtil.message("Finished scheduled {} processing in {}", processType, logExecutionTime));
@@ -137,6 +130,13 @@ public void exec() {
137130
}
138131
}
139132

133+
/**
134+
* Called at the end of execution to perform any tidy up operations.
135+
*
136+
* @param analyticDocs The list of all known analyticDocs for this executor.
137+
*/
138+
abstract void postExecuteTidyUp(final List<T> analyticDocs);
139+
140140
private Runnable createRunnable(final T doc,
141141
final TaskContext parentTaskContext) {
142142
return () -> {

Diff for: stroom-analytics/stroom-analytics-impl/src/main/java/stroom/analytics/impl/DuplicateCheckDirs.java

+56-15
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
package stroom.analytics.impl;
22

3-
import stroom.analytics.shared.AbstractAnalyticRuleDoc;
3+
import stroom.analytics.shared.AnalyticRuleDoc;
44
import stroom.lmdb2.LmdbEnvDir;
55
import stroom.lmdb2.LmdbEnvDirFactory;
66
import stroom.query.common.v2.DuplicateCheckStoreConfig;
7+
import stroom.util.NullSafe;
78
import stroom.util.logging.LambdaLogger;
89
import stroom.util.logging.LambdaLoggerFactory;
10+
import stroom.util.logging.LogUtil;
911

1012
import jakarta.inject.Inject;
1113

1214
import java.io.IOException;
1315
import java.nio.file.Files;
1416
import java.nio.file.Path;
1517
import java.util.ArrayList;
16-
import java.util.HashSet;
1718
import java.util.List;
19+
import java.util.Objects;
20+
import java.util.Optional;
1821
import java.util.Set;
22+
import java.util.stream.Collectors;
1923
import java.util.stream.Stream;
2024

2125
public class DuplicateCheckDirs {
@@ -68,24 +72,61 @@ public List<String> getAnalyticRuleUUIDList() {
6872
return uuidList;
6973
}
7074

71-
public <T extends AbstractAnalyticRuleDoc> void deleteUnused(
72-
final List<String> duplicateStoreDirs,
73-
final List<T> analytics) {
75+
public List<String> deleteUnused(
76+
final List<String> duplicateStoreUuids,
77+
final List<AnalyticRuleDoc> analytics) {
78+
final List<String> deletedUuids = new ArrayList<>();
7479
try {
75-
// Delete unused duplicate stores.
76-
final Set<String> remaining = new HashSet<>(duplicateStoreDirs);
77-
for (final T analyticRuleDoc : analytics) {
78-
remaining.remove(analyticRuleDoc.getUuid());
79-
}
80-
for (final String uuid : remaining) {
81-
try {
82-
getDir(uuid).delete();
83-
} catch (final RuntimeException e) {
84-
LOGGER.error(e::getMessage, e);
80+
LOGGER.debug(() -> LogUtil.message(
81+
"deleteUnused() - duplicateStoreUuids.size: {}, analytics.size: {}",
82+
NullSafe.size(duplicateStoreUuids), NullSafe.size(analytics)));
83+
if (NullSafe.hasItems(duplicateStoreUuids)) {
84+
final List<String> redundantDupStoreUuids;
85+
if (NullSafe.hasItems(analytics)) {
86+
final Set<String> analyticUuids = analytics.stream()
87+
.filter(Objects::nonNull)
88+
.map(AnalyticRuleDoc::getUuid)
89+
.filter(Objects::nonNull)
90+
.collect(Collectors.toSet());
91+
// Find dup stores with no corresponding analytic
92+
redundantDupStoreUuids = duplicateStoreUuids.stream()
93+
.filter(uuid -> !analyticUuids.contains(uuid))
94+
.toList();
95+
} else {
96+
// No analytics so all redundant
97+
redundantDupStoreUuids = duplicateStoreUuids;
98+
}
99+
100+
// Delete unused duplicate stores.
101+
redundantDupStoreUuids.stream()
102+
.map(this::deleteDuplicateStore)
103+
.filter(Optional::isPresent)
104+
.map(Optional::get)
105+
.forEach(deletedUuids::add);
106+
107+
if (!deletedUuids.isEmpty()) {
108+
LOGGER.info("Deleted {} redundant duplicate check stores", deletedUuids.size());
85109
}
86110
}
87111
} catch (final RuntimeException e) {
88112
LOGGER.error(e::getMessage, e);
89113
}
114+
// Return this to ease testing
115+
return deletedUuids;
116+
}
117+
118+
private Optional<String> deleteDuplicateStore(final String uuid) {
119+
try {
120+
final LmdbEnvDir lmdbEnvDir = getDir(uuid);
121+
lmdbEnvDir.delete();
122+
LOGGER.info("Deleted redundant duplicate check store with UUID: {}, path: {}",
123+
uuid, LogUtil.path(lmdbEnvDir.getEnvDir()));
124+
return Optional.of(uuid);
125+
} catch (final RuntimeException e) {
126+
LOGGER.error(() -> LogUtil.message(
127+
"Error deleting duplicateStore with UUID {}: {}",
128+
uuid, LogUtil.exceptionMessage(e), e));
129+
return Optional.empty();
130+
}
90131
}
91132
}

Diff for: stroom-analytics/stroom-analytics-impl/src/main/java/stroom/analytics/impl/ReportExecutor.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ public ReportExecutor(final ExecutorProvider executorProvider,
118118
final NodeInfo nodeInfo,
119119
final SecurityContext securityContext,
120120
final ExecutionScheduleDao executionScheduleDao,
121-
final DuplicateCheckDirs duplicateCheckDirs,
122121
final Provider<DocRefInfoService> docRefInfoServiceProvider,
123122
final ReportStore reportStore,
124123
final ResultStoreManager searchResponseCreatorManager,
@@ -138,7 +137,6 @@ public ReportExecutor(final ExecutorProvider executorProvider,
138137
nodeInfo,
139138
securityContext,
140139
executionScheduleDao,
141-
duplicateCheckDirs,
142140
docRefInfoServiceProvider,
143141
"report");
144142
this.reportStore = reportStore;
@@ -297,6 +295,11 @@ boolean process(final ReportDoc reportDoc,
297295
return success;
298296
}
299297

298+
@Override
299+
void postExecuteTidyUp(final List<ReportDoc> analyticDocs) {
300+
// Nothing to do
301+
}
302+
300303
private Path createFile(final ReportDoc reportDoc,
301304
final Instant executionTime,
302305
final Instant effectiveExecutionTime,

Diff for: stroom-analytics/stroom-analytics-impl/src/main/java/stroom/analytics/impl/ScheduledQueryAnalyticExecutor.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public class ScheduledQueryAnalyticExecutor extends AbstractScheduledQueryExecut
9292
private final DuplicateCheckFactory duplicateCheckFactory;
9393
private final ExpressionPredicateFactory expressionPredicateFactory;
9494
private final Provider<AnalyticUiDefaultConfig> analyticUiDefaultConfigProvider;
95+
private final DuplicateCheckDirs duplicateCheckDirs;
9596

9697
@Inject
9798
ScheduledQueryAnalyticExecutor(final AnalyticRuleStore analyticRuleStore,
@@ -118,7 +119,6 @@ public class ScheduledQueryAnalyticExecutor extends AbstractScheduledQueryExecut
118119
nodeInfo,
119120
securityContext,
120121
executionScheduleDao,
121-
duplicateCheckDirs,
122122
docRefInfoServiceProvider,
123123
"analytic rule");
124124
this.analyticRuleStore = analyticRuleStore;
@@ -132,6 +132,7 @@ public class ScheduledQueryAnalyticExecutor extends AbstractScheduledQueryExecut
132132
this.duplicateCheckFactory = duplicateCheckFactory;
133133
this.expressionPredicateFactory = expressionPredicateFactory;
134134
this.analyticUiDefaultConfigProvider = analyticUiDefaultConfigProvider;
135+
this.duplicateCheckDirs = duplicateCheckDirs;
135136
}
136137

137138
@Override
@@ -370,6 +371,15 @@ boolean process(final AnalyticRuleDoc analytic,
370371
return success;
371372
}
372373

374+
@Override
375+
void postExecuteTidyUp(final List<AnalyticRuleDoc> analyticDocs) {
376+
// Start by finding a set of UUIDs for existing rule checking stores.
377+
final List<String> duplicateStoreUuids = duplicateCheckDirs.getAnalyticRuleUUIDList();
378+
379+
// Delete unused duplicate stores.
380+
duplicateCheckDirs.deleteUnused(duplicateStoreUuids, analyticDocs);
381+
}
382+
373383
@Override
374384
AnalyticRuleDoc load(final DocRef docRef) {
375385
return analyticRuleStore.readDocument(docRef);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package stroom.analytics.impl;
2+
3+
import stroom.analytics.shared.AnalyticRuleDoc;
4+
import stroom.lmdb2.LmdbEnvDir;
5+
import stroom.lmdb2.LmdbEnvDirFactory;
6+
import stroom.query.common.v2.DuplicateCheckStoreConfig;
7+
8+
import org.junit.jupiter.api.Test;
9+
import org.junit.jupiter.api.extension.ExtendWith;
10+
import org.mockito.Mock;
11+
import org.mockito.Mockito;
12+
import org.mockito.junit.jupiter.MockitoExtension;
13+
14+
import java.util.List;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
18+
@ExtendWith(MockitoExtension.class)
19+
class TestDuplicateCheckDirs {
20+
21+
@Mock
22+
private LmdbEnvDirFactory mockLmdbEnvDirFactory;
23+
@Mock
24+
private LmdbEnvDirFactory.Builder mockLmdbEnvDirFactoryBuilder;
25+
@Mock
26+
private LmdbEnvDir mockLmdbEnvDir;
27+
@Mock
28+
private DuplicateCheckStoreConfig mockDuplicateCheckStoreConfig;
29+
30+
@Test
31+
void testDeleteUnused_some() {
32+
33+
Mockito.when(mockLmdbEnvDirFactory.builder())
34+
.thenReturn(mockLmdbEnvDirFactoryBuilder);
35+
Mockito.when(mockLmdbEnvDirFactoryBuilder.config(Mockito.any()))
36+
.thenReturn(mockLmdbEnvDirFactoryBuilder);
37+
Mockito.when(mockLmdbEnvDirFactoryBuilder.subDir(Mockito.any()))
38+
.thenReturn(mockLmdbEnvDirFactoryBuilder);
39+
Mockito.when(mockLmdbEnvDirFactoryBuilder.build())
40+
.thenReturn(mockLmdbEnvDir);
41+
42+
final DuplicateCheckDirs duplicateCheckDirs = new DuplicateCheckDirs(
43+
mockLmdbEnvDirFactory, mockDuplicateCheckStoreConfig);
44+
45+
final List<String> dupStoreUuids = List.of(
46+
"uuid1",
47+
"uuid2",
48+
"uuid3",
49+
"uuid4");
50+
final List<AnalyticRuleDoc> analyticRuleDocs = List.of(
51+
makeDoc("uuid2"),
52+
makeDoc("uuid4"),
53+
makeDoc("uuid5"));
54+
55+
final List<String> deletedUuids = duplicateCheckDirs.deleteUnused(dupStoreUuids, analyticRuleDocs);
56+
assertThat(deletedUuids)
57+
.containsExactly(
58+
"uuid1",
59+
"uuid3");
60+
}
61+
62+
@Test
63+
void testDeleteUnused_all() {
64+
65+
Mockito.when(mockLmdbEnvDirFactory.builder())
66+
.thenReturn(mockLmdbEnvDirFactoryBuilder);
67+
Mockito.when(mockLmdbEnvDirFactoryBuilder.config(Mockito.any()))
68+
.thenReturn(mockLmdbEnvDirFactoryBuilder);
69+
Mockito.when(mockLmdbEnvDirFactoryBuilder.subDir(Mockito.any()))
70+
.thenReturn(mockLmdbEnvDirFactoryBuilder);
71+
Mockito.when(mockLmdbEnvDirFactoryBuilder.build())
72+
.thenReturn(mockLmdbEnvDir);
73+
74+
final DuplicateCheckDirs duplicateCheckDirs = new DuplicateCheckDirs(
75+
mockLmdbEnvDirFactory, mockDuplicateCheckStoreConfig);
76+
77+
final List<String> dupStoreUuids = List.of(
78+
"uuid1",
79+
"uuid2",
80+
"uuid3",
81+
"uuid4");
82+
final List<AnalyticRuleDoc> analyticRuleDocs = null;
83+
84+
final List<String> deletedUuids = duplicateCheckDirs.deleteUnused(dupStoreUuids, analyticRuleDocs);
85+
assertThat(deletedUuids)
86+
.containsExactly(
87+
"uuid1",
88+
"uuid2",
89+
"uuid3",
90+
"uuid4");
91+
}
92+
93+
@Test
94+
void testDeleteUnused_none() {
95+
final DuplicateCheckDirs duplicateCheckDirs = new DuplicateCheckDirs(
96+
mockLmdbEnvDirFactory, mockDuplicateCheckStoreConfig);
97+
98+
final List<String> dupStoreUuids = List.of(
99+
"uuid1",
100+
"uuid2",
101+
"uuid3",
102+
"uuid4");
103+
final List<AnalyticRuleDoc> analyticRuleDocs = List.of(
104+
makeDoc("uuid1"),
105+
makeDoc("uuid2"),
106+
makeDoc("uuid3"),
107+
makeDoc("uuid4"),
108+
makeDoc("uuid5"));
109+
110+
final List<String> deletedUuids = duplicateCheckDirs.deleteUnused(dupStoreUuids, analyticRuleDocs);
111+
assertThat(deletedUuids)
112+
.isEmpty();
113+
}
114+
115+
private AnalyticRuleDoc makeDoc(final String uuid) {
116+
final AnalyticRuleDoc mockDoc = Mockito.mock(AnalyticRuleDoc.class);
117+
Mockito.when(mockDoc.getUuid())
118+
.thenReturn(uuid);
119+
return mockDoc;
120+
}
121+
}

Diff for: stroom-app/src/test/java/stroom/analytics/TestTableBuilderAnalytics.java

+14
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,20 @@ void testHavingCount() {
6868
basicTest(query, 9, 2);
6969
}
7070

71+
@Test
72+
void testHavingEquals() {
73+
final String query = """
74+
from index_view
75+
where UserId = user5
76+
eval count = count()
77+
eval EventTime = floorYear(EventTime)
78+
eval my_num = min(3)
79+
group by EventTime, UserId
80+
having count > 3 and my_num = 3
81+
select EventTime, UserId, count""";
82+
basicTest(query, 9, 2);
83+
}
84+
7185
@Test
7286
void testWindowCount() {
7387
final String query = """

Diff for: stroom-app/src/test/java/stroom/search/TestInteractiveSearch2.java

+21
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,27 @@ void positiveCaseInsensitiveTest() {
9393
test(queryString, 5);
9494
}
9595

96+
/**
97+
* Test the having clause.
98+
*/
99+
@Test
100+
void testHavingEquals() {
101+
String queryString = """
102+
from "Test index"
103+
where UserId = user5 and Description = e0567
104+
and EventTime >= 2000-01-01T00:00:00.000Z
105+
and EventTime <= 2016-01-02T00:00:00.000Z
106+
eval my_num = 3
107+
having my_num = 3
108+
select
109+
StreamId as "Stream Id",
110+
EventId as "Event Id",
111+
EventTime as "Event Time",
112+
"annotation:Status" as Status
113+
""";
114+
test(queryString, 5);
115+
}
116+
96117
// @Test
97118
// void positiveCaseInsensitiveTestMultiComponent() {
98119
// final ExpressionOperator.Builder expression = buildExpression("UserId", "user5", "2000-01-01T00:00:00.000Z",

0 commit comments

Comments
 (0)