Skip to content

Commit fdfd503

Browse files
committed
Merge branch '7.3'
2 parents cd7a82a + 5a93b47 commit fdfd503

File tree

5 files changed

+308
-24
lines changed

5 files changed

+308
-24
lines changed

stroom-core-shared/src/main/java/stroom/index/shared/IndexFieldImpl.java

+4
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ public Builder copy() {
133133
return new Builder(this);
134134
}
135135

136+
137+
// --------------------------------------------------------------------------------
138+
139+
136140
public static final class Builder {
137141

138142
private String fldName;

stroom-core-shared/src/main/java/stroom/index/shared/LuceneIndexField.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ public boolean isTermPositions() {
247247
return termPositions;
248248
}
249249

250-
251250
@Override
252251
@JsonIgnore
253252
public String getDisplayValue() {
@@ -295,6 +294,10 @@ public Builder copy() {
295294
return new Builder(this);
296295
}
297296

297+
298+
// --------------------------------------------------------------------------------
299+
300+
298301
public static final class Builder {
299302

300303
private String fldName;

stroom-index/stroom-index-impl-db/src/main/java/stroom/index/impl/db/IndexFieldDaoImpl.java

+50-23
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
import stroom.db.util.JooqUtil;
2424
import stroom.db.util.StringMatchConditionUtil;
2525
import stroom.docref.DocRef;
26-
import stroom.docref.StringMatch;
2726
import stroom.index.impl.IndexFieldDao;
2827
import stroom.index.shared.IndexFieldImpl;
2928
import stroom.util.shared.ResultPage;
3029

3130
import jakarta.inject.Inject;
3231
import org.jooq.Condition;
32+
import org.jooq.DSLContext;
3333

3434
import java.util.Collection;
3535
import java.util.Collections;
@@ -48,40 +48,64 @@ public class IndexFieldDaoImpl implements IndexFieldDao {
4848
this.queryDatasourceDbConnProvider = queryDatasourceDbConnProvider;
4949
}
5050

51-
private int getOrCreateFieldSource(final DocRef docRef) {
52-
Optional<Integer> optional = getFieldSource(docRef);
53-
if (optional.isEmpty()) {
54-
createFieldSource(docRef);
55-
optional = getFieldSource(docRef);
56-
}
57-
return optional.orElseThrow();
51+
private void ensureFieldSource(final DocRef docRef) {
52+
JooqUtil.context(queryDatasourceDbConnProvider, context -> {
53+
Optional<Integer> optional = getFieldSource(context, docRef, false);
54+
if (optional.isEmpty()) {
55+
createFieldSource(context, docRef);
56+
}
57+
});
5858
}
5959

60-
private Optional<Integer> getFieldSource(final DocRef docRef) {
61-
return JooqUtil
62-
.contextResult(queryDatasourceDbConnProvider, context -> context
63-
.select(INDEX_FIELD_SOURCE.ID)
64-
.from(INDEX_FIELD_SOURCE)
65-
.where(INDEX_FIELD_SOURCE.TYPE.eq(docRef.getType()))
66-
.and(INDEX_FIELD_SOURCE.UUID.eq(docRef.getUuid()))
67-
.fetchOptional(INDEX_FIELD_SOURCE.ID));
60+
private Optional<Integer> getFieldSource(final DocRef docRef,
61+
final boolean lockFieldSource) {
62+
63+
return JooqUtil.contextResult(queryDatasourceDbConnProvider, context ->
64+
getFieldSource(context, docRef, lockFieldSource));
6865
}
6966

70-
private void createFieldSource(final DocRef docRef) {
71-
JooqUtil.context(queryDatasourceDbConnProvider, context -> context
67+
private Optional<Integer> getFieldSource(final DSLContext context,
68+
final DocRef docRef,
69+
final boolean lockFieldSource) {
70+
var c = context
71+
.select(INDEX_FIELD_SOURCE.ID)
72+
.from(INDEX_FIELD_SOURCE)
73+
.where(INDEX_FIELD_SOURCE.TYPE.eq(docRef.getType()))
74+
.and(INDEX_FIELD_SOURCE.UUID.eq(docRef.getUuid()));
75+
76+
if (lockFieldSource) {
77+
return c.forUpdate()
78+
.fetchOptional(INDEX_FIELD_SOURCE.ID);
79+
} else {
80+
return c.fetchOptional(INDEX_FIELD_SOURCE.ID);
81+
}
82+
}
83+
84+
private void createFieldSource(final DSLContext context, final DocRef docRef) {
85+
// TODO Consider using JooqUtil.tryCreate() as onDuplicateKeyIgnore will
86+
// ignore any error, not just dup keys
87+
context
7288
.insertInto(INDEX_FIELD_SOURCE)
7389
.set(INDEX_FIELD_SOURCE.TYPE, docRef.getType())
7490
.set(INDEX_FIELD_SOURCE.UUID, docRef.getUuid())
7591
.set(INDEX_FIELD_SOURCE.NAME, docRef.getName())
7692
.onDuplicateKeyIgnore()
77-
.execute());
93+
.execute();
7894
}
7995

8096
@Override
8197
public void addFields(final DocRef docRef, final Collection<IndexField> fields) {
82-
final int fieldSourceId = getOrCreateFieldSource(docRef);
83-
JooqUtil.context(queryDatasourceDbConnProvider, context -> {
84-
var c = context.insertInto(INDEX_FIELD,
98+
// Do this outside the txn so other threads can see it asap
99+
ensureFieldSource(docRef);
100+
101+
JooqUtil.transaction(queryDatasourceDbConnProvider, txnContext -> {
102+
// Get a record lock on the field source, so we are the only thread
103+
// that can mutate the index fields for that source, else we can get a deadlock.
104+
final int fieldSourceId = getFieldSource(txnContext, docRef, true)
105+
.orElseThrow(() -> new RuntimeException("No field source found for " + docRef));
106+
107+
// Insert any new fields under lock
108+
var c = txnContext.insertInto(INDEX_FIELD,
85109
INDEX_FIELD.FK_INDEX_FIELD_SOURCE_ID,
86110
INDEX_FIELD.TYPE,
87111
INDEX_FIELD.NAME,
@@ -90,6 +114,7 @@ public void addFields(final DocRef docRef, final Collection<IndexField> fields)
90114
INDEX_FIELD.STORED,
91115
INDEX_FIELD.TERM_POSITIONS,
92116
INDEX_FIELD.CASE_SENSITIVE);
117+
93118
for (final IndexField field : fields) {
94119
c = c.values(fieldSourceId,
95120
(byte) field.getFldType().getIndex(),
@@ -100,6 +125,7 @@ public void addFields(final DocRef docRef, final Collection<IndexField> fields)
100125
field.isTermPositions(),
101126
field.isCaseSensitive());
102127
}
128+
// Effectively ignore existing fields
103129
c.onDuplicateKeyUpdate()
104130
.set(INDEX_FIELD.FK_INDEX_FIELD_SOURCE_ID, fieldSourceId)
105131
.execute();
@@ -108,7 +134,8 @@ public void addFields(final DocRef docRef, final Collection<IndexField> fields)
108134

109135
@Override
110136
public ResultPage<IndexField> findFields(final FindIndexFieldCriteria criteria) {
111-
final Optional<Integer> optional = getFieldSource(criteria.getDataSourceRef());
137+
final Optional<Integer> optional = getFieldSource(criteria.getDataSourceRef(), false);
138+
112139
if (optional.isEmpty()) {
113140
return ResultPage.createCriterialBasedList(Collections.emptyList(), criteria);
114141
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package stroom.index.impl.db;
2+
3+
import stroom.datasource.api.v2.AnalyzerType;
4+
import stroom.datasource.api.v2.FieldType;
5+
import stroom.datasource.api.v2.FindIndexFieldCriteria;
6+
import stroom.datasource.api.v2.IndexField;
7+
import stroom.db.util.JooqUtil;
8+
import stroom.docref.DocRef;
9+
import stroom.index.impl.IndexFieldDao;
10+
import stroom.index.shared.IndexFieldImpl;
11+
import stroom.index.shared.LuceneIndexDoc;
12+
import stroom.util.concurrent.ThreadUtil;
13+
import stroom.util.exception.ThrowingRunnable;
14+
import stroom.util.logging.LambdaLogger;
15+
import stroom.util.logging.LambdaLoggerFactory;
16+
import stroom.util.shared.PageRequest;
17+
import stroom.util.shared.ResultPage;
18+
19+
import com.google.inject.Guice;
20+
import com.google.inject.Injector;
21+
import jakarta.inject.Inject;
22+
import org.jooq.Record1;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Disabled;
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.sql.SQLException;
28+
import java.util.ArrayList;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.ExecutionException;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
36+
37+
import static org.assertj.core.api.Assertions.assertThat;
38+
import static stroom.db.util.JooqUtil.count;
39+
import static stroom.index.impl.db.jooq.tables.IndexField.INDEX_FIELD;
40+
import static stroom.index.impl.db.jooq.tables.IndexFieldSource.INDEX_FIELD_SOURCE;
41+
42+
class TestIndexFieldDaoImpl {
43+
44+
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(TestIndexFieldDaoImpl.class);
45+
46+
private static final DocRef DOC_REF_1 = DocRef.builder()
47+
.uuid("a8c078d8-b2e7-41e6-b254-d4870ab75ab8")
48+
.type(LuceneIndexDoc.DOCUMENT_TYPE)
49+
.name("foo")
50+
.build();
51+
52+
private static final DocRef DOC_REF_2 = DocRef.builder()
53+
.uuid("05da4d75-28d4-48ad-9195-9302cfacbc1c")
54+
.type(LuceneIndexDoc.DOCUMENT_TYPE)
55+
.name("bar")
56+
.build();
57+
58+
private static final IndexField FIELD_1 = IndexFieldImpl.builder()
59+
.fldName("ID")
60+
.fldType(FieldType.ID)
61+
.analyzerType(AnalyzerType.NUMERIC)
62+
.build();
63+
private static final IndexField FIELD_2 = IndexFieldImpl.builder()
64+
.fldName("Name")
65+
.fldType(FieldType.TEXT)
66+
.analyzerType(AnalyzerType.ALPHA_NUMERIC)
67+
.build();
68+
private static final IndexField FIELD_3 = IndexFieldImpl.builder()
69+
.fldName("State")
70+
.fldType(FieldType.BOOLEAN)
71+
.analyzerType(AnalyzerType.KEYWORD)
72+
.build();
73+
74+
@Inject
75+
IndexFieldDao indexFieldDao;
76+
@Inject
77+
IndexDbConnProvider indexDbConnProvider;
78+
79+
@BeforeEach
80+
void setUp() throws SQLException {
81+
final Injector injector = Guice.createInjector(
82+
new IndexDbModule(),
83+
new IndexDaoModule(),
84+
new TestModule());
85+
injector.injectMembers(this);
86+
87+
JooqUtil.transaction(indexDbConnProvider, context -> {
88+
LOGGER.info("Tear down");
89+
context.deleteFrom(INDEX_FIELD).execute();
90+
context.deleteFrom(INDEX_FIELD_SOURCE).execute();
91+
92+
assertThat(count(context, INDEX_FIELD))
93+
.isEqualTo(0);
94+
assertThat(count(context, INDEX_FIELD_SOURCE))
95+
.isEqualTo(0);
96+
});
97+
}
98+
99+
@Test
100+
void addFields() {
101+
List<IndexField> fields = getFields(DOC_REF_1);
102+
103+
assertThat(fields.size())
104+
.isEqualTo(0);
105+
106+
indexFieldDao.addFields(DOC_REF_1, List.of(FIELD_1));
107+
108+
fields = getFields(DOC_REF_1);
109+
110+
assertThat(fields.size())
111+
.isEqualTo(1);
112+
113+
// Now add all three fields, so field 1 is ignored
114+
indexFieldDao.addFields(DOC_REF_1, List.of(FIELD_1, FIELD_2, FIELD_3));
115+
116+
fields = getFields(DOC_REF_1);
117+
118+
assertThat(fields.size())
119+
.isEqualTo(3);
120+
}
121+
122+
@Disabled // Verifying lock behaviour
123+
@Test
124+
void test() throws ExecutionException, InterruptedException {
125+
indexFieldDao.addFields(DOC_REF_1, List.of(FIELD_1));
126+
127+
final CountDownLatch startLatch = new CountDownLatch(1);
128+
129+
CompletableFuture.runAsync(() -> {
130+
JooqUtil.context(indexDbConnProvider, context -> {
131+
try {
132+
startLatch.await();
133+
} catch (InterruptedException e) {
134+
throw new RuntimeException(e);
135+
}
136+
137+
final Record1<Integer> rec = context.select(INDEX_FIELD_SOURCE.ID)
138+
.from(INDEX_FIELD_SOURCE)
139+
.where(INDEX_FIELD_SOURCE.UUID.eq(DOC_REF_1.getUuid()))
140+
.and(INDEX_FIELD_SOURCE.TYPE.eq(DOC_REF_1.getType()))
141+
.fetchOne();
142+
143+
final Integer id = rec.get(INDEX_FIELD_SOURCE.ID);
144+
145+
LOGGER.info("id {}", id);
146+
});
147+
});
148+
149+
CompletableFuture.runAsync(() -> {
150+
JooqUtil.context(indexDbConnProvider, context -> {
151+
context.select(INDEX_FIELD_SOURCE.ID)
152+
.from(INDEX_FIELD_SOURCE)
153+
.where(INDEX_FIELD_SOURCE.UUID.eq(DOC_REF_1.getUuid()))
154+
.and(INDEX_FIELD_SOURCE.TYPE.eq(DOC_REF_1.getType()))
155+
.forUpdate()
156+
.fetch();
157+
LOGGER.info("Done lock");
158+
startLatch.countDown();
159+
160+
ThreadUtil.sleepIgnoringInterrupts(5_000);
161+
});
162+
}).get();
163+
}
164+
165+
/**
166+
* Make sure multiple threads can concurrently add different lists of fields
167+
*/
168+
@Test
169+
void testMultiThread() throws ExecutionException, InterruptedException {
170+
final int threads = 10;
171+
final ExecutorService executorService = Executors.newFixedThreadPool(threads);
172+
final CountDownLatch startLatch = new CountDownLatch(1);
173+
174+
final List<CompletableFuture<?>> futures = new ArrayList<>();
175+
for (int i = 0; i < threads; i++) {
176+
final int finalI = i;
177+
final CompletableFuture<Void> future = CompletableFuture.runAsync(
178+
ThrowingRunnable.unchecked(() -> {
179+
final List<IndexField> fields = finalI % 2 == 0
180+
? List.of(FIELD_1, FIELD_3)
181+
: List.of(FIELD_2, FIELD_3);
182+
startLatch.await();
183+
indexFieldDao.addFields(DOC_REF_1, fields);
184+
LOGGER.debug("Thread {} complete", finalI);
185+
}), executorService);
186+
futures.add(future);
187+
}
188+
startLatch.countDown();
189+
190+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
191+
.get();
192+
193+
final List<IndexField> fields = getFields(DOC_REF_1);
194+
195+
assertThat(fields.size())
196+
.isEqualTo(3);
197+
}
198+
199+
@Test
200+
void findFields() {
201+
List<IndexField> fields = getFields(DOC_REF_1);
202+
assertThat(fields.size())
203+
.isEqualTo(0);
204+
205+
indexFieldDao.addFields(DOC_REF_1, List.of(FIELD_1, FIELD_2, FIELD_3));
206+
indexFieldDao.addFields(DOC_REF_2, List.of(FIELD_3));
207+
208+
fields = getFields(DOC_REF_1);
209+
assertThat(fields.size())
210+
.isEqualTo(3);
211+
212+
fields = getFields(DOC_REF_2);
213+
assertThat(fields.size())
214+
.isEqualTo(1);
215+
}
216+
217+
private List<IndexField> getFields(final DocRef docRef) {
218+
final ResultPage<IndexField> resultPage = indexFieldDao.findFields(
219+
new FindIndexFieldCriteria(
220+
new PageRequest(),
221+
Collections.emptyList(),
222+
docRef,
223+
null));
224+
return resultPage.getValues();
225+
}
226+
}

0 commit comments

Comments
 (0)