Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions crates/fiber-lib/src/rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ You may refer to the e2e test cases in the `tests/bruno/e2e` directory for examp
* [Method `graph_channels`](#graph-graph_channels)
* [Module Info](#module-info)
* [Method `node_info`](#info-node_info)
* [Method `backup_now`](#info-backup_now)
* [Module Invoice](#module-invoice)
* [Method `new_invoice`](#invoice-new_invoice)
* [Method `parse_invoice`](#invoice-parse_invoice)
Expand Down Expand Up @@ -528,6 +529,24 @@ Get the node information.



<a id="info-backup_now"></a>
#### Method `backup_now`

Backup the node database and key files to a specified path.

##### Params

* `String` - <em>String</em>,

##### Returns

* `path` - <em>`String`</em>, The path of backup file
* `timestamp` - <em>`u64`</em>, The timestamp of backup

---



<a id="invoice"></a>
### Module `Invoice`
RPC module for invoice management.
Expand Down
220 changes: 213 additions & 7 deletions crates/fiber-lib/src/rpc/info.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
#[cfg(not(target_arch = "wasm32"))]
use std::path::{Path, PathBuf};

use super::graph::UdtCfgInfos;
use crate::ckb::CkbConfig;
use crate::fiber::serde_utils::U32Hex;
use crate::fiber::{
serde_utils::{U128Hex, U64Hex},
types::{Hash256, Pubkey},
NetworkActorCommand, NetworkActorMessage,
FiberConfig, NetworkActorCommand, NetworkActorMessage,
};
#[cfg(not(target_arch = "wasm32"))]
use crate::now_timestamp_as_millis_u64;
#[cfg(not(target_arch = "wasm32"))]
use crate::rpc::server::RpcServerStore;
#[cfg(not(target_arch = "wasm32"))]
use crate::store::store_impl::KVStore;
use crate::{handle_actor_call, log_and_error};
use ckb_jsonrpc_types::Script;
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -14,6 +23,8 @@ use jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE;
use jsonrpsee::types::ErrorObjectOwned;

use ractor::{call, ActorRef};
#[cfg(not(target_arch = "wasm32"))]
use rocksdb::checkpoint::Checkpoint;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tentacle::multiaddr::MultiAddr;
Expand Down Expand Up @@ -81,16 +92,52 @@ pub struct NodeInfoResult {
pub udt_cfg_infos: UdtCfgInfos,
}

pub struct InfoRpcServerImpl {
/// The result of a backup operation.
#[serde_as]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct BackupResult {
/// The path of backup file
path: String,
/// The timestamp of backup
#[serde_as(as = "U64Hex")]
timestamp: u64,
}

pub struct InfoRpcServerImpl<S> {
actor: ActorRef<NetworkActorMessage>,

default_funding_lock_script: Script,

#[cfg(not(target_arch = "wasm32"))]
store: S,
#[cfg(not(target_arch = "wasm32"))]
fiber_key_path: PathBuf,
#[cfg(not(target_arch = "wasm32"))]
ckb_key_path: PathBuf,

#[cfg(target_arch = "wasm32")]
_marker: std::marker::PhantomData<S>,
}

impl InfoRpcServerImpl {
#[cfg(not(target_arch = "wasm32"))]
pub trait StoreInfo: RpcServerStore + KVStore + Clone + Send + Sync + 'static {}
#[cfg(not(target_arch = "wasm32"))]
impl<T> StoreInfo for T where T: RpcServerStore + KVStore + Clone + Send + Sync + 'static {}
#[cfg(target_arch = "wasm32")]
pub trait StoreInfo: Clone + Send + Sync + 'static {}
#[cfg(target_arch = "wasm32")]
impl<T> StoreInfo for T where T: Clone + Send + Sync + 'static {}

impl<S: StoreInfo> InfoRpcServerImpl<S> {
#[allow(unused_variables)]
pub fn new(actor: ActorRef<NetworkActorMessage>, config: CkbConfig) -> Self {
pub fn new(
actor: ActorRef<NetworkActorMessage>,
store: S,
ckb_config: CkbConfig,
fiber_config: Option<FiberConfig>,
) -> Self {
#[cfg(not(test))]
let default_funding_lock_script = config
let default_funding_lock_script = ckb_config
.get_default_funding_lock_script()
.expect("get default funding lock script should be ok")
.into();
Expand All @@ -100,9 +147,21 @@ impl InfoRpcServerImpl {
#[cfg(test)]
let default_funding_lock_script = Default::default();

#[cfg(not(target_arch = "wasm32"))]
let fiber_config = fiber_config.expect("fiber config should be set");

InfoRpcServerImpl {
actor,
default_funding_lock_script,

#[cfg(not(target_arch = "wasm32"))]
store,
#[cfg(not(target_arch = "wasm32"))]
ckb_key_path: ckb_config.base_dir().join("key"),
#[cfg(not(target_arch = "wasm32"))]
fiber_key_path: fiber_config.base_dir().join("sk"),
#[cfg(target_arch = "wasm32")]
_marker: std::marker::PhantomData,
}
}
}
Expand All @@ -114,16 +173,25 @@ trait InfoRpc {
/// Get the node information.
#[method(name = "node_info")]
async fn node_info(&self) -> Result<NodeInfoResult, ErrorObjectOwned>;

/// Backup the node database and key files to a specified path.
#[method(name = "backup_now")]
async fn backup_now(&self, path: String) -> Result<BackupResult, ErrorObjectOwned>;
}

#[async_trait::async_trait]
#[cfg(not(target_arch = "wasm32"))]
impl InfoRpcServer for InfoRpcServerImpl {
impl<S: StoreInfo> InfoRpcServer for InfoRpcServerImpl<S> {
async fn node_info(&self) -> Result<NodeInfoResult, ErrorObjectOwned> {
self.node_info().await
}

async fn backup_now(&self, path: String) -> Result<BackupResult, ErrorObjectOwned> {
self.backup_now(path).await
}
}
impl InfoRpcServerImpl {

impl<S: StoreInfo> InfoRpcServerImpl<S> {
pub async fn node_info(&self) -> Result<NodeInfoResult, ErrorObjectOwned> {
let version = env!("CARGO_PKG_VERSION").to_string();
let commit_hash = crate::get_git_commit_info();
Expand Down Expand Up @@ -152,4 +220,142 @@ impl InfoRpcServerImpl {
udt_cfg_infos: response.udt_cfg_infos.into(),
})
}

#[cfg(not(target_arch = "wasm32"))]
async fn backup_now(&self, path: String) -> Result<BackupResult, ErrorObjectOwned> {
let target_dir = PathBuf::from(&path);

// Prevent overwriting existing data
if target_dir.exists() {
return log_and_error!(path, "Backup directory already exists".to_string());
}

if let Err(e) = std::fs::create_dir_all(&target_dir) {
return log_and_error!(path, format!("Failed to create backup directory: {}", e));
}
tracing::info!("Starting node backup to: {:?}", target_dir);

let db_backup_path = target_dir.join("db");
let checkpoint = match Checkpoint::new(self.store.inner_db()) {
Ok(c) => c,
Err(e) => return log_and_error!(path, format!("RocksDB checkpoint init error: {}", e)),
};

if let Err(e) = checkpoint.create_checkpoint(&db_backup_path) {
return log_and_error!(path, format!("Failed to create DB checkpoint: {}", e));
}

self.perform_key_backup(&target_dir)?;

let now = now_timestamp_as_millis_u64();

tracing::info!("Backup completed successfully at block height (synced): [Approach A]");

Ok(BackupResult {
path,
timestamp: now,
})
}

#[cfg(not(target_arch = "wasm32"))]
fn perform_key_backup(&self, target_dir: &Path) -> Result<(), ErrorObjectOwned> {
let keys_to_copy = [(&self.ckb_key_path, "key"), (&self.fiber_key_path, "sk")];

for (src_file, dest_name) in keys_to_copy {
if src_file.exists() {
let dest_file = target_dir.join(dest_name);
if let Err(e) = std::fs::copy(src_file, &dest_file) {
return log_and_error!(
target_dir,
format!("Failed to copy key file {:?}: {}", src_file, e)
);
}
tracing::info!("Successfully backed up key: {}", dest_name);
} else {
tracing::warn!("Key file not found at {:?}, skipping", src_file);
}
}
Ok(())
}
}

#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;
use crate::test_utils::{generate_store, get_fiber_config, NetworkNode, TempDir};
use std::fs;

async fn setup_test_impl() -> (InfoRpcServerImpl<crate::store::Store>, TempDir) {
let (store, tempdir) = generate_store();
let fiber_config = get_fiber_config(tempdir.as_ref(), Some("backup_test"));

let ckb_config = CkbConfig {
base_dir: Some(PathBuf::from(tempdir.as_ref())),
rpc_url: "http://127.0.0.1:8114".to_string(),
udt_whitelist: None,
tx_tracing_polling_interval_ms: 4000,
#[cfg(not(target_arch = "wasm32"))]
funding_tx_shell_builder: None,
};

let ckb_key_dir = ckb_config.base_dir.as_ref().unwrap();
let fiber_key_dir = fiber_config.base_dir().to_path_buf();

fs::create_dir_all(ckb_key_dir).unwrap();
fs::create_dir_all(&fiber_key_dir).unwrap();
fs::write(ckb_key_dir.join("key"), "mock_ckb_key").unwrap();
fs::write(fiber_key_dir.join("sk"), "mock_fiber_key").unwrap();

let node = NetworkNode::new_with_node_name_opt(Some("backup_test".to_string()));
let actor = node.await.get_actor();

let server = InfoRpcServerImpl::new(actor, store, ckb_config, Some(fiber_config));

(server, tempdir)
}

#[tokio::test]
async fn test_rpc_backup_now_success() {
let (server, root) = setup_test_impl().await;

// Construct path
let backup_path = root.as_ref().join("backup_v1");
let path_str = backup_path.to_str().unwrap().to_string();

let result = server
.backup_now(path_str)
.await
.expect("Backup should succeed");

// Assert path correction
assert!(result.path.contains("backup_v1"));

// Assert path exists
assert!(backup_path.exists());
assert!(backup_path.join("db").exists());

// Check copied file
let ckb_key = backup_path.join("key");
let fiber_key = backup_path.join("sk");

assert!(
ckb_key.exists() || fiber_key.exists(),
"At least one key should be backed up"
);
}

#[tokio::test]
async fn test_rpc_backup_now_already_exists() {
let (server, root) = setup_test_impl().await;

// Using an existing path
let path_str = root.as_ref().to_str().unwrap().to_string();

let result = server.backup_now(path_str).await;

assert!(result.is_err());
let err = result.unwrap_err();
// Should be -32000 (Invalid Params)
assert_eq!(err.code(), -32000);
}
}
14 changes: 11 additions & 3 deletions crates/fiber-lib/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub mod server {
use crate::rpc::peer::{PeerRpcServer, PeerRpcServerImpl};
#[cfg(all(feature = "pprof", not(target_arch = "wasm32")))]
use crate::rpc::prof::{ProfRpcServer, ProfRpcServerImpl};
use crate::store::store_impl::KVStore;
use crate::{
cch::CchMessage,
fiber::{
Expand Down Expand Up @@ -218,7 +219,7 @@ pub mod server {

#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
pub async fn start_rpc<S: RpcServerStore + Clone + Send + Sync + 'static>(
pub async fn start_rpc<S: RpcServerStore + KVStore + Clone + Send + Sync + 'static>(
config: RpcConfig,
ckb_config: Option<CkbConfig>,
fiber_config: Option<FiberConfig>,
Expand Down Expand Up @@ -249,8 +250,12 @@ pub mod server {
if config.is_module_enabled("invoice") {
modules
.merge(
InvoiceRpcServerImpl::new(store.clone(), network_actor.clone(), fiber_config)
.into_rpc(),
InvoiceRpcServerImpl::new(
store.clone(),
network_actor.clone(),
fiber_config.clone(),
)
.into_rpc(),
)
.unwrap();
}
Expand All @@ -261,11 +266,14 @@ pub mod server {
}
if let Some(network_actor) = network_actor {
if config.is_module_enabled("info") {
#[cfg(not(target_arch = "wasm32"))]
modules
.merge(
InfoRpcServerImpl::new(
network_actor.clone(),
store.clone(),
ckb_config.clone().expect("ckb config should be set"),
fiber_config.clone(),
)
.into_rpc(),
)
Expand Down
5 changes: 5 additions & 0 deletions crates/fiber-lib/src/store/store_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ where
.unwrap_or_else(|e| panic!("deserialization of {} failed: {}", field_name, e))
}

#[cfg(not(target_arch = "wasm32"))]
pub trait KVStore {
fn inner_db(&self) -> &std::sync::Arc<rocksdb::DB>;
}

impl Store {
/// Open or create a rocksdb
fn check_migrate<P: AsRef<Path>>(path: P, db: &Self) -> Result<(), String> {
Expand Down
10 changes: 8 additions & 2 deletions crates/fiber-lib/src/store/store_impl/native.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::check_migrate;
use super::{KeyValue, StoreChange, StoreKeyValue};

use super::{KVStore, KeyValue, StoreChange, StoreKeyValue};
use ractor::OutputPort;
pub use rocksdb::Direction as DbDirection;
pub use rocksdb::IteratorMode;
Expand Down Expand Up @@ -93,6 +92,13 @@ impl Store {
}
}

impl KVStore for Store {
/// Returns the underlying database instance for backup and maintenance.
fn inner_db(&self) -> &Arc<DB> {
&self.db
}
}

pub struct BatchWatcher {
inner: Arc<dyn StoreChangeWatcher>,
pending_changes: Vec<StoreChange>,
Expand Down
Loading
Loading