
Commit

it just works
numinnex committed Nov 16, 2024
1 parent fb9e52b commit 83d4346
Showing 3 changed files with 106 additions and 69 deletions.
144 changes: 88 additions & 56 deletions server/src/streaming/direct_io/storage.rs
@@ -1,96 +1,128 @@
-use std::{io::{SeekFrom, Write}, os::unix::fs::OpenOptionsExt};
+use std::{alloc::{self, Layout}, io::{Read, Seek, SeekFrom, Write}, os::unix::fs::OpenOptionsExt};
 
-use bytes::{BufMut, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
 use iggy::error::IggyError;
 use tracing::warn;
-use tokio::{fs::OpenOptions, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader}};
+use tokio::{fs::OpenOptions, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader}, task::spawn_blocking};
 use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD};
 
 #[derive(Debug, Default)]
 pub struct DirectIOStorage {
 }
 
 impl DirectIOStorage {
-    pub async fn read_batches(&self, file_path: &str, start_position: u64, end_offset: u64) -> Result<Vec<RetainedMessageBatch>, IggyError> {
-        let file = OpenOptions::new().read(true).custom_flags(libc::O_DIRECT).open(file_path).await?;
-        warn!("start_position: {}", start_position);
-
-        let sector_size = 4096;
+    pub async fn read_batches(&self, file_path: &str, start_position: u64, end_position: u64) -> Result<Vec<RetainedMessageBatch>, IggyError> {
+        //let mut file = OpenOptions::new().read(true).custom_flags(libc::O_DIRECT).open(file_path).await?;
+        let mut file = std::fs::File::options().read(true).custom_flags(libc::O_DIRECT).open(file_path)?;
+        file.seek(SeekFrom::Start(start_position))?;
         let mut batches = Vec::new();
-        let file_size = file.metadata().await?.len();
+        let file_size = file.metadata()?.len();
         if file_size == 0 {
             warn!("file_size is 0");
             return Ok(batches);
         }
+        // Allocate the read buffer.
+        let buf_size = if start_position == end_position {
+            file_size - start_position
+        } else {
+            end_position - start_position
+        };
+        let sector_size = 4096;
+        let alignment = buf_size % sector_size;
+        assert!(alignment == 0);
 
-        let mut reader = BufReader::with_capacity(4096 * 1000, file);
-        reader
-            .seek(SeekFrom::Start(start_position as u64))
-            .await?;
+        let layout = Layout::from_size_align(buf_size as _, sector_size as _).unwrap();
+        let ptr = unsafe { alloc::alloc(layout) };
+        // Zeroing keeps the Vec below from exposing uninitialized memory.
+        unsafe { std::ptr::write_bytes(ptr, 0, buf_size as _) };
+        let mut bytes = unsafe {Vec::from_raw_parts(ptr, buf_size as _, buf_size as _)};
+        let result = spawn_blocking(move || {
+            if let Err(e) = file.read_exact(&mut bytes) {
+                warn!("error reading batch: {}", e);
+            }
+            Self::serialize_batches(bytes, &mut batches);
+            Ok(batches)
+        }).await.unwrap();
+        result
+    }
 
-        let mut read_bytes = start_position as u64;
-        let mut last_batch_to_read = false;
-        while !last_batch_to_read {
-            let Ok(batch_base_offset) = reader.read_u64_le().await else {
-                break;
-            };
-            let batch_length = reader
-                .read_u32_le()
-                .await
-                .map_err(|_| IggyError::CannotReadBatchLength)?;
-            let last_offset_delta = reader
-                .read_u32_le()
-                .await
-                .map_err(|_| IggyError::CannotReadLastOffsetDelta)?;
-            let max_timestamp = reader
-                .read_u64_le()
-                .await
-                .map_err(|_| IggyError::CannotReadMaxTimestamp)?;
+    fn serialize_batches(bytes: Vec<u8>, batches: &mut Vec<RetainedMessageBatch>) {
+        let len = bytes.len();
+        let mut read_bytes = 0;
+        let sector_size = 4096;
 
-            let last_offset = batch_base_offset + (last_offset_delta as u64);
+        while read_bytes < len {
+            // Read batch_base_offset
+            let batch_base_offset = u64::from_le_bytes(
+                bytes[read_bytes..read_bytes + 8]
+                    .try_into()
+                    .expect("Failed to read batch_base_offset"),
+            );
+            read_bytes += 8;
 
+            // Read batch_length
+            let batch_length = u32::from_le_bytes(
+                bytes[read_bytes..read_bytes + 4]
+                    .try_into()
+                    .expect("Failed to read batch_length"),
+            );
+            read_bytes += 4;
+
+            // Read last_offset_delta
+            let last_offset_delta = u32::from_le_bytes(
+                bytes[read_bytes..read_bytes + 4]
+                    .try_into()
+                    .expect("Failed to read last_offset_delta"),
+            );
+            read_bytes += 4;
+
+            // Read max_timestamp
+            let max_timestamp = u64::from_le_bytes(
+                bytes[read_bytes..read_bytes + 8]
+                    .try_into()
+                    .expect("Failed to read max_timestamp"),
+            );
+            read_bytes += 8;
 
+            // Calculate the sector-aligned (padded) size of the batch.
             let total_batch_size = batch_length + RETAINED_BATCH_OVERHEAD;
             let sectors = total_batch_size.div_ceil(sector_size);
             let adjusted_size = sector_size * sectors;
             warn!("adjusted_size: {}", adjusted_size);
             let diff = adjusted_size - total_batch_size;
 
+            // Read payload
             let payload_len = batch_length as usize;
-            let mut payload = BytesMut::with_capacity(payload_len);
-            payload.put_bytes(0, payload_len);
-            if let Err(error) = reader.read_exact(&mut payload).await {
+            let payload_start = read_bytes;
+            let payload_end = read_bytes + payload_len;
+            if payload_end > len {
                 warn!(
-                    "Cannot read batch payload for batch with base offset: {batch_base_offset}, last offset delta: {last_offset_delta}, max timestamp: {max_timestamp}, batch length: {batch_length} and payload length: {payload_len}.\nProbably OS hasn't flushed the data yet, try setting `enforce_fsync = true` for partition configuration if this issue occurs again.\n{error}",
+                    "Cannot read batch payload for batch with base offset: {batch_base_offset}, last offset delta: {last_offset_delta}, max timestamp: {max_timestamp}, batch length: {batch_length} and payload length: {payload_len}.\nProbably OS hasn't flushed the data yet, try setting `enforce_fsync = true` for partition configuration if this issue occurs again."
                 );
                 break;
             }
-            // TEMP
-            let mut temp = BytesMut::with_capacity(diff as _);
-            temp.put_bytes(0, diff as _);
-            if let Err(e) = reader.read_exact(&mut temp).await {
-                warn!("lol error reading padding");
-            }
-
-            read_bytes += 8 + 4 + 4 + 8 + payload_len as u64;
-            last_batch_to_read = read_bytes >= file_size || last_offset == end_offset;
 
+            // Copy the payload out of the aligned buffer and skip the sector padding.
+            let payload = Bytes::copy_from_slice(&bytes[payload_start..payload_end]);
+            read_bytes = payload_end + diff as usize;
             let batch = RetainedMessageBatch::new(
                 batch_base_offset,
                 last_offset_delta,
                 max_timestamp,
                 batch_length,
-                payload.freeze(),
+                payload,
             );
             batches.push(batch);
         }
-        Ok(batches)
     }
 
-    pub async fn write_batches(&self, file_path: &str, bytes: &[u8]) -> Result<u32, IggyError> {
-        //let mut std_file = std::fs::File::options().append(true).custom_flags(libc::O_DIRECT).open(file_path)?;
-        let mut file = OpenOptions::new().append(true).custom_flags(libc::O_DIRECT).open(file_path).await?;
-        if let Err(e) = file.write_all(bytes).await {
-            warn!("error writing: {}", e);
-        }
-        Ok(bytes.len() as _)
+    pub async fn write_batches(&self, file_path: &str, bytes: Vec<u8>) -> Result<u32, IggyError> {
+        let mut std_file = std::fs::File::options().append(true).custom_flags(libc::O_DIRECT).open(file_path)?;
+        //let mut file = OpenOptions::new().append(true).custom_flags(libc::O_DIRECT).open(file_path).await?;
+        let size = bytes.len() as _;
+        spawn_blocking(move || {
+            if let Err(e) = std_file.write_all(&bytes) {
+                warn!("error writing: {}", e);
+            }
+        }).await.unwrap();
+        Ok(size)
     }
 }
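
Note on the new read path: Linux O_DIRECT requires the file offset, the buffer address, and the transfer length to all be multiples of the device's logical sector size, which is why the buffer comes from alloc::alloc with a 4096-byte-aligned Layout rather than from a plain Vec. Two details deserve a flag: the std::ptr::write_bytes zeroing is load-bearing (without it the Vec would hand uninitialized memory to safe code), and passing the over-aligned pointer to Vec::from_raw_parts means the Vec will later free it with u8's alignment, a layout mismatch that common allocators tolerate but the allocator contract does not guarantee. Below is a minimal, self-contained sketch of the aligned-buffer pattern that frees with the original layout instead; it assumes the libc crate and a 4096-byte sector size, and the file path and read length are hypothetical:

use std::alloc::{alloc_zeroed, dealloc, Layout};
use std::fs::File;
use std::io::Read;
use std::os::unix::fs::OpenOptionsExt;

/// A sector-aligned, zero-initialized heap buffer suitable for O_DIRECT reads.
struct AlignedBuf {
    ptr: *mut u8,
    layout: Layout,
}

impl AlignedBuf {
    fn new(size: usize, align: usize) -> Self {
        assert_eq!(size % align, 0, "O_DIRECT transfer lengths must be sector-aligned");
        let layout = Layout::from_size_align(size, align).unwrap();
        // SAFETY: `layout` has a non-zero size. Zeroed so the slice below
        // never exposes uninitialized memory.
        let ptr = unsafe { alloc_zeroed(layout) };
        assert!(!ptr.is_null(), "allocation failed");
        Self { ptr, layout }
    }

    fn as_mut_slice(&mut self) -> &mut [u8] {
        // SAFETY: `ptr` is valid and initialized for `layout.size()` bytes.
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.layout.size()) }
    }
}

impl Drop for AlignedBuf {
    fn drop(&mut self) {
        // Free with the same layout the allocation used; this pairing is what
        // `Vec::from_raw_parts` on an over-aligned pointer cannot promise.
        unsafe { dealloc(self.ptr, self.layout) }
    }
}

fn main() -> std::io::Result<()> {
    const SECTOR: usize = 4096; // assumed logical sector size, as in the patch
    let mut file = File::options()
        .read(true)
        .custom_flags(libc::O_DIRECT)
        .open("/tmp/segment.log")?; // hypothetical path
    let mut buf = AlignedBuf::new(8 * SECTOR, SECTOR); // hypothetical read length
    file.read_exact(buf.as_mut_slice())?;
    Ok(())
}

Wrapping the pointer in a type whose Drop reuses the original Layout keeps allocation and deallocation paired, so the buffer stays sound regardless of alignment.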
18 changes: 14 additions & 4 deletions server/src/streaming/segments/index.rs
@@ -1,6 +1,7 @@
 use crate::streaming::segments::segment::Segment;
 use iggy::error::IggyError;
 use iggy::error::IggyError::InvalidOffset;
+use tracing::warn;
 
 #[derive(Debug, Eq, Clone, Copy, Default)]
 pub struct Index {
@@ -32,10 +33,19 @@ impl Segment {
         let ending_offset_idx = binary_search_index(indices, end_offset);
 
         match (starting_offset_idx, ending_offset_idx) {
-            (Some(starting_offset_idx), Some(ending_offset_idx)) => Ok(IndexRange {
-                start: indices[starting_offset_idx],
-                end: indices[ending_offset_idx],
-            }),
+            (Some(starting_offset_idx), Some(ending_offset_idx)) =>
+            {
+                // Temporary: extend the end index by one entry so the byte range covers the whole final batch; to be cleaned up later.
+                let end_idx = if ending_offset_idx == indices.len() - 1 {
+                    ending_offset_idx
+                } else {
+                    ending_offset_idx + 1
+                };
+                Ok(IndexRange {
+                    start: indices[starting_offset_idx],
+                    end: indices[end_idx],
+                })
+            },
             (Some(starting_offset_idx), None) => Ok(IndexRange {
                 start: indices[starting_offset_idx],
                 end: *indices.last().unwrap(),
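
Note: this clamp exists because read_batches now consumes byte positions taken from the index, so the returned range's end entry must point one past the last batch to be read; the matched end index is therefore bumped to the following entry unless it is already the last one. A simplified, hypothetical mirror of that logic (the function name and shape are illustrative, not from the codebase):

// Extend the matched end index by one entry so `end.position` points at the
// first byte after the final requested batch (the next batch's start).
fn clamp_end_idx(indices_len: usize, ending_offset_idx: usize) -> usize {
    if ending_offset_idx == indices_len - 1 {
        ending_offset_idx // already the last entry; nothing after it
    } else {
        ending_offset_idx + 1
    }
}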
13 changes: 4 additions & 9 deletions server/src/streaming/segments/messages.rs
@@ -167,21 +167,16 @@ impl Segment {
         let messages_count = (start_offset + end_offset) as usize;
         let path = self.log_path.as_str();
         let start_position = index_range.start.position;
-        let end_offset = index_range.end.offset as u64 + self.start_offset;
+        let end_position = index_range.end.position;
         let batch = self
             .direct_io_storage
-            .read_batches(path, start_position as _, end_offset)
+            .read_batches(path, start_position as _, end_position as _)
             .await?;
         error!("batches_count: {}", batch.len());
         let messages = batch
             .iter()
             .to_messages();
-        /*
-        .to_messages_with_filter(messages_count, &|msg| {
-            msg.offset >= start_offset && msg.offset <= end_offset
-        });
-        */
         error!("messages len: {}", messages.len());
         trace!(
             "Loaded {} messages from disk, segment start offset: {}, end offset: {}.",
             messages.len(),
@@ -247,7 +242,7 @@ impl Segment {
     }
 
     pub async fn persist_messages(&mut self, fsync: bool) -> Result<usize, IggyError> {
-        let sector_size = 512;
+        let sector_size = 4096;
         let storage = self.direct_io_storage.clone();
         let index_storage = self.storage.segment.clone();
         if self.unsaved_messages.is_none() {
@@ -285,7 +280,7 @@ impl Segment {
         let mut bytes = unsafe {Vec::from_raw_parts(ptr, adjusted_size as _, adjusted_size as _)};
         let diff = bytes.len() as u32 - batch_size;
         batch.extend2(&mut bytes);
-        let saved_bytes = storage.write_batches(self.log_path.as_str(), &bytes).await?;
+        let saved_bytes = storage.write_batches(self.log_path.as_str(), bytes).await?;
         index_storage.save_index(&self.index_path, index).await?;
         self.last_index_position += adjusted_size;
         let size_increment = RETAINED_BATCH_OVERHEAD + diff;
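
Note: the write path changes in two ways: sector_size is corrected from 512 to 4096, matching the alignment the new read path asserts, and write_batches now takes the Vec by value so the buffer can be moved into spawn_blocking and written without stalling the async runtime's worker threads. A small sketch of the sector rounding persist_messages relies on, assuming a 4096-byte sector (the function name is illustrative):

/// Round a batch up to a whole number of sectors; O_DIRECT writes must cover
/// full sectors, so the remainder is written as zero padding.
fn adjusted_size(total_batch_size: u32, sector_size: u32) -> u32 {
    total_batch_size.div_ceil(sector_size) * sector_size
}

fn main() {
    let total_batch_size = 10_000u32;
    let padded = adjusted_size(total_batch_size, 4096);
    assert_eq!(padded, 12_288); // three sectors
    println!("padding: {} bytes", padded - total_batch_size); // 2288
}

The gap between the padded and the actual size is the same diff the read side skips after each payload.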
