Skip to content

Commit 2e478c0

Browse files
committed
Issue #152 : Changed the way indexing is performed so that a single indexer object is now responsible for indexing documents and adding them to the appropriate shard.
1 parent 0d790d2 commit 2e478c0

27 files changed

+480
-494
lines changed

CHANGELOG.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
99

1010
### Changed
1111

12+
## [v5.0-beta.30] - 2017-06-06
13+
14+
* Issue **#152** : Changed the way indexing is performed so that a single indexer object is now responsible for indexing documents and adding them to the appropriate shard.
15+
1216
## [v5.0-beta.29] - 2017-05-26
1317

1418
* Issue **#179** : Updated Saxon-HE to version 9.7.0-18 and added XSLTFilter option to `usePool` to see if caching might be responsible for issue.
@@ -429,7 +433,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
429433
## [v5.0-beta.4] - 2016-10-03
430434
* Initial open source release
431435

432-
[Unreleased]: https://github.com/gchq/stroom/compare/v5.0-beta.29...HEAD
436+
[Unreleased]: https://github.com/gchq/stroom/compare/v5.0-beta.30...HEAD
437+
[v5.0-beta.30]: https://github.com/gchq/stroom/compare/v5.0-beta.29...v5.0-beta.30
433438
[v5.0-beta.29]: https://github.com/gchq/stroom/compare/v5.0-beta.28...v5.0-beta.29
434439
[v5.0-beta.28]: https://github.com/gchq/stroom/compare/v5.0-beta.27...v5.0-beta.28
435440
[v5.0-beta.27]: https://github.com/gchq/stroom/compare/v5.0-beta.26...v5.0-beta.27

