Skip to content

feat(rust): implement idempotency-aware retry logic [PECOBLR-2091]#370

Open
vikrantpuppala wants to merge 1 commit into main from
stack/PECOBLR-2091-retry-logic-impl
Open

feat(rust): implement idempotency-aware retry logic [PECOBLR-2091]#370
vikrantpuppala wants to merge 1 commit into main from
stack/PECOBLR-2091-retry-logic-impl

Conversation

@vikrantpuppala
Copy link
Copy Markdown
Collaborator

@vikrantpuppala vikrantpuppala commented Mar 25, 2026

🥞 Stacked PR

Use this link to review incremental changes.


Summary

Implements the standardized DBSQL connector retry specification in the Rust ADBC driver, replacing the previous one-size-fits-all retry logic.

  • RequestType enum as single source of truth for request classification — callers pass one value, HTTP client resolves category (for config) and idempotency (for strategy) internally
  • Idempotent strategy: retries everything except known non-retryable codes (400, 401, 403, 404, etc.) and all network errors
  • Non-idempotent strategy (ExecuteStatement only): retries only on connection errors (request never reached server) and 429/503
  • Backoff: Retry-After header honored first, exponential backoff fallback, both clamped to [min_wait, max_wait], with random jitter (50–750ms)
  • Overall timeout: cumulative 900s default, stops retrying even if max_retries not reached
  • Per-category config: SEA, CloudFetch, Auth each get their own RetryConfig with global defaults
  • Metadata queries (list_catalogs, list_schemas, etc.) classified as idempotent via ExecuteMetadataQuery

Design doc: #369

Test plan

  • 30+ unit tests for RequestType mapping, retry strategies, backoff calculation, config overrides
  • All 288 existing tests pass (0 failures)
  • cargo clippy --all-targets -- -D warnings clean
  • cargo +stable fmt --all clean
  • CI passes

This pull request was AI-assisted by Isaac.

@vikrantpuppala vikrantpuppala force-pushed the stack/PECOBLR-2091-retry-logic-impl branch from 8b81b96 to 3855476 Compare March 25, 2026 11:59
@vikrantpuppala
Copy link
Copy Markdown
Collaborator Author

Range-diff: main (8b81b96 -> 3855476)
rust/src/client/http.rs
@@ -109,34 +109,30 @@
 -    /// - 429 Too Many Requests
 -    /// - 503 Service Unavailable
 -    /// - 504 Gateway Timeout
--    ///
--    /// Non-retryable errors are returned immediately.
--    pub async fn execute(&self, request: Request) -> Result<Response> {
--        self.execute_impl(request, true).await
 +    /// Returns the retry config for the given request category.
 +    fn retry_config(&self, category: RequestCategory) -> &RetryConfig {
 +        self.retry_configs
 +            .get(&category)
 +            .expect("All categories should have retry configs")
-     }
- 
--    /// Execute a request without authentication (for CloudFetch downloads).
++    }
++
 +    /// Execute an HTTP request with authentication and idempotency-aware retry.
      ///
--    /// CloudFetch presigned URLs don't need our auth header - they have
--    /// their own authentication built into the URL or custom headers.
--    pub async fn execute_without_auth(&self, request: Request) -> Result<Response> {
--        self.execute_impl(request, false).await
+-    /// Non-retryable errors are returned immediately.
+-    pub async fn execute(&self, request: Request) -> Result<Response> {
+-        self.execute_impl(request, true).await
 +    /// The `RequestType` determines which `RetryConfig` and `RetryStrategy` to use.
 +    pub async fn execute(&self, request: Request, request_type: RequestType) -> Result<Response> {
 +        self.execute_impl(request, true, request_type).await
      }
  
--    /// Internal implementation of execute with configurable auth.
--    async fn execute_impl(&self, request: Request, with_auth: bool) -> Result<Response> {
--        let mut attempts = 0;
+-    /// Execute a request without authentication (for CloudFetch downloads).
 +    /// Execute a request without authentication (for CloudFetch, OAuth endpoints).
-+    ///
+     ///
+-    /// CloudFetch presigned URLs don't need our auth header - they have
+-    /// their own authentication built into the URL or custom headers.
+-    pub async fn execute_without_auth(&self, request: Request) -> Result<Response> {
+-        self.execute_impl(request, false).await
 +    /// Same retry logic as `execute()`, just skips the Authorization header.
 +    pub async fn execute_without_auth(
 +        &self,
@@ -144,8 +140,11 @@
 +        request_type: RequestType,
 +    ) -> Result<Response> {
 +        self.execute_impl(request, false, request_type).await
-+    }
-+
+     }
+ 
+-    /// Internal implementation of execute with configurable auth.
+-    async fn execute_impl(&self, request: Request, with_auth: bool) -> Result<Response> {
+-        let mut attempts = 0;
 +    /// Internal implementation of execute with configurable auth and retry strategy.
 +    async fn execute_impl(
 +        &self,
@@ -534,4 +533,514 @@
 +        let client = DatabricksHttpClient::new(config, default_retry_configs());
          assert!(client.is_ok());
      }
- 
\ No newline at end of file
+ 
+         assert!(debug_output.contains("[REDACTED]"));
+         assert!(!debug_output.contains("secret123"));
+     }
++
++    // --- Retry integration tests (with wiremock) ---
++
++    mod retry_integration {
++        use super::*;
++        use crate::client::retry::{
++            build_retry_configs, RequestCategory, RequestType, RetryConfig, RetryConfigOverrides,
++        };
++        use std::collections::HashSet;
++        use std::sync::atomic::{AtomicU32, Ordering};
++        use wiremock::matchers::{method, path};
++        use wiremock::{Mock, MockServer, ResponseTemplate};
++
++        /// Build a client pointed at the mock server with short retry waits for fast tests.
++        fn test_client(
++            mock_server: &MockServer,
++            retry_config: RetryConfig,
++        ) -> DatabricksHttpClient {
++            let mut overrides = HashMap::new();
++            let ovr = RetryConfigOverrides {
++                min_wait: Some(retry_config.min_wait),
++                max_wait: Some(retry_config.max_wait),
++                overall_timeout: Some(retry_config.overall_timeout),
++                max_retries: Some(retry_config.max_retries),
++                override_retryable_codes: retry_config.override_retryable_codes.clone(),
++            };
++            overrides.insert(RequestCategory::Sea, ovr.clone());
++            overrides.insert(RequestCategory::CloudFetch, ovr.clone());
++            overrides.insert(RequestCategory::Auth, ovr);
++
++            let global = RetryConfig::default();
++            let retry_configs = build_retry_configs(&global, &overrides);
++
++            let config = HttpClientConfig::default();
++            let client = DatabricksHttpClient::new(config, retry_configs).unwrap();
++            let auth = Arc::new(PersonalAccessToken::new("test-token".to_string()));
++            client.set_auth_provider(auth);
++
++            // Override the client's base URL by using the mock_server URI directly in requests
++            let _ = mock_server; // used by caller to build request URLs
++            client
++        }
++
++        /// Short retry config for fast tests (10ms waits instead of 1s).
++        fn fast_retry_config(max_retries: u32) -> RetryConfig {
++            RetryConfig {
++                min_wait: Duration::from_millis(10),
++                max_wait: Duration::from_millis(50),
++                overall_timeout: Duration::from_secs(10),
++                max_retries,
++                override_retryable_codes: None,
++            }
++        }
++
++        #[tokio::test]
++        async fn test_idempotent_request_succeeds_on_retry_after_503() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Return 503 twice, then 200
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    let n = counter.fetch_add(1, Ordering::SeqCst);
++                    if n < 2 {
++                        ResponseTemplate::new(503)
++                    } else {
++                        ResponseTemplate::new(200).set_body_string("ok")
++                    }
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(5));
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let response = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            assert!(response.is_ok());
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
++        }
++
++        #[tokio::test]
++        async fn test_idempotent_request_fails_after_max_retries() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Always return 503
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    counter.fetch_add(1, Ordering::SeqCst);
++                    ResponseTemplate::new(503)
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let result = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            assert!(result.is_err());
++            // max_retries=3 means 4 total attempts (1 initial + 3 retries)
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 4);
++        }
++
++        #[tokio::test]
++        async fn test_idempotent_request_retries_500() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Return 500 once, then 200 — idempotent strategy retries 500
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    let n = counter.fetch_add(1, Ordering::SeqCst);
++                    if n < 1 {
++                        ResponseTemplate::new(500)
++                    } else {
++                        ResponseTemplate::new(200).set_body_string("ok")
++                    }
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let response = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            assert!(response.is_ok());
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 2);
++        }
++
++        #[tokio::test]
++        async fn test_idempotent_no_retry_on_400() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    counter.fetch_add(1, Ordering::SeqCst);
++                    ResponseTemplate::new(400).set_body_string("bad request")
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let result = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            assert!(result.is_err());
++            // 400 is non-retryable — only 1 attempt
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 1);
++        }
++
++        #[tokio::test]
++        async fn test_idempotent_no_retry_on_401() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    counter.fetch_add(1, Ordering::SeqCst);
++                    ResponseTemplate::new(401).set_body_string("unauthorized")
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let result = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            assert!(result.is_err());
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 1);
++        }
++
++        #[tokio::test]
++        async fn test_non_idempotent_no_retry_on_500() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            Mock::given(method("POST"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    counter.fetch_add(1, Ordering::SeqCst);
++                    ResponseTemplate::new(500).set_body_string("server error")
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .post(format!("{}/test", mock_server.uri()))
++                .body("sql query")
++                .build()
++                .unwrap();
++
++            let result = client.execute(request, RequestType::ExecuteStatement).await;
++            assert!(result.is_err());
++            // Non-idempotent: 500 is NOT retryable — only 1 attempt
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 1);
++        }
++
++        #[tokio::test]
++        async fn test_non_idempotent_retries_on_429() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Return 429 once, then 200
++            Mock::given(method("POST"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    let n = counter.fetch_add(1, Ordering::SeqCst);
++                    if n < 1 {
++                        ResponseTemplate::new(429)
++                    } else {
++                        ResponseTemplate::new(200).set_body_string("ok")
++                    }
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .post(format!("{}/test", mock_server.uri()))
++                .body("sql query")
++                .build()
++                .unwrap();
++
++            let response = client.execute(request, RequestType::ExecuteStatement).await;
++            assert!(response.is_ok());
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 2);
++        }
++
++        #[tokio::test]
++        async fn test_non_idempotent_retries_on_503() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Return 503 twice, then 200
++            Mock::given(method("POST"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    let n = counter.fetch_add(1, Ordering::SeqCst);
++                    if n < 2 {
++                        ResponseTemplate::new(503)
++                    } else {
++                        ResponseTemplate::new(200).set_body_string("ok")
++                    }
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .post(format!("{}/test", mock_server.uri()))
++                .body("sql query")
++                .build()
++                .unwrap();
++
++            let response = client.execute(request, RequestType::ExecuteStatement).await;
++            assert!(response.is_ok());
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 3);
++        }
++
++        #[tokio::test]
++        async fn test_retry_after_header_honored() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Return 429 with Retry-After: 0, then 200
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    let n = counter.fetch_add(1, Ordering::SeqCst);
++                    if n < 1 {
++                        ResponseTemplate::new(429).append_header("Retry-After", "0")
++                    } else {
++                        ResponseTemplate::new(200).set_body_string("ok")
++                    }
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let start = std::time::Instant::now();
++            let response = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            let elapsed = start.elapsed();
++
++            assert!(response.is_ok());
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 2);
++            // Retry-After: 0 clamped to min_wait=10ms + jitter(50-750ms)
++            // Should complete quickly (under 2s)
++            assert!(elapsed < Duration::from_secs(2));
++        }
++
++        #[tokio::test]
++        async fn test_overall_timeout_stops_retries() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Always return 503
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    counter.fetch_add(1, Ordering::SeqCst);
++                    ResponseTemplate::new(503)
++                })
++                .mount(&mock_server)
++                .await;
++
++            // overall_timeout=2s with min_wait=500ms + jitter means ~2-3 attempts max
++            let config = RetryConfig {
++                min_wait: Duration::from_millis(500),
++                max_wait: Duration::from_millis(800),
++                overall_timeout: Duration::from_secs(2),
++                max_retries: 100, // high limit — timeout should stop us first
++                override_retryable_codes: None,
++            };
++            let client = test_client(&mock_server, config);
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let result = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            assert!(result.is_err());
++            let err_msg = format!("{:?}", result.unwrap_err());
++            assert!(err_msg.contains("retry timeout exceeded"));
++            // Should have made 2-4 attempts before timeout (not 100)
++            let attempts = attempt_count.load(Ordering::SeqCst);
++            assert!(
++                (2..=5).contains(&attempts),
++                "Expected 2-5 attempts, got {}",
++                attempts
++            );
++        }
++
++        #[tokio::test]
++        async fn test_override_retryable_codes() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Return 418 once, then 200
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    let n = counter.fetch_add(1, Ordering::SeqCst);
++                    if n < 1 {
++                        ResponseTemplate::new(418) // I'm a teapot — not normally retryable
++                    } else {
++                        ResponseTemplate::new(200).set_body_string("ok")
++                    }
++                })
++                .mount(&mock_server)
++                .await;
++
++            // Override: make 418 retryable
++            let mut codes = HashSet::new();
++            codes.insert(418);
++            let config = RetryConfig {
++                min_wait: Duration::from_millis(10),
++                max_wait: Duration::from_millis(50),
++                overall_timeout: Duration::from_secs(10),
++                max_retries: 3,
++                override_retryable_codes: Some(codes),
++            };
++
++            let client = test_client(&mock_server, config);
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let response = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            assert!(response.is_ok());
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 2);
++        }
++
++        #[tokio::test]
++        async fn test_cloudfetch_403_not_retried_at_http_layer() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Return 403 (expired presigned URL) — should NOT be retried at HTTP layer
++            Mock::given(method("GET"))
++                .and(path("/presigned"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    counter.fetch_add(1, Ordering::SeqCst);
++                    ResponseTemplate::new(403).set_body_string("access denied")
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .get(format!("{}/presigned", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let result = client
++                .execute_without_auth(request, RequestType::CloudFetchDownload)
++                .await;
++            assert!(result.is_err());
++            // 403 is non-retryable for idempotent requests — only 1 attempt
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 1);
++        }
++
++        #[tokio::test]
++        async fn test_metadata_query_retries_as_idempotent() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Return 500 once, then 200
++            // ExecuteMetadataQuery is idempotent so 500 should be retried
++            Mock::given(method("POST"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    let n = counter.fetch_add(1, Ordering::SeqCst);
++                    if n < 1 {
++                        ResponseTemplate::new(500)
++                    } else {
++                        ResponseTemplate::new(200).set_body_string("{}")
++                    }
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .post(format!("{}/test", mock_server.uri()))
++                .body("SHOW CATALOGS")
++                .build()
++                .unwrap();
++
++            let response = client
++                .execute(request, RequestType::ExecuteMetadataQuery)
++                .await;
++            assert!(response.is_ok());
++            // Idempotent: 500 is retried — 2 attempts
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 2);
++        }
++    }
+ }
\ No newline at end of file

Reproduce locally: git range-diff f69804a..8b81b96 f69804a..3855476 | Disable: git config gitstack.push-range-diff false

@vikrantpuppala vikrantpuppala force-pushed the stack/PECOBLR-2091-retry-logic-impl branch from 3855476 to 57ddb54 Compare March 25, 2026 12:05
@vikrantpuppala
Copy link
Copy Markdown
Collaborator Author

Range-diff: main (3855476 -> 57ddb54)
rust/src/client/http.rs
@@ -1007,6 +1007,183 @@
 +        }
 +
 +        #[tokio::test]
