Skip to content

Commit c4cace5

Browse files
tvaron3Copilot
andcommitted
Add backup endpoint fallback for Cosmos client
Add with_backup_endpoints() API to support fallback endpoints when the primary global endpoint is unavailable. Endpoints are tried in order during initialization and steady-state metadata refreshes. Driver crate: - AccountReference: add backup_endpoints field, builder method, accessor - CosmosDriver: fallback in probe_http_version and refresh_account_properties SDK crate: - CosmosClientBuilder::with_backup_endpoints() public API - GlobalEndpointManager: fallback in get_database_account() - Integration tests for boot-via-backup and post-boot operations Fixes Azure#4099 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent a79ab96 commit c4cace5

10 files changed

Lines changed: 393 additions & 11 deletions

File tree

sdk/cosmos/azure_data_cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Features Added
66

7+
- Added `CosmosClientBuilder::with_backup_endpoints()` for specifying fallback endpoints when the primary global endpoint is unavailable during initialization and account metadata refreshes. ([#4099](https://github.com/Azure/azure-sdk-for-rust/issues/4099))
78
- Added `CosmosClientBuilder::with_proxy_allowed(bool)` for explicit opt-in to HTTP proxy usage with documented support limitations. ([#4062](https://github.com/Azure/azure-sdk-for-rust/pull/4062))
89
- Added `CustomResponseBuilder` and `FaultInjectionRule::hit_count()` APIs for fault injection, enabling ergonomic construction of synthetic HTTP responses and test verification of rule activation counts. ([#3888](https://github.com/Azure/azure-sdk-for-rust/pull/3888))
910

sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client_builder.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ pub struct CosmosClientBuilder {
8787
/// Fault injection builder for testing error handling
8888
#[cfg(feature = "fault_injection")]
8989
fault_injection_builder: Option<crate::fault_injection::FaultInjectionClientBuilder>,
90+
/// Fallback endpoints tried when the primary endpoint is unavailable.
91+
backup_endpoints: Vec<azure_core::http::Url>,
9092
}
9193

9294
impl CosmosClientBuilder {
@@ -162,6 +164,22 @@ impl CosmosClientBuilder {
162164
self
163165
}
164166

167+
/// Sets backup endpoints that the client will try when the primary global
168+
/// endpoint is unavailable.
169+
///
170+
/// During initialization and periodic account metadata refreshes, if the
171+
/// primary endpoint fails, the SDK will try each backup endpoint in order
172+
/// until one succeeds. A successful connection allows normal service
173+
/// discovery to proceed.
174+
///
175+
/// # Arguments
176+
///
177+
/// * `endpoints` - Ordered list of fallback endpoint URLs.
178+
pub fn with_backup_endpoints(mut self, endpoints: Vec<crate::CosmosAccountEndpoint>) -> Self {
179+
self.backup_endpoints = endpoints.into_iter().map(|e| e.into_url()).collect();
180+
self
181+
}
182+
165183
/// Builds the [`CosmosClient`] with the specified account reference and region selection strategy.
166184
///
167185
/// The account reference bundles an endpoint and credential. You can create one using
@@ -317,6 +335,7 @@ impl CosmosClientBuilder {
317335
preferred_regions,
318336
Vec::new(),
319337
pipeline_core.clone(),
338+
self.backup_endpoints.clone(),
320339
);
321340

322341
// Enable per-partition circuit breaker based on the
@@ -364,7 +383,8 @@ impl CosmosClientBuilder {
364383
// TODO: Each CosmosClient currently creates its own CosmosDriverRuntime. The runtime
365384
// should be shared across clients targeting the same account to avoid duplicate
366385
// background tasks and connection pools. See https://github.com/Azure/azure-sdk-for-rust/issues/3908
367-
let driver_account = build_driver_account(endpoint, driver_credential);
386+
let driver_account =
387+
build_driver_account(endpoint, driver_credential, self.backup_endpoints);
368388
#[allow(unused_mut)]
369389
let mut driver_runtime_builder = CosmosDriverRuntimeBuilder::new();
370390
#[cfg(feature = "allow_invalid_certificates")]
@@ -401,15 +421,21 @@ impl CosmosClientBuilder {
401421
fn build_driver_account(
402422
endpoint: azure_core::http::Url,
403423
credential: CosmosCredential,
424+
backup_endpoints: Vec<azure_core::http::Url>,
404425
) -> azure_data_cosmos_driver::models::AccountReference {
405-
match credential {
426+
let base = match credential {
406427
CosmosCredential::TokenCredential(tc) => {
407428
azure_data_cosmos_driver::models::AccountReference::with_credential(endpoint, tc)
408429
}
409430
#[cfg(feature = "key_auth")]
410431
CosmosCredential::MasterKey(key) => {
411432
azure_data_cosmos_driver::models::AccountReference::with_master_key(endpoint, key)
412433
}
434+
};
435+
if backup_endpoints.is_empty() {
436+
base
437+
} else {
438+
base.with_backup_endpoints(backup_endpoints)
413439
}
414440
}
415441

sdk/cosmos/azure_data_cosmos/src/retry_policies/client_retry_policy.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,7 @@ mod tests {
766766
vec![Region::from("West US"), Region::from("East US")],
767767
vec![],
768768
pipeline,
769+
vec![],
769770
)
770771
}
771772

@@ -784,6 +785,7 @@ mod tests {
784785
vec![],
785786
vec![],
786787
pipeline,
788+
vec![],
787789
)
788790
}
789791

@@ -802,6 +804,7 @@ mod tests {
802804
vec![Region::EAST_ASIA, Region::WEST_US, Region::NORTH_CENTRAL_US],
803805
vec![],
804806
pipeline,
807+
vec![],
805808
)
806809
}
807810

@@ -891,6 +894,7 @@ mod tests {
891894
vec![Region::from("West US"), Region::from("East US")],
892895
vec![],
893896
pipeline,
897+
vec![],
894898
);
895899

896900
let west = AccountRegion {

sdk/cosmos/azure_data_cosmos/src/retry_policies/metadata_request_retry_policy.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ mod tests {
291291
vec![Region::from("West US"), Region::from("East US")],
292292
vec![],
293293
pipeline,
294+
vec![],
294295
)
295296
}
296297

@@ -309,6 +310,7 @@ mod tests {
309310
vec![],
310311
vec![],
311312
pipeline,
313+
vec![],
312314
)
313315
}
314316

@@ -327,6 +329,7 @@ mod tests {
327329
vec![Region::EAST_ASIA, Region::WEST_US, Region::NORTH_CENTRAL_US],
328330
vec![],
329331
pipeline,
332+
vec![],
330333
)
331334
}
332335

sdk/cosmos/azure_data_cosmos/src/routing/global_endpoint_manager.rs

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ pub(crate) struct GlobalEndpointManager {
3535
/// The primary default endpoint URL for the Cosmos DB account
3636
default_endpoint: Url,
3737

38+
/// Fallback endpoints tried when the primary endpoint is unavailable
39+
backup_endpoints: Vec<Url>,
40+
3841
/// Thread-safe cache of location information including read/write endpoints and availability status
3942
location_cache: Mutex<LocationCache>,
4043

@@ -64,6 +67,7 @@ impl Debug for GlobalEndpointManager {
6467
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6568
f.debug_struct("GlobalEndpointManager")
6669
.field("default_endpoint", &self.default_endpoint)
70+
.field("backup_endpoints", &self.backup_endpoints)
6771
.field("location_cache", &self.location_cache)
6872
.field("pipeline", &self.pipeline)
6973
.field("account_properties_cache", &self.account_properties_cache)
@@ -87,6 +91,7 @@ impl GlobalEndpointManager {
8791
/// * `preferred_locations` - Ordered list of preferred Azure regions for request routing
8892
/// * `excluded_regions` - List of regions to exclude from routing
8993
/// * `pipeline` - HTTP pipeline for making service requests
94+
/// * `backup_endpoints` - Ordered fallback endpoint URLs tried when the primary endpoint is unavailable
9095
///
9196
/// # Returns
9297
/// A new `GlobalEndpointManager` instance ready for request routing
@@ -95,6 +100,7 @@ impl GlobalEndpointManager {
95100
preferred_locations: Vec<Region>,
96101
excluded_regions: Vec<Region>,
97102
pipeline: Pipeline,
103+
backup_endpoints: Vec<Url>,
98104
) -> Arc<Self> {
99105
let location_cache = Mutex::new(LocationCache::new(
100106
default_endpoint.clone(),
@@ -108,6 +114,7 @@ impl GlobalEndpointManager {
108114

109115
let instance = Arc::new(Self {
110116
default_endpoint,
117+
backup_endpoints,
111118
location_cache,
112119
pipeline,
113120
account_properties_cache,
@@ -442,14 +449,65 @@ impl GlobalEndpointManager {
442449
/// # Returns
443450
/// `Ok(Response<AccountProperties>)` with account metadata, or `Err` if request failed
444451
pub async fn get_database_account(&self) -> azure_core::Result<Response<AccountProperties>> {
452+
match self.get_database_account_from_endpoint(None).await {
453+
Ok(response) => Ok(response),
454+
Err(primary_error) if !self.backup_endpoints.is_empty() => {
455+
tracing::warn!(
456+
endpoint = %self.default_endpoint,
457+
error = %primary_error,
458+
"primary endpoint account fetch failed; trying backup endpoints"
459+
);
460+
461+
for backup_url in &self.backup_endpoints {
462+
match self
463+
.get_database_account_from_endpoint(Some(backup_url.clone()))
464+
.await
465+
{
466+
Ok(response) => {
467+
tracing::info!(
468+
backup_endpoint = %backup_url,
469+
"backup endpoint account fetch succeeded"
470+
);
471+
return Ok(response);
472+
}
473+
Err(e) => {
474+
tracing::warn!(
475+
backup_endpoint = %backup_url,
476+
error = %e,
477+
"backup endpoint account fetch failed; trying next"
478+
);
479+
}
480+
}
481+
}
482+
483+
tracing::error!(
484+
endpoint = %self.default_endpoint,
485+
backup_count = self.backup_endpoints.len(),
486+
"all endpoints exhausted during account properties fetch"
487+
);
488+
Err(primary_error)
489+
}
490+
Err(error) => Err(error),
491+
}
492+
}
493+
494+
/// Fetches database account properties from a specific endpoint.
495+
///
496+
/// If `endpoint_override` is `None`, uses the endpoint resolved from the
497+
/// location cache. Otherwise uses the provided endpoint directly.
498+
async fn get_database_account_from_endpoint(
499+
&self,
500+
endpoint_override: Option<Url>,
501+
) -> azure_core::Result<Response<AccountProperties>> {
445502
let resource_link = ResourceLink::root(ResourceType::DatabaseAccount);
446503
let builder = CosmosRequest::builder(OperationType::Read, resource_link.clone());
447504
let mut cosmos_request = builder.build()?;
448-
let endpoint = self
449-
.location_cache
450-
.lock()
451-
.unwrap()
452-
.resolve_service_endpoint(&cosmos_request);
505+
let endpoint = endpoint_override.unwrap_or_else(|| {
506+
self.location_cache
507+
.lock()
508+
.unwrap()
509+
.resolve_service_endpoint(&cosmos_request)
510+
});
453511
cosmos_request.request_context.location_endpoint_to_route = Some(endpoint);
454512
let ctx_owned = Context::default().with_value(resource_link);
455513
self.pipeline
@@ -575,6 +633,7 @@ mod tests {
575633
vec![Region::from("West US"), Region::from("East US")],
576634
vec![],
577635
create_test_pipeline(),
636+
vec![],
578637
)
579638
}
580639

sdk/cosmos/azure_data_cosmos/src/routing/global_partition_endpoint_manager.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,6 +1136,7 @@ mod tests {
11361136
vec![Region::from("West US")],
11371137
vec![],
11381138
create_test_pipeline(),
1139+
vec![],
11391140
)
11401141
}
11411142

@@ -1145,6 +1146,7 @@ mod tests {
11451146
vec![Region::from("West US"), Region::from("East US")],
11461147
vec![],
11471148
create_test_pipeline(),
1149+
vec![],
11481150
);
11491151

11501152
let west = AccountRegion {
@@ -1170,6 +1172,7 @@ mod tests {
11701172
],
11711173
vec![],
11721174
create_test_pipeline(),
1175+
vec![],
11731176
);
11741177

11751178
let west = AccountRegion {
@@ -1202,6 +1205,7 @@ mod tests {
12021205
vec![Region::from("West US"), Region::from("East US")],
12031206
vec![],
12041207
create_test_pipeline(),
1208+
vec![],
12051209
);
12061210

12071211
let west = AccountRegion {
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
//! Integration tests for backup endpoint fallback.
5+
//! Verifies that the SDK can boot using a backup endpoint when the global endpoint is unavailable.
6+
7+
#![cfg(feature = "key_auth")]
8+
9+
use super::framework;
10+
11+
use azure_data_cosmos::{
12+
ConnectionString, CosmosAccountEndpoint, CosmosAccountReference, CosmosClient, RoutingStrategy,
13+
};
14+
use framework::{TestClient, CONNECTION_STRING_ENV_VAR, HUB_REGION};
15+
use std::error::Error;
16+
17+
/// Tests that the SDK can initialize when the global endpoint is unreachable
18+
/// but a valid backup endpoint is provided.
19+
///
20+
/// The test uses a RFC 5737 TEST-NET address (guaranteed non-routable) as the
21+
/// global endpoint and sets the real account endpoint as a backup. The client
22+
/// should fail to reach the global endpoint, fall back to the backup endpoint,
23+
/// and initialize successfully.
24+
#[tokio::test]
25+
async fn client_boots_via_backup_when_global_endpoint_unreachable() -> Result<(), Box<dyn Error>> {
26+
TestClient::run(async |_run_context| {
27+
let Ok(env_var) = std::env::var(CONNECTION_STRING_ENV_VAR) else {
28+
eprintln!("Skipping: no connection string");
29+
return Ok(());
30+
};
31+
32+
let connection_string: ConnectionString = env_var.parse()?;
33+
let real_endpoint: CosmosAccountEndpoint = connection_string.account_endpoint.parse()?;
34+
35+
// RFC 5737 TEST-NET address — guaranteed non-routable for deterministic failure.
36+
let fake_endpoint: CosmosAccountEndpoint = "https://192.0.2.1:443/".parse()?;
37+
38+
let client = CosmosClient::builder()
39+
.with_backup_endpoints(vec![real_endpoint])
40+
.build(
41+
CosmosAccountReference::with_master_key(
42+
fake_endpoint,
43+
connection_string.account_key.clone(),
44+
),
45+
RoutingStrategy::ProximityTo(HUB_REGION),
46+
)
47+
.await;
48+
49+
assert!(
50+
client.is_ok(),
51+
"client should initialize via backup endpoint, but got: {:?}",
52+
client.err()
53+
);
54+
55+
Ok(())
56+
})
57+
.await
58+
}
59+
60+
/// Tests that the SDK can list databases after initializing via a backup endpoint.
61+
#[tokio::test]
62+
async fn operations_work_after_backup_endpoint_boot() -> Result<(), Box<dyn Error>> {
63+
TestClient::run(async |_run_context| {
64+
let Ok(env_var) = std::env::var(CONNECTION_STRING_ENV_VAR) else {
65+
eprintln!("Skipping: no connection string");
66+
return Ok(());
67+
};
68+
69+
let connection_string: ConnectionString = env_var.parse()?;
70+
let real_endpoint: CosmosAccountEndpoint = connection_string.account_endpoint.parse()?;
71+
72+
let fake_endpoint: CosmosAccountEndpoint = "https://192.0.2.1:443/".parse()?;
73+
74+
let client = CosmosClient::builder()
75+
.with_backup_endpoints(vec![real_endpoint])
76+
.build(
77+
CosmosAccountReference::with_master_key(
78+
fake_endpoint,
79+
connection_string.account_key.clone(),
80+
),
81+
RoutingStrategy::ProximityTo(HUB_REGION),
82+
)
83+
.await?;
84+
85+
// Verify the client can list databases (a basic read operation)
86+
use futures::TryStreamExt;
87+
let query = azure_data_cosmos::Query::from("SELECT * FROM root r");
88+
let mut pager = client.query_databases(query, None)?;
89+
let page = pager.try_next().await;
90+
assert!(
91+
page.is_ok(),
92+
"should be able to list databases after backup boot: {:?}",
93+
page.err()
94+
);
95+
96+
Ok(())
97+
})
98+
.await
99+
}

sdk/cosmos/azure_data_cosmos/tests/emulator_tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
3+
mod cosmos_backup_endpoints;
34
mod cosmos_batch;
45
mod cosmos_containers;
56
mod cosmos_databases;

0 commit comments

Comments
 (0)