Skip to content

Commit 65f8d14

Browse files
authored
Merge pull request #4241 from gchq/gh-4240-multiple-shards
PR for #4240 - Stroom creating too many shards per partition
2 parents 935b00e + bef6936 commit 65f8d14

File tree

4 files changed

+80
-5
lines changed

4 files changed

+80
-5
lines changed

stroom-core-shared/src/main/java/stroom/index/shared/IndexShard.java

+4
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,10 @@ public String toString() {
313313
return sb.toString();
314314
}
315315

316+
317+
// --------------------------------------------------------------------------------
318+
319+
316320
/**
317321
* The status of this indexUuid shard
318322
*/

stroom-index/stroom-index-impl-db/src/main/java/stroom/index/impl/db/IndexShardDaoImpl.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import stroom.query.language.functions.ValNull;
3131
import stroom.query.language.functions.ValString;
3232
import stroom.query.language.functions.ValuesConsumer;
33+
import stroom.util.logging.LambdaLogger;
34+
import stroom.util.logging.LambdaLoggerFactory;
3335
import stroom.util.shared.PageRequest;
3436
import stroom.util.shared.Range;
3537
import stroom.util.shared.ResultPage;
@@ -63,6 +65,8 @@
6365
@Singleton // holding all the volume selectors
6466
class IndexShardDaoImpl implements IndexShardDao {
6567

68+
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(IndexShardDaoImpl.class);
69+
6670
private static final Function<Record, IndexShard> RECORD_TO_INDEX_SHARD_MAPPER = record -> {
6771
final IndexShard indexShard = new IndexShard();
6872
indexShard.setId(record.get(INDEX_SHARD.ID));
@@ -301,12 +305,16 @@ public boolean setStatus(final Long id,
301305
case CORRUPT -> INDEX_SHARD.STATUS.ne(IndexShardStatus.DELETED.getPrimitiveValue());
302306
};
303307

304-
return JooqUtil.contextResult(indexDbConnProvider, context -> context
308+
final boolean didUpdate = JooqUtil.contextResult(indexDbConnProvider, context -> context
305309
.update(INDEX_SHARD)
306310
.set(INDEX_SHARD.STATUS, status.getPrimitiveValue())
307311
.where(INDEX_SHARD.ID.eq(id))
308312
.and(currentStateCondition)
309313
.execute()) > 0;
314+
315+
LOGGER.debug("Setting shard status to {} for shard id {}, didUpdate: {}", status, id, didUpdate);
316+
317+
return didUpdate;
310318
}
311319

312320
@Override

stroom-index/stroom-index-impl/src/main/java/stroom/index/impl/ActiveShardsCacheImpl.java

+43-4
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
import stroom.index.shared.LuceneIndexDoc;
1212
import stroom.node.api.NodeInfo;
1313
import stroom.security.api.SecurityContext;
14+
import stroom.util.NullSafe;
1415
import stroom.util.logging.LambdaLogger;
1516
import stroom.util.logging.LambdaLoggerFactory;
17+
import stroom.util.logging.LogUtil;
1618
import stroom.util.shared.ResultPage;
1719

1820
import jakarta.inject.Inject;
@@ -21,6 +23,7 @@
2123

2224
import java.io.UncheckedIOException;
2325
import java.util.ArrayList;
26+
import java.util.EnumSet;
2427
import java.util.List;
2528
import java.util.concurrent.atomic.AtomicInteger;
2629
import java.util.concurrent.locks.ReentrantLock;
@@ -78,6 +81,7 @@ private ActiveShards create(final IndexShardKey indexShardKey) {
7881
throw new IndexException("Unable to find index with UUID: " + indexShardKey.getIndexUuid());
7982
}
8083

84+
LOGGER.debug("Creating ActiveShards for node: {}, indexShardKey: {}", nodeInfo, indexShardKey);
8185
return new ActiveShards(
8286
nodeInfo,
8387
indexShardWriterCache,
@@ -89,9 +93,24 @@ private ActiveShards create(final IndexShardKey indexShardKey) {
8993
});
9094
}
9195

96+
97+
// --------------------------------------------------------------------------------
98+
99+
92100
public static class ActiveShards {
93101

94-
private static final int MAX_ATTEMPTS = 10000;
102+
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(ActiveShards.class);
103+
104+
// All shards are CLOSED on boot, but if this cache is cleared or items age off
105+
// then we may shards in other states.
106+
private static final EnumSet<IndexShardStatus> REQUIRED_SHARD_STATES = EnumSet.of(
107+
IndexShardStatus.NEW,
108+
IndexShardStatus.OPEN,
109+
IndexShardStatus.OPENING,
110+
IndexShardStatus.CLOSED,
111+
IndexShardStatus.CLOSING);
112+
113+
private static final int MAX_ATTEMPTS = 10_000;
95114

96115
private final NodeInfo nodeInfo;
97116
private final IndexShardWriterCache indexShardWriterCache;
@@ -130,6 +149,7 @@ public void addDocument(final IndexDocument document) {
130149

131150
// Attempt under lock if we failed to add.
132151
if (!success) {
152+
LOGGER.debug("Trying again under lock");
133153
// If we failed then try under lock to make sure we get a new writer.
134154
addDocumentUnderLock(document);
135155
}
@@ -220,13 +240,16 @@ private boolean addDocument(final IndexDocument document,
220240

221241
private List<IndexShard> getIndexShards() {
222242
List<IndexShard> indexShards = this.indexShards;
243+
223244
if (indexShards.size() < shardsPerPartition) {
224245
indexShards = ensureShards();
225246
}
226247
return indexShards;
227248
}
228249

229250
private synchronized List<IndexShard> ensureShards() {
251+
LOGGER.debug(() -> LogUtil.message(
252+
"ensureShards, indexShards size before: {}", NullSafe.size(indexShards)));
230253
List<IndexShard> list = indexShards;
231254
if (list.size() < shardsPerPartition) {
232255
list = new ArrayList<>(list);
@@ -235,10 +258,13 @@ private synchronized List<IndexShard> ensureShards() {
235258
}
236259
}
237260
indexShards = list;
261+
LOGGER.debug(() -> LogUtil.message(
262+
"ensureShards, indexShards size after: {}", NullSafe.size(indexShards)));
238263
return list;
239264
}
240265

241266
private synchronized void addActiveShard(final IndexShardKey indexShardKey) {
267+
LOGGER.debug("Adding shard for key {}", indexShardKey);
242268
final IndexShard indexShard = createNewShard(indexShardKey);
243269
final List<IndexShard> list = new ArrayList<>(indexShards);
244270
list.add(indexShard);
@@ -263,21 +289,34 @@ private List<IndexShard> getExistingShards(final IndexShardKey indexShardKey) {
263289

264290
final List<IndexShard> indexShards = new ArrayList<>();
265291
final ResultPage<IndexShard> indexShardResultPage = indexShardDao.find(criteria);
292+
LOGGER.debug(() -> LogUtil.message(
293+
"getExistingShards(), found {} un-filtered shards, maxDocsPerShard: {}",
294+
NullSafe.getOrElse(indexShardResultPage, ResultPage::size, 0),
295+
maxDocsPerShard));
266296
for (final IndexShard indexShard : indexShardResultPage.getValues()) {
267297
// Look for non deleted, non-full, non-corrupt index shards.
268-
if (IndexShardStatus.CLOSED.equals(indexShard.getStatus()) &&
269-
indexShard.getDocumentCount() < maxDocsPerShard) {
298+
final IndexShardStatus status = indexShard.getStatus();
299+
if (status != null
300+
&& REQUIRED_SHARD_STATES.contains(status)
301+
&& indexShard.getDocumentCount() < maxDocsPerShard) {
270302
indexShards.add(indexShard);
303+
} else {
304+
LOGGER.debug(() -> LogUtil.message("Ignoring shard {} with status: {}, docCount: {}",
305+
indexShard.getId(), status, indexShard.getDocumentCount()));
271306
}
272307
}
308+
LOGGER.debug(() -> LogUtil.message(
309+
"getExistingShards(), indexShards size: {}", NullSafe.size(indexShards)));
273310
return indexShards;
274311
}
275312

276313
/**
277314
* Creates a new index shard writer for the specified key and opens a writer for it.
278315
*/
279316
private IndexShard createNewShard(final IndexShardKey indexShardKey) {
280-
return indexShardCreator.createIndexShard(indexShardKey, nodeInfo.getThisNodeName());
317+
final String thisNodeName = nodeInfo.getThisNodeName();
318+
LOGGER.debug("Creating shard for key {} on {}", indexShardKey, thisNodeName);
319+
return indexShardCreator.createIndexShard(indexShardKey, thisNodeName);
281320
}
282321
}
283322
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
* Issue **#4240** : Fix creation of new shards when non-full shards already exist for a partition.
2+
3+
4+
```sh
5+
# ********************************************************************************
6+
# Issue title: Stroom creating too many shards per partition
7+
# Issue link: https://github.com/gchq/stroom/issues/4240
8+
# ********************************************************************************
9+
10+
# ONLY the top line will be included as a change entry in the CHANGELOG.
11+
# The entry should be in GitHub flavour markdown and should be written on a SINGLE
12+
# line with no hard breaks. You can have multiple change files for a single GitHub issue.
13+
# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than
14+
# 'Fixed nasty bug'.
15+
#
16+
# Examples of acceptable entries are:
17+
#
18+
#
19+
# * Issue **123** : Fix bug with an associated GitHub issue in this repository
20+
#
21+
# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository
22+
#
23+
# * Fix bug with no associated GitHub issue.
24+
```

0 commit comments

Comments
 (0)