-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathCluster.cs
More file actions
243 lines (207 loc) · 11.4 KB
/
Cluster.cs
File metadata and controls
243 lines (207 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#region License
/* ************************************************************
*
* @author Couchbase <info@couchbase.com>
* @copyright 2025 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ************************************************************/
#endregion
using System.Collections.Concurrent;
using Couchbase.AnalyticsClient.Async;
using Couchbase.AnalyticsClient.HTTP;
using Couchbase.AnalyticsClient.Internal;
using Couchbase.AnalyticsClient.Internal.DI;
using Couchbase.AnalyticsClient.Options;
using Couchbase.AnalyticsClient.Results;
using Microsoft.Extensions.Logging;
namespace Couchbase.AnalyticsClient;
public partial class Cluster : IDisposable
{
private volatile ICredential _credential;
private readonly ClusterOptions _clusterOptions;
private readonly ILogger<Cluster> _logger;
private readonly ICouchbaseServiceProvider _serviceProvider;
private readonly LazyService<IAnalyticsService> _analyticsService;
private readonly ConcurrentDictionary<string, Database> _databases = new();
private Cluster(ICredential credential, ClusterOptions clusterOptions)
{
if (string.IsNullOrWhiteSpace(clusterOptions.ConnectionString))
{
throw new ArgumentException("ConnectionString cannot be null or empty.", nameof(clusterOptions));
}
_credential = credential ?? throw new ArgumentNullException(nameof(credential));
_clusterOptions = clusterOptions ?? throw new ArgumentNullException(nameof(clusterOptions));
_serviceProvider = clusterOptions.BuildServiceProvider(() => _credential);
_logger = _serviceProvider.GetRequiredService<ILogger<Cluster>>();
_analyticsService = new LazyService<IAnalyticsService>(_serviceProvider);
LogClusterCreated(_logger, clusterOptions.ConnectionString);
}
/// <summary>
/// Creates a cluster with a connection string and credentials, allowing configuration of cluster options.
/// </summary>
/// <param name="connectionString">The connection string for the cluster</param>
/// <param name="credential">The credentials to use for authentication</param>
/// <param name="configureOptions">Action to configure cluster options</param>
/// <returns>A Cluster instance</returns>
/// <exception cref="ArgumentException">Thrown when the connection string is null or empty, or the credential is null</exception>
public static Cluster Create(string connectionString, ICredential credential, Func<ClusterOptions, ClusterOptions> configureOptions)
{
if (string.IsNullOrWhiteSpace(connectionString))
throw new ArgumentException("Connection string cannot be null or empty.", nameof(connectionString));
ArgumentNullException.ThrowIfNull(credential);
var options = new ClusterOptions
{
ConnectionString = connectionString
};
options = configureOptions.Invoke(options);
return new Cluster(credential, options);
}
/// <summary>
/// Creates a cluster with a connection string and credentials, allowing configuration of cluster options.
/// </summary>
/// <param name="connectionString">The connection string for the cluster</param>
/// <param name="credential">The credentials to use for authentication</param>
/// <param name="clusterOptions">The cluster options to use for the cluster</param>
/// <returns>A Cluster instance</returns>
/// <exception cref="ArgumentException">Thrown when the connection string is null or empty, or the credential is null</exception>
public static Cluster Create(string connectionString, ICredential credential, ClusterOptions clusterOptions)
{
if (string.IsNullOrWhiteSpace(connectionString))
throw new ArgumentException("Connection string cannot be null or empty.", nameof(connectionString));
ArgumentNullException.ThrowIfNull(credential);
clusterOptions ??= new ClusterOptions();
clusterOptions.ConnectionString = connectionString;
return new Cluster(credential, clusterOptions);
}
/// <summary>
/// Creates a cluster with a connection string and credentials, allowing configuration of cluster options.
/// </summary>
/// <param name="connectionString">The connection string for the cluster</param>
/// <param name="credential">The credentials to use for authentication</param>
/// <returns>A Cluster instance</returns>
/// <exception cref="ArgumentException">Thrown when the connection string is null or empty, or the credential is null</exception>
public static Cluster Create(string connectionString, ICredential credential)
{
return Create(connectionString, credential, new ClusterOptions());
}
/// <summary>
/// Creates a cluster with cluster options that must include a connection string.
/// </summary>
/// <param name="credential">The credentials to use for authentication</param>
/// <param name="clusterOptions">Pre-configured cluster options with connection string</param>
/// <returns>A Cluster instance</returns>
/// <exception cref="ArgumentNullException">Thrown when the credential or cluster options are null</exception>
public static Cluster Create(ICredential credential, ClusterOptions clusterOptions)
{
ArgumentNullException.ThrowIfNull(credential);
ArgumentNullException.ThrowIfNull(clusterOptions);
return new Cluster(credential, clusterOptions);
}
/// <summary>
/// Creates a cluster with a connection string, username/password credential, and an options builder.
/// </summary>
public static Cluster Create(string connectionString, Credential credential, Func<ClusterOptions, ClusterOptions> configureOptions)
=> Create(connectionString, (ICredential)credential, configureOptions);
/// <summary>
/// Creates a cluster with a connection string, username/password credential, and cluster options.
/// </summary>
public static Cluster Create(string connectionString, Credential credential, ClusterOptions clusterOptions)
=> Create(connectionString, (ICredential)credential, clusterOptions);
/// <summary>
/// Creates a cluster with a connection string and username/password credential.
/// </summary>
public static Cluster Create(string connectionString, Credential credential)
=> Create(connectionString, (ICredential)credential);
/// <summary>
/// Creates a cluster with a username/password credential and cluster options.
/// </summary>
public static Cluster Create(Credential credential, ClusterOptions clusterOptions)
=> Create((ICredential)credential, clusterOptions);
/// <summary>
/// Replaces the credential used for all subsequent HTTP requests.
/// Thread-safe. The new credential must be the same type as the current credential.
/// </summary>
/// <param name="newCredential">The new credential to use.</param>
/// <exception cref="InvalidOperationException">Thrown if the new credential is a different type than the current one.</exception>
public void UpdateCredential(ICredential newCredential)
{
ArgumentNullException.ThrowIfNull(newCredential);
var current = _credential;
if (current.GetType() != newCredential.GetType())
throw new InvalidOperationException(
$"Cannot change credential type from {current.GetType().Name} to {newCredential.GetType().Name}.");
_credential = newCredential;
LogCredentialUpdated(_logger, current.GetType().Name);
}
public Task<IQueryResult> ExecuteQueryAsync(string statement, Func<QueryOptions, QueryOptions> options, CancellationToken cancellationToken = default)
{
var queryOptions = new QueryOptions();
queryOptions = options.Invoke(queryOptions);
return ExecuteQueryAsync(statement, queryOptions, cancellationToken);
}
public async Task<IQueryResult> ExecuteQueryAsync(string statement, QueryOptions? options = null, CancellationToken cancellationToken = default)
{
var service = _analyticsService.GetValueOrThrow();
return await service.SendAsync(statement, options ?? new QueryOptions(), cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Starts an asynchronous server-side query. The query is submitted to the server and a
/// <see cref="QueryHandle"/> is returned that can be used to poll status, fetch results,
/// cancel, or discard the query.
/// </summary>
/// <param name="statement">The analytics query statement to execute.</param>
/// <param name="options">Options for the async query, including timeouts and result TTL.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>A <see cref="QueryHandle"/> for tracking and interacting with the query.</returns>
public async Task<QueryHandle> StartQueryAsync(string statement, StartQueryOptions? options = null, CancellationToken cancellationToken = default)
{
var service = _analyticsService.GetValueOrThrow();
return await service.StartQueryAsync(statement, options ?? new StartQueryOptions(), cancellationToken)
.ConfigureAwait(false);
}
/// <summary>
/// Starts an asynchronous server-side query with a fluent options builder.
/// </summary>
/// <param name="statement">The analytics query statement to execute.</param>
/// <param name="options">A function to configure the <see cref="StartQueryOptions"/>.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>A <see cref="QueryHandle"/> for tracking and interacting with the query.</returns>
public Task<QueryHandle> StartQueryAsync(string statement, Func<StartQueryOptions, StartQueryOptions> options, CancellationToken cancellationToken = default)
{
var startQueryOptions = new StartQueryOptions();
startQueryOptions = options.Invoke(startQueryOptions);
return StartQueryAsync(statement, startQueryOptions, cancellationToken);
}
public Database Database(string databaseName)
{
return _databases.GetOrAdd(databaseName, database => new Database(this, database));
}
public void Dispose()
{
if (_serviceProvider is IDisposable disposableProvider)
{
disposableProvider.Dispose();
}
LogClusterDisposed(_logger);
}
#region Logging
[LoggerMessage(1, LogLevel.Information, "Analytics Cluster initialized for connection: {ConnectionString}")]
private static partial void LogClusterCreated(ILogger logger, string connectionString);
[LoggerMessage(2, LogLevel.Information, "Analytics Cluster credentials dynamically updated (Type: {CredentialType})")]
private static partial void LogCredentialUpdated(ILogger logger, string credentialType);
[LoggerMessage(3, LogLevel.Information, "Analytics Cluster disposed. Releasing managed resources.")]
private static partial void LogClusterDisposed(ILogger logger);
#endregion
}