Skip to content

Commit 15666a2

Browse files
Merge branch 'master' into users/nalutripician/issue-5095-contacted-regions-hub-fallback
2 parents dbf081c + dd06aae commit 15666a2

30 files changed

Lines changed: 5590 additions & 32 deletions

Directory.Build.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
22
<PropertyGroup>
3-
<ClientOfficialVersion>3.58.0</ClientOfficialVersion>
4-
<ClientPreviewVersion>3.59.0</ClientPreviewVersion>
3+
<ClientOfficialVersion>3.59.0</ClientOfficialVersion>
4+
<ClientPreviewVersion>3.60.0</ClientPreviewVersion>
55
<ClientPreviewSuffixVersion>preview.0</ClientPreviewSuffixVersion>
66
<DirectVersion>3.42.4</DirectVersion>
77
<FaultInjectionVersion>1.0.0</FaultInjectionVersion>

Microsoft.Azure.Cosmos.Encryption.Custom/src/Microsoft.Azure.Cosmos.Encryption.Custom.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@
6868
</PropertyGroup>
6969

7070
<PropertyGroup Condition=" '$(SdkProjectRef)' == 'True' ">
71+
<DefineSdkProjectRefSymbol Condition=" '$(DefineSdkProjectRefSymbol)' == '' ">true</DefineSdkProjectRefSymbol>
72+
</PropertyGroup>
73+
74+
<PropertyGroup Condition=" '$(DefineSdkProjectRefSymbol)' == 'true' ">
7175
<DefineConstants>$(DefineConstants);SDKPROJECTREF</DefineConstants>
7276
</PropertyGroup>
7377
</Project>

Microsoft.Azure.Cosmos.Encryption/src/Microsoft.Azure.Cosmos.Encryption.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@
7474
</PropertyGroup>
7575

7676
<PropertyGroup Condition=" '$(SdkProjectRef)' == 'True' ">
77+
<DefineSdkProjectRefSymbol Condition=" '$(DefineSdkProjectRefSymbol)' == '' ">true</DefineSdkProjectRefSymbol>
78+
</PropertyGroup>
79+
80+
<PropertyGroup Condition=" '$(DefineSdkProjectRefSymbol)' == 'true' ">
7781
<DefineConstants>$(DefineConstants);SDKPROJECTREF</DefineConstants>
7882
</PropertyGroup>
7983
</Project>

Microsoft.Azure.Cosmos/contracts/API_3.59.0.txt

Lines changed: 1778 additions & 0 deletions
Large diffs are not rendered by default.

Microsoft.Azure.Cosmos/contracts/API_3.60.0-preview.0.txt

