Skip to content

Commit 639ffa7

Browse files
Merge branch 'master' into users/ntripician/encryption-pipeline-sdkref
2 parents 15bc0bc + b0f1abb commit 639ffa7

19 files changed

Lines changed: 1661 additions & 22 deletions

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Microsoft.Azure.Cosmos
66
{
77
using System;
8+
using System.IO;
89
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
910
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
1011
using static Microsoft.Azure.Cosmos.Container;
@@ -219,22 +220,73 @@ public ChangeFeedProcessorBuilder WithLeaseContainer(Container leaseContainer)
219220
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
220221
public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer()
221222
{
222-
if (this.leaseContainer != null)
223+
this.ValidateNoLeaseContainerConfigured();
224+
225+
if (string.IsNullOrEmpty(this.InstanceName))
223226
{
224-
throw new InvalidOperationException("The builder already defined a lease container.");
227+
this.InstanceName = ChangeFeedProcessorBuilder.InMemoryDefaultHostName;
225228
}
226229

227-
if (this.LeaseStoreManager != null)
230+
this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory();
231+
return this;
232+
}
233+
234+
/// <summary>
235+
/// Uses an in-memory container to maintain state of the leases, optionally initialized from a <see cref="MemoryStream"/>
236+
/// containing previously persisted lease state.
237+
///
238+
/// When the processor is stopped via <see cref="ChangeFeedProcessor.StopAsync"/>, the current lease state
239+
/// is automatically written back to the same <paramref name="leaseState"/> stream, allowing the state to be
240+
/// restored when creating a new processor instance.
241+
///
242+
/// Using an in-memory container restricts the scaling capability to just the instance running the current processor.
243+
/// </summary>
244+
/// <remarks>
245+
/// <para>
246+
/// <see cref="ChangeFeedProcessor.StopAsync"/> must not be invoked concurrently from multiple threads; the
247+
/// in-memory container expects a single shutdown call and does not synchronize concurrent writers to
248+
/// <paramref name="leaseState"/>.
249+
/// </para>
250+
/// </remarks>
251+
/// <param name="leaseState">
252+
/// A <see cref="MemoryStream"/> that serves as both input and output for lease state.
253+
/// If the stream contains data, leases are deserialized and used to initialize the container.
254+
/// When the processor stops, the current lease state is serialized back into this stream.
255+
/// The stream must be writable and expandable (for example, created via <c>new MemoryStream()</c>).
256+
/// A fixed-size stream such as <c>new MemoryStream(byte[])</c> will fail at shutdown if the
257+
/// serialized lease state exceeds the original buffer capacity.
258+
/// A <see cref="MemoryStream"/> is required (rather than the base <see cref="System.IO.Stream"/> type) so that
259+
/// the lease state can be trimmed via <see cref="MemoryStream.SetLength(long)"/> when a new snapshot is smaller
260+
/// than the previously persisted one. To integrate with <see cref="System.IO.Stream"/>-based persistence
261+
/// (e.g., a file or blob), call <see cref="MemoryStream.ToArray"/> after <see cref="ChangeFeedProcessor.StopAsync"/>
262+
/// to obtain the persisted bytes; create an expandable
263+
/// <see cref="MemoryStream"/> (<c>new MemoryStream()</c>), write the bytes into it, set
264+
/// <see cref="System.IO.Stream.Position"/> back to 0, and pass it to this method.
265+
/// </param>
266+
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
267+
/// <exception cref="ArgumentNullException">Thrown when <paramref name="leaseState"/> is null.</exception>
268+
public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(MemoryStream leaseState)
269+
{
270+
if (leaseState == null)
228271
{
229-
throw new InvalidOperationException("The builder already defined an in-memory lease container instance.");
272+
throw new ArgumentNullException(nameof(leaseState));
273+
}
274+
275+
this.ValidateNoLeaseContainerConfigured();
276+
277+
if (!leaseState.CanWrite)
278+
{
279+
throw new ArgumentException("The lease state stream must be writable so that state can be persisted on shutdown.", nameof(leaseState));
230280
}
231281

232282
if (string.IsNullOrEmpty(this.InstanceName))
233283
{
234284
this.InstanceName = ChangeFeedProcessorBuilder.InMemoryDefaultHostName;
235285
}
236286

237-
this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory();
287+
// Deserialization of lease state (if any) is handled inside the manager
288+
// so that serialization and deserialization are co-located in the same layer.
289+
this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory(leaseState);
238290
return this;
239291
}
240292

@@ -317,5 +369,18 @@ public ChangeFeedProcessor Build()
317369
this.isBuilt = true;
318370
return this.changeFeedProcessor;
319371
}
372+
373+
private void ValidateNoLeaseContainerConfigured()
374+
{
375+
if (this.leaseContainer != null)
376+
{
377+
throw new InvalidOperationException("The builder already defined a lease container.");
378+
}
379+
380+
if (this.LeaseStoreManager != null)
381+
{
382+
throw new InvalidOperationException("The builder already defined an in-memory lease container instance.");
383+
}
384+
}
320385
}
321386
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
namespace Microsoft.Azure.Cosmos.ChangeFeed
66
{
77
using System;
8-
using System.Collections.Generic;
98
using System.Threading.Tasks;
109
using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping;
1110
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
@@ -79,7 +78,14 @@ public override async Task StartAsync()
7978
public override async Task StopAsync()
8079
{
8180
DefaultTrace.TraceInformation("Stopping processor...");
81+
82+
// Persist in-memory lease state before stopping the partition manager so that
83+
// a subsequent partition-manager shutdown failure cannot prevent recovery of the
84+
// lease snapshot. No-op for Cosmos-backed leases.
85+
await this.documentServiceLeaseStoreManager.ShutdownAsync().ConfigureAwait(false);
86+
8287
await this.partitionManager.StopAsync().ConfigureAwait(false);
88+
8389
DefaultTrace.TraceInformation("Processor stopped.");
8490
}
8591

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerInMemory.cs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,32 @@
1-
//------------------------------------------------------------
1+
//------------------------------------------------------------
22
// Copyright (c) Microsoft Corporation. All rights reserved.
33
//------------------------------------------------------------
44

55
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
7+
using System;
78
using System.Collections.Concurrent;
89
using System.Collections.Generic;
10+
using System.IO;
911
using System.Linq;
1012
using System.Threading.Tasks;
1113

1214
internal sealed class DocumentServiceLeaseContainerInMemory : DocumentServiceLeaseContainer
1315
{
1416
private readonly ConcurrentDictionary<string, DocumentServiceLease> container;
17+
private readonly MemoryStream leaseStateStream;
1518

1619
public DocumentServiceLeaseContainerInMemory(ConcurrentDictionary<string, DocumentServiceLease> container)
20+
: this(container, leaseStateStream: null)
21+
{
22+
}
23+
24+
public DocumentServiceLeaseContainerInMemory(
25+
ConcurrentDictionary<string, DocumentServiceLease> container,
26+
MemoryStream leaseStateStream)
1727
{
1828
this.container = container;
29+
this.leaseStateStream = leaseStateStream;
1930
}
2031

2132
public override Task<IReadOnlyList<DocumentServiceLease>> GetAllLeasesAsync()
@@ -27,5 +38,46 @@ public override Task<IEnumerable<DocumentServiceLease>> GetOwnedLeasesAsync()
2738
{
2839
return Task.FromResult<IEnumerable<DocumentServiceLease>>(this.container.Values.AsEnumerable());
2940
}
41+
42+
/// <summary>
43+
/// Persists the current in-memory lease state into the user-supplied <see cref="MemoryStream"/>.
44+
/// </summary>
45+
/// <remarks>
46+
/// Must only be invoked from the single <c>ChangeFeedProcessor.StopAsync</c> call path;
47+
/// concurrent invocation is not supported and may corrupt the stream.
48+
/// </remarks>
49+
/// <returns>A completed task once the stream has been populated, or a no-op if no stream was supplied.</returns>
50+
public Task ShutdownAsync()
51+
{
52+
if (this.leaseStateStream == null)
53+
{
54+
return Task.CompletedTask;
55+
}
56+
57+
byte[] serializedBytes = InMemoryLeaseJsonFormat.Serialize(this.container.Values.ToList());
58+
59+
// Resize the target stream BEFORE writing. If the stream is not expandable and
60+
// cannot hold the new payload, SetLength throws NotSupportedException and the
61+
// user's stream is left untouched (no partial-write corruption). If SetLength
62+
// succeeds, the subsequent Write is guaranteed to fit.
63+
try
64+
{
65+
this.leaseStateStream.SetLength(serializedBytes.Length);
66+
}
67+
catch (NotSupportedException ex)
68+
{
69+
throw new InvalidOperationException(
70+
"Failed to persist lease state because the MemoryStream is not expandable and the serialized "
71+
+ "state exceeds its capacity. Use 'new MemoryStream()' or a MemoryStream with sufficient "
72+
+ "capacity instead of 'new MemoryStream(byte[])' to create a resizable stream.",
73+
ex);
74+
}
75+
76+
this.leaseStateStream.Position = 0;
77+
this.leaseStateStream.Write(serializedBytes, 0, serializedBytes.Length);
78+
this.leaseStateStream.Position = 0;
79+
80+
return Task.CompletedTask;
81+
}
3082
}
3183
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManager.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
7+
using System.Threading.Tasks;
8+
79
/// <summary>
810
/// The DocumentServiceLeaseStoreManager defines a way to perform operations with <see cref="DocumentServiceLease"/>.
911
/// </summary>
@@ -29,5 +31,13 @@ internal abstract class DocumentServiceLeaseStoreManager
2931
/// for particular monitoring collection and lease container prefix.
3032
/// </summary>
3133
public abstract DocumentServiceLeaseStore LeaseStore { get; }
34+
35+
/// <summary>
36+
/// Called when the processor is stopping. Implementations may override to perform
37+
/// cleanup or state persistence. The default implementation (for Cosmos-backed
38+
/// lease stores) is a no-op. Exceptions thrown from this method propagate to the
39+
/// caller of <see cref="ChangeFeedProcessor.StopAsync"/>.
40+
/// </summary>
41+
public abstract Task ShutdownAsync();
3242
}
3343
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerCosmos.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
77
using System;
8+
using System.Threading.Tasks;
89
using Microsoft.Azure.Cosmos;
910

