Skip to content

Commit 78deffe

Browse files
committed
Simplify implementation
1 parent dddde37 commit 78deffe

11 files changed

Lines changed: 207 additions & 949 deletions

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessor.cs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos
66
{
77
using System;
88
using System.Collections.Generic;
9+
using System.Text.Json;
910
using System.Threading;
1011
using System.Threading.Tasks;
1112

@@ -27,53 +28,45 @@ public abstract class ChangeFeedProcessor
2728
public abstract Task StopAsync();
2829

2930
/// <summary>
30-
/// Exports all leases from the lease container to a list of <see cref="LeaseExportData"/> objects.
31+
/// Exports all leases from the lease container.
3132
/// </summary>
3233
/// <param name="cancellationToken">A cancellation token to observe.</param>
33-
/// <returns>A list of exported lease data.</returns>
34+
/// <returns>A list of lease objects as JSON elements.</returns>
3435
/// <remarks>
3536
/// <para>
36-
/// The processor must be stopped before calling this method. If the processor is running,
37-
/// an <see cref="InvalidOperationException"/> will be thrown.
37+
/// Each exported lease is a JSON object representing the serialized lease state.
38+
/// The payload should not be modified and is intended to be passed directly to <see cref="ImportLeasesAsync"/>.
3839
/// </para>
3940
/// <para>
40-
/// The export includes all lease metadata including continuation tokens, ownership history,
41-
/// and custom properties. The exported data can be used to restore leases using <see cref="ImportLeasesAsync"/>.
41+
/// This operation can be performed while the processor is running.
4242
/// </para>
4343
/// </remarks>
44-
/// <exception cref="InvalidOperationException">Thrown when the processor is currently running.</exception>
4544
/// <exception cref="NotSupportedException">Thrown when the implementation does not support export.</exception>
46-
public virtual Task<IReadOnlyList<LeaseExportData>> ExportLeasesAsync(CancellationToken cancellationToken = default)
45+
public virtual Task<IReadOnlyList<JsonElement>> ExportLeasesAsync(CancellationToken cancellationToken = default)
4746
{
4847
throw new NotSupportedException("This ChangeFeedProcessor implementation does not support lease export.");
4948
}
5049

5150
/// <summary>
52-
/// Imports leases from a list of <see cref="LeaseExportData"/> objects into the lease container.
51+
/// Imports leases into the lease container.
5352
/// </summary>
54-
/// <param name="leases">The list of lease data to import.</param>
55-
/// <param name="overwriteExisting">Whether to overwrite existing leases with the same ID. Default is false.</param>
53+
/// <param name="leases">The list of lease objects as JSON elements to import.</param>
54+
/// <param name="overwriteExisting">Whether to overwrite existing leases with the same token. Default is false.</param>
5655
/// <param name="cancellationToken">A cancellation token to observe.</param>
5756
/// <returns>A task representing the asynchronous operation.</returns>
5857
/// <remarks>
5958
/// <para>
60-
/// The processor must be stopped before calling this method. If the processor is running,
61-
/// an <see cref="InvalidOperationException"/> will be thrown.
62-
/// </para>
63-
/// <para>
64-
/// When importing, the ownership is transferred to the current processor instance,
65-
/// and the ownership history is updated to reflect the import action.
59+
/// The lease objects should be the opaque JSON elements obtained from <see cref="ExportLeasesAsync"/>.
6660
/// </para>
6761
/// <para>
6862
/// If <paramref name="overwriteExisting"/> is false (default), existing leases will not be modified.
6963
/// If true, existing leases will be replaced with the imported data.
7064
/// </para>
7165
/// </remarks>
7266
/// <exception cref="ArgumentNullException">Thrown when <paramref name="leases"/> is null.</exception>
73-
/// <exception cref="InvalidOperationException">Thrown when the processor is currently running.</exception>
7467
/// <exception cref="NotSupportedException">Thrown when the implementation does not support import.</exception>
7568
public virtual Task ImportLeasesAsync(
76-
IReadOnlyList<LeaseExportData> leases,
69+
IReadOnlyList<JsonElement> leases,
7770
bool overwriteExisting = false,
7871
CancellationToken cancellationToken = default)
7972
{

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs

Lines changed: 38 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
66
{
77
using System;
88
using System.Collections.Generic;
9+
using System.Text.Json;
910
using System.Threading;
1011
using System.Threading.Tasks;
1112
using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping;
@@ -101,44 +102,37 @@ public override async Task StopAsync()
101102
}
102103
}
103104

104-
public override async Task<IReadOnlyList<LeaseExportData>> ExportLeasesAsync(CancellationToken cancellationToken = default)
105+
public override async Task<IReadOnlyList<JsonElement>> ExportLeasesAsync(CancellationToken cancellationToken = default)
105106
{
106-
// Wait for any ongoing start/stop operations to complete
107-
// If processor is running, this will wait until StopAsync is called by the user
108-
await this.runningLock.WaitAsync(cancellationToken).ConfigureAwait(false);
109-
try
107+
// Initialize if needed to access the lease container
108+
if (!this.initialized)
110109
{
111-
if (this.isRunning)
110+
await this.runningLock.WaitAsync(cancellationToken).ConfigureAwait(false);
111+
try
112112
{
113-
// Release lock and throw - user must stop the processor first
114-
throw new InvalidOperationException(
115-
"Cannot export leases while the ChangeFeedProcessor is running. " +
116-
"Please call StopAsync() before exporting leases.");
113+
if (!this.initialized)
114+
{
115+
await this.InitializeAsync().ConfigureAwait(false);
116+
}
117117
}
118-
119-
// Initialize if needed to access the lease container
120-
if (!this.initialized)
118+
finally
121119
{
122-
await this.InitializeAsync().ConfigureAwait(false);
120+
this.runningLock.Release();
123121
}
122+
}
124123

125-
DefaultTrace.TraceInformation("Exporting leases...");
126-
IReadOnlyList<LeaseExportData> exportedLeases = await this.documentServiceLeaseStoreManager
127-
.LeaseContainer
128-
.ExportLeasesAsync(this.instanceName, cancellationToken)
129-
.ConfigureAwait(false);
124+
DefaultTrace.TraceInformation("Exporting leases...");
125+
IReadOnlyList<JsonElement> exportedLeases = await this.documentServiceLeaseStoreManager
126+
.LeaseContainer
127+
.ExportLeasesAsync(cancellationToken)
128+
.ConfigureAwait(false);
130129

131-
DefaultTrace.TraceInformation("Exported {0} leases.", exportedLeases.Count);
132-
return exportedLeases;
133-
}
134-
finally
135-
{
136-
this.runningLock.Release();
137-
}
130+
DefaultTrace.TraceInformation("Exported {0} leases.", exportedLeases.Count);
131+
return exportedLeases;
138132
}
139133