++        async fn test_exponential_backoff_timing() {
++            // Verify that delays between attempts follow exponential backoff.
++            // Config: min_wait=100ms, max_wait=500ms
++            // Expected: attempt 1→2: ~100ms+jitter, attempt 2→3: ~200ms+jitter, attempt 3→4: ~400ms+jitter
++            let mock_server = MockServer::start().await;
++            let timestamps = Arc::new(std::sync::Mutex::new(Vec::<Instant>::new()));
++            let ts = timestamps.clone();
++
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    ts.lock().unwrap().push(Instant::now());
++                    ResponseTemplate::new(503)
++                })
++                .mount(&mock_server)
++                .await;
++
++            let config = RetryConfig {
++                min_wait: Duration::from_millis(100),
++                max_wait: Duration::from_millis(500),
++                overall_timeout: Duration::from_secs(30),
++                max_retries: 3,
++                override_retryable_codes: None,
++            };
++            let client = test_client(&mock_server, config);
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let _ = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++
++            let ts = timestamps.lock().unwrap();
++            assert_eq!(ts.len(), 4, "Expected 4 attempts (1 initial + 3 retries)");
++
++            // Gap between attempt 1 and 2: ~100ms base + 50-750ms jitter = 150ms-850ms
++            let gap1 = ts[1].duration_since(ts[0]);
++            assert!(
++                gap1 >= Duration::from_millis(100) && gap1 <= Duration::from_millis(1000),
++                "Gap 1→2: {:?}, expected 100-1000ms",
++                gap1
++            );
++
++            // Gap between attempt 2 and 3: ~200ms base + 50-750ms jitter = 250ms-950ms
++            let gap2 = ts[2].duration_since(ts[1]);
++            assert!(
++                gap2 >= Duration::from_millis(200) && gap2 <= Duration::from_millis(1100),
++                "Gap 2→3: {:?}, expected 200-1100ms",
++                gap2
++            );
++
++            // Gap between attempt 3 and 4: ~400ms base + 50-750ms jitter = 450ms-1150ms
++            // But capped at max_wait=500ms, so: 500ms + 50-750ms jitter = 550ms-1250ms
++            let gap3 = ts[3].duration_since(ts[2]);
++            assert!(
++                gap3 >= Duration::from_millis(400) && gap3 <= Duration::from_millis(1400),
++                "Gap 3→4: {:?}, expected 400-1400ms",
++                gap3
++            );
++
++            // Exponential growth is verified by the ranges above:
++            // gap1 lower bound (100ms) < gap2 lower bound (200ms) < gap3 lower bound (400ms)
++            // Direct gap comparison is unreliable due to jitter (50-750ms) overlapping the base.
++        }
++
++        #[tokio::test]
++        async fn test_retry_after_header_delays_correctly() {
++            // Verify that Retry-After: N causes a delay of at least N seconds
++            // (clamped to min_wait, plus jitter)
++            let mock_server = MockServer::start().await;
++            let timestamps = Arc::new(std::sync::Mutex::new(Vec::<Instant>::new()));
++            let ts = timestamps.clone();
++
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    let n = ts.lock().unwrap().len();
++                    ts.lock().unwrap().push(Instant::now());
++                    if n < 1 {
++                        // Tell client to wait 1 second
++                        ResponseTemplate::new(429).append_header("Retry-After", "1")
++                    } else {
++                        ResponseTemplate::new(200).set_body_string("ok")
++                    }
++                })
++                .mount(&mock_server)
++                .await;
++
++            let config = RetryConfig {
++                min_wait: Duration::from_millis(50), // low min so Retry-After dominates
++                max_wait: Duration::from_secs(5),
++                overall_timeout: Duration::from_secs(30),
++                max_retries: 3,
++                override_retryable_codes: None,
++            };
++            let client = test_client(&mock_server, config);
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let response = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            assert!(response.is_ok());
++
++            let ts = timestamps.lock().unwrap();
++            assert_eq!(ts.len(), 2);
++
++            // Gap should be ~1s (Retry-After) + 50-750ms jitter = 1050ms-1750ms
++            let gap = ts[1].duration_since(ts[0]);
++            assert!(
++                gap >= Duration::from_millis(1000) && gap <= Duration::from_millis(2000),
++                "Gap with Retry-After:1: {:?}, expected 1000-2000ms",
++                gap
++            );
++        }
++
++        #[tokio::test]
++        async fn test_retry_after_clamped_to_max_wait() {
++            // Retry-After: 10 but max_wait: 500ms → should clamp to 500ms + jitter
++            let mock_server = MockServer::start().await;
++            let timestamps = Arc::new(std::sync::Mutex::new(Vec::<Instant>::new()));
++            let ts = timestamps.clone();
++
++            Mock::given(method("GET"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    let n = ts.lock().unwrap().len();
++                    ts.lock().unwrap().push(Instant::now());
++                    if n < 1 {
++                        // Tell client to wait 10 seconds — but max_wait will clamp this
++                        ResponseTemplate::new(429).append_header("Retry-After", "10")
++                    } else {
++                        ResponseTemplate::new(200).set_body_string("ok")
++                    }
++                })
++                .mount(&mock_server)
++                .await;
++
++            let config = RetryConfig {
++                min_wait: Duration::from_millis(50),
++                max_wait: Duration::from_millis(500), // clamp Retry-After:10 down to 500ms
++                overall_timeout: Duration::from_secs(30),
++                max_retries: 3,
++                override_retryable_codes: None,
++            };
++            let client = test_client(&mock_server, config);
++            let request = client
++                .inner()
++                .get(format!("{}/test", mock_server.uri()))
++                .build()
++                .unwrap();
++
++            let response = client
++                .execute(request, RequestType::GetStatementStatus)
++                .await;
++            assert!(response.is_ok());
++
++            let ts = timestamps.lock().unwrap();
++            assert_eq!(ts.len(), 2);
++
++            // Gap should be ~500ms (clamped max_wait) + 50-750ms jitter = 550-1250ms
++            // Crucially, NOT 10 seconds
++            let gap = ts[1].duration_since(ts[0]);
++            assert!(
++                gap >= Duration::from_millis(500) && gap <= Duration::from_millis(1500),
++                "Gap with clamped Retry-After: {:?}, expected 500-1500ms (NOT 10s)",
++                gap
++            );
++        }
++
++        #[tokio::test]
 +        async fn test_metadata_query_retries_as_idempotent() {
 +            let mock_server = MockServer::start().await;
 +            let attempt_count = Arc::new(AtomicU32::new(0));

Reproduce locally: git range-diff f69804a..3855476 f69804a..57ddb54 | Disable: git config gitstack.push-range-diff false

@vikrantpuppala vikrantpuppala force-pushed the stack/PECOBLR-2091-retry-logic-impl branch from 57ddb54 to dbcd69a Compare March 25, 2026 12:45
@vikrantpuppala
Copy link
Copy Markdown
Collaborator Author

Range-diff: main (57ddb54 -> dbcd69a)
rust/src/client/http.rs
@@ -1010,7 +1010,8 @@
 +        async fn test_exponential_backoff_timing() {
 +            // Verify that delays between attempts follow exponential backoff.
 +            // Config: min_wait=100ms, max_wait=500ms
-+            // Expected: attempt 1→2: ~100ms+jitter, attempt 2→3: ~200ms+jitter, attempt 3→4: ~400ms+jitter
++            // Per spec: exp_backoff = 2^attempt * min_wait
++            // Expected: attempt 1→2: ~200ms+jitter, attempt 2→3: ~400ms+jitter, attempt 3→4: ~500ms(capped)+jitter
 +            let mock_server = MockServer::start().await;
 +            let timestamps = Arc::new(std::sync::Mutex::new(Vec::<Instant>::new()));
 +            let ts = timestamps.clone();
@@ -1045,33 +1046,32 @@
 +            let ts = timestamps.lock().unwrap();
 +            assert_eq!(ts.len(), 4, "Expected 4 attempts (1 initial + 3 retries)");
 +
-+            // Gap between attempt 1 and 2: ~100ms base + 50-750ms jitter = 150ms-850ms
++            // Gap between attempt 1 and 2: 2^1*100ms = 200ms + 50-750ms jitter = 250-950ms
 +            let gap1 = ts[1].duration_since(ts[0]);
 +            assert!(
-+                gap1 >= Duration::from_millis(100) && gap1 <= Duration::from_millis(1000),
-+                "Gap 1→2: {:?}, expected 100-1000ms",
++                gap1 >= Duration::from_millis(200) && gap1 <= Duration::from_millis(1100),
++                "Gap 1→2: {:?}, expected 200-1100ms",
 +                gap1
 +            );
 +
-+            // Gap between attempt 2 and 3: ~200ms base + 50-750ms jitter = 250ms-950ms
++            // Gap between attempt 2 and 3: 2^2*100ms = 400ms + jitter = 450-1150ms
 +            let gap2 = ts[2].duration_since(ts[1]);
 +            assert!(
-+                gap2 >= Duration::from_millis(200) && gap2 <= Duration::from_millis(1100),
-+                "Gap 2→3: {:?}, expected 200-1100ms",
++                gap2 >= Duration::from_millis(400) && gap2 <= Duration::from_millis(1300),
++                "Gap 2→3: {:?}, expected 400-1300ms",
 +                gap2
 +            );
 +
-+            // Gap between attempt 3 and 4: ~400ms base + 50-750ms jitter = 450ms-1150ms
-+            // But capped at max_wait=500ms, so: 500ms + 50-750ms jitter = 550ms-1250ms
++            // Gap between attempt 3 and 4: 2^3*100ms = 800ms, capped to max_wait=500ms + jitter
 +            let gap3 = ts[3].duration_since(ts[2]);
 +            assert!(
-+                gap3 >= Duration::from_millis(400) && gap3 <= Duration::from_millis(1400),
-+                "Gap 3→4: {:?}, expected 400-1400ms",
++                gap3 >= Duration::from_millis(500) && gap3 <= Duration::from_millis(1400),
++                "Gap 3→4: {:?}, expected 500-1400ms (capped at max_wait=500ms)",
 +                gap3
 +            );
 +
 +            // Exponential growth is verified by the ranges above:
-+            // gap1 lower bound (100ms) < gap2 lower bound (200ms) < gap3 lower bound (400ms)
++            // gap1 lower bound (200ms) < gap2 lower bound (400ms) < gap3 lower bound (500ms, capped)
 +            // Direct gap comparison is unreliable due to jitter (50-750ms) overlapping the base.
 +        }
 +
rust/src/client/retry.rs
@@ -216,6 +216,12 @@
 +        let code = status.as_u16();
 +
 +        // If override codes are set, use them as the exhaustive retryable set
++        // Success codes are never retryable (the caller checks success first,
++        // but guard here for safety if the method is called directly)
++        if status.is_success() || status.is_redirection() {
++            return false;
++        }
++
 +        if let Some(ref codes) = self.override_codes {
 +            return codes.contains(&code);
 +        }
@@ -256,10 +262,8 @@
 +        // Clamp Retry-After to [min_wait, max_wait]
 +        wait.clamp(config.min_wait, config.max_wait)
 +    } else {
-+        // Exponential backoff: min_wait * 2^(attempt - 1)
-+        let exp = config
-+            .min_wait
-+            .saturating_mul(2u32.saturating_pow(attempt.saturating_sub(1)));
++        // Exponential backoff: min_wait * 2^attempt (per spec)
++        let exp = config.min_wait.saturating_mul(2u32.saturating_pow(attempt));
 +        exp.min(config.max_wait)
 +    };
 +
@@ -427,11 +431,10 @@
 +            idempotency: RequestIdempotency::Idempotent,
 +            override_codes: None,
 +        };
-+        // 2xx codes should not be retryable (they're successes)
-+        // Note: the retry loop checks success first, but the strategy itself
-+        // says "retryable" for any non-explicitly-non-retryable code.
-+        // The 200 case is handled by the success check, not the strategy.
-+        assert!(strategy.is_retryable_status(StatusCode::OK));
++        // 2xx and 3xx codes are never retryable
++        assert!(!strategy.is_retryable_status(StatusCode::OK));
++        assert!(!strategy.is_retryable_status(StatusCode::CREATED));
++        assert!(!strategy.is_retryable_status(StatusCode::MOVED_PERMANENTLY));
 +    }
 +
 +    #[test]
@@ -493,17 +496,18 @@
 +            max_wait: Duration::from_secs(60),
 +            ..Default::default()
 +        };
-+        // Attempt 1: ~1s, Attempt 2: ~2s, Attempt 3: ~4s
-+        // Due to jitter (50-750ms), we check ranges
++        // Per spec: exp_backoff = 2^attempt * min_wait
++        // Attempt 1: 2^1 * 1s = 2s + jitter, Attempt 2: 2^2 * 1s = 4s + jitter,
++        // Attempt 3: 2^3 * 1s = 8s + jitter
 +        let b1 = calculate_backoff(&config, 1, None);
 +        let b2 = calculate_backoff(&config, 2, None);
 +        let b3 = calculate_backoff(&config, 3, None);
-+        assert!(b1 >= Duration::from_millis(1050));
-+        assert!(b1 <= Duration::from_millis(1750));
-+        assert!(b2 >= Duration::from_millis(2050));
-+        assert!(b2 <= Duration::from_millis(2750));
-+        assert!(b3 >= Duration::from_millis(4050));
-+        assert!(b3 <= Duration::from_millis(4750));
++        assert!(b1 >= Duration::from_millis(2050));
++        assert!(b1 <= Duration::from_millis(2750));
++        assert!(b2 >= Duration::from_millis(4050));
++        assert!(b2 <= Duration::from_millis(4750));
++        assert!(b3 >= Duration::from_millis(8050));
++        assert!(b3 <= Duration::from_millis(8750));
 +    }
 +
 +    #[test]
@@ -525,9 +529,9 @@
 +            max_wait: Duration::from_secs(60),
 +            ..Default::default()
 +        };
-+        // Attempt 1: min_wait * 2^0 = 2s + jitter
++        // Attempt 1: min_wait * 2^1 = 4s + jitter
 +        let backoff = calculate_backoff(&config, 1, None);
-+        assert!(backoff >= Duration::from_millis(2050));
++        assert!(backoff >= Duration::from_millis(4050));
 +    }
 +
 +    #[test]
@@ -561,10 +565,10 @@
 +            min_wait: Duration::from_secs(1),
 +            ..Default::default()
 +        };
-+        // Invalid Retry-After should fall back to exponential
++        // Invalid Retry-After should fall back to exponential: 2^1 * 1s = 2s + jitter
 +        let backoff = calculate_backoff(&config, 1, Some("not-a-number"));
-+        assert!(backoff >= Duration::from_millis(1050));
-+        assert!(backoff <= Duration::from_millis(1750));
++        assert!(backoff >= Duration::from_millis(2050));
++        assert!(backoff <= Duration::from_millis(2750));
 +    }
 +
 +    #[test]
@@ -575,11 +579,11 @@
 +            ..Default::default()
 +        };
 +        // Run multiple times and check jitter range
++        // Attempt 1: 2^1 * 1s = 2s base + 50ms to 750ms jitter
 +        for _ in 0..100 {
 +            let backoff = calculate_backoff(&config, 1, None);
-+            // 1s base + 50ms to 750ms jitter
-+            assert!(backoff >= Duration::from_millis(1050));
-+            assert!(backoff <= Duration::from_millis(1750));
++            assert!(backoff >= Duration::from_millis(2050));
++            assert!(backoff <= Duration::from_millis(2750));
 +        }
 +    }
 +
@@ -634,6 +638,31 @@
 +    }
 +
 +    #[test]
