Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,13 @@ public override async Task StartAsync()

// Determine whether we need to apply the StartTime back-off compensation
// introduced by PR #5617 to avoid missing writes during async lease acquisition.
// AllVersionsAndDeletes (AVAD) is exempt because AVAD uses LSN-based continuation
// (IfNoneMatch: *) rather than RFC1123 IfModifiedSince, so the seconds-precision
// rounding issue does not apply. See PR #5825 for details.
// AllVersionsAndDeletes (AVAD) is exempt for two related reasons:
// 1. AVAD uses LSN-based continuation (IfNoneMatch: *) rather than RFC1123
// IfModifiedSince, so the seconds-precision rounding issue does not apply.
// 2. The AVAD endpoint rejects an explicit StartTime on a null-continuation
// lease with HTTP 400 (#5846), which was the regression introduced by #5617.
// See PRs #5825 and #5852 for details. Do not drop the mode guard without
// re-validating both #5268 and #5846.
bool shouldAnchorStartTime =
!this.changeFeedProcessorOptions.StartFromBeginning
&& this.changeFeedProcessorOptions.StartTime == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,67 @@ public async Task WhenACFPInAVADModeUsesWithStartFromBeginningExpectExceptionTes
actual: exception.Message);
}

[TestMethod]
[Timeout(120000)]
[Owner("ntripician")]
[Description("Regression test for issue #5846 (https://github.com/Azure/azure-cosmos-dotnet-v3/issues/5846). " +
"PR #5617 introduced an unconditional StartTime backfill in ChangeFeedProcessorCore.StartAsync() that " +
"broke AVAD push processors on cold start (the AVAD endpoint 400s on an explicit StartTime against a " +
"null-continuation lease). Verifies an AVAD ChangeFeedProcessor started cold (no WithStartTime / " +
"WithStartFromBeginning, empty lease container) actually delivers an inserted document to the observer.")]
public async Task TestAllVersionsAndDeletesProcessor_ColdStart_DoesNotFail()
{
    ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes);

    // Written by the error-notification callback, read by the test body after a wait handle is signaled.
    Exception exception = default;

    // ManualResetEvent is IDisposable, so both events are scoped with using. The processor is stopped
    // in the finally block below, i.e. before the events are disposed.
    using (ManualResetEvent docDelivered = new ManualResetEvent(false))
    using (ManualResetEvent errorRaised = new ManualResetEvent(false))
    {
        ChangeFeedProcessor processor = monitoredContainer
            .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
                processorName: "regressionProcessor",
                onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoActivity>> docs, CancellationToken token) =>
                {
                    // Only a non-empty batch counts as a delivery.
                    if (docs != null && docs.Count > 0)
                    {
                        docDelivered.Set();
                    }

                    return Task.CompletedTask;
                })
            .WithInstanceName(Guid.NewGuid().ToString())
            .WithLeaseContainer(this.LeaseContainer)
            .WithErrorNotification((leaseToken, error) =>
            {
                // Record the error first, then signal, so the waiter observes it once released.
                exception = error.InnerException ?? error;
                errorRaised.Set();

                return Task.CompletedTask;
            })
            .Build();

        await processor.StartAsync();

        try
        {
            await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);

            await monitoredContainer.CreateItemAsync<ToDoActivity>(
                new ToDoActivity { id = "1", pk = "1", description = "AVAD cold-start regression test for #5846." },
                partitionKey: new PartitionKey("1"));

            // Return as soon as either a document is delivered or an error is reported, instead of
            // always sitting out the full 30 seconds when the processor has already failed.
            WaitHandle.WaitAny(new WaitHandle[] { docDelivered, errorRaised }, TimeSpan.FromSeconds(30));

            // Zero-timeout wait: samples the delivery event without blocking.
            bool received = docDelivered.WaitOne(0);

            if (exception != default)
            {
                Assert.Fail($"AVAD CFP cold-start regression (#5846) surfaced an error: {exception.Message}");
            }

            Assert.IsTrue(received, "AVAD CFP cold-start (#5846) did not deliver any change to the observer within the timeout.");
        }
        finally
        {
            await processor.StopAsync();
        }
    }
}

