Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 82 additions & 8 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ internal class GatewayAddressCache : IAddressCache, IDisposable
// value for this timeout is 45 minutes at the moment.
private static readonly TimeSpan WarmupCacheAndOpenConnectionTimeout = TimeSpan.FromMinutes(45);

/// <summary>
/// Opt-in test-only invariant: when <c>true</c> and a forced address-cache
/// refresh is emitted with an <see cref="RefreshReason.Unspecified"/> reason,
/// <see cref="GetServerAddressesViaGatewayAsync"/> and
/// <see cref="GetMasterAddressesViaGatewayAsync"/> throw
/// <see cref="InvalidOperationException"/>. Tests set this to <c>true</c> via
/// <c>[AssemblyInitialize]</c> so any future force-refresh call-site that
/// forgets to tag its cause is caught automatically. Default <c>false</c>:
/// zero production overhead.
/// </summary>
internal static bool ValidateRefreshReasonPresence;

private readonly Uri serviceEndpoint;
private readonly Uri addressEndpoint;

Expand Down Expand Up @@ -237,6 +249,10 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
if (forceRefreshDueToSuboptimalPartitionReplicaSet && this.suboptimalServerPartitionTimestamps.TryUpdate(partitionKeyRangeIdentity, DateTime.MaxValue, suboptimalServerPartitionTimestamp))
{
forceRefreshPartitionAddresses = true;
if (request.RequestContext.RefreshReason == RefreshReason.Unspecified)
{
request.RequestContext.RefreshReason = RefreshReason.InsufficientReplicasSuboptimalTimer;
}
}
}

