Skip to content

Add io finished callback to RaftLogStorage::save_vote #1131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 55 additions & 28 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ use crate::replication::ReplicationHandle;
use crate::replication::ReplicationSessionId;
use crate::runtime::RaftRuntime;
use crate::storage::LogFlushed;
use crate::storage::LogFlushKind;
use crate::storage::LogEventChannel;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
Expand Down Expand Up @@ -155,6 +157,8 @@ impl<C: RaftTypeConfig> LeaderData<C> {
}
}

type LogFLushResult<C> = Result<LogFlushKind<C>, std::io::Error>;

// TODO: remove SM
/// The core type implementing the Raft protocol.
pub struct RaftCore<C, N, LS, SM>
Expand Down Expand Up @@ -207,6 +211,8 @@ where

pub(crate) span: Span,

pub(crate) chan_log_flushed: LogEventChannel<LogFLushResult<C>>,

pub(crate) _p: PhantomData<SM>,
}

Expand Down Expand Up @@ -707,27 +713,29 @@ where

/// A temp wrapper to make non-blocking `append_to_log` a blocking.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn append_to_log<I>(
pub(crate) async fn append_to_log(
&mut self,
entries: I,
entries: Vec<C::Entry>,
vote: Vote<C::NodeId>,
last_log_id: LogId<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>>
where
I: IntoIterator<Item = C::Entry> + OptionalSend,
I::IntoIter: OptionalSend,
{
tracing::debug!("append_to_log");

let (tx, rx) = C::AsyncRuntime::oneshot();
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!("append_to_log: {}", DisplaySlice::<_>(&entries));

let last_log_id = *entries.last().unwrap().get_log_id();
let log_io_id = LogIOId::new(vote, Some(last_log_id));
let callback = LogFlushed::with_append(log_io_id, &mut self.chan_log_flushed);
self.log_store.append(entries, callback).await?;

let callback = LogFlushed::new(log_io_id, tx);
Ok(())
}

self.log_store.append(entries, callback).await?;
rx.await
.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?
.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn save_vote(
&mut self, vote: Vote<C::NodeId>
) -> Result<(), StorageError<C::NodeId>> {
tracing::debug!("save_vote: {}", &vote);

let callback = LogFlushed::with_save_vote(vote, &mut self.chan_log_flushed);
self.log_store.save_vote(&vote, callback).await?;
Ok(())
}

Expand Down Expand Up @@ -930,6 +938,13 @@ where
return Err(Fatal::Stopped);
}

res = self.chan_log_flushed.wait_next() => {
if res.is_err() {
tracing::info!("log event channel is closed");
return Err(Fatal::Stopped);
}
}

notify_res = self.rx_notify.recv() => {
match notify_res {
Some(notify) => self.handle_notify(notify)?,
Expand All @@ -951,6 +966,7 @@ where
}
}

self.process_log_flushed()?;
self.run_engine_commands().await?;

// There is a message waking up the loop, process channels one by one.
Expand All @@ -973,6 +989,27 @@ where
}
}

/// Process log flushed events as many as possible.
fn process_log_flushed(&mut self) -> Result<(), Fatal<C>> {
while let Some(flush_res) = self.chan_log_flushed.try_recv() {
let io_kind = flush_res.map_err(|e|
Into::<StorageError<_>>::into(StorageIOError::write_logs(AnyError::new(&e))))?;
// The leader may have changed.
// But reporting to a different leader is not a problem.
match io_kind {
LogFlushKind::Append(log_io_id) => {
if let Ok(mut lh) = self.engine.leader_handler() {
lh.replication_handler().update_local_progress(log_io_id.log_id);
}
}
LogFlushKind::SaveVote(vote) => {
self.engine.state.io_state_mut().update_vote(vote);
}
}
}
Ok(())
}

/// Process RaftMsg as many as possible.
///
/// It returns the number of processed message.
Expand Down Expand Up @@ -1597,20 +1634,10 @@ where
self.leader_data = None;
}
Command::AppendInputEntries { vote, entries } => {
let last_log_id = *entries.last().unwrap().get_log_id();
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);

self.append_to_log(entries, vote, last_log_id).await?;

