
Commit d771a7f

feat(storage): add Batch trait abstraction for atomic write operations (#231)
* feat(storage): add Batch trait abstraction for atomic write operations (#222)

  This commit introduces a Batch trait to abstract batch write operations, enabling support for both standalone (RocksDB) and future cluster (Raft) modes.

  Changes:
  - Add batch.rs module with the Batch trait and the RocksBatch and BinlogBatch implementations
  - Add a create_batch() method to the Redis struct for creating batch instances
  - Refactor redis_strings.rs to use the Batch trait (2 places)
  - Refactor redis_hashes.rs to use the Batch trait (6 places)
  - Refactor redis_lists.rs to use the Batch trait (7 places)
  - Refactor redis_sets.rs to use the Batch trait (6 places)
  - Refactor redis_zsets.rs to use the Batch trait (8 places)
  - Add explicit error handling for an invalid column family index

  This abstraction allows seamless switching between direct RocksDB writes and Raft consensus-based writes in the future.

* fix(storage): address review issues for the batch abstraction PR

  - Fix the zadd score equality check bug (compare the difference, not just sm.score)
  - Fix lset to go through the batch layer instead of a direct db.put_cf()
  - Fix zunionstore/zinterstore atomicity by merging the delete and the write into a single batch commit
  - Improve EXPECTED_CF_COUNT by deriving it from ColumnFamilyIndex::COUNT
  - Fix the del_key prefix scan to use the correct data key prefix format
  - Optimize flush_db with chunked deletion (1000 keys per batch) to avoid memory issues on large databases
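For readers new to this write path, here is a minimal standalone sketch (not code from this PR) of the RocksDB primitive that RocksBatch wraps: operations are staged in a rocksdb::WriteBatch and then committed atomically in a single write call. The column family names, keys, and temp-dir path below are invented for illustration, and the sketch assumes the rust-rocksdb crate API.

// Sketch only: the stage-then-commit shape that the Batch trait in this PR abstracts.
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = std::env::temp_dir().join("batch_demo_db");
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);

    // Two column families standing in for MetaCF and HashesDataCF.
    let cfs = vec![
        ColumnFamilyDescriptor::new("meta", Options::default()),
        ColumnFamilyDescriptor::new("hashes_data", Options::default()),
    ];
    let db = DB::open_cf_descriptors(&opts, &path, cfs)?;

    let meta = db.cf_handle("meta").expect("meta cf");
    let data = db.cf_handle("hashes_data").expect("hashes_data cf");

    // Stage a multi-CF update, then commit it in one atomic write.
    let mut batch = WriteBatch::default();
    batch.put_cf(meta, b"h:key", b"len=1");
    batch.put_cf(data, b"h:key:field", b"value");
    db.write(batch)?; // either both entries land or neither does

    assert_eq!(db.get_cf(data, b"h:key:field")?.as_deref(), Some(&b"value"[..]));
    Ok(())
}

The Batch trait keeps this same shape but addresses column families by ColumnFamilyIndex, so call sites never hold raw handles and a Raft-backed BinlogBatch can be swapped in later.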
1 parent 42cd43d commit d771a7f

File tree

10 files changed (+1288, -696 lines)


src/raft/src/network.rs

Lines changed: 27 additions & 32 deletions
@@ -382,24 +382,22 @@ impl MessageEnvelope {
     /// Add authentication to the message
     pub fn add_authentication(&mut self, auth: &NodeAuth) -> RaftResult<()> {
         // Create serializable data without HMAC for authentication
+        // Use a simplified message representation for HMAC
+        let msg_type = match &self.message {
+            RaftMessage::AppendEntries(_) => "AppendEntries".to_string(),
+            RaftMessage::AppendEntriesResponse(_) => "AppendEntriesResponse".to_string(),
+            RaftMessage::Vote(_) => "Vote".to_string(),
+            RaftMessage::VoteResponse(_) => "VoteResponse".to_string(),
+            RaftMessage::InstallSnapshot(_) => "InstallSnapshot".to_string(),
+            RaftMessage::InstallSnapshotResponse(_) => "InstallSnapshotResponse".to_string(),
+            RaftMessage::Heartbeat { from, term } => format!("Heartbeat:{}:{}", from, term),
+            RaftMessage::HeartbeatResponse { from, success } => {
+                format!("HeartbeatResponse:{}:{}", from, success)
+            }
+        };
         let data_for_hmac = format!(
             "{}:{}:{}:{}:{}",
-            self.message_id,
-            self.from,
-            self.to,
-            self.timestamp,
-            // Use a simplified message representation for HMAC
-            match &self.message {
-                RaftMessage::AppendEntries(_) => "AppendEntries",
-                RaftMessage::AppendEntriesResponse(_) => "AppendEntriesResponse",
-                RaftMessage::Vote(_) => "Vote",
-                RaftMessage::VoteResponse(_) => "VoteResponse",
-                RaftMessage::InstallSnapshot(_) => "InstallSnapshot",
-                RaftMessage::InstallSnapshotResponse(_) => "InstallSnapshotResponse",
-                RaftMessage::Heartbeat { from, term } => &format!("Heartbeat:{}:{}", from, term),
-                RaftMessage::HeartbeatResponse { from, success } =>
-                    &format!("HeartbeatResponse:{}:{}", from, success),
-            }
+            self.message_id, self.from, self.to, self.timestamp, msg_type
         );

         self.hmac = Some(auth.generate_hmac(data_for_hmac.as_bytes())?);
@@ -410,24 +408,21 @@ impl MessageEnvelope {
     pub fn verify_authentication(&self, auth: &NodeAuth) -> bool {
         if let Some(ref expected_hmac) = self.hmac {
             // Recreate the same data format used for HMAC generation
+            let msg_type = match &self.message {
+                RaftMessage::AppendEntries(_) => "AppendEntries".to_string(),
+                RaftMessage::AppendEntriesResponse(_) => "AppendEntriesResponse".to_string(),
+                RaftMessage::Vote(_) => "Vote".to_string(),
+                RaftMessage::VoteResponse(_) => "VoteResponse".to_string(),
+                RaftMessage::InstallSnapshot(_) => "InstallSnapshot".to_string(),
+                RaftMessage::InstallSnapshotResponse(_) => "InstallSnapshotResponse".to_string(),
+                RaftMessage::Heartbeat { from, term } => format!("Heartbeat:{}:{}", from, term),
+                RaftMessage::HeartbeatResponse { from, success } => {
+                    format!("HeartbeatResponse:{}:{}", from, success)
+                }
+            };
             let data_for_hmac = format!(
                 "{}:{}:{}:{}:{}",
-                self.message_id,
-                self.from,
-                self.to,
-                self.timestamp,
-                match &self.message {
-                    RaftMessage::AppendEntries(_) => "AppendEntries",
-                    RaftMessage::AppendEntriesResponse(_) => "AppendEntriesResponse",
-                    RaftMessage::Vote(_) => "Vote",
-                    RaftMessage::VoteResponse(_) => "VoteResponse",
-                    RaftMessage::InstallSnapshot(_) => "InstallSnapshot",
-                    RaftMessage::InstallSnapshotResponse(_) => "InstallSnapshotResponse",
-                    RaftMessage::Heartbeat { from, term } =>
-                        &format!("Heartbeat:{}:{}", from, term),
-                    RaftMessage::HeartbeatResponse { from, success } =>
-                        &format!("HeartbeatResponse:{}:{}", from, success),
-                }
+                self.message_id, self.from, self.to, self.timestamp, msg_type
             );

             auth.verify_hmac(data_for_hmac.as_bytes(), expected_hmac)
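To make the intent of this refactor concrete, here is a standalone sketch of the sign/verify round trip: both methods derive the same msg_type string and HMAC one canonical payload, so the two format! calls can no longer drift apart. The Envelope, NodeAuth, and RaftMessage types below are cut-down stand-ins, and the keyed DefaultHasher is only a placeholder for a real HMAC, not the crate's NodeAuth implementation.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Debug)]
enum RaftMessage {
    Vote(u64),
    Heartbeat { from: u64, term: u64 },
}

struct Envelope {
    message_id: u64,
    from: u64,
    to: u64,
    timestamp: u64,
    message: RaftMessage,
    hmac: Option<u64>,
}

struct NodeAuth {
    key: u64,
}

impl NodeAuth {
    // Placeholder "HMAC": a keyed hash over the canonical bytes (NOT a real HMAC).
    fn generate_hmac(&self, data: &[u8]) -> u64 {
        let mut h = DefaultHasher::new();
        self.key.hash(&mut h);
        data.hash(&mut h);
        h.finish()
    }
}

impl Envelope {
    // The same simplified representation is used on both the signing and verifying side.
    fn msg_type(&self) -> String {
        match &self.message {
            RaftMessage::Vote(_) => "Vote".to_string(),
            RaftMessage::Heartbeat { from, term } => format!("Heartbeat:{}:{}", from, term),
        }
    }

    fn canonical(&self) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.message_id, self.from, self.to, self.timestamp, self.msg_type()
        )
    }

    fn add_authentication(&mut self, auth: &NodeAuth) {
        self.hmac = Some(auth.generate_hmac(self.canonical().as_bytes()));
    }

    fn verify_authentication(&self, auth: &NodeAuth) -> bool {
        match self.hmac {
            Some(expected) => auth.generate_hmac(self.canonical().as_bytes()) == expected,
            None => false,
        }
    }
}

fn main() {
    let auth = NodeAuth { key: 42 };
    let mut env = Envelope {
        message_id: 1,
        from: 10,
        to: 20,
        timestamp: 1_700_000_000,
        message: RaftMessage::Heartbeat { from: 10, term: 3 },
        hmac: None,
    };
    env.add_authentication(&auth);
    assert!(env.verify_authentication(&auth));
    println!("round-trip ok: {:?}", env.hmac);
}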

src/storage/src/batch.rs

Lines changed: 303 additions & 0 deletions
@@ -0,0 +1,303 @@
// Copyright (c) 2024-present, arana-db Community. All rights reserved.
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Batch abstraction for atomic write operations.
//!
//! This module provides a unified interface for batch operations that can work
//! in both standalone and cluster (Raft) modes.
//!
//! # Design
//!
//! The batch system is designed with two implementations:
//! - `RocksBatch`: For standalone mode, directly writes to RocksDB
//! - `BinlogBatch`: For cluster mode, writes through Raft consensus (TODO)
//!
//! # Usage
//!
//! ```ignore
//! let mut batch = redis.create_batch()?;
//! batch.put(ColumnFamilyIndex::MetaCF, key, value)?;
//! batch.delete(ColumnFamilyIndex::HashesDataCF, key)?;
//! batch.commit()?;
//! ```

use std::sync::Arc;

use rocksdb::{BoundColumnFamily, WriteBatch, WriteOptions};
use snafu::ResultExt;

use crate::ColumnFamilyIndex;
use crate::error::{BatchSnafu, Result, RocksSnafu};
use engine::Engine;

/// Trait for batch write operations.
///
/// This trait abstracts the batch write mechanism to support both standalone
/// (RocksDB direct write) and cluster (Raft consensus) modes.
///
/// # Error Handling
///
/// All operations return `Result<()>` to properly propagate errors instead of
/// panicking. This is important for production stability in storage systems.
pub trait Batch: Send {
    /// Add a put operation to the batch.
    ///
    /// # Arguments
    /// * `cf_idx` - The column family index to write to
    /// * `key` - The key to write
    /// * `value` - The value to write
    ///
    /// # Errors
    /// Returns an error if the column family index is invalid.
    fn put(&mut self, cf_idx: ColumnFamilyIndex, key: &[u8], value: &[u8]) -> Result<()>;

    /// Add a delete operation to the batch.
    ///
    /// # Arguments
    /// * `cf_idx` - The column family index to delete from
    /// * `key` - The key to delete
    ///
    /// # Errors
    /// Returns an error if the column family index is invalid.
    fn delete(&mut self, cf_idx: ColumnFamilyIndex, key: &[u8]) -> Result<()>;

    /// Commit all operations in the batch atomically.
    ///
    /// # Returns
    /// * `Ok(())` - if all operations were committed successfully
    /// * `Err(_)` - if the commit failed
    fn commit(self: Box<Self>) -> Result<()>;

    /// Get the number of operations in the batch.
    fn count(&self) -> u32;

    /// Clear all operations from the batch.
    fn clear(&mut self);
}

/// Type alias for column family handles used in batch operations.
pub type CfHandles<'a> = Vec<Option<Arc<BoundColumnFamily<'a>>>>;

/// RocksDB batch implementation for standalone mode.
///
/// This implementation directly uses RocksDB's WriteBatch for atomic writes.
pub struct RocksBatch<'a> {
    inner: WriteBatch,
    db: &'a dyn Engine,
    write_options: &'a WriteOptions,
    cf_handles: CfHandles<'a>,
    count: u32,
}

impl<'a> RocksBatch<'a> {
    /// Create a new RocksBatch.
    ///
    /// # Arguments
    /// * `db` - Reference to the database engine
    /// * `write_options` - Write options for the batch commit
    /// * `cf_handles` - Column family handles for all column families
    ///
    /// # Panics
    /// Panics if cf_handles length doesn't match ColumnFamilyIndex::COUNT.
    /// This is a programming error that should be caught during development.
    pub fn new(
        db: &'a dyn Engine,
        write_options: &'a WriteOptions,
        cf_handles: CfHandles<'a>,
    ) -> Self {
        // Validate cf_handles length matches expected column family count.
        // This catches mismatches between ColumnFamilyIndex enum and cf_handles vec
        // at batch creation time rather than during put/delete operations.
        assert_eq!(
            cf_handles.len(),
            ColumnFamilyIndex::COUNT,
            "cf_handles length ({}) must match ColumnFamilyIndex::COUNT ({}). \
             Update ColumnFamilyIndex::COUNT when adding new column families.",
            cf_handles.len(),
            ColumnFamilyIndex::COUNT
        );

        Self {
            inner: WriteBatch::default(),
            db,
            write_options,
            cf_handles,
            count: 0,
        }
    }
}

/// Convert ColumnFamilyIndex to its corresponding array index.
///
/// This function uses an explicit match to ensure compile-time safety.
/// When a new ColumnFamilyIndex variant is added, the compiler will
/// require this match to be updated.
#[inline]
fn cf_index_to_usize(cf_idx: ColumnFamilyIndex) -> usize {
    match cf_idx {
        ColumnFamilyIndex::MetaCF => 0,
        ColumnFamilyIndex::HashesDataCF => 1,
        ColumnFamilyIndex::SetsDataCF => 2,
        ColumnFamilyIndex::ListsDataCF => 3,
        ColumnFamilyIndex::ZsetsDataCF => 4,
        ColumnFamilyIndex::ZsetsScoreCF => 5,
    }
}

/// Get the column family handle from the handles vector.
///
/// This function provides validated access to column family handles,
/// ensuring the handle exists at the given index.
///
/// # Arguments
/// * `cf_handles` - The vector of column family handles
/// * `cf_idx` - The column family index to look up
///
/// # Returns
/// A reference to the column family handle, or an error if invalid.
fn get_cf_handle<'a>(
    cf_handles: &'a CfHandles<'a>,
    cf_idx: ColumnFamilyIndex,
) -> Result<&'a Arc<BoundColumnFamily<'a>>> {
    let idx = cf_index_to_usize(cf_idx);

    cf_handles
        .get(idx)
        .and_then(|opt| opt.as_ref())
        .ok_or_else(|| crate::error::Error::Batch {
            message: format!(
                "Column family handle is None for {:?} (index {}) - \
                 this indicates a bug in initialization",
                cf_idx, idx
            ),
            location: snafu::Location::new(file!(), line!(), column!()),
        })
}

impl<'a> Batch for RocksBatch<'a> {
    fn put(&mut self, cf_idx: ColumnFamilyIndex, key: &[u8], value: &[u8]) -> Result<()> {
        let cf = get_cf_handle(&self.cf_handles, cf_idx)?;
        self.inner.put_cf(cf, key, value);
        self.count += 1;
        Ok(())
    }

    fn delete(&mut self, cf_idx: ColumnFamilyIndex, key: &[u8]) -> Result<()> {
        let cf = get_cf_handle(&self.cf_handles, cf_idx)?;
        self.inner.delete_cf(cf, key);
        self.count += 1;
        Ok(())
    }

    fn commit(self: Box<Self>) -> Result<()> {
        self.db
            .write_opt(self.inner, self.write_options)
            .context(RocksSnafu)
    }

    fn count(&self) -> u32 {
        self.count
    }

    fn clear(&mut self) {
        self.inner.clear();
        self.count = 0;
    }
}

/// Binlog batch implementation for cluster (Raft) mode.
///
/// This implementation serializes operations to a binlog format and commits
/// through the Raft consensus layer.
///
/// TODO: Implement when Raft integration is ready.
#[allow(dead_code)]
pub struct BinlogBatch {
    // TODO: Add binlog entries
    // entries: Vec<BinlogEntry>,
    // append_log_fn: AppendLogFunction,
    count: u32,
}

#[allow(dead_code)]
impl BinlogBatch {
    /// Create a new BinlogBatch.
    ///
    /// # Arguments
    /// * `append_log_fn` - Function to append log entries to Raft
    pub fn new() -> Self {
        Self { count: 0 }
    }
}

impl Default for BinlogBatch {
    fn default() -> Self {
        Self::new()
    }
}

impl Batch for BinlogBatch {
    fn put(&mut self, _cf_idx: ColumnFamilyIndex, _key: &[u8], _value: &[u8]) -> Result<()> {
        // TODO: Implement when Raft integration is ready
        // Create binlog entry and add to entries
        self.count += 1;
        Ok(())
    }

    fn delete(&mut self, _cf_idx: ColumnFamilyIndex, _key: &[u8]) -> Result<()> {
        // TODO: Implement when Raft integration is ready
        // Create binlog entry and add to entries
        self.count += 1;
        Ok(())
    }

    fn commit(self: Box<Self>) -> Result<()> {
        // BinlogBatch commit is not yet implemented.
        // Return an error to prevent silent data loss.
        BatchSnafu {
            message: "BinlogBatch commit is not implemented - Raft integration pending".to_string(),
        }
        .fail()
    }

    fn count(&self) -> u32 {
        self.count
    }

    fn clear(&mut self) {
        // TODO: Clear entries
        self.count = 0;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_binlog_batch_default() {
        let batch = BinlogBatch::default();
        assert_eq!(batch.count(), 0);
    }

    #[test]
    fn test_binlog_batch_commit_returns_error() {
        let batch = BinlogBatch::default();
        let result = Box::new(batch).commit();
        assert!(result.is_err());
    }
}
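The commit message also mentions optimizing flush_db with chunked deletion (1000 keys per batch). Below is a minimal, self-contained sketch of that pattern against a pared-down copy of the Batch trait above: MemBatch, FLUSH_CHUNK_SIZE, and flush_keys_chunked are illustrative names rather than items from this PR, and the real trait additionally has put/clear and uses the crate's Result and error types.

// Sketch of chunked deletion: commit at most FLUSH_CHUNK_SIZE deletes per batch
// so no single WriteBatch grows unboundedly on a large database.

#[derive(Clone, Copy, Debug)]
enum ColumnFamilyIndex {
    MetaCF,
}

trait Batch: Send {
    fn delete(&mut self, cf_idx: ColumnFamilyIndex, key: &[u8]) -> Result<(), String>;
    fn commit(self: Box<Self>) -> Result<(), String>;
    fn count(&self) -> u32;
}

/// Toy batch that only records how many deletes each commit carried.
#[derive(Default)]
struct MemBatch {
    count: u32,
}

impl Batch for MemBatch {
    fn delete(&mut self, _cf: ColumnFamilyIndex, _key: &[u8]) -> Result<(), String> {
        self.count += 1;
        Ok(())
    }
    fn commit(self: Box<Self>) -> Result<(), String> {
        println!("committed batch of {} deletes", self.count);
        Ok(())
    }
    fn count(&self) -> u32 {
        self.count
    }
}

const FLUSH_CHUNK_SIZE: u32 = 1000;

/// Delete `keys` in chunks, committing each chunk before starting the next.
fn flush_keys_chunked(
    keys: impl IntoIterator<Item = Vec<u8>>,
    mut new_batch: impl FnMut() -> Box<dyn Batch>,
) -> Result<(), String> {
    let mut batch = new_batch();
    for key in keys {
        batch.delete(ColumnFamilyIndex::MetaCF, &key)?;
        if batch.count() >= FLUSH_CHUNK_SIZE {
            batch.commit()?; // commit the full chunk, then start a fresh batch
            batch = new_batch();
        }
    }
    if batch.count() > 0 {
        batch.commit()?; // commit the remaining tail
    }
    Ok(())
}

fn main() -> Result<(), String> {
    let keys = (0..2500u32).map(|i| i.to_be_bytes().to_vec());
    flush_keys_chunked(keys, || Box::new(MemBatch::default()) as Box<dyn Batch>)
}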

src/storage/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,7 @@ mod slot_indexer;
 mod statistics;
 mod util;

+mod batch;
 mod redis;
 mod storage_define;
 mod storage_impl;
@@ -65,6 +66,7 @@ mod redis_zsets;

 pub use base_key_format::BaseMetaKey;
 pub use base_value_format::*;
+pub use batch::{Batch, BinlogBatch, RocksBatch};
 pub use cluster_storage::ClusterStorage;
 pub use error::Result;
 pub use expiration_manager::ExpirationManager;

0 commit comments