++    fn test_parse_retry_after_http_date() {
++        // Use a future date relative to test execution
++        let future = chrono::Utc::now() + chrono::Duration::seconds(30);
++        let formatted = future.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
++        let result = parse_retry_after(&formatted);
++        assert!(result.is_some());
++        let duration = result.unwrap();
++        // Allow some tolerance for test execution time
++        assert!(
++            duration.as_secs() >= 28 && duration.as_secs() <= 32,
++            "Expected ~30s, got {:?}",
++            duration
++        );
++    }
++
++    #[test]
++    fn test_parse_retry_after_past_date() {
++        // A date in the past should return Duration::ZERO
++        let past = chrono::Utc::now() - chrono::Duration::seconds(60);
++        let formatted = past.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
++        let result = parse_retry_after(&formatted);
++        assert_eq!(result, Some(Duration::ZERO));
++    }
++
++    #[test]
 +    fn test_parse_retry_after_invalid() {
 +        assert_eq!(parse_retry_after("not-a-number"), None);
 +        assert_eq!(parse_retry_after(""), None);

Reproduce locally: git range-diff f69804a..57ddb54 f69804a..dbcd69a | Disable: git config gitstack.push-range-diff false

@vikrantpuppala vikrantpuppala force-pushed the stack/PECOBLR-2091-retry-logic-impl branch from dbcd69a to 5095574 Compare March 25, 2026 13:05
@vikrantpuppala
Copy link
Copy Markdown
Collaborator Author

Range-diff: main (dbcd69a -> 5095574)
rust/docs/designs/PECOBLR-2091-retry-logic-design.md
@@ -0,0 +1,735 @@
+diff --git a/rust/docs/designs/PECOBLR-2091-retry-logic-design.md b/rust/docs/designs/PECOBLR-2091-retry-logic-design.md
+new file mode 100644
+--- /dev/null
++++ b/rust/docs/designs/PECOBLR-2091-retry-logic-design.md
++<!--
++  Copyright (c) 2025 ADBC Drivers Contributors
++
++  Licensed under the Apache License, Version 2.0 (the "License");
++  you may not use this file except in compliance with the License.
++  You may obtain a copy of the License at
++
++      http://www.apache.org/licenses/LICENSE-2.0
++
++  Unless required by applicable law or agreed to in writing, software
++  distributed under the License is distributed on an "AS IS" BASIS,
++  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++  See the License for the specific language governing permissions and
++  limitations under the License.
++-->
++
++# PECOBLR-2091: Retry Logic Design
++
++| Field | Value |
++|-------|-------|
++| **Author** | Vikrant Puppala |
++| **Date** | March 25, 2026 |
++| **Status** | DRAFT |
++| **JIRA** | [PECOBLR-2091](https://databricks.atlassian.net/browse/PECOBLR-2091) |
++| **Spec** | [DBSQL Connectors Retry Logic](~/docs/dbsql-connectors-retry-logic.md) |
++
++## Overview
++
++This design implements the standardized DBSQL connector retry logic in the Rust ADBC driver. The driver currently has basic retry support (exponential backoff on 429/502/503/504) but lacks idempotency-aware strategies, `Retry-After` header support, configurable parameters, jitter, cumulative timeout, and structured retry telemetry.
++
++The goal is to align with the cross-connector retry specification while integrating cleanly with the driver's single `DatabricksHttpClient` architecture.
++
++### Current State
++
++The HTTP client (`client/http.rs`) retries all requests identically:
++- Retries on 429, 502, 503, 504 and network errors (timeout, connect, request)
++- Exponential backoff: `delay * 2^attempt` with no jitter
++- Count-based only (`max_retries = 5`), no cumulative timeout
++- No `Retry-After` header parsing
++- No idempotency classification
++
++### Target State
++
++- Requests classified as **idempotent** or **non-idempotent** with distinct retry strategies
++- `Retry-After` header honored, with min/max clamping
++- Exponential backoff with random jitter (50ms–750ms)
++- Cumulative timeout (default 900s) in addition to per-attempt count
++- Per-category configuration with global defaults
++- Structured logging per attempt and per request summary
++- CloudFetch presigned URL expiry integrated into the retry flow
++
++## Architecture
++
++### Component Overview
++
++```mermaid
++classDiagram
++    class DatabricksHttpClient {
++        -client: Client
++        -config: HttpClientConfig
++        -auth_provider: OnceLock~AuthProvider~
++        -retry_configs: HashMap~RequestCategory, RetryConfig~
++        +execute(request, request_type) Result~Response~
++        +execute_without_auth(request, request_type) Result~Response~
++    }
++
++    class RequestType {
++        <<enumeration>>
++        CreateSession
++        CloseSession
++        ExecuteStatement
++        ExecuteMetadataQuery
++        GetStatementStatus
++        CancelStatement
++        CloseStatement
++        GetResultChunks
++        CloudFetchDownload
++        AuthTokenRequest
++        AuthDiscovery
++        TelemetryPush
++        VolumeRead
++        VolumeWrite
++        +category() RequestCategory
++        +idempotency() RequestIdempotency
++    }
++
++    class RequestCategory {
++        <<enumeration>>
++        Sea
++        CloudFetch
++        Auth
++        Telemetry
++        Volume
++    }
++
++    class RequestIdempotency {
++        <<enumeration>>
++        Idempotent
++        NonIdempotent
++    }
++
++    class RetryConfig {
++        +min_wait: Duration
++        +max_wait: Duration
++        +overall_timeout: Duration
++        +max_retries: u32
++        +override_retryable_codes: Option~HashSet~u16~~
++    }
++
++    class RetryStrategy {
++        <<trait>>
++        +is_retryable_status(status) bool
++        +is_retryable_error(error) bool
++    }
++
++    class IdempotentRetryStrategy {
++        +is_retryable_status(status) bool
++        +is_retryable_error(error) bool
++    }
++
++    class NonIdempotentRetryStrategy {
++        +is_retryable_status(status) bool
++        +is_retryable_error(error) bool
++    }
++
++    class BackoffCalculator {
++        +calculate(config, attempt, retry_after_header) Duration
++    }
++
++    DatabricksHttpClient --> RetryConfig : holds per-category
++    DatabricksHttpClient --> RequestType : callers pass this
++    RequestType --> RequestCategory : derives
++    RequestType --> RequestIdempotency : derives
++    DatabricksHttpClient ..> RetryStrategy : selects from idempotency
++    RetryStrategy <|.. IdempotentRetryStrategy
++    RetryStrategy <|.. NonIdempotentRetryStrategy
++    DatabricksHttpClient ..> BackoffCalculator
++```
++
++### Retry Flow
++
++```mermaid
++sequenceDiagram
++    participant Caller as SeaClient / ChunkDownloader
++    participant HTTP as DatabricksHttpClient
++    participant Strategy as RetryStrategy
++    participant Backoff as BackoffCalculator
++    participant Server as Databricks API
++
++    Caller->>HTTP: execute(request, RequestType)
++    HTTP->>HTTP: Resolve category + idempotency from RequestType
++    HTTP->>HTTP: Look up RetryConfig for category
++    HTTP->>HTTP: Record start_time
++
++    loop Until success, non-retryable error, or timeout
++        HTTP->>Server: Send request (attempt N)
++        Server-->>HTTP: Response / Error
++
++        alt Success (2xx)
++            HTTP-->>Caller: Ok(Response)
++        else HTTP Error
++            HTTP->>Strategy: is_retryable_status(status)?
++            Strategy-->>HTTP: true / false
++            alt Not retryable OR attempts exhausted OR overall_timeout exceeded
++                HTTP-->>Caller: Err(error)
++            else Retryable
++                HTTP->>Backoff: calculate(config, attempt, Retry-After header)
++                Backoff-->>HTTP: wait_duration
++                HTTP->>HTTP: sleep(wait_duration)
++            end
++        else Network Error
++            HTTP->>Strategy: is_retryable_error(error)?
++            Strategy-->>HTTP: true / false
++            alt Not retryable OR attempts exhausted OR overall_timeout exceeded
++                HTTP-->>Caller: Err(error)
++            else Retryable
++                HTTP->>Backoff: calculate(config, attempt, None)
++                Backoff-->>HTTP: wait_duration
++                HTTP->>HTTP: sleep(wait_duration)
++            end
++        end
++    end
++```
++
++## Detailed Design
++
++### RequestType Enum and Centralized Classification
++
++Instead of having each caller decide the idempotency of their request, we define a `RequestType` enum that serves as the **single source of truth** for both category and idempotency mapping. Callers pass a `RequestType` to the HTTP client, which resolves everything internally.
++
++```rust
++/// Identifies the type of request being made. The HTTP client uses this
++/// to look up the correct RetryConfig (from category) and select the
++/// correct RetryStrategy (from idempotency).
++pub enum RequestType {
++    // SEA
++    CreateSession,
++    CloseSession,
++    ExecuteStatement,
++    ExecuteMetadataQuery,
++    GetStatementStatus,
++    CancelStatement,
++    CloseStatement,
++    GetResultChunks,
++    // CloudFetch
++    CloudFetchDownload,
++    // Auth
++    AuthTokenRequest,
++    AuthDiscovery,
++    // Future
++    TelemetryPush,
++    VolumeRead,
++    VolumeWrite,
++}
++
++pub enum RequestCategory {
++    Sea,
++    CloudFetch,
++    Auth,
++    Telemetry,
++    Volume,
++}
++
++pub enum RequestIdempotency {
++    Idempotent,
++    NonIdempotent,
++}
++```
++
++The mapping is centralized in `RequestType`:
++
++```rust
++impl RequestType {
++    pub fn category(&self) -> RequestCategory { ... }
++    pub fn idempotency(&self) -> RequestIdempotency { ... }
++}
++```
++
++| RequestType | Category | Idempotency | Notes |
++|-------------|----------|-------------|-------|
++| `CreateSession` | Sea | Idempotent | May create unused sessions, bounded by retry count |
++| `CloseSession` | Sea | Idempotent | Handle already-closed |
++| `ExecuteStatement` | Sea | **NonIdempotent** | User SQL may have side effects |
++| `ExecuteMetadataQuery` | Sea | Idempotent | Read-only SHOW/SELECT on information_schema |
++| `GetStatementStatus` | Sea | Idempotent | |
++| `CancelStatement` | Sea | Idempotent | Handle already-closed |
++| `CloseStatement` | Sea | Idempotent | Handle already-closed |
++| `GetResultChunks` | Sea | Idempotent | |
++| `CloudFetchDownload` | CloudFetch | Idempotent | See [Presigned URL Expiry](#presigned-url-expiry) |
++| `AuthDiscovery` | Auth | Idempotent | OIDC endpoint discovery |
++| `AuthTokenRequest` | Auth | Idempotent | Token endpoint returns fresh token |
++| `TelemetryPush` | Telemetry | Idempotent | Server deduplicates (future) |
++| `VolumeRead` | Volume | Idempotent | GET/LIST/SHOW (future) |
++| `VolumeWrite` | Volume | **NonIdempotent** | PUT (future) |
++
++**Why a `RequestType` enum instead of passing idempotency directly:**
++- **Single source of truth** — the `RequestType -> (Category, Idempotency)` mapping lives in one place, preventing misclassification (e.g., a caller accidentally marking `ExecuteStatement` as idempotent)
++- **Better logging** — the HTTP client can log the request type name (e.g., `"CreateSession"`) instead of just the HTTP method/URL
++- **Simpler caller API** — callers pass one value instead of two (`RetryConfig` + `RequestIdempotency`), and the HTTP client resolves everything internally
++- **Extensible** — adding a new endpoint means adding one enum variant with its mapping, not updating every call site
++
++Metadata queries (e.g., `list_catalogs`, `list_schemas`, `list_tables`, `list_columns`) execute read-only SQL via `POST /statements/`. Despite using the same endpoint as `ExecuteStatement`, they use `ExecuteMetadataQuery` because:
++- The SQL is generated internally (not user-provided)
++- All metadata SQL is read-only (`SHOW CATALOGS`, `SHOW SCHEMAS`, etc.)
++
++The `SeaClient` passes `RequestType::ExecuteMetadataQuery` for all `list_*` methods and `RequestType::ExecuteStatement` for user-initiated queries.
++
++### Retry Strategies
++
++#### `RetryStrategy` Trait
++
++```rust
++pub trait RetryStrategy: Send + Sync {
++    /// Whether the given HTTP status code is retryable.
++    fn is_retryable_status(&self, status: StatusCode) -> bool;
++
++    /// Whether the given network/client error is retryable.
++    fn is_retryable_error(&self, error: &reqwest::Error) -> bool;
++}
++```
++
++#### Idempotent Strategy
++
++Retries **everything except** known non-retryable client errors:
++
++**Non-retryable HTTP codes:** 400, 401, 403, 404, 405, 409, 410, 411, 412, 413, 414, 415, 416
++
++**Non-retryable errors:** None — all network errors (timeout, connect, request) are retried for idempotent requests.
++
++**Override behavior:** If `override_retryable_codes` is set, it becomes the **exhaustive** set of retryable codes — only those codes are retried, everything else is non-retryable. This replaces the default logic entirely.
++
++#### Non-Idempotent Strategy
++
++Retries **only** when the request provably did not reach the server, or the server explicitly signals retry:
++
++**Retryable HTTP codes:** 429, 503 (configurable via `override_retryable_codes`)
++
++**Retryable errors (connection-level only):**
++- `error.is_connect()` — connection refused, DNS failure, no route to host
++
++**Not retried:**
++- `error.is_timeout()` — request may have reached the server
++- `error.is_request()` — request was partially sent
++- Any other network error
++
++This is the critical safety boundary: a non-idempotent request that may have reached the server must not be retried, as it could cause duplicate writes or side effects.
++
++### Backoff Calculator
++
++```rust
++pub fn calculate_backoff(
++    config: &RetryConfig,
++    attempt: u32,
++    retry_after_header: Option<&str>,
++) -> Duration
++```
++
++**Logic:**
++
++```
++if Retry-After header present and parseable:
++    wait = parse_retry_after(header)    // seconds value or HTTP-date
++    wait = clamp(wait, config.min_wait, config.max_wait)
++else:
++    exp_backoff = config.min_wait * 2^attempt
++    wait = min(exp_backoff, config.max_wait)
++
++jitter = random(50ms, 750ms)
++return wait + jitter
++```
++
++`Retry-After` parsing supports both formats per HTTP spec:
++- Seconds: `Retry-After: 120` → 120 seconds
++- HTTP-date: `Retry-After: Wed, 25 Mar 2026 10:30:00 GMT` → delta from now
++
++If parsing fails, falls back to exponential backoff.
++
++### Retry Configuration
++
++```rust
++pub struct RetryConfig {
++    /// Minimum wait time per retry (default: 1s).
++    /// Clamps both exponential backoff floor and Retry-After minimum.
++    pub min_wait: Duration,
++
++    /// Maximum wait time per retry (default: 60s).
++    /// Clamps both exponential backoff ceiling and Retry-After maximum.
++    pub max_wait: Duration,
++
++    /// Cumulative wall-clock timeout for all retry attempts (default: 900s).
++    /// Stops retrying when total elapsed time exceeds this, even if
++    /// max_retries has not been reached.
++    pub overall_timeout: Duration,
++
++    /// Maximum number of retry attempts (default: 5).
++    /// A request is attempted at most max_retries + 1 times.
++    pub max_retries: u32,
++
++    /// Override the set of HTTP codes that are retryable.
++    /// When set, this becomes the **exhaustive** set — only these codes are
++    /// retried, regardless of the strategy's default logic. All other codes
++    /// are treated as non-retryable.
++    pub override_retryable_codes: Option<HashSet<u16>>,
++}
++```
++
++**Defaults:**
++
++| Parameter | Default | Spec Default |
++|-----------|---------|--------------|
++| `min_wait` | 1s | 1s |
++| `max_wait` | 60s | 60s |
++| `overall_timeout` | 900s | 900s |
++| `max_retries` | 5 | — |
++
++### Per-Category Configuration with Global Defaults
++
++Each request category has its own `RetryConfig`, but unset fields fall back to a global default. This is achieved through a layered config model:
++
++```mermaid
++classDiagram
++    class GlobalRetryDefaults {
++        +min_wait: 1s
++        +max_wait: 60s
++        +overall_timeout: 900s
++        +max_retries: 5
++    }
++
++    class SeaRetryConfig {
++        +overrides global defaults
++    }
++
++    class CloudFetchRetryConfig {
++        +overrides global defaults
++    }
++
++    class AuthRetryConfig {
++        +overrides global defaults
++    }
++
++    class TelemetryRetryConfig {
++        +overrides global defaults
++        +(future)
++    }
++
++    class VolumeRetryConfig {
++        +overrides global defaults
++        +(future)
++    }
++
++    GlobalRetryDefaults <|-- SeaRetryConfig
++    GlobalRetryDefaults <|-- CloudFetchRetryConfig
++    GlobalRetryDefaults <|-- AuthRetryConfig
++    GlobalRetryDefaults <|-- TelemetryRetryConfig
++    GlobalRetryDefaults <|-- VolumeRetryConfig
++```
++
++**ADBC connection parameters:**
++
++| Parameter | Scope | Default |
++|-----------|-------|---------|
++| `databricks.retry.min_wait_ms` | Global | 1000 |
++| `databricks.retry.max_wait_ms` | Global | 60000 |
++| `databricks.retry.overall_timeout_ms` | Global | 900000 |
++| `databricks.retry.max_retries` | Global | 5 |
++| `databricks.retry.retryable_codes` | Global | (strategy default) |
++| `databricks.retry.sea.min_wait_ms` | SEA | (global) |
++| `databricks.retry.sea.max_wait_ms` | SEA | (global) |
++| `databricks.retry.sea.overall_timeout_ms` | SEA | (global) |
++| `databricks.retry.sea.max_retries` | SEA | (global) |
++| `databricks.retry.sea.retryable_codes` | SEA | (global) |
++| `databricks.retry.cloudfetch.min_wait_ms` | CloudFetch | (global) |
++| `databricks.retry.cloudfetch.max_wait_ms` | CloudFetch | (global) |
++| `databricks.retry.cloudfetch.overall_timeout_ms` | CloudFetch | (global) |
++| `databricks.retry.cloudfetch.max_retries` | CloudFetch | (global) |
++| `databricks.retry.cloudfetch.retryable_codes` | CloudFetch | (global) |
++| `databricks.retry.auth.min_wait_ms` | Auth | (global) |
++| `databricks.retry.auth.max_wait_ms` | Auth | (global) |
++| `databricks.retry.auth.overall_timeout_ms` | Auth | (global) |
++| `databricks.retry.auth.max_retries` | Auth | (global) |
++| `databricks.retry.auth.retryable_codes` | Auth | (global) |
++
++Category-specific parameters override global defaults when explicitly set. This allows users to, for example, set a shorter timeout for auth requests without affecting SEA retries.
++
++**Resolution order** for each config field: category-specific → global → built-in default. For `retryable_codes`, if set at any level, it becomes the exhaustive retryable set for that category, replacing the strategy's default logic.
++
++### Presigned URL Expiry Strategy {#presigned-url-expiry}
++
++CloudFetch presigned URLs have server-controlled expiration times (typically minutes). The retry strategy must account for this to avoid retrying with an expired URL.
++
++**Current behavior** (already implemented in `streaming_provider.rs`):
++- Before each download attempt, check `link.is_expired()` (with 60s buffer)
++- If expired, proactively refetch via `link_fetcher.refetch_link()`
++- Store refreshed link for subsequent retry attempts
++
++**Integration with the new retry framework:**
++
++CloudFetch downloads use `execute_without_auth()` which will now accept `RetryConfig` and `RequestIdempotency::Idempotent`. However, the HTTP-level retry loop alone is insufficient for presigned URL expiry — a 403 from an expired URL should trigger a **link refresh**, not just a backoff-and-retry with the same URL.
++
++The solution is a **two-layer retry architecture** for CloudFetch:
++
++```mermaid
++sequenceDiagram
++    participant SP as StreamingProvider
++    participant CD as ChunkDownloader
++    participant HTTP as DatabricksHttpClient
++    participant Cloud as Cloud Storage
++
++    SP->>SP: Check link.is_expired() (60s buffer)
++    alt Link expired
++        SP->>SP: refetch_link() from SEA API
++    end
++
++    SP->>CD: download(link)
++    CD->>HTTP: execute_without_auth(GET url, CloudFetchDownload)
++
++    loop HTTP retry loop (transient errors)
++        HTTP->>Cloud: GET presigned_url
++        Cloud-->>HTTP: Response
++
++        alt 200 OK
++            HTTP-->>CD: Ok(Response)
++        else 403 Forbidden
++            HTTP-->>CD: Err (non-retryable at HTTP layer)
++            Note over HTTP: 403 is in the idempotent<br/>non-retryable set
++        else 429/503/5xx (transient)
++            HTTP->>HTTP: Backoff + retry
++        end
++    end
++
++    CD-->>SP: Ok(batches) or Err
++
++    alt Err (including 403)
++        SP->>SP: Invalidate link, refetch_link()
++        SP->>CD: Retry download with fresh link
++    end
++```
++
++**Key design decisions:**
++
++1. **403 is non-retryable at the HTTP layer** — it's in the idempotent non-retryable set (per spec). This is correct because retrying the same expired URL will always fail.
++
++2. **Link refresh happens at the `StreamingProvider` layer** — this layer already owns link lifecycle, expiry checks, and the `ChunkLinkFetcher`. When a download fails with 403, the provider invalidates the cached link and refetches before retrying.
++
++3. **Two independent retry budgets:**
++   - **HTTP layer**: retries transient errors (429, 502, 503, 504, network errors) using the `CloudFetch` category's `RetryConfig` (resolved from `RequestType::CloudFetchDownload`)
++   - **Provider layer**: retries with link refresh on 403/expiry, bounded by its own `max_retries` (from `CloudFetchConfig`)
++
++4. **Proactive expiry check unchanged** — the existing 60-second buffer check before download continues to work, preventing most expiry-related 403s.
++
++This separation keeps the HTTP retry layer generic (no knowledge of presigned URLs) while the provider layer handles the domain-specific concern of link expiry.
++
++### HTTP Client Interface Changes
++
++The `execute` and `execute_without_auth` methods gain a single `RequestType` parameter. The HTTP client resolves the `RetryConfig` (from its internal category map) and `RetryStrategy` (from the idempotency mapping) internally:
++
++```rust
++impl DatabricksHttpClient {
++    /// Execute with auth and retry logic.
++    /// The RequestType determines which RetryConfig and RetryStrategy to use.
++    pub async fn execute(
++        &self,
++        request: Request,
++        request_type: RequestType,
++    ) -> Result<Response>;
++
++    /// Execute without auth (CloudFetch, OAuth token endpoint).
++    /// Same retry logic, just skips the Authorization header.
++    pub async fn execute_without_auth(
++        &self,
++        request: Request,
++        request_type: RequestType,
++    ) -> Result<Response>;
++}
++```
++
++The HTTP client holds the per-category retry configs, populated at construction from `HttpClientConfig`:
++
++```rust
++pub struct DatabricksHttpClient {
++    client: Client,
++    config: HttpClientConfig,
++    auth_provider: OnceLock<Arc<dyn AuthProvider>>,
++    retry_configs: HashMap<RequestCategory, RetryConfig>,
++}
++```
++
++**Migration path:** All current callers will be updated to pass the appropriate `RequestType`. The existing `HttpClientConfig.max_retries` and `HttpClientConfig.retry_delay` fields will be removed in favor of the new `RetryConfig` system.
++
++### SeaClient Changes
++
++`SeaClient` no longer holds retry configs — those live in `DatabricksHttpClient`. Instead, each method simply passes the appropriate `RequestType`:
++
++| Method | RequestType Passed |
++|--------|-------------------|
++| `create_session()` | `CreateSession` |
++| `delete_session()` | `CloseSession` |
++| `execute_statement()` | `ExecuteStatement` |
++| `list_catalogs()`, `list_schemas()`, etc. | `ExecuteMetadataQuery` |
++| `get_statement_status()` | `GetStatementStatus` |
++| `cancel_statement()` | `CancelStatement` |
++| `close_statement()` | `CloseStatement` |
++| `get_result_chunks()` | `GetResultChunks` |
++
++For metadata queries, the internal `call_execute_api` helper accepts a `RequestType` parameter. The `DatabricksClient::execute_statement` trait method is called with `RequestType::ExecuteStatement` by default, while all `list_*` metadata methods use `RequestType::ExecuteMetadataQuery`. This avoids adding an `is_idempotent` flag to the trait — the `RequestType` enum encodes both the category and the idempotency in a single value.
++
++### Auth Retry Changes
++
++OAuth providers (`ClientCredentialsProvider`, `AuthorizationCodeProvider`) currently call `execute_without_auth()` for token endpoint requests. These will now pass `RequestType::AuthTokenRequest` or `RequestType::AuthDiscovery`, and the HTTP client resolves the `Auth` category retry config internally.
++
++No structural changes needed to the auth providers — they just add the `RequestType` parameter to their `execute_without_auth()` calls.
++
++## Logging and Telemetry
++
++### Per-Attempt Logging (DEBUG)
++
++Each retry attempt logs:
++- Request description (e.g., `POST /statements`, `GET chunk/3`)
++- Attempt number (e.g., `2/6`)
++- Backoff delay and source (`Retry-After: 5s` or `exponential: 4s + jitter: 312ms`)
++- Error that triggered the retry (status code or error message)
++
++```
++DEBUG retry: POST /api/2.0/sql/statements attempt 2/6,
++      waiting 4.312s (exponential), error: HTTP 503
++```
++
++### Per-Request Summary (DEBUG)
++
++After all attempts complete (success or failure):
++- Request description
++- Total attempts
++- Total elapsed time
++- Final outcome (`success` or `failed`)
++- Final error (if failed)
++
++```
++DEBUG retry: POST /api/2.0/sql/statements completed after 3 attempts
++      in 9.7s — success
++```
++
++```
++DEBUG retry: GET /api/2.0/sql/statements/abc123 failed after 6 attempts
++      in 127.4s — HTTP 503 Service Unavailable
++```
++
++### Telemetry Metrics (Future)
++
++When the telemetry system is implemented (PECOBLR-2098), retry metrics will be emitted:
++- `retry.attempt_count` — histogram of attempts per request
++- `retry.total_duration_ms` — histogram of total retry duration
++- `retry.outcome` — counter by outcome (success_first_attempt, success_after_retry, failed)
++
++## Edge Cases and Failure Modes
++
++### Overall Timeout vs. Retry-After
++
++If the server sends a `Retry-After` value that would exceed `overall_timeout`, the driver does **not** wait. It immediately returns the error. This follows Gopal's review feedback: honor `overall_timeout` as the hard ceiling.
++
++```
++remaining = overall_timeout - elapsed
++if calculated_wait > remaining:
++    return Err("Retry timeout exceeded")
++```
++
++### Max Retries vs. Overall Timeout
++
++Both limits are enforced. Whichever is reached first stops retrying:
++- `attempts > max_retries` → stop
++- `elapsed > overall_timeout` → stop
++- `elapsed + next_wait > overall_timeout` → stop (don't start a retry that will time out)
++
++### Auth Token Refresh During Retries
++
++OAuth tokens may expire between retry attempts. Since `auth_header()` is called fresh on each attempt (existing behavior), token refresh happens transparently. If token refresh itself fails, that error propagates as a non-retryable error.
++
++### Concurrent Retry Storms
++
++Multiple parallel CloudFetch downloads may all fail simultaneously (e.g., during a brief network blip). The jitter (50ms–750ms) helps spread out retry attempts. The existing `max_chunks_in_memory` backpressure limit bounds the number of concurrent retrying downloads.
++
++### Already-Closed Resources
++
++When retrying `close_session`, `close_statement`, or `cancel_statement`, the resource may already be closed from a previous attempt that succeeded on the server but failed on the network return path. These methods should handle "already closed" responses gracefully (not treat them as errors).
++
++## Test Strategy
++
++### Unit Tests
++
++**RequestType mapping:**
++- `test_request_type_category_mapping` — all variants map to expected category
++- `test_request_type_idempotency_mapping` — all variants map to expected idempotency
++- `test_execute_statement_is_non_idempotent`
++- `test_execute_metadata_query_is_idempotent`
++
++**Idempotent strategy:**
++- `test_idempotent_strategy_retries_5xx`
++- `test_idempotent_strategy_no_retry_on_400_401_403_404`
++- `test_idempotent_strategy_retries_network_errors`
++- `test_idempotent_strategy_override_codes`
++
++**Non-idempotent classification:**
++- `test_non_idempotent_strategy_retries_429_503`
++- `test_non_idempotent_strategy_no_retry_on_timeout`
++- `test_non_idempotent_strategy_retries_connect_error`
++- `test_non_idempotent_strategy_no_retry_on_5xx`
++
++**Backoff calculation:**
++- `test_exponential_backoff_increases`
++- `test_backoff_capped_at_max_wait`
++- `test_backoff_respects_min_wait`
++- `test_retry_after_header_seconds`
++- `test_retry_after_header_http_date`
++- `test_retry_after_clamped_to_min_max`
++- `test_jitter_in_range_50ms_750ms`
++
++**Configuration:**
++- `test_retry_config_defaults`
++- `test_category_config_overrides_global`
++- `test_overall_timeout_stops_retries`
++- `test_max_retries_stops_retries`
++
++### Integration Tests (with mock HTTP server)
++
++- `test_request_succeeds_on_retry` — mock returns 503 twice, then 200
++- `test_request_fails_after_max_retries` — mock returns 503 forever
++- `test_request_fails_after_overall_timeout` — mock returns 503 with slow backoff
++- `test_retry_after_header_honored` — mock returns 429 with Retry-After
++- `test_non_idempotent_no_retry_on_500` — mock returns 500, verify single attempt
++- `test_non_idempotent_retries_on_429` — mock returns 429, verify retry
++
++### CloudFetch-Specific Tests
++
++- `test_expired_link_triggers_refresh` — mock expired link, verify refetch before download
++- `test_403_triggers_link_refresh` — mock 403 on download, verify provider-level retry with fresh link
++- `test_transient_error_retried_at_http_layer` — mock 503, verify HTTP-level retry without link refresh
++
++## Implementation Plan
++
++| Phase | Scope | Estimated Effort |
++|-------|-------|------------------|
++| **1** | Core retry framework: `RequestType`, `RetryConfig`, `RetryStrategy` trait + implementations, `BackoffCalculator` with jitter + Retry-After | 2 days |
++| **2** | Wire into `DatabricksHttpClient`: per-category config map, update `execute_impl`, overall timeout, structured logging | 1 day |
++| **3** | Update all `SeaClient` methods to pass `RequestType`, update `ChunkDownloader` and auth providers | 1 day |
++| **4** | ADBC connection parameter parsing for global + per-category retry config | 1 day |
++| **5** | CloudFetch integration: align `StreamingProvider` retry with HTTP-level retry, 403 handling | 1 day |
++| **6** | Tests: unit tests for RequestType mapping, strategies, backoff; integration tests with mock server | 2 days |
++| **Total** | | **8 days** |
++
++## Alternatives Considered
++
++### Option 1: Retry middleware via `reqwest-retry` crate
++
++**Pros:** Less custom code, well-tested crate.
++**Cons:** Doesn't support idempotency-aware strategies, can't integrate `Retry-After` header with per-category config, adds external dependency. The spec requires nuanced behavior (non-idempotent strategy, overall timeout, override codes) that middleware crates don't support out of the box.
++
++### Option 2: Retry at the `SeaClient` layer instead of HTTP layer
++
++**Pros:** Full context about the request semantics.
++**Cons:** Duplicates retry loops across every method. The HTTP client is the natural place for HTTP-level retry logic. CloudFetch and Auth also need retries, and those don't go through `SeaClient`.
++
++### Option 3: Single retry strategy for all requests
++
++**Pros:** Simpler implementation.
++**Cons:** Violates the spec. Retrying a non-idempotent `ExecuteStatement` after a timeout could cause duplicate query execution, leading to duplicate writes or incorrect billing.
++
++### Option 4: Callers pass `(RetryConfig, RequestIdempotency)` directly
++
++**Pros:** HTTP client stays generic, no domain knowledge.
++**Cons:** Verbose call sites (two extra params on every call), callers can misclassify requests (e.g., accidentally marking `ExecuteStatement` as idempotent), retry config must be threaded through every layer.
++
++**Chosen approach:** `RequestType` enum passed by callers, with centralized mapping to `(RequestCategory, RequestIdempotency)` inside the HTTP client. The HTTP client holds per-category retry configs and resolves everything from the single `RequestType` value. This keeps the caller API simple, prevents misclassification, and centralizes all retry policy decisions.
\ No newline at end of file

Reproduce locally: git range-diff f69804a..dbcd69a f69804a..5095574 | Disable: git config gitstack.push-range-diff false

@vikrantpuppala
Copy link
Copy Markdown
Collaborator Author

Range-diff: main (5095574 -> d194c2f)
rust/docs/designs/PECOBLR-2091-retry-logic-design.md
@@ -297,7 +297,7 @@
 +
 +Retries **only** when the request provably did not reach the server, or the server explicitly signals retry:
 +
-+**Retryable HTTP codes:** 429, 503 (configurable via `override_retryable_codes`)
++**Retryable HTTP codes:** 429, 503 — **only when the server sends a `Retry-After` header** (configurable via `override_retryable_codes`). Without `Retry-After`, the server has not explicitly signaled that it is safe to retry, so the request is not retried. This follows Mehmet's review feedback on the spec.
 +
 +**Retryable errors (connection-level only):**
 +- `error.is_connect()` — connection refused, DNS failure, no route to host
@@ -306,6 +306,7 @@
 +- `error.is_timeout()` — request may have reached the server
 +- `error.is_request()` — request was partially sent
 +- Any other network error
++- 429/503 **without** `Retry-After` header
 +
 +This is the critical safety boundary: a non-idempotent request that may have reached the server must not be retried, as it could cause duplicate writes or side effects.
 +
rust/src/client/http.rs
@@ -233,7 +233,11 @@
 +                        .and_then(|v| v.to_str().ok())
 +                        .map(|s| s.to_string());
 +
-+                    if strategy.is_retryable_status(status) && attempts < max_attempts {
++                    let has_retry_after = retry_after.is_some();
++
++                    if strategy.is_retryable_status(status, has_retry_after)
++                        && attempts < max_attempts
++                    {
 +                        let elapsed = start_time.elapsed();
 +                        let wait = calculate_backoff(config, attempts, retry_after.as_deref());
 +
@@ -779,18 +783,18 @@
 +        }
 +
 +        #[tokio::test]
-+        async fn test_non_idempotent_retries_on_429() {
++        async fn test_non_idempotent_retries_on_429_with_retry_after() {
 +            let mock_server = MockServer::start().await;
 +            let attempt_count = Arc::new(AtomicU32::new(0));
 +            let counter = attempt_count.clone();
 +
-+            // Return 429 once, then 200
++            // Return 429 with Retry-After once, then 200
 +            Mock::given(method("POST"))
 +                .and(path("/test"))
 +                .respond_with(move |_req: &wiremock::Request| {
 +                    let n = counter.fetch_add(1, Ordering::SeqCst);
 +                    if n < 1 {
-+                        ResponseTemplate::new(429)
++                        ResponseTemplate::new(429).append_header("Retry-After", "0")
 +                    } else {
 +                        ResponseTemplate::new(200).set_body_string("ok")
 +                    }
@@ -812,18 +816,48 @@
 +        }
 +
 +        #[tokio::test]
-+        async fn test_non_idempotent_retries_on_503() {
++        async fn test_non_idempotent_no_retry_on_429_without_retry_after() {
++            let mock_server = MockServer::start().await;
++            let attempt_count = Arc::new(AtomicU32::new(0));
++            let counter = attempt_count.clone();
++
++            // Return 429 WITHOUT Retry-After — should NOT retry for non-idempotent
++            Mock::given(method("POST"))
++                .and(path("/test"))
++                .respond_with(move |_req: &wiremock::Request| {
++                    counter.fetch_add(1, Ordering::SeqCst);
++                    ResponseTemplate::new(429)
++                })
++                .mount(&mock_server)
++                .await;
++
++            let client = test_client(&mock_server, fast_retry_config(3));
++            let request = client
++                .inner()
++                .post(format!("{}/test", mock_server.uri()))
++                .body("sql query")
++                .build()
++                .unwrap();
++
++            let result = client.execute(request, RequestType::ExecuteStatement).await;
++            assert!(result.is_err());
++            // No Retry-After → not retried for non-idempotent — only 1 attempt
++            assert_eq!(attempt_count.load(Ordering::SeqCst), 1);
++        }
++
++        #[tokio::test]
++        async fn test_non_idempotent_retries_on_503_with_retry_after() {
 +            let mock_server = MockServer::start().await;
 +            let attempt_count = Arc::new(AtomicU32::new(0));
 +            let counter = attempt_count.clone();
 +
-+            // Return 503 twice, then 200
++            // Return 503 with Retry-After twice, then 200
 +            Mock::given(method("POST"))
 +                .and(path("/test"))
 +                .respond_with(move |_req: &wiremock::Request| {
 +                    let n = counter.fetch_add(1, Ordering::SeqCst);
 +                    if n < 2 {
-+                        ResponseTemplate::new(503)
++                        ResponseTemplate::new(503).append_header("Retry-After", "0")
 +                    } else {
 +                        ResponseTemplate::new(200).set_body_string("ok")
 +                    }
rust/src/client/retry.rs
@@ -212,10 +212,13 @@
 +    }
 +
 +    /// Whether the given HTTP status code is retryable.
-+    pub fn is_retryable_status(&self, status: StatusCode) -> bool {
++    ///
++    /// For non-idempotent requests, 429/503 are only retryable when the server
++    /// sends a `Retry-After` header — this is the explicit signal that it is
++    /// safe to retry (per Mehmet's review feedback on the spec).
++    pub fn is_retryable_status(&self, status: StatusCode, has_retry_after: bool) -> bool {
 +        let code = status.as_u16();
 +
-+        // If override codes are set, use them as the exhaustive retryable set
 +        // Success codes are never retryable (the caller checks success first,
 +        // but guard here for safety if the method is called directly)
 +        if status.is_success() || status.is_redirection() {
@@ -229,8 +232,11 @@
 +        match self.idempotency {
 +            // Idempotent: retry everything except known non-retryable codes
 +            RequestIdempotency::Idempotent => !IDEMPOTENT_NON_RETRYABLE.contains(&code),
-+            // Non-idempotent: retry only explicitly retryable codes
-+            RequestIdempotency::NonIdempotent => NON_IDEMPOTENT_RETRYABLE.contains(&code),
++            // Non-idempotent: retry only when the server explicitly signals retry
++            // via Retry-After header on 429/503
++            RequestIdempotency::NonIdempotent => {
++                NON_IDEMPOTENT_RETRYABLE.contains(&code) && has_retry_after
++            }
 +        }
 +    }
 +
@@ -395,10 +401,11 @@
 +            idempotency: RequestIdempotency::Idempotent,
 +            override_codes: None,
 +        };
-+        assert!(strategy.is_retryable_status(StatusCode::INTERNAL_SERVER_ERROR));
-+        assert!(strategy.is_retryable_status(StatusCode::BAD_GATEWAY));
-+        assert!(strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE));
-+        assert!(strategy.is_retryable_status(StatusCode::GATEWAY_TIMEOUT));
++        // Idempotent: Retry-After presence doesn't matter
++        assert!(strategy.is_retryable_status(StatusCode::INTERNAL_SERVER_ERROR, false));
++        assert!(strategy.is_retryable_status(StatusCode::BAD_GATEWAY, false));
++        assert!(strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE, false));
++        assert!(strategy.is_retryable_status(StatusCode::GATEWAY_TIMEOUT, false));
 +    }
 +
 +    #[test]
@@ -407,7 +414,7 @@
 +            idempotency: RequestIdempotency::Idempotent,
 +            override_codes: None,
 +        };
-+        assert!(strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS));
++        assert!(strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS, false));
 +    }
 +
 +    #[test]
@@ -416,13 +423,13 @@
 +            idempotency: RequestIdempotency::Idempotent,
 +            override_codes: None,
 +        };
-+        assert!(!strategy.is_retryable_status(StatusCode::BAD_REQUEST));
-+        assert!(!strategy.is_retryable_status(StatusCode::UNAUTHORIZED));
-+        assert!(!strategy.is_retryable_status(StatusCode::FORBIDDEN));
-+        assert!(!strategy.is_retryable_status(StatusCode::NOT_FOUND));
-+        assert!(!strategy.is_retryable_status(StatusCode::METHOD_NOT_ALLOWED));
-+        assert!(!strategy.is_retryable_status(StatusCode::CONFLICT));
-+        assert!(!strategy.is_retryable_status(StatusCode::GONE));
++        assert!(!strategy.is_retryable_status(StatusCode::BAD_REQUEST, false));
++        assert!(!strategy.is_retryable_status(StatusCode::UNAUTHORIZED, false));
++        assert!(!strategy.is_retryable_status(StatusCode::FORBIDDEN, false));
++        assert!(!strategy.is_retryable_status(StatusCode::NOT_FOUND, false));
++        assert!(!strategy.is_retryable_status(StatusCode::METHOD_NOT_ALLOWED, false));
++        assert!(!strategy.is_retryable_status(StatusCode::CONFLICT, false));
++        assert!(!strategy.is_retryable_status(StatusCode::GONE, false));
 +    }
 +
 +    #[test]
@@ -432,9 +439,9 @@
 +            override_codes: None,
 +        };
 +        // 2xx and 3xx codes are never retryable
-+        assert!(!strategy.is_retryable_status(StatusCode::OK));
-+        assert!(!strategy.is_retryable_status(StatusCode::CREATED));
-+        assert!(!strategy.is_retryable_status(StatusCode::MOVED_PERMANENTLY));
++        assert!(!strategy.is_retryable_status(StatusCode::OK, false));
++        assert!(!strategy.is_retryable_status(StatusCode::CREATED, false));
++        assert!(!strategy.is_retryable_status(StatusCode::MOVED_PERMANENTLY, false));
 +    }
 +
 +    #[test]
@@ -446,33 +453,46 @@
 +            override_codes: Some(override_codes),
 +        };
 +        // Only 503 is retryable now