1011
/// <summary>
@@ -80,5 +81,10 @@ internal DocumentServiceLeaseStoreManagerCosmos(
8081
public override DocumentServiceLeaseCheckpointer LeaseCheckpointer => this.leaseCheckpointer;
8182

8283
public override DocumentServiceLeaseContainer LeaseContainer => this.leaseContainer;
84+
85+
public override Task ShutdownAsync()
86+
{
87+
return Task.CompletedTask;
88+
}
8389
}
8490
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerInMemory.cs

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
77
using System;
88
using System.Collections.Concurrent;
9+
using System.Collections.Generic;
10+
using System.IO;
11+
using System.Threading.Tasks;
12+
using Newtonsoft.Json;
913

1014
/// <summary>
1115
/// Lease manager that is using In-Memory as lease storage.
@@ -15,15 +19,33 @@ internal sealed class DocumentServiceLeaseStoreManagerInMemory : DocumentService
1519
private readonly DocumentServiceLeaseStore leaseStore;
1620
private readonly DocumentServiceLeaseManager leaseManager;
1721
private readonly DocumentServiceLeaseCheckpointer leaseCheckpointer;
18-
private readonly DocumentServiceLeaseContainer leaseContainer;
22+
private readonly DocumentServiceLeaseContainerInMemory leaseContainer;
1923

