Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 25 additions & 47 deletions src/Couchbase.Analytics/Async/QueryHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@

using System.Text.Json;
using Couchbase.AnalyticsClient.Internal;
using Couchbase.AnalyticsClient.Options;
using Couchbase.AnalyticsClient.Results;

namespace Couchbase.AnalyticsClient.Async;

/// <summary>
/// Represents a handle to a server-side asynchronous query.
/// Obtained from <see cref="Cluster.StartQueryAsync"/> or <see cref="Cluster.QueryHandleFromSerialized"/>.
/// Obtained from <see cref="Cluster.StartQueryAsync"/>.
/// </summary>
public class QueryHandle
{
private readonly IAnalyticsService _analyticsService;
private readonly TimeSpan? _requestTimeout;

/// <summary>
/// The query handle string used to poll status and fetch results.
/// This is the path segment after <c>/api/v1/request/status/</c>.
/// The query handle string used to poll for the result handle.
/// </summary>
public string Handle { get; }

Expand All @@ -44,75 +44,53 @@ public class QueryHandle
/// </summary>
public string RequestId { get; }

internal QueryHandle(string handle, string requestId, IAnalyticsService analyticsService, TimeSpan? requestTimeout = null)
internal QueryHandle(string handle, string requestId, IAnalyticsService analyticsService)
{
Handle = handle ?? throw new ArgumentNullException(nameof(handle));
RequestId = requestId ?? throw new ArgumentNullException(nameof(requestId));
_analyticsService = analyticsService ?? throw new ArgumentNullException(nameof(analyticsService));
_requestTimeout = requestTimeout;
}

/// <summary>
/// Fetches the current status of the asynchronous query from the server.
/// Fetches the result handle of the asynchronous query from the server.
/// </summary>
/// <param name="options">Options for fetching the result handle.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>A <see cref="QueryStatus"/> representing the current state of the query.</returns>
public async Task<QueryStatus> FetchStatusAsync(CancellationToken cancellationToken = default)
/// <returns>A <see cref="QueryResultHandle"/> if results are ready, otherwise null.</returns>
public Task<QueryResultHandle?> FetchResultHandleAsync(FetchResultHandleOptions? options = null, CancellationToken cancellationToken = default)
{
return await _analyticsService.FetchStatusAsync(Handle, _requestTimeout, cancellationToken)
.ConfigureAwait(false);
options ??= new FetchResultHandleOptions();
return _analyticsService.FetchResultHandleAsync(this, options, cancellationToken);
}

/// <summary>
/// Discards the query results on the server. After this call, the results can no longer be fetched.
/// Fetches the result handle of the asynchronous query from the server.
/// </summary>
/// <param name="cancellationToken">A cancellation token.</param>
public async Task DiscardResultsAsync(CancellationToken cancellationToken = default)
public Task<QueryResultHandle?> FetchResultHandleAsync(Func<FetchResultHandleOptions, FetchResultHandleOptions> options, CancellationToken cancellationToken = default)
{
await _analyticsService.DiscardResultsAsync(Handle, _requestTimeout, cancellationToken)
.ConfigureAwait(false);
var fetchOptions = new FetchResultHandleOptions();
fetchOptions = options.Invoke(fetchOptions);
return FetchResultHandleAsync(fetchOptions, cancellationToken);
}

/// <summary>
/// Cancels the query on the server. If the query has already completed, this is a no-op.
/// </summary>
/// <param name="options">Options for cancellation.</param>
/// <param name="cancellationToken">A cancellation token.</param>
public async Task CancelAsync(CancellationToken cancellationToken = default)
{
await _analyticsService.CancelQueryAsync(RequestId, _requestTimeout, cancellationToken)
.ConfigureAwait(false);
}

/// <summary>
/// Serializes this <see cref="QueryHandle"/> to a JSON string so it can be persisted and
/// later reconstructed via <see cref="Cluster.QueryHandleFromSerialized"/>.
/// This method does not perform any network operations.
/// </summary>
/// <returns>A JSON string containing the handle and request ID.</returns>
public string Serialize()
public Task CancelAsync(CancelOptions? options = null, CancellationToken cancellationToken = default)
{
var data = new SerializedQueryHandle(Handle, RequestId);
return JsonSerializer.Serialize(data);
options ??= new CancelOptions();
return _analyticsService.CancelQueryAsync(RequestId, options, cancellationToken);
}

/// <summary>
/// Deserializes a <see cref="QueryHandle"/> from a JSON string previously produced by <see cref="Serialize"/>.
/// This method does not perform any network operations.
/// Cancels the query on the server. If the query has already completed, this is a no-op.
/// </summary>
internal static QueryHandle Deserialize(string serializedHandle, IAnalyticsService analyticsService, TimeSpan? requestTimeout = null)
public Task CancelAsync(Func<CancelOptions, CancelOptions> options, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(serializedHandle);

var data = JsonSerializer.Deserialize<SerializedQueryHandle>(serializedHandle)
?? throw new ArgumentException("Invalid serialized handle format.", nameof(serializedHandle));

if (string.IsNullOrWhiteSpace(data.Handle) || string.IsNullOrWhiteSpace(data.RequestId))
{
throw new ArgumentException("Serialized handle is missing required fields.", nameof(serializedHandle));
}

return new QueryHandle(data.Handle, data.RequestId, analyticsService, requestTimeout);
var cancelOptions = new CancelOptions();
cancelOptions = options.Invoke(cancelOptions);
return CancelAsync(cancelOptions, cancellationToken);
}

private record SerializedQueryHandle(string Handle, string RequestId);
}
58 changes: 0 additions & 58 deletions src/Couchbase.Analytics/Async/QueryHandleResults.cs

