Skip to content

Commit 64031bd

Browse files
committed
Merge branch 'main' into refactor/upsample-recursion
2 parents f084c84 + 4a6f20c commit 64031bd

File tree

41 files changed

+804
-803
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+804
-803
lines changed

crates/polars-core/src/chunked_array/ops/arity.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,18 @@ where
556556
ca
557557
}
558558

559-
#[inline]
559+
pub fn try_unary_to_series<T, F>(ca: &ChunkedArray<T>, op: F) -> PolarsResult<Series>
560+
where
561+
T: PolarsDataType,
562+
F: FnMut(&T::Array) -> PolarsResult<Box<dyn Array>>,
563+
{
564+
let chunks = ca
565+
.downcast_iter()
566+
.map(op)
567+
.collect::<PolarsResult<Vec<_>>>()?;
568+
Series::try_from((ca.name().clone(), chunks))
569+
}
570+
560571
pub fn binary_to_series<T, U, F>(
561572
lhs: &ChunkedArray<T>,
562573
rhs: &ChunkedArray<U>,
@@ -576,6 +587,25 @@ where
576587
Series::try_from((lhs.name().clone(), chunks))
577588
}
578589

590+
pub fn try_binary_to_series<T, U, F>(
591+
lhs: &ChunkedArray<T>,
592+
rhs: &ChunkedArray<U>,
593+
mut op: F,
594+
) -> PolarsResult<Series>
595+
where
596+
T: PolarsDataType,
597+
U: PolarsDataType,
598+
F: FnMut(&T::Array, &U::Array) -> PolarsResult<Box<dyn Array>>,
599+
{
600+
let (lhs, rhs) = align_chunks_binary(lhs, rhs);
601+
let chunks = lhs
602+
.downcast_iter()
603+
.zip(rhs.downcast_iter())
604+
.map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
605+
.collect::<PolarsResult<Vec<_>>>()?;
606+
Series::try_from((lhs.name().clone(), chunks))
607+
}
608+
579609
/// Applies a kernel that produces `ArrayRef` of the same type.
580610
///
581611
/// # Safety

crates/polars-expr/src/expressions/gather.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use polars_core::chunked_array::cast::CastOptions;
22
use polars_core::prelude::arity::unary_elementwise_values;
33
use polars_core::prelude::*;
44
use polars_ops::prelude::lst_get;
5-
use polars_ops::series::convert_to_unsigned_index;
5+
use polars_ops::series::convert_and_bound_index;
66
use polars_utils::index::ToIdx;
77

88
use super::*;
@@ -24,11 +24,8 @@ impl PhysicalExpr for GatherExpr {
2424
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
2525
let series = self.phys_expr.evaluate(df, state)?;
2626
let idx = self.idx.evaluate(df, state)?;
27-
let idx = convert_to_unsigned_index(
28-
idx.as_materialized_series(),
29-
series.len(),
30-
self.null_on_oob,
31-
)?;
27+
let idx =
28+
convert_and_bound_index(idx.as_materialized_series(), series.len(), self.null_on_oob)?;
3229
series.take(&idx)
3330
}
3431

crates/polars-io/src/cloud/object_store_setup.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use polars_utils::{format_pl_smallstr, pl_serialize};
1111
use tokio::sync::RwLock;
1212

1313
use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14-
use crate::cloud::CloudConfig;
14+
use crate::cloud::{CloudConfig, CloudRetryConfig};
1515

