Skip to content

Commit cc61cc3

Browse files
committed
Update implementation
1 parent 2ca5b2a commit cc61cc3

3 files changed

Lines changed: 171 additions & 136 deletions

File tree

Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerCosmos.cs

Lines changed: 117 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
1313
using System.Threading.Tasks;
1414
using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions;
1515
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
16+
using Microsoft.Azure.Documents;
1617

1718
internal sealed class DocumentServiceLeaseContainerCosmos : DocumentServiceLeaseContainer
1819
{
@@ -50,27 +51,58 @@ public override async Task<IEnumerable<DocumentServiceLease>> GetOwnedLeasesAsyn
5051
public override async Task<IReadOnlyList<JsonElement>> ExportLeasesAsync(
5152
CancellationToken cancellationToken = default)
5253
{
53-
IReadOnlyList<DocumentServiceLease> allLeases = await this.GetAllLeasesAsync().ConfigureAwait(false);
54-
5554
List<JsonElement> exportedLeases = new List<JsonElement>();
5655

57-
foreach (DocumentServiceLease lease in allLeases)
56+
string prefix = this.options.ContainerNamePrefix;
57+
string query = "SELECT * FROM c WHERE STARTSWITH(c.id, '" + prefix + "')";
58+
59+
using FeedIterator iterator = this.container.GetItemQueryStreamIterator(
60+
query,
61+
continuationToken: null,
62+
requestOptions: queryRequestOptions);
63+
64+
while (iterator.HasMoreResults)
5865
{
5966
cancellationToken.ThrowIfCancellationRequested();
6067

61-
// Only support EPK-based leases for export
62-
if (lease is not DocumentServiceLeaseCoreEpk)
68+
using (ResponseMessage responseMessage = await iterator.ReadNextAsync().ConfigureAwait(false))
6369
{
64-
throw new LeaseOperationNotSupportedException(lease, "ExportLeases");
65-
}
70+
responseMessage.EnsureSuccessStatusCode();
6671

67-
using (Stream stream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream(lease))
68-
using (StreamReader reader = new StreamReader(stream))
69-
{
70-
string payload = await reader.ReadToEndAsync().ConfigureAwait(false);
71-
using (JsonDocument doc = JsonDocument.Parse(payload))
72+
using (JsonDocument feedResponse = await JsonDocument.ParseAsync(responseMessage.Content).ConfigureAwait(false))
7273
{
73-
exportedLeases.Add(doc.RootElement.Clone());
74+
if (!feedResponse.RootElement.TryGetProperty("Documents", out JsonElement documents))
75+
{
76+
continue;
77+
}
78+
79+
foreach (JsonElement docElement in documents.EnumerateArray())
80+
{
81+
if (!docElement.TryGetProperty("id", out JsonElement idElement)
82+
|| idElement.ValueKind != JsonValueKind.String)
83+
{
84+
continue;
85+
}
86+
87+
string id = idElement.GetString();
88+
89+
if (!this.IsMetadataDocumentId(id))
90+
{
91+
// Regular lease - deserialize, validate FeedRange
92+
string raw = docElement.GetRawText();
93+
using (MemoryStream ms = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(raw)))
94+
{
95+
DocumentServiceLease lease = CosmosContainerExtensions.DefaultJsonSerializer.FromStream<DocumentServiceLease>(ms);
96+
97+
if (lease.FeedRange == null)
98+
{
99+
throw new LeaseOperationNotSupportedException(lease, "ExportLeases");
100+
}
101+
}
102+
}
103+
104+
exportedLeases.Add(docElement.Clone());
105+
}
74106
}
75107
}
76108
}
@@ -97,35 +129,83 @@ public override async Task ImportLeasesAsync(
97129
continue;
98130
}
99131

100-
string payloadJson = leaseElement.GetRawText();
101-
using (MemoryStream stream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(payloadJson)))
102-
{
103-
DocumentServiceLease lease = CosmosContainerExtensions.DefaultJsonSerializer.FromStream<DocumentServiceLease>(stream);
104-
if (lease == null)
105-
{
106-
continue;
107-
}
132+
// Detect metadata document from the raw JSON before deserializing
133+
bool isMetadataDocument = leaseElement.TryGetProperty("id", out JsonElement idElement)
134+
&& this.IsMetadataDocumentId(idElement.GetString());
108135

109-
// Only support EPK-based leases for import
110-
if (lease is not DocumentServiceLeaseCoreEpk)
111-
{
112-
throw new LeaseOperationNotSupportedException(lease, "ImportLeases");
113-
}
136+
if (isMetadataDocument)
137+
{
138+
// Metadata document (.info, .lock) - import raw JSON to preserve original structure
139+
string id = idElement.GetString();
140+
Microsoft.Azure.Cosmos.PartitionKey pk = new Microsoft.Azure.Cosmos.PartitionKey(id);
141+
byte[] bytes = System.Text.Encoding.UTF8.GetBytes(leaseElement.GetRawText());
114142

115-
if (overwriteExisting)
143+
using (MemoryStream stream = new MemoryStream(bytes))
116144
{
117-
// Use upsert to create or replace
118-
await this.UpsertLeaseAsync(lease).ConfigureAwait(false);
145+
if (overwriteExisting)
146+
{
147+
using (ResponseMessage response = await this.container.UpsertItemStreamAsync(stream, pk).ConfigureAwait(false))
148+
{
149+
response.EnsureSuccessStatusCode();
150+
}
151+
}
152+
else
153+
{
154+
using (ResponseMessage response = await this.container.CreateItemStreamAsync(stream, pk).ConfigureAwait(false))
155+
{
156+
if (response.StatusCode != HttpStatusCode.Conflict)
157+
{
158+
response.EnsureSuccessStatusCode();
159+
}
160+
}
161+
}
119162
}
120-
else
163+
}
164+
else
165+
{
166+
// Regular lease - deserialize and validate
167+
string payloadJson = leaseElement.GetRawText();
168+
using (MemoryStream stream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(payloadJson)))
121169
{
122-
// Try to create, ignore if already exists
123-
await this.TryCreateLeaseAsync(lease).ConfigureAwait(false);
170+
DocumentServiceLease lease = CosmosContainerExtensions.DefaultJsonSerializer.FromStream<DocumentServiceLease>(stream);
171+
if (lease == null)
172+
{
173+
continue;
174+
}
175+
176+
if (lease.FeedRange == null)
177+
{
178+
throw new LeaseOperationNotSupportedException(lease, "ImportLeases");
179+
}
180+
181+
if (overwriteExisting)
182+
{
183+
await this.UpsertLeaseAsync(lease).ConfigureAwait(false);
184+
}
185+
else
186+
{
187+
await this.TryCreateLeaseAsync(lease).ConfigureAwait(false);
188+
}
124189
}
125190
}
126191
}
127192
}
128193