-+        assert!(strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE));
-+        assert!(!strategy.is_retryable_status(StatusCode::INTERNAL_SERVER_ERROR));
-+        assert!(!strategy.is_retryable_status(StatusCode::BAD_GATEWAY));
-+        assert!(!strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS));
++        assert!(strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE, false));
++        assert!(!strategy.is_retryable_status(StatusCode::INTERNAL_SERVER_ERROR, false));
++        assert!(!strategy.is_retryable_status(StatusCode::BAD_GATEWAY, false));
++        assert!(!strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS, false));
 +    }
 +
 +    // --- Non-idempotent strategy tests ---
 +
 +    #[test]
-+    fn test_non_idempotent_strategy_retries_429_503() {
++    fn test_non_idempotent_strategy_retries_429_503_with_retry_after() {
 +        let strategy = RetryStrategy {
 +            idempotency: RequestIdempotency::NonIdempotent,
 +            override_codes: None,
 +        };
-+        assert!(strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS));
-+        assert!(strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE));
++        // With Retry-After header: 429/503 are retryable
++        assert!(strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS, true));
++        assert!(strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE, true));
 +    }
 +
 +    #[test]
++    fn test_non_idempotent_strategy_no_retry_429_503_without_retry_after() {
++        let strategy = RetryStrategy {
++            idempotency: RequestIdempotency::NonIdempotent,
++            override_codes: None,
++        };
++        // Without Retry-After header: 429/503 are NOT retryable for non-idempotent
++        assert!(!strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS, false));
++        assert!(!strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE, false));
++    }
++
++    #[test]
 +    fn test_non_idempotent_strategy_no_retry_on_5xx() {
 +        let strategy = RetryStrategy {
 +            idempotency: RequestIdempotency::NonIdempotent,
 +            override_codes: None,
 +        };
-+        assert!(!strategy.is_retryable_status(StatusCode::INTERNAL_SERVER_ERROR));
-+        assert!(!strategy.is_retryable_status(StatusCode::BAD_GATEWAY));
-+        assert!(!strategy.is_retryable_status(StatusCode::GATEWAY_TIMEOUT));
++        // Even with Retry-After, 500/502/504 are not in the retryable set
++        assert!(!strategy.is_retryable_status(StatusCode::INTERNAL_SERVER_ERROR, true));
++        assert!(!strategy.is_retryable_status(StatusCode::BAD_GATEWAY, true));
++        assert!(!strategy.is_retryable_status(StatusCode::GATEWAY_TIMEOUT, true));
 +    }
 +
 +    #[test]
