Skip to content

Commit c3c38a9

Browse files
authored
refactor(rust): Remove ad-hoc buffer pool (#19553)
1 parent aaf0a11 commit c3c38a9

File tree

3 files changed

+20
-62
lines changed

3 files changed

+20
-62
lines changed

crates/polars-io/src/csv/write/write_impl.rs

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use arrow::legacy::time_zone::Tz;
77
use polars_core::prelude::*;
88
use polars_core::POOL;
99
use polars_error::polars_ensure;
10-
use polars_utils::contention_pool::LowContentionPool;
1110
use rayon::prelude::*;
1211
use serializer::{serializer_for, string_serializer};
1312

@@ -115,16 +114,12 @@ pub(crate) fn write<W: Write>(
115114

116115
let len = df.height();
117116
let total_rows_per_pool_iter = n_threads * chunk_size;
118-
let serializer_pool = LowContentionPool::<Vec<_>>::new(n_threads);
119-
let write_buffer_pool = LowContentionPool::<Vec<_>>::new(n_threads);
120117

121118
let mut n_rows_finished = 0;
122119

123-
// holds the buffers that will be written
124-
let mut result_buf: Vec<PolarsResult<Vec<u8>>> = Vec::with_capacity(n_threads);
125-
120+
let mut buffers: Vec<_> = (0..n_threads).map(|_| (Vec::new(), Vec::new())).collect();
126121
while n_rows_finished < len {
127-
let buf_writer = |thread_no| {
122+
let buf_writer = |thread_no, write_buffer: &mut Vec<_>, serializers_vec: &mut Vec<_>| {
128123
let thread_offset = thread_no * chunk_size;
129124
let total_offset = n_rows_finished + thread_offset;
130125
let mut df = df.slice(total_offset as i64, chunk_size);
@@ -141,15 +136,13 @@ pub(crate) fn write<W: Write>(
141136
// the vectors the buffer pool, the series have already been removed from the buffers
142137
// in other words, the lifetime does not leave this scope
143138
let cols = unsafe { std::mem::transmute::<&[Column], &[Column]>(cols) };
144-
let mut write_buffer = write_buffer_pool.get();
145139

146140
if df.is_empty() {
147-
return Ok(write_buffer);
141+
return Ok(());
148142
}
149143

150-
let mut serializers_vec = serializer_pool.get();
151144
if serializers_vec.is_empty() {
152-
serializers_vec = cols
145+
*serializers_vec = cols
153146
.iter()
154147
.enumerate()
155148
.map(|(i, col)| {
@@ -164,7 +157,7 @@ pub(crate) fn write<W: Write>(
164157
.collect::<Result<_, _>>()?;
165158
} else {
166159
debug_assert_eq!(serializers_vec.len(), cols.len());
167-
for (col_iter, col) in std::iter::zip(&mut serializers_vec, cols) {
160+
for (col_iter, col) in std::iter::zip(serializers_vec.iter_mut(), cols) {
168161
col_iter.update_array(&*col.as_materialized_series().chunks()[0]);
169162
}
170163
}
@@ -174,33 +167,34 @@ pub(crate) fn write<W: Write>(
174167
let len = std::cmp::min(cols[0].len(), chunk_size);
175168

176169
for _ in 0..len {
177-
serializers[0].serialize(&mut write_buffer, options);
170+
serializers[0].serialize(write_buffer, options);
178171
for serializer in &mut serializers[1..] {
179172
write_buffer.push(options.separator);
180-
serializer.serialize(&mut write_buffer, options);
173+
serializer.serialize(write_buffer, options);
181174
}
182175

183176
write_buffer.extend_from_slice(options.line_terminator.as_bytes());
184177
}
185178

186-
serializer_pool.set(serializers_vec);
187-
188-
Ok(write_buffer)
179+
Ok(())
189180
};
190181

191182
if n_threads > 1 {
192-
let par_iter = (0..n_threads).into_par_iter().map(buf_writer);
193-
// rayon will ensure the right order
194-
POOL.install(|| result_buf.par_extend(par_iter));
183+
POOL.install(|| {
184+
buffers
185+
.par_iter_mut()
186+
.enumerate()
187+
.map(|(i, (w, s))| buf_writer(i, w, s))
188+
.collect::<PolarsResult<()>>()
189+
})?;
195190
} else {
196-
result_buf.push(buf_writer(0));
191+
let (w, s) = &mut buffers[0];
192+
buf_writer(0, w, s)?;
197193
}
198194

199-
for buf in result_buf.drain(..) {
200-
let mut buf = buf?;
201-
writer.write_all(&buf)?;
202-
buf.clear();
203-
write_buffer_pool.set(buf);
195+
for (write_buffer, _) in &mut buffers {
196+
writer.write_all(write_buffer)?;
197+
write_buffer.clear();
204198
}
205199

206200
n_rows_finished += total_rows_per_pool_iter;

crates/polars-utils/src/contention_pool.rs

Lines changed: 0 additions & 35 deletions
This file was deleted.

crates/polars-utils/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ pub mod cache;
1313
pub mod cardinality_sketch;
1414
pub mod cell;
1515
pub mod clmul;
16-
pub mod contention_pool;
1716
pub mod cpuid;
1817
mod error;
1918
pub mod floor_divmod;

0 commit comments

Comments
 (0)