Skip to content

Commit 3e5c1f3

Browse files
committed
Add SqliteStore implementation
We upstream our `SqliteStore` implementation that allows persistence towards an SQLite database backend.
1 parent 00937c5 commit 3e5c1f3

File tree

4 files changed

+228
-0
lines changed

4 files changed

+228
-0
lines changed

bench/benches/bench.rs

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ criterion_group!(benches,
1616
lightning::sign::benches::bench_get_secure_random_bytes,
1717
lightning::ln::channelmanager::bench::bench_sends,
1818
lightning_storage::fs_store::bench::bench_sends,
19+
lightning_storage::sqlite_store::bench::bench_sends,
1920
lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file,
2021
lightning::routing::gossip::benches::read_network_graph,
2122
lightning::routing::gossip::benches::write_network_graph);

lightning-storage/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
1717
lightning = { version = "0.0.116", path = "../lightning", default-features = false, features = ["std"] }
1818
rand = "0.8.5"
1919
libc = "0.2"
20+
rusqlite = { version = "0.28.0", features = ["bundled"] }
2021

2122
[dev-dependencies]
2223
bitcoin = { version = "0.29.0", default-features = false }

lightning-storage/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
99

1010
pub mod fs_store;
11+
pub mod sqlite_store;
1112

1213
#[cfg(test)]
1314
mod test_utils;

lightning-storage/src/sqlite_store.rs

