Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions light-client-lib/src/storage/db/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,13 @@ impl CommunicationChannel {
loop {
Atomics::wait(output_i32_arr, 0, OutputCommand::Waiting as i32).unwrap();
let output_cmd = OutputCommand::try_from(output_i32_arr.get_index(0)).unwrap();
output_i32_arr.set_index(0, 0);
output_i32_arr.set_index(0, OutputCommand::Waiting as i32);
log::trace!("Received output command: {:?}", output_cmd);
match output_cmd {
OutputCommand::OpenDatabaseResponse | OutputCommand::Waiting => unreachable!(),
s @ (OutputCommand::OpenDatabaseResponse | OutputCommand::Waiting) => {
log::warn!("Unreachable at light-client-lib: {:?}", s);
continue;
}
OutputCommand::RequestTakeWhile => {
let arg = read_command_payload::<Vec<u8>>(output_i32_arr, output_u8_arr)?;
let ok = take_while.as_ref().unwrap()(&arg);
Expand Down Expand Up @@ -264,10 +267,9 @@ impl CommunicationChannel {
}

OutputCommand::DbResponse => {
return read_command_payload::<DbCommandResponse>(
output_i32_arr,
output_u8_arr,
);
let result =
read_command_payload::<DbCommandResponse>(output_i32_arr, output_u8_arr);
return result;
}
OutputCommand::Error => {
let payload = read_command_payload::<String>(output_i32_arr, output_u8_arr)?;
Expand Down
13 changes: 8 additions & 5 deletions wasm/light-client-db-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,14 @@ pub async fn main_loop(log_level: &str) {
match wait_for_command_sync(&input_i32_arr, InputCommand::Waiting)
.unwrap()
{
InputCommand::Waiting
s @ (InputCommand::Waiting
| InputCommand::OpenDatabase
| InputCommand::Shutdown
| InputCommand::ResponseTakeWhile => {
| InputCommand::ResponseTakeWhile) => {
log::warn!(
"Unreachable branch at light-client-db-worker: {:?}",
s
);
unreachable!()
}
// Allow calling other db requests in filter map call
Expand Down Expand Up @@ -228,9 +232,8 @@ pub async fn main_loop(log_level: &str) {
};
}
InputCommand::Shutdown => break,
InputCommand::Waiting
| InputCommand::ResponseTakeWhile
| InputCommand::ResponseFilterMap => unreachable!(),
InputCommand::Waiting => continue,
InputCommand::ResponseTakeWhile | InputCommand::ResponseFilterMap => unreachable!(),
}
}
info!("Db worker main loop exited");
Expand Down
138 changes: 57 additions & 81 deletions wasm/light-client-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,13 +818,34 @@ pub fn get_transactions(
let storage = STORAGE_WITH_DATA.get().unwrap().storage();

if search_key.group_by_transaction.unwrap_or_default() {
let kvs: Vec<_> = storage.collect_iterator(
from_key.clone(),
let prefix_cloned = prefix.clone();
let mut kvs: Vec<_> = storage.collect_iterator(
from_key,
direction,
Box::new(move |key| key.starts_with(&prefix)),
Box::new(move |key| {
let value = storage.get(key).unwrap().unwrap();
debug!("get transactions iterator at {:?} {:?}", key, value);
Box::new(move |key| key.starts_with(&prefix_cloned)),
Box::new(|key| Some(key.to_vec())),
100,
skip,
);
let mut tx_with_cells: Vec<TxWithCells> = Vec::new();
let mut last_key = Vec::new();

'outer: while !kvs.is_empty() {
for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) {
let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash");
if tx_with_cells.len() == limit
&& tx_with_cells.last_mut().unwrap().transaction.hash != tx_hash.unpack()
{
break 'outer;
}
last_key = key.to_vec();
let tx = packed::Transaction::from_slice(
&storage
.get(Key::TxHash(&tx_hash).into_vec())
.expect("get tx should be OK")
.expect("stored tx")[12..],
)
.expect("from stored tx slice should be OK");

let block_number = u64::from_be_bytes(
key[key.len() - 17..key.len() - 9]
Expand Down Expand Up @@ -884,92 +905,47 @@ pub fn get_transactions(
};

if !filter_script_matched {
debug!("skipped at {}", line!());
return None;
continue;
}
}

if let Some([r0, r1]) = filter_block_range {
if block_number < r0 || block_number >= r1 {
debug!("skipped at {}", line!());
return None;
continue;
}
}

Some(key.to_vec())
}),
limit * 10, // Start with higher limit for grouped mode
skip,
);

let mut tx_with_cells: Vec<TxWithCells> = Vec::new();
let mut last_key = Vec::new();

for (key, value) in kvs.into_iter().map(|kv| (kv.key, kv.value)) {
let tx_hash = packed::Byte32::from_slice(&value).expect("stored tx hash");
if tx_with_cells.len() == limit
&& tx_with_cells.last_mut().unwrap().transaction.hash != tx_hash.unpack()
{
break;
let last_tx_hash_is_same = tx_with_cells
.last_mut()
.map(|last| {
if last.transaction.hash == tx_hash.unpack() {
last.cells.push((io_type.clone(), io_index.into()));
true
} else {
false
}
})
.unwrap_or_default();

if !last_tx_hash_is_same {
tx_with_cells.push(TxWithCells {
transaction: tx.into_view().into(),
block_number: block_number.into(),
tx_index: tx_index.into(),
cells: vec![(io_type, io_index.into())],
});
}
}
last_key = key.to_vec();
let tx = packed::Transaction::from_slice(
&storage
.get(Key::TxHash(&tx_hash).into_vec())
.expect("get tx should be OK")
.expect("stored tx")[12..],
)
.expect("from stored tx slice should be OK");

let block_number = u64::from_be_bytes(
key[key.len() - 17..key.len() - 9]
.try_into()
.expect("stored block_number"),
);
let tx_index = u32::from_be_bytes(
key[key.len() - 9..key.len() - 5]
.try_into()
.expect("stored tx_index"),
);
let io_index = u32::from_be_bytes(
key[key.len() - 5..key.len() - 1]
.try_into()
.expect("stored io_index"),
let prefix_cloned = prefix.clone();
kvs = storage.collect_iterator(
last_key.clone(),
direction,
Box::new(move |key| key.starts_with(&prefix_cloned)),
Box::new(|key| Some(key.to_vec())),
100,
1,
);
let io_type = if *key.last().expect("stored io_type") == 0 {
CellType::Input
} else {
CellType::Output
};

let last_tx_hash_is_same = tx_with_cells
.last_mut()
.map(|last| {
if last.transaction.hash == tx_hash.unpack() {
last.cells.push((io_type.clone(), io_index.into()));
true
} else {
false
}
})
.unwrap_or_default();

if !last_tx_hash_is_same {
tx_with_cells.push(TxWithCells {
transaction: tx.into_view().into(),
block_number: block_number.into(),
tx_index: tx_index.into(),
cells: vec![(io_type, io_index.into())],
});
}

if tx_with_cells.len() >= limit {
break;
}
}

debug!("get_transactions (grouped) last_key={:?}", last_key);

Ok((Pagination {
objects: tx_with_cells.into_iter().map(Tx::Grouped).collect(),
last_cursor: JsonBytes::from_vec(last_key),
Expand Down
Loading