Skip to content

Commit 10ac71b

Browse files
thomasmuellerandreeastroe96
authored andcommitted
OAK-11365 Incremental index store: ability to set a timeout (apache#1962)
1 parent 01f3833 commit 10ac71b

File tree

5 files changed

+40
-5
lines changed

5 files changed

+40
-5
lines changed

oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,10 @@ public IndexStore buildStore() throws IOException, CommitFailedException {
266266
}
267267

268268
public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint) throws IOException, CommitFailedException {
269+
return buildStore(initialCheckpoint, finalCheckpoint, Long.MAX_VALUE);
270+
}
271+
272+
public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint, long maxDurationSeconds) throws IOException, CommitFailedException {
269273
IncrementalStoreBuilder builder;
270274
IndexStore incrementalStore;
271275
Set<IndexDefinition> indexDefinitions = indexerSupport.getIndexDefinitions();
@@ -308,6 +312,7 @@ public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint) t
308312
try {
309313
builder = new IncrementalStoreBuilder(indexHelper.getWorkDir(), indexHelper, initialCheckpoint, finalCheckpoint)
310314
.withPreferredPathElements(preferredPathElements)
315+
.withMaxDurationSeconds(maxDurationSeconds)
311316
.withPathPredicate(predicate)
312317
.withBlobStore(indexHelper.getGCBlobStore());
313318
incrementalStore = builder.build();

oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.io.BufferedWriter;
3131
import java.io.IOException;
32+
import java.util.concurrent.TimeUnit;
3233
import java.util.function.Predicate;
3334

3435
public class IncrementalFlatFileStoreEditor implements Editor {
@@ -38,18 +39,29 @@ public class IncrementalFlatFileStoreEditor implements Editor {
3839
private final IncrementalFlatFileStoreNodeStateEntryWriter entryWriter;
3940
private final Predicate<String> predicate;
4041
private final IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy;
42+
// if not 0, timeout if System.nanoTime() exceeds this value
43+
private final long timeoutAtNanos;
4144
private static final int LINE_SEP_LENGTH = System.getProperty("line.separator").length();
4245

4346
public IncrementalFlatFileStoreEditor(BufferedWriter bufferedWriter, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, Predicate<String> predicate,
44-
IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy) {
47+
IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy, long maxDurationSeconds) {
4548
this.bufferedWriter = bufferedWriter;
4649
this.entryWriter = entryWriter;
4750
this.predicate = predicate;
4851
this.incrementalFlatFileStoreStrategy = incrementalFlatFileStoreStrategy;
52+
long timeout;
53+
if (maxDurationSeconds == Long.MAX_VALUE) {
54+
timeout = 0;
55+
} else {
56+
timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(maxDurationSeconds, TimeUnit.SECONDS);
57+
log.info("Max duration: " + maxDurationSeconds + " timeout: " + timeout + " now: " + System.nanoTime());
58+
}
59+
this.timeoutAtNanos = timeout;
4960
}
5061

5162
@Override
5263
public void enter(NodeState before, NodeState after) {
64+
checkTimeout();
5365
}
5466

5567
@Override
@@ -112,4 +124,13 @@ private void writeToFile(NodeState e, IncrementalStoreOperand action) {
112124
throw new RuntimeException("Error while creating incremental store", ex);
113125
}
114126
}
127+
128+
private void checkTimeout() {
129+
if (timeoutAtNanos != 0) {
130+
long now = System.nanoTime();
131+
if (now > timeoutAtNanos) {
132+
throw new RuntimeException("Timeout");
133+
}
134+
}
135+
}
115136
}

oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,11 @@ public class IncrementalFlatFileStoreStrategy implements IncrementalIndexStoreSo
6363
private long textSize = 0;
6464
private long entryCount = 0;
6565
private final Set<String> preferredPathElements;
66+
private final long maxDurationSeconds;
6667

6768
public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull String beforeCheckpoint, @NotNull String afterCheckpoint, File storeDir,
6869
Set<String> preferredPathElements, @NotNull Compression algorithm,
69-
Predicate<String> pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter) {
70+
Predicate<String> pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, long maxDurationSeconds) {
7071
this.nodeStore = nodeStore;
7172
this.beforeCheckpoint = beforeCheckpoint;
7273
this.afterCheckpoint = afterCheckpoint;
@@ -76,6 +77,7 @@ public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull String bef
7677
this.entryWriter = entryWriter;
7778
this.preferredPathElements = preferredPathElements;
7879
this.comparator = new PathElementComparator(preferredPathElements);
80+
this.maxDurationSeconds = maxDurationSeconds;
7981
}
8082