2024
public DocumentServiceLeaseStoreManagerInMemory()
2125
: this(new ConcurrentDictionary<string, DocumentServiceLease>())
2226
{
2327
}
2428

29+
/// <summary>
30+
/// Initializes a new instance from a <see cref="MemoryStream"/> containing
31+
/// previously persisted lease state. Deserialization is co-located here so
32+
/// that the manager owns the lease JSON format for both read (restore) and
33+
/// write (ShutdownAsync → persist).
34+
/// </summary>
35+
internal DocumentServiceLeaseStoreManagerInMemory(MemoryStream leaseStateStream)
36+
: this(DocumentServiceLeaseStoreManagerInMemory.DeserializeLeaseState(leaseStateStream), leaseStateStream)
37+
{
38+
}
39+
2540
internal DocumentServiceLeaseStoreManagerInMemory(ConcurrentDictionary<string, DocumentServiceLease> container)
26-
: this(new DocumentServiceLeaseUpdaterInMemory(container), container)
41+
: this(new DocumentServiceLeaseUpdaterInMemory(container), container, leaseStateStream: null)
42+
{
43+
}
44+
45+
internal DocumentServiceLeaseStoreManagerInMemory(
46+
ConcurrentDictionary<string, DocumentServiceLease> container,
47+
MemoryStream leaseStateStream)
48+
: this(new DocumentServiceLeaseUpdaterInMemory(container), container, leaseStateStream)
2749
{
2850
}
2951