[TestMethod]
[Owner("trivediyash")]
[Description("Validates that ConflictResolutionTimestampInSeconds getter throws JsonException when value is zero.")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,177 @@ public async Task StartAsync_DoesNotSetStartTime_WhenStartFromBeginning()
}
}

// Regression guard for #5846 (https://github.com/Azure/azure-cosmos-dotnet-v3/issues/5846) — AC1.
// On an AVAD push-processor cold start, StartAsync() must leave options.StartTime unset, because
// the AVAD endpoint answers HTTP 400 when an explicit StartTime accompanies a null-continuation lease.
// NOTE(review): a similarly named test, StartAsync_DoesNotSetStartTime_WhenAllVersionsAndDeletes,
// also exists in this class — confirm the two are not redundant.
[TestMethod]
public async Task StartAsync_DoesNotSetStartTime_WhenAllVersionsAndDeletesMode()
{
    // Options under test: AVAD mode, with no StartTime and no StartFromBeginning supplied.
    ChangeFeedProcessorOptions avadOptions = new ChangeFeedProcessorOptions
    {
        Mode = ChangeFeedMode.AllVersionsAndDeletes,
    };

    // Lease store that reports itself as already initialized.
    Mock<DocumentServiceLeaseStore> storeMock = new Mock<DocumentServiceLeaseStore>();
    storeMock.Setup(store => store.IsInitializedAsync()).ReturnsAsync(true);

    // Lease container that exposes no owned leases and no leases at all.
    Mock<DocumentServiceLeaseContainer> containerMock = new Mock<DocumentServiceLeaseContainer>();
    containerMock.Setup(container => container.GetOwnedLeasesAsync()).ReturnsAsync(Enumerable.Empty<DocumentServiceLease>());
    containerMock.Setup(container => container.GetAllLeasesAsync()).ReturnsAsync(new List<DocumentServiceLease>());

    Mock<DocumentServiceLeaseStoreManager> managerMock = new Mock<DocumentServiceLeaseStoreManager>();
    managerMock.Setup(manager => manager.LeaseStore).Returns(storeMock.Object);
    managerMock.Setup(manager => manager.LeaseContainer).Returns(containerMock.Object);
    managerMock.Setup(manager => manager.LeaseManager).Returns(() => Mock.Of<DocumentServiceLeaseManager>());
    managerMock.Setup(manager => manager.LeaseCheckpointer).Returns(() => Mock.Of<DocumentServiceLeaseCheckpointer>());

    ChangeFeedProcessorCore sut = null;
    try
    {
        sut = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _);
        sut.ApplyBuildConfiguration(
            managerMock.Object,
            null,
            "instanceName",
            new ChangeFeedLeaseOptions(),
            avadOptions,
            ChangeFeedProcessorCoreTests.GetMockedContainer("monitored"));

        await sut.StartAsync();

        Assert.IsNull(avadOptions.StartTime);
    }
    finally
    {
        // Stop only a processor that was actually created.
        if (sut != null)
        {
            await sut.StopAsync();
        }
    }
}