@@ -481,10 +501,10 @@
 +            idempotency: RequestIdempotency::NonIdempotent,
 +            override_codes: None,
 +        };
-+        assert!(!strategy.is_retryable_status(StatusCode::BAD_REQUEST));
-+        assert!(!strategy.is_retryable_status(StatusCode::UNAUTHORIZED));
-+        assert!(!strategy.is_retryable_status(StatusCode::FORBIDDEN));
-+        assert!(!strategy.is_retryable_status(StatusCode::NOT_FOUND));
++        assert!(!strategy.is_retryable_status(StatusCode::BAD_REQUEST, true));
++        assert!(!strategy.is_retryable_status(StatusCode::UNAUTHORIZED, true));
++        assert!(!strategy.is_retryable_status(StatusCode::FORBIDDEN, true));
++        assert!(!strategy.is_retryable_status(StatusCode::NOT_FOUND, true));
 +    }
 +
 +    // --- Backoff calculation tests ---

Reproduce locally: git range-diff f69804a..5095574 f69804a..d194c2f | Disable: git config gitstack.push-range-diff false

@vikrantpuppala vikrantpuppala force-pushed the stack/PECOBLR-2091-retry-logic-impl branch from d194c2f to de83d2e Compare March 25, 2026 13:22
@vikrantpuppala
Copy link
Copy Markdown
Collaborator Author

Range-diff: main (d194c2f -> de83d2e)
rust/src/client/sea.rs
@@ -38,15 +38,20 @@
          })
      }
 +
-+    /// Execute a metadata query (SHOW commands) using `RequestType::ExecuteMetadataQuery`.
-+    async fn execute_metadata_query(&self, session_id: &str, sql: &str) -> Result<ExecuteResult> {
++    /// Execute a SQL statement end-to-end: call API → poll → create reader.
++    ///
++    /// Shared implementation for both user queries (`ExecuteStatement`) and
++    /// metadata queries (`ExecuteMetadataQuery`). The `request_type` controls
++    /// the retry strategy (idempotent vs non-idempotent).
++    async fn execute_and_read(
++        &self,
++        session_id: &str,
++        sql: &str,
++        params: &ExecuteParams,
++        request_type: RequestType,
++    ) -> Result<ExecuteResult> {
 +        let response = self
-+            .call_execute_api(
-+                session_id,
-+                sql,
-+                &ExecuteParams::default(),
-+                RequestType::ExecuteMetadataQuery,
-+            )
++            .call_execute_api(session_id, sql, params, request_type)
 +            .await?;
 +        let response = self.wait_for_completion(response).await?;
 +        let reader_factory = self.reader_factory()?;
@@ -82,16 +87,29 @@
  
          debug!("Deleted session: {}", session_id);
  
+         sql: &str,
          params: &ExecuteParams,
      ) -> Result<ExecuteResult> {
-         // 1. Call SEA API
+-        // 1. Call SEA API
 -        let response = self.call_execute_api(session_id, sql, params).await?;
-+        let response = self
-+            .call_execute_api(session_id, sql, params, RequestType::ExecuteStatement)
-+            .await?;
+-
+-        // 2. Poll until completion
+-        let response = self.wait_for_completion(response).await?;
+-
+-        // 3. Create appropriate reader
+-        let reader_factory = self.reader_factory()?;
+-        let reader = reader_factory.create_reader(&response.statement_id, &response)?;
+-
+-        Ok(ExecuteResult {
+-            statement_id: response.statement_id,
+-            reader,
+-            manifest: response.manifest,
+-        })
++        self.execute_and_read(session_id, sql, params, RequestType::ExecuteStatement)
++            .await
+     }
  
-         // 2. Poll until completion
-         let response = self.wait_for_completion(response).await?;
+     async fn get_result_chunks(
                  DatabricksErrorHelper::io().message(format!("Failed to build request: {}", e))
              })?;
  
