Skip to content

Commit ba6a81c

Browse files
yash2710jcocchidibahlfikirankumarkolliananth7592
authored
Change feed: Adds id and pk to ChangeFeedMetadata for delete operations (#5470)
## Description Adds id and partition key to ChangeFeedMetadata. This is populated for delete operations using all versions all deletes change feed mode so that when customers get the delete, they know what was deleted. Note it doesn't have the value of the document that was deleted only the id and partition key through which a document can be uniquely identified. Please note that user provided serializer is not being used. This PR includes commits from #5191 and additional simplifications and unit test changes ## Type of change New feature (non-breaking change which adds functionality) ## Basic Usage ### Single Partition Key Example ```csharp using Microsoft.Azure.Cosmos; public class Product { public string id { get; set; } public string categoryId { get; set; } public string name { get; set; } public decimal price { get; set; } } // Create Change Feed Processor with All Versions and Deletes mode ChangeFeedProcessor processor = container .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<Product>( processorName: "productProcessor", onChangesDelegate: async (context, changes, cancellationToken) => { foreach (ChangeFeedItem<Product> change in changes) { switch (change.Metadata.OperationType) { case ChangeFeedOperationType.Create: case ChangeFeedOperationType.Replace: Console.WriteLine($"Document: {change.Current.id}"); break; case ChangeFeedOperationType.Delete: // Access deleted document's id and partition key from metadata Console.WriteLine($"Deleted Document Id: {change.Metadata.Id}"); // PartitionKey is a dictionary: key = path name, value = partition key value string categoryId = change.Metadata.PartitionKey["categoryId"]?.ToString(); Console.WriteLine($"Partition Key: {categoryId}"); // Check if deleted due to TTL expiration if (change.Metadata.IsTimeToLiveExpired) { Console.WriteLine("Deleted via TTL expiration"); } // Cleanup related data await RemoveFromCacheAsync(change.Metadata.Id, categoryId); break; } } }) .WithInstanceName("instance1") .WithLeaseContainer(leaseContainer) .Build(); await processor.StartAsync(); ``` ### Hierarchical Partition Key Example ```csharp public class UserSession { public string id { get; set; } public string tenantId { get; set; } public string userId { get; set; } public string sessionId { get; set; } } // Container with HPK: ["/tenantId", "/userId", "/sessionId"] ChangeFeedProcessor processor = container .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<UserSession>( processorName: "sessionProcessor", onChangesDelegate: async (context, changes, cancellationToken) => { foreach (ChangeFeedItem<UserSession> change in changes) { if (change.Metadata.OperationType == ChangeFeedOperationType.Delete) { Console.WriteLine($"Deleted Session Id: {change.Metadata.Id}"); // PartitionKey dictionary contains all hierarchy levels in order string tenantId = change.Metadata.PartitionKey["tenantId"]?.ToString(); string userId = change.Metadata.PartitionKey["userId"]?.ToString(); string sessionId = change.Metadata.PartitionKey["sessionId"]?.ToString(); Console.WriteLine($"HPK: tenant={tenantId}, user={userId}, session={sessionId}"); await CleanupSessionAsync(tenantId, userId, sessionId); } } }) .WithInstanceName("instance1") .WithLeaseContainer(leaseContainer) .Build(); await processor.StartAsync(); ``` ### Cache Synchronization Example ```csharp ChangeFeedProcessor processor = container .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<dynamic>( processorName: "cacheSync", onChangesDelegate: async (context, changes, cancellationToken) => { foreach (ChangeFeedItem<dynamic> change in changes) { if (change.Metadata.OperationType == ChangeFeedOperationType.Delete) { // Get id and partition key from metadata string id = change.Metadata.Id; string pk = change.Metadata.PartitionKey.Values.FirstOrDefault()?.ToString(); // Remove from cache and search index await cache.RemoveAsync(id); await searchIndex.DeleteAsync(id); Console.WriteLine($"Removed {id} from cache and index"); } else { // Update cache and index for creates/updates await cache.SetAsync(change.Current.id.ToString(), change.Current); await searchIndex.IndexAsync(change.Current); } } }) .WithInstanceName("cacheSync1") .WithLeaseContainer(leaseContainer) .Build(); await processor.StartAsync(); ``` ## Property Details ### `ChangeFeedMetadata.Id` - **Type**: `string` - **Populated**: Delete operations only (null for create/replace) - **Description**: The document id of the deleted item - **Example**: `"order-12345"` ### `ChangeFeedMetadata.PartitionKey` - **Type**: `Dictionary<string, object>` - **Populated**: Delete operations only (null for create/replace) - **Description**: Dictionary with partition key path(s) as keys and values as objects - **Key Format**: Partition key property name without leading `/` - **Value Types**: Can be string, number, boolean, or null **Examples:** Single Partition Key (`/categoryId`): ```csharp { "categoryId": "electronics" } ``` Hierarchical Partition Key (`["/tenantId", "/userId", "/sessionId"]`): ```csharp { "tenantId": "tenant123", "userId": "user456", "sessionId": "session789" } ``` Mixed Types (`["/category", "/priority", "/isActive"]`): ```csharp { "category": "electronics", "priority": 1, "isActive": true } ``` ## Notes 1. **Only for Deletes**: `Id` and `PartitionKey` are only populated when `OperationType == Delete` 2. **No Document Body**: The deleted document body is not available (Current will be null) 3. **No Custom Serializer**: User-provided serializers are not applied to these metadata properties 4. **TTL Detection**: Use `IsTimeToLiveExpired` to distinguish TTL deletes from explicit deletes 5. **Container Setup Required**: Container must have `ChangeFeedPolicy.FullFidelityRetention` configured --------- Co-authored-by: Justine Cocchi <jucocchi@microsoft.com> Co-authored-by: Dikshi Bahl <DikshiBahl@microsoft.com> Co-authored-by: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Co-authored-by: Kiran Kumar Kolli <kirankk@microsoft.com> Co-authored-by: ananth7592 <amudumba@microsoft.com>
1 parent 0be9384 commit ba6a81c

10 files changed

Lines changed: 1082 additions & 740 deletions

File tree

Microsoft.Azure.Cosmos/src/Resource/FullFidelity/ChangeFeedItem.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,22 @@ namespace Microsoft.Azure.Cosmos
5757
class ChangeFeedItem<T>
5858
{
5959
/// <summary>
60-
/// The full fidelity change feed current item.
60+
/// The current version of the item for all versions and deletes change feed mode.
61+
/// It is always null for delete change feed operations.
6162
/// </summary>
6263
[JsonProperty(PropertyName = "current")]
6364
[JsonPropertyName("current")]
6465
public T Current { get; set; }
6566

6667
/// <summary>
67-
/// The full fidelity change feed metadata.
68+
/// The item metadata for all versions and deletes change feed mode.
6869
/// </summary>
6970
[JsonProperty(PropertyName = "metadata", NullValueHandling = NullValueHandling.Ignore)]
7071
[JsonPropertyName("metadata")]
7172
public ChangeFeedMetadata Metadata { get; set; }
7273

7374
/// <summary>
74-
/// For delete operations, previous image is always going to be provided. The previous image on replace operations is not going to be exposed by default and requires account-level or container-level opt-in.
75+
/// The previous version of the item for all versions and deletes change feed mode. The previous version on delete and replace operations is not exposed by default and requires container-level opt-in. Refer to https://aka.ms/cosmosdb-change-feed-deletes for more information.
7576
/// </summary>
7677
[JsonProperty(PropertyName = "previous", NullValueHandling = NullValueHandling.Ignore)]
7778
[JsonPropertyName("previous")]

Microsoft.Azure.Cosmos/src/Resource/FullFidelity/ChangeFeedMetadata.cs

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,54 +5,125 @@
55
namespace Microsoft.Azure.Cosmos
66
{
77
using System;
8-
using System.Text.Json;
8+
using System.Collections.Generic;
99
using Microsoft.Azure.Cosmos.Resource.FullFidelity;
10-
using Microsoft.Azure.Cosmos.Resource.FullFidelity.Converters;
1110
using Microsoft.Azure.Documents;
1211
using Newtonsoft.Json;
1312
using Newtonsoft.Json.Converters;
1413

1514
/// <summary>
1615
/// The metadata of a change feed resource with <see cref="ChangeFeedMode"/> is initialized to <see cref="ChangeFeedMode.AllVersionsAndDeletes"/>.
1716
/// </summary>
18-
[System.Text.Json.Serialization.JsonConverter(typeof(ChangeFeedMetadataConverter))]
1917
#if PREVIEW
2018
public
2119
#else
2220
internal
2321
#endif
24-
class ChangeFeedMetadata
22+
class ChangeFeedMetadata
2523
{
24+
private readonly static DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
25+
2626
/// <summary>
2727
/// The change's conflict resolution timestamp.
2828
/// </summary>
29-
[JsonProperty(PropertyName = ChangeFeedMetadataFields.ConflictResolutionTimestamp, NullValueHandling = NullValueHandling.Ignore)]
30-
[JsonConverter(typeof(UnixDateTimeConverter))]
31-
public DateTime ConflictResolutionTimestamp { get; internal set; }
29+
[System.Text.Json.Serialization.JsonIgnore]
30+
[Newtonsoft.Json.JsonIgnore]
31+
public DateTime? ConflictResolutionTimestamp => this.ConflictResolutionTimestampInSeconds.HasValue ? UnixEpoch.AddSeconds(this.ConflictResolutionTimestampInSeconds.Value) : null;
32+
33+
[System.Text.Json.Serialization.JsonInclude]
34+
[System.Text.Json.Serialization.JsonPropertyName(ChangeFeedMetadataFields.ConflictResolutionTimestamp)]
35+
[JsonProperty(PropertyName = ChangeFeedMetadataFields.ConflictResolutionTimestamp)]
36+
internal double? ConflictResolutionTimestampInSeconds { get; set; }
3237

3338
/// <summary>
3439
/// The current change's logical sequence number.
3540
/// </summary>
41+
[System.Text.Json.Serialization.JsonInclude]
42+
[System.Text.Json.Serialization.JsonPropertyName(ChangeFeedMetadataFields.Lsn)]
3643
[JsonProperty(PropertyName = ChangeFeedMetadataFields.Lsn, NullValueHandling = NullValueHandling.Ignore)]
3744
public long Lsn { get; internal set; }
3845

3946
/// <summary>
4047
/// The change's feed operation type <see cref="ChangeFeedOperationType"/>.
4148
/// </summary>
49+
[Newtonsoft.Json.JsonConverter(typeof(StringEnumConverter))]
50+
[System.Text.Json.Serialization.JsonInclude]
51+
[System.Text.Json.Serialization.JsonPropertyName(ChangeFeedMetadataFields.OperationType)]
52+
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
4253
[JsonProperty(PropertyName = ChangeFeedMetadataFields.OperationType, NullValueHandling = NullValueHandling.Ignore)]
43-
[JsonConverter(typeof(StringEnumConverter))]
4454
public ChangeFeedOperationType OperationType { get; internal set; }
4555

4656
/// <summary>
4757
/// The previous change's logical sequence number.
4858
/// </summary>
59+
[System.Text.Json.Serialization.JsonInclude]
60+
[System.Text.Json.Serialization.JsonPropertyName(ChangeFeedMetadataFields.PreviousImageLSN)]
4961
[JsonProperty(PropertyName = ChangeFeedMetadataFields.PreviousImageLSN, NullValueHandling = NullValueHandling.Ignore)]
5062
public long PreviousLsn { get; internal set; }
5163

5264
/// <summary>
53-
/// Used to distinquish explicit deletes (e.g. via DeleteItem) from deletes caused by TTL expiration (a collection may define time-to-live policy for documents).
65+
/// Used to distinguish explicit deletes (e.g. via DeleteItem) from deletes caused by TTL expiration (a collection may define time-to-live policy for documents).
5466
/// </summary>
67+
[System.Text.Json.Serialization.JsonInclude]
68+
[System.Text.Json.Serialization.JsonPropertyName(ChangeFeedMetadataFields.TimeToLiveExpired)]
5569
[JsonProperty(PropertyName = ChangeFeedMetadataFields.TimeToLiveExpired, NullValueHandling = NullValueHandling.Ignore)]
5670
public bool IsTimeToLiveExpired { get; internal set; }
71+
72+
/// <summary>
73+
/// Applicable for delete operations only, otherwise null.
74+
/// The id of the previous item version.
75+
/// </summary>
76+
[System.Text.Json.Serialization.JsonInclude]
77+
[System.Text.Json.Serialization.JsonPropertyName(ChangeFeedMetadataFields.Id)]
78+
[JsonProperty(PropertyName = ChangeFeedMetadataFields.Id, NullValueHandling = NullValueHandling.Ignore)]
79+
public string Id { get; internal set; }
80+
81+
/// <summary>
82+
/// Applicable for delete operations only, otherwise null.
83+
/// The partition key of the previous item version represented as a dictionary where the key is the partition key property name
84+
/// and the value is the partition key property value. All levels of hierarchy will be present if a hierarchical partition key (HPK) is used.
85+
/// </summary>
86+
/// <remarks>
87+
/// <para>
88+
/// For single partition key containers, the dictionary will contain one entry with the partition key path name (without the leading '/')
89+
/// as the key and the partition key value as the value.
90+
/// </para>
91+
/// <para>
92+
/// For hierarchical partition key containers, the dictionary will contain multiple entries, one for each level of the hierarchy,
93+
/// as defined in the container's partition key definition.
94+
/// </para>
95+
/// <para>
96+
/// Example for a single partition key container with partition key path "/tenantId":
97+
/// <code>
98+
/// {
99+
/// "tenantId": "tenant123"
100+
/// }
101+
/// </code>
102+
/// </para>
103+
/// <para>
104+
/// Example for a hierarchical partition key container with partition key paths ["/tenantId", "/userId", "/sessionId"]:
105+
/// <code>
106+
/// {
107+
/// "tenantId": "tenant123",
108+
/// "userId": "user456",
109+
/// "sessionId": "session789"
110+
/// }
111+
/// </code>
112+
/// </para>
113+
/// <para>
114+
/// The partition key values can be of different types (string, number, boolean, null) depending on the document's schema.
115+
/// For example, with partition key paths ["/category", "/priority"]:
116+
/// <code>
117+
/// {
118+
/// "category": "electronics",
119+
/// "priority": 1
120+
/// }
121+
/// </code>
122+
/// </para>
123+
/// </remarks>
124+
[System.Text.Json.Serialization.JsonInclude]
125+
[System.Text.Json.Serialization.JsonPropertyName(ChangeFeedMetadataFields.PartitionKey)]
126+
[JsonProperty(PropertyName = ChangeFeedMetadataFields.PartitionKey, NullValueHandling = NullValueHandling.Ignore)]
127+
public Dictionary<string, object> PartitionKey { get; internal set; }
57128
}
58129
}

Microsoft.Azure.Cosmos/src/Resource/FullFidelity/ChangeFeedMetadataFields.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,7 @@ internal class ChangeFeedMetadataFields
1111
public const string OperationType = "operationType";
1212
public const string PreviousImageLSN = "previousImageLSN";
1313
public const string TimeToLiveExpired = "timeToLiveExpired";
14+
public const string Id = "id";
15+
public const string PartitionKey = "partitionKey";
1416
}
1517
}