This file was deleted.

22 changes: 0 additions & 22 deletions src/Couchbase.Analytics/Async/QueryPartition.cs

This file was deleted.

90 changes: 90 additions & 0 deletions src/Couchbase.Analytics/Async/QueryResultHandle.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#region License
/* ************************************************************
*
* @author Couchbase <info@couchbase.com>
* @copyright 2025 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ************************************************************/
#endregion

using Couchbase.AnalyticsClient.Internal;
using Couchbase.AnalyticsClient.Options;
using Couchbase.AnalyticsClient.Results;

namespace Couchbase.AnalyticsClient.Async;

/// <summary>
/// Provides access to the results of a completed asynchronous query.
/// </summary>
public class QueryResultHandle
{
private readonly string _handlePath;
private readonly IAnalyticsService _analyticsService;

/// <summary>
/// The request ID assigned by the server when the query was submitted.
/// </summary>
public string RequestId { get; }

internal QueryResultHandle(string handlePath, string requestId, IAnalyticsService analyticsService)
{
_handlePath = handlePath ?? throw new ArgumentNullException(nameof(handlePath));
RequestId = requestId ?? throw new ArgumentNullException(nameof(requestId));
_analyticsService = analyticsService ?? throw new ArgumentNullException(nameof(analyticsService));
}

/// <summary>
/// Fetches the results of the query from the server.
/// </summary>
/// <param name="options">Options for fetching the results.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>An <see cref="IQueryResult"/> that can be used to enumerate the result rows.</returns>
public Task<IQueryResult> FetchResultsAsync(FetchResultsOptions? options = null, CancellationToken cancellationToken = default)
{
options ??= new FetchResultsOptions();
return _analyticsService.FetchResultsAsync(RequestId, _handlePath, options, cancellationToken);
}

/// <summary>
/// Fetches the results of the query from the server.
/// </summary>
public Task<IQueryResult> FetchResultsAsync(Func<FetchResultsOptions, FetchResultsOptions> options, CancellationToken cancellationToken = default)
{
var fetchOptions = new FetchResultsOptions();
fetchOptions = options.Invoke(fetchOptions);
return FetchResultsAsync(fetchOptions, cancellationToken);
}

/// <summary>
/// Discards the query results on the server. After this call, the results can no longer be fetched.
/// </summary>
/// <param name="options">Options for discarding results.</param>
/// <param name="cancellationToken">A cancellation token.</param>
public Task DiscardResultsAsync(DiscardResultsOptions? options = null, CancellationToken cancellationToken = default)
{
options ??= new DiscardResultsOptions();
return _analyticsService.DiscardResultsAsync(RequestId, _handlePath, options, cancellationToken);
}

/// <summary>
/// Discards the query results on the server. After this call, the results can no longer be fetched.
/// </summary>
public Task DiscardResultsAsync(Func<DiscardResultsOptions, DiscardResultsOptions> options, CancellationToken cancellationToken = default)
{
var discardOptions = new DiscardResultsOptions();
discardOptions = options.Invoke(discardOptions);
return DiscardResultsAsync(discardOptions, cancellationToken);
}
}
Loading
Loading