From 7f3526c4cf93a64e67030b438a4967397b9483de Mon Sep 17 00:00:00 2001 From: officeyutong Date: Mon, 25 Aug 2025 17:24:24 +0800 Subject: [PATCH 1/7] try fixing the issue --- wasm/light-client-wasm/src/lib.rs | 261 +++++++++++++++++------------- 1 file changed, 147 insertions(+), 114 deletions(-) diff --git a/wasm/light-client-wasm/src/lib.rs b/wasm/light-client-wasm/src/lib.rs index 1f68ecbe..5082cb46 100644 --- a/wasm/light-client-wasm/src/lib.rs +++ b/wasm/light-client-wasm/src/lib.rs @@ -592,143 +592,176 @@ pub fn get_cells( let storage = STORAGE_WITH_DATA.get().unwrap().storage(); - let kvs: Vec<_> = storage.collect_iterator( - from_key, - direction, - Box::new(move |key| key.starts_with(&prefix)), - limit, - skip, - ); + let mut internal_limit: usize = limit; - let mut cells = Vec::new(); - let mut last_key = Vec::new(); - for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { - debug!("get cells iterator at {:?} {:?}", key, value); - let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); - let output_index = u32::from_be_bytes( - key[key.len() - 4..] - .try_into() - .expect("stored output_index"), - ); - let tx_index = u32::from_be_bytes( - key[key.len() - 8..key.len() - 4] - .try_into() - .expect("stored tx_index"), - ); - let block_number = u64::from_be_bytes( - key[key.len() - 16..key.len() - 8] - .try_into() - .expect("stored block_number"), + // The new implementation iterates to try "limit * 2^0", "limit * 2^1", "limit * 2^2".. (until meets n such that limit * 2^n > u32::MAX) as the limit passed to storage.collect_iterator and finds result to return from what `collect_iterator` returns. + // Since this is a light client, we can assume that storage.collect_iterator won't return too much data + // If internal_limit reaches u32::MAX and we still can't get any cells from filtered results, we can assume that no cells will be found. + loop { + let prefix_cloned = prefix.clone(); + let kvs: Vec<_> = storage.collect_iterator( + from_key.clone(), + direction, + Box::new(move |key| { + if !key.starts_with(&prefix_cloned) { + return false; + } + + return true; + }), + internal_limit, + skip, ); - let tx = packed::Transaction::from_slice( - &storage - .get(Key::TxHash(&tx_hash).into_vec()) - .unwrap() - .expect("stored tx")[12..], - ) - .expect("from stored tx slice should be OK"); - let output = tx - .raw() - .outputs() - .get(output_index as usize) - .expect("get output by index should be OK"); - let output_data = tx - .raw() - .outputs_data() - .get(output_index as usize) - .expect("get output data by index should be OK"); + let mut cells = Vec::new(); + let mut last_key = Vec::new(); + for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { + debug!("get cells iterator at {:?} {:?}", key, value); + let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); + let output_index = u32::from_be_bytes( + key[key.len() - 4..] + .try_into() + .expect("stored output_index"), + ); + let tx_index = u32::from_be_bytes( + key[key.len() - 8..key.len() - 4] + .try_into() + .expect("stored tx_index"), + ); + let block_number = u64::from_be_bytes( + key[key.len() - 16..key.len() - 8] + .try_into() + .expect("stored block_number"), + ); - if let Some(prefix) = filter_prefix.as_ref() { - match filter_script_type { - ScriptType::Lock => { - if !extract_raw_data(&output.lock()) - .as_slice() - .starts_with(prefix) - { - debug!("skipped at {}", line!()); - continue; - } - } - ScriptType::Type => { - if output.type_().is_none() - || !extract_raw_data(&output.type_().to_opt().unwrap()) + let tx = packed::Transaction::from_slice( + &storage + .get(Key::TxHash(&tx_hash).into_vec()) + .unwrap() + .expect("stored tx")[12..], + ) + .expect("from stored tx slice should be OK"); + let output = tx + .raw() + .outputs() + .get(output_index as usize) + .expect("get output by index should be OK"); + let output_data = tx + .raw() + .outputs_data() + .get(output_index as usize) + .expect("get output data by index should be OK"); + + if let Some(prefix) = filter_prefix.as_ref() { + match filter_script_type { + ScriptType::Lock => { + if !extract_raw_data(&output.lock()) .as_slice() .starts_with(prefix) - { - debug!("skipped at {}", line!()); - continue; + { + debug!("skipped at {}", line!()); + continue; + } + } + ScriptType::Type => { + if output.type_().is_none() + || !extract_raw_data(&output.type_().to_opt().unwrap()) + .as_slice() + .starts_with(prefix) + { + debug!("skipped at {}", line!()); + continue; + } } } } - } - if let Some([r0, r1]) = filter_script_len_range { - match filter_script_type { - ScriptType::Lock => { - let script_len = extract_raw_data(&output.lock()).len(); - if script_len < r0 || script_len > r1 { - debug!("skipped at {}", line!()); - continue; + if let Some([r0, r1]) = filter_script_len_range { + match filter_script_type { + ScriptType::Lock => { + let script_len = extract_raw_data(&output.lock()).len(); + if script_len < r0 || script_len > r1 { + debug!("skipped at {}", line!()); + continue; + } } - } - ScriptType::Type => { - let script_len = output - .type_() - .to_opt() - .map(|script| extract_raw_data(&script).len()) - .unwrap_or_default(); - if script_len < r0 || script_len > r1 { - debug!("skipped at {}", line!()); - continue; + ScriptType::Type => { + let script_len = output + .type_() + .to_opt() + .map(|script| extract_raw_data(&script).len()) + .unwrap_or_default(); + if script_len < r0 || script_len > r1 { + debug!("skipped at {}", line!()); + continue; + } } } } - } - if let Some([r0, r1]) = filter_output_data_len_range { - if output_data.len() < r0 || output_data.len() >= r1 { - debug!("skipped at {}", line!()); - continue; + if let Some([r0, r1]) = filter_output_data_len_range { + if output_data.len() < r0 || output_data.len() >= r1 { + debug!("skipped at {}", line!()); + continue; + } } - } - if let Some([r0, r1]) = filter_output_capacity_range { - let capacity: core::Capacity = output.capacity().unpack(); - if capacity < r0 || capacity >= r1 { - debug!("skipped at {}", line!()); - continue; + if let Some([r0, r1]) = filter_output_capacity_range { + let capacity: core::Capacity = output.capacity().unpack(); + if capacity < r0 || capacity >= r1 { + debug!("skipped at {}", line!()); + continue; + } } - } - if let Some([r0, r1]) = filter_block_range { - if block_number < r0 || block_number >= r1 { - debug!("skipped at {}", line!()); - continue; + if let Some([r0, r1]) = filter_block_range { + if block_number < r0 || block_number >= r1 { + debug!("skipped at {}", line!()); + continue; + } + } + + last_key = key.to_vec(); + let cell_to_push = Cell { + output: output.into(), + output_data: if with_data { + Some(output_data.into()) + } else { + None + }, + out_point: packed::OutPoint::new(tx_hash, output_index).into(), + block_number: block_number.into(), + tx_index: tx_index.into(), + }; + debug!("pushed cell {:#?}", cell_to_push); + cells.push(cell_to_push); + if cells.len() >= limit { + break; } } + debug!("get_cells last_key={:?}", last_key); - last_key = key.to_vec(); - let cell_to_push = Cell { - output: output.into(), - output_data: if with_data { - Some(output_data.into()) - } else { - None - }, - out_point: packed::OutPoint::new(tx_hash, output_index).into(), - block_number: block_number.into(), - tx_index: tx_index.into(), - }; - debug!("pushed cell {:#?}", cell_to_push); - cells.push(cell_to_push); + if cells.len() > 0 { + return Ok((Pagination { + objects: cells, + last_cursor: JsonBytes::from_vec(last_key), + }) + .serialize(&SERIALIZER)?); + } else { + internal_limit *= 2; + if internal_limit > u32::MAX as usize { + debug!( + "Internal limit is now greater than {}, assuming no data is found", + u32::MAX + ); + return Ok((Pagination { + objects: Vec::::default(), + last_cursor: JsonBytes::from_vec(vec![]), + }) + .serialize(&SERIALIZER)?); + } + } } - debug!("get_cells last_key={:?}", last_key); - Ok((Pagination { - objects: cells, - last_cursor: JsonBytes::from_vec(last_key), - }) - .serialize(&SERIALIZER)?) } #[wasm_bindgen] From a3800716b76815e2789f389734214f8baf06ab94 Mon Sep 17 00:00:00 2001 From: officeyutong Date: Mon, 25 Aug 2025 17:31:59 +0800 Subject: [PATCH 2/7] fix get_transactions --- wasm/light-client-wasm/src/lib.rs | 441 +++++++++++++++++------------- 1 file changed, 250 insertions(+), 191 deletions(-) diff --git a/wasm/light-client-wasm/src/lib.rs b/wasm/light-client-wasm/src/lib.rs index 5082cb46..cb81735e 100644 --- a/wasm/light-client-wasm/src/lib.rs +++ b/wasm/light-client-wasm/src/lib.rs @@ -595,7 +595,7 @@ pub fn get_cells( let mut internal_limit: usize = limit; // The new implementation iterates to try "limit * 2^0", "limit * 2^1", "limit * 2^2".. (until meets n such that limit * 2^n > u32::MAX) as the limit passed to storage.collect_iterator and finds result to return from what `collect_iterator` returns. - // Since this is a light client, we can assume that storage.collect_iterator won't return too much data + // Since this is a light client, we can assume that storage.collect_iterator won't return too much data // If internal_limit reaches u32::MAX and we still can't get any cells from filtered results, we can assume that no cells will be found. loop { let prefix_cloned = prefix.clone(); @@ -763,7 +763,6 @@ pub fn get_cells( } } } - #[wasm_bindgen] pub fn get_transactions( search_key: JsValue, @@ -774,6 +773,10 @@ pub fn get_transactions( if !status(0b1) { return Err(JsValue::from_str("light client not on start state")); } + debug!( + "Calling get_transactions with {:?}, {:?}, {:?}, {:?}", + search_key, order, limit, after_cursor + ); let search_key: SearchKey = serde_wasm_bindgen::from_value(search_key)?; let (prefix, from_key, direction, skip) = build_query_options( @@ -816,26 +819,181 @@ pub fn get_transactions( let storage = STORAGE_WITH_DATA.get().unwrap().storage(); if search_key.group_by_transaction.unwrap_or_default() { - let prefix_cloned = prefix.clone(); - let mut kvs: Vec<_> = storage.collect_iterator( - from_key, - direction, - Box::new(move |key| key.starts_with(&prefix_cloned)), - 100, - skip, - ); - let mut tx_with_cells: Vec = Vec::new(); - let mut last_key = Vec::new(); + let mut internal_limit: usize = limit * 10; // Start with higher limit for grouped mode - 'outer: while !kvs.is_empty() { - for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { - let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); - if tx_with_cells.len() == limit - && tx_with_cells.last_mut().unwrap().transaction.hash != tx_hash.unpack() - { + loop { + let prefix_cloned = prefix.clone(); + let mut kvs: Vec<_> = storage.collect_iterator( + from_key.clone(), + direction, + Box::new(move |key| key.starts_with(&prefix_cloned)), + internal_limit, + skip, + ); + let mut tx_with_cells: Vec = Vec::new(); + let mut last_key = Vec::new(); + + 'outer: while !kvs.is_empty() { + for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { + let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); + if tx_with_cells.len() == limit + && tx_with_cells.last_mut().unwrap().transaction.hash != tx_hash.unpack() + { + break 'outer; + } + last_key = key.to_vec(); + let tx = packed::Transaction::from_slice( + &storage + .get(Key::TxHash(&tx_hash).into_vec()) + .expect("get tx should be OK") + .expect("stored tx")[12..], + ) + .expect("from stored tx slice should be OK"); + + let block_number = u64::from_be_bytes( + key[key.len() - 17..key.len() - 9] + .try_into() + .expect("stored block_number"), + ); + let tx_index = u32::from_be_bytes( + key[key.len() - 9..key.len() - 5] + .try_into() + .expect("stored tx_index"), + ); + let io_index = u32::from_be_bytes( + key[key.len() - 5..key.len() - 1] + .try_into() + .expect("stored io_index"), + ); + let io_type = if *key.last().expect("stored io_type") == 0 { + CellType::Input + } else { + CellType::Output + }; + + if let Some(filter_script) = filter_script.as_ref() { + let filter_script_matched = match filter_script_type { + ScriptType::Lock => storage + .get( + Key::TxLockScript( + filter_script, + block_number, + tx_index, + io_index, + match io_type { + CellType::Input => storage::CellType::Input, + CellType::Output => storage::CellType::Output, + }, + ) + .into_vec(), + ) + .expect("get TxLockScript should be OK") + .is_some(), + ScriptType::Type => storage + .get( + Key::TxTypeScript( + filter_script, + block_number, + tx_index, + io_index, + match io_type { + CellType::Input => storage::CellType::Input, + CellType::Output => storage::CellType::Output, + }, + ) + .into_vec(), + ) + .expect("get TxTypeScript should be OK") + .is_some(), + }; + + if !filter_script_matched { + debug!("skipped at {}", line!()); + continue; + } + } + + if let Some([r0, r1]) = filter_block_range { + if block_number < r0 || block_number >= r1 { + debug!("skipped at {}", line!()); + continue; + } + } + + let last_tx_hash_is_same = tx_with_cells + .last_mut() + .map(|last| { + if last.transaction.hash == tx_hash.unpack() { + last.cells.push((io_type.clone(), io_index.into())); + true + } else { + false + } + }) + .unwrap_or_default(); + + if !last_tx_hash_is_same { + tx_with_cells.push(TxWithCells { + transaction: tx.into_view().into(), + block_number: block_number.into(), + tx_index: tx_index.into(), + cells: vec![(io_type, io_index.into())], + }); + } + } + if tx_with_cells.len() >= limit { break 'outer; } - last_key = key.to_vec(); + let prefix_cloned = prefix.clone(); + kvs = storage.collect_iterator( + last_key.clone(), + direction, + Box::new(move |key| key.starts_with(&prefix_cloned)), + 100, + 1, + ); + } + + debug!("get_transactions (grouped) last_key={:?}", last_key); + + if tx_with_cells.len() > 0 { + return Ok((Pagination { + objects: tx_with_cells.into_iter().map(Tx::Grouped).collect(), + last_cursor: JsonBytes::from_vec(last_key), + }) + .serialize(&SERIALIZER)?); + } else { + internal_limit *= 2; + if internal_limit > u32::MAX as usize { + debug!( + "Internal limit is now greater than {}, assuming no data is found", + u32::MAX + ); + return Ok((Pagination { + objects: Vec::::default(), + last_cursor: JsonBytes::from_vec(vec![]), + }) + .serialize(&SERIALIZER)?); + } + } + } + } else { + let mut internal_limit: usize = limit; + + loop { + let prefix_cloned = prefix.clone(); + let kvs: Vec<_> = storage.collect_iterator( + from_key.clone(), + direction, + Box::new(move |key| key.starts_with(&prefix_cloned)), + internal_limit, + skip, + ); + let mut last_key = Vec::new(); + let mut txs = Vec::new(); + for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { + debug!("get transactions iterator at {:?} {:?}", key, value); + let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); let tx = packed::Transaction::from_slice( &storage .get(Key::TxHash(&tx_hash).into_vec()) @@ -866,196 +1024,97 @@ pub fn get_transactions( }; if let Some(filter_script) = filter_script.as_ref() { - let filter_script_matched = match filter_script_type { - ScriptType::Lock => storage - .get( - Key::TxLockScript( - filter_script, - block_number, - tx_index, - io_index, - match io_type { - CellType::Input => storage::CellType::Input, - CellType::Output => storage::CellType::Output, - }, + match filter_script_type { + ScriptType::Lock => { + if storage + .get( + Key::TxLockScript( + filter_script, + block_number, + tx_index, + io_index, + match io_type { + CellType::Input => storage::CellType::Input, + CellType::Output => storage::CellType::Output, + }, + ) + .into_vec(), ) - .into_vec(), - ) - .expect("get TxLockScript should be OK") - .is_some(), - ScriptType::Type => storage - .get( - Key::TxTypeScript( - filter_script, - block_number, - tx_index, - io_index, - match io_type { - CellType::Input => storage::CellType::Input, - CellType::Output => storage::CellType::Output, - }, + .expect("get TxLockScript should be OK") + .is_none() + { + debug!("skipped at {}", line!()); + continue; + }; + } + ScriptType::Type => { + if storage + .get( + Key::TxTypeScript( + filter_script, + block_number, + tx_index, + io_index, + match io_type { + CellType::Input => storage::CellType::Input, + CellType::Output => storage::CellType::Output, + }, + ) + .into_vec(), ) - .into_vec(), - ) - .expect("get TxTypeScript should be OK") - .is_some(), - }; - - if !filter_script_matched { - continue; + .expect("get TxTypeScript should be OK") + .is_none() + { + debug!("skipped at {}", line!()); + continue; + }; + } } } if let Some([r0, r1]) = filter_block_range { if block_number < r0 || block_number >= r1 { + debug!("skipped at {}", line!()); continue; } } - let last_tx_hash_is_same = tx_with_cells - .last_mut() - .map(|last| { - if last.transaction.hash == tx_hash.unpack() { - last.cells.push((io_type.clone(), io_index.into())); - true - } else { - false - } - }) - .unwrap_or_default(); - - if !last_tx_hash_is_same { - tx_with_cells.push(TxWithCells { - transaction: tx.into_view().into(), - block_number: block_number.into(), - tx_index: tx_index.into(), - cells: vec![(io_type, io_index.into())], - }); + last_key = key.to_vec(); + let tx_to_push = Tx::Ungrouped(TxWithCell { + transaction: tx.into_view().into(), + block_number: block_number.into(), + tx_index: tx_index.into(), + io_index: io_index.into(), + io_type, + }); + txs.push(tx_to_push); + if txs.len() >= limit { + break; } } - let prefix_cloned = prefix.clone(); - kvs = storage.collect_iterator( - last_key.clone(), - direction, - Box::new(move |key| key.starts_with(&prefix_cloned)), - 100, - 1, - ); - } - Ok((Pagination { - objects: tx_with_cells.into_iter().map(Tx::Grouped).collect(), - last_cursor: JsonBytes::from_vec(last_key), - }) - .serialize(&SERIALIZER)?) - } else { - let kvs: Vec<_> = storage.collect_iterator( - from_key, - direction, - Box::new(move |key| key.starts_with(&prefix)), - limit, - skip, - ); - let mut last_key = Vec::new(); - let mut txs = Vec::new(); - for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { - let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); - let tx = packed::Transaction::from_slice( - &storage - .get(Key::TxHash(&tx_hash).into_vec()) - .expect("get tx should be OK") - .expect("stored tx")[12..], - ) - .expect("from stored tx slice should be OK"); + debug!("get_transactions last_key={:?}", last_key); - let block_number = u64::from_be_bytes( - key[key.len() - 17..key.len() - 9] - .try_into() - .expect("stored block_number"), - ); - let tx_index = u32::from_be_bytes( - key[key.len() - 9..key.len() - 5] - .try_into() - .expect("stored tx_index"), - ); - let io_index = u32::from_be_bytes( - key[key.len() - 5..key.len() - 1] - .try_into() - .expect("stored io_index"), - ); - let io_type = if *key.last().expect("stored io_type") == 0 { - CellType::Input + if txs.len() > 0 { + return Ok((Pagination { + objects: txs, + last_cursor: JsonBytes::from_vec(last_key), + }) + .serialize(&SERIALIZER)?); } else { - CellType::Output - }; - - if let Some(filter_script) = filter_script.as_ref() { - match filter_script_type { - ScriptType::Lock => { - if storage - .get( - Key::TxLockScript( - filter_script, - block_number, - tx_index, - io_index, - match io_type { - CellType::Input => storage::CellType::Input, - CellType::Output => storage::CellType::Output, - }, - ) - .into_vec(), - ) - .expect("get TxLockScript should be OK") - .is_none() - { - continue; - }; - } - ScriptType::Type => { - if storage - .get( - Key::TxTypeScript( - filter_script, - block_number, - tx_index, - io_index, - match io_type { - CellType::Input => storage::CellType::Input, - CellType::Output => storage::CellType::Output, - }, - ) - .into_vec(), - ) - .expect("get TxTypeScript should be OK") - .is_none() - { - continue; - }; - } - } - } - - if let Some([r0, r1]) = filter_block_range { - if block_number < r0 || block_number >= r1 { - continue; + internal_limit *= 2; + if internal_limit > u32::MAX as usize { + debug!( + "Internal limit is now greater than {}, assuming no data is found", + u32::MAX + ); + return Ok((Pagination { + objects: Vec::::default(), + last_cursor: JsonBytes::from_vec(vec![]), + }) + .serialize(&SERIALIZER)?); } } - - last_key = key.to_vec(); - txs.push(Tx::Ungrouped(TxWithCell { - transaction: tx.into_view().into(), - block_number: block_number.into(), - tx_index: tx_index.into(), - io_index: io_index.into(), - io_type, - })) } - - Ok((Pagination { - objects: txs, - last_cursor: JsonBytes::from_vec(last_key), - }) - .serialize(&SERIALIZER)?) } } From 192679a94ca4c7b49cbbc38b53f0843e1e81fbfc Mon Sep 17 00:00:00 2001 From: officeyutong Date: Mon, 25 Aug 2025 18:35:31 +0800 Subject: [PATCH 3/7] make clippy happy --- wasm/light-client-wasm/src/lib.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/wasm/light-client-wasm/src/lib.rs b/wasm/light-client-wasm/src/lib.rs index cb81735e..cd14841d 100644 --- a/wasm/light-client-wasm/src/lib.rs +++ b/wasm/light-client-wasm/src/lib.rs @@ -602,13 +602,7 @@ pub fn get_cells( let kvs: Vec<_> = storage.collect_iterator( from_key.clone(), direction, - Box::new(move |key| { - if !key.starts_with(&prefix_cloned) { - return false; - } - - return true; - }), + Box::new(move |key| key.starts_with(&prefix_cloned)), internal_limit, skip, ); @@ -741,7 +735,7 @@ pub fn get_cells( } debug!("get_cells last_key={:?}", last_key); - if cells.len() > 0 { + if !cells.is_empty() { return Ok((Pagination { objects: cells, last_cursor: JsonBytes::from_vec(last_key), @@ -956,7 +950,7 @@ pub fn get_transactions( debug!("get_transactions (grouped) last_key={:?}", last_key); - if tx_with_cells.len() > 0 { + if !tx_with_cells.is_empty() { return Ok((Pagination { objects: tx_with_cells.into_iter().map(Tx::Grouped).collect(), last_cursor: JsonBytes::from_vec(last_key), @@ -1094,7 +1088,7 @@ pub fn get_transactions( } debug!("get_transactions last_key={:?}", last_key); - if txs.len() > 0 { + if !txs.is_empty() { return Ok((Pagination { objects: txs, last_cursor: JsonBytes::from_vec(last_key), From 4958aa6e0a3d4e43a1e3ca832a1a90dad2a76ecf Mon Sep 17 00:00:00 2001 From: officeyutong Date: Tue, 26 Aug 2025 18:53:07 +0800 Subject: [PATCH 4/7] support filter_map on wasm db implementation `collect_iterator` --- light-client-lib/src/storage/db/browser.rs | 137 ++-- wasm/light-client-db-common/src/lib.rs | 9 + wasm/light-client-db-worker/src/db.rs | 91 ++- wasm/light-client-db-worker/src/lib.rs | 132 +++- wasm/light-client-wasm/src/lib.rs | 769 +++++++++++---------- 5 files changed, 680 insertions(+), 458 deletions(-) diff --git a/light-client-lib/src/storage/db/browser.rs b/light-client-lib/src/storage/db/browser.rs index 8fe7637c..e886e5d5 100644 --- a/light-client-lib/src/storage/db/browser.rs +++ b/light-client-lib/src/storage/db/browser.rs @@ -42,7 +42,7 @@ use crate::{ }; use wasm_bindgen::{prelude::wasm_bindgen, JsCast, JsValue}; use web_sys::js_sys::{Atomics, Int32Array, SharedArrayBuffer, Uint8Array}; -enum CommandRequestWithTakeWhile { +enum CommandRequestWithTakeWhileAndFilterMap { Read { keys: Vec>, }, @@ -57,6 +57,7 @@ enum CommandRequestWithTakeWhile { start_key_bound: Vec, order: CursorDirection, take_while: Box bool + Send + 'static>, + filter_map: Box Option> + Send + 'static>, limit: usize, skip: usize, }, @@ -65,6 +66,7 @@ enum CommandRequestWithTakeWhile { start_key_bound: Vec, order: CursorDirection, take_while: Box bool + Send + 'static>, + filter_map: Box Option> + Send + 'static>, limit: usize, skip: usize, }, @@ -146,7 +148,8 @@ impl CommunicationChannel { ), OutputCommand::RequestTakeWhile | OutputCommand::Waiting - | OutputCommand::DbResponse => { + | OutputCommand::DbResponse + | OutputCommand::RequestFilterMap => { unreachable!() } } @@ -156,18 +159,23 @@ impl CommunicationChannel { /// cmd: The command fn dispatch_database_command( &self, - cmd: CommandRequestWithTakeWhile, + cmd: CommandRequestWithTakeWhileAndFilterMap, ) -> anyhow::Result { - let (new_cmd, take_while) = match cmd { - CommandRequestWithTakeWhile::Read { keys } => (DbCommandRequest::Read { keys }, None), - CommandRequestWithTakeWhile::Put { kvs } => (DbCommandRequest::Put { kvs }, None), - CommandRequestWithTakeWhile::Delete { keys } => { - (DbCommandRequest::Delete { keys }, None) + let (new_cmd, take_while, filter_map) = match cmd { + CommandRequestWithTakeWhileAndFilterMap::Read { keys } => { + (DbCommandRequest::Read { keys }, None, None) } - CommandRequestWithTakeWhile::Iterator { + CommandRequestWithTakeWhileAndFilterMap::Put { kvs } => { + (DbCommandRequest::Put { kvs }, None, None) + } + CommandRequestWithTakeWhileAndFilterMap::Delete { keys } => { + (DbCommandRequest::Delete { keys }, None, None) + } + CommandRequestWithTakeWhileAndFilterMap::Iterator { start_key_bound, order, take_while, + filter_map, limit, skip, } => ( @@ -178,11 +186,13 @@ impl CommunicationChannel { skip, }, Some(take_while), + Some(filter_map), ), - CommandRequestWithTakeWhile::IteratorKey { + CommandRequestWithTakeWhileAndFilterMap::IteratorKey { start_key_bound, order, take_while, + filter_map, limit, skip, } => ( @@ -193,6 +203,7 @@ impl CommunicationChannel { skip, }, Some(take_while), + Some(filter_map), ), }; debug!("Dispatching database command: {:?}", new_cmd); @@ -232,6 +243,23 @@ impl CommunicationChannel { )?; continue; } + OutputCommand::RequestFilterMap => { + let arg = read_command_payload::>(output_i32_arr, output_u8_arr)?; + let result = filter_map.as_ref().unwrap()(&arg); + + debug!( + "Received filter_map request with args {:?}, result {:?}", + arg, result + ); + write_command_with_payload( + InputCommand::ResponseFilterMap as i32, + result, + input_i32_arr, + input_u8_arr, + )?; + continue; + } + OutputCommand::DbResponse => { return read_command_payload::( output_i32_arr, @@ -297,7 +325,7 @@ impl Storage { V: AsRef<[u8]>, { self.channel - .dispatch_database_command(CommandRequestWithTakeWhile::Put { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Put { kvs: vec![KV { key: key.as_ref().to_vec(), value: value.as_ref().to_vec(), @@ -309,7 +337,7 @@ impl Storage { pub fn get>(&self, key: K) -> Result>> { let values = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::Read { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Read { keys: vec![key.as_ref().to_vec()], }) .map_err(|e| Error::Indexdb(format!("{:?}", e)))?; @@ -327,7 +355,7 @@ impl Storage { } fn delete>(&self, key: K) -> Result<()> { self.channel - .dispatch_database_command(CommandRequestWithTakeWhile::Delete { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Delete { keys: vec![key.as_ref().to_vec()], }) .map(|_| ()) @@ -336,19 +364,19 @@ impl Storage { pub fn is_filter_scripts_empty(&self) -> bool { let key_prefix = Key::Meta(FILTER_SCRIPTS_KEY).into_vec(); - let value = - match self - .channel - .dispatch_database_command(CommandRequestWithTakeWhile::IteratorKey { - start_key_bound: key_prefix.clone(), - order: CursorDirection::NextUnique, - take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)), - limit: 1, - skip: 0, - }) { - Ok(v) => v, - Err(_) => return false, - }; + let value = match self.channel.dispatch_database_command( + CommandRequestWithTakeWhileAndFilterMap::IteratorKey { + start_key_bound: key_prefix.clone(), + order: CursorDirection::NextUnique, + take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)), + filter_map: Box::new(|s| Some(s.to_vec())), + limit: 1, + skip: 0, + }, + ) { + Ok(v) => v, + Err(_) => return false, + }; if let DbCommandResponse::IteratorKey { keys } = value { keys.is_empty() @@ -361,10 +389,11 @@ impl Storage { let key_prefix_clone = key_prefix.clone(); let value = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::Iterator { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator { start_key_bound: key_prefix_clone.clone(), order: CursorDirection::NextUnique, take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix_clone)), + filter_map: Box::new(|s| Some(s.to_vec())), limit: usize::MAX, skip: 0, }) @@ -409,15 +438,18 @@ impl Storage { let key_prefix_clone = key_prefix.clone(); let remove_keys = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::IteratorKey { - start_key_bound: key_prefix_clone.clone(), - order: CursorDirection::NextUnique, - take_while: Box::new(move |raw_key: &[u8]| { - raw_key.starts_with(&key_prefix_clone) - }), - limit: usize::MAX, - skip: 0, - }) + .dispatch_database_command( + CommandRequestWithTakeWhileAndFilterMap::IteratorKey { + start_key_bound: key_prefix_clone.clone(), + order: CursorDirection::NextUnique, + take_while: Box::new(move |raw_key: &[u8]| { + raw_key.starts_with(&key_prefix_clone) + }), + filter_map: Box::new(|s| Some(s.to_vec())), + limit: usize::MAX, + skip: 0, + }, + ) .unwrap(); debug!("Received {:?}", remove_keys); if let DbCommandResponse::IteratorKey { keys } = remove_keys { @@ -498,10 +530,11 @@ impl Storage { let value = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::Iterator { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator { start_key_bound: key_prefix.clone(), order: CursorDirection::NextUnique, take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)), + filter_map: Box::new(|s| Some(s.to_vec())), limit: usize::MAX, skip: 0, }) @@ -533,10 +566,11 @@ impl Storage { let key_prefix_clone = key_prefix.clone(); let value = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::Iterator { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator { start_key_bound: key_prefix_clone.clone(), order: CursorDirection::NextUnique, take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix_clone)), + filter_map: Box::new(|s| Some(s.to_vec())), limit: usize::MAX, skip: 0, }) @@ -575,10 +609,11 @@ impl Storage { let value = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::IteratorKey { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::IteratorKey { start_key_bound: key_prefix.clone(), order: CursorDirection::NextUnique, take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)), + filter_map: Box::new(|s| Some(s.to_vec())), limit: usize::MAX, skip: 0, }) @@ -610,10 +645,11 @@ impl Storage { let value = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::Iterator { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator { start_key_bound: iter_from, order: CursorDirection::NextUnique, take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix_clone)), + filter_map: Box::new(|s| Some(s.to_vec())), limit: 1, skip: 0, }) @@ -653,10 +689,11 @@ impl Storage { let value = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::Iterator { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator { start_key_bound: start_key, order: CursorDirection::NextUnique, take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)), + filter_map: Box::new(|s| Some(s.to_vec())), limit, skip: 0, }) @@ -677,10 +714,11 @@ impl Storage { let value = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::Iterator { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator { start_key_bound: key_prefix.clone(), order: CursorDirection::NextUnique, take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)), + filter_map: Box::new(|s| Some(s.to_vec())), limit: usize::MAX, skip: 0, }) @@ -722,7 +760,7 @@ impl Storage { let value = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::Iterator { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator { start_key_bound: key_prefix.clone(), order: CursorDirection::PrevUnique, take_while: Box::new(move |raw_key: &[u8]| { @@ -733,6 +771,7 @@ impl Storage { .expect("stored BlockNumber"), ) >= to_number }), + filter_map: Box::new(|s| Some(s.to_vec())), limit: usize::MAX, skip: 0, }) @@ -901,7 +940,7 @@ impl Batch { fn delete_many(&mut self, keys: Vec>) -> Result<()> { self.comm_arrays - .dispatch_database_command(CommandRequestWithTakeWhile::Delete { keys }) + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Delete { keys }) .map(|_| ()) .map_err(|e| Error::Indexdb(format!("{:?}", e))) } @@ -909,14 +948,16 @@ impl Batch { fn commit(self) -> Result<()> { if !self.add.is_empty() { self.comm_arrays - .dispatch_database_command(CommandRequestWithTakeWhile::Put { kvs: self.add }) + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Put { + kvs: self.add, + }) .map(|_| ()) .map_err(|e| Error::Indexdb(format!("{:?}", e)))?; } if !self.delete.is_empty() { self.comm_arrays - .dispatch_database_command(CommandRequestWithTakeWhile::Delete { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Delete { keys: self.delete, }) .map(|_| ()) @@ -934,15 +975,17 @@ impl Storage { start_key_bound: Vec, order: CursorDirection, take_while: Box bool + Send + 'static>, + filter_map: Box Option> + Send + 'static>, limit: usize, skip: usize, ) -> Vec { let value = self .channel - .dispatch_database_command(CommandRequestWithTakeWhile::Iterator { + .dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator { start_key_bound, order, take_while, + filter_map, limit, skip, }) diff --git a/wasm/light-client-db-common/src/lib.rs b/wasm/light-client-db-common/src/lib.rs index e8c093a8..e47cf3b1 100644 --- a/wasm/light-client-db-common/src/lib.rs +++ b/wasm/light-client-db-common/src/lib.rs @@ -63,6 +63,7 @@ pub enum DbCommandRequest { }, } #[repr(i32)] +#[derive(Debug)] /// Represent a 4-byte command which will be put in input buffer pub enum InputCommand { /// Indicates that there is no command and light client is waiting for next call @@ -81,6 +82,9 @@ pub enum InputCommand { /// Used for response from take_while, not for users /// Payload: result of the call, in bool, bincode-encoded ResponseTakeWhile = 20, + /// Used for response from filter_map, not for users + /// Payload: result of the call, in Option>, bincode-encoded + ResponseFilterMap = 21, } impl TryFrom for InputCommand { @@ -93,6 +97,7 @@ impl TryFrom for InputCommand { 2 => Ok(Self::DbRequest), 3 => Ok(Self::Shutdown), 20 => Ok(Self::ResponseTakeWhile), + 21 => Ok(Self::ResponseFilterMap), s => Err(anyhow!("Invalid command: {}", s)), } } @@ -116,6 +121,9 @@ pub enum OutputCommand { /// DbWorker wants to call take_while /// Payload: bincode-encoded bytes, argument of take_while RequestTakeWhile = 20, + /// DbWorker wants to call filter_map + /// Payload: bincode-encoded bytes, argument of filter_map + RequestFilterMap = 21, } impl TryFrom for OutputCommand { @@ -128,6 +136,7 @@ impl TryFrom for OutputCommand { 2 => Ok(Self::DbResponse), 10 => Ok(Self::Error), 20 => Ok(Self::RequestTakeWhile), + 21 => Ok(Self::RequestFilterMap), s => Err(anyhow!("Invalid command: {}", s)), } } diff --git a/wasm/light-client-db-worker/src/db.rs b/wasm/light-client-db-worker/src/db.rs index ffd91c76..49e9d2dc 100644 --- a/wasm/light-client-db-worker/src/db.rs +++ b/wasm/light-client-db-worker/src/db.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use anyhow::{anyhow, Context}; use idb::{ CursorDirection, Database, DatabaseEvent, Factory, IndexParams, KeyPath, KeyRange, @@ -41,16 +43,19 @@ async fn open_iterator( .into_managed()) } -pub async fn collect_iterator( +pub async fn collect_iterator( store: &ObjectStore, start_key_bound: &[u8], order: CursorDirection, take_while: F, + filter_map: FnFilterMap, limit: usize, skip: usize, ) -> anyhow::Result> where F: Fn(&[u8]) -> bool, + FnFilterMap: Fn(&[u8]) -> FnFilterMapOutput, + FnFilterMapOutput: Future>>, { let mut iter = open_iterator(store, start_key_bound, order) .await @@ -74,7 +79,12 @@ where if take_while(&raw_kv.key) { skip_index += 1; if skip_index > skip { - res.push(raw_kv); + if let Some(new_key) = filter_map(&raw_kv.key).await { + res.push(KV { + key: new_key, + value: raw_kv.value, + }); + } } } else { return Ok(res); @@ -102,7 +112,12 @@ where if take_while(&raw_kv.key) { skip_index += 1; if skip_index > skip { - res.push(raw_kv); + if let Some(new_key) = filter_map(&raw_kv.key).await { + res.push(KV { + key: new_key, + value: raw_kv.value, + }); + } } } else { return Ok(res); @@ -111,16 +126,19 @@ where Ok(res) } -async fn collect_iterator_keys( +async fn collect_iterator_keys( store: &ObjectStore, start_key_bound: &[u8], order: CursorDirection, take_while: F, + filter_map: FnFilterMap, limit: usize, skip: usize, ) -> anyhow::Result>> where F: Fn(&[u8]) -> bool, + FnFilterMap: Fn(&[u8]) -> FnFilterMapOutput, + FnFilterMapOutput: Future>>, { let mut iter = open_iterator(store, start_key_bound, order) .await @@ -143,7 +161,9 @@ where if take_while(&raw_key) { skip_index += 1; if skip_index > skip { - res.push(raw_key); + if let Some(new_key) = filter_map(&raw_key).await { + res.push(new_key); + } } } else { return Ok(res); @@ -167,7 +187,9 @@ where if take_while(&raw_key) { skip_index += 1; if skip_index > skip { - res.push(raw_key); + if let Some(new_key) = filter_map(&raw_key).await { + res.push(new_key); + } } } else { return Ok(res); @@ -176,33 +198,42 @@ where Ok(res) } -pub(crate) async fn handle_db_command( +pub(crate) async fn handle_db_command( db: &Database, store_name: &str, cmd: DbCommandRequest, invoke_take_while: F, + invoke_filter_map: FnFilterMap, + custom_store: Option, ) -> anyhow::Result where F: Fn(&[u8]) -> bool, + FnFilterMap: FnOnce(&[u8], ObjectStore) -> FnFilterMapOutput + Clone, + FnFilterMapOutput: Future>>, { debug!("Handle command: {:?}", cmd); - let tx_mode = match cmd { - DbCommandRequest::Iterator { .. } | DbCommandRequest::IteratorKey { .. } => { - TransactionMode::ReadOnly - } - DbCommandRequest::Read { .. } => TransactionMode::ReadOnly, - DbCommandRequest::Put { .. } | DbCommandRequest::Delete { .. } => { - TransactionMode::ReadWrite - } - }; - let tran = db - .transaction(&[&store_name], tx_mode) - .map_err(|e| anyhow!("Failed to create transaction: {:?}", e))?; - let store = tran - .object_store(store_name) - .map_err(|e| anyhow!("Unable to find store {}: {}", store_name, e))?; + let (tran, store) = if let Some(store) = custom_store { + (None, store) + } else { + let tx_mode = match cmd { + DbCommandRequest::Iterator { .. } | DbCommandRequest::IteratorKey { .. } => { + TransactionMode::ReadOnly + } + DbCommandRequest::Read { .. } => TransactionMode::ReadOnly, + DbCommandRequest::Put { .. } | DbCommandRequest::Delete { .. } => { + TransactionMode::ReadWrite + } + }; + let tran = db + .transaction(&[&store_name], tx_mode) + .map_err(|e| anyhow!("Failed to create transaction: {:?}", e))?; + let store = tran + .object_store(store_name) + .map_err(|e| anyhow!("Unable to find store {}: {}", store_name, e))?; + (Some(tran), store) + }; let result = match cmd { DbCommandRequest::Read { keys } => { let mut res = Vec::new(); @@ -257,6 +288,12 @@ where &start_key_bound, ckb_cursor_direction_to_idb(order), invoke_take_while, + |key| { + let key = key.to_vec(); + let store = store.clone(); + let invoke_filter_map = invoke_filter_map.clone(); + async move { invoke_filter_map(&key, store.clone()).await } + }, limit, skip, ) @@ -279,6 +316,12 @@ where &start_key_bound, ckb_cursor_direction_to_idb(order), invoke_take_while, + |key| { + let key = key.to_vec(); + let store = store.clone(); + let invoke_filter_map = invoke_filter_map.clone(); + async move { invoke_filter_map(&key, store.clone()).await } + }, limit, skip, ) @@ -287,7 +330,9 @@ where DbCommandResponse::IteratorKey { keys } } }; - assert_eq!(TransactionResult::Committed, tran.await.unwrap()); + if let Some(tran) = tran { + assert_eq!(TransactionResult::Committed, tran.await.unwrap()); + } debug!("Command result={:?}", result); Ok(result) } diff --git a/wasm/light-client-db-worker/src/lib.rs b/wasm/light-client-db-worker/src/lib.rs index ea360a7b..0522c986 100644 --- a/wasm/light-client-db-worker/src/lib.rs +++ b/wasm/light-client-db-worker/src/lib.rs @@ -2,8 +2,8 @@ use std::cell::RefCell; use std::str::FromStr; use db::{handle_db_command, open_database}; -use idb::Database; -use light_client_db_common::{read_command_payload, write_command_with_payload}; +use idb::{Database, Transaction}; +use light_client_db_common::{read_command_payload, write_command_with_payload, DbCommandRequest}; use light_client_db_common::{InputCommand, OutputCommand}; use log::{debug, info}; use util::{wait_for_command, wait_for_command_sync}; @@ -16,6 +16,7 @@ mod util; thread_local! { static INPUT_BUFFER: RefCell> = const { RefCell::new(None) }; static OUTPUT_BUFFER: RefCell> = const { RefCell::new(None) }; + pub(crate) static GLOBAL_TRANSACTION: RefCell> = const { RefCell::new(None) }; } #[wasm_bindgen] /// Set `SharedArrayBuffer` used for communicating with light client worker. This must be called before executing `main_loop` @@ -95,25 +96,112 @@ pub async fn main_loop(log_level: &str) { InputCommand::DbRequest => { let db_cmd = read_command_payload(&input_i32_arr, &input_u8_arr).unwrap(); let db = db.as_ref().expect("Database not opened yet"); - let result = handle_db_command(db, STORE_NAME, db_cmd, |buf| { - input_i32_arr.set_index(0, InputCommand::Waiting as i32); - debug!("Invoking request take while with args {:?}", buf); - write_command_with_payload( - OutputCommand::RequestTakeWhile as i32, - buf, - &output_i32_arr, - &output_u8_arr, - ) - .unwrap(); - // Sync wait here, so transaction of IndexedDB won't be commited (it will be commited once control flow was returned from sync call stack) - wait_for_command_sync(&input_i32_arr, InputCommand::Waiting).unwrap(); + let result = handle_db_command( + db, + STORE_NAME, + db_cmd, + |buf| { + input_i32_arr.set_index(0, InputCommand::Waiting as i32); + debug!("Invoking request take while with args {:?}", buf); + write_command_with_payload( + OutputCommand::RequestTakeWhile as i32, + buf, + &output_i32_arr, + &output_u8_arr, + ) + .unwrap(); + // Sync wait here, so transaction of IndexedDB won't be commited (it will be commited once control flow was returned from sync call stack) + wait_for_command_sync(&input_i32_arr, InputCommand::Waiting).unwrap(); + let result = + read_command_payload::(&input_i32_arr, &input_u8_arr).unwrap(); + debug!("Received take while result {}", result); + input_i32_arr.set_index(0, InputCommand::Waiting as i32); + result + }, + |buf, store| { + let buf = buf.to_vec(); + input_i32_arr.set_index(0, InputCommand::Waiting as i32); + debug!("Invoking request filter_map with args {:?}", buf); + write_command_with_payload( + OutputCommand::RequestFilterMap as i32, + buf, + &output_i32_arr, + &output_u8_arr, + ) + .unwrap(); + let input_i32_arr = input_i32_arr.clone(); + let input_u8_arr = input_u8_arr.clone(); + let output_i32_arr = output_i32_arr.clone(); + let output_u8_arr = output_u8_arr.clone(); - let result = - read_command_payload::(&input_i32_arr, &input_u8_arr).unwrap(); - debug!("Received take while result {}", result); - input_i32_arr.set_index(0, InputCommand::Waiting as i32); - result - }) + async move { + loop { + let store = store.clone(); + match wait_for_command_sync(&input_i32_arr, InputCommand::Waiting) + .unwrap() + { + InputCommand::Waiting + | InputCommand::OpenDatabase + | InputCommand::Shutdown + | InputCommand::ResponseTakeWhile => { + unreachable!() + } + // Allow calling other db requests in filter map call + InputCommand::DbRequest => { + let db_cmd: DbCommandRequest = + read_command_payload(&input_i32_arr, &input_u8_arr) + .unwrap(); + debug!( + "Received DbCommandRequest\ + when waiting for ResponseTakeWhile: {:?}", + db_cmd + ); + let db_result = handle_db_command( + db, + STORE_NAME, + db_cmd, + |_| panic!("Can't call take while in filter map"), + |_, _| async { + panic!("Can't call filter map in filter map") + }, + Some(store), + ) + .await; + debug!("db command result at filter map: {:?}", db_result); + match db_result { + Ok(o) => write_command_with_payload( + OutputCommand::DbResponse as i32, + &o, + &output_i32_arr, + &output_u8_arr, + ) + .unwrap(), + Err(e) => write_command_with_payload( + OutputCommand::Error as i32, + format!("{:?}", e), + &output_i32_arr, + &output_u8_arr, + ) + .unwrap(), + }; + input_i32_arr.set_index(0, InputCommand::Waiting as i32); + } + InputCommand::ResponseFilterMap => { + let result = read_command_payload::>>( + &input_i32_arr, + &input_u8_arr, + ) + .unwrap(); + debug!("Received filter map result {:?}", result); + input_i32_arr.set_index(0, InputCommand::Waiting as i32); + return result; + } + } + } + } + }, + None, + ) .await; debug!("db command result: {:?}", result); match result { @@ -134,7 +222,9 @@ pub async fn main_loop(log_level: &str) { }; } InputCommand::Shutdown => break, - InputCommand::Waiting | InputCommand::ResponseTakeWhile => unreachable!(), + InputCommand::Waiting + | InputCommand::ResponseTakeWhile + | InputCommand::ResponseFilterMap => unreachable!(), } } info!("Db worker main loop exited"); diff --git a/wasm/light-client-wasm/src/lib.rs b/wasm/light-client-wasm/src/lib.rs index cd14841d..03811b8c 100644 --- a/wasm/light-client-wasm/src/lib.rs +++ b/wasm/light-client-wasm/src/lib.rs @@ -590,51 +590,43 @@ pub fn get_cells( filter_block_range, ) = build_filter_options(search_key)?; - let storage = STORAGE_WITH_DATA.get().unwrap().storage(); - - let mut internal_limit: usize = limit; - - // The new implementation iterates to try "limit * 2^0", "limit * 2^1", "limit * 2^2".. (until meets n such that limit * 2^n > u32::MAX) as the limit passed to storage.collect_iterator and finds result to return from what `collect_iterator` returns. - // Since this is a light client, we can assume that storage.collect_iterator won't return too much data - // If internal_limit reaches u32::MAX and we still can't get any cells from filtered results, we can assume that no cells will be found. - loop { - let prefix_cloned = prefix.clone(); - let kvs: Vec<_> = storage.collect_iterator( - from_key.clone(), - direction, - Box::new(move |key| key.starts_with(&prefix_cloned)), - internal_limit, - skip, + fn extract_data_from_key(key: &[u8]) -> (u32, u32, u64) { + let output_index = u32::from_be_bytes( + key[key.len() - 4..] + .try_into() + .expect("stored output_index"), + ); + let tx_index = u32::from_be_bytes( + key[key.len() - 8..key.len() - 4] + .try_into() + .expect("stored tx_index"), + ); + let block_number = u64::from_be_bytes( + key[key.len() - 16..key.len() - 8] + .try_into() + .expect("stored block_number"), ); + (output_index, tx_index, block_number) + } + + let storage = STORAGE_WITH_DATA.get().unwrap().storage(); + let kvs: Vec<_> = storage.collect_iterator( + from_key.clone(), + direction, + Box::new(move |key| key.starts_with(&prefix)), + Box::new(move |key| { + let value = storage.get(key).unwrap().unwrap(); - let mut cells = Vec::new(); - let mut last_key = Vec::new(); - for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { debug!("get cells iterator at {:?} {:?}", key, value); let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); - let output_index = u32::from_be_bytes( - key[key.len() - 4..] - .try_into() - .expect("stored output_index"), - ); - let tx_index = u32::from_be_bytes( - key[key.len() - 8..key.len() - 4] - .try_into() - .expect("stored tx_index"), - ); - let block_number = u64::from_be_bytes( - key[key.len() - 16..key.len() - 8] - .try_into() - .expect("stored block_number"), - ); - - let tx = packed::Transaction::from_slice( - &storage - .get(Key::TxHash(&tx_hash).into_vec()) - .unwrap() - .expect("stored tx")[12..], - ) - .expect("from stored tx slice should be OK"); + let (output_index, _tx_index, block_number) = extract_data_from_key(key); + let tx_data = &storage + .get(Key::TxHash(&tx_hash).into_vec()) + .unwrap() + .expect("stored tx")[12..]; + debug!("tx hash = {:?}, tx data = {:?}", tx_hash, tx_data); + let tx = packed::Transaction::from_slice(tx_data) + .expect("from stored tx slice should be OK"); let output = tx .raw() .outputs() @@ -654,7 +646,7 @@ pub fn get_cells( .starts_with(prefix) { debug!("skipped at {}", line!()); - continue; + return None; } } ScriptType::Type => { @@ -664,7 +656,7 @@ pub fn get_cells( .starts_with(prefix) { debug!("skipped at {}", line!()); - continue; + return None; } } } @@ -676,7 +668,7 @@ pub fn get_cells( let script_len = extract_raw_data(&output.lock()).len(); if script_len < r0 || script_len > r1 { debug!("skipped at {}", line!()); - continue; + return None; } } ScriptType::Type => { @@ -687,7 +679,7 @@ pub fn get_cells( .unwrap_or_default(); if script_len < r0 || script_len > r1 { debug!("skipped at {}", line!()); - continue; + return None; } } } @@ -696,7 +688,7 @@ pub fn get_cells( if let Some([r0, r1]) = filter_output_data_len_range { if output_data.len() < r0 || output_data.len() >= r1 { debug!("skipped at {}", line!()); - continue; + return None; } } @@ -704,58 +696,70 @@ pub fn get_cells( let capacity: core::Capacity = output.capacity().unpack(); if capacity < r0 || capacity >= r1 { debug!("skipped at {}", line!()); - continue; + return None; } } if let Some([r0, r1]) = filter_block_range { if block_number < r0 || block_number >= r1 { debug!("skipped at {}", line!()); - continue; + return None; } } + return Some(key.to_vec()); + }), + limit, + skip, + ); + debug!("get_cells: collect_iterator done"); + let mut cells = Vec::new(); + let mut last_key = Vec::new(); + for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { + let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); + let (output_index, tx_index, block_number) = extract_data_from_key(&key); + let tx = packed::Transaction::from_slice( + &storage + .get(Key::TxHash(&tx_hash).into_vec()) + .unwrap() + .expect("stored tx")[12..], + ) + .expect("from stored tx slice should be OK"); + let output = tx + .raw() + .outputs() + .get(output_index as usize) + .expect("get output by index should be OK"); + let output_data = tx + .raw() + .outputs_data() + .get(output_index as usize) + .expect("get output data by index should be OK"); - last_key = key.to_vec(); - let cell_to_push = Cell { - output: output.into(), - output_data: if with_data { - Some(output_data.into()) - } else { - None - }, - out_point: packed::OutPoint::new(tx_hash, output_index).into(), - block_number: block_number.into(), - tx_index: tx_index.into(), - }; - debug!("pushed cell {:#?}", cell_to_push); - cells.push(cell_to_push); - if cells.len() >= limit { - break; - } - } - debug!("get_cells last_key={:?}", last_key); - - if !cells.is_empty() { - return Ok((Pagination { - objects: cells, - last_cursor: JsonBytes::from_vec(last_key), - }) - .serialize(&SERIALIZER)?); - } else { - internal_limit *= 2; - if internal_limit > u32::MAX as usize { - debug!( - "Internal limit is now greater than {}, assuming no data is found", - u32::MAX - ); - return Ok((Pagination { - objects: Vec::::default(), - last_cursor: JsonBytes::from_vec(vec![]), - }) - .serialize(&SERIALIZER)?); - } + last_key = key.to_vec(); + let cell_to_push = Cell { + output: output.into(), + output_data: if with_data { + Some(output_data.into()) + } else { + None + }, + out_point: packed::OutPoint::new(tx_hash, output_index).into(), + block_number: block_number.into(), + tx_index: tx_index.into(), + }; + debug!("pushed cell {:#?}", cell_to_push); + cells.push(cell_to_push); + if cells.len() >= limit { + break; } } + debug!("get_cells last_key={:?}", last_key); + + return Ok((Pagination { + objects: cells, + last_cursor: JsonBytes::from_vec(last_key), + }) + .serialize(&SERIALIZER)?); } #[wasm_bindgen] pub fn get_transactions( @@ -780,11 +784,17 @@ pub fn get_transactions( order, after_cursor.map(JsonBytes::from_vec), )?; + let limit = limit as usize; if limit == 0 { return Err(JsValue::from_str("limit should be greater than 0")); } + let filter_script_type = match search_key.script_type { + ScriptType::Lock => ScriptType::Type, + ScriptType::Type => ScriptType::Lock, + }; + let (filter_script, filter_block_range) = if let Some(filter) = search_key.filter.as_ref() { if filter.output_data_len_range.is_some() { return Err(JsValue::from_str( @@ -805,196 +815,174 @@ pub fn get_transactions( (None, None) }; - let filter_script_type = match search_key.script_type { - ScriptType::Lock => ScriptType::Type, - ScriptType::Type => ScriptType::Lock, - }; - let storage = STORAGE_WITH_DATA.get().unwrap().storage(); if search_key.group_by_transaction.unwrap_or_default() { - let mut internal_limit: usize = limit * 10; // Start with higher limit for grouped mode - - loop { - let prefix_cloned = prefix.clone(); - let mut kvs: Vec<_> = storage.collect_iterator( - from_key.clone(), - direction, - Box::new(move |key| key.starts_with(&prefix_cloned)), - internal_limit, - skip, - ); - let mut tx_with_cells: Vec = Vec::new(); - let mut last_key = Vec::new(); - - 'outer: while !kvs.is_empty() { - for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { - let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); - if tx_with_cells.len() == limit - && tx_with_cells.last_mut().unwrap().transaction.hash != tx_hash.unpack() - { - break 'outer; - } - last_key = key.to_vec(); - let tx = packed::Transaction::from_slice( - &storage - .get(Key::TxHash(&tx_hash).into_vec()) - .expect("get tx should be OK") - .expect("stored tx")[12..], - ) - .expect("from stored tx slice should be OK"); - - let block_number = u64::from_be_bytes( - key[key.len() - 17..key.len() - 9] - .try_into() - .expect("stored block_number"), - ); - let tx_index = u32::from_be_bytes( - key[key.len() - 9..key.len() - 5] - .try_into() - .expect("stored tx_index"), - ); - let io_index = u32::from_be_bytes( - key[key.len() - 5..key.len() - 1] - .try_into() - .expect("stored io_index"), - ); - let io_type = if *key.last().expect("stored io_type") == 0 { - CellType::Input - } else { - CellType::Output - }; + let kvs: Vec<_> = storage.collect_iterator( + from_key.clone(), + direction, + Box::new(move |key| key.starts_with(&prefix)), + Box::new(move |key| { + let value = storage.get(key).unwrap().unwrap(); + debug!("get transactions iterator at {:?} {:?}", key, value); - if let Some(filter_script) = filter_script.as_ref() { - let filter_script_matched = match filter_script_type { - ScriptType::Lock => storage - .get( - Key::TxLockScript( - filter_script, - block_number, - tx_index, - io_index, - match io_type { - CellType::Input => storage::CellType::Input, - CellType::Output => storage::CellType::Output, - }, - ) - .into_vec(), + let block_number = u64::from_be_bytes( + key[key.len() - 17..key.len() - 9] + .try_into() + .expect("stored block_number"), + ); + let tx_index = u32::from_be_bytes( + key[key.len() - 9..key.len() - 5] + .try_into() + .expect("stored tx_index"), + ); + let io_index = u32::from_be_bytes( + key[key.len() - 5..key.len() - 1] + .try_into() + .expect("stored io_index"), + ); + let io_type = if *key.last().expect("stored io_type") == 0 { + CellType::Input + } else { + CellType::Output + }; + + if let Some(filter_script) = filter_script.as_ref() { + let filter_script_matched = match filter_script_type { + ScriptType::Lock => storage + .get( + Key::TxLockScript( + filter_script, + block_number, + tx_index, + io_index, + match io_type { + CellType::Input => storage::CellType::Input, + CellType::Output => storage::CellType::Output, + }, ) - .expect("get TxLockScript should be OK") - .is_some(), - ScriptType::Type => storage - .get( - Key::TxTypeScript( - filter_script, - block_number, - tx_index, - io_index, - match io_type { - CellType::Input => storage::CellType::Input, - CellType::Output => storage::CellType::Output, - }, - ) - .into_vec(), + .into_vec(), + ) + .expect("get TxLockScript should be OK") + .is_some(), + ScriptType::Type => storage + .get( + Key::TxTypeScript( + filter_script, + block_number, + tx_index, + io_index, + match io_type { + CellType::Input => storage::CellType::Input, + CellType::Output => storage::CellType::Output, + }, ) - .expect("get TxTypeScript should be OK") - .is_some(), - }; - - if !filter_script_matched { - debug!("skipped at {}", line!()); - continue; - } - } + .into_vec(), + ) + .expect("get TxTypeScript should be OK") + .is_some(), + }; - if let Some([r0, r1]) = filter_block_range { - if block_number < r0 || block_number >= r1 { - debug!("skipped at {}", line!()); - continue; - } + if !filter_script_matched { + debug!("skipped at {}", line!()); + return None; } + } - let last_tx_hash_is_same = tx_with_cells - .last_mut() - .map(|last| { - if last.transaction.hash == tx_hash.unpack() { - last.cells.push((io_type.clone(), io_index.into())); - true - } else { - false - } - }) - .unwrap_or_default(); - - if !last_tx_hash_is_same { - tx_with_cells.push(TxWithCells { - transaction: tx.into_view().into(), - block_number: block_number.into(), - tx_index: tx_index.into(), - cells: vec![(io_type, io_index.into())], - }); + if let Some([r0, r1]) = filter_block_range { + if block_number < r0 || block_number >= r1 { + debug!("skipped at {}", line!()); + return None; } } - if tx_with_cells.len() >= limit { - break 'outer; - } - let prefix_cloned = prefix.clone(); - kvs = storage.collect_iterator( - last_key.clone(), - direction, - Box::new(move |key| key.starts_with(&prefix_cloned)), - 100, - 1, - ); + + return Some(key.to_vec()); + }), + limit * 10, // Start with higher limit for grouped mode + skip, + ); + + let mut tx_with_cells: Vec = Vec::new(); + let mut last_key = Vec::new(); + + for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { + let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); + if tx_with_cells.len() == limit + && tx_with_cells.last_mut().unwrap().transaction.hash != tx_hash.unpack() + { + break; } + last_key = key.to_vec(); + let tx = packed::Transaction::from_slice( + &storage + .get(Key::TxHash(&tx_hash).into_vec()) + .expect("get tx should be OK") + .expect("stored tx")[12..], + ) + .expect("from stored tx slice should be OK"); - debug!("get_transactions (grouped) last_key={:?}", last_key); + let block_number = u64::from_be_bytes( + key[key.len() - 17..key.len() - 9] + .try_into() + .expect("stored block_number"), + ); + let tx_index = u32::from_be_bytes( + key[key.len() - 9..key.len() - 5] + .try_into() + .expect("stored tx_index"), + ); + let io_index = u32::from_be_bytes( + key[key.len() - 5..key.len() - 1] + .try_into() + .expect("stored io_index"), + ); + let io_type = if *key.last().expect("stored io_type") == 0 { + CellType::Input + } else { + CellType::Output + }; - if !tx_with_cells.is_empty() { - return Ok((Pagination { - objects: tx_with_cells.into_iter().map(Tx::Grouped).collect(), - last_cursor: JsonBytes::from_vec(last_key), + let last_tx_hash_is_same = tx_with_cells + .last_mut() + .map(|last| { + if last.transaction.hash == tx_hash.unpack() { + last.cells.push((io_type.clone(), io_index.into())); + true + } else { + false + } }) - .serialize(&SERIALIZER)?); - } else { - internal_limit *= 2; - if internal_limit > u32::MAX as usize { - debug!( - "Internal limit is now greater than {}, assuming no data is found", - u32::MAX - ); - return Ok((Pagination { - objects: Vec::::default(), - last_cursor: JsonBytes::from_vec(vec![]), - }) - .serialize(&SERIALIZER)?); - } + .unwrap_or_default(); + + if !last_tx_hash_is_same { + tx_with_cells.push(TxWithCells { + transaction: tx.into_view().into(), + block_number: block_number.into(), + tx_index: tx_index.into(), + cells: vec![(io_type, io_index.into())], + }); + } + + if tx_with_cells.len() >= limit { + break; } } + + debug!("get_transactions (grouped) last_key={:?}", last_key); + + return Ok((Pagination { + objects: tx_with_cells.into_iter().map(Tx::Grouped).collect(), + last_cursor: JsonBytes::from_vec(last_key), + }) + .serialize(&SERIALIZER)?); } else { - let mut internal_limit: usize = limit; - - loop { - let prefix_cloned = prefix.clone(); - let kvs: Vec<_> = storage.collect_iterator( - from_key.clone(), - direction, - Box::new(move |key| key.starts_with(&prefix_cloned)), - internal_limit, - skip, - ); - let mut last_key = Vec::new(); - let mut txs = Vec::new(); - for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { + let kvs: Vec<_> = storage.collect_iterator( + from_key.clone(), + direction, + Box::new(move |key| key.starts_with(&prefix)), + Box::new(move |key| { + let value = storage.get(key).unwrap().unwrap(); debug!("get transactions iterator at {:?} {:?}", key, value); - let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); - let tx = packed::Transaction::from_slice( - &storage - .get(Key::TxHash(&tx_hash).into_vec()) - .expect("get tx should be OK") - .expect("stored tx")[12..], - ) - .expect("from stored tx slice should be OK"); let block_number = u64::from_be_bytes( key[key.len() - 17..key.len() - 9] @@ -1038,7 +1026,7 @@ pub fn get_transactions( .is_none() { debug!("skipped at {}", line!()); - continue; + return None; }; } ScriptType::Type => { @@ -1060,7 +1048,7 @@ pub fn get_transactions( .is_none() { debug!("skipped at {}", line!()); - continue; + return None; }; } } @@ -1069,49 +1057,72 @@ pub fn get_transactions( if let Some([r0, r1]) = filter_block_range { if block_number < r0 || block_number >= r1 { debug!("skipped at {}", line!()); - continue; + return None; } } - last_key = key.to_vec(); - let tx_to_push = Tx::Ungrouped(TxWithCell { - transaction: tx.into_view().into(), - block_number: block_number.into(), - tx_index: tx_index.into(), - io_index: io_index.into(), - io_type, - }); - txs.push(tx_to_push); - if txs.len() >= limit { - break; - } - } - debug!("get_transactions last_key={:?}", last_key); + return Some(key.to_vec()); + }), + limit, + skip, + ); - if !txs.is_empty() { - return Ok((Pagination { - objects: txs, - last_cursor: JsonBytes::from_vec(last_key), - }) - .serialize(&SERIALIZER)?); + let mut last_key = Vec::new(); + let mut txs = Vec::new(); + + for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) { + let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); + let tx = packed::Transaction::from_slice( + &storage + .get(Key::TxHash(&tx_hash).into_vec()) + .expect("get tx should be OK") + .expect("stored tx")[12..], + ) + .expect("from stored tx slice should be OK"); + + let block_number = u64::from_be_bytes( + key[key.len() - 17..key.len() - 9] + .try_into() + .expect("stored block_number"), + ); + let tx_index = u32::from_be_bytes( + key[key.len() - 9..key.len() - 5] + .try_into() + .expect("stored tx_index"), + ); + let io_index = u32::from_be_bytes( + key[key.len() - 5..key.len() - 1] + .try_into() + .expect("stored io_index"), + ); + let io_type = if *key.last().expect("stored io_type") == 0 { + CellType::Input } else { - internal_limit *= 2; - if internal_limit > u32::MAX as usize { - debug!( - "Internal limit is now greater than {}, assuming no data is found", - u32::MAX - ); - return Ok((Pagination { - objects: Vec::::default(), - last_cursor: JsonBytes::from_vec(vec![]), - }) - .serialize(&SERIALIZER)?); - } + CellType::Output + }; + + last_key = key.to_vec(); + let tx_to_push = Tx::Ungrouped(TxWithCell { + transaction: tx.into_view().into(), + block_number: block_number.into(), + tx_index: tx_index.into(), + io_index: io_index.into(), + io_type, + }); + txs.push(tx_to_push); + if txs.len() >= limit { + break; } } + debug!("get_transactions last_key={:?}", last_key); + + return Ok((Pagination { + objects: txs, + last_cursor: JsonBytes::from_vec(last_key), + }) + .serialize(&SERIALIZER)?); } } - #[wasm_bindgen] pub fn get_cells_capacity(search_key: JsValue) -> Result { if !status(0b1) { @@ -1144,6 +1155,103 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { from_key, direction, Box::new(move |key| key.starts_with(&prefix)), + Box::new(move |key| { + let value = storage.get(key).unwrap().unwrap(); + + let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); + let output_index = u32::from_be_bytes( + key[key.len() - 4..] + .try_into() + .expect("stored output_index"), + ); + let block_number = u64::from_be_bytes( + key[key.len() - 16..key.len() - 8] + .try_into() + .expect("stored block_number"), + ); + + let tx = packed::Transaction::from_slice( + &storage + .get(Key::TxHash(&tx_hash).into_vec()) + .expect("get tx should be OK") + .expect("stored tx")[12..], + ) + .expect("from stored tx slice should be OK"); + let output = tx + .raw() + .outputs() + .get(output_index as usize) + .expect("get output by index should be OK"); + let output_data = tx + .raw() + .outputs_data() + .get(output_index as usize) + .expect("get output data by index should be OK"); + + if let Some(prefix) = filter_prefix.as_ref() { + match filter_script_type { + ScriptType::Lock => { + if !extract_raw_data(&output.lock()) + .as_slice() + .starts_with(prefix) + { + return None; + } + } + ScriptType::Type => { + if output.type_().is_none() + || !extract_raw_data(&output.type_().to_opt().unwrap()) + .as_slice() + .starts_with(prefix) + { + return None; + } + } + } + } + + if let Some([r0, r1]) = filter_script_len_range { + match filter_script_type { + ScriptType::Lock => { + let script_len = extract_raw_data(&output.lock()).len(); + if script_len < r0 || script_len > r1 { + return None; + } + } + ScriptType::Type => { + let script_len = output + .type_() + .to_opt() + .map(|script| extract_raw_data(&script).len()) + .unwrap_or_default(); + if script_len < r0 || script_len > r1 { + return None; + } + } + } + } + + if let Some([r0, r1]) = filter_output_data_len_range { + if output_data.len() < r0 || output_data.len() >= r1 { + return None; + } + } + + if let Some([r0, r1]) = filter_output_capacity_range { + let capacity: core::Capacity = output.capacity().unpack(); + if capacity < r0 || capacity >= r1 { + return None; + } + } + + if let Some([r0, r1]) = filter_block_range { + if block_number < r0 || block_number >= r1 { + return None; + } + } + + return Some(key.to_vec()); + }), usize::MAX, skip, ); @@ -1156,11 +1264,6 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { .try_into() .expect("stored output_index"), ); - let block_number = u64::from_be_bytes( - key[key.len() - 16..key.len() - 8] - .try_into() - .expect("stored block_number"), - ); let tx = packed::Transaction::from_slice( &storage @@ -1174,74 +1277,6 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { .outputs() .get(output_index as usize) .expect("get output by index should be OK"); - let output_data = tx - .raw() - .outputs_data() - .get(output_index as usize) - .expect("get output data by index should be OK"); - - if let Some(prefix) = filter_prefix.as_ref() { - match filter_script_type { - ScriptType::Lock => { - if !extract_raw_data(&output.lock()) - .as_slice() - .starts_with(prefix) - { - continue; - } - } - ScriptType::Type => { - if output.type_().is_none() - || !extract_raw_data(&output.type_().to_opt().unwrap()) - .as_slice() - .starts_with(prefix) - { - continue; - } - } - } - } - - if let Some([r0, r1]) = filter_script_len_range { - match filter_script_type { - ScriptType::Lock => { - let script_len = extract_raw_data(&output.lock()).len(); - if script_len < r0 || script_len > r1 { - continue; - } - } - ScriptType::Type => { - let script_len = output - .type_() - .to_opt() - .map(|script| extract_raw_data(&script).len()) - .unwrap_or_default(); - if script_len < r0 || script_len > r1 { - continue; - } - } - } - } - - if let Some([r0, r1]) = filter_output_data_len_range { - if output_data.len() < r0 || output_data.len() >= r1 { - continue; - } - } - - if let Some([r0, r1]) = filter_output_capacity_range { - let capacity: core::Capacity = output.capacity().unpack(); - if capacity < r0 || capacity >= r1 { - continue; - } - } - - if let Some([r0, r1]) = filter_block_range { - if block_number < r0 || block_number >= r1 { - continue; - } - } - capacity += Unpack::::unpack(&output.capacity()).as_u64() } From 5d569e32e9405127b009632d48e3f08f2912550b Mon Sep 17 00:00:00 2001 From: officeyutong Date: Tue, 26 Aug 2025 18:53:44 +0800 Subject: [PATCH 5/7] make clippy happy --- wasm/light-client-wasm/src/lib.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/wasm/light-client-wasm/src/lib.rs b/wasm/light-client-wasm/src/lib.rs index 03811b8c..b08d3057 100644 --- a/wasm/light-client-wasm/src/lib.rs +++ b/wasm/light-client-wasm/src/lib.rs @@ -706,7 +706,7 @@ pub fn get_cells( return None; } } - return Some(key.to_vec()); + Some(key.to_vec()) }), limit, skip, @@ -755,11 +755,11 @@ pub fn get_cells( } debug!("get_cells last_key={:?}", last_key); - return Ok((Pagination { + Ok((Pagination { objects: cells, last_cursor: JsonBytes::from_vec(last_key), }) - .serialize(&SERIALIZER)?); + .serialize(&SERIALIZER)?) } #[wasm_bindgen] pub fn get_transactions( @@ -896,7 +896,7 @@ pub fn get_transactions( } } - return Some(key.to_vec()); + Some(key.to_vec()) }), limit * 10, // Start with higher limit for grouped mode skip, @@ -970,11 +970,11 @@ pub fn get_transactions( debug!("get_transactions (grouped) last_key={:?}", last_key); - return Ok((Pagination { + Ok((Pagination { objects: tx_with_cells.into_iter().map(Tx::Grouped).collect(), last_cursor: JsonBytes::from_vec(last_key), }) - .serialize(&SERIALIZER)?); + .serialize(&SERIALIZER)?) } else { let kvs: Vec<_> = storage.collect_iterator( from_key.clone(), @@ -1061,7 +1061,7 @@ pub fn get_transactions( } } - return Some(key.to_vec()); + Some(key.to_vec()) }), limit, skip, @@ -1116,11 +1116,11 @@ pub fn get_transactions( } debug!("get_transactions last_key={:?}", last_key); - return Ok((Pagination { + Ok((Pagination { objects: txs, last_cursor: JsonBytes::from_vec(last_key), }) - .serialize(&SERIALIZER)?); + .serialize(&SERIALIZER)?) } } #[wasm_bindgen] @@ -1250,7 +1250,7 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { } } - return Some(key.to_vec()); + Some(key.to_vec()) }), usize::MAX, skip, From 41237aecd97949cecb45bf7c733fbf8c886bda07 Mon Sep 17 00:00:00 2001 From: officeyutong Date: Wed, 27 Aug 2025 20:16:46 +0800 Subject: [PATCH 6/7] fix a dead lock in database implementation --- light-client-lib/src/storage/db/browser.rs | 7 +++++-- wasm/light-client-db-common/src/lib.rs | 1 + wasm/light-client-db-worker/src/lib.rs | 19 +++++++++++-------- wasm/light-client-wasm/src/lib.rs | 13 ++++++++++++- 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/light-client-lib/src/storage/db/browser.rs b/light-client-lib/src/storage/db/browser.rs index e886e5d5..8af572d6 100644 --- a/light-client-lib/src/storage/db/browser.rs +++ b/light-client-lib/src/storage/db/browser.rs @@ -225,6 +225,7 @@ impl CommunicationChannel { Atomics::wait(output_i32_arr, 0, OutputCommand::Waiting as i32).unwrap(); let output_cmd = OutputCommand::try_from(output_i32_arr.get_index(0)).unwrap(); output_i32_arr.set_index(0, 0); + log::trace!("Received output command: {:?}", output_cmd); match output_cmd { OutputCommand::OpenDatabaseResponse | OutputCommand::Waiting => unreachable!(), OutputCommand::RequestTakeWhile => { @@ -247,9 +248,10 @@ impl CommunicationChannel { let arg = read_command_payload::>(output_i32_arr, output_u8_arr)?; let result = filter_map.as_ref().unwrap()(&arg); - debug!( + log::trace!( "Received filter_map request with args {:?}, result {:?}", - arg, result + arg, + result ); write_command_with_payload( InputCommand::ResponseFilterMap as i32, @@ -257,6 +259,7 @@ impl CommunicationChannel { input_i32_arr, input_u8_arr, )?; + log::trace!("Result of RequestFilterMap written"); continue; } diff --git a/wasm/light-client-db-common/src/lib.rs b/wasm/light-client-db-common/src/lib.rs index e47cf3b1..a7b775b7 100644 --- a/wasm/light-client-db-common/src/lib.rs +++ b/wasm/light-client-db-common/src/lib.rs @@ -104,6 +104,7 @@ impl TryFrom for InputCommand { } #[repr(i32)] +#[derive(Debug)] /// Represent a 4-byte command which will be put in output buffer pub enum OutputCommand { /// Waiting for db worker to handle the command diff --git a/wasm/light-client-db-worker/src/lib.rs b/wasm/light-client-db-worker/src/lib.rs index 0522c986..1e9e09d1 100644 --- a/wasm/light-client-db-worker/src/lib.rs +++ b/wasm/light-client-db-worker/src/lib.rs @@ -69,6 +69,7 @@ pub async fn main_loop(log_level: &str) { .expect("Unable to wait for command"); // Clean it to avoid infinite loop input_i32_arr.set_index(0, InputCommand::Waiting as i32); + log::trace!("Received input command: {:?}", cmd); match cmd { InputCommand::OpenDatabase => { let database_name = @@ -121,7 +122,7 @@ pub async fn main_loop(log_level: &str) { |buf, store| { let buf = buf.to_vec(); input_i32_arr.set_index(0, InputCommand::Waiting as i32); - debug!("Invoking request filter_map with args {:?}", buf); + log::trace!("Invoking request filter_map with args {:?}", buf); write_command_with_payload( OutputCommand::RequestFilterMap as i32, buf, @@ -135,7 +136,7 @@ pub async fn main_loop(log_level: &str) { let output_u8_arr = output_u8_arr.clone(); async move { - loop { + let result = loop { let store = store.clone(); match wait_for_command_sync(&input_i32_arr, InputCommand::Waiting) .unwrap() @@ -167,7 +168,8 @@ pub async fn main_loop(log_level: &str) { Some(store), ) .await; - debug!("db command result at filter map: {:?}", db_result); + log::trace!("db command result at filter map: {:?}", db_result); + input_i32_arr.set_index(0, InputCommand::Waiting as i32); match db_result { Ok(o) => write_command_with_payload( OutputCommand::DbResponse as i32, @@ -184,7 +186,6 @@ pub async fn main_loop(log_level: &str) { ) .unwrap(), }; - input_i32_arr.set_index(0, InputCommand::Waiting as i32); } InputCommand::ResponseFilterMap => { let result = read_command_payload::>>( @@ -192,12 +193,14 @@ pub async fn main_loop(log_level: &str) { &input_u8_arr, ) .unwrap(); - debug!("Received filter map result {:?}", result); - input_i32_arr.set_index(0, InputCommand::Waiting as i32); - return result; + log::trace!("Received filter map result {:?}", result); + + break result; } } - } + }; + input_i32_arr.set_index(0, InputCommand::Waiting as i32); + result } }, None, diff --git a/wasm/light-client-wasm/src/lib.rs b/wasm/light-client-wasm/src/lib.rs index b08d3057..6aa000db 100644 --- a/wasm/light-client-wasm/src/lib.rs +++ b/wasm/light-client-wasm/src/lib.rs @@ -1151,11 +1151,14 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { ) = build_filter_options(search_key)?; let storage = STORAGE_WITH_DATA.get().unwrap().storage(); + log::trace!("get_cells_capacity: before entering collect iterator"); + let kvs: Vec<_> = storage.collect_iterator( from_key, direction, Box::new(move |key| key.starts_with(&prefix)), Box::new(move |key| { + log::trace!("At key {:?}", key); let value = storage.get(key).unwrap().unwrap(); let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash"); @@ -1195,6 +1198,7 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { .as_slice() .starts_with(prefix) { + log::trace!("break at {}", line!()); return None; } } @@ -1204,6 +1208,7 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { .as_slice() .starts_with(prefix) { + log::trace!("break at {}", line!()); return None; } } @@ -1215,6 +1220,7 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { ScriptType::Lock => { let script_len = extract_raw_data(&output.lock()).len(); if script_len < r0 || script_len > r1 { + log::trace!("break at {}", line!()); return None; } } @@ -1225,6 +1231,7 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { .map(|script| extract_raw_data(&script).len()) .unwrap_or_default(); if script_len < r0 || script_len > r1 { + log::trace!("break at {}", line!()); return None; } } @@ -1233,6 +1240,7 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { if let Some([r0, r1]) = filter_output_data_len_range { if output_data.len() < r0 || output_data.len() >= r1 { + log::trace!("break at {}", line!()); return None; } } @@ -1240,16 +1248,18 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { if let Some([r0, r1]) = filter_output_capacity_range { let capacity: core::Capacity = output.capacity().unpack(); if capacity < r0 || capacity >= r1 { + log::trace!("break at {}", line!()); return None; } } if let Some([r0, r1]) = filter_block_range { if block_number < r0 || block_number >= r1 { + log::trace!("break at {}", line!()); return None; } } - + log::trace!("Returning normally at {:?}", key); Some(key.to_vec()) }), usize::MAX, @@ -1286,6 +1296,7 @@ pub fn get_cells_capacity(search_key: JsValue) -> Result { .expect("snapshot get last state should be ok") .map(|data| packed::HeaderReader::from_slice_should_be_ok(&data[32..]).to_entity()) .expect("tip header should be inited"); + log::trace!("Get cells capacity done"); Ok((CellsCapacity { capacity: capacity.into(), block_hash: tip_header.calc_header_hash().unpack(), From ccbc2bdc2b183cfc98448a5038bd90db58f4eda3 Mon Sep 17 00:00:00 2001 From: officeyutong Date: Thu, 28 Aug 2025 14:19:03 +0800 Subject: [PATCH 7/7] make fmt happy --- wasm/light-client-db-worker/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/wasm/light-client-db-worker/src/lib.rs b/wasm/light-client-db-worker/src/lib.rs index 1e9e09d1..9605911d 100644 --- a/wasm/light-client-db-worker/src/lib.rs +++ b/wasm/light-client-db-worker/src/lib.rs @@ -168,7 +168,10 @@ pub async fn main_loop(log_level: &str) { Some(store), ) .await; - log::trace!("db command result at filter map: {:?}", db_result); + log::trace!( + "db command result at filter map: {:?}", + db_result + ); input_i32_arr.set_index(0, InputCommand::Waiting as i32); match db_result { Ok(o) => write_command_with_payload(