// Regression guard for #5268 (https://github.com/Azure/azure-cosmos-dotnet-v3/issues/5268) — AC7.
// Mirror image of the AC1 test: Mode is set to LatestVersion explicitly (instead of leaning on the
// ChangeFeedProcessorOptions default) so that widening the Mode != AllVersionsAndDeletes guard
// too far makes this test fail and flags the #5268 regression.
[TestMethod]
public async Task StartAsync_SetsStartTime_WhenLatestVersionMode_Explicit()
{
    // Options under test: LatestVersion chosen explicitly, StartTime left unset.
    ChangeFeedProcessorOptions latestVersionOptions = new ChangeFeedProcessorOptions
    {
        Mode = ChangeFeedMode.LatestVersion,
    };

    // Lease store that reports itself as already initialized.
    Mock<DocumentServiceLeaseStore> storeMock = new Mock<DocumentServiceLeaseStore>();
    storeMock.Setup(store => store.IsInitializedAsync()).ReturnsAsync(true);

    // Lease container that exposes no owned leases and no leases at all.
    Mock<DocumentServiceLeaseContainer> containerMock = new Mock<DocumentServiceLeaseContainer>();
    containerMock.Setup(container => container.GetOwnedLeasesAsync()).ReturnsAsync(Enumerable.Empty<DocumentServiceLease>());
    containerMock.Setup(container => container.GetAllLeasesAsync()).ReturnsAsync(new List<DocumentServiceLease>());

    Mock<DocumentServiceLeaseStoreManager> managerMock = new Mock<DocumentServiceLeaseStoreManager>();
    managerMock.Setup(manager => manager.LeaseStore).Returns(storeMock.Object);
    managerMock.Setup(manager => manager.LeaseContainer).Returns(containerMock.Object);
    managerMock.Setup(manager => manager.LeaseManager).Returns(() => Mock.Of<DocumentServiceLeaseManager>());
    managerMock.Setup(manager => manager.LeaseCheckpointer).Returns(() => Mock.Of<DocumentServiceLeaseCheckpointer>());

    ChangeFeedProcessorCore sut = null;
    try
    {
        sut = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _);
        sut.ApplyBuildConfiguration(
            managerMock.Object,
            null,
            "instanceName",
            new ChangeFeedLeaseOptions(),
            latestVersionOptions,
            ChangeFeedProcessorCoreTests.GetMockedContainer("monitored"));

        // Reference point captured immediately before StartAsync(): one second before "now".
        DateTime expectedApprox = DateTime.UtcNow.AddSeconds(-1);

        await sut.StartAsync();

        Assert.IsTrue(latestVersionOptions.StartTime.HasValue);

        DateTime anchoredStartTime = latestVersionOptions.StartTime.Value;
        Assert.AreEqual(DateTimeKind.Utc, anchoredStartTime.Kind);

        // The comparison is a tolerance window rather than an exact match.
        double driftInSeconds = Math.Abs((expectedApprox - anchoredStartTime).TotalSeconds);
        Assert.IsTrue(
            driftInSeconds < 5,
            $"Expected StartTime within 5 seconds of {expectedApprox:O} but was {anchoredStartTime:O}.");
    }
    finally
    {
        // Stop only a processor that was actually created.
        if (sut != null)
        {
            await sut.StopAsync();
        }
    }
}

// Regression guard for SE-7 (lease backwards compatibility for customers upgrading from the buggy
// 3.59.0 / 3.60.0-preview.0) — AC8. Those SDK versions returned HTTP 400 BEFORE a continuation
// token was persisted, so the realistic upgrade-path lease is an owned lease whose
// ContinuationToken is null or "". Both variants are exercised here: StartAsync() has to complete
// without throwing, and options.StartTime has to stay null on the AVAD path.
[TestMethod]
public async Task StartAsync_DoesNotSetStartTime_WhenAllVersionsAndDeletesMode_WithEmptyContinuationLease()
{
    // The two "no continuation yet" shapes a persisted lease can have.
    string[] emptyContinuations = { null, string.Empty };

    foreach (string emptyContinuation in emptyContinuations)
    {
        // Printable form of the token for the assertion message.
        string tokenLabel = emptyContinuation ?? "<null>";

        ChangeFeedProcessorOptions avadOptions = new ChangeFeedProcessorOptions
        {
            Mode = ChangeFeedMode.AllVersionsAndDeletes,
        };

        // A single owned lease carrying the empty continuation being tested.
        DocumentServiceLeaseCore upgradedLease = new DocumentServiceLeaseCore()
        {
            LeaseId = "0",
            LeaseToken = "0",
            ContinuationToken = emptyContinuation,
        };
        IEnumerable<DocumentServiceLease> ownedLeases = new List<DocumentServiceLease>() { upgradedLease };

        // Lease store that reports itself as already initialized.
        Mock<DocumentServiceLeaseStore> storeMock = new Mock<DocumentServiceLeaseStore>();
        storeMock.Setup(store => store.IsInitializedAsync()).ReturnsAsync(true);

        Mock<DocumentServiceLeaseContainer> containerMock = new Mock<DocumentServiceLeaseContainer>();
        containerMock.Setup(container => container.GetOwnedLeasesAsync()).ReturnsAsync(ownedLeases);
        containerMock.Setup(container => container.GetAllLeasesAsync()).ReturnsAsync(new List<DocumentServiceLease>());

        Mock<DocumentServiceLeaseStoreManager> managerMock = new Mock<DocumentServiceLeaseStoreManager>();
        managerMock.Setup(manager => manager.LeaseStore).Returns(storeMock.Object);
        managerMock.Setup(manager => manager.LeaseContainer).Returns(containerMock.Object);
        managerMock.Setup(manager => manager.LeaseManager).Returns(() => Mock.Of<DocumentServiceLeaseManager>());
        managerMock.Setup(manager => manager.LeaseCheckpointer).Returns(() => Mock.Of<DocumentServiceLeaseCheckpointer>());

        ChangeFeedProcessorCore sut = null;
        try
        {
            sut = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _);
            sut.ApplyBuildConfiguration(
                managerMock.Object,
                null,
                "instanceName",
                new ChangeFeedLeaseOptions(),
                avadOptions,
                ChangeFeedProcessorCoreTests.GetMockedContainer("monitored"));

            await sut.StartAsync();

            Assert.IsNull(
                avadOptions.StartTime,
                $"StartTime must remain null for AVAD with ContinuationToken='{tokenLabel}'.");
        }
        finally
        {
            // Stop only a processor that was actually created, before moving to the next token.
            if (sut != null)
            {
                await sut.StopAsync();
            }
        }
    }
}

