-
Notifications
You must be signed in to change notification settings - Fork 533
Expand file tree
/
Copy pathCollectionCache.cs
More file actions
422 lines (375 loc) · 20 KB
/
CollectionCache.cs
File metadata and controls
422 lines (375 loc) · 20 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Common
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
#if !NETSTANDARD16
using System.Diagnostics;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
#endif
/// <summary>
/// Cache to provide resource id lookup based on resource name
/// </summary>
internal abstract class CollectionCache
{
/// <summary>
/// Caches belonging to a single API-version bucket.
/// Master Service returns the collection definition based on API version, and it may not be the same for
/// every version, so each bucket keeps its own name-keyed and id-keyed caches together with the time at
/// which each entry was last refreshed.
/// </summary>
protected class InternalCache
{
    /// <summary>Collection properties keyed by collection name path.</summary>
    internal readonly AsyncCache<string, ContainerProperties> collectionInfoByName;

    /// <summary>Collection properties keyed by collection resource id.</summary>
    internal readonly AsyncCache<string, ContainerProperties> collectionInfoById;

    /// <summary>UTC time recorded for the last refresh of each name-keyed entry.</summary>
    internal readonly ConcurrentDictionary<string, DateTime> collectionInfoByNameLastRefreshTime;

    /// <summary>UTC time recorded for the last refresh of each id-keyed entry.</summary>
    internal readonly ConcurrentDictionary<string, DateTime> collectionInfoByIdLastRefreshTime;

    /// <summary>
    /// Creates an empty pair of caches and their refresh-time maps.
    /// </summary>
    /// <param name="enableAsyncCacheExceptionNoSharing">Forwarded unchanged to both <c>AsyncCache</c> instances.</param>
    internal InternalCache(
        bool enableAsyncCacheExceptionNoSharing = true)
    {
        this.collectionInfoByName = InternalCache.CreateCache(enableAsyncCacheExceptionNoSharing);
        this.collectionInfoById = InternalCache.CreateCache(enableAsyncCacheExceptionNoSharing);
        this.collectionInfoByNameLastRefreshTime = new ConcurrentDictionary<string, DateTime>();
        this.collectionInfoByIdLastRefreshTime = new ConcurrentDictionary<string, DateTime>();
    }