+225
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
//! Objects related to [`SqliteStore`] live here.
2+
use lightning::util::persist::KVStore;
3+
4+
use rusqlite::{named_params, Connection};
5+
6+
use std::fs;
7+
use std::io::Cursor;
8+
use std::path::PathBuf;
9+
use std::sync::{Arc, Mutex};
10+
11+
/// The default database file name.
12+
pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";
13+
14+
/// The default table in which we store all data.
15+
pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";
16+
17+
// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
18+
const SCHEMA_USER_VERSION: u16 = 1;
19+
20+
/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
21+
///
22+
/// [SQLite]: https://sqlite.org
23+
pub struct SqliteStore {
24+
connection: Arc<Mutex<Connection>>,
25+
data_dir: PathBuf,
26+
kv_table_name: String,
27+
}
28+
29+
impl SqliteStore {
30+
/// Constructs a new [`SqliteStore`].
31+
///
32+
/// If not already existing, a new SQLite database will be created in the given `data_dir` under the
33+
/// given `db_file_name` (or the default to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`).
34+
///
35+
/// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`].
36+
pub fn new(data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>) -> Self {
37+
let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string());
38+
let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string());
39+
40+
fs::create_dir_all(data_dir.clone()).unwrap_or_else(|_| {
41+
panic!("Failed to create database destination directory: {}", data_dir.display())
42+
});
43+
let mut db_file_path = data_dir.clone();
44+
db_file_path.push(db_file_name);
45+
46+
let connection = Connection::open(db_file_path.clone()).unwrap_or_else(|_| {
47+
panic!("Failed to open/create database file: {}", db_file_path.display())
48+
});
49+
50+
connection
51+
.pragma(Some(rusqlite::DatabaseName::Main), "user_version", SCHEMA_USER_VERSION, |_| {
52+
Ok(())
53+
})
54+
.unwrap_or_else(|_| panic!("Failed to set PRAGMA user_version"));
55+
56+
let sql = format!(
57+
"CREATE TABLE IF NOT EXISTS {} (
58+
namespace TEXT NOT NULL,
59+
key TEXT NOT NULL CHECK (key <> ''),
60+
value BLOB, PRIMARY KEY ( namespace, key )
61+
);",
62+
kv_table_name
63+
);
64+
connection
65+
.execute(&sql, [])
66+
.unwrap_or_else(|_| panic!("Failed to create table: {}", kv_table_name));
67+
68+
let connection = Arc::new(Mutex::new(connection));
69+
Self { connection, data_dir, kv_table_name }
70+
}
71+
72+
/// Returns the data directory.
73+
pub fn get_data_dir(&self) -> PathBuf {
74+
self.data_dir.clone()
75+
}
76+
}
77+
78+
impl KVStore for SqliteStore {
79+
type Reader = Cursor<Vec<u8>>;
80+
81+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
82+
let locked_conn = self.connection.lock().unwrap();
83+
let sql =
84+
format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", self.kv_table_name);
85+
86+
let res = locked_conn
87+
.query_row(
88+
&sql,
89+
named_params! {
90+
":namespace": namespace,
91+
":key": key,
92+
},
93+
|row| row.get(0),
94+
)
95+
.map_err(|e| match e {
96+
rusqlite::Error::QueryReturnedNoRows => {
97+
let msg =
98+
format!("Failed to read as key could not be found: {}/{}", namespace, key);
99+
std::io::Error::new(std::io::ErrorKind::NotFound, msg)
100+
}
101+
e => {
102+
let msg = format!("Failed to read from key {}/{}: {}", namespace, key, e);
103+
std::io::Error::new(std::io::ErrorKind::Other, msg)
104+
}
105+
})?;
106+
Ok(Cursor::new(res))
107+
}
108+
109+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
110+
let locked_conn = self.connection.lock().unwrap();
111+
112+
let sql = format!(
113+
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
114+
self.kv_table_name
115+
);
116+
117+
locked_conn
118+
.execute(
119+
&sql,
120+
named_params! {
121+
":namespace": namespace,
122+
":key": key,
123+
":value": buf,
124+
},
125+
)
126+
.map(|_| ())
127+
.map_err(|e| {
128+
let msg = format!("Failed to write to key {}/{}: {}", namespace, key, e);
129+
std::io::Error::new(std::io::ErrorKind::Other, msg)
130+
})
131+
}
132+
133+
fn remove(&self, namespace: &str, key: &str) -> lightning::io::Result<()> {
134+
let locked_conn = self.connection.lock().unwrap();
135+
136+
let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND key=:key;", self.kv_table_name);
137+
locked_conn
138+
.execute(
139+
&sql,
140+
named_params! {
141+
":namespace": namespace,
142+
":key": key,
143+
},
144+
)
145+
.map_err(|e| {
146+
let msg = format!("Failed to delete key {}/{}: {}", namespace, key, e);
147+
std::io::Error::new(std::io::ErrorKind::Other, msg)
148+
})?;
149+
Ok(())
150+
}
151+
152+
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
153+
let locked_conn = self.connection.lock().unwrap();
154+
155+
let sql = format!("SELECT key FROM {} WHERE namespace=:namespace", self.kv_table_name);
156+
let mut stmt = locked_conn.prepare(&sql).map_err(|e| {
157+
let msg = format!("Failed to prepare statement: {}", e);
158+
std::io::Error::new(std::io::ErrorKind::Other, msg)
159+
})?;
160+
161+
let mut keys = Vec::new();
162+
163+
let rows_iter = stmt
164+
.query_map(named_params! {":namespace": namespace, }, |row| row.get(0))
165+
.map_err(|e| {
166+
let msg = format!("Failed to retrieve queried rows: {}", e);
167+
std::io::Error::new(std::io::ErrorKind::Other, msg)
168+
})?;
169+
170+
for k in rows_iter {
171+
keys.push(k.map_err(|e| {
172+
let msg = format!("Failed to retrieve queried rows: {}", e);
173+
std::io::Error::new(std::io::ErrorKind::Other, msg)
174+
})?);
175+
}
176+
177+
Ok(keys)
178+
}
179+
}
180+
181+
#[cfg(test)]
182+
mod tests {
183+
use super::*;
184+
use crate::test_utils::{do_read_write_remove_list_persist,do_test_store};
185+
186+
impl Drop for SqliteStore {
187+
fn drop(&mut self) {
188+
match fs::remove_dir_all(&self.data_dir) {
189+
Err(e) => println!("Failed to remove test store directory: {}", e),
190+
_ => {}
191+
}
192+
}
193+
}
194+
195+
#[test]
196+
fn read_write_remove_list_persist() {
197+
let mut temp_path = std::env::temp_dir();
198+
temp_path.push("read_write_remove_list_persist");
199+
let store = SqliteStore::new(temp_path, Some("test_db".to_string()), Some("test_table".to_string()));
200+
do_read_write_remove_list_persist(&store);
201+
}
202+
203+
#[test]
204+
fn test_sqlite_store() {
205+
let mut temp_path = std::env::temp_dir();
206+
temp_path.push("test_sqlite_store");
207+
let store_0 = SqliteStore::new(temp_path.clone(), Some("test_db_0".to_string()), Some("test_table".to_string()));
208+
let store_1 = SqliteStore::new(temp_path, Some("test_db_1".to_string()), Some("test_table".to_string()));
209+
do_test_store(&store_0, &store_1)
210+
}
211+
}
212+
213+
#[cfg(ldk_bench)]
214+
/// Benches
215+
pub mod bench {
216+
use criterion::Criterion;
217+
218+
/// Bench!
219+
pub fn bench_sends(bench: &mut Criterion) {
220+
let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None);
221+
let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None);
222+
lightning::ln::channelmanager::bench::bench_two_sends(
223+
bench, "bench_sqlite_persisted_sends", store_a, store_b);
224+
}
225+
}

0 commit comments

Comments
 (0)