Change feed: Adds id and pk to ChangeFeedMetadata for delete operations#5191
Change feed: Adds id and pk to ChangeFeedMetadata for delete operations#5191
Conversation
|
Please update PR description with usage samples and also call out that user-serializer is not used. |
| /// The metadata of a change feed resource with <see cref="ChangeFeedMode"/> is initialized to <see cref="ChangeFeedMode.AllVersionsAndDeletes"/>. | ||
| /// </summary> | ||
| [System.Text.Json.Serialization.JsonConverter(typeof(ChangeFeedMetadataConverter))] | ||
| [JsonConverter(typeof(ChangeFeedMetadataNewtonSoftConverter))] |
There was a problem hiding this comment.
Why is newtonsoftconverter needed?
There was a problem hiding this comment.
because we added list of tuple<string, object> for partition key in the metadata which the default JsonProperty is not able to deserialize
There was a problem hiding this comment.
True all changeFeed is applied with user-serializer.
There was a problem hiding this comment.
Thinking loud here, for non-primitive types respective de-serializers are needed.
For primitive types may be it not needed., thoughts?
There was a problem hiding this comment.
One question I have is what is the purpose of moving all the annotations from individual properties to the class?
There was a problem hiding this comment.
One drawback I see of this approach is that we loose the capability to do different null value handling behavior for properties
There was a problem hiding this comment.
I ran into it and what I observed was when you have a class-level JsonConverter attribute, the converter takes full control. Any property-level annotations (like [JsonPropertyName], [JsonProperty], etc.) are bypassed.
kirankumarkolli
left a comment
There was a problem hiding this comment.
What's the use-case for NewtonSoftConverter needed?
|
Out-of-curiosity, is there anything holding up this PR? |
Justine already has the samples ready and she will update them once the PR goes out. |
because of the introduction of List<(string, object)>for which a custom serializator was needed to serialize/deserialize. |
Sorry somehow it slipped through the cracks. will get this going. |
|
3daf85e to
2658b33
Compare
|
Given that this received an approval last week, is there anything still blocking this PR? ( @kirankumarkolli ) |
|
/azp run |
|
Azure Pipelines will not run the associated pipelines, because the pull request was updated after the run command was issued. Review the pull request again and issue a new run command. |
|
/azp run |
|
Azure Pipelines will not run the associated pipelines, because the pull request was updated after the run command was issued. Review the pull request again and issue a new run command. |
|
/azp run |
|
Azure Pipelines will not run the associated pipelines, because the pull request was updated after the run command was issued. Review the pull request again and issue a new run command. |
| @@ -16,43 +16,49 @@ namespace Microsoft.Azure.Cosmos | |||
| /// The metadata of a change feed resource with <see cref="ChangeFeedMode"/> is initialized to <see cref="ChangeFeedMode.AllVersionsAndDeletes"/>. | |||
| /// </summary> | |||
| [System.Text.Json.Serialization.JsonConverter(typeof(ChangeFeedMetadataConverter))] | |||
There was a problem hiding this comment.
On overall approach: new possible approach is to leverage https://learn.microsoft.com/en-us/dotnet/api/system.text.json.serialization.jsonincludeattribute?view=net-9.0&viewFallbackFrom=netstandard-2.0 (assuming that we can use it with-out rebasing to latest .NET versions.
Contract changes needs refresh |
…ns (#5470) ## Description Adds id and partition key to ChangeFeedMetadata. This is populated for delete operations using all versions all deletes change feed mode so that when customers get the delete, they know what was deleted. Note it doesn't have the value of the document that was deleted only the id and partition key through which a document can be uniquely identified. Please note that user provided serializer is not being used. This PR includes commits from #5191 and additional simplifications and unit test changes ## Type of change New feature (non-breaking change which adds functionality) ## Basic Usage ### Single Partition Key Example ```csharp using Microsoft.Azure.Cosmos; public class Product { public string id { get; set; } public string categoryId { get; set; } public string name { get; set; } public decimal price { get; set; } } // Create Change Feed Processor with All Versions and Deletes mode ChangeFeedProcessor processor = container .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<Product>( processorName: "productProcessor", onChangesDelegate: async (context, changes, cancellationToken) => { foreach (ChangeFeedItem<Product> change in changes) { switch (change.Metadata.OperationType) { case ChangeFeedOperationType.Create: case ChangeFeedOperationType.Replace: Console.WriteLine($"Document: {change.Current.id}"); break; case ChangeFeedOperationType.Delete: // Access deleted document's id and partition key from metadata Console.WriteLine($"Deleted Document Id: {change.Metadata.Id}"); // PartitionKey is a dictionary: key = path name, value = partition key value string categoryId = change.Metadata.PartitionKey["categoryId"]?.ToString(); Console.WriteLine($"Partition Key: {categoryId}"); // Check if deleted due to TTL expiration if (change.Metadata.IsTimeToLiveExpired) { Console.WriteLine("Deleted via TTL expiration"); } // Cleanup related data await RemoveFromCacheAsync(change.Metadata.Id, categoryId); break; } } }) .WithInstanceName("instance1") .WithLeaseContainer(leaseContainer) .Build(); await processor.StartAsync(); ``` ### Hierarchical Partition Key Example ```csharp public class UserSession { public string id { get; set; } public string tenantId { get; set; } public string userId { get; set; } public string sessionId { get; set; } } // Container with HPK: ["/tenantId", "/userId", "/sessionId"] ChangeFeedProcessor processor = container .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<UserSession>( processorName: "sessionProcessor", onChangesDelegate: async (context, changes, cancellationToken) => { foreach (ChangeFeedItem<UserSession> change in changes) { if (change.Metadata.OperationType == ChangeFeedOperationType.Delete) { Console.WriteLine($"Deleted Session Id: {change.Metadata.Id}"); // PartitionKey dictionary contains all hierarchy levels in order string tenantId = change.Metadata.PartitionKey["tenantId"]?.ToString(); string userId = change.Metadata.PartitionKey["userId"]?.ToString(); string sessionId = change.Metadata.PartitionKey["sessionId"]?.ToString(); Console.WriteLine($"HPK: tenant={tenantId}, user={userId}, session={sessionId}"); await CleanupSessionAsync(tenantId, userId, sessionId); } } }) .WithInstanceName("instance1") .WithLeaseContainer(leaseContainer) .Build(); await processor.StartAsync(); ``` ### Cache Synchronization Example ```csharp ChangeFeedProcessor processor = container .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<dynamic>( processorName: "cacheSync", onChangesDelegate: async (context, changes, cancellationToken) => { foreach (ChangeFeedItem<dynamic> change in changes) { if (change.Metadata.OperationType == ChangeFeedOperationType.Delete) { // Get id and partition key from metadata string id = change.Metadata.Id; string pk = change.Metadata.PartitionKey.Values.FirstOrDefault()?.ToString(); // Remove from cache and search index await cache.RemoveAsync(id); await searchIndex.DeleteAsync(id); Console.WriteLine($"Removed {id} from cache and index"); } else { // Update cache and index for creates/updates await cache.SetAsync(change.Current.id.ToString(), change.Current); await searchIndex.IndexAsync(change.Current); } } }) .WithInstanceName("cacheSync1") .WithLeaseContainer(leaseContainer) .Build(); await processor.StartAsync(); ``` ## Property Details ### `ChangeFeedMetadata.Id` - **Type**: `string` - **Populated**: Delete operations only (null for create/replace) - **Description**: The document id of the deleted item - **Example**: `"order-12345"` ### `ChangeFeedMetadata.PartitionKey` - **Type**: `Dictionary<string, object>` - **Populated**: Delete operations only (null for create/replace) - **Description**: Dictionary with partition key path(s) as keys and values as objects - **Key Format**: Partition key property name without leading `/` - **Value Types**: Can be string, number, boolean, or null **Examples:** Single Partition Key (`/categoryId`): ```csharp { "categoryId": "electronics" } ``` Hierarchical Partition Key (`["/tenantId", "/userId", "/sessionId"]`): ```csharp { "tenantId": "tenant123", "userId": "user456", "sessionId": "session789" } ``` Mixed Types (`["/category", "/priority", "/isActive"]`): ```csharp { "category": "electronics", "priority": 1, "isActive": true } ``` ## Notes 1. **Only for Deletes**: `Id` and `PartitionKey` are only populated when `OperationType == Delete` 2. **No Document Body**: The deleted document body is not available (Current will be null) 3. **No Custom Serializer**: User-provided serializers are not applied to these metadata properties 4. **TTL Detection**: Use `IsTimeToLiveExpired` to distinguish TTL deletes from explicit deletes 5. **Container Setup Required**: Container must have `ChangeFeedPolicy.FullFidelityRetention` configured --------- Co-authored-by: Justine Cocchi <jucocchi@microsoft.com> Co-authored-by: Dikshi Bahl <DikshiBahl@microsoft.com> Co-authored-by: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Co-authored-by: Kiran Kumar Kolli <kirankk@microsoft.com> Co-authored-by: ananth7592 <amudumba@microsoft.com>
Adds id and partition key to ChangeFeedMetadata. This is populated for delete operations using all versions and deletes change feed mode so that when customers get the delete, they know what was deleted. Note it doesn't have the value of the document that was deleted only the id and partiton key through which a document can be uniquely identified.
Please note that user provided serializer is not being used.
New feature (non-breaking change which adds functionality)