
Commit 6feaa6d

Merge pull request #4832 from gchq/gh-4828
#4828 Fix recursive cache invalidation for index load
2 parents 095afb4 + b7b58b6
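
At a glance (inferred from the diff below rather than stated in it): the fix removes the on-read/on-write field transfer from `IndexStoreImpl`, where a read could trigger a write-back that invalidated the cached document and re-triggered the load, and replaces it with a one-off Flyway Java migration, `V07_08_00_001__IndexFields`, which moves fields out of each `LuceneIndexDoc` into the `index_field` table.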

File tree

6 files changed: +232 -1895 lines

.gitignore (+1 -2)

```diff
@@ -17,8 +17,7 @@ jars/
 bin/
 embedmysql/
 .settings/
-stroom-dashboard-gwt/hs_err_pid*.log
-stroom-app-gwt/hs_err_pid*.log
+hs_err_pid*.log
 stroom-util-build.properties

 stroom-core/src/test/resources/transientContentPacks/
```
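
The two module-scoped crash-log patterns (`hs_err_pid*.log` files are JVM HotSpot fatal-error logs) collapse into a single repo-wide ignore rule.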

stroom-index/stroom-index-impl-db/build.gradle (+1)

```diff
@@ -7,6 +7,7 @@ dependencies {
     implementation project(':stroom-dictionary:stroom-dictionary-api')
     implementation project(':stroom-docref')
     implementation project(':stroom-docstore:stroom-docstore-api')
+    implementation project(':stroom-docstore:stroom-docstore-impl')
     implementation project(':stroom-explorer:stroom-collection-api')
     implementation project(':stroom-explorer:stroom-docrefinfo-api')
     implementation project(':stroom-explorer:stroom-explorer-api')
```
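
The new `stroom-docstore-impl` dependency exists to give the migration below access to `Serialiser2FactoryImpl`, which it uses to read and write `LuceneIndexDoc` serialisations.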
New file (+203 lines): stroom/index/impl/db/migration/V07_08_00_001__IndexFields.java

```java
/*
 * Copyright 2016 Crown Copyright
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package stroom.index.impl.db.migration;

import stroom.docref.DocRef;
import stroom.docstore.api.Serialiser2;
import stroom.docstore.impl.Serialiser2FactoryImpl;
import stroom.index.shared.LuceneIndexDoc;
import stroom.index.shared.LuceneIndexField;
import stroom.util.NullSafe;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@SuppressWarnings("unused")
public class V07_08_00_001__IndexFields extends BaseJavaMigration {

    private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(V07_08_00_001__IndexFields.class);

    private final Serialiser2<LuceneIndexDoc> indexDocSerialiser =
            new Serialiser2FactoryImpl().createSerialiser(LuceneIndexDoc.class);

    @Override
    public void migrate(final Context context) throws Exception {
        final List<LuceneIndexDoc> docs = getDocs(context);
        for (final LuceneIndexDoc doc : docs) {
            if (NullSafe.hasItems(doc, LuceneIndexDoc::getFields)) {
                final DocRef docRef = doc.asDocRef();

                // Ensure field source
                createFieldSource(context, docRef);

                // Get field source
                final int fieldSourceId = getFieldSourceId(context, docRef);

                // Add fields to DB
                addFieldsToDb(context, fieldSourceId, doc.getFields());

                // Remove all fields from doc
                doc.setFields(null);

                // Write the updated doc
                writeDoc(context, doc);
            }
        }
    }

    private List<LuceneIndexDoc> getDocs(final Context context) throws SQLException {
        final List<LuceneIndexDoc> docs = new ArrayList<>();
        try (final PreparedStatement preparedStatement = context.getConnection()
                .prepareStatement("" +
                        "SELECT" +
                        " `uuid`," +
                        " `name`," +
                        " `data`" +
                        " FROM `doc`" +
                        " WHERE `type` = ?" +
                        " AND `ext` = ?")) {
            preparedStatement.setString(1, LuceneIndexDoc.TYPE);
            preparedStatement.setString(2, "meta");

            try (final ResultSet resultSet = preparedStatement.executeQuery()) {
                while (resultSet.next()) {
                    final String uuid = resultSet.getString(1);
                    final String name = resultSet.getString(2);
                    final String data = resultSet.getString(3);

                    // Read the doc
                    final LuceneIndexDoc doc = readDoc(data);
                    docs.add(doc);
                }
            }
        } catch (final SQLSyntaxErrorException e) {
            // Ignore errors as they are due to doc not existing, which is ok because it means we have nothing to
            // migrate.
            LOGGER.debug(e::getMessage, e);
        }
        return docs;
    }

    private void createFieldSource(final Context context, final DocRef docRef) throws SQLException {
        // Ensure field source
        try (final PreparedStatement ps = context.getConnection()
                .prepareStatement("" +
                        "INSERT INTO `index_field_source`" +
                        " (`type`," +
                        " `uuid`," +
                        " `name`)" +
                        " VALUES (?, ?, ?)" +
                        " ON DUPLICATE KEY UPDATE" +
                        " `type`=`type`")) {
            ps.setString(1, docRef.getType());
            ps.setString(2, docRef.getUuid());
            ps.setString(3, docRef.getName());
            ps.execute();
        }
    }

    private int getFieldSourceId(final Context context, final DocRef docRef) throws SQLException {
        try (final PreparedStatement ps = context.getConnection()
                .prepareStatement("" +
                        "SELECT `id`" +
                        " FROM `index_field_source`" +
                        " WHERE `type` = ?" +
                        " AND `uuid` = ?")) {
            ps.setString(1, docRef.getType());
            ps.setString(2, docRef.getUuid());

            try (final ResultSet rs = ps.executeQuery()) {
                if (rs.next()) {
                    return rs.getInt(1);
                }
            }
        }
        throw new RuntimeException("Field source not found");
    }

    private void addFieldsToDb(final Context context,
                               final int fieldSourceId,
                               final List<LuceneIndexField> fields) throws SQLException {
        try (final PreparedStatement ps = context.getConnection()
                .prepareStatement("" +
                        "INSERT INTO `index_field` (" +
                        "`fk_index_field_source_id`, " +
                        "`type`, " +
                        "`name`, " +
                        "`analyzer`, " +
                        "`indexed`, " +
                        "`stored`, " +
                        "`term_positions`, " +
                        "`case_sensitive`)" +
                        " VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
                        " ON DUPLICATE KEY UPDATE" +
                        " `fk_index_field_source_id`=`fk_index_field_source_id`")) {

            for (final LuceneIndexField field : fields) {
                ps.setInt(1, fieldSourceId);
                ps.setByte(2, (byte) (field.getFldType() == null
                        ? 0
                        : field.getFldType().getIndex()));
                ps.setString(3, field.getFldName());
                ps.setString(4, field.getAnalyzerType() == null
                        ? null
                        : field.getAnalyzerType().getDisplayValue());
                ps.setBoolean(5, field.isIndexed());
                ps.setBoolean(6, field.isStored());
                ps.setBoolean(7, field.isTermPositions());
                ps.setBoolean(8, field.isCaseSensitive());
                ps.executeUpdate();
            }
        }
    }

    private LuceneIndexDoc readDoc(final String data) {
        LuceneIndexDoc doc = null;
        try {
            doc = indexDocSerialiser.read(data.getBytes(StandardCharsets.UTF_8));
        } catch (final IOException | RuntimeException e) {
            LOGGER.error(e.getMessage(), e);
        }
        return doc;
    }

    private void writeDoc(final Context context, final LuceneIndexDoc doc) throws IOException, SQLException {
        final Map<String, byte[]> dataMap = indexDocSerialiser.write(doc);
        final byte[] newData = dataMap.remove("meta");
        // Add the records.
        try (final PreparedStatement ps = context.getConnection().prepareStatement(
                "UPDATE `doc` SET `data` = ? WHERE `type` = ? AND `uuid` = ? AND `ext` = ?")) {
            ps.setBytes(1, newData);
            ps.setString(2, doc.getType());
            ps.setString(3, doc.getUuid());
            ps.setString(4, "meta");
            ps.executeUpdate();
        }
    }
}
```
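
For orientation, here is a minimal sketch of how a versioned Flyway Java migration like this gets executed; the runner class, data source, and `locations` value are assumptions for illustration, not taken from this PR. Flyway matches the class by its `V07_08_00_001__` version prefix and calls `migrate(Context)` once, recording the version in its schema history table.

```java
import org.flywaydb.core.Flyway;

import javax.sql.DataSource;

public class MigrationRunner {

    // Hypothetical wiring; Stroom configures Flyway elsewhere.
    public static void run(final DataSource dataSource) {
        Flyway.configure()
                .dataSource(dataSource)
                // Assumed location, derived from the migration's package declaration.
                .locations("classpath:stroom/index/impl/db/migration")
                .load()
                .migrate();
    }
}
```

Within the migration itself, the `ON DUPLICATE KEY UPDATE` no-op clauses keep the inserts idempotent, so `index_field_source` or `index_field` rows already created by the application (for example via the import path in `IndexStoreImpl`) do not fail the run.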

stroom-index/stroom-index-impl/src/main/java/stroom/index/impl/IndexStoreImpl.java (+3 -19)

```diff
@@ -142,12 +142,12 @@ public void remapDependencies(final DocRef docRef,

     @Override
     public LuceneIndexDoc readDocument(final DocRef docRef) {
-        return transferFieldsToDb(store.readDocument(docRef));
+        return store.readDocument(docRef);
     }

     @Override
     public LuceneIndexDoc writeDocument(final LuceneIndexDoc document) {
-        return transferFieldsToDb(store.writeDocument(document));
+        return store.writeDocument(document);
     }

     ////////////////////////////////////////////////////////////////////////
@@ -192,7 +192,7 @@ public DocRef importDocument(final DocRef docRef,
         }

         // Transfer fields to the database.
-        if (!NullSafe.isEmptyCollection(doc.getFields())) {
+        if (NullSafe.hasItems(doc.getFields())) {
             // Make sure we transfer all fields to the DB and remove them from the doc.
             final List<IndexField> fields = doc
                     .getFields()
@@ -216,22 +216,6 @@ public DocRef importDocument(final DocRef docRef,
         return store.importDocument(docRef, effectiveDataMap, importState, importSettings);
     }

-    private LuceneIndexDoc transferFieldsToDb(final LuceneIndexDoc doc) {
-        if (doc == null || NullSafe.isEmptyCollection(doc.getFields())) {
-            return doc;
-        }
-
-        // Make sure we transfer all fields to the DB and remove them from the doc.
-        final List<IndexField> fields = doc
-                .getFields()
-                .stream()
-                .map(field -> (IndexField) field)
-                .toList();
-        indexFieldServiceProvider.get().addFields(doc.asDocRef(), fields);
-        doc.setFields(null);
-        return store.writeDocument(doc);
-    }
-
     @Override
     public Map<String, byte[]> exportDocument(final DocRef docRef,
                                               final boolean omitAuditFields,
```
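
Reading the removed code against the PR title, the likely failure mode (an inference; the diff does not spell it out) is that `transferFieldsToDb` called `store.writeDocument(doc)` from inside `readDocument`, so a cache-miss load performed a write, the write's change event invalidated the freshly cached entry, and the next read repeated the cycle. Below is a stripped-down sketch of that feedback loop; the cache, store, and event handling are hypothetical stand-ins, not Stroom classes.

```java
import java.util.HashMap;
import java.util.Map;

public class RecursiveInvalidationSketch {

    private final Map<String, String> cache = new HashMap<>();
    private boolean pendingInvalidation = false;
    private int loads = 0;

    public String read(final String key) {
        String doc = cache.get(key);
        if (doc == null) {
            doc = load(key);              // cache miss -> load from store
            cache.put(key, doc);
            if (pendingInvalidation) {    // the write's change event lands after the put...
                cache.remove(key);        // ...and evicts the entry that was just loaded
                pendingInvalidation = false;
            }
        }
        return doc;
    }

    private String load(final String key) {
        loads++;
        final String doc = "doc:" + key;  // stands in for store.readDocument(...)
        pendingInvalidation = true;       // old transferFieldsToDb(): store.writeDocument(...)
        return doc;
    }

    public static void main(final String[] args) {
        final RecursiveInvalidationSketch sketch = new RecursiveInvalidationSketch();
        sketch.read("index1");
        sketch.read("index1");            // misses again: the first load evicted itself
        System.out.println("loads = " + sketch.loads);  // prints "loads = 2"
    }
}
```

With this commit, `readDocument` and `writeDocument` are plain delegations again; fields are transferred once by the Flyway migration and, for newly imported documents, on the explicit `importDocument` path.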