@@ -35,7 +57,8 @@ internal DocumentServiceLeaseStoreManagerInMemory(ConcurrentDictionary<string, D
3557
/// </remarks>
3658
internal DocumentServiceLeaseStoreManagerInMemory(
3759
DocumentServiceLeaseUpdater leaseUpdater,
38-
ConcurrentDictionary<string, DocumentServiceLease> container) // For testing purposes only.
60+
ConcurrentDictionary<string, DocumentServiceLease> container,
61+
MemoryStream leaseStateStream = null)
3962
{
4063
if (leaseUpdater == null) throw new ArgumentException(nameof(leaseUpdater));
4164

@@ -47,7 +70,7 @@ internal DocumentServiceLeaseStoreManagerInMemory(
4770
leaseUpdater,
4871
new PartitionedByIdCollectionRequestOptionsFactory());
4972

50-
this.leaseContainer = new DocumentServiceLeaseContainerInMemory(container);
73+
this.leaseContainer = new DocumentServiceLeaseContainerInMemory(container, leaseStateStream);
5174
}
5275

5376
public override DocumentServiceLeaseStore LeaseStore => this.leaseStore;
@@ -57,5 +80,61 @@ internal DocumentServiceLeaseStoreManagerInMemory(
5780
public override DocumentServiceLeaseCheckpointer LeaseCheckpointer => this.leaseCheckpointer;
5881

5982
public override DocumentServiceLeaseContainer LeaseContainer => this.leaseContainer;
83+
84+
public override Task ShutdownAsync()
85+
{
86+
return this.leaseContainer.ShutdownAsync();
87+
}
88+
89+
/// <summary>
90+
/// Deserializes lease state from a <see cref="MemoryStream"/> into a dictionary.
91+
/// This is the counterpart of the serialization in
92+
/// <see cref="DocumentServiceLeaseContainerInMemory.ShutdownAsync"/>.
93+
/// </summary>
94+
private static ConcurrentDictionary<string, DocumentServiceLease> DeserializeLeaseState(
95+
MemoryStream leaseStateStream)
96+
{
97+
ConcurrentDictionary<string, DocumentServiceLease> container =
98+
new ConcurrentDictionary<string, DocumentServiceLease>();
99+
100+
if (leaseStateStream == null || leaseStateStream.Length == 0)
101+
{
102+
return container;
103+
}
104+
105+
List<DocumentServiceLease> leases;
106+
try
107+
{
108+
leases = InMemoryLeaseJsonFormat.Deserialize(leaseStateStream);
109+
}
110+
catch (JsonException ex)
111+
{
112+
throw new InvalidOperationException(
113+
"Failed to deserialize lease state from the provided MemoryStream. "
114+
+ "Ensure the stream contains valid lease state JSON previously persisted by the ChangeFeedProcessor.",
115+
ex);
116+
}
117+
118+
foreach (DocumentServiceLease lease in leases)
119+
{
120+
if (string.IsNullOrEmpty(lease?.Id))
121+
{
122+
throw new InvalidOperationException("Lease state contains a null or invalid lease entry.");
123+
}
124+
125+
if (!container.TryAdd(lease.Id, lease))
126+
{
127+
throw new InvalidOperationException(
128+
$"Lease state contains duplicate lease id '{lease.Id}'. The persisted stream is corrupt.");
129+
}
130+
}
131+
132+
// Leave the caller's stream positioned at the start so it is symmetric with
133+
// the state produced by ShutdownAsync and the stream remains immediately
134+
// re-readable by the caller (e.g., to persist it elsewhere).
135+
leaseStateStream.Position = 0;
136+
137+
return container;
138+
}
60139
}
61140
}

0 commit comments

Comments
 (0)