Lines changed: 1948 additions & 0 deletions
Large diffs are not rendered by default.

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Microsoft.Azure.Cosmos
66
{
77
using System;
8+
using System.IO;
89
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
910
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
1011
using static Microsoft.Azure.Cosmos.Container;
@@ -219,22 +220,73 @@ public ChangeFeedProcessorBuilder WithLeaseContainer(Container leaseContainer)
219220
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
220221
public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer()
221222
{
222-
if (this.leaseContainer != null)
223+
this.ValidateNoLeaseContainerConfigured();
224+
225+
if (string.IsNullOrEmpty(this.InstanceName))
223226
{
224-
throw new InvalidOperationException("The builder already defined a lease container.");
227+
this.InstanceName = ChangeFeedProcessorBuilder.InMemoryDefaultHostName;
225228
}
226229

227-
if (this.LeaseStoreManager != null)
230+
this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory();
231+
return this;
232+
}
233+
234+
/// <summary>
235+
/// Uses an in-memory container to maintain state of the leases, optionally initialized from a <see cref="MemoryStream"/>
236+
/// containing previously persisted lease state.
237+
///
238+
/// When the processor is stopped via <see cref="ChangeFeedProcessor.StopAsync"/>, the current lease state
239+
/// is automatically written back to the same <paramref name="leaseState"/> stream, allowing the state to be
240+
/// restored when creating a new processor instance.
241+
///
242+
/// Using an in-memory container restricts the scaling capability to just the instance running the current processor.
243+
/// </summary>
244+
/// <remarks>
245+
/// <para>
246+
/// <see cref="ChangeFeedProcessor.StopAsync"/> must not be invoked concurrently from multiple threads; the
247+
/// in-memory container expects a single shutdown call and does not synchronize concurrent writers to
248+
/// <paramref name="leaseState"/>.
249+
/// </para>
250+
/// </remarks>
251+
/// <param name="leaseState">
252+
/// A <see cref="MemoryStream"/> that serves as both input and output for lease state.
253+
/// If the stream contains data, leases are deserialized and used to initialize the container.
254+
/// When the processor stops, the current lease state is serialized back into this stream.
255+
/// The stream must be writable and expandable (for example, created via <c>new MemoryStream()</c>).
256+
/// A fixed-size stream such as <c>new MemoryStream(byte[])</c> will fail at shutdown if the
257+
/// serialized lease state exceeds the original buffer capacity.
258+
/// A <see cref="MemoryStream"/> is required (rather than the base <see cref="System.IO.Stream"/> type) so that
259+
/// the lease state can be trimmed via <see cref="MemoryStream.SetLength(long)"/> when a new snapshot is smaller
260+
/// than the previously persisted one. To integrate with <see cref="System.IO.Stream"/>-based persistence
261+
/// (e.g., a file or blob), call <see cref="MemoryStream.ToArray"/> after <see cref="ChangeFeedProcessor.StopAsync"/>
262+
/// to obtain the persisted bytes; create an expandable
263+
/// <see cref="MemoryStream"/> (<c>new MemoryStream()</c>), write the bytes into it, set
264+
/// <see cref="System.IO.Stream.Position"/> back to 0, and pass it to this method.
265+
/// </param>
266+
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
267+
/// <exception cref="ArgumentNullException">Thrown when <paramref name="leaseState"/> is null.</exception>
268+
public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(MemoryStream leaseState)
269+
{
270+
if (leaseState == null)
228271
{
229-
throw new InvalidOperationException("The builder already defined an in-memory lease container instance.");
272+
throw new ArgumentNullException(nameof(leaseState));
273+
}
274+
275+
this.ValidateNoLeaseContainerConfigured();
276+
277+
if (!leaseState.CanWrite)
278+
{
279+
throw new ArgumentException("The lease state stream must be writable so that state can be persisted on shutdown.", nameof(leaseState));
230280
}
231281

232282
if (string.IsNullOrEmpty(this.InstanceName))
233283
{
234284
this.InstanceName = ChangeFeedProcessorBuilder.InMemoryDefaultHostName;
235285
}
236286

237-
this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory();
287+
// Deserialization of lease state (if any) is handled inside the manager
288+
// so that serialization and deserialization are co-located in the same layer.
289+
this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory(leaseState);
238290
return this;
239291
}
240292

@@ -317,5 +369,18 @@ public ChangeFeedProcessor Build()
317369
this.isBuilt = true;
318370
return this.changeFeedProcessor;
319371
}
372+
373+
private void ValidateNoLeaseContainerConfigured()
374+
{
375+
if (this.leaseContainer != null)
376+
{
377+
throw new InvalidOperationException("The builder already defined a lease container.");
378+
}
379+
380+
if (this.LeaseStoreManager != null)
381+
{
382+
throw new InvalidOperationException("The builder already defined an in-memory lease container instance.");
383+
}
384+
}
320385
}
321386
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
namespace Microsoft.Azure.Cosmos.ChangeFeed
66
{
77
using System;
8-
using System.Collections.Generic;
98
using System.Threading.Tasks;
109
using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping;
1110
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
@@ -79,7 +78,14 @@ public override async Task StartAsync()
7978
public override async Task StopAsync()
8079
{
8180
DefaultTrace.TraceInformation("Stopping processor...");
81+
82+
// Persist in-memory lease state before stopping the partition manager so that
83+
// a subsequent partition-manager shutdown failure cannot prevent recovery of the
84+
// lease snapshot. No-op for Cosmos-backed leases.
85+
await this.documentServiceLeaseStoreManager.ShutdownAsync().ConfigureAwait(false);
86+
8287
await this.partitionManager.StopAsync().ConfigureAwait(false);
88+
8389
DefaultTrace.TraceInformation("Processor stopped.");
8490
}
8591

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerInMemory.cs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,32 @@
1-
//------------------------------------------------------------
1+
//------------------------------------------------------------
22
// Copyright (c) Microsoft Corporation. All rights reserved.
33
//------------------------------------------------------------
44

55
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
7+
using System;
78
using System.Collections.Concurrent;
89
using System.Collections.Generic;
10+
using System.IO;
911
using System.Linq;
1012
using System.Threading.Tasks;
1113

1214
internal sealed class DocumentServiceLeaseContainerInMemory : DocumentServiceLeaseContainer
1315
{
1416
private readonly ConcurrentDictionary<string, DocumentServiceLease> container;
17+
private readonly MemoryStream leaseStateStream;
1518

1619
public DocumentServiceLeaseContainerInMemory(ConcurrentDictionary<string, DocumentServiceLease> container)
20+
: this(container, leaseStateStream: null)
21+
{
22+
}
23+
24+
public DocumentServiceLeaseContainerInMemory(
25+
ConcurrentDictionary<string, DocumentServiceLease> container,
26+
MemoryStream leaseStateStream)
1727
{
1828
this.container = container;
29+
this.leaseStateStream = leaseStateStream;
1930
}
2031

2132
public override Task<IReadOnlyList<DocumentServiceLease>> GetAllLeasesAsync()
@@ -27,5 +38,46 @@ public override Task<IEnumerable<DocumentServiceLease>> GetOwnedLeasesAsync()
2738
{
2839
return Task.FromResult<IEnumerable<DocumentServiceLease>>(this.container.Values.AsEnumerable());
2940
}
41+
42+
/// <summary>
43+
/// Persists the current in-memory lease state into the user-supplied <see cref="MemoryStream"/>.
44+
/// </summary>
45+
/// <remarks>
46+
/// Must only be invoked from the single <c>ChangeFeedProcessor.StopAsync</c> call path;
47+
/// concurrent invocation is not supported and may corrupt the stream.
48+
/// </remarks>
49+
/// <returns>A completed task once the stream has been populated, or a no-op if no stream was supplied.</returns>
50+
public Task ShutdownAsync()
51+
{
52+
if (this.leaseStateStream == null)
53+
{
54+
return Task.CompletedTask;
55+
}
56+
57+
byte[] serializedBytes = InMemoryLeaseJsonFormat.Serialize(this.container.Values.ToList());
58+
59+
// Resize the target stream BEFORE writing. If the stream is not expandable and
60+
// cannot hold the new payload, SetLength throws NotSupportedException and the
61+
// user's stream is left untouched (no partial-write corruption). If SetLength
62+
// succeeds, the subsequent Write is guaranteed to fit.
63+
try
64+
{
65+
this.leaseStateStream.SetLength(serializedBytes.Length);
66+
}
67+
catch (NotSupportedException ex)
68+
{
69+
throw new InvalidOperationException(
70+
"Failed to persist lease state because the MemoryStream is not expandable and the serialized "
71+
+ "state exceeds its capacity. Use 'new MemoryStream()' or a MemoryStream with sufficient "
72+
+ "capacity instead of 'new MemoryStream(byte[])' to create a resizable stream.",
73+
ex);
74+
}
75+
76+
this.leaseStateStream.Position = 0;
77+
this.leaseStateStream.Write(serializedBytes, 0, serializedBytes.Length);
78+
this.leaseStateStream.Position = 0;
79+
80+
return Task.CompletedTask;
81+
}
3082
}
3183
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManager.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
7+
using System.Threading.Tasks;
8+
79
/// <summary>
810
/// The DocumentServiceLeaseStoreManager defines a way to perform operations with <see cref="DocumentServiceLease"/>.
911
/// </summary>
@@ -29,5 +31,13 @@ internal abstract class DocumentServiceLeaseStoreManager
2931
/// for particular monitoring collection and lease container prefix.
3032
/// </summary>
3133
public abstract DocumentServiceLeaseStore LeaseStore { get; }
34+
35+
/// <summary>
36+
/// Called when the processor is stopping. Implementations may override to perform
37+
/// cleanup or state persistence. The default implementation (for Cosmos-backed
38+
/// lease stores) is a no-op. Exceptions thrown from this method propagate to the
39+
/// caller of <see cref="ChangeFeedProcessor.StopAsync"/>.
40+
/// </summary>
41+
public abstract Task ShutdownAsync();
3242
}
3343
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerCosmos.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
77
using System;
8+
using System.Threading.Tasks;
89
using Microsoft.Azure.Cosmos;
910

1011
/// <summary>
@@ -80,5 +81,10 @@ internal DocumentServiceLeaseStoreManagerCosmos(
8081
public override DocumentServiceLeaseCheckpointer LeaseCheckpointer => this.leaseCheckpointer;
8182

8283
public override DocumentServiceLeaseContainer LeaseContainer => this.leaseContainer;
84+
85+
public override Task ShutdownAsync()
86+
{
87+
return Task.CompletedTask;
88+
}
8389
}
8490
}

0 commit comments

Comments
 (0)