diff --git a/bench/Cargo.toml b/bench/Cargo.toml
index e582d29da81..f9208e1cc86 100644
--- a/bench/Cargo.toml
+++ b/bench/Cargo.toml
@@ -13,7 +13,7 @@ hashbrown = ["lightning/hashbrown"]
[dependencies]
lightning = { path = "../lightning", features = ["_test_utils", "criterion"] }
-lightning-persister = { path = "../lightning-persister", features = ["criterion"] }
+lightning-persister = { path = "../lightning-persister", features = ["criterion", "sqlite-bundled"] }
lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync", features = ["criterion"] }
criterion = { version = "0.4", default-features = false }
diff --git a/bench/benches/bench.rs b/bench/benches/bench.rs
index eaa3fcec50c..72098f6a066 100644
--- a/bench/benches/bench.rs
+++ b/bench/benches/bench.rs
@@ -19,6 +19,7 @@ criterion_group!(benches,
lightning::sign::benches::bench_get_secure_random_bytes,
lightning::ln::channelmanager::bench::bench_sends,
lightning_persister::fs_store::bench::bench_sends,
+ lightning_persister::sqlite_store::bench::bench_sends,
lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file,
lightning::routing::gossip::benches::read_network_graph,
lightning::routing::gossip::benches::write_network_graph);
diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml
index 43d97ebbe12..fcb60d61456 100644
--- a/lightning-persister/Cargo.toml
+++ b/lightning-persister/Cargo.toml
@@ -13,9 +13,14 @@ edition = "2018"
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
+[features]
+sqlite = ["rusqlite"]
+sqlite-bundled = ["sqlite", "rusqlite/bundled"]
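+# Example of a downstream crate enabling the SQLite backend (illustrative snippet):
+#   lightning-persister = { version = "...", features = ["sqlite"] }
+# `sqlite-bundled` additionally compiles and links a bundled libsqlite3 via rusqlite's `bundled`
+# feature, avoiding the need for a system SQLite installation.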
+
[dependencies]
bitcoin = "0.29.0"
lightning = { version = "0.0.117-alpha2", path = "../lightning" }
+rusqlite = { version = "0.28.0", optional = true, default-features = false}
[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
@@ -26,3 +31,5 @@ criterion = { version = "0.4", optional = true, default-features = false }
[dev-dependencies]
lightning = { version = "0.0.117-alpha2", path = "../lightning", features = ["_test_utils"] }
bitcoin = { version = "0.29.0", default-features = false }
+rusqlite = { version = "0.28.0", default-features = false, features = ["bundled"]}
+rand = "0.8.5"
diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs
index ae258e137d7..d7aa1dad4eb 100644
--- a/lightning-persister/src/lib.rs
+++ b/lightning-persister/src/lib.rs
@@ -12,6 +12,9 @@
pub mod fs_store;
+#[cfg(any(test, feature = "sqlite"))]
+pub mod sqlite_store;
+
mod utils;
#[cfg(test)]
diff --git a/lightning-persister/src/sqlite_store/migrations.rs b/lightning-persister/src/sqlite_store/migrations.rs
new file mode 100644
index 00000000000..56fe7420740
--- /dev/null
+++ b/lightning-persister/src/sqlite_store/migrations.rs
@@ -0,0 +1,164 @@
+use rusqlite::Connection;
+
+use lightning::io;
+
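+// Schema v1 stored rows as `(namespace, key, value)`. Migrating to v2 renames `namespace` to
+// `primary_namespace` and adds a `secondary_namespace` column that defaults to the empty string,
+// so an illustrative v1 row `("monitors", "deadbeef", <blob>)` ends up as
+// `("monitors", "", "deadbeef", <blob>)`, i.e., existing entries land in the empty secondary
+// namespace.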
+pub(super) fn migrate_schema(
+ connection: &mut Connection, kv_table_name: &str, from_version: u16, to_version: u16,
+) -> io::Result<()> {
+ assert!(from_version < to_version);
+ if from_version == 1 && to_version == 2 {
+ let tx = connection.transaction().map_err(|e| {
+ let msg = format!(
+ "Failed to migrate table {} from user_version {} to {}: {}",
+ kv_table_name, from_version, to_version, e
+ );
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+
+ // Rename 'namespace' column to 'primary_namespace'
+ let sql = format!(
+ "ALTER TABLE {}
+ RENAME COLUMN namespace TO primary_namespace;",
+ kv_table_name
+ );
+
+ tx.execute(&sql, []).map_err(|e| {
+ let msg = format!(
+ "Failed to migrate table {} from user_version {} to {}: {}",
+ kv_table_name, from_version, to_version, e
+ );
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+
+ // Add new 'secondary_namespace' column
+ let sql = format!(
+ "ALTER TABLE {}
+ ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;",
+ kv_table_name
+ );
+
+ tx.execute(&sql, []).map_err(|e| {
+ let msg = format!(
+ "Failed to migrate table {} from user_version {} to {}: {}",
+ kv_table_name, from_version, to_version, e
+ );
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+
+ // Update user_version
+ tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", to_version, |_| Ok(()))
+ .map_err(|e| {
+ let msg = format!(
+ "Failed to upgrade user_version from {} to {}: {}",
+ from_version, to_version, e
+ );
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+
+ tx.commit().map_err(|e| {
+ let msg = format!(
+ "Failed to migrate table {} from user_version {} to {}: {}",
+ kv_table_name, from_version, to_version, e
+ );
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+ }
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::sqlite_store::SqliteStore;
+ use crate::test_utils::{do_read_write_remove_list_persist, random_storage_path};
+
+ use lightning::util::persist::KVStore;
+
+ use rusqlite::{named_params, Connection};
+
+ use std::fs;
+
+ #[test]
+ fn rwrl_post_schema_1_migration() {
+ let old_schema_version = 1;
+
+ let mut temp_path = random_storage_path();
+ temp_path.push("rwrl_post_schema_1_migration");
+
+ let db_file_name = "test_db".to_string();
+ let kv_table_name = "test_table".to_string();
+
+ let test_namespace = "testspace".to_string();
+ let test_key = "testkey".to_string();
+ let test_data = [42u8; 32];
+
+ {
+ // We create a database with a table at the old schema version (`user_version` 1)
+ fs::create_dir_all(temp_path.clone()).unwrap();
+ let mut db_file_path = temp_path.clone();
+ db_file_path.push(db_file_name.clone());
+
+ let connection = Connection::open(db_file_path.clone()).unwrap();
+
+ connection
+ .pragma(
+ Some(rusqlite::DatabaseName::Main),
+ "user_version",
+ old_schema_version,
+ |_| Ok(()),
+ )
+ .unwrap();
+
+ let sql = format!(
+ "CREATE TABLE IF NOT EXISTS {} (
+ namespace TEXT NOT NULL,
+ key TEXT NOT NULL CHECK (key <> ''),
+ value BLOB, PRIMARY KEY ( namespace, key )
+ );",
+ kv_table_name
+ );
+
+ connection.execute(&sql, []).unwrap();
+
+ // We write some data to the table
+ let sql = format!(
+ "INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
+ kv_table_name
+ );
+ let mut stmt = connection.prepare_cached(&sql).unwrap();
+
+ stmt.execute(named_params! {
+ ":namespace": test_namespace,
+ ":key": test_key,
+ ":value": test_data,
+ })
+ .unwrap();
+
+ // We read the just-written data back to assert the write happened.
+ let sql = format!(
+ "SELECT value FROM {} WHERE namespace=:namespace AND key=:key;",
+ kv_table_name
+ );
+ let mut stmt = connection.prepare_cached(&sql).unwrap();
+
+ let res: Vec<u8> = stmt
+ .query_row(
+ named_params! {
+ ":namespace": test_namespace,
+ ":key": test_key,
+ },
+ |row| row.get(0),
+ )
+ .unwrap();
+
+ assert_eq!(res, test_data);
+ }
+
+ // Check we migrate the db just fine without losing our written data.
+ let store = SqliteStore::new(temp_path, Some(db_file_name), Some(kv_table_name)).unwrap();
+ let res = store.read(&test_namespace, "", &test_key).unwrap();
+ assert_eq!(res, test_data);
+
+ // Check we can continue to use the store just fine.
+ do_read_write_remove_list_persist(&store);
+ }
+}
diff --git a/lightning-persister/src/sqlite_store/mod.rs b/lightning-persister/src/sqlite_store/mod.rs
new file mode 100644
index 00000000000..3b353a9dd05
--- /dev/null
+++ b/lightning-persister/src/sqlite_store/mod.rs
@@ -0,0 +1,342 @@
+//! Objects related to [`SqliteStore`] live here.
+use crate::utils::check_namespace_key_validity;
+
+use lightning::io;
+use lightning::util::persist::KVStore;
+use lightning::util::string::PrintableString;
+
+use rusqlite::{named_params, Connection};
+
+use std::fs;
+use std::path::PathBuf;
+use std::sync::{Arc, Mutex};
+
+mod migrations;
+
+/// The default database file name.
+pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";
+
+/// The default table in which we store all data.
+pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";
+
+// The current SQLite `user_version`. We bump this whenever the table schema changes and migrate
+// older databases when opening them.
+const SCHEMA_USER_VERSION: u16 = 2;
+
+/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
+///
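+/// # Example
+///
+/// A minimal usage sketch; the path, namespaces, and key below are illustrative:
+///
+/// ```no_run
+/// use lightning::util::persist::KVStore;
+/// use lightning_persister::sqlite_store::SqliteStore;
+/// use std::path::PathBuf;
+///
+/// // Open (or create) the default database file and table in the given directory.
+/// let store = SqliteStore::new(PathBuf::from("/tmp/ldk_store"), None, None).unwrap();
+///
+/// // Write a value and read it back through the `KVStore` interface.
+/// store.write("primary", "secondary", "key", &[1u8, 2, 3]).unwrap();
+/// assert_eq!(store.read("primary", "secondary", "key").unwrap(), vec![1u8, 2, 3]);
+/// ```
+///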
+/// [SQLite]: https://sqlite.org
+pub struct SqliteStore {
+ connection: Arc<Mutex<Connection>>,
+ data_dir: PathBuf,
+ kv_table_name: String,
+}
+
+impl SqliteStore {
+ /// Constructs a new [`SqliteStore`].
+ ///
+ /// If it does not already exist, a new SQLite database will be created in the given `data_dir` under
+ /// the given `db_file_name` (or [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`).
+ ///
+ /// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`].
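+ ///
+ /// # Example
+ ///
+ /// A brief sketch constructing a store with custom (illustrative) database and table names:
+ ///
+ /// ```no_run
+ /// use lightning_persister::sqlite_store::SqliteStore;
+ /// use std::path::PathBuf;
+ ///
+ /// let store = SqliteStore::new(
+ /// 	PathBuf::from("/tmp/ldk_custom"),
+ /// 	Some("my_db.sqlite".to_string()),
+ /// 	Some("my_table".to_string()),
+ /// ).unwrap();
+ /// ```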
+ pub fn new(
+ data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>,
+ ) -> io::Result<Self> {
+ let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string());
+ let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string());
+
+ fs::create_dir_all(data_dir.clone()).map_err(|e| {
+ let msg = format!(
+ "Failed to create database destination directory {}: {}",
+ data_dir.display(),
+ e
+ );
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+ let mut db_file_path = data_dir.clone();
+ db_file_path.push(db_file_name);
+
+ let mut connection = Connection::open(db_file_path.clone()).map_err(|e| {
+ let msg =
+ format!("Failed to open/create database file {}: {}", db_file_path.display(), e);
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+
+ let sql = format!("SELECT user_version FROM pragma_user_version");
+ let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap();
+
+ if version_res == 0 {
+ // New database, set our SCHEMA_USER_VERSION and continue
+ connection
+ .pragma(
+ Some(rusqlite::DatabaseName::Main),
+ "user_version",
+ SCHEMA_USER_VERSION,
+ |_| Ok(()),
+ )
+ .map_err(|e| {
+ let msg = format!("Failed to set PRAGMA user_version: {}", e);
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+ } else if version_res < SCHEMA_USER_VERSION {
+ migrations::migrate_schema(
+ &mut connection,
+ &kv_table_name,
+ version_res,
+ SCHEMA_USER_VERSION,
+ )?;
+ } else if version_res > SCHEMA_USER_VERSION {
+ let msg = format!(
+ "Failed to open database: incompatible schema version {}. Expected: {}",
+ version_res, SCHEMA_USER_VERSION
+ );
+ return Err(io::Error::new(io::ErrorKind::Other, msg));
+ }
+
+ let sql = format!(
+ "CREATE TABLE IF NOT EXISTS {} (
+ primary_namespace TEXT NOT NULL,
+ secondary_namespace TEXT DEFAULT \"\" NOT NULL,
+ key TEXT NOT NULL CHECK (key <> ''),
+ value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
+ );",
+ kv_table_name
+ );
+
+ connection.execute(&sql, []).map_err(|e| {
+ let msg = format!("Failed to create table {}: {}", kv_table_name, e);
+ io::Error::new(io::ErrorKind::Other, msg)
+ })?;
+
+ let connection = Arc::new(Mutex::new(connection));
+ Ok(Self { connection, data_dir, kv_table_name })
+ }
+
+ /// Returns the data directory.
+ pub fn get_data_dir(&self) -> PathBuf {
+ self.data_dir.clone()
+ }
+}
+
+impl KVStore for SqliteStore {
+ fn read(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+ ) -> std::io::Result<Vec<u8>> {
+ check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
+
+ let locked_conn = self.connection.lock().unwrap();
+ let sql =
+ format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;",
+ self.kv_table_name);
+
+ let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
+ let msg = format!("Failed to prepare statement: {}", e);
+ std::io::Error::new(std::io::ErrorKind::Other, msg)
+ })?;
+
+ let res = stmt
+ .query_row(
+ named_params! {
+ ":primary_namespace": primary_namespace,
+ ":secondary_namespace": secondary_namespace,
+ ":key": key,
+ },
+ |row| row.get(0),
+ )
+ .map_err(|e| match e {
+ rusqlite::Error::QueryReturnedNoRows => {
+ let msg = format!(
+ "Failed to read as key could not be found: {}/{}/{}",
+ PrintableString(primary_namespace),
+ PrintableString(secondary_namespace),
+ PrintableString(key)
+ );
+ std::io::Error::new(std::io::ErrorKind::NotFound, msg)
+ }
+ e => {
+ let msg = format!(
+ "Failed to read from key {}/{}/{}: {}",
+ PrintableString(primary_namespace),
+ PrintableString(secondary_namespace),
+ PrintableString(key),
+ e
+ );
+ std::io::Error::new(std::io::ErrorKind::Other, msg)
+ }
+ })?;
+ Ok(res)
+ }
+
+ fn write(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
+ ) -> std::io::Result<()> {
+ check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
+
+ let locked_conn = self.connection.lock().unwrap();
+
+ let sql = format!(
+ "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);",
+ self.kv_table_name
+ );
+
+ let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
+ let msg = format!("Failed to prepare statement: {}", e);
+ std::io::Error::new(std::io::ErrorKind::Other, msg)
+ })?;
+
+ stmt.execute(named_params! {
+ ":primary_namespace": primary_namespace,
+ ":secondary_namespace": secondary_namespace,
+ ":key": key,
+ ":value": buf,
+ })
+ .map(|_| ())
+ .map_err(|e| {
+ let msg = format!(
+ "Failed to write to key {}/{}/{}: {}",
+ PrintableString(primary_namespace),
+ PrintableString(secondary_namespace),
+ PrintableString(key),
+ e
+ );
+ std::io::Error::new(std::io::ErrorKind::Other, msg)
+ })
+ }
+
+ fn remove(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
+ ) -> std::io::Result<()> {
+ check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
+
+ let locked_conn = self.connection.lock().unwrap();
+
+ let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name);
+
+ let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
+ let msg = format!("Failed to prepare statement: {}", e);
+ std::io::Error::new(std::io::ErrorKind::Other, msg)
+ })?;
+
+ stmt.execute(named_params! {
+ ":primary_namespace": primary_namespace,
+ ":secondary_namespace": secondary_namespace,
+ ":key": key,
+ })
+ .map_err(|e| {
+ let msg = format!(
+ "Failed to delete key {}/{}/{}: {}",
+ PrintableString(primary_namespace),
+ PrintableString(secondary_namespace),
+ PrintableString(key),
+ e
+ );
+ std::io::Error::new(std::io::ErrorKind::Other, msg)
+ })?;
+ Ok(())
+ }
+
+ fn list(
+ &self, primary_namespace: &str, secondary_namespace: &str,
+ ) -> std::io::Result<Vec<String>> {
+ check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
+
+ let locked_conn = self.connection.lock().unwrap();
+
+ let sql = format!(
+ "SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace",
+ self.kv_table_name
+ );
+ let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
+ let msg = format!("Failed to prepare statement: {}", e);
+ std::io::Error::new(std::io::ErrorKind::Other, msg)
+ })?;
+
+ let mut keys = Vec::new();
+
+ let rows_iter = stmt
+ .query_map(
+ named_params! {
+ ":primary_namespace": primary_namespace,
+ ":secondary_namespace": secondary_namespace,
+ },
+ |row| row.get(0),
+ )
+ .map_err(|e| {
+ let msg = format!("Failed to retrieve queried rows: {}", e);
+ std::io::Error::new(std::io::ErrorKind::Other, msg)
+ })?;
+
+ for k in rows_iter {
+ keys.push(k.map_err(|e| {
+ let msg = format!("Failed to retrieve queried rows: {}", e);
+ std::io::Error::new(std::io::ErrorKind::Other, msg)
+ })?);
+ }
+
+ Ok(keys)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::test_utils::{do_read_write_remove_list_persist, do_test_store, random_storage_path};
+
+ impl Drop for SqliteStore {
+ fn drop(&mut self) {
+ match fs::remove_dir_all(&self.data_dir) {
+ Err(e) => println!("Failed to remove test store directory: {}", e),
+ _ => {}
+ }
+ }
+ }
+
+ #[test]
+ fn read_write_remove_list_persist() {
+ let mut temp_path = random_storage_path();
+ temp_path.push("read_write_remove_list_persist");
+ let store = SqliteStore::new(
+ temp_path,
+ Some("test_db".to_string()),
+ Some("test_table".to_string()),
+ )
+ .unwrap();
+ do_read_write_remove_list_persist(&store);
+ }
+
+ #[test]
+ fn test_sqlite_store() {
+ let mut temp_path = random_storage_path();
+ temp_path.push("test_sqlite_store");
+ let store_0 = SqliteStore::new(
+ temp_path.clone(),
+ Some("test_db_0".to_string()),
+ Some("test_table".to_string()),
+ )
+ .unwrap();
+ let store_1 = SqliteStore::new(
+ temp_path,
+ Some("test_db_1".to_string()),
+ Some("test_table".to_string()),
+ )
+ .unwrap();
+ do_test_store(&store_0, &store_1)
+ }
+}
+
+#[cfg(ldk_bench)]
+/// Benches
+pub mod bench {
+ use criterion::Criterion;
+
+ /// Bench!
+ pub fn bench_sends(bench: &mut Criterion) {
+ let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None).unwrap();
+ let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None).unwrap();
+ lightning::ln::channelmanager::bench::bench_two_sends(
+ bench,
+ "bench_sqlite_persisted_sends",
+ store_a,
+ store_b,
+ );
+ }
+}
diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs
index 360fa3492bf..89cd7e7f189 100644
--- a/lightning-persister/src/test_utils.rs
+++ b/lightning-persister/src/test_utils.rs
@@ -1,13 +1,30 @@
+use crate::fs_store::FilesystemStore;
+use crate::sqlite_store::SqliteStore;
+
use lightning::util::persist::{KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN, read_channel_monitors};
use lightning::ln::functional_test_utils::{connect_block, create_announced_chan_between_nodes,
create_chanmon_cfgs, create_dummy_block, create_network, create_node_cfgs, create_node_chanmgrs,
send_payment};
+
use lightning::chain::channelmonitor::CLOSED_CHANNEL_UPDATE_ID;
-use lightning::util::test_utils;
+use lightning::util::test_utils::{self, TestStore};
use lightning::{check_closed_broadcast, check_closed_event, check_added_monitors};
use lightning::events::ClosureReason;
+use rand::distributions::Alphanumeric;
+use rand::{thread_rng, Rng};
+
use std::panic::RefUnwindSafe;
+use std::path::PathBuf;
+use std::sync::RwLock;
+
+pub fn random_storage_path() -> PathBuf {
+ let mut temp_path = std::env::temp_dir();
+ let mut rng = thread_rng();
+ let rand_dir: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
+ temp_path.push(rand_dir);
+ temp_path
+}
pub(crate) fn do_read_write_remove_list_persist<K: KVStore + RefUnwindSafe>(kv_store: &K) {
let data = [42u8; 32];
@@ -120,3 +137,148 @@ pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
// Make sure everything is persisted as expected after close.
check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID);
}
+
+// A `KVStore` impl for testing purposes that wraps all our `KVStore`s and asserts their synchronicity.
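+//
+// Hypothetical usage sketch: a test constructs it over a fresh temporary directory and runs the
+// shared helpers against it, e.g.
+//   let store = TestSyncStore::new(random_storage_path());
+//   do_read_write_remove_list_persist(&store);
+// so that every operation is executed on, and cross-checked across, all three backends at once.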
+pub(crate) struct TestSyncStore {
+ serializer: RwLock<()>,
+ test_store: TestStore,
+ fs_store: FilesystemStore,
+ sqlite_store: SqliteStore,
+}
+
+impl TestSyncStore {
+ pub(crate) fn new(dest_dir: PathBuf) -> Self {
+ let serializer = RwLock::new(());
+ let mut fs_dir = dest_dir.clone();
+ fs_dir.push("fs_store");
+ let fs_store = FilesystemStore::new(fs_dir);
+ let mut sql_dir = dest_dir.clone();
+ sql_dir.push("sqlite_store");
+ let sqlite_store = SqliteStore::new(
+ sql_dir,
+ Some("test_sync_db".to_string()),
+ Some("test_sync_table".to_string()),
+ )
+ .unwrap();
+ let test_store = TestStore::new(false);
+ Self { serializer, fs_store, sqlite_store, test_store }
+ }
+
+ fn do_list(
+ &self, primary_namespace: &str, secondary_namespace: &str,
+ ) -> std::io::Result<Vec<String>> {
+ let fs_res = self.fs_store.list(primary_namespace, secondary_namespace);
+ let sqlite_res = self.sqlite_store.list(primary_namespace, secondary_namespace);
+ let test_res = self.test_store.list(primary_namespace, secondary_namespace);
+
+ match fs_res {
+ Ok(mut list) => {
+ list.sort();
+
+ let mut sqlite_list = sqlite_res.unwrap();
+ sqlite_list.sort();
+ assert_eq!(list, sqlite_list);
+
+ let mut test_list = test_res.unwrap();
+ test_list.sort();
+ assert_eq!(list, test_list);
+
+ Ok(list)
+ }
+ Err(e) => {
+ assert!(sqlite_res.is_err());
+ assert!(test_res.is_err());
+ Err(e)
+ }
+ }
+ }
+}
+
+impl KVStore for TestSyncStore {
+ fn read(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+ ) -> std::io::Result<Vec<u8>> {
+ let _guard = self.serializer.read().unwrap();
+
+ let fs_res = self.fs_store.read(primary_namespace, secondary_namespace, key);
+ let sqlite_res = self.sqlite_store.read(primary_namespace, secondary_namespace, key);
+ let test_res = self.test_store.read(primary_namespace, secondary_namespace, key);
+
+ match fs_res {
+ Ok(read) => {
+ assert_eq!(read, sqlite_res.unwrap());
+ assert_eq!(read, test_res.unwrap());
+ Ok(read)
+ }
+ Err(e) => {
+ assert!(sqlite_res.is_err());
+ assert_eq!(e.kind(), unsafe { sqlite_res.unwrap_err_unchecked().kind() });
+ assert!(test_res.is_err());
+ assert_eq!(e.kind(), unsafe { test_res.unwrap_err_unchecked().kind() });
+ Err(e)
+ }
+ }
+ }
+
+ fn write(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
+ ) -> std::io::Result<()> {
+ let _guard = self.serializer.write().unwrap();
+ let fs_res = self.fs_store.write(primary_namespace, secondary_namespace, key, buf);
+ let sqlite_res = self.sqlite_store.write(primary_namespace, secondary_namespace, key, buf);
+ let test_res = self.test_store.write(primary_namespace, secondary_namespace, key, buf);
+
+ assert!(self
+ .do_list(primary_namespace, secondary_namespace)
+ .unwrap()
+ .contains(&key.to_string()));
+
+ match fs_res {
+ Ok(()) => {
+ assert!(sqlite_res.is_ok());
+ assert!(test_res.is_ok());
+ Ok(())
+ }
+ Err(e) => {
+ assert!(sqlite_res.is_err());
+ assert!(test_res.is_err());
+ Err(e)
+ }
+ }
+ }
+
+ fn remove(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+ ) -> std::io::Result<()> {
+ let _guard = self.serializer.write().unwrap();
+ let fs_res = self.fs_store.remove(primary_namespace, secondary_namespace, key, lazy);
+ let sqlite_res =
+ self.sqlite_store.remove(primary_namespace, secondary_namespace, key, lazy);
+ let test_res = self.test_store.remove(primary_namespace, secondary_namespace, key, lazy);
+
+ assert!(!self
+ .do_list(primary_namespace, secondary_namespace)
+ .unwrap()
+ .contains(&key.to_string()));
+
+ match fs_res {
+ Ok(()) => {
+ assert!(sqlite_res.is_ok());
+ assert!(test_res.is_ok());
+ Ok(())
+ }
+ Err(e) => {
+ assert!(sqlite_res.is_err());
+ assert!(test_res.is_err());
+ Err(e)
+ }
+ }
+ }
+
+ fn list(
+ &self, primary_namespace: &str, secondary_namespace: &str,
+ ) -> std::io::Result<Vec<String>> {
+ let _guard = self.serializer.read().unwrap();
+ self.do_list(primary_namespace, secondary_namespace)
+ }
+}