Microsoft.Azure.Cosmos/src/Resource/FullFidelity/Converters/ChangeFeedMetadataConverter.cs

Lines changed: 0 additions & 92 deletions
This file was deleted.

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CFP/AllVersionsAndDeletes/BuilderTests.cs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public async Task WhenADocumentIsCreatedWithTtlSetThenTheDocumentIsDeletedTestsA
6969
Assert.IsTrue(DateTime.TryParse(s: change.Metadata.ConflictResolutionTimestamp.ToString(), out _), message: "Invalid csrt must be a datetime value.");
7070
Assert.IsTrue(change.Metadata.Lsn > 0, message: "Invalid lsn must be a long value.");
7171
Assert.IsFalse(change.Metadata.IsTimeToLiveExpired);
72-
72+
Assert.IsNull(change.Metadata.Id);
73+
Assert.IsNull(change.Metadata.PartitionKey);
7374
// previous
7475
Assert.IsNull(change.Previous);
7576
}
@@ -84,10 +85,9 @@ public async Task WhenADocumentIsCreatedWithTtlSetThenTheDocumentIsDeletedTestsA
8485
Assert.IsTrue(change.Metadata.IsTimeToLiveExpired);
8586

8687
// previous
87-
Assert.AreEqual(expected: "1", actual: change.Previous.id.ToString());
88-
Assert.AreEqual(expected: "1", actual: change.Previous.pk.ToString());
89-
Assert.AreEqual(expected: "Testing TTL on CFP.", actual: change.Previous.description.ToString());
90-
Assert.AreEqual(expected: ttlInSeconds, actual: change.Previous.ttl);
88+
Assert.AreEqual(expected: "1", actual: change.Metadata.Id.ToString());
89+
Assert.AreEqual(expected: "1", actual: change.Metadata.PartitionKey.Values.FirstOrDefault());
90+
Assert.IsNull(change.Previous);
9191

