Skip to content

Commit c9d0a61

Browse files
authored
Better credential refresh (#1147)
- Start attempting to refresh earlier when using the S3 SDK, since credential refresh can take a while. - Reduce contention on the credential lock in GCS. - Fix the logic in Python refreshable credentials. Context: some users are seeing errors when trying to refresh credentials, but we weren't able to reproduce them.
1 parent 611f5c0 commit c9d0a61

File tree

3 files changed

+61
-34
lines changed

3 files changed

+61
-34
lines changed

icechunk-python/src/config.rs

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -140,19 +140,19 @@ pub(crate) fn datetime_repr(d: &DateTime<Utc>) -> String {
140140
#[derive(Clone, Debug, Serialize, Deserialize)]
141141
struct PythonCredentialsFetcher<CredType> {
142142
pub pickled_function: Vec<u8>,
143-
pub current: Option<CredType>,
143+
pub initial: Option<CredType>,
144144
}
145145

146146
impl<CredType> PythonCredentialsFetcher<CredType> {
147147
fn new(pickled_function: Vec<u8>) -> Self {
148-
PythonCredentialsFetcher { pickled_function, current: None }
148+
PythonCredentialsFetcher { pickled_function, initial: None }
149149
}
150150

151-
fn new_with_current<C>(pickled_function: Vec<u8>, current: C) -> Self
151+
fn new_with_initial<C>(pickled_function: Vec<u8>, current: C) -> Self
152152
where
153153
C: Into<CredType>,
154154
{
155-
PythonCredentialsFetcher { pickled_function, current: Some(current.into()) }
155+
PythonCredentialsFetcher { pickled_function, initial: Some(current.into()) }
156156
}
157157
}
158158

@@ -174,18 +174,22 @@ where
174174
#[typetag::serde]
175175
impl S3CredentialsFetcher for PythonCredentialsFetcher<S3StaticCredentials> {
176176
async fn get(&self) -> Result<S3StaticCredentials, String> {
177-
Python::with_gil(|py| {
178-
if let Some(static_creds) = self.current.as_ref() {
179-
// avoid herding by adding some randomness
180-
let delta = TimeDelta::seconds(rand::random_range(5..180));
181-
let expiration = static_creds
182-
.expires_after
183-
.map(|exp| exp - delta)
184-
.unwrap_or(chrono::DateTime::<Utc>::MAX_UTC);
185-
if Utc::now() < expiration {
177+
if let Some(static_creds) = self.initial.as_ref() {
178+
match static_creds.expires_after {
179+
None => {
180+
return Ok(static_creds.clone());
181+
}
182+
Some(expiration)
183+
if expiration
184+
> Utc::now()
185+
+ TimeDelta::seconds(rand::random_range(120..=180)) =>
186+
{
186187
return Ok(static_creds.clone());
187188
}
189+
_ => {}
188190
}
191+
}
192+
Python::with_gil(|py| {
189193
call_pickled::<PyS3StaticCredentials>(py, self.pickled_function.clone())
190194
.map(|c| c.into())
191195
})
@@ -197,18 +201,22 @@ impl S3CredentialsFetcher for PythonCredentialsFetcher<S3StaticCredentials> {
197201
#[typetag::serde]
198202
impl GcsCredentialsFetcher for PythonCredentialsFetcher<GcsBearerCredential> {
199203
async fn get(&self) -> Result<GcsBearerCredential, String> {
200-
Python::with_gil(|py| {
201-
if let Some(static_creds) = self.current.as_ref() {
202-
// avoid herding by adding some randomness
203-
let delta = TimeDelta::seconds(rand::random_range(5..180));
204-
let expiration = static_creds
205-
.expires_after
206-
.map(|exp| exp - delta)
207-
.unwrap_or(chrono::DateTime::<Utc>::MAX_UTC);
208-
if Utc::now() < expiration {
204+
if let Some(static_creds) = self.initial.as_ref() {
205+
match static_creds.expires_after {
206+
None => {
207+
return Ok(static_creds.clone());
208+
}
209+
Some(expiration)
210+
if expiration
211+
> Utc::now()
212+
+ TimeDelta::seconds(rand::random_range(120..=180)) =>
213+
{
209214
return Ok(static_creds.clone());
210215
}
216+
_ => {}
211217
}
218+
}
219+
Python::with_gil(|py| {
212220
call_pickled::<PyGcsBearerCredential>(py, self.pickled_function.clone())
213221
.map(|c| c.into())
214222
})
@@ -233,7 +241,7 @@ impl From<PyS3Credentials> for S3Credentials {
233241
PyS3Credentials::Static(creds) => S3Credentials::Static(creds.into()),
234242
PyS3Credentials::Refreshable { pickled_function, current } => {
235243
let fetcher = if let Some(current) = current {
236-
PythonCredentialsFetcher::new_with_current(pickled_function, current)
244+
PythonCredentialsFetcher::new_with_initial(pickled_function, current)
237245
} else {
238246
PythonCredentialsFetcher::new(pickled_function)
239247
};
@@ -320,7 +328,7 @@ impl From<PyGcsCredentials> for GcsCredentials {
320328
PyGcsCredentials::Static(creds) => GcsCredentials::Static(creds.into()),
321329
PyGcsCredentials::Refreshable { pickled_function, current } => {
322330
let fetcher = if let Some(current) = current {
323-
PythonCredentialsFetcher::new_with_current(pickled_function, current)
331+
PythonCredentialsFetcher::new_with_initial(pickled_function, current)
324332
} else {
325333
PythonCredentialsFetcher::new(pickled_function)
326334
};

icechunk/src/storage/object_store.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
};
99
use async_trait::async_trait;
1010
use bytes::{Buf, Bytes};
11-
use chrono::{DateTime, Utc};
11+
use chrono::{DateTime, TimeDelta, Utc};
1212
use futures::{
1313
StreamExt, TryStreamExt,
1414
stream::{self, BoxStream},
@@ -38,7 +38,7 @@ use std::{
3838
};
3939
use tokio::{
4040
io::AsyncRead,
41-
sync::{Mutex, OnceCell},
41+
sync::{OnceCell, RwLock},
4242
};
4343
use tokio_util::compat::FuturesAsyncReadCompatExt;
4444
use tracing::instrument;
@@ -1147,29 +1147,34 @@ impl ObjectStoreBackend for GcsObjectStoreBackend {
11471147

11481148
#[derive(Debug)]
11491149
pub struct GcsRefreshableCredentialProvider {
1150-
last_credential: Arc<Mutex<Option<GcsBearerCredential>>>,
1150+
last_credential: Arc<RwLock<Option<GcsBearerCredential>>>,
11511151
refresher: Arc<dyn GcsCredentialsFetcher>,
11521152
}
11531153

11541154
impl GcsRefreshableCredentialProvider {
11551155
pub fn new(refresher: Arc<dyn GcsCredentialsFetcher>) -> Self {
1156-
Self { last_credential: Arc::new(Mutex::new(None)), refresher }
1156+
Self { last_credential: Arc::new(RwLock::new(None)), refresher }
11571157
}
11581158

11591159
pub async fn get_or_update_credentials(
11601160
&self,
11611161
) -> Result<GcsBearerCredential, StorageError> {
1162-
let mut last_credential = self.last_credential.lock().await;
1162+
let last_credential = self.last_credential.read().await;
11631163

11641164
// If we have a credential and it hasn't expired, return it
11651165
if let Some(creds) = last_credential.as_ref() {
11661166
if let Some(expires_after) = creds.expires_after {
1167-
if expires_after > Utc::now() {
1167+
if expires_after
1168+
> Utc::now() + TimeDelta::seconds(rand::random_range(120..=180))
1169+
{
11681170
return Ok(creds.clone());
11691171
}
11701172
}
11711173
}
11721174

1175+
drop(last_credential);
1176+
let mut last_credential = self.last_credential.write().await;
1177+
11731178
// Otherwise, refresh the credential and cache it
11741179
let creds = self
11751180
.refresher

icechunk/src/storage/s3.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use aws_credential_types::provider::error::CredentialsError;
2323
use aws_sdk_s3::{
2424
Client,
2525
config::{
26-
Builder, ConfigBag, Intercept, ProvideCredentials, Region, RuntimeComponents,
27-
interceptors::BeforeTransmitInterceptorContextMut,
26+
Builder, ConfigBag, IdentityCache, Intercept, ProvideCredentials, Region,
27+
RuntimeComponents, interceptors::BeforeTransmitInterceptorContextMut,
2828
},
2929
error::{BoxError, SdkError},
3030
operation::put_object::PutObjectError,
@@ -40,7 +40,7 @@ use futures::{
4040
};
4141
use serde::{Deserialize, Serialize};
4242
use tokio::{io::AsyncRead, sync::OnceCell};
43-
use tracing::instrument;
43+
use tracing::{error, instrument};
4444

4545
use super::{
4646
CHUNK_PREFIX, CONFIG_PATH, DeleteObjectsResult, FetchConfigResult, GetRefResult,
@@ -171,10 +171,19 @@ pub async fn mk_client(
171171
.with_max_backoff(core::time::Duration::from_millis(
172172
settings.retries().max_backoff_ms() as u64,
173173
));
174+
174175
let mut s3_builder = Builder::from(&aws_config.load().await)
175176
.force_path_style(config.force_path_style)
176177
.retry_config(retry_config);
177178

179+
// credentials may take a while to refresh, defaults are too strict
180+
let id_cache = IdentityCache::lazy()
181+
.load_timeout(core::time::Duration::from_secs(120))
182+
.buffer_time(core::time::Duration::from_secs(120))
183+
.build();
184+
185+
s3_builder = s3_builder.identity_cache(id_cache);
186+
178187
if !extra_read_headers.is_empty() || !extra_write_headers.is_empty() {
179188
s3_builder = s3_builder.interceptor(ExtraHeadersInterceptor {
180189
extra_read_headers,
@@ -1017,7 +1026,12 @@ impl ProvideRefreshableCredentials {
10171026
async fn provide(
10181027
&self,
10191028
) -> Result<aws_credential_types::Credentials, CredentialsError> {
1020-
let creds = self.0.get().await.map_err(CredentialsError::not_loaded)?;
1029+
let creds = self
1030+
.0
1031+
.get()
1032+
.await
1033+
.inspect_err(|err| error!(error = err, "Cannot load credentials"))
1034+
.map_err(CredentialsError::not_loaded)?;
10211035
let creds = aws_credential_types::Credentials::new(
10221036
creds.access_key_id,
10231037
creds.secret_access_key,

0 commit comments

Comments
 (0)