8183
@Override
@@ -85,7 +87,7 @@ public File createSortedStoreFile() throws IOException {
8587
try (BufferedWriter w = FlatFileStoreUtils.createWriter(file, algorithm)) {
8688
NodeState before = Objects.requireNonNull(nodeStore.retrieve(beforeCheckpoint));
8789
NodeState after = Objects.requireNonNull(nodeStore.retrieve(afterCheckpoint));
88-
Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this)), before, after);
90+
Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this, maxDurationSeconds)), before, after);
8991
if (e != null) {
9092
log.error("Exception while building incremental store for checkpoint before {}, after {}", beforeCheckpoint, afterCheckpoint, e);
9193
throw new RuntimeException(e);

oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class IncrementalStoreBuilder {
5050
private final IndexHelper indexHelper;
5151
private final String initialCheckpoint;
5252
private final String finalCheckpoint;
53+
private long maxDurationSeconds = Long.MAX_VALUE;
5354
private Predicate<String> pathPredicate = path -> true;
5455
private Set<String> preferredPathElements = Collections.emptySet();
5556
private BlobStore blobStore;
@@ -107,6 +108,10 @@ public IncrementalStoreBuilder withBlobStore(BlobStore blobStore) {
107108
return this;
108109
}
109110

111+
public IncrementalStoreBuilder withMaxDurationSeconds(long maxDurationSeconds) {
112+
this.maxDurationSeconds = maxDurationSeconds;
113+
return this;
114+
}
110115

111116
public IndexStore build() throws IOException, CompositeException {
112117
logFlags();
@@ -115,11 +120,12 @@ public IndexStore build() throws IOException, CompositeException {
115120
if (sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_FFS_STORE ||
116121
sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_TREE_STORE) {
117122
IncrementalFlatFileStoreNodeStateEntryWriter entryWriter = new IncrementalFlatFileStoreNodeStateEntryWriter(blobStore);
123+
118124
IncrementalIndexStoreSortStrategy strategy = new IncrementalFlatFileStoreStrategy(
119125
indexHelper.getNodeStore(),
120126
initialCheckpoint,
121127
finalCheckpoint,
122-
dir, preferredPathElements, algorithm, pathPredicate, entryWriter);
128+
dir, preferredPathElements, algorithm, pathPredicate, entryWriter, maxDurationSeconds);
123129
File metadataFile = strategy.createMetadataFile();
124130
File incrementalStoreFile = strategy.createSortedStoreFile();
125131
long entryCount = strategy.getEntryCount();
@@ -147,4 +153,5 @@ private void logFlags() {
147153
log.info("Compression enabled while sorting : {} ({})", IndexStoreUtils.compressionEnabled(), OAK_INDEXER_USE_ZIP);
148154
log.info("LZ4 enabled for compression algorithm : {} ({})", IndexStoreUtils.useLZ4(), OAK_INDEXER_USE_LZ4);
149155
}
156+
150157
}

oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ private IncrementalFlatFileStoreStrategy createIncrementalStrategy(Backend backe
401401
readOnlyNodeStore.retrieve(finalCheckpoint);
402402
return new IncrementalFlatFileStoreStrategy(
403403
readOnlyNodeStore, initialCheckpoint, finalCheckpoint, sortFolder.getRoot(), preferredPathElements,
404-
algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore));
404+
algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore), Long.MAX_VALUE);
405405
}
406406

407407
private void createBaseContent(NodeStore rwNodeStore) throws CommitFailedException {

0 commit comments

Comments
 (0)