diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 93872bc72..ee16b0aa4 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -83,6 +83,8 @@ use crate::replication::ReplicationHandle; use crate::replication::ReplicationSessionId; use crate::runtime::RaftRuntime; use crate::storage::LogFlushed; +use crate::storage::LogFlushKind; +use crate::storage::LogEventChannel; use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; @@ -155,6 +157,8 @@ impl LeaderData { } } +type LogFLushResult = Result, std::io::Error>; + // TODO: remove SM /// The core type implementing the Raft protocol. pub struct RaftCore @@ -207,6 +211,8 @@ where pub(crate) span: Span, + pub(crate) chan_log_flushed: LogEventChannel>, + pub(crate) _p: PhantomData, } @@ -707,27 +713,29 @@ where /// A temp wrapper to make non-blocking `append_to_log` a blocking. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) async fn append_to_log( + pub(crate) async fn append_to_log( &mut self, - entries: I, + entries: Vec, vote: Vote, - last_log_id: LogId, - ) -> Result<(), StorageError> - where - I: IntoIterator + OptionalSend, - I::IntoIter: OptionalSend, - { - tracing::debug!("append_to_log"); - - let (tx, rx) = C::AsyncRuntime::oneshot(); + ) -> Result<(), StorageError> { + tracing::debug!("append_to_log: {}", DisplaySlice::<_>(&entries)); + + let last_log_id = *entries.last().unwrap().get_log_id(); let log_io_id = LogIOId::new(vote, Some(last_log_id)); + let callback = LogFlushed::with_append(log_io_id, &mut self.chan_log_flushed); + self.log_store.append(entries, callback).await?; - let callback = LogFlushed::new(log_io_id, tx); + Ok(()) + } - self.log_store.append(entries, callback).await?; - rx.await - .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))? - .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?; + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) async fn save_vote( + &mut self, vote: Vote + ) -> Result<(), StorageError> { + tracing::debug!("save_vote: {}", &vote); + + let callback = LogFlushed::with_save_vote(vote, &mut self.chan_log_flushed); + self.log_store.save_vote(&vote, callback).await?; Ok(()) } @@ -930,6 +938,13 @@ where return Err(Fatal::Stopped); } + res = self.chan_log_flushed.wait_next() => { + if res.is_err() { + tracing::info!("log event channel is closed"); + return Err(Fatal::Stopped); + } + } + notify_res = self.rx_notify.recv() => { match notify_res { Some(notify) => self.handle_notify(notify)?, @@ -951,6 +966,7 @@ where } } + self.process_log_flushed()?; self.run_engine_commands().await?; // There is a message waking up the loop, process channels one by one. @@ -973,6 +989,27 @@ where } } + /// Process log flushed events as many as possible. + fn process_log_flushed(&mut self) -> Result<(), Fatal> { + while let Some(flush_res) = self.chan_log_flushed.try_recv() { + let io_kind = flush_res.map_err(|e| + Into::>::into(StorageIOError::write_logs(AnyError::new(&e))))?; + // The leader may have changed. + // But reporting to a different leader is not a problem. + match io_kind { + LogFlushKind::Append(log_io_id) => { + if let Ok(mut lh) = self.engine.leader_handler() { + lh.replication_handler().update_local_progress(log_io_id.log_id); + } + } + LogFlushKind::SaveVote(vote) => { + self.engine.state.io_state_mut().update_vote(vote); + } + } + } + Ok(()) + } + /// Process RaftMsg as many as possible. /// /// It returns the number of processed message. @@ -1597,20 +1634,10 @@ where self.leader_data = None; } Command::AppendInputEntries { vote, entries } => { - let last_log_id = *entries.last().unwrap().get_log_id(); - tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); - - self.append_to_log(entries, vote, last_log_id).await?; - - // The leader may have changed. - // But reporting to a different leader is not a problem. - if let Ok(mut lh) = self.engine.leader_handler() { - lh.replication_handler().update_local_progress(Some(last_log_id)); - } + self.append_to_log(entries, vote).await?; } Command::SaveVote { vote } => { - self.log_store.save_vote(&vote).await?; - self.engine.state.io_state_mut().update_vote(vote); + self.save_vote(vote).await?; } Command::PurgeLog { upto } => { self.log_store.purge(upto).await?; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index de92a916b..e93003920 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -76,6 +76,7 @@ pub use crate::raft::runtime_config_handle::RuntimeConfigHandle; use crate::raft::trigger::Trigger; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; +use crate::storage::LogEventChannel; use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::JoinErrorOf; use crate::type_config::alias::ResponderOf; @@ -300,6 +301,8 @@ where C: RaftTypeConfig command_state: CommandState::default(), span: core_span, + chan_log_flushed: LogEventChannel::new(), + _p: Default::default(), }; diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 7521bc4da..1f2ad449f 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -21,7 +21,8 @@ mod membership_state; pub(crate) mod snapshot_streaming; mod vote_state_reader; -#[allow(unused)] pub(crate) use io_state::log_io_id::LogIOId; +#[allow(unused)] +pub(crate) use io_state::log_io_id::LogIOId; pub(crate) use io_state::IOState; #[cfg(test)] diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index 5ba2a9cc1..1fd87989c 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -1,32 +1,158 @@ //! Callbacks used by Storage API use std::io; +use std::cmp::Ordering; +use std::collections::BinaryHeap; -use tokio::sync::oneshot; +use tokio::sync::mpsc; -use crate::async_runtime::AsyncOneshotSendExt; use crate::raft_state::io_state::log_io_id::LogIOId; -use crate::type_config::alias::OneshotSenderOf; +use crate::Vote; use crate::LogId; use crate::RaftTypeConfig; use crate::StorageIOError; +struct LogEvent (R, u64); + +impl Ord for LogEvent { + fn cmp(&self, other: &Self) -> Ordering { + self.1.cmp(&other.1).reverse() + } +} + +impl PartialOrd for LogEvent { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.1.cmp(&other.1).reverse()) + } +} + +impl PartialEq for LogEvent { + fn eq(&self, other: &Self) -> bool { + self.1 == other.1 + } +} + +impl Eq for LogEvent {} + +pub struct LogEventSender { + event_seq: u64, + sender: Option>>, +} + +impl Drop for LogEventSender { + fn drop(&mut self) { + if self.sender.take().is_some() { + panic!("dropped without sending event"); + } + } +} + +impl LogEventSender { + fn send(mut self, evt: R) -> Result<(), mpsc::error::SendError> { + let sender = self.sender.take().unwrap(); + sender.send(LogEvent(evt, self.event_seq)) + .map_err(|err| mpsc::error::SendError(err.0.0)) + } +} + +pub struct LogEventChannel { + tx_log_events: mpsc::UnboundedSender>, + rx_log_events: mpsc::UnboundedReceiver>, + send_seq: u64, + recv_seq: u64, + recv_heap: BinaryHeap>, +} + +impl LogEventChannel { + pub fn new() -> Self { + let (tx_log_events, rx_log_events) = mpsc::unbounded_channel(); + Self { + tx_log_events, + rx_log_events, + send_seq: 0, + recv_seq: 0, + recv_heap: BinaryHeap::new(), + } + } + + pub fn try_recv(&mut self) -> Option { + if let Some(ev) = self.recv_heap.peek() { + if ev.1 == self.recv_seq + 1 { + self.recv_seq += 1; + return Some(self.recv_heap.pop().unwrap().0); + } + } + while let Ok(ev) = self.rx_log_events.try_recv() { + if ev.1 == self.recv_seq + 1 { + self.recv_seq += 1; + return Some(ev.0) + } + self.recv_heap.push(ev); + } + None + } + + pub async fn wait_next(&mut self) -> Result<(), mpsc::error::TryRecvError> { + if let Some(ev) = self.recv_heap.peek() { + if ev.1 == self.recv_seq + 1 { + return Ok(()); + } + } + while let Some(ev) = self.rx_log_events.recv().await { + let event_seq = ev.1; + self.recv_heap.push(ev); + if event_seq == self.recv_seq + 1 { + return Ok(()); + } + } + Err(mpsc::error::TryRecvError::Disconnected) + } + + fn new_sender(&mut self) -> LogEventSender { + let sender = Some(self.tx_log_events.clone()); + self.send_seq += 1; + let event_seq = self.send_seq; + LogEventSender { event_seq, sender } + } +} + +#[derive(Debug)] +pub(crate) enum LogFlushKind +where C: RaftTypeConfig +{ + Append(LogIOId), + SaveVote(Vote), +} + /// A oneshot callback for completion of log io operation. pub struct LogFlushed where C: RaftTypeConfig { - log_io_id: LogIOId, - tx: OneshotSenderOf, io::Error>>, + io_kind: LogFlushKind, + sender: LogEventSender, io::Error>>, } impl LogFlushed where C: RaftTypeConfig { - pub(crate) fn new( + pub(crate) fn with_append( log_io_id: LogIOId, - tx: OneshotSenderOf, io::Error>>, + event_chan: &mut LogEventChannel, io::Error>>, ) -> Self { - Self { log_io_id, tx } + Self { + io_kind: LogFlushKind::Append(log_io_id), + sender: event_chan.new_sender() + } + } + + pub(crate) fn with_save_vote( + vote: Vote, + event_chan: &mut LogEventChannel, io::Error>>, + ) -> Self { + Self { + io_kind: LogFlushKind::SaveVote(vote), + sender: event_chan.new_sender() + } } /// Report log io completion event. @@ -34,10 +160,10 @@ where C: RaftTypeConfig /// It will be called when the log is successfully appended to the storage or an error occurs. pub fn log_io_completed(self, result: Result<(), io::Error>) { let res = if let Err(e) = result { - tracing::error!("LogFlush error: {}, while flushing upto {}", e, self.log_io_id); - self.tx.send(Err(e)) + tracing::error!("LogFlush error: {}, io_kind {:?}", e, self.io_kind); + self.sender.send(Err(e)) } else { - self.tx.send(Ok(self.log_io_id)) + self.sender.send(Ok(self.io_kind)) }; if let Err(e) = res { @@ -51,7 +177,7 @@ pub struct LogApplied where C: RaftTypeConfig { last_log_id: LogId, - tx: oneshot::Sender, Vec), StorageIOError>>, + sender: LogEventSender, Vec), StorageIOError>>, } impl LogApplied @@ -60,9 +186,12 @@ where C: RaftTypeConfig #[allow(dead_code)] pub(crate) fn new( last_log_id: LogId, - tx: oneshot::Sender, Vec), StorageIOError>>, + event_chan: &mut LogEventChannel, Vec), StorageIOError>>, ) -> Self { - Self { last_log_id, tx } + Self { + last_log_id, + sender: event_chan.new_sender() + } } /// Report apply io completion event. @@ -74,11 +203,11 @@ where C: RaftTypeConfig Ok(x) => { tracing::debug!("LogApplied upto {}", self.last_log_id); let resp = (self.last_log_id, x); - self.tx.send(Ok(resp)) + self.sender.send(Ok(resp)) } Err(e) => { tracing::error!("LogApplied error: {}, while applying upto {}", e, self.last_log_id); - self.tx.send(Err(e)) + self.sender.send(Err(e)) } }; diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index 1bc075765..a275265b7 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -22,6 +22,8 @@ use crate::display_ext::DisplayOption; use crate::raft_types::SnapshotId; pub use crate::storage::callback::LogApplied; pub use crate::storage::callback::LogFlushed; +pub use crate::storage::callback::LogEventChannel; +pub(crate) use crate::storage::callback::LogFlushKind; use crate::LogId; use crate::OptionalSend; use crate::OptionalSync; diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index 99219c64e..c16d1b275 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -64,7 +64,7 @@ where C: RaftTypeConfig /// ### To ensure correctness: /// /// The vote must be persisted on disk before returning. - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError>; + async fn save_vote(&mut self, vote: &Vote, callback: LogFlushed) -> Result<(), StorageError>; /// Saves the last committed log id to storage. /// diff --git a/openraft/src/storage/v2/raft_log_storage_ext.rs b/openraft/src/storage/v2/raft_log_storage_ext.rs index c5a80e9fc..7b5843fff 100644 --- a/openraft/src/storage/v2/raft_log_storage_ext.rs +++ b/openraft/src/storage/v2/raft_log_storage_ext.rs @@ -4,8 +4,7 @@ use openraft_macros::add_async_trait; use crate::raft_state::LogIOId; use crate::storage::LogFlushed; use crate::storage::RaftLogStorage; -use crate::type_config::alias::AsyncRuntimeOf; -use crate::AsyncRuntime; +use crate::storage::LogEventChannel; use crate::OptionalSend; use crate::RaftTypeConfig; use crate::StorageError; @@ -27,16 +26,16 @@ where C: RaftTypeConfig I: IntoIterator + OptionalSend, I::IntoIter: OptionalSend, { - let (tx, rx) = AsyncRuntimeOf::::oneshot(); + let mut chan = LogEventChannel::new(); // dummy log_io_id let log_io_id = LogIOId::::new(Vote::::default(), None); - let callback = LogFlushed::::new(log_io_id, tx); + let callback = LogFlushed::::with_append(log_io_id, &mut chan); self.append(entries, callback).await?; - rx.await - .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))? + chan.wait_next().await .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?; + let _ = chan.try_recv().unwrap(); Ok(()) } diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 7de7d36ea..167136830 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -18,11 +18,10 @@ use crate::storage::LogState; use crate::storage::RaftLogReaderExt; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; +use crate::storage::LogEventChannel; use crate::storage::StorageHelper; use crate::testing::StoreBuilder; -use crate::type_config::alias::AsyncRuntimeOf; use crate::vote::CommittedLeaderId; -use crate::AsyncRuntime; use crate::LogId; use crate::Membership; use crate::NodeId; @@ -691,7 +690,7 @@ where } pub async fn save_vote(mut store: LS, mut sm: SM) -> Result<(), StorageError> { - store.save_vote(&Vote::new(100, NODE_ID.into())).await?; + save_vote(&mut store, Vote::new(100, NODE_ID.into())).await?; let got = store.read_vote().await?; @@ -1203,8 +1202,8 @@ where Ok(()) } - pub async fn default_vote(sto: &mut LS) -> Result<(), StorageError> { - sto.save_vote(&Vote::new(1, NODE_ID.into())).await?; + pub async fn default_vote(store: &mut LS) -> Result<(), StorageError> { + save_vote(store, Vote::new(1, NODE_ID.into())).await?; Ok(()) } @@ -1281,13 +1280,31 @@ where I: IntoIterator + OptionalSend, I::IntoIter: OptionalSend, { - let (tx, rx) = AsyncRuntimeOf::::oneshot(); + + let mut chan = LogEventChannel::new(); // Dummy log io id for blocking append let log_io_id = LogIOId::::new(Vote::::default(), None); - let cb = LogFlushed::new(log_io_id, tx); + let cb = LogFlushed::with_append(log_io_id, &mut chan); store.append(entries, cb).await?; - rx.await.unwrap().map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?; + chan.wait_next().await.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?; + let _ = chan.try_recv().unwrap(); Ok(()) } + +// A wrapper for calling nonblocking `RaftLogStorage::append()` +async fn save_vote(store: &mut LS, vote: Vote::) -> Result<(), StorageError> +where + C: RaftTypeConfig, + LS: RaftLogStorage, +{ + + let mut chan = LogEventChannel::new(); + let cb = LogFlushed::with_save_vote(vote, &mut chan); + + store.save_vote(&vote, cb).await?; + chan.wait_next().await.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?; + let _ = chan.try_recv().unwrap(); + Ok(()) +} diff --git a/stores/memstore/src/lib.rs b/stores/memstore/src/lib.rs index 57f1e4b76..56852978d 100644 --- a/stores/memstore/src/lib.rs +++ b/stores/memstore/src/lib.rs @@ -342,12 +342,14 @@ impl RaftLogStorage for Arc { self.clone() } - #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + #[tracing::instrument(level = "trace", skip(self, callback))] + async fn save_vote( + &mut self, vote: &Vote, callback: LogFlushed + ) -> Result<(), StorageError> { tracing::debug!(?vote, "save_vote"); let mut h = self.vote.write().await; - *h = Some(*vote); + callback.log_io_completed(Ok(())); Ok(()) } diff --git a/stores/rocksstore/src/lib.rs b/stores/rocksstore/src/lib.rs index a070c71e1..93f6011af 100644 --- a/stores/rocksstore/src/lib.rs +++ b/stores/rocksstore/src/lib.rs @@ -337,9 +337,12 @@ impl RaftLogStorage for RocksLogStore { }) } - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote( + &mut self, vote: &Vote, callback: LogFlushed + ) -> Result<(), StorageError> { self.put_meta::(vote)?; self.db.flush_wal(true).map_err(|e| StorageIOError::write_vote(&e))?; + callback.log_io_completed(Ok(())); Ok(()) } diff --git a/stores/sledstore/src/lib.rs b/stores/sledstore/src/lib.rs index 6bce2f250..d8c6472f1 100644 --- a/stores/sledstore/src/lib.rs +++ b/stores/sledstore/src/lib.rs @@ -495,9 +495,13 @@ impl RaftLogStorage for Arc { }) } - #[tracing::instrument(level = "trace", skip(self))] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { - self.set_vote_(vote).await + #[tracing::instrument(level = "trace", skip(self, callback))] + async fn save_vote( + &mut self, vote: &Vote, callback: LogFlushed + ) -> Result<(), StorageError> { + self.set_vote_(vote).await?; + callback.log_io_completed(Ok(())); + Ok(()) } async fn get_log_reader(&mut self) -> Self::LogReader {