// The leader may have changed.
// But reporting to a different leader is not a problem.
if let Ok(mut lh) = self.engine.leader_handler() {
lh.replication_handler().update_local_progress(Some(last_log_id));
}
self.append_to_log(entries, vote).await?;
}
Command::SaveVote { vote } => {
self.log_store.save_vote(&vote).await?;
self.engine.state.io_state_mut().update_vote(vote);
self.save_vote(vote).await?;
}
Command::PurgeLog { upto } => {
self.log_store.purge(upto).await?;
Expand Down
3 changes: 3 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub use crate::raft::runtime_config_handle::RuntimeConfigHandle;
use crate::raft::trigger::Trigger;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::storage::LogEventChannel;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::JoinErrorOf;
use crate::type_config::alias::ResponderOf;
Expand Down Expand Up @@ -300,6 +301,8 @@ where C: RaftTypeConfig
command_state: CommandState::default(),
span: core_span,

chan_log_flushed: LogEventChannel::new(),

_p: Default::default(),
};

Expand Down
3 changes: 2 additions & 1 deletion openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ mod membership_state;
pub(crate) mod snapshot_streaming;
mod vote_state_reader;

#[allow(unused)] pub(crate) use io_state::log_io_id::LogIOId;
#[allow(unused)]
pub(crate) use io_state::log_io_id::LogIOId;
pub(crate) use io_state::IOState;

#[cfg(test)]
Expand Down
161 changes: 145 additions & 16 deletions openraft/src/storage/callback.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,169 @@
//! Callbacks used by Storage API

use std::io;
use std::cmp::Ordering;
use std::collections::BinaryHeap;

use tokio::sync::oneshot;
use tokio::sync::mpsc;

use crate::async_runtime::AsyncOneshotSendExt;
use crate::raft_state::io_state::log_io_id::LogIOId;
use crate::type_config::alias::OneshotSenderOf;
use crate::Vote;
use crate::LogId;
use crate::RaftTypeConfig;
use crate::StorageIOError;

struct LogEvent<R> (R, u64);

impl<R> Ord for LogEvent<R> {
fn cmp(&self, other: &Self) -> Ordering {
self.1.cmp(&other.1).reverse()
}
}

impl<R> PartialOrd for LogEvent<R> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.1.cmp(&other.1).reverse())
}
}

impl<R> PartialEq for LogEvent<R> {
fn eq(&self, other: &Self) -> bool {
self.1 == other.1
}
}

impl<R> Eq for LogEvent<R> {}

pub struct LogEventSender<R> {
event_seq: u64,
sender: Option<mpsc::UnboundedSender<LogEvent<R>>>,
}

impl<R> Drop for LogEventSender<R> {
fn drop(&mut self) {
if self.sender.take().is_some() {
panic!("dropped without sending event");
}
}
}

impl<R> LogEventSender<R> {
fn send(mut self, evt: R) -> Result<(), mpsc::error::SendError<R>> {
let sender = self.sender.take().unwrap();
sender.send(LogEvent(evt, self.event_seq))
.map_err(|err| mpsc::error::SendError(err.0.0))
}
}

pub struct LogEventChannel<R> {
tx_log_events: mpsc::UnboundedSender<LogEvent<R>>,
rx_log_events: mpsc::UnboundedReceiver<LogEvent<R>>,
send_seq: u64,
recv_seq: u64,
recv_heap: BinaryHeap<LogEvent<R>>,
}

impl<R> LogEventChannel<R> {
pub fn new() -> Self {
let (tx_log_events, rx_log_events) = mpsc::unbounded_channel();
Self {
tx_log_events,
rx_log_events,
send_seq: 0,
recv_seq: 0,
recv_heap: BinaryHeap::new(),
}
}

pub fn try_recv(&mut self) -> Option<R> {
if let Some(ev) = self.recv_heap.peek() {
if ev.1 == self.recv_seq + 1 {
self.recv_seq += 1;
return Some(self.recv_heap.pop().unwrap().0);
}
}
while let Ok(ev) = self.rx_log_events.try_recv() {
if ev.1 == self.recv_seq + 1 {
self.recv_seq += 1;
return Some(ev.0)
}
self.recv_heap.push(ev);
}
None
}

pub async fn wait_next(&mut self) -> Result<(), mpsc::error::TryRecvError> {
if let Some(ev) = self.recv_heap.peek() {
if ev.1 == self.recv_seq + 1 {
return Ok(());
}
}
while let Some(ev) = self.rx_log_events.recv().await {
let event_seq = ev.1;
self.recv_heap.push(ev);
if event_seq == self.recv_seq + 1 {
return Ok(());
}
}
Err(mpsc::error::TryRecvError::Disconnected)
}

fn new_sender(&mut self) -> LogEventSender<R> {
let sender = Some(self.tx_log_events.clone());
self.send_seq += 1;
let event_seq = self.send_seq;
LogEventSender { event_seq, sender }
}
}