Expand Down Expand Up @@ -330,7 +346,8 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
cachedAddresses: currentCachedValue,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: true));
forceRefresh: true,
explicitReason: RefreshReason.ReplicaHealthUnhealthyLongLived));
}
else
{
Expand Down Expand Up @@ -589,13 +606,20 @@ private async Task<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>

int targetReplicaSetSize = this.serviceConfigReader.SystemReplicationPolicy.MaxReplicaSetSize;

forceRefresh = forceRefresh ||
(masterAddressAndRange != null &&
bool masterSuboptimalTriggered =
masterAddressAndRange != null &&
masterAddressAndRange.Item2.AllAddresses.Count() < targetReplicaSetSize &&
DateTime.UtcNow.Subtract(this.suboptimalMasterPartitionTimestamp) > TimeSpan.FromSeconds(this.suboptimalPartitionForceRefreshIntervalInSeconds));
DateTime.UtcNow.Subtract(this.suboptimalMasterPartitionTimestamp) > TimeSpan.FromSeconds(this.suboptimalPartitionForceRefreshIntervalInSeconds);

forceRefresh = forceRefresh || masterSuboptimalTriggered;

if (forceRefresh || request.ForceCollectionRoutingMapRefresh || this.masterPartitionAddressCache == null)
{
if (masterSuboptimalTriggered && request.RequestContext.RefreshReason == RefreshReason.Unspecified)
{
request.RequestContext.RefreshReason = RefreshReason.InsufficientReplicasSuboptimalTimer;
}

string entryUrl = PathsHelper.GeneratePath(
ResourceType.Database,
string.Empty,
Expand Down Expand Up @@ -640,10 +664,11 @@ private async Task<PartitionAddressInformation> GetAddressesForRangeIdAsync(
PartitionAddressInformation cachedAddresses,
string collectionRid,
string partitionKeyRangeId,
bool forceRefresh)
bool forceRefresh,
RefreshReason explicitReason = RefreshReason.Unspecified)
{
using (DocumentServiceResponse response =
await this.GetServerAddressesViaGatewayAsync(request, collectionRid, new[] { partitionKeyRangeId }, forceRefresh))
await this.GetServerAddressesViaGatewayAsync(request, collectionRid, new[] { partitionKeyRangeId }, forceRefresh, explicitReason))
{
FeedResource<Address> addressFeed = response.GetResource<FeedResource<Address>>();

Expand Down Expand Up @@ -706,13 +731,51 @@ await this.GetServerAddressesViaGatewayAsync(request, collectionRid, new[] { par
}
}

/// <summary>
/// Resolves the effective <see cref="RefreshReason"/> for a forced
/// address-cache egress and writes the <c>x-ms-cosmos-refresh-reason</c>
/// header. Called only when <c>forceRefresh=true</c>.
///
/// Precedence: <paramref name="explicitReason"/> (if non-Unspecified) wins
/// over <c>request.RequestContext.RefreshReason</c>. When both are
/// Unspecified and <see cref="ValidateRefreshReasonPresence"/> is enabled,
/// throws <see cref="InvalidOperationException"/> so that any untagged
/// force-refresh site is caught in tests.
/// </summary>
internal static void EmitRefreshReasonHeader(
INameValueCollection headers,
DocumentServiceRequest request,
RefreshReason explicitReason,
string callerName)
{
RefreshReason effective = explicitReason != RefreshReason.Unspecified
? explicitReason
: request?.RequestContext?.RefreshReason ?? RefreshReason.Unspecified;

if (effective == RefreshReason.Unspecified)
{
if (GatewayAddressCache.ValidateRefreshReasonPresence)
{
throw new InvalidOperationException(
$"{callerName} was invoked with forceRefresh=true but no RefreshReason was set. " +
$"Every forced address-cache refresh must be tagged via DocumentServiceRequestContext.RefreshReason " +
$"or an explicitReason argument so the gateway can attribute the cause.");
}

return;
}

headers.Set(HttpConstants.HttpHeaders.CosmosRefreshReason, effective.ToHeaderValue());
}

private async Task<DocumentServiceResponse> GetMasterAddressesViaGatewayAsync(
DocumentServiceRequest request,
ResourceType resourceType,
string resourceAddress,
string entryUrl,
bool forceRefresh,
bool useMasterCollectionResolver)
bool useMasterCollectionResolver,
RefreshReason explicitReason = RefreshReason.Unspecified)
{
INameValueCollection addressQuery = new RequestNameValueCollection
{
Expand All @@ -723,6 +786,11 @@ private async Task<DocumentServiceResponse> GetMasterAddressesViaGatewayAsync(
if (forceRefresh)
{
headers.Set(HttpConstants.HttpHeaders.ForceRefresh, bool.TrueString);
GatewayAddressCache.EmitRefreshReasonHeader(
headers: headers,
request: request,
explicitReason: explicitReason,
callerName: nameof(GetMasterAddressesViaGatewayAsync));
}

if (useMasterCollectionResolver)
Expand Down Expand Up @@ -799,7 +867,8 @@ private async Task<DocumentServiceResponse> GetServerAddressesViaGatewayAsync(
DocumentServiceRequest request,
string collectionRid,
IEnumerable<string> partitionKeyRangeIds,
bool forceRefresh)
bool forceRefresh,
RefreshReason explicitReason = RefreshReason.Unspecified)
{
string entryUrl = PathsHelper.GeneratePath(ResourceType.Document, collectionRid, true);

Expand All @@ -812,6 +881,11 @@ private async Task<DocumentServiceResponse> GetServerAddressesViaGatewayAsync(
if (forceRefresh)
{
headers.Set(HttpConstants.HttpHeaders.ForceRefresh, bool.TrueString);
GatewayAddressCache.EmitRefreshReasonHeader(
headers: headers,
request: request,
explicitReason: explicitReason,
callerName: nameof(GetServerAddressesViaGatewayAsync));
}

if (request != null && request.ForceCollectionRoutingMapRefresh)
Expand Down
70 changes: 70 additions & 0 deletions Microsoft.Azure.Cosmos/src/Routing/RefreshReason.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Routing
{
/// <summary>
/// A design-time-bounded, closed enumeration of reasons the SDK performs a
/// forced cache refresh. The value is emitted on the wire as the
/// <c>x-ms-cosmos-refresh-reason</c> header so the service can attribute
/// each forced refresh to its true cause.
///
/// The enum is deliberately GENERIC (not address-cache specific). Existing
/// values correspond to address-cache force-refresh paths. Future values
/// may be added for other forced-refresh egress paths (partition-key-range
/// cache ChangeFeed forward, collection-routing-map refresh, etc.).
///
/// Naming convention for wire values: two dot-separated segments,
/// <c>&lt;cache_or_surface&gt;.&lt;subcause&gt;</c>. Never three segments.
///
/// When adding a new value:
/// - Give it an explicit integer value. Do not reuse retired values.
/// - Add an entry in <see cref="RefreshReasonExtensions.WireValues"/>.
/// - If it is driven by a <c>TransportErrorCode</c>, wire it into
/// <see cref="RefreshReasonExtensions.FromTransportErrorCode"/>.
/// </summary>
internal enum RefreshReason
{
// Sentinel. Must never appear on the wire in production once all call
// sites are tagged. The opt-in validator enforces this in tests.
Unspecified = 0,

// ---- Group A: real 410 from the server (no transport synthesis) ----
GoneServer = 1,

// ---- Group B: Gone with server-provided substatus (routing-topology changes) ----
// NOTE: these typically drive a PK-range / collection-cache refresh
// rather than an address-cache refresh; they are pre-positioned here
// because the enum is generic and will tag PK-range egress too.
GoneCompletingSplit = 2,
GoneCompletingPartitionMigration = 3,
GoneNameCacheStale = 4,
GonePartitionKeyRangeGone = 5,

// ---- Group C: Gone synthesized by the SDK's transport layer ----
// Pairs of (*Failed, *Timeout) are intentionally collapsed because the
// gateway's reaction is the same.
GoneUnknown = 6, // TransportErrorCode.Unknown, ChannelOpenFailed, ChannelOpenTimeout, RequestTimeout
GoneDnsResolution = 7, // DnsResolutionFailed, DnsResolutionTimeout
GoneConnect = 8, // ConnectFailed, ConnectTimeout
GoneSslNegotiation = 9, // SslNegotiationFailed, SslNegotiationTimeout
GoneNegotiationTimeout = 10, // TransportNegotiationTimeout
GoneChannelMultiplexerClosed = 11, // ChannelMultiplexerClosed
GoneSend = 12, // SendFailed, SendTimeout
GoneSendLockTimeout = 13, // SendLockTimeout (client-side lock contention)
GoneReceive = 14, // ReceiveFailed, ReceiveTimeout
GoneReceiveStreamClosed = 15, // ReceiveStreamClosed (server clean close while awaiting response)
GoneConnectionBroken = 16, // ConnectionBroken
GoneChannelWaitingToOpenTimeout = 17, // ChannelWaitingToOpenTimeout (slot-wait saturation)
GoneWriteNotSent = 18, // DocumentServiceRequest.UserRequestSent == false on write-path Gone synthesis

// ---- Group D: forced refresh NOT driven by a Gone ----
InsufficientReplicasQuorum = 19, // StoreReader decided replica-set too small for consistency
InsufficientReplicasSuboptimalTimer = 20, // 10-minute suboptimal-replica-set timer
ReplicaHealthUnhealthyLongLived = 21, // on-demand revalidation of a URI unhealthy >= 1 minute
ConnectionEventServerClosed = 22, // Dispatcher.RaiseConnectionEvent -> ReadEof / ReadFailure

// Insert new values above this comment with the next integer value.
}
}
Loading
Loading