    // Each cache gets its own comparer instance; values are compared by collection resource id.
    private static AsyncCache<string, ContainerProperties> CreateCache(bool enableAsyncCacheExceptionNoSharing)
    {
        return new AsyncCache<string, ContainerProperties>(
            new CollectionRidComparer(),
            enableAsyncCacheExceptionNoSharing: enableAsyncCacheExceptionNoSharing);
    }
}
/// <summary>
/// Collection information cached per API version. Normally only one version is populated for a collection,
/// but keeping separate buckets handles customers that use multiple API versions from different applications.
/// </summary>
protected readonly InternalCache[] cacheByApiList;

/// <summary>
/// Initializes the two API-version buckets that <see cref="GetCache"/> selects between.
/// </summary>
/// <param name="enableAsyncCacheExceptionNoSharing">Forwarded unchanged to every <see cref="InternalCache"/>.</param>
protected CollectionCache(
    bool enableAsyncCacheExceptionNoSharing = true)
{
    this.cacheByApiList = new[]
    {
        new InternalCache(enableAsyncCacheExceptionNoSharing), // for API version < 2018-12-31
        new InternalCache(enableAsyncCacheExceptionNoSharing), // for API version >= 2018-12-31
    };
}
/// <summary>
/// Resolves the <see cref="ContainerProperties"/> for a request from the cache. If the cached entry was last
/// refreshed more than <paramref name="refreshAfter"/> ago, it is evicted first so that the subsequent
/// resolution repopulates it.
/// </summary>
/// <remarks>
/// This method is not <c>async</c>: cancellation and failures in the staleness check (for example an
/// unparsable resource id) are thrown synchronously rather than through the returned task.
/// </remarks>
/// <param name="request">Request to resolve.</param>
/// <param name="refreshAfter">Maximum age of a cached entry before it is evicted.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <param name="trace">The trace.</param>
/// <returns>Instance of <see cref="ContainerProperties"/>.</returns>
public virtual Task<ContainerProperties> ResolveCollectionAsync(
    DocumentServiceRequest request,
    TimeSpan refreshAfter,
    CancellationToken cancellationToken,
    ITrace trace)
{
    cancellationToken.ThrowIfCancellationRequested();

    InternalCache cache = this.GetCache(request.Headers[HttpConstants.HttpHeaders.Version]);

#if !NETSTANDARD16
    Debug.Assert(request.ForceNameCacheRefresh == false);
#endif

    DateTime currentTime = DateTime.UtcNow;

    if (request.IsNameBased)
    {
        string resourceFullName = PathsHelper.GetCollectionPath(request.ResourceAddress);

        if (cache.collectionInfoByNameLastRefreshTime.TryGetValue(resourceFullName, out DateTime nameLastRefreshTime)
            && (currentTime - nameLastRefreshTime) > refreshAfter)
        {
            cache.collectionInfoByName.TryRemoveIfCompleted(resourceFullName);
        }
    }
    else
    {
        ResourceId resourceIdParsed = ResourceId.Parse(request.ResourceId);
        string collectionResourceId = resourceIdParsed.DocumentCollectionId.ToString();

        if (cache.collectionInfoByIdLastRefreshTime.TryGetValue(collectionResourceId, out DateTime idLastRefreshTime)
            && (currentTime - idLastRefreshTime) > refreshAfter)
        {
            // The id-keyed cache and its refresh-time map are both keyed by the collection resource id.
            // Evicting with request.ResourceId would miss the entry whenever the request targets a child
            // resource of the collection, leaving the stale entry in place.
            cache.collectionInfoById.TryRemoveIfCompleted(collectionResourceId);
        }
    }

    return this.ResolveCollectionAsync(request, cancellationToken, trace);
}
/// <summary>
/// Resolves a request to a collection in a sticky manner.
/// Unless request.ForceNameCacheRefresh is equal to true, it will return the same collection.
/// </summary>
/// <remarks>
/// Resolution order for name-based requests: the partition key range identity (when present), then the
/// collection RID already resolved on the request context (when it is a collection RID), then the name.
/// RID-based requests use the partition key range identity and otherwise the request's resource address.
/// </remarks>
/// <param name="request">Request to resolve.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <param name="trace">The trace.</param>
/// <returns>Instance of <see cref="ContainerProperties"/>.</returns>
public virtual async Task<ContainerProperties> ResolveCollectionAsync(
    DocumentServiceRequest request,
    CancellationToken cancellationToken,
    ITrace trace)
{
    IClientSideRequestStatistics clientSideRequestStatistics = request.RequestContext?.ClientRequestStatistics;

    if (!request.IsNameBased)
    {
        string ridRequestApiVersion = request.Headers[HttpConstants.HttpHeaders.Version];

        ContainerProperties byRangeIdentity = await this.ResolveByPartitionKeyRangeIdentityAsync(
            ridRequestApiVersion,
            request.PartitionKeyRangeIdentity,
            trace,
            clientSideRequestStatistics,
            cancellationToken);

        if (byRangeIdentity != null)
        {
            return byRangeIdentity;
        }

        return await this.ResolveByRidAsync(
            ridRequestApiVersion,
            request.ResourceAddress,
            trace,
            clientSideRequestStatistics,
            cancellationToken);
    }

    if (request.ForceNameCacheRefresh)
    {
        await this.RefreshAsync(request, trace, clientSideRequestStatistics, cancellationToken);
        request.ForceNameCacheRefresh = false;
    }

    string apiVersion = request.Headers[HttpConstants.HttpHeaders.Version];

    ContainerProperties collectionInfo = await this.ResolveByPartitionKeyRangeIdentityAsync(
        apiVersion,
        request.PartitionKeyRangeIdentity,
        trace,
        clientSideRequestStatistics,
        cancellationToken);

    if (collectionInfo != null)
    {
        return collectionInfo;
    }

    string previouslyResolvedRid = request.RequestContext.ResolvedCollectionRid;

    // A previously resolved collection RID keeps the request pinned to that collection.
    if (previouslyResolvedRid != null && CollectionCache.IsCollectionRid(previouslyResolvedRid))
    {
        return await this.ResolveByRidAsync(
            apiVersion,
            previouslyResolvedRid,
            trace,
            clientSideRequestStatistics,
            cancellationToken);
    }

    if (previouslyResolvedRid != null)
    {
        DefaultTrace.TraceWarning(
            "ResolvedCollectionRid '{0}' for resource '{1}' is not a collection RID; falling back to name-based resolution. ActivityId: '{2}'",
            previouslyResolvedRid,
            request.ResourceAddress,
            System.Diagnostics.Trace.CorrelationManager.ActivityId);
    }

    collectionInfo = await this.ResolveByNameAsync(
        apiVersion: apiVersion,
        resourceAddress: request.ResourceAddress,
        forceRefesh: false,
        trace: trace,
        clientSideRequestStatistics: clientSideRequestStatistics,
        cancellationToken: cancellationToken);

    if (collectionInfo == null)
    {
        DefaultTrace.TraceVerbose(
            "Collection with resourceName {0} not found. '{1}'",
            request.ResourceAddress,
            System.Diagnostics.Trace.CorrelationManager.ActivityId);

        return collectionInfo;
    }

    DefaultTrace.TraceVerbose(
        "Mapped resourceName {0} to resourceId {1}. '{2}'",
        request.ResourceAddress,
        collectionInfo.ResourceId,
        System.Diagnostics.Trace.CorrelationManager.ActivityId);

    // Refuse to pin the request to anything other than a collection RID.
    if (!CollectionCache.IsCollectionRid(collectionInfo.ResourceId))
    {
        throw new InvalidOperationException(
            $"Resolved resource '{request.ResourceAddress}' has a non-collection ResourceId " +
            $"'{collectionInfo.ResourceId}'. This indicates the server returned a database or " +
            $"other resource RID instead of a collection RID. ActivityId: " +
            $"'{System.Diagnostics.Trace.CorrelationManager.ActivityId}'.");
    }

    request.ResourceId = collectionInfo.ResourceId;
    request.RequestContext.ResolvedCollectionRid = collectionInfo.ResourceId;

    return collectionInfo;
}
/// <summary>
/// Evicts the name-keyed cache entry for the collection that <paramref name="resourceAddress"/> belongs to.
/// This method is only used in the client SDK retry policy, which does not have the request at hand.
/// Addresses that are not name based are left alone.
/// </summary>
/// <param name="resourceAddress">Resource address whose collection entry should be evicted.</param>
/// <param name="apiVersion">API version used to select the cache bucket; may be null.</param>
public void Refresh(string resourceAddress, string apiVersion = null)
{
    InternalCache cache = this.GetCache(apiVersion);

    if (!PathsHelper.IsNameBased(resourceAddress))
    {
        return;
    }

    cache.collectionInfoByName.TryRemoveIfCompleted(PathsHelper.GetCollectionPath(resourceAddress));
}
/// <summary>
/// Reads the collection identified by <paramref name="collectionRid"/>.
/// ResolveByRidAsync invokes this as the value factory when the id-keyed cache needs to populate an entry.
/// </summary>
/// <remarks>
/// NOTE(review): implementations presumably read from the backend service; not visible in this file.
/// A NotFoundException surfaced from this read is translated to InvalidPartitionException by
/// ResolveByPartitionKeyRangeIdentityAsync.
/// </remarks>
/// <param name="apiVersion">API version of the originating request, passed through unchanged.</param>
/// <param name="collectionRid">Collection resource id, in the string form of the parsed collection id.</param>
/// <param name="trace">The trace.</param>
/// <param name="clientSideRequestStatistics">Statistics taken from the request context; may be null.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The <see cref="ContainerProperties"/> that was read.</returns>
protected abstract Task<ContainerProperties> GetByRidAsync(string apiVersion,
string collectionRid,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken);
/// <summary>
/// Reads the collection identified by the name path <paramref name="resourceAddress"/>.
/// ResolveByNameAsync and RefreshAsync invoke this as the value factory when the name-keyed cache needs to
/// populate an entry; both dereference the result, so they rely on it being non-null.
/// </summary>
/// <remarks>
/// NOTE(review): implementations presumably read from the backend service; not visible in this file.
/// </remarks>
/// <param name="apiVersion">API version of the originating request, passed through unchanged.</param>
/// <param name="resourceAddress">Collection path; callers pass the result of PathsHelper.GetCollectionPath.</param>
/// <param name="trace">The trace.</param>
/// <param name="clientSideRequestStatistics">Statistics taken from the request context; may be null.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The <see cref="ContainerProperties"/> that was read.</returns>
protected abstract Task<ContainerProperties> GetByNameAsync(string apiVersion,
string resourceAddress,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken);
/// <summary>
/// Resolves the collection named by the request's partition key range identity, when there is one.
/// </summary>
/// <param name="apiVersion">API version used to select the cache bucket.</param>
/// <param name="partitionKeyRangeIdentity">Identity taken from the request; may be null.</param>
/// <param name="trace">The trace.</param>
/// <param name="clientSideRequestStatistics">Statistics taken from the request context; may be null.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>
/// The resolved collection, or null when the identity is null or carries no collection rid.
/// </returns>
/// <exception cref="InvalidPartitionException">The collection rid in the identity could not be found.</exception>
private async Task<ContainerProperties> ResolveByPartitionKeyRangeIdentityAsync(
    string apiVersion,
    PartitionKeyRangeIdentity partitionKeyRangeIdentity,
    ITrace trace,
    IClientSideRequestStatistics clientSideRequestStatistics,
    CancellationToken cancellationToken)
{
    // A request targeted at a specific partition carries the x-ms-documentdb-partitionkeyrangeid header,
    // whose value is "<collectionrid>,<partitionkeyrangeid>". Only such requests are resolved here,
    // using the collection rid from that header.
    if (partitionKeyRangeIdentity?.CollectionRid == null)
    {
        return null;
    }

    try
    {
        return await this.ResolveByRidAsync(
            apiVersion,
            partitionKeyRangeIdentity.CollectionRid,
            trace,
            clientSideRequestStatistics,
            cancellationToken);
    }
    catch (NotFoundException)
    {
        // Signals the upper logic, in either the Gateway or the client SDK, to refresh the
        // collection cache and retry.
        throw new InvalidPartitionException(RMResources.InvalidDocumentCollection);
    }
}
/// <summary>
/// Resolves a collection through the id-keyed cache, reading it with <see cref="GetByRidAsync"/> when the
/// cache has to populate the entry. The cache key is the collection id parsed out of
/// <paramref name="resourceId"/>.
/// </summary>
/// <remarks>
/// This method is not <c>async</c>: cancellation and resource id parse failures are thrown synchronously.
/// </remarks>
/// <param name="apiVersion">API version used to select the cache bucket.</param>
/// <param name="resourceId">Resource id from which the collection id is parsed.</param>
/// <param name="trace">The trace.</param>
/// <param name="clientSideRequestStatistics">Statistics taken from the request context; may be null.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Task producing the cached or freshly read <see cref="ContainerProperties"/>.</returns>
private Task<ContainerProperties> ResolveByRidAsync(
    string apiVersion,
    string resourceId,
    ITrace trace,
    IClientSideRequestStatistics clientSideRequestStatistics,
    CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    string collectionRid = ResourceId.Parse(resourceId).DocumentCollectionId.ToString();
    InternalCache cache = this.GetCache(apiVersion);

    Func<Task<ContainerProperties>> readAndStamp = async () =>
    {
        // The timestamp is taken before the read starts.
        DateTime readStartedAt = DateTime.UtcNow;

        ContainerProperties fetched = await this.GetByRidAsync(
            apiVersion,
            collectionRid,
            trace,
            clientSideRequestStatistics,
            cancellationToken);

        cache.collectionInfoByIdLastRefreshTime.AddOrUpdate(
            collectionRid,
            readStartedAt,
            (key, previous) => readStartedAt);

        return fetched;
    };

    return cache.collectionInfoById.GetAsync(
        collectionRid,
        null,
        readAndStamp,
        cancellationToken);
}
/// <summary>
/// Resolves a collection through the name-keyed cache, reading it with <see cref="GetByNameAsync"/> when the
/// cache has to populate the entry.
/// </summary>
/// <param name="apiVersion">API version used to select the cache bucket.</param>
/// <param name="resourceAddress">Name-based resource address; its collection path is the cache key.</param>
/// <param name="forceRefesh">
/// When true, the completed cache entry is evicted before the lookup. (The parameter name keeps its
/// existing spelling because callers pass it by name.)
/// </param>
/// <param name="trace">The trace.</param>
/// <param name="clientSideRequestStatistics">Statistics taken from the request context; may be null.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The cached or freshly read <see cref="ContainerProperties"/>.</returns>
internal virtual async Task<ContainerProperties> ResolveByNameAsync(
    string apiVersion,
    string resourceAddress,
    bool forceRefesh,
    ITrace trace,
    IClientSideRequestStatistics clientSideRequestStatistics,
    CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    string resourceFullName = PathsHelper.GetCollectionPath(resourceAddress);
    InternalCache cache = this.GetCache(apiVersion);

    if (forceRefesh)
    {
        cache.collectionInfoByName.TryRemoveIfCompleted(resourceFullName);
    }

    return await cache.collectionInfoByName.GetAsync(
        resourceFullName,
        null,
        () => this.ReadByNameAndPopulateAsync(
            cache,
            apiVersion,
            resourceFullName,
            trace,
            clientSideRequestStatistics,
            cancellationToken),
        cancellationToken);
}

/// <summary>
/// Handles request.ForceNameCacheRefresh for a name-based request, then clears the collection RID
/// previously resolved on the request context.
/// </summary>
/// <param name="request">Name-based request being refreshed.</param>
/// <param name="trace">The trace.</param>
/// <param name="clientSideRequestStatistics">Statistics taken from the request context; may be null.</param>
/// <param name="cancellationToken">Cancellation token.</param>
private async Task RefreshAsync(
    DocumentServiceRequest request,
    ITrace trace,
    IClientSideRequestStatistics clientSideRequestStatistics,
    CancellationToken cancellationToken)
{
    System.Diagnostics.Debug.Assert(request.IsNameBased);

    string apiVersion = request.Headers[HttpConstants.HttpHeaders.Version];
    InternalCache cache = this.GetCache(apiVersion);
    string resourceFullName = PathsHelper.GetCollectionPath(request.ResourceAddress);

    if (request.RequestContext.ResolvedCollectionRid != null)
    {
        // Here we will issue a backend call only if the cache wasn't already refreshed (if whatever is
        // there corresponds to the previously resolved collection rid).
        await cache.collectionInfoByName.GetAsync(
            resourceFullName,
            ContainerProperties.CreateWithResourceId(request.RequestContext.ResolvedCollectionRid),
            () => this.ReadByNameAndPopulateAsync(
                cache,
                apiVersion,
                resourceFullName,
                trace,
                clientSideRequestStatistics,
                cancellationToken),
            cancellationToken);
    }
    else
    {
        // In case of a ForceRefresh directive coming from the client, there will be no
        // ResolvedCollectionRid, so we need to refresh unconditionally.
        this.Refresh(request.ResourceAddress, apiVersion);
    }

    request.RequestContext.ResolvedCollectionRid = null;
}

// Reads the collection by name, stores it in the id-keyed cache, and stamps both refresh-time maps with
// the time taken before the read started. Shared by ResolveByNameAsync and RefreshAsync so the two
// cache-population paths cannot drift apart.
private async Task<ContainerProperties> ReadByNameAndPopulateAsync(
    InternalCache cache,
    string apiVersion,
    string resourceFullName,
    ITrace trace,
    IClientSideRequestStatistics clientSideRequestStatistics,
    CancellationToken cancellationToken)
{
    DateTime currentTime = DateTime.UtcNow;

    ContainerProperties collection = await this.GetByNameAsync(
        apiVersion,
        resourceFullName,
        trace,
        clientSideRequestStatistics,
        cancellationToken);

    cache.collectionInfoById.Set(collection.ResourceId, collection);

    cache.collectionInfoByNameLastRefreshTime.AddOrUpdate(
        resourceFullName,
        currentTime,
        (string currentKey, DateTime currentValue) => currentTime);

    cache.collectionInfoByIdLastRefreshTime.AddOrUpdate(
        collection.ResourceId,
        currentTime,
        (string currentKey, DateTime currentValue) => currentTime);

    return collection;
}
/// <summary>
/// Selects the cache bucket for <paramref name="apiVersion"/>.
/// </summary>
/// <param name="apiVersion">API version from the request; may be null or empty.</param>
/// <returns>
/// Bucket 1 when the version is null or empty, or when VersionUtility.IsLaterThan reports it as later than
/// v2018_12_31; bucket 0 otherwise.
/// </returns>
protected InternalCache GetCache(string apiVersion)
{
    // Non Partitioned Migration Version. Need this to flight V3 SDK till we make this the Current Version
    bool useNewerVersionCache =
        string.IsNullOrEmpty(apiVersion)
        || VersionUtility.IsLaterThan(apiVersion, HttpConstants.VersionDates.v2018_12_31);

    int cacheIndex = useNewerVersionCache ? 1 : 0;
    return this.cacheByApiList[cacheIndex];
}
/// <summary>
/// Tells whether <paramref name="resourceId"/> is exactly a collection resource id.
/// </summary>
/// <param name="resourceId">Resource id string to inspect; may be null.</param>
/// <returns>
/// True only when the value parses as a resource id, differs from the string form of its database id, and
/// equals the string form of its collection id (ordinal comparison). False for null, empty, whitespace or
/// unparsable input.
/// </returns>
internal static bool IsCollectionRid(string resourceId)
{
    if (string.IsNullOrWhiteSpace(resourceId))
    {
        return false;
    }

    if (!ResourceId.TryParse(resourceId, out ResourceId parsedRid))
    {
        return false;
    }

    // A bare database rid is rejected before the collection comparison.
    bool isDatabaseRid = string.Equals(parsedRid.DatabaseId.ToString(), resourceId, StringComparison.Ordinal);
    if (isDatabaseRid)
    {
        return false;
    }

    return string.Equals(parsedRid.DocumentCollectionId.ToString(), resourceId, StringComparison.Ordinal);
}
/// <summary>
/// Compares <see cref="ContainerProperties"/> instances by their ResourceId using ordinal comparison.
/// </summary>
private sealed class CollectionRidComparer : IEqualityComparer<ContainerProperties>
{
    /// <summary>
    /// Two references are equal when both are null, or when both are non-null and their ResourceId values
    /// are ordinally equal (two null ResourceId values count as equal).
    /// </summary>
    public bool Equals(ContainerProperties left, ContainerProperties right)
    {
        if (left == null || right == null)
        {
            return left == null && right == null;
        }

        return string.Equals(left.ResourceId, right.ResourceId, StringComparison.Ordinal);
    }

    /// <summary>
    /// Returns the hash of the ResourceId, or 0 when the instance or its ResourceId is null.
    /// </summary>
    /// <remarks>
    /// Equals treats two null references, and two instances with a null ResourceId, as equal, so the hash
    /// must be defined (and identical) for those inputs instead of throwing.
    /// </remarks>
    public int GetHashCode(ContainerProperties collection)
    {
        return collection?.ResourceId?.GetHashCode() ?? 0;
    }
}
}
}