#[derive(Debug)]
pub(crate) enum LogFlushKind<C>
where C: RaftTypeConfig
{
Append(LogIOId<C::NodeId>),
SaveVote(Vote<C::NodeId>),
}

/// A oneshot callback for completion of log io operation.
pub struct LogFlushed<C>
where C: RaftTypeConfig
{
log_io_id: LogIOId<C::NodeId>,
tx: OneshotSenderOf<C, Result<LogIOId<C::NodeId>, io::Error>>,
io_kind: LogFlushKind<C>,
sender: LogEventSender<Result<LogFlushKind<C>, io::Error>>,
}

impl<C> LogFlushed<C>
where C: RaftTypeConfig
{
pub(crate) fn new(
pub(crate) fn with_append(
log_io_id: LogIOId<C::NodeId>,
tx: OneshotSenderOf<C, Result<LogIOId<C::NodeId>, io::Error>>,
event_chan: &mut LogEventChannel<Result<LogFlushKind<C>, io::Error>>,
) -> Self {
Self { log_io_id, tx }
Self {
io_kind: LogFlushKind::Append(log_io_id),
sender: event_chan.new_sender()
}
}

pub(crate) fn with_save_vote(
vote: Vote<C::NodeId>,
event_chan: &mut LogEventChannel<Result<LogFlushKind<C>, io::Error>>,
) -> Self {
Self {
io_kind: LogFlushKind::SaveVote(vote),
sender: event_chan.new_sender()
}
}

/// Report log io completion event.
///
/// It will be called when the log is successfully appended to the storage or an error occurs.
pub fn log_io_completed(self, result: Result<(), io::Error>) {
let res = if let Err(e) = result {
tracing::error!("LogFlush error: {}, while flushing upto {}", e, self.log_io_id);
self.tx.send(Err(e))
tracing::error!("LogFlush error: {}, io_kind {:?}", e, self.io_kind);
self.sender.send(Err(e))
} else {
self.tx.send(Ok(self.log_io_id))
self.sender.send(Ok(self.io_kind))
};

if let Err(e) = res {
Expand All @@ -51,7 +177,7 @@ pub struct LogApplied<C>
where C: RaftTypeConfig
{
last_log_id: LogId<C::NodeId>,
tx: oneshot::Sender<Result<(LogId<C::NodeId>, Vec<C::R>), StorageIOError<C::NodeId>>>,
sender: LogEventSender<Result<(LogId<C::NodeId>, Vec<C::R>), StorageIOError<C::NodeId>>>,
}

impl<C> LogApplied<C>
Expand All @@ -60,9 +186,12 @@ where C: RaftTypeConfig
#[allow(dead_code)]
pub(crate) fn new(
last_log_id: LogId<C::NodeId>,
tx: oneshot::Sender<Result<(LogId<C::NodeId>, Vec<C::R>), StorageIOError<C::NodeId>>>,
event_chan: &mut LogEventChannel<Result<(LogId<C::NodeId>, Vec<C::R>), StorageIOError<C::NodeId>>>,
) -> Self {
Self { last_log_id, tx }
Self {
last_log_id,
sender: event_chan.new_sender()
}
}

/// Report apply io completion event.
Expand All @@ -74,11 +203,11 @@ where C: RaftTypeConfig
Ok(x) => {
tracing::debug!("LogApplied upto {}", self.last_log_id);
let resp = (self.last_log_id, x);
self.tx.send(Ok(resp))
self.sender.send(Ok(resp))
}
Err(e) => {
tracing::error!("LogApplied error: {}, while applying upto {}", e, self.last_log_id);
self.tx.send(Err(e))
self.sender.send(Err(e))
}
};

Expand Down
Loading
Loading