9292
// stop after reading delete since it is the last document in feed.
9393
stopwatch.Stop();
@@ -145,7 +145,7 @@ public async Task WhenADocumentIsCreatedWithTtlSetThenTheDocumentIsDeletedTestsA
145145
[TestMethod]
146146
[Owner("philipthomas-MSFT")]
147147
[Description("Scenario: When a document is created, then updated, and finally deleted, there should be 3 changes that will appear for that " +
148-
"document when using ChangeFeedProcessor with AllVersionsAndDeletes set as the ChangeFeedMode.")]
148+
"document when using ChangeFeedProcessor with AllVersionsAndDeletes set as the ChangeFeedMode and enablePreviousImageForDeleteInFFCF true")]
149149
public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
150150
{
151151
ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes);
@@ -155,6 +155,8 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
155155
ChangeFeedProcessor processor = monitoredContainer
156156
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> docs, CancellationToken token) =>
157157
{
158+
string metadataId = default;
159+
string metadataPk = default;
158160
string id = default;
159161
string pk = default;
160162
string description = default;
@@ -171,14 +173,13 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
171173
}
172174
else
173175
{
174-
id = change.Previous.id.ToString();
175-
pk = change.Previous.pk.ToString();
176-
description = change.Previous.description.ToString();
176+
metadataId = change.Metadata.Id.ToString();
177+
metadataPk = change.Metadata.PartitionKey.Values.FirstOrDefault().ToString();
177178
}
178179

