Skip to content

Commit bb0a0d5

Browse files
committed
Add file-based persistence to the KV store
1 parent 0afc373 commit bb0a0d5

File tree

10 files changed

+224
-99
lines changed

10 files changed

+224
-99
lines changed

Cargo.lock

Lines changed: 21 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ tracing-subscriber = {version = "0.3.1", features = ["env-filter"]}
2727
tower = "0.4.13"
2828
pin-project = "1.1"
2929
querystring = "1.1"
30+
async-tempfile = "0.7.0"
3031

3132
[build-dependencies]
3233
tonic-build = "0.11"

src/keyvalue/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ pub mod grpc {
1515
pub(in crate::keyvalue) mod http;
1616
#[path = "generated/keyvalue_proto.rs"]
1717
pub(in crate::keyvalue) mod keyvalue_proto;
18+
pub(in crate::keyvalue) mod persistence;
1819
pub(in crate::keyvalue) mod service;
1920
pub(in crate::keyvalue) mod store;

src/keyvalue/persistence.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use crate::keyvalue::keyvalue_proto::Entry;
2+
use async_std::fs;
3+
use async_std::fs::{File, OpenOptions};
4+
use async_std::io::{BufReader, ReadExt};
5+
use async_std::path::Path;
6+
use async_trait::async_trait;
7+
use bytes::Bytes;
8+
use futures::AsyncWriteExt;
9+
use prost::Message;
10+
use tracing::info;
11+
12+
#[async_trait]
13+
pub trait Persistence {
14+
async fn add(&mut self, entry: &Entry);
15+
}
16+
17+
pub struct FilePersistence {
18+
directory: String,
19+
20+
log_path: String,
21+
log: File,
22+
}
23+
24+
impl FilePersistence {
25+
pub async fn create(directory: String) -> FilePersistence {
26+
let meta = fs::metadata(&directory).await.expect("exists");
27+
assert!(meta.is_dir());
28+
let wal_path = Path::new(directory.as_str()).join("wal");
29+
info!("WAL path: {}", &wal_path.display());
30+
let wal_file = OpenOptions::new()
31+
.write(true)
32+
.append(true)
33+
.create_new(true)
34+
.open(wal_path.clone())
35+
.await
36+
.expect("open wal file");
37+
FilePersistence {
38+
directory,
39+
log: wal_file,
40+
log_path: wal_path.to_str().unwrap().to_string(),
41+
}
42+
}
43+
44+
pub async fn read_wal(&self) -> u64 {
45+
let file = OpenOptions::new()
46+
.read(true)
47+
.open(self.log_path.clone())
48+
.await
49+
.expect("open wal file");
50+
51+
let mut reader = BufReader::new(file);
52+
let mut buf = Vec::new();
53+
reader.read_to_end(&mut buf).await.expect("read");
54+
55+
let mut entries = Vec::new();
56+
let mut slice = &buf[..];
57+
58+
while !slice.is_empty() {
59+
match Entry::decode_length_delimited(&mut slice) {
60+
Ok(entry) => entries.push(entry),
61+
Err(err) => {
62+
eprintln!("Failed to decode Entry: {}", err);
63+
break;
64+
}
65+
}
66+
}
67+
entries.len() as u64
68+
}
69+
}
70+
71+
#[async_trait]
72+
impl Persistence for FilePersistence {
73+
async fn add(&mut self, entry: &Entry) {
74+
let mut buf = vec![];
75+
entry.encode_length_delimited(&mut buf).expect("encode");
76+
self.log.write(&buf).await.expect("write");
77+
self.log.flush().await.expect("flush");
78+
}
79+
}
80+
81+
#[cfg(test)]
82+
mod tests {
83+
use super::*;
84+
85+
use async_tempfile::TempDir;
86+
#[tokio::test]
87+
async fn test_something() {
88+
let tmp_dir = TempDir::new().await.unwrap();
89+
let tmp_dir_path = tmp_dir.dir_path().to_str().unwrap().to_string();
90+
let mut persistence = FilePersistence::create(tmp_dir_path).await;
91+
92+
let k1 = Bytes::from("key1");
93+
let v1 = Bytes::from("value1");
94+
let k2 = Bytes::from("key2");
95+
let v2 = Bytes::from("value2");
96+
97+
persistence
98+
.add(&Entry {
99+
key: k1.to_vec(),
100+
value: v1.to_vec(),
101+
})
102+
.await;
103+
assert_eq!(persistence.read_wal().await, 1);
104+
105+
persistence
106+
.add(&Entry {
107+
key: k2.to_vec(),
108+
value: v2.to_vec(),
109+
})
110+
.await;
111+
assert_eq!(persistence.read_wal().await, 2);
112+
}
113+
}

src/keyvalue/service.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ mod tests {
160160
.lock()
161161
.await
162162
.apply(&Bytes::from(copy))
163+
.await
163164
.expect("bad payload");
164165
Ok(EntryId { term: 0, index: 0 })
165166
}
@@ -175,14 +176,15 @@ mod tests {
175176

176177
#[tokio::test]
177178
async fn test_get() {
178-
let service = create_service();
179+
let service = create_service().await;
179180
let store = service.store.clone();
180181
let server = TestRpcServer::run(KeyValueServer::new(service)).await;
181182

182183
store
183184
.lock()
184185
.await
185-
.set(Bytes::from("foo"), Bytes::from("bar"));
186+
.set(Bytes::from("foo"), Bytes::from("bar"))
187+
.await;
186188

187189
let request = GetRequest {
188190
key: "foo".as_bytes().to_vec(),
@@ -203,7 +205,7 @@ mod tests {
203205

204206
#[tokio::test]
205207
async fn test_put() {
206-
let service = create_service();
208+
let service = create_service().await;
207209
let store = service.store.clone();
208210
let server = TestRpcServer::run(KeyValueServer::new(service)).await;
209211

@@ -227,8 +229,8 @@ mod tests {
227229
}
228230

229231
// Returns an instance of the service struct we're testing
230-
fn create_service() -> KeyValueService {
231-
let store = Arc::new(Mutex::new(MapStore::new()));
232+
async fn create_service() -> KeyValueService {
233+
let store = Arc::new(Mutex::new(MapStore::create_in_memory().await));
232234
KeyValueService {
233235
name: "test".into(),
234236
store: store.clone(),

0 commit comments

Comments
 (0)