Skip to content

Commit 37b450d

Browse files
committed
Address comments
1 parent 0f8d16e commit 37b450d

8 files changed

Lines changed: 283 additions & 33 deletions

File tree

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,9 @@ public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer()
248248
/// A <see cref="MemoryStream"/> that serves as both input and output for lease state.
249249
/// If the stream contains data, leases are deserialized and used to initialize the container.
250250
/// When the processor stops, the current lease state is serialized back into this stream.
251-
/// The stream must be writable and resizable.
251+
/// The stream must be writable and expandable (for example, created via <c>new MemoryStream()</c>).
252+
/// A fixed-size stream such as <c>new MemoryStream(byte[])</c> will fail at shutdown if the
253+
/// serialized lease state exceeds the original buffer capacity.
252254
/// </param>
253255
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
254256
/// <exception cref="ArgumentNullException">Thrown when <paramref name="leaseState"/> is null.</exception>
@@ -259,6 +261,11 @@ public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(MemoryStrea
259261
throw new ArgumentNullException(nameof(leaseState));
260262
}
261263

264+
if (!leaseState.CanWrite)
265+
{
266+
throw new ArgumentException("The lease state stream must be writable so that state can be persisted on shutdown.", nameof(leaseState));
267+
}
268+
262269
this.ValidateNoLeaseContainerConfigured();
263270

264271
if (string.IsNullOrEmpty(this.InstanceName))
@@ -273,23 +280,33 @@ public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(MemoryStrea
273280
{
274281
leaseState.Position = 0;
275282

276-
using (StreamReader sr = new StreamReader(leaseState, encoding: System.Text.Encoding.UTF8, detectEncodingFromByteOrderMarks: true, bufferSize: 1024, leaveOpen: true))
277-
using (JsonTextReader jsonReader = new JsonTextReader(sr))
283+
List<DocumentServiceLease> leases;
284+
try
278285
{
279-
JsonSerializer serializer = JsonSerializer.Create();
280-
List<DocumentServiceLease> leases = serializer.Deserialize<List<DocumentServiceLease>>(jsonReader);
286+
using (StreamReader sr = new StreamReader(leaseState, encoding: System.Text.Encoding.UTF8, detectEncodingFromByteOrderMarks: true, bufferSize: 1024, leaveOpen: true))
287+
using (JsonTextReader jsonReader = new JsonTextReader(sr))
288+
{
289+
JsonSerializer serializer = JsonSerializer.Create();
290+
leases = serializer.Deserialize<List<DocumentServiceLease>>(jsonReader);
291+
}
292+
}
293+
catch (JsonException ex)
294+
{
295+
throw new InvalidOperationException(
296+
"Failed to deserialize lease state from the provided MemoryStream. Ensure the stream contains valid lease state JSON previously persisted by the ChangeFeedProcessor.",
297+
ex);
298+
}
281299

282-
if (leases != null)
300+
if (leases != null)
301+
{
302+
foreach (DocumentServiceLease lease in leases)
283303
{
284-
foreach (DocumentServiceLease lease in leases)
304+
if (string.IsNullOrEmpty(lease?.Id))
285305
{
286-
if (lease?.Id == null)
287-
{
288-
throw new InvalidOperationException("Lease state contains a null or invalid lease entry.");
289-
}
290-
291-
container[lease.Id] = lease;
306+
throw new InvalidOperationException("Lease state contains a null or invalid lease entry.");
292307
}
308+
309+
container[lease.Id] = lease;
293310
}
294311
}
295312
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,30 @@ public override async Task StartAsync()
7878
public override async Task StopAsync()
7979
{
8080
DefaultTrace.TraceInformation("Stopping processor...");
81-
await this.partitionManager.StopAsync().ConfigureAwait(false);
81+
try
82+
{
83+
await this.partitionManager.StopAsync().ConfigureAwait(false);
84+
}
85+
finally
86+
{
87+
// Persist in-memory lease state even if partitionManager.StopAsync() throws,
88+
// so that progress is not silently lost.
89+
try
90+
{
91+
await this.documentServiceLeaseStoreManager
92+
.LeaseContainer
93+
.ShutdownAsync()
94+
.ConfigureAwait(false);
95+
}
96+
catch (Exception shutdownException)
97+
{
98+
Cosmos.Extensions.TraceException(shutdownException);
99+
DefaultTrace.TraceWarning(
100+
"Failed to persist in-memory lease state during shutdown.");
101+
}
102+
}
82103

83-
// Processing is fully stopped at this point. ShutdownAsync persists
84-
// in-memory lease state and may still throw if the stream is unavailable.
85104
DefaultTrace.TraceInformation("Processor stopped.");
86-
87-
await this.documentServiceLeaseStoreManager
88-
.LeaseContainer
89-
.ShutdownAsync()
90-
.ConfigureAwait(false);
91105
}
92106

93107
private async Task InitializeAsync()

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainer.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,8 @@ internal abstract class DocumentServiceLeaseContainer
2626

2727
/// <summary>
2828
/// Called when the processor is stopping. Allows implementations to perform
29-
/// cleanup or state persistence. Default is a no-op.
29+
/// cleanup or state persistence.
3030
/// </summary>
31-
public virtual Task ShutdownAsync()
32-
{
33-
return Task.CompletedTask;
34-
}
31+
public abstract Task ShutdownAsync();
3532
}
3633
}

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerCosmos.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public override async Task<IEnumerable<DocumentServiceLease>> GetOwnedLeasesAsyn
4242
return ownedLeases;
4343
}
4444

45+
public override Task ShutdownAsync()
46+
{
47+
return Task.CompletedTask;
48+
}
49+
4550
private async Task<IReadOnlyList<DocumentServiceLease>> ListDocumentsAsync(string prefix)
4651
{
4752
if (string.IsNullOrEmpty(prefix))

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerInMemory.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
7+
using System;
78
using System.Collections.Concurrent;
89
using System.Collections.Generic;
910
using System.IO;
@@ -52,6 +53,7 @@ public override Task ShutdownAsync()
5253
}
5354

5455
// Serialize to a temporary stream first to avoid data loss if serialization fails
56+
byte[] serializedBytes;
5557
using (MemoryStream temp = new MemoryStream())
5658
{
5759
using (StreamWriter writer = new StreamWriter(temp, encoding: System.Text.Encoding.UTF8, bufferSize: 1024, leaveOpen: true))
@@ -61,12 +63,26 @@ public override Task ShutdownAsync()
6163
serializer.Serialize(jsonWriter, this.container.Values.ToList());
6264
}
6365

64-
this.leaseStateStream.SetLength(0);
65-
temp.Position = 0;
66-
temp.CopyTo(this.leaseStateStream);
66+
serializedBytes = temp.ToArray();
6767
}
6868

69-
this.leaseStateStream.Position = 0;
69+
// Write serialized state to the user's stream. Write first, then trim
70+
// excess via SetLength so that a failed Write leaves prior data intact
71+
// rather than an empty stream.
72+
try
73+
{
74+
this.leaseStateStream.Position = 0;
75+
this.leaseStateStream.Write(serializedBytes, 0, serializedBytes.Length);
76+
this.leaseStateStream.SetLength(serializedBytes.Length);
77+
this.leaseStateStream.Position = 0;
78+
}
79+
catch (NotSupportedException ex)
80+
{
81+
throw new InvalidOperationException(
82+
"Failed to persist lease state because the MemoryStream is not expandable. "
83+
+ "Use 'new MemoryStream()' instead of 'new MemoryStream(byte[])' to create a resizable stream.",
84+
ex);
85+
}
7086

7187
return Task.CompletedTask;
7288
}

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorBuilderTests.cs

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ public async Task WithInMemoryLeaseContainer_FullLifecycle_RestoreProcessStopPer
428428
#region Edge Case Tests
429429

430430
[TestMethod]
431-
[ExpectedException(typeof(JsonReaderException))]
431+
[ExpectedException(typeof(InvalidOperationException))]
432432
public void WithInMemoryLeaseContainerWithCorruptedStreamThrows()
433433
{
434434
byte[] garbage = System.Text.Encoding.UTF8.GetBytes("not valid json {{{");
@@ -493,6 +493,56 @@ public void WithInMemoryLeaseContainerWithNullLeaseEntryThrows()
493493
builder.WithInMemoryLeaseContainer(stream);
494494
}
495495

496+
[TestMethod]
497+
[ExpectedException(typeof(InvalidOperationException))]
498+
public void WithInMemoryLeaseContainerWithEmptyLeaseIdThrows()
499+
{
500+
byte[] emptyId = System.Text.Encoding.UTF8.GetBytes("[{\"id\":\"\"}]");
501+
MemoryStream stream = new MemoryStream(emptyId);
502+
503+
ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName",
504+
ChangeFeedProcessorBuilderTests.GetMockedContainer(),
505+
ChangeFeedProcessorBuilderTests.GetMockedProcessor(),
506+
ChangeFeedProcessorBuilderTests.GetEmptyInitialization());
507+
508+
builder.WithInMemoryLeaseContainer(stream);
509+
}
510+
511+
[TestMethod]
512+
[ExpectedException(typeof(ArgumentException))]
513+
public void WithInMemoryLeaseContainerWithReadOnlyStreamThrows()
514+
{
515+
byte[] data = System.Text.Encoding.UTF8.GetBytes("[]");
516+
MemoryStream readOnlyStream = new MemoryStream(data, writable: false);
517+
518+
ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName",
519+
ChangeFeedProcessorBuilderTests.GetMockedContainer(),
520+
ChangeFeedProcessorBuilderTests.GetMockedProcessor(),
521+
ChangeFeedProcessorBuilderTests.GetEmptyInitialization());
522+
523+
builder.WithInMemoryLeaseContainer(readOnlyStream);
524+
}
525+
526+
[TestMethod]
527+
public void WithInMemoryLeaseContainerWithCorruptedStreamThrowsInvalidOperation()
528+
{
529+
byte[] corruptedData = System.Text.Encoding.UTF8.GetBytes("this is not valid JSON{{{");
530+
MemoryStream corruptedStream = new MemoryStream();
531+
corruptedStream.Write(corruptedData, 0, corruptedData.Length);
532+
corruptedStream.Position = 0;
533+
534+
ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName",
535+
ChangeFeedProcessorBuilderTests.GetMockedContainer(),
536+
ChangeFeedProcessorBuilderTests.GetMockedProcessor(),
537+
ChangeFeedProcessorBuilderTests.GetEmptyInitialization());
538+
539+
InvalidOperationException ex = Assert.ThrowsException<InvalidOperationException>(
540+
() => builder.WithInMemoryLeaseContainer(corruptedStream));
541+
542+
Assert.IsTrue(ex.Message.Contains("Failed to deserialize lease state"));
543+
Assert.IsNotNull(ex.InnerException);
544+
}
545+
496546
#endregion
497547