1616
/// Object stores must be cached. Every object-store will do DNS lookups and
1717
/// get rate limited when querying the DNS (can take up to 5s).
@@ -38,19 +38,20 @@ fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> Vec<u
3838
// We include credentials as they can expire, so users will send new credentials for the same url.
3939
let cloud_options = options.map(
4040
|CloudOptions {
41+
max_retries: _,
4142
// Destructure to ensure this breaks if anything changes.
42-
max_retries,
4343
#[cfg(feature = "file_cache")]
4444
file_cache_ttl,
4545
config,
46+
retry_config,
4647
#[cfg(feature = "cloud")]
4748
credential_provider,
4849
}| {
49-
CloudOptions2 {
50-
max_retries: *max_retries,
50+
CloudOptionsKey {
5151
#[cfg(feature = "file_cache")]
5252
file_cache_ttl: *file_cache_ttl,
5353
config: config.clone(),
54+
retry_config: *retry_config,
5455
#[cfg(feature = "cloud")]
5556
credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
5657
}
@@ -75,18 +76,18 @@ fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> Vec<u
7576
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
7677
struct CacheKey {
7778
url_base: PlSmallStr,
78-
cloud_options: Option<CloudOptions2>,
79+
cloud_options: Option<CloudOptionsKey>,
7980
}
8081

8182
/// Variant of CloudOptions for serializing to a cache key. The credential
8283
/// provider is replaced by the function address.
8384
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
8485
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
85-
struct CloudOptions2 {
86-
max_retries: usize,
86+
struct CloudOptionsKey {
8787
#[cfg(feature = "file_cache")]
8888
file_cache_ttl: u64,
8989
config: Option<CloudConfig>,
90+
retry_config: CloudRetryConfig,
9091
#[cfg(feature = "cloud")]
9192
credential_provider: usize,
9293
}

crates/polars-io/src/cloud/options.rs

Lines changed: 71 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@ use object_store::azure::MicrosoftAzureBuilder;
1919
use object_store::gcp::GoogleCloudStorageBuilder;
2020
#[cfg(feature = "gcp")]
2121
pub use object_store::gcp::GoogleConfigKey;
22-
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
23-
use object_store::{BackoffConfig, RetryConfig};
2422
use polars_error::*;
2523
#[cfg(feature = "aws")]
2624
use polars_utils::cache::LruCache;
2725
use polars_utils::pl_path::{CloudScheme, PlRefPath};
26+
use polars_utils::total_ord::TotalOrdWrap;
2827
#[cfg(feature = "http")]
2928
use reqwest::header::HeaderMap;
3029
#[cfg(feature = "serde")]
@@ -79,10 +78,12 @@ pub(crate) enum CloudConfig {
7978
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
8079
/// Options to connect to various cloud providers.
8180
pub struct CloudOptions {
82-
pub max_retries: usize,
81+
pub max_retries: usize, // TODO: Remove in a breaking DSL change
8382
#[cfg(feature = "file_cache")]
8483
pub file_cache_ttl: u64,
8584
pub(crate) config: Option<CloudConfig>,
85+
#[cfg_attr(feature = "serde", serde(default))]
86+
pub retry_config: CloudRetryConfig,
8687
#[cfg(feature = "cloud")]
8788
/// Note: In most cases you will want to access this via [`CloudOptions::initialized_credential_provider`]
8889
/// rather than directly.
@@ -102,6 +103,7 @@ impl CloudOptions {
102103
#[cfg(feature = "file_cache")]
103104
file_cache_ttl: get_env_file_cache_ttl(),
104105
config: None,
106+
retry_config: CloudRetryConfig::default(),
105107
#[cfg(feature = "cloud")]
106108
credential_provider: None,
107109
});
@@ -110,6 +112,66 @@ impl CloudOptions {
110112
}
111113
}
112114

115+
#[derive(Clone, Copy, Debug, PartialEq, Hash, Eq)]
116+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
117+
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
118+
pub struct CloudRetryConfig {
119+
pub max_retries: usize,
120+
pub retry_timeout: std::time::Duration,
121+
pub retry_init_backoff: std::time::Duration,
122+
pub retry_max_backoff: std::time::Duration,
123+
pub retry_base_multiplier: TotalOrdWrap<f64>,
124+
}
125+
126+
impl Default for CloudRetryConfig {
127+
fn default() -> Self {
128+
use std::time::Duration;
129+
130+
return Self {
131+
max_retries: parse_env_var(2, "POLARS_CLOUD_MAX_RETRIES"),
132+
retry_timeout: Duration::from_millis(parse_env_var(
133+
10 * 1000,
134+
"POLARS_CLOUD_RETRY_TIMEOUT_MS",
135+
)),
136+
retry_init_backoff: Duration::from_millis(parse_env_var(
137+
100,
138+
"POLARS_CLOUD_RETRY_INIT_BACKOFF_MS",
139+
)),
140+
retry_max_backoff: Duration::from_millis(parse_env_var(
141+
15 * 1000,
142+
"POLARS_CLOUD_RETRY_MAX_BACKOFF_MS",
143+
)),
144+
retry_base_multiplier: TotalOrdWrap(parse_env_var(
145+
2.,
146+
"POLARS_CLOUD_RETRY_BASE_MULTIPLIER",
147+
)),
148+
};
149+
150+
fn parse_env_var<T: FromStr>(default: T, name: &'static str) -> T {
151+
std::env::var(name).map_or(default, |x| {
152+
x.parse::<T>()
153+
.ok()
154+
.unwrap_or_else(|| panic!("invalid value for {name}: {x}"))
155+
})
156+
}
157+
}
158+
}
159+
160+
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
161+
impl From<CloudRetryConfig> for object_store::RetryConfig {
162+
fn from(value: CloudRetryConfig) -> Self {
163+
object_store::RetryConfig {
164+
backoff: object_store::BackoffConfig {
165+
init_backoff: value.retry_init_backoff,
166+
max_backoff: value.retry_max_backoff,
167+
base: value.retry_base_multiplier.0,
168+
},
169+
max_retries: value.max_retries,
170+
retry_timeout: value.retry_timeout,
171+
}
172+
}
173+
}
174+
113175
#[cfg(feature = "http")]
114176
pub(crate) fn try_build_http_header_map_from_items_slice<S: AsRef<str>>(
115177
headers: &[(S, S)],
@@ -182,15 +244,6 @@ impl CloudType {
182244
}
183245
}
184246

185-
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
186-
fn get_retry_config(max_retries: usize) -> RetryConfig {
187-
RetryConfig {
188-
backoff: BackoffConfig::default(),
189-
max_retries,
190-
retry_timeout: std::time::Duration::from_secs(10),
191-
}
192-
}
193-
194247
pub static USER_AGENT: &str = concat!("polars", "/", env!("CARGO_PKG_VERSION"),);
195248

196249
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
@@ -253,9 +306,9 @@ fn read_config(
253306
}
254307

255308
impl CloudOptions {
256-
/// Set the maximum number of retries.
257-
pub fn with_max_retries(mut self, max_retries: usize) -> Self {
258-
self.max_retries = max_retries;
309+
pub fn with_retry_config(mut self, retry_config: CloudRetryConfig) -> Self {
310+
self.max_retries = retry_config.max_retries;
311+
self.retry_config = retry_config;
259312
self
260313
}
261314

@@ -399,7 +452,7 @@ impl CloudOptions {
399452
};
400453
};
401454

402-
let builder = builder.with_retry(get_retry_config(self.max_retries));
455+
let builder = builder.with_retry(self.retry_config.into());
403456

404457
let opt_credential_provider = match opt_credential_provider {
405458
#[cfg(feature = "python")]
@@ -473,7 +526,7 @@ impl CloudOptions {
473526

474527
let builder = builder
475528
.with_url(url.to_string())
476-
.with_retry(get_retry_config(self.max_retries));
529+
.with_retry(self.retry_config.into());
477530

478531
let builder =
479532
if let Some(v) = self.initialized_credential_provider(clear_cached_credentials)? {
@@ -535,7 +588,7 @@ impl CloudOptions {
535588

536589
let builder = builder
537590
.with_url(url.to_string())
538-
.with_retry(get_retry_config(self.max_retries));
591+
.with_retry(self.retry_config.into());
539592

540593
let builder = if let Some(v) = credential_provider {
541594
builder.with_credentials(v.into_gcp_provider())

crates/polars-io/src/parquet/read/mmap.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
use std::io::Cursor;
2+
13
use arrow::array::Array;
24
use arrow::bitmap::Bitmap;
35
use arrow::datatypes::Field;
46
use polars_error::PolarsResult;
57
use polars_parquet::read::{
68
BasicDecompressor, ColumnChunkMetadata, Filter, PageReader, column_iter_to_arrays,
79
};
8-
use polars_utils::mmap::{MemReader, MemSlice};
10+
use polars_utils::mmap::MemSlice;
911

1012
/// Store columns data in two scenarios:
1113
/// 1. a local memory mapped file
@@ -60,7 +62,7 @@ pub fn to_deserializer(
6062
// Advise fetching the data for the column chunk
6163
chunk.prefetch();
6264

63-
let pages = PageReader::new(MemReader::new(chunk), column_meta, vec![], usize::MAX);
65+
let pages = PageReader::new(Cursor::new(chunk), column_meta, vec![], usize::MAX);
6466
(
6567
BasicDecompressor::new(pages, vec![]),
6668
&column_meta.descriptor().descriptor.primitive_type,

crates/polars-io/src/utils/compression.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use std::cmp;
2-
use std::io::{Read, Write};
2+
use std::io::{Cursor, Read, Write};
33

44
use polars_core::prelude::*;
55
use polars_error::{feature_gated, to_compute_err};
6-
use polars_utils::mmap::{MemReader, MemSlice};
6+
use polars_utils::mmap::MemSlice;
77

88
use crate::utils::file::{Writeable, WriteableTrait};
99
use crate::utils::sync_on_close::SyncOnCloseType;
@@ -80,11 +80,11 @@ pub enum CompressedReader {
8080
offset: usize,
8181
},
8282
#[cfg(feature = "decompress")]
83-
Gzip(flate2::bufread::MultiGzDecoder<MemReader>),
83+
Gzip(flate2::bufread::MultiGzDecoder<Cursor<MemSlice>>),
8484
#[cfg(feature = "decompress")]
85-
Zlib(flate2::bufread::ZlibDecoder<MemReader>),
85+
Zlib(flate2::bufread::ZlibDecoder<Cursor<MemSlice>>),
8686
#[cfg(feature = "decompress")]
87-
Zstd(zstd::Decoder<'static, MemReader>),
87+
Zstd(zstd::Decoder<'static, Cursor<MemSlice>>),
8888
}
8989

9090
impl CompressedReader {
@@ -95,15 +95,15 @@ impl CompressedReader {
9595
None => CompressedReader::Uncompressed { slice, offset: 0 },
9696
#[cfg(feature = "decompress")]
9797
Some(SupportedCompression::GZIP) => {
98-
CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(MemReader::new(slice)))
98+
CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(Cursor::new(slice)))
9999
},
100100
#[cfg(feature = "decompress")]
101101
Some(SupportedCompression::ZLIB) => {
102-
CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(MemReader::new(slice)))
102+
CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(Cursor::new(slice)))
103103
},
104104
#[cfg(feature = "decompress")]
105105
Some(SupportedCompression::ZSTD) => {
106-
CompressedReader::Zstd(zstd::Decoder::with_buffer(MemReader::new(slice))?)
106+
CompressedReader::Zstd(zstd::Decoder::with_buffer(Cursor::new(slice))?)
107107
},
108108
#[cfg(not(feature = "decompress"))]
109109
_ => panic!("activate 'decompress' feature"),
@@ -141,14 +141,16 @@ impl CompressedReader {
141141
CompressedReader::Uncompressed { slice, .. } => slice.len(),
142142
#[cfg(feature = "decompress")]
143143
CompressedReader::Gzip(reader) => {
144-
reader.get_ref().total_len() * ESTIMATED_DEFLATE_RATIO
144+
reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
145145
},
146146
#[cfg(feature = "decompress")]
147147
CompressedReader::Zlib(reader) => {
148-
reader.get_ref().total_len() * ESTIMATED_DEFLATE_RATIO
148+
reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
149149
},
150150
#[cfg(feature = "decompress")]
151-
CompressedReader::Zstd(reader) => reader.get_ref().total_len() * ESTIMATED_ZSTD_RATIO,
151+
CompressedReader::Zstd(reader) => {
152+
reader.get_ref().get_ref().len() * ESTIMATED_ZSTD_RATIO
153+
},
152154
}
153155
}
154156

0 commit comments

Comments
 (0)