@@ -129,7 +147,13 @@
          debug!("list_catalogs: {}", sql);
 -        self.execute_statement(session_id, &sql, &ExecuteParams::default())
 -            .await
-+        self.execute_metadata_query(session_id, &sql).await
++        self.execute_and_read(
++            session_id,
++            &sql,
++            &ExecuteParams::default(),
++            RequestType::ExecuteMetadataQuery,
++        )
++        .await
      }
  
      async fn list_schemas(
@@ -138,7 +162,13 @@
          debug!("list_schemas: {}", sql);
 -        self.execute_statement(session_id, &sql, &ExecuteParams::default())
 -            .await
-+        self.execute_metadata_query(session_id, &sql).await
++        self.execute_and_read(
++            session_id,
++            &sql,
++            &ExecuteParams::default(),
++            RequestType::ExecuteMetadataQuery,
++        )
++        .await
      }
  
      async fn list_tables(
@@ -147,7 +177,13 @@
          debug!("list_tables: {}", sql);
 -        self.execute_statement(session_id, &sql, &ExecuteParams::default())
 -            .await
-+        self.execute_metadata_query(session_id, &sql).await
++        self.execute_and_read(
++            session_id,
++            &sql,
++            &ExecuteParams::default(),
++            RequestType::ExecuteMetadataQuery,
++        )
++        .await
      }
  
      async fn list_columns(
@@ -156,7 +192,13 @@
          debug!("list_columns: {}", sql);
 -        self.execute_statement(session_id, &sql, &ExecuteParams::default())
 -            .await
-+        self.execute_metadata_query(session_id, &sql).await
++        self.execute_and_read(
++            session_id,
++            &sql,
++            &ExecuteParams::default(),
++            RequestType::ExecuteMetadataQuery,
++        )
++        .await
      }
  
      async fn list_procedures(
@@ -165,7 +207,13 @@
          debug!("list_procedures: {}", sql);
 -        self.execute_statement(session_id, &sql, &ExecuteParams::default())
 -            .await
-+        self.execute_metadata_query(session_id, &sql).await
++        self.execute_and_read(
++            session_id,
++            &sql,
++            &ExecuteParams::default(),
++            RequestType::ExecuteMetadataQuery,
++        )
++        .await
      }
  
      async fn list_procedure_columns(
@@ -174,7 +222,13 @@
          debug!("list_procedure_columns: {}", sql);
 -        self.execute_statement(session_id, &sql, &ExecuteParams::default())
 -            .await
-+        self.execute_metadata_query(session_id, &sql).await
++        self.execute_and_read(
++            session_id,
++            &sql,
++            &ExecuteParams::default(),
++            RequestType::ExecuteMetadataQuery,
++        )
++        .await
      }
  
      fn list_table_types(&self) -> Vec<String> {

Reproduce locally: git range-diff f69804a..d194c2f f69804a..de83d2e | Disable: git config gitstack.push-range-diff false


let http_client = Arc::new(
DatabricksHttpClient::new(HttpClientConfig::default())
DatabricksHttpClient::with_default_retry(HttpClientConfig::default())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may need to have different retry configurations for Auth and APIs

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — added full config wiring in this PR. The infrastructure already supported per-category configs via build_retry_configs(global, overrides), and now it's exposed through ADBC options:

Global options:

  • databricks.retry.min_wait_ms (default: 1000)
  • databricks.retry.max_wait_ms (default: 60000)
  • databricks.retry.overall_timeout_ms (default: 900000)
  • databricks.retry.max_retries (default: 5)

Per-category overrides (inherit from global when unset):

  • databricks.retry.{sea,auth,cloudfetch}.min_wait_ms
  • databricks.retry.{sea,auth,cloudfetch}.max_wait_ms
  • databricks.retry.{sea,auth,cloudfetch}.overall_timeout_ms
  • databricks.retry.{sea,auth,cloudfetch}.max_retries

Auth category defaults to overall_timeout=30s, max_retries=3 (see next comment).

- Requests classified as **idempotent** or **non-idempotent** with distinct retry strategies
- `Retry-After` header honored, with min/max clamping
- Exponential backoff with random jitter (50ms–750ms)
- Cumulative timeout (default 900s) in addition to per-attempt count
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this be applicable to all requests? 900s is too high for requests like OAuth, and even for connection creation

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — 900s is way too long for auth/session creation. Added per-category defaults:

  • Auth (AuthTokenRequest, AuthDiscovery): overall_timeout=30s, max_retries=3
  • Sea (CreateSession, ExecuteStatement, etc.): inherits global 900s/5 retries
  • CloudFetch: inherits global 900s/5 retries

The Auth default is set in Database::default() so it applies out of the box. Users can further customize via databricks.retry.auth.overall_timeout_ms etc.

CreateSession/CloseSession are in the Sea category along with query execution. We could split them into their own category if needed, but for now they share the same retry config. Let me know if you think session management should have its own timeout.

// Non-idempotent: retry only when the server explicitly signals retry
// via Retry-After header on 429/503
RequestIdempotency::NonIdempotent => {
NON_IDEMPOTENT_RETRYABLE.contains(&code) && has_retry_after
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that means even for 429, if there is no `Retry-After` header, we won't retry with exponential backoff

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentional per Mehmet's review feedback on the design doc — for non-idempotent requests (ExecuteStatement), we only retry 429/503 when the server explicitly signals it via Retry-After. Without the header, there's ambiguity about whether the server started processing the request before returning the error.

That said, I think there's a reasonable argument that 429 specifically is always safe to retry (the request was rate-limited, not executed). We could treat 429 as unconditionally retryable for non-idempotent while keeping 503 gated on Retry-After. Let me check with Mehmet on whether he'd be OK with that refinement.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated — 429 is now unconditionally retryable for non-idempotent requests (rate-limiting means the request was rejected before execution). Only 503 requires Retry-After header for non-idempotent, since 503 is ambiguous about whether processing started.

Tests updated:

  • test_non_idempotent_strategy_429_always_retryable — verifies 429 retries with and without Retry-After
  • test_non_idempotent_strategy_503_requires_retry_after — verifies 503 only retries with header
  • test_non_idempotent_retries_on_429_without_retry_after (wiremock) — E2E proof
  • test_non_idempotent_no_retry_on_503_without_retry_after (wiremock) — E2E proof

};

// Add random jitter between 50ms and 750ms
let jitter_ms = rand::rng().random_range(50..=750);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a proportional jitter? So, that it remains proportional to the backoff?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. The spec defines jitter as random(50ms, 750ms) (fixed range), so the current implementation follows that. At higher backoff values (32s, 60s), a fixed 50-750ms jitter becomes negligible and doesn't meaningfully spread retries across clients.

Proportional jitter (e.g., 5-25% of base backoff) would scale better:

  • Attempt 2 (2s base, with the default 1s min_wait): 100-500ms jitter
  • Attempt 4 (8s base): 400ms-2s jitter
  • Attempt 6 (32s base): 1.6-8s jitter

I'd prefer to keep the spec-defined fixed jitter for now to stay aligned with other connectors, and revisit proportional jitter as a follow-up if we see thundering-herd issues in practice. Does that work for you?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — switched to proportional jitter: random 5-25% of base wait, with a 50ms floor.

This scales naturally with backoff:

  • Attempt 2 (2s base, with the default 1s min_wait): 100-500ms jitter
  • Attempt 4 (8s base): 400-2000ms jitter
  • Attempt 6 (32s base): 1.6-8s jitter

Tests:

  • test_proportional_jitter — verifies jitter scales with base at both low and high attempts
  • test_jitter_minimum_50ms — verifies 50ms floor for very small base waits
  • All timing assertions updated for proportional ranges

@vikrantpuppala vikrantpuppala force-pushed the stack/PECOBLR-2091-retry-logic-impl branch from de83d2e to c2cd59b Compare April 1, 2026 07:08
@vikrantpuppala
Copy link
Copy Markdown
Collaborator Author

Range-diff: main (de83d2e -> c2cd59b)
rust/src/database.rs
@@ -4,7 +4,9 @@
  
  use crate::auth::config::{AuthConfig, AuthType};
  use crate::auth::{AuthProvider, AuthorizationCodeProvider, PersonalAccessToken};
-+use crate::client::retry::{build_retry_configs, RetryConfig};
++use crate::client::retry::{
++    build_retry_configs, RequestCategory, RetryConfig, RetryConfigOverrides,
++};
  use crate::client::{
      DatabricksClient, DatabricksClientConfig, DatabricksHttpClient, HttpClientConfig, SeaClient,
  };
@@ -15,31 +17,358 @@
  use std::sync::Arc;
  use std::time::Duration;
  
+ /// A Database is created from a Driver and is used to establish Connections.
+ /// Configuration options like host, credentials, and HTTP path are set on
+ /// the Database before creating connections.
+-#[derive(Debug, Default)]
++#[derive(Debug)]
+ pub struct Database {
+     // Core configuration
+     uri: Option<String>,
+ 
+     // Authentication configuration
+     auth_config: AuthConfig,
++
++    // Retry configuration
++    retry_config: RetryConfig,
++    retry_overrides: HashMap<RequestCategory, RetryConfigOverrides>,
++}
++
++impl Default for Database {
++    fn default() -> Self {
++        let mut retry_overrides = HashMap::new();
++        // Auth requests should fail fast — shorter timeout and fewer retries
++        // than the 900s/5-retry global default used for query execution.
++        retry_overrides.insert(
++            RequestCategory::Auth,
++            RetryConfigOverrides {
++                overall_timeout: Some(Duration::from_secs(30)),
++                max_retries: Some(3),
++                ..Default::default()
++            },
++        );
++        Self {
++            uri: None,
++            warehouse_id: None,
++            access_token: None,
++            catalog: None,
++            schema: None,
++            http_config: HttpClientConfig::default(),
++            cloudfetch_config: CloudFetchConfig::default(),
++            log_level: None,
++            log_file: None,
++            auth_config: AuthConfig::default(),
++            retry_config: RetryConfig::default(),
++            retry_overrides,
++        }
++    }
+ }
+ 
+ impl Database {
+         }
+     }
+ 
++    /// Parse a per-category retry option like `databricks.retry.auth.max_retries`.
++    fn set_retry_category_option(
++        &mut self,
++        option_name: &str,
++        key: &OptionDatabase,
++        value: OptionValue,
++    ) -> Result<()> {
++        // Parse "databricks.retry.<category>.<field>"
++        let rest = option_name
++            .strip_prefix("databricks.retry.")
++            .expect("caller verified prefix");
++        let (cat_str, field) = rest
++            .split_once('.')
++            .ok_or_else(|| DatabricksErrorHelper::set_unknown_option(key).to_adbc())?;
++
++        let category = match cat_str {
++            "sea" => RequestCategory::Sea,
++            "auth" => RequestCategory::Auth,
++            "cloudfetch" => RequestCategory::CloudFetch,
++            _ => return Err(DatabricksErrorHelper::set_unknown_option(key).to_adbc()),
++        };
++
++        let ovr = self.retry_overrides.entry(category).or_default();
++
++        match field {
++            "min_wait_ms" => {
++                if let Some(v) = Self::parse_int_option(&value) {
++                    ovr.min_wait = Some(Duration::from_millis(v as u64));
++                    Ok(())
++                } else {
++                    Err(DatabricksErrorHelper::set_invalid_option(key, &value).to_adbc())
++                }
++            }
++            "max_wait_ms" => {
++                if let Some(v) = Self::parse_int_option(&value) {
++                    ovr.max_wait = Some(Duration::from_millis(v as u64));
++                    Ok(())
++                } else {
++                    Err(DatabricksErrorHelper::set_invalid_option(key, &value).to_adbc())
++                }
++            }
++            "overall_timeout_ms" => {
++                if let Some(v) = Self::parse_int_option(&value) {
++                    ovr.overall_timeout = Some(Duration::from_millis(v as u64));
++                    Ok(())
++                } else {
++                    Err(DatabricksErrorHelper::set_invalid_option(key, &value).to_adbc())
++                }
++            }
++            "max_retries" => {
++                if let Some(v) = Self::parse_int_option(&value) {
++                    ovr.max_retries = Some(v as u32);
++                    Ok(())
++                } else {
++                    Err(DatabricksErrorHelper::set_invalid_option(key, &value).to_adbc())
++                }
++            }
++            _ => Err(DatabricksErrorHelper::set_unknown_option(key).to_adbc()),
++        }
++    }
++
+     /// Parse a float option value.
+     fn parse_float_option(value: &OptionValue) -> Option<f64> {
+         match value {
+                     }
+                 }
+ 
+-                // HTTP client options
+-                "databricks.http.connect_timeout_ms" => {
++                // Retry configuration — global defaults
++                "databricks.retry.min_wait_ms" => {
+                     if let Some(v) = Self::parse_int_option(&value) {
+-                        self.http_config.connect_timeout = Duration::from_millis(v as u64);
++                        self.retry_config.min_wait = Duration::from_millis(v as u64);
+                         Ok(())
+                     } else {
                          Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
                      }
                  }
+-                "databricks.http.read_timeout_ms" => {
++                "databricks.retry.max_wait_ms" => {
+                     if let Some(v) = Self::parse_int_option(&value) {
+-                        self.http_config.read_timeout = Duration::from_millis(v as u64);
++                        self.retry_config.max_wait = Duration::from_millis(v as u64);
+                         Ok(())
+                     } else {
+                         Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
+                     }
+                 }
 -                "databricks.http.max_retries" => {
--                    if let Some(v) = Self::parse_int_option(&value) {
++                "databricks.retry.overall_timeout_ms" => {
+                     if let Some(v) = Self::parse_int_option(&value) {
 -                        self.http_config.max_retries = v as u32;
--                        Ok(())
--                    } else {
--                        Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
--                    }
--                }
--
++                        self.retry_config.overall_timeout = Duration::from_millis(v as u64);
++                        Ok(())
++                    } else {
++                        Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
++                    }
++                }
++                "databricks.retry.max_retries" => {
++                    if let Some(v) = Self::parse_int_option(&value) {
++                        self.retry_config.max_retries = v as u32;
+                         Ok(())
+                     } else {
+                         Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
+                     }
+                 }
+ 
++                // Retry configuration — per-category overrides
++                s if s.starts_with("databricks.retry.sea.")
++                    || s.starts_with("databricks.retry.auth.")
++                    || s.starts_with("databricks.retry.cloudfetch.") =>
++                {
++                    self.set_retry_category_option(s, &key, value)
++                }
++
++                // HTTP client options
++                "databricks.http.connect_timeout_ms" => {
++                    if let Some(v) = Self::parse_int_option(&value) {
++                        self.http_config.connect_timeout = Duration::from_millis(v as u64);
++                        Ok(())
++                    } else {
++                        Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
++                    }
++                }
++                "databricks.http.read_timeout_ms" => {
++                    if let Some(v) = Self::parse_int_option(&value) {
++                        self.http_config.read_timeout = Duration::from_millis(v as u64);
++                        Ok(())
++                    } else {
++                        Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
++                    }
++                }
                  _ => Err(DatabricksErrorHelper::set_unknown_option(&key).to_adbc()),
              },
              _ => Err(DatabricksErrorHelper::set_unknown_option(&key).to_adbc()),
+                             .to_adbc()
+                     })
+                     .map(|p| p as i64),
++                "databricks.retry.min_wait_ms" => Ok(self.retry_config.min_wait.as_millis() as i64),
++                "databricks.retry.max_wait_ms" => Ok(self.retry_config.max_wait.as_millis() as i64),
++                "databricks.retry.overall_timeout_ms" => {
++                    Ok(self.retry_config.overall_timeout.as_millis() as i64)
++                }
++                "databricks.retry.max_retries" => Ok(self.retry_config.max_retries as i64),
+                 _ => Err(DatabricksErrorHelper::get_unknown_option(&key).to_adbc()),
+             },
+             _ => Err(DatabricksErrorHelper::get_unknown_option(&key).to_adbc()),
          let access_token = self.access_token.as_ref();
  
          // Create HTTP client (without auth provider - two-phase initialization)
 -        let http_client =
 -            Arc::new(DatabricksHttpClient::new(self.http_config.clone()).map_err(|e| e.to_adbc())?);
