diff --git a/Cargo.lock b/Cargo.lock index 3dd5bc52b71c4..81a5f1df01038 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -755,7 +755,7 @@ version = "4.0.0-dev" dependencies = [ "array-bytes", "env_logger 0.9.3", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "log", "sp-core", "sp-runtime", @@ -3114,6 +3114,11 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e7d7786361d7425ae2fe4f9e407eb0efaa0840f5212d109cc018c40c35c6ab4" +[[package]] +name = "hash-db" +version = "0.16.0" +source = "git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value#77f19e2ca5fb45bec3e1dbcca82a9023d75c312d" + [[package]] name = "hash256-std-hasher" version = "0.15.2" @@ -3808,7 +3813,7 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19ea4653859ca2266a86419d3f592d3f22e7a854b482f99180d2498507902048" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "hash256-std-hasher", "tiny-keccak", ] @@ -4708,7 +4713,15 @@ version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808b50db46293432a45e63bc15ea51e0ab4c0a1647b8eb114e31a3e698dd6fbe" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "memory-db" +version = "0.32.0" +source = "git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value#77f19e2ca5fb45bec3e1dbcca82a9023d75c312d" +dependencies = [ + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -5107,7 +5120,7 @@ dependencies = [ "derive_more", "fs_extra", "futures", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "kitchensink-runtime", "kvdb", "kvdb-rocksdb", @@ -8757,7 +8770,7 @@ version = "0.10.0-dev" 
dependencies = [ "array-bytes", "criterion", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "kitchensink-runtime", "kvdb", "kvdb-memorydb", @@ -10404,7 +10417,7 @@ dependencies = [ name = "sp-api" version = "4.0.0-dev" dependencies = [ - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "log", "parity-scale-codec", "scale-info", @@ -10765,7 +10778,7 @@ dependencies = [ "dyn-clonable", "ed25519-zebra", "futures", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "hash256-std-hasher", "impl-serde", "lazy_static", @@ -11168,7 +11181,7 @@ version = "0.28.0" dependencies = [ "array-bytes", "assert_matches", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "log", "parity-scale-codec", "parking_lot 0.12.1", @@ -11183,7 +11196,7 @@ dependencies = [ "sp-trie", "thiserror", "tracing", - "trie-db", + "trie-db 0.27.1 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -11290,10 +11303,10 @@ dependencies = [ "ahash 0.8.3", "array-bytes", "criterion", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "hashbrown 0.13.2", "lazy_static", - "memory-db", + "memory-db 0.32.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "nohash-hasher", "parity-scale-codec", "parking_lot 0.12.1", @@ -11305,8 +11318,8 @@ dependencies = [ "thiserror", "tracing", "trie-bench", - "trie-db", - "trie-root", + "trie-db 0.27.1 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", + "trie-root 0.18.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "trie-standardmap", ] @@ -11645,7 +11658,7 @@ dependencies = [ "sp-runtime", "sp-state-machine", "sp-trie", - "trie-db", + 
"trie-db 0.27.1 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -11720,7 +11733,7 @@ dependencies = [ "sp-version", "substrate-test-runtime-client", "substrate-wasm-builder", - "trie-db", + "trie-db 0.27.1 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -12340,12 +12353,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f54b4f9d51d368e62cf7e0730c7c1e18fc658cc84333656bab5b328f44aa964" dependencies = [ "criterion", - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hasher", - "memory-db", + "memory-db 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec", - "trie-db", - "trie-root", + "trie-db 0.27.1 (registry+https://github.com/rust-lang/crates.io-index)", + "trie-root 0.18.0 (registry+https://github.com/rust-lang/crates.io-index)", "trie-standardmap", ] @@ -12355,7 +12368,19 @@ version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "767abe6ffed88a1889671a102c2861ae742726f52e0a5a425b92c9fbfa7e9c85" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hashbrown 0.13.2", + "log", + "rustc-hex", + "smallvec", +] + +[[package]] +name = "trie-db" +version = "0.27.1" +source = "git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value#77f19e2ca5fb45bec3e1dbcca82a9023d75c312d" +dependencies = [ + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "hashbrown 0.13.2", "log", "rustc-hex", @@ -12368,7 +12393,15 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4ed310ef5ab98f5fa467900ed906cb9232dd5376597e00fd4cba2a449d06c0b" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "trie-root" 
+version = "0.18.0" +source = "git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value#77f19e2ca5fb45bec3e1dbcca82a9023d75c312d" +dependencies = [ + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -12377,7 +12410,7 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "684aafb332fae6f83d7fe10b3fbfdbe39a1b3234c4e2a618f030815838519516" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hasher", ] diff --git a/bin/node/bench/Cargo.toml b/bin/node/bench/Cargo.toml index 7703f8ed2e4e0..2ce17ae94f90a 100644 --- a/bin/node/bench/Cargo.toml +++ b/bin/node/bench/Cargo.toml @@ -33,7 +33,7 @@ sc-basic-authorship = { version = "0.10.0-dev", path = "../../../client/basic-au sp-inherents = { version = "4.0.0-dev", path = "../../../primitives/inherents" } sp-timestamp = { version = "4.0.0-dev", default-features = false, path = "../../../primitives/timestamp" } sp-tracing = { version = "10.0.0", path = "../../../primitives/tracing" } -hash-db = "0.16.0" +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value" } tempfile = "3.1.0" fs_extra = "1" rand = { version = "0.8.5", features = ["small_rng"] } diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index 465e1988478d7..2d02a861c065e 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -476,6 +476,21 @@ pub trait StorageProvider> { child_info: &ChildInfo, key: &StorageKey, ) -> sp_blockchain::Result>; + + /// Given a block's `Hash` and a key, return the closest merkle value. + fn closest_merkle_value( + &self, + hash: Block::Hash, + key: &StorageKey, + ) -> sp_blockchain::Result>; + + /// Given a block's `Hash`, a key and a child storage key, return the closest merkle value. 
+ fn child_closest_merkle_value( + &self, + hash: Block::Hash, + child_info: &ChildInfo, + key: &StorageKey, + ) -> sp_blockchain::Result>; } /// Client backend. diff --git a/client/db/Cargo.toml b/client/db/Cargo.toml index 1845158dac112..55ef7278abf4d 100644 --- a/client/db/Cargo.toml +++ b/client/db/Cargo.toml @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] codec = { package = "parity-scale-codec", version = "3.6.1", features = [ "derive", ] } -hash-db = "0.16.0" +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value" } kvdb = "0.13.0" kvdb-memorydb = "0.13.0" kvdb-rocksdb = { version = "0.19.0", optional = true } diff --git a/client/db/src/bench.rs b/client/db/src/bench.rs index 9307a63ad444e..d8ba45c3e6cc4 100644 --- a/client/db/src/bench.rs +++ b/client/db/src/bench.rs @@ -383,6 +383,24 @@ impl StateBackend> for BenchmarkingState { .child_storage_hash(child_info, key) } + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error> { + self.add_read_key(None, key); + self.state.borrow().as_ref().ok_or_else(state_err)?.closest_merkle_value(key) + } + + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error> { + self.add_read_key(None, key); + self.state + .borrow() + .as_ref() + .ok_or_else(state_err)? 
+ .child_closest_merkle_value(child_info, key) + } + fn exists_storage(&self, key: &[u8]) -> Result { self.add_read_key(None, key); self.state.borrow().as_ref().ok_or_else(state_err)?.exists_storage(key) diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index aba5b0829b5bb..fb3cc172a886f 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -215,6 +215,18 @@ impl StateBackend> for RefTrackingState { self.state.child_storage_hash(child_info, key) } + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error> { + self.state.closest_merkle_value(key) + } + + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error> { + self.state.child_closest_merkle_value(child_info, key) + } + fn exists_storage(&self, key: &[u8]) -> Result { self.state.exists_storage(key) } diff --git a/client/db/src/record_stats_state.rs b/client/db/src/record_stats_state.rs index 005315ce9f458..b7244db5fd8ac 100644 --- a/client/db/src/record_stats_state.rs +++ b/client/db/src/record_stats_state.rs @@ -145,6 +145,18 @@ impl>, B: BlockT> StateBackend> self.state.child_storage_hash(child_info, key) } + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error> { + self.state.closest_merkle_value(key) + } + + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error> { + self.state.child_closest_merkle_value(child_info, key) + } + fn exists_storage(&self, key: &[u8]) -> Result { self.state.exists_storage(key) } diff --git a/client/rpc-spec-v2/src/chain_head/api.rs b/client/rpc-spec-v2/src/chain_head/api.rs index c002b75efe037..682cd690dd10c 100644 --- a/client/rpc-spec-v2/src/chain_head/api.rs +++ b/client/rpc-spec-v2/src/chain_head/api.rs @@ -119,4 +119,31 @@ pub trait ChainHeadApi { /// This method is unstable and subject to change in the future. 
#[method(name = "chainHead_unstable_unpin", blocking)] fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>; + + /// Resumes a storage fetch started with `chainHead_storage` after it has generated an + /// `operationWaitingForContinue` event. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "chainHead_unstable_continue", blocking)] + fn chain_head_unstable_continue( + &self, + follow_subscription: String, + operation_id: String, + ) -> RpcResult<()>; + + /// Stops an operation started with chainHead_unstable_body, chainHead_unstable_call, or + /// chainHead_unstable_storage. If the operation was still in progress, this interrupts it. If + /// the operation was already finished, this call has no effect. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "chainHead_unstable_stopOperation", blocking)] + fn chain_head_unstable_stop_operation( + &self, + follow_subscription: String, + operation_id: String, + ) -> RpcResult<()>; } diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 79cf251f18068..7be934531332d 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -27,7 +27,7 @@ use crate::{ api::ChainHeadApiServer, chain_head_follow::ChainHeadFollower, error::Error as ChainHeadRpcError, - event::{FollowEvent, MethodResponse, OperationError, StorageQuery, StorageQueryType}, + event::{FollowEvent, MethodResponse, OperationError, StorageQuery}, hex_string, subscription::{SubscriptionManagement, SubscriptionManagementError}, }, @@ -61,6 +61,9 @@ pub struct ChainHeadConfig { pub subscription_max_pinned_duration: Duration, /// The maximum number of ongoing operations per subscription. 
pub subscription_max_ongoing_operations: usize, + /// The maximum number of items reported by the `chainHead_storage` before + /// pagination is required. + pub operation_max_storage_items: usize, } /// Maximum pinned blocks across all connections. @@ -78,12 +81,17 @@ const MAX_PINNED_DURATION: Duration = Duration::from_secs(60); /// Note: The lower limit imposed by the spec is 16. const MAX_ONGOING_OPERATIONS: usize = 16; +/// The maximum number of items the `chainHead_storage` can return +/// before pagination is required. +const MAX_STORAGE_ITER_ITEMS: usize = 5; + impl Default for ChainHeadConfig { fn default() -> Self { ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, + operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, } } } @@ -100,6 +108,9 @@ pub struct ChainHead, Block: BlockT, Client> { subscriptions: Arc>, /// The hexadecimal encoded hash of the genesis block. genesis_hash: String, + /// The maximum number of items reported by the `chainHead_storage` before + /// pagination is required. + operation_max_storage_items: usize, /// Phantom member to pin the block type. 
_phantom: PhantomData, } @@ -124,6 +135,7 @@ impl, Block: BlockT, Client> ChainHead { config.subscription_max_ongoing_operations, backend, )), + operation_max_storage_items: config.operation_max_storage_items, genesis_hash, _phantom: PhantomData, } @@ -232,7 +244,7 @@ where follow_subscription: String, hash: Block::Hash, ) -> RpcResult { - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { + let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached), @@ -243,6 +255,8 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; + let operation_id = block_guard.operation().operation_id(); + let event = match self.client.block(hash) { Ok(Some(signed_block)) => { let extrinsics = signed_block @@ -252,7 +266,7 @@ where .map(|extrinsic| hex_string(&extrinsic.encode())) .collect(); FollowEvent::::OperationBodyDone(OperationBodyDone { - operation_id: block_guard.operation_id(), + operation_id: operation_id.clone(), value: extrinsics, }) }, @@ -268,16 +282,13 @@ where return Err(ChainHeadRpcError::InvalidBlock.into()) }, Err(error) => FollowEvent::::OperationError(OperationError { - operation_id: block_guard.operation_id(), + operation_id: operation_id.clone(), error: error.to_string(), }), }; let _ = block_guard.response_sender().unbounded_send(event); - Ok(MethodResponse::Started(MethodResponseStarted { - operation_id: block_guard.operation_id(), - discarded_items: None, - })) + Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None })) } fn chain_head_unstable_header( @@ -318,26 +329,19 @@ where let items = items .into_iter() .map(|query| { - if query.query_type == StorageQueryType::ClosestDescendantMerkleValue { - // Note: remove this once all types are implemented. 
- return Err(ChainHeadRpcError::InvalidParam( - "Storage query type not supported".into(), - )) - } - Ok(StorageQuery { key: StorageKey(parse_hex_param(query.key)?), query_type: query.query_type, }) }) - .collect::, _>>()?; + .collect::, ChainHeadRpcError>>()?; let child_trie = child_trie .map(|child_trie| parse_hex_param(child_trie)) .transpose()? .map(ChildInfo::new_default_from_vec); - let block_guard = + let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | @@ -349,17 +353,21 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - let storage_client = ChainHeadStorage::::new(self.client.clone()); - let operation_id = block_guard.operation_id(); + let mut storage_client = ChainHeadStorage::::new( + self.client.clone(), + self.operation_max_storage_items, + ); + let operation = block_guard.operation(); + let operation_id = operation.operation_id(); // The number of operations we are allowed to execute. 
- let num_operations = block_guard.num_reserved(); + let num_operations = operation.num_reserved(); let discarded = items.len().saturating_sub(num_operations); let mut items = items; items.truncate(num_operations); let fut = async move { - storage_client.generate_events(block_guard, hash, items, child_trie); + storage_client.generate_events(block_guard, hash, items, child_trie).await; }; self.executor @@ -379,7 +387,7 @@ where ) -> RpcResult { let call_parameters = Bytes::from(parse_hex_param(call_parameters)?); - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { + let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | Err(SubscriptionManagementError::ExceededLimits) => { @@ -401,28 +409,26 @@ where .into()) } + let operation_id = block_guard.operation().operation_id(); let event = self .client .executor() .call(hash, &function, &call_parameters, CallContext::Offchain) .map(|result| { FollowEvent::::OperationCallDone(OperationCallDone { - operation_id: block_guard.operation_id(), + operation_id: operation_id.clone(), output: hex_string(&result), }) }) .unwrap_or_else(|error| { FollowEvent::::OperationError(OperationError { - operation_id: block_guard.operation_id(), + operation_id: operation_id.clone(), error: error.to_string(), }) }); let _ = block_guard.response_sender().unbounded_send(event); - Ok(MethodResponse::Started(MethodResponseStarted { - operation_id: block_guard.operation_id(), - discarded_items: None, - })) + Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None })) } fn chain_head_unstable_unpin( @@ -443,4 +449,35 @@ where Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()), } } + + fn chain_head_unstable_continue( + &self, + follow_subscription: String, + operation_id: String, + ) -> RpcResult<()> { + let Some(operation) = 
self.subscriptions.get_operation(&follow_subscription, &operation_id) else { + return Ok(()) + }; + + if !operation.submit_continue() { + // Continue called without generating a `WaitingForContinue` event. + Err(ChainHeadRpcError::InvalidContinue.into()) + } else { + Ok(()) + } + } + + fn chain_head_unstable_stop_operation( + &self, + follow_subscription: String, + operation_id: String, + ) -> RpcResult<()> { + let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else { + return Ok(()) + }; + + operation.stop_operation(); + + Ok(()) + } } diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 393e4489c8c07..224a50b487787 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -18,7 +18,7 @@ //! Implementation of the `chainHead_storage` method. -use std::{marker::PhantomData, sync::Arc}; +use std::{collections::VecDeque, marker::PhantomData, sync::Arc}; use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider}; use sc_utils::mpsc::TracingUnboundedSender; @@ -37,10 +37,6 @@ use super::{ FollowEvent, }; -/// The maximum number of items the `chainHead_storage` can return -/// before paginations is required. -const MAX_ITER_ITEMS: usize = 10; - /// The query type of an interation. enum IterQueryType { /// Iterating over (key, value) pairs. @@ -53,16 +49,34 @@ enum IterQueryType { pub struct ChainHeadStorage { /// Substrate client. client: Arc, - _phantom: PhantomData<(Block, BE)>, + /// Queue of operations that may require pagination. + iter_operations: VecDeque, + /// The maximum number of items reported by the `chainHead_storage` before + /// pagination is required. + operation_max_storage_items: usize, + _phandom: PhantomData<(BE, Block)>, } impl ChainHeadStorage { /// Constructs a new [`ChainHeadStorage`]. 
- pub fn new(client: Arc) -> Self { - Self { client, _phantom: PhantomData } + pub fn new(client: Arc, operation_max_storage_items: usize) -> Self { + Self { + client, + iter_operations: VecDeque::new(), + operation_max_storage_items, + _phandom: PhantomData, + } } } +/// Query to iterate over storage. +struct QueryIter { + /// The next key from which the iteration should continue. + next_key: StorageKey, + /// The type of the query (either value or hash). + ty: IterQueryType, +} + /// Checks if the provided key (main or child key) is valid /// for queries. /// @@ -77,7 +91,7 @@ fn is_key_queryable(key: &[u8]) -> bool { type QueryResult = Result, String>; /// The result of iterating over keys. -type QueryIterResult = Result, String>; +type QueryIterResult = Result<(Vec, Option), String>; impl ChainHeadStorage where @@ -131,64 +145,143 @@ where .unwrap_or_else(|error| QueryResult::Err(error.to_string())) } - /// Handle iterating over (key, value) or (key, hash) pairs. - fn query_storage_iter( + /// Fetch the closest merkle value. + fn query_storage_merkle_value( &self, hash: Block::Hash, key: &StorageKey, child_key: Option<&ChildInfo>, - ty: IterQueryType, + ) -> QueryResult { + let result = if let Some(child_key) = child_key { + self.client.child_closest_merkle_value(hash, child_key, key) + } else { + self.client.closest_merkle_value(hash, key) + }; + + result + .map(|opt| { + QueryResult::Ok(opt.map(|storage_data| StorageResult { + key: hex_string(&key.0), + result: StorageResultType::ClosestDescendantMerkleValue(hex_string( + &storage_data.as_ref(), + )), + })) + }) + .unwrap_or_else(|error| QueryResult::Err(error.to_string())) + } + + /// Iterate over at most `operation_max_storage_items` keys. + /// + /// Returns the storage result with a potential next key to resume iteration. 
+ fn query_storage_iter_pagination( + &self, + query: QueryIter, + hash: Block::Hash, + child_key: Option<&ChildInfo>, ) -> QueryIterResult { - let keys_iter = if let Some(child_key) = child_key { - self.client.child_storage_keys(hash, child_key.to_owned(), Some(key), None) + let QueryIter { next_key, ty } = query; + + let mut keys_iter = if let Some(child_key) = child_key { + self.client + .child_storage_keys(hash, child_key.to_owned(), Some(&next_key), None) } else { - self.client.storage_keys(hash, Some(key), None) + self.client.storage_keys(hash, Some(&next_key), None) } - .map_err(|error| error.to_string())?; + .map_err(|err| err.to_string())?; + + let mut ret = Vec::with_capacity(self.operation_max_storage_items); + for _ in 0..self.operation_max_storage_items { + let Some(key) = keys_iter.next() else { + break + }; - let mut ret = Vec::with_capacity(MAX_ITER_ITEMS); - let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS); - while let Some(key) = keys_iter.next() { let result = match ty { IterQueryType::Value => self.query_storage_value(hash, &key, child_key), IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key), }?; - if let Some(result) = result { - ret.push(result); + if let Some(value) = result { + ret.push(value); } } - QueryIterResult::Ok(ret) + // Save the next key if any to continue the iteration. + let maybe_next_query = keys_iter.next().map(|next_key| QueryIter { next_key, ty }); + Ok((ret, maybe_next_query)) } - /// Generate the block events for the `chainHead_storage` method. - pub fn generate_events( - &self, - block_guard: BlockGuard, + /// Iterate over (key, hash) and (key, value) generating the `WaitingForContinue` event if + /// necessary. + async fn generate_storage_iter_events( + &mut self, + mut block_guard: BlockGuard, hash: Block::Hash, - items: Vec>, child_key: Option, ) { - /// Build and send the opaque error back to the `chainHead_follow` method. 
- fn send_error( - sender: &TracingUnboundedSender>, - operation_id: String, - error: String, - ) { - let _ = - sender.unbounded_send(FollowEvent::::OperationError(OperationError { - operation_id, - error, - })); + let sender = block_guard.response_sender(); + let operation = block_guard.operation(); + + while let Some(query) = self.iter_operations.pop_front() { + if operation.was_stopped() { + return + } + + let result = self.query_storage_iter_pagination(query, hash, child_key.as_ref()); + let (events, maybe_next_query) = match result { + QueryIterResult::Ok(result) => result, + QueryIterResult::Err(error) => { + send_error::(&sender, operation.operation_id(), error.to_string()); + return + }, + }; + + if !events.is_empty() { + // Send back the results of the iteration produced so far. + let _ = sender.unbounded_send(FollowEvent::::OperationStorageItems( + OperationStorageItems { operation_id: operation.operation_id(), items: events }, + )); + } + + if let Some(next_query) = maybe_next_query { + let _ = + sender.unbounded_send(FollowEvent::::OperationWaitingForContinue( + OperationId { operation_id: operation.operation_id() }, + )); + + // The operation might be continued or cancelled only after the + // `OperationWaitingForContinue` is generated above. + operation.wait_for_continue().await; + + // Give a chance for the other items to advance next time. + self.iter_operations.push_back(next_query); + } } + if operation.was_stopped() { + return + } + + let _ = + sender.unbounded_send(FollowEvent::::OperationStorageDone(OperationId { + operation_id: operation.operation_id(), + })); + } + + /// Generate the block events for the `chainHead_storage` method. 
+ pub async fn generate_events( + &mut self, + mut block_guard: BlockGuard, + hash: Block::Hash, + items: Vec>, + child_key: Option, + ) { let sender = block_guard.response_sender(); + let operation = block_guard.operation(); if let Some(child_key) = child_key.as_ref() { if !is_key_queryable(child_key.storage_key()) { let _ = sender.unbounded_send(FollowEvent::::OperationStorageDone( - OperationId { operation_id: block_guard.operation_id() }, + OperationId { operation_id: operation.operation_id() }, )); return } @@ -206,7 +299,7 @@ where Ok(Some(value)) => storage_results.push(value), Ok(None) => continue, Err(error) => { - send_error::(&sender, block_guard.operation_id(), error); + send_error::(&sender, operation.operation_id(), error); return }, } @@ -216,50 +309,49 @@ where Ok(Some(value)) => storage_results.push(value), Ok(None) => continue, Err(error) => { - send_error::(&sender, block_guard.operation_id(), error); + send_error::(&sender, operation.operation_id(), error); return }, }, - StorageQueryType::DescendantsValues => match self.query_storage_iter( - hash, - &item.key, - child_key.as_ref(), - IterQueryType::Value, - ) { - Ok(values) => storage_results.extend(values), - Err(error) => { - send_error::(&sender, block_guard.operation_id(), error); - return - }, - }, - StorageQueryType::DescendantsHashes => match self.query_storage_iter( - hash, - &item.key, - child_key.as_ref(), - IterQueryType::Hash, - ) { - Ok(values) => storage_results.extend(values), - Err(error) => { - send_error::(&sender, block_guard.operation_id(), error); - return + StorageQueryType::ClosestDescendantMerkleValue => + match self.query_storage_merkle_value(hash, &item.key, child_key.as_ref()) { + Ok(Some(value)) => storage_results.push(value), + Ok(None) => continue, + Err(error) => { + send_error::(&sender, operation.operation_id(), error); + return + }, }, - }, - _ => continue, + StorageQueryType::DescendantsValues => self + .iter_operations + .push_back(QueryIter { next_key: 
item.key, ty: IterQueryType::Value }), + StorageQueryType::DescendantsHashes => self + .iter_operations + .push_back(QueryIter { next_key: item.key, ty: IterQueryType::Hash }), }; } if !storage_results.is_empty() { let _ = sender.unbounded_send(FollowEvent::::OperationStorageItems( OperationStorageItems { - operation_id: block_guard.operation_id(), + operation_id: operation.operation_id(), items: storage_results, }, )); } - let _ = - sender.unbounded_send(FollowEvent::::OperationStorageDone(OperationId { - operation_id: block_guard.operation_id(), - })); + self.generate_storage_iter_events(block_guard, hash, child_key).await } } + +/// Build and send the opaque error back to the `chainHead_follow` method. +fn send_error( + sender: &TracingUnboundedSender>, + operation_id: String, + error: String, +) { + let _ = sender.unbounded_send(FollowEvent::::OperationError(OperationError { + operation_id, + error, + })); +} diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 9f42be4a2f7f6..d6f64acd63f5f 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -17,12 +17,13 @@ // along with this program. If not, see . use futures::channel::oneshot; +use parking_lot::Mutex; use sc_client_api::Backend; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap}, - sync::Arc, + sync::{atomic::AtomicBool, Arc}, time::{Duration, Instant}, }; @@ -154,12 +155,184 @@ struct PermitOperations { _permit: tokio::sync::OwnedSemaphorePermit, } -impl PermitOperations { +/// The state of one operation. +/// +/// This is directly exposed to users via `chain_head_unstable_continue` and +/// `chain_head_unstable_stop_operation`. 
+#[derive(Clone)] +pub struct OperationState { + /// The shared operation state that holds information about the + /// `waitingForContinue` event and cancellation. + shared_state: Arc, + /// Send notifications when the user calls `chainHead_continue` method. + send_continue: tokio::sync::mpsc::Sender<()>, +} + +impl OperationState { + /// Returns true if `chainHead_continue` is called after the + /// `waitingForContinue` event was emitted for the associated + /// operation ID. + pub fn submit_continue(&self) -> bool { + // `waitingForContinue` not generated. + if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) { + return false + } + + // Has enough capacity for 1 message. + // Can fail if the `stop_operation` propagated the stop first. + self.send_continue.try_send(()).is_ok() + } + + /// Stops the operation if `waitingForContinue` event was emitted for the associated + /// operation ID. + /// + /// Returns nothing in accordance with `chainHead_unstable_stopOperation`. + pub fn stop_operation(&self) { + // `waitingForContinue` not generated. + if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) { + return + } + + self.shared_state + .operation_stopped + .store(true, std::sync::atomic::Ordering::Release); + + // Send might not have enough capacity if `submit_continue` was sent first. + // However, the `operation_stopped` boolean was set. + let _ = self.send_continue.try_send(()); + } +} + +/// The shared operation state between the backend [`RegisteredOperation`] and frontend +/// [`OperationState`]. +struct SharedOperationState { + /// True if the `chainHead` generated `waitingForContinue` event. + requested_continue: AtomicBool, + /// True if the operation was cancelled by the user. + operation_stopped: AtomicBool, +} + +impl SharedOperationState { + /// Constructs a new [`SharedOperationState`]. + /// + /// This is efficiently cloned under a single heap allocation. 
+ fn new() -> Arc { + Arc::new(SharedOperationState { + requested_continue: AtomicBool::new(false), + operation_stopped: AtomicBool::new(false), + }) + } +} + +/// The registered operation passed to the `chainHead` methods. +/// +/// This is used internally by the `chainHead` methods. +pub struct RegisteredOperation { + /// The shared operation state that holds information about the + /// `waitingForContinue` event and cancellation. + shared_state: Arc, + /// Receive notifications when the user calls `chainHead_continue` method. + recv_continue: tokio::sync::mpsc::Receiver<()>, + /// The operation ID of the request. + operation_id: String, + /// Track the operations ID of this subscription. + operations: Arc>>, + /// Permit a number of items to be executed by this operation. + permit: PermitOperations, +} + +impl RegisteredOperation { + /// Wait until the user calls `chainHead_continue` or the operation + /// is cancelled via `chainHead_stopOperation`. + pub async fn wait_for_continue(&mut self) { + self.shared_state + .requested_continue + .store(true, std::sync::atomic::Ordering::Release); + + // The sender part of this channel is around for as long as this object exists, + // because it is stored in the `OperationState` of the `operations` field. + // The sender part is removed from tracking when this object is dropped. + let _ = self.recv_continue.recv().await; + + self.shared_state + .requested_continue + .store(false, std::sync::atomic::Ordering::Release); + } + + /// Returns true if the current operation was stopped. + pub fn was_stopped(&self) -> bool { + self.shared_state.operation_stopped.load(std::sync::atomic::Ordering::Acquire) + } + + /// Get the operation ID. + pub fn operation_id(&self) -> String { + self.operation_id.clone() + } + /// Returns the number of reserved elements for this permit. /// /// This can be smaller than the number of items requested via [`LimitOperations::reserve()`]. 
- fn num_reserved(&self) -> usize { - self.num_ops + pub fn num_reserved(&self) -> usize { + self.permit.num_ops + } +} + +impl Drop for RegisteredOperation { + fn drop(&mut self) { + let mut operations = self.operations.lock(); + operations.remove(&self.operation_id); + } +} + +/// The ongoing operations of a subscription. +struct Operations { + /// The next operation ID to be generated. + next_operation_id: usize, + /// Limit the number of ongoing operations. + limits: LimitOperations, + /// Track the operations ID of this subscription. + operations: Arc>>, +} + +impl Operations { + /// Constructs a new [`Operations`]. + fn new(max_operations: usize) -> Self { + Operations { + next_operation_id: 0, + limits: LimitOperations::new(max_operations), + operations: Default::default(), + } + } + + /// Register a new operation. + pub fn register_operation(&mut self, to_reserve: usize) -> Option { + let permit = self.limits.reserve_at_most(to_reserve)?; + + let operation_id = self.next_operation_id(); + + // At most one message can be sent. + let (send_continue, recv_continue) = tokio::sync::mpsc::channel(1); + let shared_state = SharedOperationState::new(); + + let state = OperationState { send_continue, shared_state: shared_state.clone() }; + + // Cloned operations for removing the current ID on drop. + let operations = self.operations.clone(); + operations.lock().insert(operation_id.clone(), state); + + Some(RegisteredOperation { shared_state, operation_id, recv_continue, operations, permit }) + } + + /// Get the associated operation state with the ID. + pub fn get_operation(&self, id: &str) -> Option { + self.operations.lock().get(id).map(|state| state.clone()) + } + + /// Generate the next operation ID for this subscription. + fn next_operation_id(&mut self) -> String { + let op_id = self.next_operation_id; + self.next_operation_id += 1; + op_id.to_string() } } @@ -180,10 +353,8 @@ struct SubscriptionState { /// /// This object is cloned between methods. 
response_sender: TracingUnboundedSender>, - /// Limit the number of ongoing operations. - limits: LimitOperations, - /// The next operation ID. - next_operation_id: usize, + /// The ongoing operations of a subscription. + operations: Operations, /// Track the block hashes available for this subscription. /// /// This implementation assumes: @@ -296,18 +467,16 @@ impl SubscriptionState { timestamp } - /// Generate the next operation ID for this subscription. - fn next_operation_id(&mut self) -> usize { - let op_id = self.next_operation_id; - self.next_operation_id = self.next_operation_id.wrapping_add(1); - op_id + /// Register a new operation. + /// + /// The registered operation can execute at least one item and at most the requested items. + fn register_operation(&mut self, to_reserve: usize) -> Option { + self.operations.register_operation(to_reserve) } - /// Reserves capacity to execute at least one operation and at most the requested items. - /// - /// For more details see [`PermitOperations`]. - fn reserve_at_most(&self, to_reserve: usize) -> Option { - self.limits.reserve_at_most(to_reserve) + /// Get the associated operation state with the ID. 
+ pub fn get_operation(&self, id: &str) -> Option { + self.operations.get_operation(id) } } @@ -318,8 +487,7 @@ pub struct BlockGuard> { hash: Block::Hash, with_runtime: bool, response_sender: TracingUnboundedSender>, - operation_id: String, - permit_operations: PermitOperations, + operation: RegisteredOperation, backend: Arc, } @@ -337,22 +505,14 @@ impl> BlockGuard { hash: Block::Hash, with_runtime: bool, response_sender: TracingUnboundedSender>, - operation_id: usize, - permit_operations: PermitOperations, + operation: RegisteredOperation, backend: Arc, ) -> Result { backend .pin_block(hash) .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; - Ok(Self { - hash, - with_runtime, - response_sender, - operation_id: operation_id.to_string(), - permit_operations, - backend, - }) + Ok(Self { hash, with_runtime, response_sender, operation, backend }) } /// The `with_runtime` flag of the subscription. @@ -365,16 +525,9 @@ impl> BlockGuard { self.response_sender.clone() } - /// The operation ID of this method. - pub fn operation_id(&self) -> String { - self.operation_id.clone() - } - - /// Returns the number of reserved elements for this permit. - /// - /// This can be smaller than the number of items requested. - pub fn num_reserved(&self) -> usize { - self.permit_operations.num_reserved() + /// Get the details of the registered operation. 
+ pub fn operation(&mut self) -> &mut RegisteredOperation { + &mut self.operation } } @@ -445,9 +598,8 @@ impl> SubscriptionsInner { with_runtime, tx_stop: Some(tx_stop), response_sender, - limits: LimitOperations::new(self.max_ongoing_operations), - next_operation_id: 0, blocks: Default::default(), + operations: Operations::new(self.max_ongoing_operations), }; entry.insert(state); @@ -631,21 +783,24 @@ impl> SubscriptionsInner { return Err(SubscriptionManagementError::BlockHashAbsent) } - let Some(permit_operations) = sub.reserve_at_most(to_reserve) else { + let Some(operation) = sub.register_operation(to_reserve) else { // Error when the server cannot execute at least one operation. return Err(SubscriptionManagementError::ExceededLimits) }; - let operation_id = sub.next_operation_id(); BlockGuard::new( hash, sub.with_runtime, sub.response_sender.clone(), - operation_id, - permit_operations, + operation, self.backend.clone(), ) } + + pub fn get_operation(&mut self, sub_id: &str, id: &str) -> Option { + let state = self.subs.get(sub_id)?; + state.get_operation(id) + } } #[cfg(test)] @@ -758,8 +913,7 @@ mod tests { with_runtime: false, tx_stop: None, response_sender, - next_operation_id: 0, - limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB), + operations: Operations::new(MAX_OPERATIONS_PER_SUB), blocks: Default::default(), }; @@ -788,9 +942,8 @@ mod tests { with_runtime: false, tx_stop: None, response_sender, - next_operation_id: 0, - limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB), blocks: Default::default(), + operations: Operations::new(MAX_OPERATIONS_PER_SUB), }; let hash = H256::random(); @@ -1107,12 +1260,12 @@ mod tests { // One operation is reserved. let permit_one = ops.reserve_at_most(1).unwrap(); - assert_eq!(permit_one.num_reserved(), 1); + assert_eq!(permit_one.num_ops, 1); // Request 2 operations, however there is capacity only for one. let permit_two = ops.reserve_at_most(2).unwrap(); // Number of reserved permits is smaller than provided. 
- assert_eq!(permit_two.num_reserved(), 1); + assert_eq!(permit_two.num_ops, 1); // Try to reserve operations when there's no space. let permit = ops.reserve_at_most(1); @@ -1123,6 +1276,6 @@ mod tests { // Can reserve again let permit_three = ops.reserve_at_most(1).unwrap(); - assert_eq!(permit_three.num_reserved(), 1); + assert_eq!(permit_three.num_ops, 1); } } diff --git a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 39618ecfc1b3e..b25b1a4913b49 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -25,6 +25,8 @@ mod error; mod inner; use self::inner::SubscriptionsInner; + +pub use self::inner::OperationState; pub use error::SubscriptionManagementError; pub use inner::{BlockGuard, InsertedSubscriptionData}; @@ -126,4 +128,10 @@ impl> SubscriptionManagement { let mut inner = self.inner.write(); inner.lock_block(sub_id, hash, to_reserve) } + + /// Get the operation state. 
+ pub fn get_operation(&self, sub_id: &str, operation_id: &str) -> Option { + let mut inner = self.inner.write(); + inner.get_operation(sub_id, operation_id) + } } diff --git a/client/rpc-spec-v2/src/chain_head/test_utils.rs b/client/rpc-spec-v2/src/chain_head/test_utils.rs index 54c585932a744..628aaa39e505b 100644 --- a/client/rpc-spec-v2/src/chain_head/test_utils.rs +++ b/client/rpc-spec-v2/src/chain_head/test_utils.rs @@ -198,6 +198,23 @@ impl< ) -> sp_blockchain::Result> { self.client.child_storage_hash(hash, child_info, key) } + + fn closest_merkle_value( + &self, + hash: ::Hash, + key: &StorageKey, + ) -> sp_blockchain::Result::Hash>> { + self.client.closest_merkle_value(hash, key) + } + + fn child_closest_merkle_value( + &self, + hash: ::Hash, + child_info: &ChildInfo, + key: &StorageKey, + ) -> sp_blockchain::Result::Hash>> { + self.client.child_closest_merkle_value(hash, child_info, key) + } } impl> CallApiAt for ChainHeadMockClient { diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 4bda06d3cf01c..746a799d225ce 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -25,7 +25,12 @@ use sp_core::{ Blake2Hasher, Hasher, }; use sp_version::RuntimeVersion; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + sync::Arc, + time::Duration, +}; use substrate_test_runtime::Transfer; use substrate_test_runtime_client::{ prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client, @@ -37,12 +42,14 @@ type Block = substrate_test_runtime_client::runtime::Block; const MAX_PINNED_BLOCKS: usize = 32; const MAX_PINNED_SECS: u64 = 60; const MAX_OPERATIONS: usize = 16; +const MAX_PAGINATION_LIMIT: usize = 5; const CHAIN_GENESIS: [u8; 32] = [0; 32]; const INVALID_HASH: [u8; 32] = [1; 32]; const KEY: &[u8] = b":mock"; const VALUE: &[u8] = b"hello world"; const CHILD_STORAGE_KEY: &[u8] 
= b"child"; const CHILD_VALUE: &[u8] = b"child value"; +const DOES_NOT_PRODUCE_EVENTS_SECONDS: u64 = 10; async fn get_next_event(sub: &mut RpcSubscription) -> T { let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next()) @@ -53,6 +60,13 @@ async fn get_next_event(sub: &mut RpcSubscriptio event } +async fn does_not_produce_event( + sub: &mut RpcSubscription, + duration: std::time::Duration, +) { + tokio::time::timeout(duration, sub.next::()).await.unwrap_err(); +} + async fn run_with_timeout(future: F) -> ::Output { tokio::time::timeout(std::time::Duration::from_secs(60 * 10), future) .await @@ -84,6 +98,7 @@ async fn setup_api() -> ( global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -127,6 +142,7 @@ async fn follow_subscription_produces_blocks() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -188,6 +204,7 @@ async fn follow_with_runtime() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -299,6 +316,7 @@ async fn get_genesis() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -508,6 +526,7 @@ async fn call_runtime_without_flag() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), 
subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -743,11 +762,16 @@ async fn get_storage_multi_query_iter() { assert_matches!( get_next_event::>(&mut block_sub).await, FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && - res.items.len() == 2 && + res.items.len() == 1 && + res.items[0].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && res.items[0].key == key && - res.items[1].key == key && - res.items[0].result == StorageResultType::Hash(expected_hash) && - res.items[1].result == StorageResultType::Value(expected_value) + res.items[0].result == StorageResultType::Value(expected_value) ); assert_matches!( get_next_event::>(&mut block_sub).await, @@ -788,11 +812,16 @@ async fn get_storage_multi_query_iter() { assert_matches!( get_next_event::>(&mut block_sub).await, FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && - res.items.len() == 2 && + res.items.len() == 1 && res.items[0].key == key && - res.items[1].key == key && - res.items[0].result == StorageResultType::Hash(expected_hash) && - res.items[1].result == StorageResultType::Value(expected_value) + res.items[0].result == StorageResultType::Hash(expected_hash) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && + res.items[0].result == StorageResultType::Value(expected_value) ); assert_matches!( get_next_event::>(&mut block_sub).await, @@ -1137,6 +1166,7 @@ async fn separate_operation_ids_for_subscriptions() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), 
subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1217,6 +1247,7 @@ async fn follow_generates_initial_blocks() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1348,6 +1379,7 @@ async fn follow_exceeding_pinned_blocks() { global_max_pinned_blocks: 2, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1402,6 +1434,7 @@ async fn follow_with_unpin() { global_max_pinned_blocks: 2, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1486,6 +1519,7 @@ async fn follow_prune_best_block() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1646,6 +1680,7 @@ async fn follow_forks_pruned_block() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1763,6 +1798,7 @@ async fn follow_report_multiple_pruned_block() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1971,6 +2007,7 @@ async fn pin_block_references() { global_max_pinned_blocks: 3, 
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -2084,6 +2121,7 @@ async fn follow_finalized_before_new_block() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -2184,6 +2222,7 @@ async fn ensure_operation_limits_works() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: 1, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -2259,3 +2298,414 @@ async fn ensure_operation_limits_works() { FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000" ); } + +#[tokio::test] +async fn check_continue_operation() { + let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); + let builder = TestClientBuilder::new().add_extra_child_storage( + &child_info, + KEY.to_vec(), + CHILD_VALUE.to_vec(), + ); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + // Configure the chainHead with maximum 1 item before asking for pagination. + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: 1, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + // Import a new block with storage changes. 
+ let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(b":m".to_vec(), Some(b"a".to_vec())).unwrap(); + builder.push_storage_change(b":mo".to_vec(), Some(b"ab".to_vec())).unwrap(); + builder.push_storage_change(b":moc".to_vec(), Some(b"abc".to_vec())).unwrap(); + builder.push_storage_change(b":mock".to_vec(), Some(b"abcd".to_vec())).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + let invalid_hash = hex_string(&INVALID_HASH); + + // Invalid subscription ID must produce no results. + let _res: () = api + .call("chainHead_unstable_continue", ["invalid_sub_id", &invalid_hash]) + .await + .unwrap(); + + // Invalid operation ID must produce no results. + let _res: () = api.call("chainHead_unstable_continue", [&sub_id, &invalid_hash]).await.unwrap(); + + // Valid call with storage at the key. 
+ let response: MethodResponse = api + .call( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![StorageQuery { + key: hex_string(b":m"), + query_type: StorageQueryType::DescendantsValues + }] + ], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":m") && + res.items[0].result == StorageResultType::Value(hex_string(b"a")) + ); + + // Pagination event. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id + ); + + does_not_produce_event::>( + &mut sub, + std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), + ) + .await; + let _res: () = api.call("chainHead_unstable_continue", [&sub_id, &operation_id]).await.unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":mo") && + res.items[0].result == StorageResultType::Value(hex_string(b"ab")) + ); + + // Pagination event. 
+ assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id + ); + + does_not_produce_event::>( + &mut sub, + std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), + ) + .await; + let _res: () = api.call("chainHead_unstable_continue", [&sub_id, &operation_id]).await.unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":moc") && + res.items[0].result == StorageResultType::Value(hex_string(b"abc")) + ); + + // Pagination event. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id + ); + does_not_produce_event::>( + &mut sub, + std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), + ) + .await; + let _res: () = api.call("chainHead_unstable_continue", [&sub_id, &operation_id]).await.unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":mock") && + res.items[0].result == StorageResultType::Value(hex_string(b"abcd")) + ); + + // Finished. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); +} + +#[tokio::test] +async fn stop_storage_operation() { + let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); + let builder = TestClientBuilder::new().add_extra_child_storage( + &child_info, + KEY.to_vec(), + CHILD_VALUE.to_vec(), + ); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + // Configure the chainHead with maximum 1 item before asking for pagination. 
+ let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: 1, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + // Import a new block with storage changes. + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(b":m".to_vec(), Some(b"a".to_vec())).unwrap(); + builder.push_storage_change(b":mo".to_vec(), Some(b"ab".to_vec())).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + let invalid_hash = hex_string(&INVALID_HASH); + + // Invalid subscription ID must produce no results. + let _res: () = api + .call("chainHead_unstable_stopOperation", ["invalid_sub_id", &invalid_hash]) + .await + .unwrap(); + + // Invalid operation ID must produce no results. + let _res: () = api + .call("chainHead_unstable_stopOperation", [&sub_id, &invalid_hash]) + .await + .unwrap(); + + // Valid call with storage at the key. 
+ let response: MethodResponse = api + .call( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![StorageQuery { + key: hex_string(b":m"), + query_type: StorageQueryType::DescendantsValues + }] + ], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":m") && + res.items[0].result == StorageResultType::Value(hex_string(b"a")) + ); + + // Pagination event. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id + ); + + // Stop the operation. + let _res: () = api + .call("chainHead_unstable_stopOperation", [&sub_id, &operation_id]) + .await + .unwrap(); + + does_not_produce_event::>( + &mut sub, + std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), + ) + .await; +} + +#[tokio::test] +async fn storage_closest_merkle_value() { + let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); + let builder = TestClientBuilder::new().add_extra_child_storage( + &child_info, + KEY.to_vec(), + CHILD_VALUE.to_vec(), + ); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = 
serde_json::to_string(&sub_id).unwrap(); + + // Import a new block with storage changes. + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(b":a".to_vec(), Some(b"a".to_vec())).unwrap(); + builder.push_storage_change(b":aa".to_vec(), Some(b"aa".to_vec())).unwrap(); + builder.push_storage_change(b":aaa".to_vec(), Some(b"aaa".to_vec())).unwrap(); + builder.push_storage_change(b":ab".to_vec(), Some(b"ab".to_vec())).unwrap(); + builder.push_storage_change(b":b".to_vec(), Some(b"b".to_vec())).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Valid call with storage at the keys. + let response: MethodResponse = api + .call( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![ + StorageQuery { + key: hex_string(b":a"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + StorageQuery { + key: hex_string(b":aa"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + StorageQuery { + key: hex_string(b":aaa"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + StorageQuery { + key: hex_string(b":ab"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + StorageQuery { + key: hex_string(b":b"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + // // Key not existant, the partial key is present however. 
+ // StorageQuery { + // key: hex_string(b":aac"), + // query_type: StorageQueryType::ClosestDescendantMerkleValue + // }, + // Key not existant, the partial key is present however. + StorageQuery { + key: hex_string(b":abc"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + ] + ], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let event = get_next_event::>(&mut sub).await; + let merkle_values: HashMap<_, _> = match event { + FollowEvent::OperationStorageItems(res) => { + assert_eq!(res.operation_id, operation_id); + assert_eq!(res.items.len(), 6); + + res.items + .into_iter() + .map(|res| { + let value = match res.result { + StorageResultType::ClosestDescendantMerkleValue(value) => value, + _ => panic!("Unexpected StorageResultType"), + }; + (res.key, value) + }) + .collect() + }, + _ => panic!("Expected OperationStorageItems event"), + }; + + // Finished. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); + + // assert_eq!( + // merkle_values.get(&hex_string(b":aac")).unwrap(), + // merkle_values.get(&hex_string(b":aa")).unwrap() + // ); + + assert_eq!( + merkle_values.get(&hex_string(b":abc")).unwrap(), + merkle_values.get(&hex_string(b":ab")).unwrap() + ); +} diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index d0a46ab2c0118..931ad8140d60b 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1555,6 +1555,27 @@ where .child_storage_hash(child_info, &key.0) .map_err(|e| sp_blockchain::Error::from_state(Box::new(e))) } + + fn closest_merkle_value( + &self, + hash: ::Hash, + key: &StorageKey, + ) -> blockchain::Result::Hash>> { + self.state_at(hash)? 
+ .closest_merkle_value(&key.0) + .map_err(|e| sp_blockchain::Error::from_state(Box::new(e))) + } + + fn child_closest_merkle_value( + &self, + hash: ::Hash, + child_info: &ChildInfo, + key: &StorageKey, + ) -> blockchain::Result::Hash>> { + self.state_at(hash)? + .child_closest_merkle_value(child_info, &key.0) + .map_err(|e| sp_blockchain::Error::from_state(Box::new(e))) + } } impl HeaderMetadata for Client diff --git a/primitives/api/Cargo.toml b/primitives/api/Cargo.toml index 2f0fe5d5d93cb..1c6a98ee2c734 100644 --- a/primitives/api/Cargo.toml +++ b/primitives/api/Cargo.toml @@ -22,7 +22,7 @@ sp-externalities = { version = "0.19.0", default-features = false, optional = tr sp-version = { version = "22.0.0", default-features = false, path = "../version" } sp-state-machine = { version = "0.28.0", default-features = false, optional = true, path = "../state-machine" } sp-trie = { version = "22.0.0", default-features = false, optional = true, path = "../trie" } -hash-db = { version = "0.16.0", optional = true } +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value" , optional = true } thiserror = { version = "1.0.30", optional = true } scale-info = { version = "2.1.1", default-features = false, features = ["derive"] } sp-metadata-ir = { version = "0.1.0", default-features = false, optional = true, path = "../metadata-ir" } diff --git a/primitives/core/Cargo.toml b/primitives/core/Cargo.toml index ee4bf8924186c..9cd638cdc1344 100644 --- a/primitives/core/Cargo.toml +++ b/primitives/core/Cargo.toml @@ -21,7 +21,7 @@ serde = { version = "1.0.163", optional = true, default-features = false, featu bounded-collections = { version = "0.1.8", default-features = false } primitive-types = { version = "0.12.0", default-features = false, features = ["codec", "scale-info"] } impl-serde = { version = "0.4.0", default-features = false, optional = true } -hash-db = { version = "0.16.0", default-features = false } +hash-db = { git = 
"https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } hash256-std-hasher = { version = "0.15.2", default-features = false } bs58 = { version = "0.4.0", default-features = false, optional = true } rand = { version = "0.8.5", features = ["small_rng"], optional = true } diff --git a/primitives/state-machine/Cargo.toml b/primitives/state-machine/Cargo.toml index 32be8e518f49f..37879a7e46dec 100644 --- a/primitives/state-machine/Cargo.toml +++ b/primitives/state-machine/Cargo.toml @@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] codec = { package = "parity-scale-codec", version = "3.6.1", default-features = false } -hash-db = { version = "0.16.0", default-features = false } +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } log = { version = "0.4.17", default-features = false } parking_lot = { version = "0.12.1", optional = true } rand = { version = "0.8.5", optional = true } @@ -27,7 +27,7 @@ sp-externalities = { version = "0.19.0", default-features = false, path = "../ex sp-panic-handler = { version = "8.0.0", optional = true, path = "../panic-handler" } sp-std = { version = "8.0.0", default-features = false, path = "../std" } sp-trie = { version = "22.0.0", default-features = false, path = "../trie" } -trie-db = { version = "0.27.1", default-features = false } +trie-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } [dev-dependencies] array-bytes = "6.1" diff --git a/primitives/state-machine/src/backend.rs b/primitives/state-machine/src/backend.rs index f3244308a54cf..b4e878aa75ea2 100644 --- a/primitives/state-machine/src/backend.rs +++ b/primitives/state-machine/src/backend.rs @@ -191,7 +191,17 @@ pub trait Backend: sp_std::fmt::Debug { /// Get keyed storage value hash or None if there is nothing associated. 
fn storage_hash(&self, key: &[u8]) -> Result, Self::Error>; - /// Get keyed child storage or None if there is nothing associated. + /// Get the merkle value or None if there is nothing associated. + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error>; + + /// Get the child merkle value or None if there is nothing associated. + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error>; + + /// Get child keyed child storage or None if there is nothing associated. fn child_storage( &self, child_info: &ChildInfo, diff --git a/primitives/state-machine/src/trie_backend.rs b/primitives/state-machine/src/trie_backend.rs index b7940fa8c39df..5bc22be14e337 100644 --- a/primitives/state-machine/src/trie_backend.rs +++ b/primitives/state-machine/src/trie_backend.rs @@ -405,6 +405,18 @@ where self.essence.child_storage(child_info, key) } + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error> { + self.essence.closest_merkle_value(key) + } + + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error> { + self.essence.child_closest_merkle_value(child_info, key) + } + fn next_storage_key(&self, key: &[u8]) -> Result, Self::Error> { let (is_cached, mut cache) = access_cache(&self.next_storage_key_cache, Option::take) .map(|cache| (cache.last_key == key, cache)) diff --git a/primitives/state-machine/src/trie_backend_essence.rs b/primitives/state-machine/src/trie_backend_essence.rs index 22c76b56deb05..556a860f97520 100644 --- a/primitives/state-machine/src/trie_backend_essence.rs +++ b/primitives/state-machine/src/trie_backend_essence.rs @@ -32,8 +32,9 @@ use sp_std::{boxed::Box, marker::PhantomData, vec::Vec}; #[cfg(feature = "std")] use sp_trie::recorder::Recorder; use sp_trie::{ - child_delta_trie_root, delta_trie_root, empty_child_trie_root, read_child_trie_hash, - read_child_trie_value, read_trie_value, + child_delta_trie_root, delta_trie_root, 
empty_child_trie_root, + read_child_trie_closest_merkle_value, read_child_trie_hash, read_child_trie_value, + read_trie_closest_merkle_value, read_trie_value, trie_types::{TrieDBBuilder, TrieError}, DBValue, KeySpacedDB, NodeCodec, Trie, TrieCache, TrieDBRawIterator, TrieRecorder, }; @@ -527,10 +528,7 @@ where /// Returns the hash value pub fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Result> { - let child_root = match self.child_root(child_info)? { - Some(root) => root, - None => return Ok(None), - }; + let Some(child_root) = self.child_root(child_info)? else { return Ok(None) }; let map_e = |e| format!("Trie lookup error: {}", e); @@ -553,10 +551,7 @@ where child_info: &ChildInfo, key: &[u8], ) -> Result> { - let child_root = match self.child_root(child_info)? { - Some(root) => root, - None => return Ok(None), - }; + let Some(child_root) = self.child_root(child_info)? else { return Ok(None) }; let map_e = |e| format!("Trie lookup error: {}", e); @@ -573,6 +568,39 @@ where }) } + /// Get the closest merkle value at given key. + pub fn closest_merkle_value(&self, key: &[u8]) -> Result> { + let map_e = |e| format!("Trie lookup error: {}", e); + + self.with_recorder_and_cache(None, |recorder, cache| { + read_trie_closest_merkle_value::, _>(self, &self.root, key, recorder, cache) + .map_err(map_e) + }) + } + + /// Get the child closest merkle value at given key. + pub fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result> { + let Some(child_root) = self.child_root(child_info)? else { return Ok(None) }; + + let map_e = |e| format!("Trie lookup error: {}", e); + + self.with_recorder_and_cache(Some(child_root), |recorder, cache| { + read_child_trie_closest_merkle_value::, _>( + child_info.keyspace(), + self, + &child_root, + key, + recorder, + cache, + ) + .map_err(map_e) + }) + } + /// Create a raw iterator over the storage. 
pub fn raw_iter(&self, args: IterArgs) -> Result> { let root = if let Some(child_info) = args.child_info.as_ref() { diff --git a/primitives/trie/Cargo.toml b/primitives/trie/Cargo.toml index 546d6786fc632..6e314478d9e85 100644 --- a/primitives/trie/Cargo.toml +++ b/primitives/trie/Cargo.toml @@ -21,16 +21,16 @@ harness = false ahash = { version = "0.8.2", optional = true } codec = { package = "parity-scale-codec", version = "3.6.1", default-features = false } hashbrown = { version = "0.13.2", optional = true } -hash-db = { version = "0.16.0", default-features = false } +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } lazy_static = { version = "1.4.0", optional = true } -memory-db = { version = "0.32.0", default-features = false } +memory-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } nohash-hasher = { version = "0.2.0", optional = true } parking_lot = { version = "0.12.1", optional = true } scale-info = { version = "2.5.0", default-features = false, features = ["derive"] } thiserror = { version = "1.0.30", optional = true } tracing = { version = "0.1.29", optional = true } -trie-db = { version = "0.27.0", default-features = false } -trie-root = { version = "0.18.0", default-features = false } +trie-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } +trie-root = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } sp-core = { version = "21.0.0", default-features = false, path = "../core" } sp-std = { version = "8.0.0", default-features = false, path = "../std" } schnellru = { version = "0.2.1", optional = true } diff --git a/primitives/trie/src/lib.rs b/primitives/trie/src/lib.rs index 94155458569bf..4700d3bf8a412 100644 --- a/primitives/trie/src/lib.rs +++ b/primitives/trie/src/lib.rs @@ 
-295,6 +295,24 @@ pub fn read_trie_value( + db: &DB, + root: &TrieHash, + key: &[u8], + recorder: Option<&mut dyn TrieRecorder>>, + cache: Option<&mut dyn TrieCache>, +) -> Result>, Box>> +where + DB: hash_db::HashDBRef, +{ + TrieDBBuilder::::new(db, root) + .with_optional_cache(cache) + .with_optional_recorder(recorder) + .build() + .get_closest_merkle_value(key) +} + /// Read a value from the trie with given Query. pub fn read_trie_value_with< L: TrieLayout, @@ -397,6 +415,26 @@ where .get_hash(key) } +/// Read the closest merkle value from the child trie. +pub fn read_child_trie_closest_merkle_value( + keyspace: &[u8], + db: &DB, + root: &TrieHash, + key: &[u8], + recorder: Option<&mut dyn TrieRecorder>>, + cache: Option<&mut dyn TrieCache>, +) -> Result>, Box>> +where + DB: hash_db::HashDBRef, +{ + let db = KeySpacedDB::new(db, keyspace); + TrieDBBuilder::::new(&db, &root) + .with_optional_recorder(recorder) + .with_optional_cache(cache) + .build() + .get_closest_merkle_value(key) +} + /// Read a value from the child trie with given query. pub fn read_child_trie_value_with( keyspace: &[u8], diff --git a/test-utils/runtime/Cargo.toml b/test-utils/runtime/Cargo.toml index 320d0e07deceb..ab102b1e037d3 100644 --- a/test-utils/runtime/Cargo.toml +++ b/test-utils/runtime/Cargo.toml @@ -40,7 +40,7 @@ pallet-timestamp = { version = "4.0.0-dev", default-features = false, path = ".. 
sp-consensus-grandpa = { version = "4.0.0-dev", default-features = false, path = "../../primitives/consensus/grandpa", features = ["serde"] } sp-trie = { version = "22.0.0", default-features = false, path = "../../primitives/trie" } sp-transaction-pool = { version = "4.0.0-dev", default-features = false, path = "../../primitives/transaction-pool" } -trie-db = { version = "0.27.0", default-features = false } +trie-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } sc-service = { version = "0.10.0-dev", default-features = false, optional = true, features = ["test-helpers"], path = "../../client/service" } sp-state-machine = { version = "0.28.0", default-features = false, path = "../../primitives/state-machine" } sp-externalities = { version = "0.19.0", default-features = false, path = "../../primitives/externalities" } diff --git a/utils/binary-merkle-tree/Cargo.toml b/utils/binary-merkle-tree/Cargo.toml index 4b7b9e53ef872..cba832e3051ad 100644 --- a/utils/binary-merkle-tree/Cargo.toml +++ b/utils/binary-merkle-tree/Cargo.toml @@ -11,7 +11,7 @@ homepage = "https://substrate.io" [dependencies] array-bytes = { version = "6.1", optional = true } log = { version = "0.4", default-features = false, optional = true } -hash-db = { version = "0.16.0", default-features = false } +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } [dev-dependencies] array-bytes = "6.1" diff --git a/utils/frame/rpc/state-trie-migration-rpc/Cargo.toml b/utils/frame/rpc/state-trie-migration-rpc/Cargo.toml index 9eee52aacba76..a87df85ad8624 100644 --- a/utils/frame/rpc/state-trie-migration-rpc/Cargo.toml +++ b/utils/frame/rpc/state-trie-migration-rpc/Cargo.toml @@ -19,7 +19,7 @@ serde = { version = "1", features = ["derive"] } sp-core = { path = "../../../../primitives/core" } sp-state-machine = { path = "../../../../primitives/state-machine" } sp-trie = { 
path = "../../../../primitives/trie" } -trie-db = "0.27.0" +trie-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } jsonrpsee = { version = "0.16.2", features = ["client-core", "server", "macros"] }