Skip to content

Commit fe31df7

Browse files
committed
try(localfile): Eliminate data copy when flushing
1 parent 758e9e4 commit fe31df7

File tree

6 files changed

+82
-66
lines changed

6 files changed

+82
-66
lines changed

src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ fn as_default_healthy_check_min_disks() -> i32 {
135135
1
136136
}
137137
fn as_default_disk_write_buf_capacity() -> String {
138-
"1M".to_string()
138+
"1G".to_string()
139139
}
140140
fn as_default_disk_read_buf_capacity() -> String {
141141
"1M".to_string()

src/store/local/delegator.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::readable_size::ReadableSize;
1111
use crate::runtime::manager::RuntimeManager;
1212
use crate::store::local::sync_io::SyncLocalIO;
1313
use crate::store::local::{FileStat, LocalDiskStorage, LocalIO};
14+
use crate::store::BytesWrapper;
1415
use anyhow::Result;
1516
use async_trait::async_trait;
1617
use await_tree::InstrumentAwait;
@@ -227,7 +228,7 @@ impl LocalIO for LocalDiskDelegator {
227228
self.inner.io_handler.create_dir(dir).await
228229
}
229230

230-
async fn append(&self, path: &str, data: Bytes) -> Result<()> {
231+
async fn append(&self, path: &str, data: BytesWrapper) -> Result<()> {
231232
// todo: add the concurrency limitation. do we need? may be not.
232233

233234
let timer = LOCALFILE_DISK_APPEND_OPERATION_DURATION

src/store/local/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::store::BytesWrapper;
1819
use anyhow::Result;
1920
use async_trait::async_trait;
2021
use bytes::Bytes;
@@ -30,7 +31,7 @@ pub struct FileStat {
3031
#[async_trait]
3132
pub trait LocalIO: Clone {
3233
async fn create_dir(&self, dir: &str) -> Result<()>;
33-
async fn append(&self, path: &str, data: Bytes) -> Result<()>;
34+
async fn append(&self, path: &str, data: BytesWrapper) -> Result<()>;
3435
async fn read(&self, path: &str, offset: i64, length: Option<i64>) -> Result<Bytes>;
3536
async fn delete(&self, path: &str) -> Result<()>;
3637
async fn write(&self, path: &str, data: Bytes) -> Result<()>;

src/store/local/sync_io.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use crate::runtime::RuntimeRef;
22
use crate::store::local::{FileStat, LocalIO};
3+
use crate::store::BytesWrapper;
34
use anyhow::anyhow;
45
use async_trait::async_trait;
56
use bytes::Bytes;
6-
use std::fs;
77
use std::fs::{File, OpenOptions};
88
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
99
use std::path::Path;
1010
use std::sync::Arc;
11+
use std::{fs, io};
1112

1213
#[derive(Clone)]
1314
pub struct SyncLocalIO {
@@ -60,28 +61,31 @@ impl LocalIO for SyncLocalIO {
6061
Ok(())
6162
}
6263

63-
async fn append(&self, path: &str, data: Bytes) -> anyhow::Result<()> {
64+
async fn append(&self, path: &str, data: BytesWrapper) -> anyhow::Result<()> {
6465
let path = self.with_root(path);
65-
let buf = self.inner.buf_writer_capacity.clone();
66+
let buffer_capacity = self.inner.buf_writer_capacity.clone();
6667

6768
let r = self
6869
.inner
6970
.write_runtime_ref
7071
.spawn_blocking(move || {
7172
let path = Path::new(&path);
7273
let mut file = OpenOptions::new().append(true).create(true).open(path)?;
74+
let mut buf_writer = match buffer_capacity {
75+
Some(capacity) => BufWriter::with_capacity(capacity, file),
76+
_ => BufWriter::new(file),
77+
};
7378

74-
match buf {
75-
Some(capacity) => {
76-
let mut buf_writer = BufWriter::with_capacity(capacity, file);
77-
buf_writer.write_all(&data)?;
78-
buf_writer.flush()
79-
}
80-
_ => {
81-
file.write_all(&data)?;
82-
file.flush()
79+
match data {
80+
BytesWrapper::Direct(bytes) => buf_writer.write_all(&bytes)?,
81+
BytesWrapper::Composed(composed) => {
82+
for data in composed.iter() {
83+
buf_writer.write_all(data)?;
84+
}
8385
}
8486
}
87+
buf_writer.flush()?;
88+
Ok::<(), io::Error>(())
8589
})
8690
.await
8791
.map_err(|e| anyhow!(e))??;
@@ -207,9 +211,12 @@ mod test {
207211
);
208212

209213
// append
210-
base_runtime_ref.block_on(io_handler.append(data_file_name, Bytes::from(vec![0; 1000])))?;
211-
base_runtime_ref.block_on(io_handler.append(data_file_name, Bytes::from(vec![0; 1000])))?;
212-
base_runtime_ref.block_on(io_handler.append(data_file_name, Bytes::from(vec![0; 1000])))?;
214+
base_runtime_ref
215+
.block_on(io_handler.append(data_file_name, Bytes::from(vec![0; 1000]).into()))?;
216+
base_runtime_ref
217+
.block_on(io_handler.append(data_file_name, Bytes::from(vec![0; 1000]).into()))?;
218+
base_runtime_ref
219+
.block_on(io_handler.append(data_file_name, Bytes::from(vec![0; 1000]).into()))?;
213220

214221
// stat
215222
let stat = base_runtime_ref.block_on(io_handler.file_stat(data_file_name))?;

src/store/localfile.rs

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -244,57 +244,17 @@ impl LocalFileStore {
244244
}
245245
}
246246

247-
let mut index_bytes_holder = BytesMut::new();
248-
let mut data_bytes_holder = BytesMut::new();
249-
250-
let mut total_size = 0u64;
251-
for block in blocks {
252-
let block_id = block.block_id;
253-
let length = block.length;
254-
let uncompress_len = block.uncompress_length;
255-
let task_attempt_id = block.task_attempt_id;
256-
let crc = block.crc;
257-
258-
total_size += length as u64;
259-
260-
index_bytes_holder.put_i64(next_offset);
261-
index_bytes_holder.put_i32(length);
262-
index_bytes_holder.put_i32(uncompress_len);
263-
index_bytes_holder.put_i64(crc);
264-
index_bytes_holder.put_i64(block_id);
265-
index_bytes_holder.put_i64(task_attempt_id);
266-
267-
let data = &block.data;
268-
269-
// let actual = get_crc(data);
270-
// if crc != actual {
271-
// error!("Found not correct crc value while writing. expected: {}, actual: {} for uid: {:?}", crc, actual, &uid);
272-
// }
273-
274-
data_bytes_holder.put(data.clone());
275-
next_offset += length as i64;
276-
}
277-
278-
let data = data_bytes_holder.freeze();
279-
if data.len() != total_size as usize {
280-
error!(
281-
"Found not correct data len. expected: {}. actual: {} for uid: {:?}",
282-
total_size,
283-
data.len(),
284-
&uid
285-
);
286-
}
287-
247+
let shuffle_file_format = self.generate_shuffle_file_format(blocks, next_offset)?;
288248
local_disk
289-
.append(&data_file_path, data)
249+
.append(&data_file_path, shuffle_file_format.data)
290250
.instrument_await(format!("data flushing. path: {}", &data_file_path))
291251
.await?;
292252
local_disk
293-
.append(&index_file_path, index_bytes_holder.freeze())
253+
.append(&index_file_path, shuffle_file_format.index)
294254
.instrument_await(format!("index flushing. path: {}", &index_file_path))
295255
.await?;
296256

297-
TOTAL_LOCALFILE_USED.inc_by(total_size);
257+
TOTAL_LOCALFILE_USED.inc_by(shuffle_file_format.len as u64);
298258

299259
locked_obj
300260
.deref()

src/store/mod.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ pub mod memory;
2626
mod spill;
2727

2828
use crate::app::{
29-
PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingViewContext,
30-
RegisterAppContext, ReleaseTicketContext, RequireBufferContext, WritingViewContext,
29+
PurgeDataContext, ReadingIndexViewContext, ReadingViewContext, RegisterAppContext,
30+
ReleaseTicketContext, RequireBufferContext, WritingViewContext,
3131
};
3232
use crate::config::{Config, StorageType};
3333
use crate::error::WorkerError;
@@ -38,12 +38,12 @@ use std::fmt::{Display, Formatter};
3838
use crate::util::now_timestamp_as_sec;
3939
use anyhow::Result;
4040
use async_trait::async_trait;
41-
use bytes::Bytes;
41+
use bytes::{BufMut, Bytes, BytesMut};
4242

4343
use crate::composed_bytes::ComposedBytes;
4444
use crate::runtime::manager::RuntimeManager;
45-
use crate::store::mem::buffer::BatchMemoryBlock;
4645
use crate::store::spill::SpillWritingViewContext;
46+
use crate::store::BytesWrapper::{Composed, Direct};
4747
use std::sync::Arc;
4848

4949
#[derive(Debug)]
@@ -248,6 +248,53 @@ pub trait Store {
248248
async fn name(&self) -> StorageType;
249249

250250
async fn spill_insert(&self, ctx: SpillWritingViewContext) -> Result<(), WorkerError>;
251+
252+
fn generate_shuffle_file_format(
253+
&self,
254+
blocks: Vec<&Block>,
255+
offset: i64,
256+
) -> Result<ShuffleFileFormat> {
257+
let mut offset = offset;
258+
259+
let mut index_bytes_holder = BytesMut::new();
260+
let mut data_chain = Vec::with_capacity(blocks.len());
261+
262+
let mut total_size = 0;
263+
for block in blocks {
264+
let block_id = block.block_id;
265+
let length = block.length;
266+
let uncompress_len = block.uncompress_length;
267+
let task_attempt_id = block.task_attempt_id;
268+
let crc = block.crc;
269+
270+
total_size += length as usize;
271+
272+
index_bytes_holder.put_i64(offset);
273+
index_bytes_holder.put_i32(length);
274+
index_bytes_holder.put_i32(uncompress_len);
275+
index_bytes_holder.put_i64(crc);
276+
index_bytes_holder.put_i64(block_id);
277+
index_bytes_holder.put_i64(task_attempt_id);
278+
279+
let data = &block.data;
280+
data_chain.push(data.clone());
281+
offset += length as i64;
282+
}
283+
284+
Ok(ShuffleFileFormat {
285+
data: Composed(ComposedBytes::from(data_chain, total_size)),
286+
index: Direct(index_bytes_holder.into()),
287+
len: total_size,
288+
offset,
289+
})
290+
}
291+
}
292+
293+
pub struct ShuffleFileFormat {
294+
data: BytesWrapper,
295+
index: BytesWrapper,
296+
len: usize,
297+
offset: i64,
251298
}
252299

253300
pub trait Persistent {}

0 commit comments

Comments
 (0)