-+        let retry_configs = build_retry_configs(&RetryConfig::default(), &HashMap::new());
++        let retry_configs = build_retry_configs(&self.retry_config, &self.retry_overrides);
 +        let http_client = Arc::new(
 +            DatabricksHttpClient::new(self.http_config.clone(), retry_configs)
 +                .map_err(|e| e.to_adbc())?,
 +        );
  
          // Create tokio runtime for async operations (needed before auth provider creation for U2M)
-         let runtime = tokio::runtime::Runtime::new().map_err(|e| {
\ No newline at end of file
+         let runtime = tokio::runtime::Runtime::new().map_err(|e| {
+             Some("https://custom.endpoint/token".to_string())
+         );
+     }
++
++    #[test]
++    fn test_database_retry_defaults() {
++        let db = Database::new();
++        assert_eq!(db.retry_config.min_wait, Duration::from_secs(1));
++        assert_eq!(db.retry_config.max_wait, Duration::from_secs(60));
++        assert_eq!(db.retry_config.overall_timeout, Duration::from_secs(900));
++        assert_eq!(db.retry_config.max_retries, 5);
++
++        // Auth should have shorter defaults
++        let auth_ovr = db.retry_overrides.get(&RequestCategory::Auth).unwrap();
++        assert_eq!(auth_ovr.overall_timeout, Some(Duration::from_secs(30)));
++        assert_eq!(auth_ovr.max_retries, Some(3));
++    }
++
++    #[test]
++    fn test_database_retry_global_options() {
++        let mut db = Database::new();
++        db.set_option(
++            OptionDatabase::Other("databricks.retry.min_wait_ms".into()),
++            OptionValue::Int(500),
++        )
++        .unwrap();
++        db.set_option(
++            OptionDatabase::Other("databricks.retry.max_wait_ms".into()),
++            OptionValue::Int(30000),
++        )
++        .unwrap();
++        db.set_option(
++            OptionDatabase::Other("databricks.retry.overall_timeout_ms".into()),
++            OptionValue::Int(60000),
++        )
++        .unwrap();
++        db.set_option(
++            OptionDatabase::Other("databricks.retry.max_retries".into()),
++            OptionValue::Int(10),
++        )
++        .unwrap();
++
++        assert_eq!(db.retry_config.min_wait, Duration::from_millis(500));
++        assert_eq!(db.retry_config.max_wait, Duration::from_millis(30000));
++        assert_eq!(
++            db.retry_config.overall_timeout,
++            Duration::from_millis(60000)
++        );
++        assert_eq!(db.retry_config.max_retries, 10);
++
++        // Round-trip via get_option_int
++        assert_eq!(
++            db.get_option_int(OptionDatabase::Other("databricks.retry.min_wait_ms".into()))
++                .unwrap(),
++            500
++        );
++        assert_eq!(
++            db.get_option_int(OptionDatabase::Other("databricks.retry.max_retries".into()))
++                .unwrap(),
++            10
++        );
++    }
++
++    #[test]
++    fn test_database_retry_category_overrides() {
++        let mut db = Database::new();
++        db.set_option(
++            OptionDatabase::Other("databricks.retry.auth.overall_timeout_ms".into()),
++            OptionValue::Int(15000),
++        )
++        .unwrap();
++        db.set_option(
++            OptionDatabase::Other("databricks.retry.auth.max_retries".into()),
++            OptionValue::Int(2),
++        )
++        .unwrap();
++        db.set_option(
++            OptionDatabase::Other("databricks.retry.sea.max_retries".into()),
++            OptionValue::Int(8),
++        )
++        .unwrap();
++        db.set_option(
++            OptionDatabase::Other("databricks.retry.cloudfetch.min_wait_ms".into()),
++            OptionValue::Int(200),
++        )
++        .unwrap();
++
++        let auth_ovr = db.retry_overrides.get(&RequestCategory::Auth).unwrap();
++        assert_eq!(auth_ovr.overall_timeout, Some(Duration::from_millis(15000)));
++        assert_eq!(auth_ovr.max_retries, Some(2));
++
++        let sea_ovr = db.retry_overrides.get(&RequestCategory::Sea).unwrap();
++        assert_eq!(sea_ovr.max_retries, Some(8));
++
++        let cf_ovr = db
++            .retry_overrides
++            .get(&RequestCategory::CloudFetch)
++            .unwrap();
++        assert_eq!(cf_ovr.min_wait, Some(Duration::from_millis(200)));
++    }
++
++    #[test]
++    fn test_database_retry_unknown_category_rejected() {
++        let mut db = Database::new();
++        let result = db.set_option(
++            OptionDatabase::Other("databricks.retry.unknown.max_retries".into()),
++            OptionValue::Int(5),
++        );
++        assert!(result.is_err());
++    }
++
++    #[test]
++    fn test_database_retry_unknown_field_rejected() {
++        let mut db = Database::new();
++        let result = db.set_option(
++            OptionDatabase::Other("databricks.retry.auth.unknown_field".into()),
++            OptionValue::Int(5),
++        );
++        assert!(result.is_err());
++    }
++
++    #[test]
++    fn test_database_retry_rejects_non_int() {
++        let mut db = Database::new();
++        let result = db.set_option(
++            OptionDatabase::Other("databricks.retry.max_retries".into()),
++            OptionValue::String("not_a_number".into()),
++        );
++        assert!(result.is_err());
++    }
++
++    #[test]
++    fn test_database_retry_string_int_accepted() {
++        let mut db = Database::new();
++        // String values that parse as integers should be accepted
++        db.set_option(
++            OptionDatabase::Other("databricks.retry.max_retries".into()),
++            OptionValue::String("7".into()),
++        )
++        .unwrap();
++        assert_eq!(db.retry_config.max_retries, 7);
++    }
+ }
\ No newline at end of file

Reproduce locally: git range-diff f69804a..de83d2e f69804a..c2cd59b | Disable: git config gitstack.push-range-diff false

Implements the standardized DBSQL connector retry specification with:

- RequestType enum as single source of truth for request classification
- Idempotent vs non-idempotent retry strategies with distinct behavior
- Exponential backoff with random jitter (50-750ms)
- Retry-After header support (seconds and HTTP-date formats)
- Cumulative overall timeout (default 900s)
- Per-category retry config (SEA, CloudFetch, Auth) with global defaults
- Structured DEBUG logging per attempt and per request summary
- Metadata queries (list_*) classified as idempotent via ExecuteMetadataQuery

Co-authored-by: Isaac
@vikrantpuppala vikrantpuppala force-pushed the stack/PECOBLR-2091-retry-logic-impl branch from c2cd59b to 7f47f62 Compare April 1, 2026 07:24
@vikrantpuppala
Copy link
Copy Markdown
Collaborator Author

Range-diff: main (c2cd59b -> 7f47f62)
rust/docs/designs/PECOBLR-2091-retry-logic-design.md
@@ -47,7 +47,7 @@
 +
 +- Requests classified as **idempotent** or **non-idempotent** with distinct retry strategies
 +- `Retry-After` header honored, with min/max clamping
-+- Exponential backoff with random jitter (50ms–750ms)
++- Exponential backoff with proportional jitter (5–25% of base wait, minimum 50ms)
 +- Cumulative timeout (default 900s) in addition to per-attempt count
 +- Per-category configuration with global defaults
 +- Structured logging per attempt and per request summary
@@ -297,7 +297,9 @@
 +
 +Retries **only** when the request provably did not reach the server, or the server explicitly signals retry:
 +
-+**Retryable HTTP codes:** 429, 503 — **only when the server sends a `Retry-After` header** (configurable via `override_retryable_codes`). Without `Retry-After`, the server has not explicitly signaled that it is safe to retry, so the request is not retried. This follows Mehmet's review feedback on the spec.
++**Retryable HTTP codes:**
++- **429** (Too Many Requests) — always retryable. Rate-limiting means the request was rejected before execution.
++- **503** (Service Unavailable) — **only retryable when the server sends a `Retry-After` header**. Without it, there is ambiguity about whether the server started processing the request. Configurable via `override_retryable_codes`.
 +
 +**Retryable errors (connection-level only):**
 +- `error.is_connect()` — connection refused, DNS failure, no route to host
@@ -306,7 +308,7 @@
 +- `error.is_timeout()` — request may have reached the server
 +- `error.is_request()` — request was partially sent
 +- Any other network error
-+- 429/503 **without** `Retry-After` header
++- 503 **without** `Retry-After` header
 +
 +This is the critical safety boundary: a non-idempotent request that may have reached the server must not be retried, as it could cause duplicate writes or side effects.
 +
@@ -330,7 +332,7 @@
 +    exp_backoff = config.min_wait * 2^(attempt - 1)
 +    wait = min(exp_backoff, config.max_wait)
 +
-+jitter = random(50ms, 750ms)
++jitter = random(5%, 25%) * wait  // proportional, minimum 50ms
 +return wait + jitter
 +```
 +
@@ -641,7 +643,7 @@
 +
 +### Concurrent Retry Storms
 +
-+Multiple parallel CloudFetch downloads may all fail simultaneously (e.g., during a brief network blip). The jitter (50ms–750ms) helps spread out retry attempts. The existing `max_chunks_in_memory` backpressure limit bounds the number of concurrent retrying downloads.
++Multiple parallel CloudFetch downloads may all fail simultaneously (e.g., during a brief network blip). The proportional jitter (5–25% of base wait, minimum 50ms) helps spread out retry attempts. The existing `max_chunks_in_memory` backpressure limit bounds the number of concurrent retrying downloads.
 +
 +### Already-Closed Resources
 +
@@ -676,7 +678,8 @@
 +- `test_retry_after_header_seconds`
 +- `test_retry_after_header_http_date`
 +- `test_retry_after_clamped_to_min_max`
-+- `test_jitter_in_range_50ms_750ms`
++- `test_proportional_jitter`
++- `test_jitter_minimum_50ms`
 +
 +**Configuration:**
 +- `test_retry_config_defaults`
rust/src/client/http.rs
@@ -538,6 +538,54 @@
          assert!(client.is_ok());
      }
  
+     fn test_http_client_with_allow_self_signed() {
+         let mut config = HttpClientConfig::default();
+         config.tls.allow_self_signed = Some(true);
+-        let client = DatabricksHttpClient::new(config);
++        let client = DatabricksHttpClient::new(config, default_retry_configs());
+         assert!(client.is_ok());
+     }
+ 
+     fn test_http_client_with_hostname_mismatch_allowed() {
+         let mut config = HttpClientConfig::default();
+         config.tls.allow_hostname_mismatch = Some(true);
+-        let client = DatabricksHttpClient::new(config);
++        let client = DatabricksHttpClient::new(config, default_retry_configs());
+         assert!(client.is_ok());
+     }
+ 
+     fn test_http_client_with_tls_disabled() {
+         let mut config = HttpClientConfig::default();
+         config.tls.enabled = Some(false);
+-        let client = DatabricksHttpClient::new(config);
++        let client = DatabricksHttpClient::new(config, default_retry_configs());
+         assert!(client.is_ok());
+     }
+ 
+ 
+         let mut config = HttpClientConfig::default();
+         config.tls.trusted_certificate_path = Some(tmp.path().to_string_lossy().to_string());
+-        let client = DatabricksHttpClient::new(config);
++        let client = DatabricksHttpClient::new(config, default_retry_configs());
+         assert!(client.is_ok());
+     }
+ 
+     fn test_http_client_with_invalid_cert_path() {
+         let mut config = HttpClientConfig::default();
+         config.tls.trusted_certificate_path = Some("/nonexistent/path/cert.pem".to_string());
+-        let result = DatabricksHttpClient::new(config);
++        let result = DatabricksHttpClient::new(config, default_retry_configs());
+         assert!(result.is_err());
+         let err_msg = format!("{:?}", result.unwrap_err());
+         assert!(err_msg.contains("Failed to read TLS certificate"));
+         std::fs::write(tmp.path(), "not a valid PEM certificate").unwrap();
+         let mut config = HttpClientConfig::default();
+         config.tls.trusted_certificate_path = Some(tmp.path().to_string_lossy().to_string());
+-        let result = DatabricksHttpClient::new(config);
++        let result = DatabricksHttpClient::new(config, default_retry_configs());
+         assert!(result.is_err());
+         let err_msg = format!("{:?}", result.unwrap_err());
+         assert!(err_msg.contains("Invalid PEM certificate"));
          assert!(debug_output.contains("[REDACTED]"));
          assert!(!debug_output.contains("secret123"));
      }
@@ -783,18 +831,19 @@
 +        }
 +
 +        #[tokio::test]
-+        async fn test_non_idempotent_retries_on_429_with_retry_after() {
++        async fn test_non_idempotent_retries_on_429_without_retry_after() {
 +            let mock_server = MockServer::start().await;
 +            let attempt_count = Arc::new(AtomicU32::new(0));
 +            let counter = attempt_count.clone();
 +
-+            // Return 429 with Retry-After once, then 200
++            // 429 without Retry-After — still retried for non-idempotent
++            // (rate-limited means the request was not executed)
 +            Mock::given(method("POST"))
 +                .and(path("/test"))
 +                .respond_with(move |_req: &wiremock::Request| {
 +                    let n = counter.fetch_add(1, Ordering::SeqCst);
 +                    if n < 1 {
-+                        ResponseTemplate::new(429).append_header("Retry-After", "0")
++                        ResponseTemplate::new(429)
 +                    } else {
 +                        ResponseTemplate::new(200).set_body_string("ok")
 +                    }
@@ -816,17 +865,17 @@
 +        }
 +
 +        #[tokio::test]
-+        async fn test_non_idempotent_no_retry_on_429_without_retry_after() {
++        async fn test_non_idempotent_no_retry_on_503_without_retry_after() {
 +            let mock_server = MockServer::start().await;
 +            let attempt_count = Arc::new(AtomicU32::new(0));
 +            let counter = attempt_count.clone();
 +
-+            // Return 429 WITHOUT Retry-After — should NOT retry for non-idempotent
++            // 503 WITHOUT Retry-After — should NOT retry for non-idempotent
 +            Mock::given(method("POST"))
 +                .and(path("/test"))
 +                .respond_with(move |_req: &wiremock::Request| {
 +                    counter.fetch_add(1, Ordering::SeqCst);
-+                    ResponseTemplate::new(429)
++                    ResponseTemplate::new(503)
 +                })
 +                .mount(&mock_server)
 +                .await;
@@ -841,7 +890,7 @@
 +
 +            let result = client.execute(request, RequestType::ExecuteStatement).await;
 +            assert!(result.is_err());
-+            // No Retry-After → not retried for non-idempotent — only 1 attempt
++            // No Retry-After on 503 → not retried for non-idempotent — only 1 attempt
 +            assert_eq!(attempt_count.load(Ordering::SeqCst), 1);
 +        }
 +
@@ -1080,33 +1129,33 @@
 +            let ts = timestamps.lock().unwrap();
 +            assert_eq!(ts.len(), 4, "Expected 4 attempts (1 initial + 3 retries)");
 +
-+            // Gap between attempt 1 and 2: 2^1*100ms = 200ms + 50-750ms jitter = 250-950ms
++            // Proportional jitter: 5-25% of base (min 50ms)
++            // Gap 1→2: 200ms base, jitter 50-51ms (5%=10ms clamped to the 50ms floor; 25%=50ms) → ~250ms
 +            let gap1 = ts[1].duration_since(ts[0]);
 +            assert!(
-+                gap1 >= Duration::from_millis(200) && gap1 <= Duration::from_millis(1100),
-+                "Gap 1→2: {:?}, expected 250-1100ms",
++                gap1 >= Duration::from_millis(200) && gap1 <= Duration::from_millis(400),
++                "Gap 1→2: {:?}, expected 200-400ms",
 +                gap1
 +            );
 +
-+            // Gap between attempt 2 and 3: 2^2*100ms = 400ms + jitter = 450-1150ms
++            // Gap 2→3: 400ms base, jitter 50-100ms → 450-500ms
 +            let gap2 = ts[2].duration_since(ts[1]);
 +            assert!(
-+                gap2 >= Duration::from_millis(400) && gap2 <= Duration::from_millis(1300),
-+                "Gap 2→3: {:?}, expected 450-1300ms",
++                gap2 >= Duration::from_millis(400) && gap2 <= Duration::from_millis(600),
++                "Gap 2→3: {:?}, expected 400-600ms",
 +                gap2
 +            );
 +
-+            // Gap between attempt 3 and 4: 2^3*100ms = 800ms, capped to max_wait=500ms + jitter
++            // Gap 3→4: 800ms capped to 500ms base, jitter 50-125ms → 550-625ms
 +            let gap3 = ts[3].duration_since(ts[2]);
 +            assert!(
-+                gap3 >= Duration::from_millis(500) && gap3 <= Duration::from_millis(1400),
-+                "Gap 3→4: {:?}, expected 550-1400ms (capped at max_wait=500ms)",
++                gap3 >= Duration::from_millis(500) && gap3 <= Duration::from_millis(750),
++                "Gap 3→4: {:?}, expected 500-750ms (capped at max_wait=500ms)",
 +                gap3
 +            );
 +
-+            // Exponential growth is verified by the ranges above:
-+            // gap1 lower bound (200ms) < gap2 lower bound (400ms) < gap3 lower bound (500ms, capped)
-+            // Direct gap comparison is unreliable due to jitter (50-750ms) overlapping the base.
++            // Exponential growth verified: gap1 < gap2 < gap3
++            // Proportional jitter keeps ranges tight, so growth is clearly visible.
 +        }
 +
 +        #[tokio::test]
@@ -1154,11 +1203,11 @@
 +            let ts = timestamps.lock().unwrap();
 +            assert_eq!(ts.len(), 2);
 +
-+            // Gap should be ~1s (Retry-After) + 50-750ms jitter = 1050ms-1750ms
++            // Gap should be ~1s (Retry-After) + 5-25% jitter = 1050ms-1250ms
 +            let gap = ts[1].duration_since(ts[0]);
 +            assert!(
-+                gap >= Duration::from_millis(1000) && gap <= Duration::from_millis(2000),
-+                "Gap with Retry-After:1: {:?}, expected 1000-2000ms",
++                gap >= Duration::from_millis(1000) && gap <= Duration::from_millis(1500),
++                "Gap with Retry-After:1: {:?}, expected 1050-1500ms",
 +                gap
 +            );
 +        }
@@ -1207,12 +1256,12 @@
 +            let ts = timestamps.lock().unwrap();
 +            assert_eq!(ts.len(), 2);
 +
-+            // Gap should be ~500ms (clamped max_wait) + 50-750ms jitter = 550-1250ms
++            // Gap should be ~500ms (clamped max_wait) + jitter 50-125ms (5%=25ms clamped to the 50ms floor) = 550-625ms
 +            // Crucially, NOT 10 seconds
 +            let gap = ts[1].duration_since(ts[0]);
 +            assert!(
-+                gap >= Duration::from_millis(500) && gap <= Duration::from_millis(1500),
-+                "Gap with clamped Retry-After: {:?}, expected 500-1500ms (NOT 10s)",
++                gap >= Duration::from_millis(500) && gap <= Duration::from_millis(800),
++                "Gap with clamped Retry-After: {:?}, expected 525-800ms (NOT 10s)",
 +                gap
 +            );
 +        }
rust/src/client/retry.rs
@@ -192,9 +192,6 @@
 +    400, 401, 403, 404, 405, 409, 410, 411, 412, 413, 414, 415, 416,
 +];
 +
-+/// Retryable HTTP status codes for non-idempotent requests.
-+const NON_IDEMPOTENT_RETRYABLE: &[u16] = &[429, 503];
-+
 +/// Determines whether a request should be retried based on the error type
 +/// and the request's idempotency classification.
 +pub struct RetryStrategy {
@@ -232,11 +229,11 @@
 +        match self.idempotency {
 +            // Idempotent: retry everything except known non-retryable codes
 +            RequestIdempotency::Idempotent => !IDEMPOTENT_NON_RETRYABLE.contains(&code),
-+            // Non-idempotent: retry only when the server explicitly signals retry
-+            // via Retry-After header on 429/503
-+            RequestIdempotency::NonIdempotent => {
-+                NON_IDEMPOTENT_RETRYABLE.contains(&code) && has_retry_after
-+            }
++            // Non-idempotent:
++            // - 429: always retryable (rate-limited = request was not executed)
++            // - 503: only retryable with Retry-After header (ambiguous whether
++            //   processing started; require explicit server signal)
++            RequestIdempotency::NonIdempotent => code == 429 || (code == 503 && has_retry_after),
 +        }
 +    }
 +
@@ -258,7 +255,9 @@
 +/// Calculate the backoff duration for a retry attempt.
 +///
 +/// Honors `Retry-After` header if present, otherwise uses exponential backoff.
-+/// Both are clamped to `[min_wait, max_wait]` and jitter (50ms–750ms) is added.
++/// Both are clamped to `[min_wait, max_wait]` and proportional jitter (5–25% of
++/// base wait, minimum 50ms) is added. Proportional jitter scales with the backoff
++/// so it remains meaningful at higher retry intervals.
 +pub fn calculate_backoff(
 +    config: &RetryConfig,
 +    attempt: u32,
@@ -273,8 +272,12 @@
 +        exp.min(config.max_wait)
 +    };
 +
-+    // Add random jitter between 50ms and 750ms
-+    let jitter_ms = rand::rng().random_range(50..=750);
++    // Proportional jitter: random 5-25% of base_wait, minimum 50ms.
++    // Scales with backoff so retries remain spread at higher intervals.
++    let base_ms = base_wait.as_millis() as u64;
++    let jitter_lo = (base_ms * 5 / 100).max(50);
++    let jitter_hi = (base_ms * 25 / 100).max(jitter_lo + 1);
++    let jitter_ms = rand::rng().random_range(jitter_lo..=jitter_hi);
 +    base_wait + Duration::from_millis(jitter_ms)
 +}
 +
@@ -462,24 +465,25 @@
 +    // --- Non-idempotent strategy tests ---
 +
 +    #[test]
-+    fn test_non_idempotent_strategy_retries_429_503_with_retry_after() {
++    fn test_non_idempotent_strategy_429_always_retryable() {
 +        let strategy = RetryStrategy {
 +            idempotency: RequestIdempotency::NonIdempotent,
 +            override_codes: None,
 +        };
-+        // With Retry-After header: 429/503 are retryable
++        // 429 is always retryable — rate-limited means the request was not executed
 +        assert!(strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS, true));
-+        assert!(strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE, true));
++        assert!(strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS, false));
 +    }
 +
 +    #[test]
-+    fn test_non_idempotent_strategy_no_retry_429_503_without_retry_after() {
++    fn test_non_idempotent_strategy_503_requires_retry_after() {
 +        let strategy = RetryStrategy {
 +            idempotency: RequestIdempotency::NonIdempotent,
 +            override_codes: None,
 +        };
-+        // Without Retry-After header: 429/503 are NOT retryable for non-idempotent
-+        assert!(!strategy.is_retryable_status(StatusCode::TOO_MANY_REQUESTS, false));
++        // 503 with Retry-After: retryable (server explicitly signals retry)
++        assert!(strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE, true));
++        // 503 without Retry-After: NOT retryable (ambiguous whether processing started)
 +        assert!(!strategy.is_retryable_status(StatusCode::SERVICE_UNAVAILABLE, false));
 +    }
 +
@@ -517,17 +521,19 @@
 +            ..Default::default()
 +        };
 +        // Per spec: exp_backoff = 2^attempt * min_wait
-+        // Attempt 1: 2^1 * 1s = 2s + jitter, Attempt 2: 2^2 * 1s = 4s + jitter,
-+        // Attempt 3: 2^3 * 1s = 8s + jitter
++        // Proportional jitter: 5-25% of base
++        // Attempt 1: 2s base + 100-500ms = 2100-2500ms
++        // Attempt 2: 4s base + 200-1000ms = 4200-5000ms
++        // Attempt 3: 8s base + 400-2000ms = 8400-10000ms
 +        let b1 = calculate_backoff(&config, 1, None);
 +        let b2 = calculate_backoff(&config, 2, None);
 +        let b3 = calculate_backoff(&config, 3, None);
-+        assert!(b1 >= Duration::from_millis(2050));
-+        assert!(b1 <= Duration::from_millis(2750));
-+        assert!(b2 >= Duration::from_millis(4050));
-+        assert!(b2 <= Duration::from_millis(4750));
-+        assert!(b3 >= Duration::from_millis(8050));
-+        assert!(b3 <= Duration::from_millis(8750));
++        assert!(b1 >= Duration::from_millis(2100));
++        assert!(b1 <= Duration::from_millis(2500));
++        assert!(b2 >= Duration::from_millis(4200));
++        assert!(b2 <= Duration::from_millis(5000));
++        assert!(b3 >= Duration::from_millis(8400));
++        assert!(b3 <= Duration::from_millis(10000));
 +    }
 +
 +    #[test]
@@ -537,9 +543,9 @@
 +            max_wait: Duration::from_secs(10),
 +            ..Default::default()
 +        };
-+        // Attempt 10: 2^9 = 512s, should be capped to 10s + jitter
++        // Attempt 10: 2^10 = 1024s, capped to 10s + 25% jitter = 12500ms max
 +        let backoff = calculate_backoff(&config, 10, None);
-+        assert!(backoff <= Duration::from_millis(10_750));
++        assert!(backoff <= Duration::from_millis(12_500));
 +    }
 +
 +    #[test]
@@ -549,18 +555,18 @@
 +            max_wait: Duration::from_secs(60),
 +            ..Default::default()
 +        };
-+        // Attempt 1: min_wait * 2^1 = 4s + jitter
++        // Attempt 1: min_wait * 2^1 = 4s + 5% jitter min = 4200ms
 +        let backoff = calculate_backoff(&config, 1, None);
-+        assert!(backoff >= Duration::from_millis(4050));
++        assert!(backoff >= Duration::from_millis(4200));
 +    }
 +
 +    #[test]
 +    fn test_retry_after_header_seconds() {
 +        let config = RetryConfig::default();
 +        let backoff = calculate_backoff(&config, 1, Some("5"));
-+        // 5s clamped to [1s, 60s] = 5s + jitter
-+        assert!(backoff >= Duration::from_millis(5050));
-+        assert!(backoff <= Duration::from_millis(5750));
++        // 5s clamped to [1s, 60s] = 5s + 5-25% jitter = 5250-6250ms
++        assert!(backoff >= Duration::from_millis(5250));
++        assert!(backoff <= Duration::from_millis(6250));
 +    }
 +
 +    #[test]
@@ -570,13 +576,13 @@
 +            max_wait: Duration::from_secs(10),
 +            ..Default::default()
 +        };
-+        // Retry-After: 1 should be clamped up to min_wait (3s)
++        // Retry-After: 1 should be clamped up to min_wait (3s) + 5% = 3150ms
 +        let backoff = calculate_backoff(&config, 1, Some("1"));
-+        assert!(backoff >= Duration::from_millis(3050));
++        assert!(backoff >= Duration::from_millis(3150));
 +
-+        // Retry-After: 120 should be clamped down to max_wait (10s)
++        // Retry-After: 120 should be clamped down to max_wait (10s) + 25% = 12500ms
 +        let backoff = calculate_backoff(&config, 1, Some("120"));
-+        assert!(backoff <= Duration::from_millis(10_750));
++        assert!(backoff <= Duration::from_millis(12_500));
 +    }
 +
 +    #[test]
@@ -585,25 +591,47 @@
 +            min_wait: Duration::from_secs(1),
 +            ..Default::default()
 +        };
-+        // Invalid Retry-After should fall back to exponential: 2^1 * 1s = 2s + jitter
++        // Invalid Retry-After should fall back to exponential: 2s + 5-25% = 2100-2500ms
 +        let backoff = calculate_backoff(&config, 1, Some("not-a-number"));
-+        assert!(backoff >= Duration::from_millis(2050));
-+        assert!(backoff <= Duration::from_millis(2750));
++        assert!(backoff >= Duration::from_millis(2100));
++        assert!(backoff <= Duration::from_millis(2500));
 +    }
 +
 +    #[test]
-+    fn test_jitter_in_range_50ms_750ms() {
++    fn test_proportional_jitter() {
 +        let config = RetryConfig {
 +            min_wait: Duration::from_secs(1),
 +            max_wait: Duration::from_secs(60),
 +            ..Default::default()
 +        };
-+        // Run multiple times and check jitter range
-+        // Attempt 1: 2^1 * 1s = 2s base + 50ms to 750ms jitter
++        // Attempt 1: 2s base → jitter 5-25% = 100-500ms → total 2100-2500ms
++        for _ in 0..100 {
++            let backoff = calculate_backoff(&config, 1, None);
++            assert!(backoff >= Duration::from_millis(2100));
++            assert!(backoff <= Duration::from_millis(2500));
++        }
++
++        // Attempt 5: 32s base → jitter 5-25% = 1600-8000ms → total 33600-40000ms
++        // Jitter scales significantly at higher backoffs
++        for _ in 0..100 {
++            let backoff = calculate_backoff(&config, 5, None);
++            assert!(backoff >= Duration::from_millis(33600));
++            assert!(backoff <= Duration::from_millis(40000));
++        }
++    }
++
++    #[test]
++    fn test_jitter_minimum_50ms() {
++        // With a very small base wait, jitter floor should be 50ms
++        let config = RetryConfig {
++            min_wait: Duration::from_millis(10),
++            max_wait: Duration::from_secs(60),
++            ..Default::default()
++        };
++        // Attempt 1: 20ms base → 5% = 1ms, but clamped to 50ms min
 +        for _ in 0..100 {
 +            let backoff = calculate_backoff(&config, 1, None);
-+            assert!(backoff >= Duration::from_millis(2050));
-+            assert!(backoff <= Duration::from_millis(2750));
++            assert!(backoff >= Duration::from_millis(70)); // 20ms + 50ms
 +        }
 +    }
 +

Reproduce locally: git range-diff f69804a..c2cd59b 5e248be..7f47f62 | Disable: git config gitstack.push-range-diff false

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants