Skip to content

Commit 516000b

Browse files
ChangeFeedProcessor: Fixes blocking review comments for lease export
1. Eager MemoryStream validation: WithInMemoryLeaseContainer now checks that the stream is expandable at config time via SetLength(same length). A MemoryStream(byte[]) is writable but not expandable - SetLength/Write throws NotSupportedException when serialized data exceeds original capacity. Failing fast at config time prevents silent data loss at shutdown. 2. StopAsync propagates ShutdownAsync exceptions: Removed try/finally and catch-swallow pattern. ShutdownAsync runs sequentially after partitionManager.StopAsync() so persistence failures propagate to the caller. partitionManager.StopAsync() rarely fails, and when it does the caller already receives that exception. Silently swallowing persistence failures defeats the feature guarantee. 3. Log timing: Moved 'Processor stopped.' trace after ShutdownAsync for consistency with StartAsync pattern. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 20d52c4 commit 516000b

2 files changed

Lines changed: 27 additions & 22 deletions

File tree

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,21 @@ public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(MemoryStrea
266266
throw new ArgumentException("The lease state stream must be writable so that state can be persisted on shutdown.", nameof(leaseState));
267267
}
268268

269+
// Verify the stream is expandable. A MemoryStream created via
270+
// new MemoryStream(byte[]) is writable but not expandable and will
271+
// throw NotSupportedException when ShutdownAsync tries to resize it.
272+
try
273+
{
274+
leaseState.SetLength(leaseState.Length);
275+
}
276+
catch (NotSupportedException)
277+
{
278+
throw new ArgumentException(
279+
"The lease state stream must be resizable. Use 'new MemoryStream()' and write bytes into it "
280+
+ "instead of 'new MemoryStream(byte[])' to create a resizable stream.",
281+
nameof(leaseState));
282+
}
283+
269284
this.ValidateNoLeaseContainerConfigured();
270285

271286
if (string.IsNullOrEmpty(this.InstanceName))

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -78,28 +78,18 @@ public override async Task StartAsync()
7878
public override async Task StopAsync()
7979
{
8080
DefaultTrace.TraceInformation("Stopping processor...");
81-
try
82-
{
83-
await this.partitionManager.StopAsync().ConfigureAwait(false);
84-
}
85-
finally
86-
{
87-
// Persist in-memory lease state even if partitionManager.StopAsync() throws,
88-
// so that progress is not silently lost.
89-
try
90-
{
91-
await this.documentServiceLeaseStoreManager
92-
.LeaseContainer
93-
.ShutdownAsync()
94-
.ConfigureAwait(false);
95-
}
96-
catch (Exception shutdownException)
97-
{
98-
Cosmos.Extensions.TraceException(shutdownException);
99-
DefaultTrace.TraceWarning(
100-
"Failed to persist in-memory lease state during shutdown.");
101-
}
102-
}
81+
await this.partitionManager.StopAsync().ConfigureAwait(false);
82+
83+
// ShutdownAsync persists in-memory lease state. Runs outside finally
84+
// because: (a) partitionManager.StopAsync() rarely fails, and when it
85+
// does the caller already receives that exception; (b) letting
86+
// ShutdownAsync exceptions propagate ensures callers know if
87+
// persistence failed — silently swallowing defeats the feature's
88+
// guarantee.
89+
await this.documentServiceLeaseStoreManager
90+
.LeaseContainer
91+
.ShutdownAsync()
92+
.ConfigureAwait(false);
10393

10494
DefaultTrace.TraceInformation("Processor stopped.");
10595
}

0 commit comments

Comments
 (0)