179180
ChangeFeedOperationType operationType = change.Metadata.OperationType;
180181
long previousLsn = change.Metadata.PreviousLsn;
181-
DateTime m = change.Metadata.ConflictResolutionTimestamp;
182+
DateTime? m = change.Metadata.ConflictResolutionTimestamp;
182183
long lsn = change.Metadata.Lsn;
183184
bool isTimeToLiveExpired = change.Metadata.IsTimeToLiveExpired;
184185
}
@@ -211,8 +212,9 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
211212

212213
ChangeFeedItem<dynamic> deleteChange = docs.ElementAt(2);
213214
Assert.IsNull(deleteChange.Current.id);
215+
Assert.AreEqual(expected: "1", actual: deleteChange.Metadata.Id.ToString());
216+
Assert.AreEqual(expected: "1", actual: deleteChange.Metadata.PartitionKey.Values.FirstOrDefault());
214217
Assert.AreEqual(expected: deleteChange.Metadata.OperationType, actual: ChangeFeedOperationType.Delete);
215-
Assert.AreEqual(expected: replaceChange.Metadata.Lsn, actual: deleteChange.Metadata.PreviousLsn);
216218
Assert.IsNotNull(deleteChange.Previous);
217219
Assert.AreEqual(expected: "1", actual: deleteChange.Previous.id.ToString());
218220
Assert.AreEqual(expected: "1", actual: deleteChange.Previous.pk.ToString());

0 commit comments

Comments
 (0)