Skip to content

Commit 2b6a7fe

Browse files
authored
Fix issues with ReinitializingSourceProvider (elastic#118370) (elastic#118430)
The previous fix to ensure that each thread uses its own SourceProvider wasn't good enough. The read from the `perThreadProvider` field could be stale and could therefore return a previous source provider. Instead, the source provider should be returned from the `provider` local variable. This change also addresses another issue: sometimes the current docid goes backwards compared to the last seen docid, which causes problems when the synthetic source provider is used, as doc values can't advance backwards. This change addresses that by returning a new source provider if a backwards docid is detected. Closes elastic#118238
1 parent 3bb04d0 commit 2b6a7fe

File tree

3 files changed

+41
-8
lines changed

3 files changed

+41
-8
lines changed

docs/changelog/118370.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 118370
2+
summary: Fix concurrency issue with `ReinitializingSourceProvider`
3+
area: Mapping
4+
type: bug
5+
issues:
6+
- 118238

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.xpack.esql.core.type.DataType;
3838
import org.elasticsearch.xpack.esql.parser.ParsingException;
3939
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
40+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
4041
import org.junit.Before;
4142

4243
import java.io.IOException;
@@ -1673,17 +1674,32 @@ public void testScriptField() throws Exception {
16731674
String sourceMode = randomBoolean() ? "stored" : "synthetic";
16741675
Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode);
16751676
client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get();
1676-
for (int i = 0; i < 10; i++) {
1677+
int numDocs = 256;
1678+
for (int i = 0; i < numDocs; i++) {
16771679
index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i));
16781680
}
16791681
refresh("test-script");
1680-
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) {
1682+
1683+
var pragmas = randomPragmas();
1684+
if (canUseQueryPragmas()) {
1685+
Settings.Builder pragmaSettings = Settings.builder().put(pragmas.getSettings());
1686+
pragmaSettings.put("task_concurrency", 10);
1687+
pragmaSettings.put("data_partitioning", "doc");
1688+
pragmas = new QueryPragmas(pragmaSettings.build());
1689+
}
1690+
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT " + numDocs, pragmas)) {
16811691
List<Object> k1Column = Iterators.toList(resp.column(0));
1682-
assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
1692+
assertThat(k1Column, equalTo(LongStream.range(0L, numDocs).boxed().toList()));
16831693
List<Object> k2Column = Iterators.toList(resp.column(1));
1684-
assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null));
1694+
assertThat(k2Column, equalTo(Collections.nCopies(numDocs, null)));
16851695
List<Object> meterColumn = Iterators.toList(resp.column(2));
1686-
assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0));
1696+
var expectedMeterColumn = new ArrayList<>(numDocs);
1697+
double val = 0.0;
1698+
for (int i = 0; i < numDocs; i++) {
1699+
expectedMeterColumn.add(val);
1700+
val += 10000.0;
1701+
}
1702+
assertThat(meterColumn, equalTo(expectedMeterColumn));
16871703
}
16881704
}
16891705

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,23 @@
1515
import java.util.function.Supplier;
1616

1717
/**
18-
* This is a workaround for when compute engine executes concurrently with data partitioning by docid.
18+
* This class exists as a workaround for using SourceProvider in the compute engine.
19+
* <p>
20+
* The main issue is when compute engine executes concurrently with data partitioning by docid (inter segment parallelization).
21+
* A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses
22+
* its own {@link SourceProvider}.
23+
* <p>
24+
* Additionally, this source provider protects against going backwards, which the synthetic source provider can't handle.
1925
*/
2026
final class ReinitializingSourceProvider implements SourceProvider {
2127

2228
private PerThreadSourceProvider perThreadProvider;
2329
private final Supplier<SourceProvider> sourceProviderFactory;
2430

31+
// Keep track of the last seen doc; if the current doc is before the last seen doc, a new source provider is initialized:
32+
// (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards)
33+
private int lastSeenDocId;
34+
2535
ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
2636
this.sourceProviderFactory = sourceProviderFactory;
2737
}
@@ -30,11 +40,12 @@ final class ReinitializingSourceProvider implements SourceProvider {
3040
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
3141
var currentThread = Thread.currentThread();
3242
PerThreadSourceProvider provider = perThreadProvider;
33-
if (provider == null || provider.creatingThread != currentThread) {
43+
if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) {
3444
provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
3545
this.perThreadProvider = provider;
3646
}
37-
return perThreadProvider.source.getSource(ctx, doc);
47+
lastSeenDocId = doc;
48+
return provider.source.getSource(ctx, doc);
3849
}
3950

4051
private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {

0 commit comments

Comments
 (0)