194+
/// <summary>
195+
/// Determines if a document ID belongs to a metadata document.
196+
/// Metadata documents are system artifacts like ".info" and ".lock" documents.
197+
/// </summary>
198+
private bool IsMetadataDocumentId(string id)
199+
{
200+
if (string.IsNullOrEmpty(id))
201+
{
202+
return false;
203+
}
204+
205+
return id.EndsWith(".info", StringComparison.Ordinal)
206+
|| id.EndsWith(".lock", StringComparison.Ordinal);
207+
}
208+
129209
private async Task<IReadOnlyList<DocumentServiceLease>> ListDocumentsAsync(string prefix)
130210
{
131211
if (string.IsNullOrEmpty(prefix))
@@ -153,7 +233,7 @@ private async Task<IReadOnlyList<DocumentServiceLease>> ListDocumentsAsync(strin
153233

154234
private async Task<bool> TryCreateLeaseAsync(DocumentServiceLease lease)
155235
{
156-
PartitionKey partitionKey = this.GetPartitionKey(lease);
236+
Microsoft.Azure.Cosmos.PartitionKey partitionKey = this.GetPartitionKey(lease);
157237
ItemResponse<DocumentServiceLease> response = await this.container.TryCreateItemAsync(
158238
partitionKey,
159239
lease).ConfigureAwait(false);
@@ -163,7 +243,7 @@ private async Task<bool> TryCreateLeaseAsync(DocumentServiceLease lease)
163243

164244
private async Task UpsertLeaseAsync(DocumentServiceLease lease)
165245
{
166-
PartitionKey partitionKey = this.GetPartitionKey(lease);
246+
Microsoft.Azure.Cosmos.PartitionKey partitionKey = this.GetPartitionKey(lease);
167247

168248
using (System.IO.Stream itemStream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream(lease))
169249
{
@@ -176,15 +256,15 @@ private async Task UpsertLeaseAsync(DocumentServiceLease lease)
176256
}
177257
}
178258

179-
private PartitionKey GetPartitionKey(DocumentServiceLease lease)
259+
private Microsoft.Azure.Cosmos.PartitionKey GetPartitionKey(DocumentServiceLease lease)
180260
{
181261
// If the lease has a partition key, use it; otherwise use the lease ID
182262
if (!string.IsNullOrEmpty(lease.PartitionKey))
183263
{
184-
return new PartitionKey(lease.PartitionKey);
264+
return new Microsoft.Azure.Cosmos.PartitionKey(lease.PartitionKey);
185265
}
186266

187-
return new PartitionKey(lease.Id);
267+
return new Microsoft.Azure.Cosmos.PartitionKey(lease.Id);
188268
}
189269
}
190270
}

Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerCosmosTests.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public async Task ImportLeasesAsync_WithNonEpkLeases_ThrowsUnsupportedException(
123123
await leaseContainer.ImportLeasesAsync(leasesToImport, overwriteExisting: false);
124124
}
125125

126-
private static Container GetMockedContainer(string containerName = "myColl")
126+
private static Container GetMockedContainer()
127127
{
128128
Headers headers = new Headers
129129
{
@@ -155,6 +155,13 @@ private static Container GetMockedContainer(string containerName = "myColl")
155155
It.IsAny<QueryRequestOptions>()))
156156
.Returns(() => mockedQuery.Object);
157157

158+
// Export uses ContainerNamePrefix (broader query to include metadata documents)
159+
mockedItems.Setup(i => i.GetItemQueryStreamIterator(
160+
It.Is<string>(value => string.Equals("SELECT * FROM c WHERE STARTSWITH(c.id, '" + DocumentServiceLeaseContainerCosmosTests.leaseStoreManagerSettings.ContainerNamePrefix + "')", value)),
161+
It.IsAny<string>(),
162+
It.IsAny<QueryRequestOptions>()))
163+
.Returns(() => mockedQuery.Object);
164+
158165
return mockedItems.Object;
159166
}
160167

0 commit comments

Comments
 (0)