[TestMethod]
public async Task StartAsync_DoesNotSetStartTime_WhenAllVersionsAndDeletes()
{
Expand Down
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

#### Bugs Fixed

- [5852](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/5852) ChangeFeedProcessor: Fixes AllVersionsAndDeletes cold-start regression introduced by #5617
- [5870](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/5870) CrossRegionHedgingAvailabilityStrategy: Fixes StackOverflow in CrossRegionHedgingAvailabilityStrategy Observed in .NET Framework 4.7.2.

#### Other Changes
Expand Down Expand Up @@ -1945,6 +1946,7 @@ Below is a list of any know issues affecting the [recommended minimum version](#
| `FeedIterator` enters an infinite loop after a physical partition split occurs in a container using hierarchical partition keys. | Queries using prefix partition keys. | Rather than having the PK included in the query request options, filtering on top level hierarchical Pks should be done through where clauses. **NOTE:** This issue has been fixed in version 3.39.0 | [#4326](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/4326) |
| Single partition queries (queries explicitly targeted to a single partition, or any queries on a collection that had a single physical partition) that resume using a continuation token after a partition split can observe failure on SDK v3.38 and beyond. | Explicit query execution using a continuation token will fail if these conditions are met. | Turn off Optimistic Direct Execution during query execution either by setting EnableOptimisticDirectExecution to false in query request options or by setting environment variable AZURE_COSMOS_OPTIMISTIC_DIRECT_EXECUTION_ENABLED to false. | [#4432](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/4432) |
| An [Azure API](https://learn.microsoft.com/en-us/azure/virtual-machines/instance-metadata-service?tabs=linux) call is made to get the VM information. This call fails if the customer is on a non-Azure VM. | Although this call is made only once, during client initialization, the failure shows up in monitoring tools (e.g. AppInsights, Datadog, etc.), which can confuse developers. | Turn off this call by setting environment variable COSMOS_DISABLE_IMDS_ACCESS to true. | [#4187](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/4187) |
| ChangeFeedProcessor in `ChangeFeedMode.AllVersionsAndDeletes` fails to start with HTTP 400 on cold start (no persisted lease continuation token). Affects versions [3.59.0](#3.59.0) and [3.60.0-preview.0](#3.60.0-preview.0). | New AVAD ChangeFeedProcessor deployments cannot start. Existing processors that already have a persisted lease continuation token are not affected; LatestVersion-mode processors are not affected. | Upgrade to the next release containing PR [#5852](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/5852), or downgrade to [3.57.0-preview.1](#3.57.0-preview.1) (only available as a preview release). Regression was introduced by [#5617](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/5617). | [#5846](https://github.com/Azure/azure-cosmos-dotnet-v3/issues/5846) |

## Release & Retirement dates

Expand Down
Loading