140134
public override async Task ImportLeasesAsync(
141-
IReadOnlyList<LeaseExportData> leases,
135+
IReadOnlyList<JsonElement> leases,
142136
bool overwriteExisting = false,
143137
CancellationToken cancellationToken = default)
144138
{
@@ -147,36 +141,30 @@ public override async Task ImportLeasesAsync(
147141
throw new ArgumentNullException(nameof(leases));
148142
}
149143

150-
// Wait for any ongoing start/stop operations to complete
151-
await this.runningLock.WaitAsync(cancellationToken).ConfigureAwait(false);
152-
try
144+
// Initialize if needed to access the lease container
145+
if (!this.initialized)
153146
{
154-
if (this.isRunning)
147+
await this.runningLock.WaitAsync(cancellationToken).ConfigureAwait(false);
148+
try
155149
{
156-
// Release lock and throw - user must stop the processor first
157-
throw new InvalidOperationException(
158-
"Cannot import leases while the ChangeFeedProcessor is running. " +
159-
"Please call StopAsync() before importing leases.");
150+
if (!this.initialized)
151+
{
152+
await this.InitializeAsync().ConfigureAwait(false);
153+
}
160154
}
161-
162-
// Initialize if needed to access the lease container
163-
if (!this.initialized)
155+
finally
164156
{
165-
await this.InitializeAsync().ConfigureAwait(false);
157+
this.runningLock.Release();
166158
}
159+
}
167160

168-
DefaultTrace.TraceInformation("Importing {0} leases (overwriteExisting={1})...", leases.Count, overwriteExisting);
169-
await this.documentServiceLeaseStoreManager
170-
.LeaseContainer
171-
.ImportLeasesAsync(leases, this.instanceName, overwriteExisting, cancellationToken)
172-
.ConfigureAwait(false);
161+
DefaultTrace.TraceInformation("Importing {0} leases (overwriteExisting={1})...", leases.Count, overwriteExisting);
162+
await this.documentServiceLeaseStoreManager
163+
.LeaseContainer
164+
.ImportLeasesAsync(leases, overwriteExisting, cancellationToken)
165+
.ConfigureAwait(false);
173166

174-
DefaultTrace.TraceInformation("Imported {0} leases.", leases.Count);
175-
}
176-
finally
177-
{
178-
this.runningLock.Release();
179-
}
167+
DefaultTrace.TraceInformation("Imported {0} leases.", leases.Count);
180168
}
181169

182170
private async Task InitializeAsync()

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseExportData.cs

Lines changed: 0 additions & 153 deletions
This file was deleted.

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainer.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
66
{
77
using System.Collections.Generic;
8+
using System.Text.Json;
89
using System.Threading;
910
using System.Threading.Tasks;
1011

@@ -26,26 +27,22 @@ internal abstract class DocumentServiceLeaseContainer
2627
public abstract Task<IEnumerable<DocumentServiceLease>> GetOwnedLeasesAsync();
2728

2829
/// <summary>
29-
/// Exports all leases to a list of <see cref="LeaseExportData"/> objects.
30+
/// Exports all leases as JSON elements.
3031
/// </summary>
31-
/// <param name="exportedBy">The name of the instance performing the export.</param>
3232
/// <param name="cancellationToken">A cancellation token to observe.</param>
33-
/// <returns>A list of exported lease data.</returns>
34-
public abstract Task<IReadOnlyList<LeaseExportData>> ExportLeasesAsync(
35-
string exportedBy,
33+
/// <returns>A list of lease objects as JSON elements.</returns>
34+
public abstract Task<IReadOnlyList<JsonElement>> ExportLeasesAsync(
3635
CancellationToken cancellationToken = default);
3736

3837
/// <summary>
39-
/// Imports leases from a list of <see cref="LeaseExportData"/> objects.
38+
/// Imports leases from a list of JSON elements.
4039
/// </summary>
41-
/// <param name="leases">The list of lease data to import.</param>
42-
/// <param name="importedBy">The name of the instance performing the import.</param>
40+
/// <param name="leases">The list of lease objects as JSON elements to import.</param>
4341
/// <param name="overwriteExisting">Whether to overwrite existing leases with the same ID.</param>
4442
/// <param name="cancellationToken">A cancellation token to observe.</param>
4543
/// <returns>A task representing the asynchronous operation.</returns>
4644
public abstract Task ImportLeasesAsync(
47-
IReadOnlyList<LeaseExportData> leases,
48-
string importedBy,
45+
IReadOnlyList<JsonElement> leases,
4946
bool overwriteExisting = false,
5047
CancellationToken cancellationToken = default);
5148
}

0 commit comments

Comments
 (0)