-
Notifications
You must be signed in to change notification settings - Fork 57
Expand file tree
/
Copy pathTemporalConnection.cs
More file actions
295 lines (270 loc) · 10.6 KB
/
TemporalConnection.cs
File metadata and controls
295 lines (270 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
#pragma warning disable VSTHRD003 // We await a task we created in constructor
#pragma warning disable CA1001 // We are disposing in destructor by intention
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Temporalio.Bridge.Api.Grpc.Health.V1;
using Temporalio.Runtime;
namespace Temporalio.Client
{
/// <summary>
/// Connection to Temporal.
/// </summary>
/// <remarks>
/// Connections are thread-safe and are encouraged to be reused.
/// </remarks>
public sealed class TemporalConnection : ITemporalConnection, IBridgeClientProviderInternal
{
// Awaited by GetBridgeClientAsync around the connect attempt. Only created by the
// constructor when the connection is lazy; null otherwise.
private readonly SemaphoreSlim? semaphoreForLazyClient;
// Held by the RpcMetadata and RpcBinaryMetadata accessors (both properties share this lock)
private readonly object rpcMetadataLock = new();
// Held by the ApiKey accessors
private readonly object apiKeyLock = new();
// Bridge client; null until GetBridgeClientAsync has assigned a connected client
private Bridge.Client? client;
// Current string RPC metadata: a copy of the options value (or empty) until replaced
// through the RpcMetadata setter
private IReadOnlyCollection<KeyValuePair<string, string>> rpcMetadata;
// Current binary RPC metadata: a copy of the options value (or empty) until replaced
// through the RpcBinaryMetadata setter
private IReadOnlyCollection<KeyValuePair<string, byte[]>> rpcBinaryMetadata;
// Current API key: taken from the options until replaced through the ApiKey setter
private string? apiKey;
/// <summary>
/// Initializes a new instance of the <see cref="TemporalConnection" /> class. No
/// connection is attempted here; the bridge client is created later by
/// <c>GetBridgeClientAsync</c>.
/// </summary>
/// <param name="options">
/// Options for connecting. Stored as <see cref="Options" />. If its identity is unset, it
/// is set on this same options object to <c>{process id}@{host name}</c>.
/// </param>
/// <param name="lazy">
/// If true, a semaphore is created that is held around the connect attempt.
/// </param>
private TemporalConnection(TemporalConnectionOptions options, bool lazy)
{
    WorkflowService = new WorkflowService.Core(this);
    OperatorService = new OperatorService.Core(this);
    CloudService = new CloudService.Core(this);
    TestService = new TestService.Core(this);
    Options = options;

    // Keep private copies of the metadata so later changes to the caller's collections
    // are not observed through this connection's properties
    if (options.RpcMetadata == null)
    {
        rpcMetadata = Array.Empty<KeyValuePair<string, string>>();
    }
    else
    {
        rpcMetadata = new List<KeyValuePair<string, string>>(options.RpcMetadata);
    }
    if (options.RpcBinaryMetadata == null)
    {
        rpcBinaryMetadata = Array.Empty<KeyValuePair<string, byte[]>>();
    }
    else
    {
        rpcBinaryMetadata = new List<KeyValuePair<string, byte[]>>(options.RpcBinaryMetadata);
    }
    apiKey = options.ApiKey;

    // Set default identity if unset
    if (options.Identity == null)
    {
        // Process is IDisposable, so dispose it as soon as the ID has been read instead
        // of leaving it for the finalizer
        using (var currentProcess = System.Diagnostics.Process.GetCurrentProcess())
        {
            options.Identity = currentProcess.Id
                + "@"
                + System.Net.Dns.GetHostName();
        }
    }

    // Only set semaphore if lazy
    if (lazy)
    {
        semaphoreForLazyClient = new(1, 1);
    }
}
/// <summary>
/// Finalizes an instance of the <see cref="TemporalConnection" /> class, disposing the
/// bridge client and the lazy-connect semaphore when they exist.
/// </summary>
~TemporalConnection()
{
    // Either field may be null: the client if no connection was ever made, the
    // semaphore if this connection is not lazy
    var connectedClient = client;
    if (connectedClient is not null)
    {
        connectedClient.Dispose();
    }

    var lazySemaphore = semaphoreForLazyClient;
    if (lazySemaphore is not null)
    {
        lazySemaphore.Dispose();
    }
}
/// <inheritdoc />
public IReadOnlyCollection<KeyValuePair<string, string>> RpcMetadata
{
    get
    {
        IReadOnlyCollection<KeyValuePair<string, string>> current;
        lock (rpcMetadataLock)
        {
            current = rpcMetadata;
        }
        return current;
    }

    set
    {
        // Metadata is pushed to the bridge client, so one must already exist
        var bridgeClient = client ??
            throw new InvalidOperationException("Cannot set RPC metadata if client never connected");
        lock (rpcMetadataLock)
        {
            // Update the Rust side before the field so a failure there leaves the
            // stored value untouched
            bridgeClient.UpdateMetadata(value);
            // Always store our own copy rather than the caller's collection
            rpcMetadata = new List<KeyValuePair<string, string>>(value);
        }
    }
}
/// <inheritdoc />
public IReadOnlyCollection<KeyValuePair<string, byte[]>> RpcBinaryMetadata
{
    get
    {
        IReadOnlyCollection<KeyValuePair<string, byte[]>> current;
        // Binary metadata shares the lock used for string metadata
        lock (rpcMetadataLock)
        {
            current = rpcBinaryMetadata;
        }
        return current;
    }

    set
    {
        // Metadata is pushed to the bridge client, so one must already exist
        var bridgeClient = client ??
            throw new InvalidOperationException("Cannot set RPC metadata if client never connected");
        lock (rpcMetadataLock)
        {
            // Update the Rust side before the field so a failure there leaves the
            // stored value untouched
            bridgeClient.UpdateBinaryMetadata(value);
            // Always store our own copy of the collection (the byte arrays themselves
            // are not cloned)
            rpcBinaryMetadata = new List<KeyValuePair<string, byte[]>>(value);
        }
    }
}
/// <inheritdoc />
public string? ApiKey
{
    get
    {
        string? current;
        lock (apiKeyLock)
        {
            current = apiKey;
        }
        return current;
    }

    set
    {
        // The key is pushed to the bridge client, so one must already exist
        var bridgeClient = client ??
            throw new InvalidOperationException("Cannot set API key if client never connected");
        lock (apiKeyLock)
        {
            // Update the Rust side before the field so a failure there leaves the
            // stored value untouched
#pragma warning disable VSTHRD002 // We know it's completed
            bridgeClient.UpdateApiKey(value);
#pragma warning restore VSTHRD002
            apiKey = value;
        }
    }
}
/// <inheritdoc />
public WorkflowService WorkflowService { get; }

/// <inheritdoc />
public OperatorService OperatorService { get; }

/// <inheritdoc />
public CloudService CloudService { get; }

/// <inheritdoc />
public TestService TestService { get; }

/// <inheritdoc />
public TemporalConnectionOptions Options { get; }

/// <inheritdoc />
public bool IsConnected => client is not null;

/// <inheritdoc />
public SafeHandle? BridgeClient
{
    // Null until a bridge client exists
    get { return client?.Handle; }
}

/// <inheritdoc />
Bridge.SafeClientHandle? IBridgeClientProviderInternal.ClientHandle
{
    get { return client?.Handle; }
}

/// <inheritdoc />
Bridge.Runtime? IBridgeClientProviderInternal.Runtime
{
    get { return client?.Runtime; }
}
/// <summary>
/// Create a non-lazy connection and connect it to Temporal before returning it.
/// </summary>
/// <param name="options">Options for connecting.</param>
/// <returns>The established connection.</returns>
/// <exception cref="System.InvalidOperationException">Thrown when cannot successfully connect.</exception>
public static async Task<TemporalConnection> ConnectAsync(TemporalConnectionOptions options)
{
    var connection = new TemporalConnection(options, lazy: false);
    // Force the bridge client to be created now; only the connection is handed back
    _ = await connection.GetBridgeClientAsync().ConfigureAwait(false);
    return connection;
}
/// <summary>
/// Create a connection that does not connect to Temporal until it is first needed. If an
/// initial connection attempt fails, it is retried the next time a connection is needed.
/// Unconnected clients made from lazy connections cannot be used by workers. Note,
/// <see cref="RpcMetadata" /> cannot be set until a connection is made.
/// </summary>
/// <param name="options">Options for connecting.</param>
/// <returns>The not-yet-connected connection.</returns>
public static TemporalConnection CreateLazy(TemporalConnectionOptions options)
{
    return new TemporalConnection(options, lazy: true);
}
/// <inheritdoc />
public async Task<bool> CheckHealthAsync(RpcService? service = null, RpcOptions? options = null)
{
    var bridgeClient = await GetBridgeClientAsync().ConfigureAwait(false);

    // Check the workflow service when the caller does not name one
    var request = new HealthCheckRequest
    {
        Service = service?.FullName ?? "temporal.api.workflowservice.v1.WorkflowService",
    };

    var pendingCall = bridgeClient.CallAsync(
        Bridge.Interop.TemporalCoreRpcService.Health,
        "Check",
        request,
        HealthCheckResponse.Parser,
        options?.Retry ?? false,
        options?.Metadata,
        options?.BinaryMetadata,
        options?.Timeout,
        options?.CancellationToken);
    var response = await pendingCall.ConfigureAwait(false);

    // Only the Serving status counts as healthy
    return response.Status == HealthCheckResponse.Types.ServingStatus.Serving;
}
/// <inheritdoc />
public Task ConnectAsync()
{
    // Hand back the bridge-client task itself; callers only see it as a plain Task
    return GetBridgeClientAsync();
}
/// <summary>
/// Invoke an RPC call on this connection, obtaining the bridge client first (which
/// connects if there is no client yet).
/// </summary>
/// <typeparam name="T">Proto response type.</typeparam>
/// <param name="service">RPC service to call.</param>
/// <param name="rpc">RPC operation.</param>
/// <param name="req">Request proto.</param>
/// <param name="resp">Response proto parser.</param>
/// <param name="options">RPC options. Retry defaults to false when not given.</param>
/// <returns>Response proto.</returns>
internal async Task<T> InvokeRpcAsync<T>(
    RpcService service,
    string rpc,
    IMessage req,
    MessageParser<T> resp,
    RpcOptions? options = null)
    where T : IMessage<T>
{
    var bridgeClient = await GetBridgeClientAsync().ConfigureAwait(false);
    var pendingCall = bridgeClient.CallAsync(
        service.Service,
        rpc,
        req,
        resp,
        options?.Retry ?? false,
        options?.Metadata,
        options?.BinaryMetadata,
        options?.Timeout,
        options?.CancellationToken);
    var response = await pendingCall.ConfigureAwait(false);
    return response;
}
/// <summary>
/// Get the bridge client, connecting first if there is no client yet.
/// </summary>
/// <remarks>
/// When the lazy semaphore exists, the connect attempt runs while holding it and the
/// client field is checked again after it is acquired. If the connect call throws, the
/// field assignment is never reached, so the field stays null and a later call attempts
/// the connect again.
/// </remarks>
/// <returns>The connected bridge client.</returns>
private async Task<Bridge.Client> GetBridgeClientAsync()
{
// Return client if already not-null (without lock)
if (client is not null)
{
return client;
}
// Attempt connect under semaphore if present. The wait happens before the try so the
// finally below only runs (and releases) after the wait has completed.
if (semaphoreForLazyClient is not null)
{
await semaphoreForLazyClient.WaitAsync().ConfigureAwait(false);
}
try
{
// Return client if already not-null (with lock); another caller may have assigned it
// while this one was waiting on the semaphore
#pragma warning disable CA1508 // False positive in concurrent situation
if (client != null)
{
return client;
}
#pragma warning restore CA1508
// Use the runtime from the options, or the default runtime if none was given
var runtime = Options.Runtime ?? TemporalRuntime.Default;
client = await Bridge.Client.ConnectAsync(runtime.Runtime, Options).ConfigureAwait(false);
return client;
}
finally
{
// No-op when there is no semaphore (non-lazy connection)
semaphoreForLazyClient?.Release();
}
}
}
}