Skip to content

Commit 3250008

Browse files
authored
Merge branch 'main' into tvclean0
2 parents 4d2982f + 268923d commit 3250008

File tree

31 files changed

+2676
-1282
lines changed

31 files changed

+2676
-1282
lines changed

docs/changelog/138982.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138982
2+
summary: Semantic search CCS support when ccs_minimize_roundtrips=false
3+
area: Vector Search
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/rest/action/RestActions.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.ExceptionsHelper;
1313
import org.elasticsearch.action.FailedNodeException;
1414
import org.elasticsearch.action.ShardOperationFailedException;
15+
import org.elasticsearch.action.search.SearchRequest;
1516
import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
1617
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
1718
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
@@ -47,6 +48,7 @@ public class RestActions {
4748
public static final ParseField SKIPPED_FIELD = new ParseField("skipped");
4849
public static final ParseField FAILED_FIELD = new ParseField("failed");
4950
public static final ParseField FAILURES_FIELD = new ParseField("failures");
51+
public static final ParseField PROJECT_ROUTING = new ParseField("project_routing");
5052

5153
public static long parseVersion(RestRequest request) {
5254
if (request.hasParam("version")) {
@@ -260,10 +262,14 @@ public RestResponse buildResponse(NodesResponse response, XContentBuilder builde
260262

261263
}
262264

265+
public static QueryBuilder getQueryContent(XContentParser parser) {
266+
return getQueryContent(parser, null);
267+
}
268+
263269
/**
264270
* Parses a top level query including the query element that wraps it
265271
*/
266-
public static QueryBuilder getQueryContent(XContentParser parser) {
272+
public static QueryBuilder getQueryContent(XContentParser parser, SearchRequest searchRequest) {
267273
try {
268274
QueryBuilder queryBuilder = null;
269275
XContentParser.Token first = parser.nextToken();
@@ -281,6 +287,9 @@ public static QueryBuilder getQueryContent(XContentParser parser) {
281287
String currentName = parser.currentName();
282288
if ("query".equals(currentName)) {
283289
queryBuilder = parseTopLevelQuery(parser);
290+
} else if (PROJECT_ROUTING.match(currentName, parser.getDeprecationHandler()) && searchRequest != null) {
291+
parser.nextToken();
292+
searchRequest.setProjectRouting(parser.text());
284293
} else {
285294
throw new ParsingException(parser.getTokenLocation(), "request does not support [" + parser.currentName() + "]");
286295
}

server/src/main/java/org/elasticsearch/rest/action/cat/RestCountAction.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.ElasticsearchException;
1414
import org.elasticsearch.action.search.SearchRequest;
1515
import org.elasticsearch.action.search.SearchResponse;
16+
import org.elasticsearch.action.support.IndicesOptions;
1617
import org.elasticsearch.client.internal.node.NodeClient;
1718
import org.elasticsearch.common.Strings;
1819
import org.elasticsearch.common.Table;
@@ -25,24 +26,31 @@
2526
import org.elasticsearch.rest.action.RestActions;
2627
import org.elasticsearch.rest.action.RestResponseListener;
2728
import org.elasticsearch.search.builder.SearchSourceBuilder;
29+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
2830

2931
import java.io.IOException;
3032
import java.util.List;
3133

3234
import static org.elasticsearch.rest.RestRequest.Method.GET;
35+
import static org.elasticsearch.rest.RestRequest.Method.POST;
3336

3437
@ServerlessScope(Scope.PUBLIC)
3538
public class RestCountAction extends AbstractCatAction {
3639

37-
private final Settings settings;
40+
private final CrossProjectModeDecider crossProjectModeDecider;
3841

3942
public RestCountAction(Settings settings) {
40-
this.settings = settings;
43+
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
4144
}
4245

4346
@Override
4447
public List<Route> routes() {
45-
return List.of(new Route(GET, "/_cat/count"), new Route(GET, "/_cat/count/{index}"));
48+
return List.of(
49+
new Route(GET, "/_cat/count"),
50+
new Route(POST, "/_cat/count"),
51+
new Route(GET, "/_cat/count/{index}"),
52+
new Route(POST, "/_cat/count/{index}")
53+
);
4654
}
4755

4856
@Override
@@ -58,24 +66,25 @@ protected void documentation(StringBuilder sb) {
5866

5967
@Override
6068
public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
61-
if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
62-
// accept but drop project_routing param until fully supported
63-
request.param("project_routing");
64-
}
65-
6669
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
6770
SearchRequest countRequest = new SearchRequest(indices);
6871
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
6972
countRequest.source(searchSourceBuilder);
73+
if (crossProjectModeDecider.crossProjectEnabled() && countRequest.allowsCrossProject()) {
74+
countRequest.indicesOptions(
75+
IndicesOptions.builder().crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true)).build()
76+
);
77+
}
7078
try {
7179
request.withContentOrSourceParamParserOrNull(parser -> {
7280
if (parser == null) {
7381
QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);
7482
if (queryBuilder != null) {
83+
// since there is no request body, no need to pass in countRequest to handle project_routing param
7584
searchSourceBuilder.query(queryBuilder);
7685
}
7786
} else {
78-
searchSourceBuilder.query(RestActions.getQueryContent(parser));
87+
searchSourceBuilder.query(RestActions.getQueryContent(parser, countRequest));
7988
}
8089
});
8190
} catch (IOException e) {

server/src/main/java/org/elasticsearch/rest/action/search/RestCountAction.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.rest.action.RestActions;
2525
import org.elasticsearch.rest.action.RestBuilderListener;
2626
import org.elasticsearch.search.builder.SearchSourceBuilder;
27+
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
2728
import org.elasticsearch.xcontent.XContentBuilder;
2829

2930
import java.io.IOException;
@@ -37,10 +38,10 @@
3738
@ServerlessScope(Scope.PUBLIC)
3839
public class RestCountAction extends BaseRestHandler {
3940

40-
private Settings settings;
41+
private final CrossProjectModeDecider crossProjectModeDecider;
4142

4243
public RestCountAction(Settings settings) {
43-
this.settings = settings;
44+
this.crossProjectModeDecider = new CrossProjectModeDecider(settings);
4445
}
4546

4647
@Override
@@ -60,23 +61,26 @@ public String getName() {
6061

6162
@Override
6263
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
63-
if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
64-
// accept but drop project_routing param until fully supported
65-
request.param("project_routing");
64+
SearchRequest countRequest = new SearchRequest(Strings.splitStringByCommaToArray(request.param("index")));
65+
IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, countRequest.indicesOptions());
66+
if (crossProjectModeDecider.crossProjectEnabled() && countRequest.allowsCrossProject()) {
67+
indicesOptions = IndicesOptions.builder(indicesOptions)
68+
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
69+
.build();
6670
}
71+
countRequest.indicesOptions(indicesOptions);
6772

68-
SearchRequest countRequest = new SearchRequest(Strings.splitStringByCommaToArray(request.param("index")));
69-
countRequest.indicesOptions(IndicesOptions.fromRequest(request, countRequest.indicesOptions()));
7073
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
7174
countRequest.source(searchSourceBuilder);
7275
request.withContentOrSourceParamParserOrNull(parser -> {
7376
if (parser == null) {
7477
QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);
7578
if (queryBuilder != null) {
79+
// since there is no request body, no need to pass in countRequest to handle project_routing param
7680
searchSourceBuilder.query(queryBuilder);
7781
}
7882
} else {
79-
searchSourceBuilder.query(RestActions.getQueryContent(parser));
83+
searchSourceBuilder.query(RestActions.getQueryContent(parser, countRequest));
8084
}
8185
});
8286
countRequest.routing(request.param("routing"));

server/src/test/java/org/elasticsearch/rest/action/RestActionsTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.rest.action;
1111

1212
import org.elasticsearch.action.ShardOperationFailedException;
13+
import org.elasticsearch.action.search.SearchRequest;
1314
import org.elasticsearch.action.search.ShardSearchFailure;
1415
import org.elasticsearch.cluster.metadata.IndexMetadata;
1516
import org.elasticsearch.common.ParsingException;
@@ -42,6 +43,8 @@
4243
import static java.util.Collections.emptyList;
4344
import static org.elasticsearch.index.query.QueryStringQueryBuilder.DEFAULT_OPERATOR;
4445
import static org.hamcrest.CoreMatchers.equalTo;
46+
import static org.junit.Assert.assertNotNull;
47+
import static org.junit.Assert.assertNull;
4548

4649
public class RestActionsTests extends ESTestCase {
4750

@@ -233,6 +236,51 @@ public void testUrlParamsToQueryBuilderError() {
233236
);
234237
}
235238

239+
public void testParseWithProjectRouting() throws IOException {
240+
QueryBuilder query = new MatchQueryBuilder("foo", "bar");
241+
String requestBody1 = """
242+
{
243+
"query": _QUERY_,
244+
"project_routing": "_alias:_origin"
245+
}
246+
""";
247+
String requestBody2 = """
248+
{
249+
"project_routing": "_csp:aws AND (_region:us* OR _region:eu-west-1)",
250+
"query": _QUERY_
251+
}
252+
""";
253+
254+
{
255+
String requestBody = randomFrom(requestBody1, requestBody2).replaceFirst("_QUERY_", query.toString());
256+
try (XContentParser parser = createParser(JsonXContent.jsonXContent, requestBody)) {
257+
// if no SearchRequest passed in, an error should be thrown that project_routing is not supported for that endpoint
258+
ParsingException e = expectThrows(ParsingException.class, () -> RestActions.getQueryContent(parser));
259+
assertEquals(e.getMessage(), "request does not support [project_routing]");
260+
}
261+
}
262+
{
263+
SearchRequest searchRequest = new SearchRequest("index");
264+
String requestBody = requestBody1.replaceFirst("_QUERY_", query.toString());
265+
try (XContentParser parser = createParser(JsonXContent.jsonXContent, requestBody)) {
266+
assertNull(searchRequest.getProjectRouting());
267+
QueryBuilder actual = RestActions.getQueryContent(parser, searchRequest);
268+
assertEquals(query, actual);
269+
assertEquals(searchRequest.getProjectRouting(), "_alias:_origin");
270+
}
271+
}
272+
{
273+
String requestBody = requestBody2.replaceFirst("_QUERY_", query.toString());
274+
try (XContentParser parser = createParser(JsonXContent.jsonXContent, requestBody)) {
275+
SearchRequest searchRequest = new SearchRequest("index");
276+
assertNull(searchRequest.getProjectRouting());
277+
QueryBuilder actual = RestActions.getQueryContent(parser, searchRequest);
278+
assertEquals(query, actual);
279+
assertEquals(searchRequest.getProjectRouting(), "_csp:aws AND (_region:us* OR _region:eu-west-1)");
280+
}
281+
}
282+
}
283+
236284
private static ShardSearchFailure createShardFailureParsingException(String nodeId, int shardId, String clusterAlias) {
237285
String index = "index";
238286
ParsingException ex = new ParsingException(0, 0, "error", new IllegalArgumentException("some bad argument"));

test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import org.elasticsearch.xcontent.XContentFactory;
2525
import org.elasticsearch.xcontent.XContentType;
2626

27+
import java.io.ByteArrayOutputStream;
2728
import java.io.IOException;
2829
import java.net.URLDecoder;
2930
import java.util.HashMap;
30-
import java.util.Locale;
31+
import java.util.LinkedHashMap;
3132
import java.util.Map;
3233
import java.util.Objects;
3334
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.regex.Pattern;
3436
import java.util.stream.Collectors;
3537

3638
import static fixture.gcs.MockGcsBlobStore.failAndThrow;
@@ -153,22 +155,29 @@ public void handle(final HttpExchange exchange) throws IOException {
153155
}
154156

155157
} else if (Regex.simpleMatch("POST /batch/storage/v1", request)) {
156-
// Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
157-
final String uri = "/storage/v1/b/" + bucket + "/o/";
158-
final StringBuilder batch = new StringBuilder();
159-
for (String line : Streams.readAllLines(requestBody.streamInput())) {
160-
if (line.isEmpty() || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
161-
batch.append(line).append("\r\n");
162-
} else if (line.startsWith("DELETE")) {
163-
final String name = line.substring(line.indexOf(uri) + uri.length(), line.lastIndexOf(" HTTP"));
164-
if (Strings.hasText(name)) {
165-
mockGcsBlobStore.deleteBlob(URLDecoder.decode(name, UTF_8));
166-
batch.append("HTTP/1.1 204 NO_CONTENT").append("\r\n");
167-
batch.append("\r\n");
158+
// https://docs.cloud.google.com/storage/docs/batch#http
159+
final var boundary = MultipartContent.Reader.getBoundary(exchange);
160+
final var batchReader = MultipartContent.Reader.readStream(boundary, requestBody.streamInput());
161+
final var responseStream = new ByteArrayOutputStream();
162+
final var batchWriter = new MultipartContent.Writer(boundary, responseStream);
163+
while (batchReader.hasNext()) {
164+
final var batchItem = batchReader.next();
165+
final var contentId = batchItem.headers().get("content-id");
166+
// only deletes are supported in batch
167+
final var objectName = parseBatchItemDeleteObject(bucket, batchItem.content());
168+
mockGcsBlobStore.deleteBlob(objectName);
169+
final var responsePartHeaders = new LinkedHashMap<String, String>() {
170+
{
171+
put("content-type", "application/http");
172+
put("content-id", "response-" + contentId);
168173
}
169-
}
174+
};
175+
final var responsePartContent = "HTTP/1.1 204 No Content\r\n\r\n";
176+
batchWriter.write(MultipartContent.Part.of(responsePartHeaders, responsePartContent));
170177
}
171-
byte[] response = batch.toString().getBytes(UTF_8);
178+
batchWriter.end();
179+
180+
byte[] response = responseStream.toByteArray();
172181
exchange.getResponseHeaders().add("Content-Type", exchange.getRequestHeaders().getFirst("Content-Type"));
173182
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
174183
exchange.getResponseBody().write(response);
@@ -255,6 +264,32 @@ private void writeBlobVersionAsJson(HttpExchange exchange, MockGcsBlobStore.Blob
255264
}
256265
}
257266

267+
// Example of DELETE batch item status line
268+
// DELETE http://127.0.0.1:49177/storage/v1/b/bucket/o/test/tests-vQzflxz2Swa_bhmlM6gtyA/data-5odMgVMYTbKAI6DxS0qi-A.dat HTTP/1.1";
269+
static final Pattern BATCH_ITEM_HTTP_LINE = Pattern.compile(
270+
"(?<method>\\w+) (.+)/storage/v1/b/(?<bucket>.+)/o/(?<object>.+) HTTP/1\\.1"
271+
);
272+
273+
static String parseBatchItemDeleteObject(String bucket, BytesReference bytes) {
274+
final var s = bytes.utf8ToString();
275+
return s.lines().findFirst().map(line -> {
276+
var matcher = BATCH_ITEM_HTTP_LINE.matcher(line);
277+
if (matcher.find() == false) {
278+
throw new IllegalStateException("Cannot parse batch item HTTP line: " + line);
279+
}
280+
var method = matcher.group("method");
281+
if (method.equals("DELETE") == false) {
282+
throw new IllegalStateException("Expected DELETE item, found " + line);
283+
}
284+
var _bucket = matcher.group("bucket");
285+
if (bucket.equals(_bucket) == false) {
286+
throw new IllegalStateException("Bucket does not match expected: " + bucket + ", got: " + _bucket);
287+
}
288+
return URLDecoder.decode(matcher.group("object"), UTF_8);
289+
290+
}).orElseThrow();
291+
}
292+
258293
record ListBlobsResponse(String bucket, MockGcsBlobStore.PageOfBlobs pageOfBlobs) implements ToXContent {
259294

260295
@Override

0 commit comments

Comments
 (0)