Skip to content
Open
7 changes: 7 additions & 0 deletions quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl ClusterNode {
ingester_status: member.ingester_status,
is_ready: member.is_ready,
is_self_node,
availability_zone: member.availability_zone,
};
let node = ClusterNode {
inner: Arc::new(inner),
Expand Down Expand Up @@ -141,6 +142,10 @@ impl ClusterNode {
pub fn is_self_node(&self) -> bool {
self.inner.is_self_node
}

pub fn availability_zone(&self) -> Option<&str> {
self.inner.availability_zone.as_deref()
}
}

impl Debug for ClusterNode {
Expand All @@ -162,6 +167,7 @@ impl PartialEq for ClusterNode {
&& self.inner.indexing_tasks == other.inner.indexing_tasks
&& self.inner.is_ready == other.inner.is_ready
&& self.inner.is_self_node == other.inner.is_self_node
&& self.inner.availability_zone == other.inner.availability_zone
}
}

Expand All @@ -175,4 +181,5 @@ struct InnerNode {
ingester_status: IngesterStatus,
is_ready: bool,
is_self_node: bool,
availability_zone: Option<String>,
}
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub mod rate_limited_tracing;
pub mod rate_limiter;
pub mod rendezvous_hasher;
pub mod retry;
pub mod ring_buffer;
pub mod runtimes;
pub mod shared_consts;
pub mod sorted_iter;
Expand Down
170 changes: 170 additions & 0 deletions quickwit/quickwit-common/src/ring_buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Debug, Formatter};

/// Fixed-size buffer that keeps the last N elements pushed into it.
///
/// `head` is the write cursor. It advances by one on each push and wraps
/// back to 0 when it reaches N, overwriting the oldest element.
///
/// ```text
/// RingBuffer<u32, 4> after pushing 1, 2, 3, 4, 5, 6:
///
/// buffer = [5, 6, 3, 4] head = 2 len = 4
/// ^
/// next write goes here
///
/// logical view (oldest → newest): [3, 4, 5, 6]
/// ```
pub struct RingBuffer<T: Copy + Default, const N: usize> {
buffer: [T; N],
head: usize,
len: usize,
}

impl<T: Copy + Default, const N: usize> Default for RingBuffer<T, N> {
fn default() -> Self {
Self {
buffer: [T::default(); N],
head: 0,
len: 0,
}
}
}

impl<T: Copy + Default + Debug, const N: usize> Debug for RingBuffer<T, N> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_list().entries(self.iter()).finish()
}
}

impl<T: Copy + Default, const N: usize> RingBuffer<T, N> {
pub fn push_back(&mut self, value: T) {
self.buffer[self.head] = value;
self.head = (self.head + 1) % N;
if self.len < N {
self.len += 1;
}
}

pub fn last(&self) -> Option<T> {
if self.len == 0 {
return None;
}
Some(self.buffer[(self.head + N - 1) % N])
}

pub fn front(&self) -> Option<T> {
if self.len == 0 {
return None;
}
Some(self.buffer[(self.head + N - self.len) % N])
}

pub fn len(&self) -> usize {
self.len
}

pub fn is_empty(&self) -> bool {
self.len == 0
}

pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
let start = (self.head + N - self.len) % N;
(0..self.len).map(move |i| &self.buffer[(start + i) % N])
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_empty() {
let rb = RingBuffer::<u32, 4>::default();
assert!(rb.is_empty());
assert_eq!(rb.len(), 0);
assert_eq!(rb.last(), None);
assert_eq!(rb.front(), None);
assert_eq!(rb.iter().count(), 0);
}

#[test]
fn test_single_push() {
let mut rb = RingBuffer::<u32, 4>::default();
rb.push_back(10);
assert_eq!(rb.len(), 1);
assert!(!rb.is_empty());
assert_eq!(rb.last(), Some(10));
assert_eq!(rb.front(), Some(10));
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![10]);
}

#[test]
fn test_partial_fill() {
let mut rb = RingBuffer::<u32, 4>::default();
rb.push_back(1);
rb.push_back(2);
rb.push_back(3);
assert_eq!(rb.len(), 3);
assert_eq!(rb.last(), Some(3));
assert_eq!(rb.front(), Some(1));
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![1, 2, 3]);
}

