Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer batch accumulator when rolling to new segment #1338

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions bench/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl Consumer {
}

current_iteration = 0;
let mut previous_start_offset = 0;
let start_timestamp = Instant::now();
let mut records = Vec::with_capacity(message_batches as usize);
let mut batch_user_size_bytes = 0;
Expand Down Expand Up @@ -186,7 +187,7 @@ impl Consumer {
);
continue;
}

assert!(previous_start_offset <= polled_messages.messages[0].offset);
if polled_messages.messages.len() != messages_per_batch as usize {
warn!(
"Consumer #{} → expected {} messages, but got {} messages, retrying...",
Expand All @@ -196,9 +197,8 @@ impl Consumer {
);
continue;
}

previous_start_offset = polled_messages.messages[0].offset;
latencies.push(latency);

received_messages += polled_messages.messages.len() as u64;

// We don't need to calculate the size of the whole batch every time by iterating over it - just always use the size of the first message
Expand Down
9 changes: 5 additions & 4 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ path = "compatibility"
# Determines whether to enforce file synchronization on state updates (boolean).
# `true` ensures immediate writing of data to disk for durability.
# `false` allows the OS to manage write operations, which can improve performance.
enforce_fsync = false
enforce_fsync = true

# Runtime configuration.
[system.runtime]
Expand Down Expand Up @@ -372,7 +372,7 @@ sysinfo_print_interval = "10 s"
# Enables or disables the system cache.
# `true` activates caching for frequently accessed data.
# `false` disables caching, data is always read from the source.
enabled = true
enabled = false

# Maximum size of the cache, e.g. "4GB".
size = "4 GB"
Expand Down Expand Up @@ -444,12 +444,12 @@ validate_checksum = false
# The threshold of buffered messages before triggering a save to disk (integer).
# Specifies how many messages accumulate before persisting to storage.
# Adjusting this can balance between write performance and data durability.
messages_required_to_save = 5000
messages_required_to_save = 2000

# Segment configuration
[system.segment]
# Defines the soft limit for the size of a storage segment.
# When a segment reaches this size, a new segment is created for subsequent data.
# When a segment reaches this size (maximum 4 GB), a new segment is created for subsequent data.
# Example: if `size` is set "1GB", the actual segment size may be 1GB + the size of remaining messages in received batch.
size = "1 GB"
# Configures the message time-based expiry setting.
Expand Down Expand Up @@ -477,6 +477,7 @@ max_entries = 1000
# Maximum age of ID entries in the deduplication cache in human-readable format.
expiry = "1 m"


# Recovery configuration in case of lost data
[system.recovery]
# Controls whether streams/topics/partitions should be recreated if the expected data for existing state is missing (boolean).
Expand Down
12 changes: 9 additions & 3 deletions integration/tests/server/scenarios/message_size_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.await;
send_message_and_check_result(
&client,
MessageToSend::OfSizeWithHeaders(100_001, 10_000_000),
MessageToSend::OfSizeWithHeaders(1_000_000, 10_000_000),
Ok(()),
)
.await;
send_message_and_check_result(
&client,
MessageToSend::OfSizeWithHeaders(1_000_001, 10_000_000),
Err(InvalidResponse(
4017,
23,
Expand All @@ -74,7 +80,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.await;
send_message_and_check_result(
&client,
MessageToSend::OfSizeWithHeaders(100_000, 10_000_001),
MessageToSend::OfSizeWithHeaders(100_000, 1_000_000_001),
Err(InvalidResponse(
4022,
23,
Expand All @@ -83,7 +89,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
)
.await;

assert_message_count(&client, 6).await;
assert_message_count(&client, 7).await;
cleanup_system(&client).await;
assert_clean_system(&client).await;
}
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/server/scenarios/system_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(topic.name, TOPIC_NAME);
assert_eq!(topic.partitions_count, PARTITIONS_COUNT);
assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize);
assert_eq!(topic.size, 55890);
assert_eq!(topic.size, 55914);
assert_eq!(topic.messages_count, MESSAGES_COUNT as u64);
let topic_partition = topic.partitions.get((PARTITION_ID - 1) as usize).unwrap();
assert_eq!(topic_partition.id, PARTITION_ID);
Expand Down
6 changes: 3 additions & 3 deletions integration/tests/streaming/common/test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use server::configs::system::SystemConfig;
use server::streaming::persistence::persister::FilePersister;
use server::streaming::storage::SystemStorage;
use server::streaming::iggy_storage::SystemStorage;
use server::streaming::persistence::persister::{FilePersister, FileWithSyncPersister};
use std::sync::Arc;
use tokio::fs;
use uuid::Uuid;
Expand All @@ -20,7 +20,7 @@ impl TestSetup {

let config = Arc::new(config);
fs::create_dir(config.get_system_path()).await.unwrap();
let persister = FilePersister {};
let persister = FilePersister;
let storage = Arc::new(SystemStorage::new(config.clone(), Arc::new(persister)));
TestSetup { config, storage }
}
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/streaming/consumer_offset.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::streaming::common::test_setup::TestSetup;
use iggy::consumer::ConsumerKind;
use server::configs::system::SystemConfig;
use server::streaming::iggy_storage::PartitionStorage;
use server::streaming::partitions::partition::ConsumerOffset;
use server::streaming::storage::PartitionStorage;
use std::sync::Arc;
use tokio::fs;

Expand Down
13 changes: 10 additions & 3 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async fn should_persist_segment() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -64,6 +65,7 @@ async fn should_load_existing_segment_from_disk() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -91,6 +93,7 @@ async fn should_load_existing_segment_from_disk() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -128,6 +131,7 @@ async fn should_persist_and_load_segment_with_messages() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -173,12 +177,13 @@ async fn should_persist_and_load_segment_with_messages() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(true).await.unwrap();
let mut loaded_segment = segment::Segment::create(
stream_id,
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
IggyExpiry::NeverExpire,
Expand Down Expand Up @@ -211,6 +216,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
message_expiry,
Expand Down Expand Up @@ -258,7 +264,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
.append_batch(batch_size, messages_count as u32, &messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(true).await.unwrap();

segment.is_closed = true;
let is_expired = segment.is_expired(now).await;
Expand All @@ -279,6 +285,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
topic_id,
partition_id,
start_offset,
None,
setup.config.clone(),
setup.storage.clone(),
message_expiry,
Expand Down Expand Up @@ -343,7 +350,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
.append_batch(not_expired_message_size, 1, &not_expired_messages)
.await
.unwrap();
segment.persist_messages().await.unwrap();
segment.persist_messages(true).await.unwrap();

let is_expired = segment.is_expired(now).await;
assert!(!is_expired);
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::utils::byte_size::IggyByteSize;
use crate::utils::topic_size::MaxTopicSize;
use strum::{EnumDiscriminants, FromRepr, IntoStaticStr};
use thiserror::Error;
use tokio::task::JoinError;

#[derive(Debug, Error, EnumDiscriminants, IntoStaticStr)]
#[repr(u32)]
Expand Down Expand Up @@ -124,6 +125,8 @@ pub enum IggyError {
AccessTokenMissing = 77,
#[error("Invalid access token")]
InvalidAccessToken = 78,
#[error("Failed to join the tokio handle")]
JoinHandle(#[from] JoinError) = 79,
#[error("Client with ID: {0} was not found.")]
ClientNotFound(u32) = 100,
#[error("Invalid client ID")]
Expand All @@ -148,6 +151,8 @@ pub enum IggyError {
CannotParseBool(#[from] std::str::ParseBoolError) = 208,
#[error("Cannot parse header kind from {0}")]
CannotParseHeaderKind(String) = 209,
#[error("End of file")]
Eof = 210,
#[error("HTTP response error, status: {0}, body: {1}")]
HttpResponseError(u16, String) = 300,
#[error("Request middleware error")]
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ pub mod flush_unsaved_buffer;
pub mod poll_messages;
pub mod send_messages;

const MAX_HEADERS_SIZE: u32 = 100 * 1000;
pub const MAX_PAYLOAD_SIZE: u32 = 10 * 1000 * 1000;
const MAX_HEADERS_SIZE: u32 = 1_000_000;
pub const MAX_PAYLOAD_SIZE: u32 = 1_000_000_000;
3 changes: 2 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dotenvy = { version = "0.15.7" }
figlet-rs = "0.1.5"
figment = { version = "0.10.18", features = ["toml", "env"] }
flume = "0.11.0"
futures = "0.3.30"
futures = { version = "0.3.30" }
iggy = { path = "../sdk" }
jsonwebtoken = "9.3.0"
log = "0.4.20"
Expand All @@ -53,6 +53,7 @@ opentelemetry_sdk = { version = "0.26.0", features = [
"trace",
"tokio",
] }
pin-project = "1.1.7"
prometheus-client = "0.22.2"
quinn = { version = "0.11.5" }
rcgen = "0.13.1"
Expand Down
2 changes: 1 addition & 1 deletion server/src/channels/commands/maintain_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ async fn delete_segments(

if partition.get_segments().is_empty() {
let start_offset = last_end_offset + 1;
partition.add_persisted_segment(start_offset).await?;
partition.add_persisted_segment(None, start_offset).await?;
}
}
Err(error) => {
Expand Down
2 changes: 1 addition & 1 deletion server/src/compat/storage_conversion/converter.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::state::command::EntryCommand;
use crate::state::models::CreatePersonalAccessTokenWithHash;
use crate::state::State;
use crate::streaming::iggy_storage::SystemStorage;
use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::storage::SystemStorage;
use crate::streaming::streams::stream::Stream;
use crate::streaming::users::user::User;
use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup;
Expand Down
23 changes: 3 additions & 20 deletions server/src/compat/storage_conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@ use crate::configs::system::SystemConfig;
use crate::state::system::{PartitionState, StreamState, TopicState};
use crate::state::State;
use crate::streaming::batching::message_batch::RetainedMessageBatch;
use crate::streaming::iggy_storage::{
PartitionStorage, SegmentStorage, StreamStorage, SystemInfoStorage, SystemStorage, TopicStorage,
};
use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
use crate::streaming::persistence::persister::Persister;
use crate::streaming::segments::index::{Index, IndexRange};
use crate::streaming::segments::segment::Segment;
use crate::streaming::storage::{
PartitionStorage, SegmentStorage, StreamStorage, SystemInfoStorage, SystemStorage, TopicStorage,
};
use crate::streaming::streams::stream::Stream;
use crate::streaming::systems::info::SystemInfo;
use crate::streaming::topics::topic::Topic;
use async_trait::async_trait;
use iggy::consumer::ConsumerKind;
use iggy::error::IggyError;
use iggy::utils::byte_size::IggyByteSize;
use std::path::Path;
use std::sync::Arc;
use tokio::fs::{read_dir, rename};
Expand Down Expand Up @@ -215,14 +214,6 @@ impl SegmentStorage for NoopSegmentStorage {
Ok(())
}

async fn load_message_batches(
&self,
_segment: &Segment,
_index_range: &IndexRange,
) -> Result<Vec<RetainedMessageBatch>, IggyError> {
Ok(vec![])
}

async fn load_newest_batches_by_size(
&self,
_segment: &Segment,
Expand All @@ -231,14 +222,6 @@ impl SegmentStorage for NoopSegmentStorage {
Ok(vec![])
}

async fn save_batches(
&self,
_segment: &Segment,
_batch: RetainedMessageBatch,
) -> Result<IggyByteSize, IggyError> {
Ok(IggyByteSize::default())
}

async fn load_message_ids(&self, _segment: &Segment) -> Result<Vec<u128>, IggyError> {
Ok(vec![])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub async fn load(
partition.topic_id,
partition.partition_id,
start_offset,
None,
partition.config.clone(),
partition.storage.clone(),
partition.message_expiry,
Expand Down
2 changes: 1 addition & 1 deletion server/src/streaming/batching/batch_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::sizeable::Sizeable;
use std::sync::Arc;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BatchAccumulator {
base_offset: u64,
current_size: IggyByteSize,
Expand Down
10 changes: 10 additions & 0 deletions server/src/streaming/batching/message_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ impl RetainedMessageBatch {
self.base_offset + self.last_offset_delta as u64
}

/// Serializes this batch into `bytes` using a fixed little-endian layout:
/// [0..8)   base_offset        (LE u64)
/// [8..12)  payload length     (LE u32)
/// [12..16) last_offset_delta  (LE u32)
/// [16..24) max_timestamp      (LE u64)
/// [24.. )  raw payload bytes  (`self.bytes`)
///
/// NOTE(review): this assumes `RETAINED_BATCH_OVERHEAD` equals 24 (the header
/// size written above) — confirm against its declaration. The caller must
/// supply a `bytes` slice of at least `length + RETAINED_BATCH_OVERHEAD`
/// bytes, and `self.bytes` must be exactly `self.length` bytes long;
/// otherwise the slice indexing / `copy_from_slice` below panics.
pub fn extend2(&self, bytes: &mut [u8]) {
// Payload length is narrowed to u32 for the on-disk header field.
let length = self.length.as_bytes_u64() as u32;
bytes[0..8].copy_from_slice(&self.base_offset.to_le_bytes());
bytes[8..12].copy_from_slice(&length.to_le_bytes());
bytes[12..16].copy_from_slice(&self.last_offset_delta.to_le_bytes());
bytes[16..24].copy_from_slice(&self.max_timestamp.to_le_bytes());
// Copy the payload directly after the 24-byte header; the end bound is
// payload length + header overhead, computed in u64 before the usize cast.
bytes[24..(self.length.as_bytes_u64() + RETAINED_BATCH_OVERHEAD) as usize]
.copy_from_slice(&self.bytes);
}

pub fn extend(&self, bytes: &mut BytesMut) {
bytes.put_u64_le(self.base_offset);
bytes.put_u32_le(self.length.as_bytes_u64() as u32);
Expand Down
Loading
Loading