Skip to content

Commit 7b2fc88

Browse files
[8.19] Add LINQ to ESQL provider methods (#8858) (#8863)
Co-authored-by: Florian Bernd <git@flobernd.de>
1 parent 6d978d9 commit 7b2fc88

7 files changed

Lines changed: 538 additions & 4 deletions

File tree

src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<PackageId>Elastic.Clients.Elasticsearch</PackageId>
@@ -31,6 +31,7 @@
3131
</PropertyGroup>
3232

3333
<ItemGroup>
34+
<PackageReference Include="Elastic.Esql" Version="0.10.0" />
3435
<PackageReference Include="Elastic.Transport" Version="0.15.1" />
3536
<PackageReference Include="PolySharp" Version="1.15.0">
3637
<PrivateAssets>all</PrivateAssets>
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using System.IO;
8+
using System.Text.Json;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
using Elastic.Esql;
12+
using Elastic.Esql.Execution;
13+
using Elastic.Esql.QueryModel;
14+
using Elastic.Transport;
15+
16+
#if NET10_0_OR_GREATER
17+
using System.IO.Pipelines;
18+
#endif
19+
20+
namespace Elastic.Clients.Elasticsearch.Esql;
21+
22+
/// <summary>
23+
/// Implements <see cref="IEsqlQueryExecutor"/> by delegating to the native
24+
/// <see cref="EsqlNamespacedClient"/> typed request/response pipeline.
25+
/// </summary>
26+
internal sealed class EsqlQueryExecutor : IEsqlQueryExecutor
27+
{
28+
private readonly EsqlNamespacedClient _client;
29+
30+
public EsqlQueryExecutor(EsqlNamespacedClient client)
31+
{
32+
_client = client ?? throw new ArgumentNullException(nameof(client));
33+
}
34+
35+
public IEsqlResponse ExecuteQuery(string esql, EsqlParameters? parameters, object? options)
36+
{
37+
var queryOptions = ResolveOptions(options);
38+
var request = BuildQueryRequest(esql, parameters, queryOptions);
39+
request.BeforeRequest();
40+
var response = _client.DoRequest<EsqlQueryRequest, StreamResponse, EsqlQueryRequestParameters>(request);
41+
return new EsqlTransportResponse(response);
42+
}
43+
44+
public async Task<IEsqlAsyncResponse> ExecuteQueryAsync(string esql, EsqlParameters? parameters, object? options, CancellationToken cancellationToken)
45+
{
46+
var queryOptions = ResolveOptions(options);
47+
var request = BuildQueryRequest(esql, parameters, queryOptions);
48+
request.BeforeRequest();
49+
var response = await _client.DoRequestAsync<EsqlQueryRequest, StreamResponse, EsqlQueryRequestParameters>(request, cancellationToken)
50+
.ConfigureAwait(false);
51+
return new EsqlTransportAsyncResponse(response);
52+
}
53+
54+
public IEsqlResponse SubmitAsyncQuery(string esql, EsqlParameters? parameters, object? options, EsqlAsyncQueryOptions? asyncOptions)
55+
{
56+
var queryOptions = ResolveOptions(options);
57+
var request = BuildAsyncQueryRequest(esql, parameters, queryOptions, asyncOptions);
58+
request.BeforeRequest();
59+
var response = _client.DoRequest<AsyncQueryRequest, StreamResponse, AsyncQueryRequestParameters>(request);
60+
return new EsqlTransportResponse(response);
61+
}
62+
63+
public async Task<IEsqlAsyncResponse> SubmitAsyncQueryAsync(string esql, EsqlParameters? parameters, object? options, EsqlAsyncQueryOptions? asyncOptions, CancellationToken cancellationToken)
64+
{
65+
var queryOptions = ResolveOptions(options);
66+
var request = BuildAsyncQueryRequest(esql, parameters, queryOptions, asyncOptions);
67+
request.BeforeRequest();
68+
var response = await _client.DoRequestAsync<AsyncQueryRequest, StreamResponse, AsyncQueryRequestParameters>(request, cancellationToken)
69+
.ConfigureAwait(false);
70+
return new EsqlTransportAsyncResponse(response);
71+
}
72+
73+
public IEsqlResponse PollAsyncQuery(string queryId, object? options)
74+
{
75+
var queryOptions = ResolveOptions(options);
76+
var request = new AsyncQueryGetRequest(queryId) { Format = EsqlFormat.Json };
77+
if (queryOptions?.RequestConfiguration is not null)
78+
request.RequestConfiguration = queryOptions.RequestConfiguration;
79+
request.BeforeRequest();
80+
var response = _client.DoRequest<AsyncQueryGetRequest, StreamResponse, AsyncQueryGetRequestParameters>(request);
81+
return new EsqlTransportResponse(response);
82+
}
83+
84+
public async Task<IEsqlAsyncResponse> PollAsyncQueryAsync(string queryId, object? options, CancellationToken cancellationToken)
85+
{
86+
var queryOptions = ResolveOptions(options);
87+
var request = new AsyncQueryGetRequest(queryId) { Format = EsqlFormat.Json };
88+
if (queryOptions?.RequestConfiguration is not null)
89+
request.RequestConfiguration = queryOptions.RequestConfiguration;
90+
request.BeforeRequest();
91+
var response = await _client.DoRequestAsync<AsyncQueryGetRequest, StreamResponse, AsyncQueryGetRequestParameters>(request, cancellationToken)
92+
.ConfigureAwait(false);
93+
return new EsqlTransportAsyncResponse(response);
94+
}
95+
96+
public void DeleteAsyncQuery(string queryId, object? options)
97+
{
98+
var queryOptions = ResolveOptions(options);
99+
var request = new AsyncQueryDeleteRequest(queryId);
100+
if (queryOptions?.RequestConfiguration is not null)
101+
request.RequestConfiguration = queryOptions.RequestConfiguration;
102+
request.BeforeRequest();
103+
_client.DoRequest<AsyncQueryDeleteRequest, AsyncQueryDeleteResponse, AsyncQueryDeleteRequestParameters>(request);
104+
}
105+
106+
public async Task DeleteAsyncQueryAsync(string queryId, object? options, CancellationToken cancellationToken)
107+
{
108+
var queryOptions = ResolveOptions(options);
109+
var request = new AsyncQueryDeleteRequest(queryId);
110+
if (queryOptions?.RequestConfiguration is not null)
111+
request.RequestConfiguration = queryOptions.RequestConfiguration;
112+
request.BeforeRequest();
113+
await _client.DoRequestAsync<AsyncQueryDeleteRequest, AsyncQueryDeleteResponse, AsyncQueryDeleteRequestParameters>(request, cancellationToken)
114+
.ConfigureAwait(false);
115+
}
116+
117+
private static EsqlQueryOptions? ResolveOptions(object? options) =>
118+
options as EsqlQueryOptions;
119+
120+
private static EsqlQueryRequest BuildQueryRequest(string esql, EsqlParameters? parameters, EsqlQueryOptions? options)
121+
{
122+
var request = new EsqlQueryRequest(esql)
123+
{
124+
Format = EsqlFormat.Json,
125+
Columnar = false,
126+
Params = MergeAndConvertParams(parameters, options?.NamedParameters)
127+
};
128+
129+
ApplyQueryOptions(request, options);
130+
return request;
131+
}
132+
133+
private static AsyncQueryRequest BuildAsyncQueryRequest(string esql, EsqlParameters? parameters, EsqlQueryOptions? queryOptions, EsqlAsyncQueryOptions? asyncOptions)
134+
{
135+
var request = new AsyncQueryRequest(esql)
136+
{
137+
Format = EsqlFormat.Json,
138+
Columnar = false,
139+
Params = MergeAndConvertParams(parameters, queryOptions?.NamedParameters)
140+
};
141+
142+
ApplyQueryOptions(request, queryOptions);
143+
144+
if (asyncOptions is not null)
145+
{
146+
if (asyncOptions.WaitForCompletionTimeout is { } waitTimeout)
147+
request.WaitForCompletionTimeout = new Duration(waitTimeout);
148+
149+
if (asyncOptions.KeepAlive is { } keepAlive)
150+
request.KeepAlive = new Duration(keepAlive);
151+
152+
request.KeepOnCompletion = asyncOptions.KeepOnCompletion;
153+
}
154+
155+
return request;
156+
}
157+
158+
private static void ApplyQueryOptions(EsqlQueryRequest request, EsqlQueryOptions? options)
159+
{
160+
if (options is null)
161+
return;
162+
163+
request.Locale = options.Locale;
164+
request.TimeZone = options.TimeZone;
165+
request.Filter = options.Filter;
166+
request.AllowPartialResults = options.AllowPartialResults;
167+
request.DropNullColumns = options.DropNullColumns;
168+
request.ProjectRouting = options.ProjectRouting;
169+
170+
if (options.RequestConfiguration is not null)
171+
request.RequestConfiguration = options.RequestConfiguration;
172+
}
173+
174+
private static void ApplyQueryOptions(AsyncQueryRequest request, EsqlQueryOptions? options)
175+
{
176+
if (options is null)
177+
return;
178+
179+
request.Locale = options.Locale;
180+
request.TimeZone = options.TimeZone;
181+
request.Filter = options.Filter;
182+
request.AllowPartialResults = options.AllowPartialResults;
183+
request.DropNullColumns = options.DropNullColumns;
184+
request.ProjectRouting = options.ProjectRouting;
185+
186+
if (options.RequestConfiguration is not null)
187+
request.RequestConfiguration = options.RequestConfiguration;
188+
}
189+
190+
private static Union<ICollection<ICollection<FieldValue>>, ICollection<KeyValuePair<string, ICollection<FieldValue>>>>?
191+
MergeAndConvertParams(EsqlParameters? translated, Dictionary<string, FieldValue>? userParams)
192+
{
193+
var hasTranslated = translated is not null && translated.HasParameters;
194+
var hasUser = userParams is { Count: > 0 };
195+
196+
if (!hasTranslated && !hasUser)
197+
return null;
198+
199+
var merged = new Dictionary<string, FieldValue>();
200+
201+
if (hasTranslated)
202+
{
203+
foreach (var kvp in translated!.Parameters)
204+
merged[kvp.Key] = ConvertJsonElement(kvp.Value);
205+
}
206+
207+
if (hasUser)
208+
{
209+
foreach (var kvp in userParams!)
210+
merged[kvp.Key] = kvp.Value;
211+
}
212+
213+
var namedParams = new List<KeyValuePair<string, ICollection<FieldValue>>>(merged.Count);
214+
foreach (var kvp in merged)
215+
{
216+
namedParams.Add(new KeyValuePair<string, ICollection<FieldValue>>(
217+
kvp.Key, [kvp.Value]));
218+
}
219+
220+
return new Union<ICollection<ICollection<FieldValue>>, ICollection<KeyValuePair<string, ICollection<FieldValue>>>>(namedParams);
221+
222+
static FieldValue ConvertJsonElement(JsonElement element) =>
223+
element.ValueKind switch
224+
{
225+
JsonValueKind.String => FieldValue.String(element.GetString()!),
226+
JsonValueKind.Number when element.TryGetInt64(out var l) => FieldValue.Long(l),
227+
JsonValueKind.Number => FieldValue.Double(element.GetDouble()),
228+
JsonValueKind.True => FieldValue.True,
229+
JsonValueKind.False => FieldValue.False,
230+
JsonValueKind.Null or JsonValueKind.Undefined => FieldValue.Null,
231+
_ => FieldValue.String(element.GetRawText())
232+
};
233+
}
234+
}
235+
236+
internal sealed class EsqlTransportResponse : IEsqlResponse
237+
{
238+
private readonly StreamResponse _response;
239+
240+
public EsqlTransportResponse(StreamResponse response) => _response = response;
241+
242+
public Stream Body => _response.Body;
243+
244+
public void Dispose() => _response.Dispose();
245+
}
246+
247+
#if NET10_0_OR_GREATER
248+
internal sealed class EsqlTransportAsyncResponse : IEsqlAsyncResponse
249+
{
250+
private readonly StreamResponse _response;
251+
252+
public EsqlTransportAsyncResponse(StreamResponse response)
253+
{
254+
_response = response;
255+
Body = PipeReader.Create(response.Body);
256+
}
257+
258+
public PipeReader Body { get; }
259+
260+
public async ValueTask DisposeAsync()
261+
{
262+
await Body.CompleteAsync().ConfigureAwait(false);
263+
_response.Dispose();
264+
}
265+
}
266+
#else
267+
internal sealed class EsqlTransportAsyncResponse : IEsqlAsyncResponse
268+
{
269+
private readonly StreamResponse _response;
270+
271+
public EsqlTransportAsyncResponse(StreamResponse response) => _response = response;
272+
273+
public Stream Body => _response.Body;
274+
275+
public ValueTask DisposeAsync()
276+
{
277+
_response.Dispose();
278+
return default;
279+
}
280+
}
281+
#endif
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Collections.Generic;
6+
7+
using Elastic.Transport;
8+
9+
namespace Elastic.Clients.Elasticsearch.Esql;
10+
11+
/// <summary>Per-query options for LINQ-to-ES|QL queries executed via WithOptions.</summary>
12+
public sealed record EsqlQueryOptions
13+
{
14+
/// <summary>Per-request transport configuration (timeouts, headers, auth).</summary>
15+
public IRequestConfiguration? RequestConfiguration { get; init; }
16+
17+
/// <summary>If true, partial results will be returned on shard failures.</summary>
18+
public bool? AllowPartialResults { get; init; }
19+
20+
/// <summary>If true, entirely null columns are removed from the response.</summary>
21+
public bool? DropNullColumns { get; init; }
22+
23+
/// <summary>A Query DSL filter applied to the document set before the ES|QL query runs.</summary>
24+
public QueryDsl.Query? Filter { get; init; }
25+
26+
/// <summary>Locale for result formatting (e.g., "en-US").</summary>
27+
public string? Locale { get; init; }
28+
29+
/// <summary>Project routing for serverless cross-project queries.</summary>
30+
public string? ProjectRouting { get; init; }
31+
32+
/// <summary>Default timezone for date operations (e.g., "UTC").</summary>
33+
public string? TimeZone { get; init; }
34+
35+
/// <summary>
36+
/// User-supplied named parameters. Merged with parameters from the translated query.
37+
/// If a key exists in both, NamedParameters takes precedence.
38+
/// </summary>
39+
public Dictionary<string, FieldValue>? NamedParameters { get; init; }
40+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Linq.Expressions;
7+
8+
using Elastic.Esql.Core;
9+
10+
namespace Elastic.Clients.Elasticsearch.Esql;
11+
12+
/// <summary>Extension methods for attaching query options to LINQ-to-ES|QL queries.</summary>
13+
public static class EsqlQueryableExtensions
14+
{
15+
/// <summary>Attaches ES|QL query options to the query pipeline.</summary>
16+
public static IEsqlQueryable<T> WithOptions<T>(this IEsqlQueryable<T> source, EsqlQueryOptions options)
17+
{
18+
var method = new Func<IEsqlQueryable<T>, EsqlQueryOptions, IEsqlQueryable<T>>(WithOptions).Method;
19+
return (IEsqlQueryable<T>)source.Provider.CreateQuery<T>(
20+
Expression.Call(null, method, source.Expression, Expression.Constant(options)));
21+
}
22+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using Elastic.Esql.Core;
6+
using Elastic.Esql.QueryModel;
7+
8+
namespace Elastic.Clients.Elasticsearch.Esql;
9+
10+
internal sealed class EsqlSourceInferenceInterceptor : IEsqlQueryInterceptor
11+
{
12+
private readonly Inferrer _inferrer;
13+
14+
public EsqlSourceInferenceInterceptor(Inferrer inferrer) => _inferrer = inferrer;
15+
16+
public EsqlQuery Intercept(EsqlQuery query)
17+
{
18+
if (query.Source is not null)
19+
return query;
20+
21+
var indexName = _inferrer.IndexName(query.ElementType);
22+
return query.WithSource(indexName);
23+
}
24+
}

0 commit comments

Comments
 (0)