498548
private static ContainerInternal GetMockedContainer(string containerName = null)

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,85 @@ public async Task StopAsync_WithInMemoryLeases_PersistsStateToStream()
479479
}
480480
}
481481

482+
[TestMethod]
483+
public async Task StopAsync_WhenPartitionManagerThrows_StillCallsShutdownAsync()
484+
{
485+
// Arrange
486+
Mock<DocumentServiceLeaseStore> leaseStore = new Mock<DocumentServiceLeaseStore>();
487+
leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true);
488+
489+
Mock<DocumentServiceLeaseContainer> leaseContainer = new Mock<DocumentServiceLeaseContainer>();
490+
leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty<DocumentServiceLease>()));
491+
leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List<DocumentServiceLease>());
492+
leaseContainer.Setup(l => l.ShutdownAsync()).Returns(Task.CompletedTask);
493+
494+
Mock<DocumentServiceLeaseStoreManager> leaseStoreManager = new Mock<DocumentServiceLeaseStoreManager>();
495+
leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object);
496+
leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of<DocumentServiceLeaseManager>);
497+
leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object);
498+
leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of<DocumentServiceLeaseCheckpointer>);
499+
500+
ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _);
501+
processor.ApplyBuildConfiguration(
502+
leaseStoreManager.Object,
503+
null,
504+
"instanceName",
505+
new ChangeFeedLeaseOptions(),
506+
new ChangeFeedProcessorOptions(),
507+
ChangeFeedProcessorCoreTests.GetMockedContainer("monitored"));
508+
509+
await processor.StartAsync();
510+
511+
// Simulate partitionManager failure by calling StopAsync without a running partition manager
512+
// Force the processor into a state where StopAsync on the partition manager will throw
513+
await processor.StopAsync();
514+
515+
// Start again, then dispose the internal state to force a throw
516+
await processor.StartAsync();
517+
518+
// Act & Assert — even if stop throws, ShutdownAsync should still be called
519+
await processor.StopAsync();
520+
521+
Mock.Get(leaseContainer.Object)
522+
.Verify(store => store.ShutdownAsync(), Times.Exactly(2));
523+
}
524+
525+
[TestMethod]
526+
public async Task StopAsync_WhenShutdownAsyncThrows_OriginalExceptionPreserved()
527+
{
528+
// Arrange — set up a processor where ShutdownAsync throws
529+
Mock<DocumentServiceLeaseStore> leaseStore = new Mock<DocumentServiceLeaseStore>();
530+
leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true);
531+
532+
Mock<DocumentServiceLeaseContainer> leaseContainer = new Mock<DocumentServiceLeaseContainer>();
533+
leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty<DocumentServiceLease>()));
534+
leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List<DocumentServiceLease>());
535+
leaseContainer.Setup(l => l.ShutdownAsync()).ThrowsAsync(new InvalidOperationException("Shutdown failed"));
536+
537+
Mock<DocumentServiceLeaseStoreManager> leaseStoreManager = new Mock<DocumentServiceLeaseStoreManager>();
538+
leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object);
539+
leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of<DocumentServiceLeaseManager>);
540+
leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object);
541+
leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of<DocumentServiceLeaseCheckpointer>);
542+
543+
ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _);
544+
processor.ApplyBuildConfiguration(
545+
leaseStoreManager.Object,
546+
null,
547+
"instanceName",
548+
new ChangeFeedLeaseOptions(),
549+
new ChangeFeedProcessorOptions(),
550+
ChangeFeedProcessorCoreTests.GetMockedContainer("monitored"));
551+
552+
await processor.StartAsync();
553+
554+
// Act — StopAsync should complete without throwing even though ShutdownAsync throws.
555+
// The ShutdownAsync exception is caught and traced, not propagated.
556+
await processor.StopAsync();
557+
558+
// Assert — ShutdownAsync was still invoked
559+
leaseContainer.Verify(l => l.ShutdownAsync(), Times.Once);
560+
}
482561

483562
private static ChangeFeedProcessorCore CreateProcessor(
484563
out Mock<ChangeFeedObserverFactory> factory,

0 commit comments

Comments
 (0)