#[test]
fn test_exactly_full() {
let mut rb = RingBuffer::<u32, 4>::default();
for i in 1..=4 {
rb.push_back(i);
}
assert_eq!(rb.len(), 4);
assert_eq!(rb.last(), Some(4));
assert_eq!(rb.front(), Some(1));
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![1, 2, 3, 4]);
}

#[test]
fn test_wrap_around() {
let mut rb = RingBuffer::<u32, 4>::default();
for i in 1..=6 {
rb.push_back(i);
}
assert_eq!(rb.len(), 4);
assert_eq!(rb.last(), Some(6));
assert_eq!(rb.front(), Some(3));
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![3, 4, 5, 6]);
}

#[test]
fn test_many_wraps() {
let mut rb = RingBuffer::<u32, 3>::default();
for i in 1..=100 {
rb.push_back(i);
}
assert_eq!(rb.len(), 3);
assert_eq!(rb.last(), Some(100));
assert_eq!(rb.front(), Some(98));
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![98, 99, 100]);
}

#[test]
fn test_debug() {
let mut rb = RingBuffer::<u32, 3>::default();
rb.push_back(1);
rb.push_back(2);
assert_eq!(format!("{:?}", rb), "[1, 2]");
}
}
3 changes: 3 additions & 0 deletions quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";
/// Key used in chitchat to broadcast the status of an ingester.
pub const INGESTER_STATUS_KEY: &str = "ingester.status";

/// Prefix used in chitchat to broadcast per-source ingester capacity scores and open shard counts.
pub const INGESTER_CAPACITY_SCORE_PREFIX: &str = "ingester.capacity_score:";

/// File name for the encoded list of fields in the split
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ quickwit-cluster = { workspace = true, features = ["testsuite"] }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
quickwit-indexing = { workspace = true }
quickwit-ingest = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }

Expand Down
24 changes: 4 additions & 20 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2223,11 +2223,7 @@ mod tests {
assert!(&retain_shards_for_source.shard_ids.is_empty());
Ok(RetainShardsResponse {})
});
let client = IngesterServiceClient::from_mock(mock_ingester);
let ingester = IngesterPoolEntry {
client,
status: IngesterStatus::Ready,
};
let ingester = IngesterPoolEntry::ready_with_client(IngesterServiceClient::from_mock(mock_ingester));
ingester_pool.insert("node1".into(), ingester);

let cluster_config = ClusterConfig::for_test();
Expand Down Expand Up @@ -2273,11 +2269,7 @@ mod tests {
);
Ok(RetainShardsResponse {})
});
let client = IngesterServiceClient::from_mock(mock_ingester);
let ingester = IngesterPoolEntry {
client,
status: IngesterStatus::Ready,
};
let ingester = IngesterPoolEntry::ready_with_client(IngesterServiceClient::from_mock(mock_ingester));
ingester_pool.insert("node1".into(), ingester);

let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
Expand Down Expand Up @@ -2652,11 +2644,7 @@ mod tests {
};
Ok(response)
});
let client = IngesterServiceClient::from_mock(mock_ingester);
let ingester = IngesterPoolEntry {
client,
status: IngesterStatus::Ready,
};
let ingester = IngesterPoolEntry::ready_with_client(IngesterServiceClient::from_mock(mock_ingester));
ingester_pool.insert(ingester_id, ingester);

let mut mock_metastore = MockMetastoreService::new();
Expand Down Expand Up @@ -2810,11 +2798,7 @@ mod tests {
};
Ok(response)
});
let client = IngesterServiceClient::from_mock(mock_ingester);
let ingester = IngesterPoolEntry {
client,
status: IngesterStatus::Ready,
};
let ingester = IngesterPoolEntry::ready_with_client(IngesterServiceClient::from_mock(mock_ingester));
ingester_pool.insert(ingester_id, ingester);

let mut mock_metastore = MockMetastoreService::new();
Expand Down
Loading
Loading