stroom-index/src/main/java/stroom/index/server/CloseIndexShardActionHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class CloseIndexShardActionHandler extends AbstractTaskHandler<CloseIndexShardAc
4242
@Override
4343
public VoidResult exec(final CloseIndexShardAction action) {
4444
final FindCloseServiceClusterTask<FindIndexShardCriteria> clusterTask = new FindCloseServiceClusterTask<>(
45-
action.getUserToken(), action.getTaskName(), IndexShardWriterCache.class,
45+
action.getUserToken(), action.getTaskName(), IndexShardManager.class,
4646
action.getCriteria());
4747

4848
dispatchHelper.execAsync(clusterTask, TargetType.ACTIVE);

stroom-index/src/main/java/stroom/index/server/DeleteIndexShardActionHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class DeleteIndexShardActionHandler extends AbstractTaskHandler<DeleteIndexShard
4242
@Override
4343
public VoidResult exec(final DeleteIndexShardAction action) {
4444
final FindDeleteServiceClusterTask<FindIndexShardCriteria> clusterTask = new FindDeleteServiceClusterTask<>(
45-
action.getUserToken(), action.getTaskName(), IndexShardWriterCache.class,
45+
action.getUserToken(), action.getTaskName(), IndexShardManager.class,
4646
action.getCriteria());
4747
dispatchHelper.execAsync(clusterTask, TargetType.ACTIVE);
4848
return new VoidResult();

stroom-index/src/main/java/stroom/index/server/FlushIndexShardActionHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class FlushIndexShardActionHandler extends AbstractTaskHandler<FlushIndexShardAc
4242
@Override
4343
public VoidResult exec(final FlushIndexShardAction action) {
4444
final FindFlushServiceClusterTask<FindIndexShardCriteria> clusterTask = new FindFlushServiceClusterTask<>(
45-
action.getUserToken(), action.getTaskName(), IndexShardWriterCache.class,
45+
action.getUserToken(), action.getTaskName(), IndexShardManager.class,
4646
action.getCriteria());
4747

4848
dispatchHelper.execAsync(clusterTask, TargetType.ACTIVE);

stroom-index/src/main/java/stroom/index/server/IndexShardWriterCache.java renamed to stroom-index/src/main/java/stroom/index/server/IndexShardManager.java

+3-10
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,15 @@
1616

1717
package stroom.index.server;
1818

19-
import stroom.index.shared.FindIndexShardCriteria;
20-
import stroom.index.shared.IndexShard;
21-
import stroom.index.shared.IndexShardKey;
22-
import stroom.cache.CacheBean;
2319
import stroom.entity.shared.FindCloseService;
2420
import stroom.entity.shared.FindDeleteService;
2521
import stroom.entity.shared.FindFlushService;
22+
import stroom.index.shared.FindIndexShardCriteria;
2623

2724
/**
28-
* API into our index shard cache.
25+
* API into our index shard manager.
2926
*/
30-
public interface IndexShardWriterCache
31-
extends CacheBean<IndexShardKey, IndexShardWriter>, FindDeleteService<FindIndexShardCriteria>,
32-
FindCloseService<FindIndexShardCriteria>, FindFlushService<FindIndexShardCriteria> {
33-
IndexShardWriter getWriter(final IndexShard indexShard);
34-
27+
public interface IndexShardManager extends FindDeleteService<FindIndexShardCriteria>, FindCloseService<FindIndexShardCriteria>, FindFlushService<FindIndexShardCriteria> {
3528
void shutdown();
3629

3730
void flushAll();

stroom-index/src/main/java/stroom/index/server/IndexShardWriterCacheImpl.java renamed to stroom-index/src/main/java/stroom/index/server/IndexShardManagerImpl.java

+98-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
import net.sf.ehcache.CacheManager;
2020
import net.sf.ehcache.Element;
21+
import org.apache.lucene.document.Document;
22+
import org.apache.lucene.index.IndexWriter;
23+
import org.apache.lucene.store.AlreadyClosedException;
2124
import org.springframework.context.annotation.Profile;
2225
import org.springframework.stereotype.Component;
2326
import stroom.cache.AbstractCacheBean;
@@ -28,6 +31,7 @@
2831
import stroom.index.shared.Index;
2932
import stroom.index.shared.IndexService;
3033
import stroom.index.shared.IndexShard;
34+
import stroom.index.shared.IndexShard.IndexShardStatus;
3135
import stroom.index.shared.IndexShardKey;
3236
import stroom.index.shared.IndexShardService;
3337
import stroom.jobsystem.server.JobTrackedSchedule;
@@ -59,16 +63,17 @@
5963
import java.util.concurrent.TimeUnit;
6064
import java.util.concurrent.atomic.AtomicBoolean;
6165
import java.util.concurrent.atomic.AtomicInteger;
66+
import java.util.concurrent.locks.Lock;
6267

6368
/**
6469
* Pool API into open index shards.
6570
*/
66-
@Component("indexShardWriterCache")
71+
@Component("indexShardManager")
6772
@Profile(StroomSpringProfiles.PROD)
6873
@EntityEventHandler(type = Index.ENTITY_TYPE)
69-
public class IndexShardWriterCacheImpl extends AbstractCacheBean<IndexShardKey, IndexShardWriter>
70-
implements IndexShardWriterCache, EntityEvent.Handler {
71-
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(IndexShardWriterCacheImpl.class);
74+
public class IndexShardManagerImpl extends AbstractCacheBean<IndexShardKey, IndexShardWriter>
75+
implements IndexShardManager, Indexer, EntityEvent.Handler {
76+
private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(IndexShardManagerImpl.class);
7277
private static final int MAX_CACHE_ENTRIES = 1000000;
7378

7479
private final IndexService indexService;
@@ -81,9 +86,11 @@ public class IndexShardWriterCacheImpl extends AbstractCacheBean<IndexShardKey,
8186
private final ConcurrentHashMap<IndexShard, IndexShardWriter> ownedWriters = new ConcurrentHashMap<>();
8287
private final AtomicBoolean deletingShards = new AtomicBoolean();
8388

89+
private final StripedLock keyLocks = new StripedLock();
90+
8491
@Inject
85-
IndexShardWriterCacheImpl(final CacheManager cacheManager, final StroomPropertyService stroomPropertyService,
86-
final IndexService indexService, final IndexShardService indexShardService, final NodeCache nodeCache, final TaskManager taskManager) {
92+
IndexShardManagerImpl(final CacheManager cacheManager, final StroomPropertyService stroomPropertyService,
93+
final IndexService indexService, final IndexShardService indexShardService, final NodeCache nodeCache, final TaskManager taskManager) {
8794
super(cacheManager, "Index Shard Writer Cache", MAX_CACHE_ENTRIES);
8895
this.stroomPropertyService = stroomPropertyService;
8996
this.indexService = indexService;
@@ -95,6 +102,83 @@ public class IndexShardWriterCacheImpl extends AbstractCacheBean<IndexShardKey,
95102
setMaxLiveTime(1, TimeUnit.DAYS);
96103
}
97104

105+
@Override
106+
public void addDocument(final IndexShardKey indexShardKey, final Document document) {
107+
if (document != null) {
108+
109+
IndexShardWriter indexShardWriter = getIndexShardWriter(indexShardKey);
110+
111+
// Try and add the document silently without locking.
112+
boolean success = false;
113+
try {
114+
indexShardWriter.addDocument(document);
115+
success = true;
116+
} catch (final Throwable t) {
117+
LOGGER.trace(t::getMessage, t);
118+
}
119+
120+
// Attempt a few more times under lock.
121+
for (int i = 0; !success && i < 100; i++) {
122+
// If we failed then try under lock to make sure we get a new writer.
123+
final Lock lock = keyLocks.getLockForKey(indexShardKey);
124+
lock.lock();
125+
try {
126+
// Ask the cache for the current one (it might have been changed by another thread) and try again.
127+
indexShardWriter = getIndexShardWriter(indexShardKey);
128+
success = addDocument(indexShardWriter, document);
129+
130+
if (!success) {
131+
// Failed to add it so remove this object from the cache and try to get another one.
132+
remove(indexShardKey);
133+
}
134+
135+
} finally {
136+
lock.unlock();
137+
}
138+
}
139+
140+
// One final try that will throw an index exception.
141+
if (!success) {
142+
try {
143+
indexShardWriter = getIndexShardWriter(indexShardKey);
144+
indexShardWriter.addDocument(document);
145+
} catch (final IndexException e) {
146+
throw e;
147+
} catch (final Throwable e) {
148+
throw new IndexException(e.getMessage(), e);
149+
}
150+
}
151+
}
152+
}
153+
154+
private boolean addDocument(final IndexShardWriter indexShardWriter, final Document document) {
155+
boolean success = false;
156+
try {
157+
indexShardWriter.addDocument(document);
158+
success = true;
159+
} catch (final AlreadyClosedException | IndexException e) {
160+
LOGGER.debug(e::getMessage, e);
161+
162+
} catch (final Throwable t) {
163+
LOGGER.error(t::getMessage, t);
164+
165+
// Mark the shard as corrupt as this should be the
166+
// only reason we can't add a document.
167+
indexShardWriter.setStatus(IndexShardStatus.CORRUPT);
168+
}
169+
170+
return success;
171+
}
172+
173+
private IndexShardWriter getIndexShardWriter(final IndexShardKey indexShardKey) {
174+
final IndexShardWriter indexShardWriter = get(indexShardKey);
175+
if (indexShardWriter == null) {
176+
throw new IndexException("Unable to get writer for index '" + indexShardKey.getIndex().getName()
177+
+ "'. Please check the index has active volumes.");
178+
}
179+
return indexShardWriter;
180+
}
181+
98182
/**
99183
* Overrides method in simple pool. Will be called when an item is created
100184
* by the pool.
@@ -124,7 +208,7 @@ private IndexShardWriter getExistingWriter(final IndexShardKey key) {
124208
// Look for closed, non deleted, non full, non corrupt index shard
125209
// to add to
126210
if (indexShard.getIndex().getId() == key.getIndex().getId()
127-
&& writer.getPartition().equals(key.getPartition()) && writer.isOkToReuse()) {
211+
&& writer.getPartition().equals(key.getPartition()) && IndexShardStatus.CLOSED.equals(writer.getStatus()) && !writer.isFull()) {
128212
try {
129213
// Open the writer.
130214
final boolean success = writer.open(false);
@@ -310,7 +394,7 @@ private void deleteLogicallyDeleted() {
310394
final Entry<IndexShard, IndexShardWriter> entry = iter.next();
311395
final IndexShardWriter writer = entry.getValue();
312396
try {
313-
if (writer != null && writer.isDeleted()) {
397+
if (writer != null && IndexShardStatus.DELETED.equals(writer.getStatus())) {
314398
LOGGER.debug(() -> "deleteLogicallyDeleted() - Deleting index shard " + writer.getIndexShard().getId());
315399

316400
if (writer.deleteFromDisk()) {
@@ -488,8 +572,12 @@ public void execute() {
488572
}
489573

490574
@Override
491-
public IndexShardWriter getWriter(final IndexShard indexShard) {
492-
return ownedWriters.get(indexShard);
575+
public IndexWriter getWriter(final IndexShard indexShard) {
576+
final IndexShardWriter indexShardWriter = ownedWriters.get(indexShard);
577+
if (indexShardWriter != null) {
578+
return indexShardWriter.getWriter();
579+
}
580+
return null;
493581
}
494582

495583
@Override

stroom-index/src/main/java/stroom/index/server/IndexShardWriter.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,25 @@
1717
package stroom.index.server;
1818

1919
import org.apache.lucene.document.Document;
20+
import org.apache.lucene.index.CorruptIndexException;
2021
import org.apache.lucene.index.IndexWriter;
2122

23+
import org.apache.lucene.store.AlreadyClosedException;
2224
import stroom.index.shared.Index;
2325
import stroom.index.shared.IndexShard;
2426
import stroom.cache.AbstractCacheBean.Destroyable;
27+
import stroom.index.shared.IndexShard.IndexShardStatus;
2528

26-
public interface IndexShardWriter extends Destroyable {
27-
void check();
28-
29-
boolean open(boolean create);
29+
import java.io.IOException;
3030

31-
boolean isOpen();
31+
interface IndexShardWriter extends Destroyable {
32+
void check();
3233

33-
boolean isClosed();
34+
IndexShardStatus getStatus();
3435

35-
boolean isDeleted();
36+
void setStatus(IndexShardStatus status);
3637

37-
boolean isOkToReuse();
38+
boolean open(boolean create);
3839

3940
boolean isFull();
4041

@@ -48,7 +49,7 @@ public interface IndexShardWriter extends Destroyable {
4849

4950
void updateIndex(Index index);
5051

51-
boolean addDocument(Document document);
52+
void addDocument(Document document) throws IOException, IndexException, AlreadyClosedException;
5253

5354
int getDocumentCount();
5455

0 commit comments

Comments
 (0)