Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 93 additions & 47 deletions light-client-lib/src/storage/db/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
};
use wasm_bindgen::{prelude::wasm_bindgen, JsCast, JsValue};
use web_sys::js_sys::{Atomics, Int32Array, SharedArrayBuffer, Uint8Array};
enum CommandRequestWithTakeWhile {
enum CommandRequestWithTakeWhileAndFilterMap {
Read {
keys: Vec<Vec<u8>>,
},
Expand All @@ -57,6 +57,7 @@ enum CommandRequestWithTakeWhile {
start_key_bound: Vec<u8>,
order: CursorDirection,
take_while: Box<dyn Fn(&[u8]) -> bool + Send + 'static>,
filter_map: Box<dyn Fn(&[u8]) -> Option<Vec<u8>> + Send + 'static>,
limit: usize,
skip: usize,
},
Expand All @@ -65,6 +66,7 @@ enum CommandRequestWithTakeWhile {
start_key_bound: Vec<u8>,
order: CursorDirection,
take_while: Box<dyn Fn(&[u8]) -> bool + Send + 'static>,
filter_map: Box<dyn Fn(&[u8]) -> Option<Vec<u8>> + Send + 'static>,
limit: usize,
skip: usize,
},
Expand Down Expand Up @@ -146,7 +148,8 @@ impl CommunicationChannel {
),
OutputCommand::RequestTakeWhile
| OutputCommand::Waiting
| OutputCommand::DbResponse => {
| OutputCommand::DbResponse
| OutputCommand::RequestFilterMap => {
unreachable!()
}
}
Expand All @@ -156,18 +159,23 @@ impl CommunicationChannel {
/// cmd: The command
fn dispatch_database_command(
&self,
cmd: CommandRequestWithTakeWhile,
cmd: CommandRequestWithTakeWhileAndFilterMap,
) -> anyhow::Result<DbCommandResponse> {
let (new_cmd, take_while) = match cmd {
CommandRequestWithTakeWhile::Read { keys } => (DbCommandRequest::Read { keys }, None),
CommandRequestWithTakeWhile::Put { kvs } => (DbCommandRequest::Put { kvs }, None),
CommandRequestWithTakeWhile::Delete { keys } => {
(DbCommandRequest::Delete { keys }, None)
let (new_cmd, take_while, filter_map) = match cmd {
CommandRequestWithTakeWhileAndFilterMap::Read { keys } => {
(DbCommandRequest::Read { keys }, None, None)
}
CommandRequestWithTakeWhile::Iterator {
CommandRequestWithTakeWhileAndFilterMap::Put { kvs } => {
(DbCommandRequest::Put { kvs }, None, None)
}
CommandRequestWithTakeWhileAndFilterMap::Delete { keys } => {
(DbCommandRequest::Delete { keys }, None, None)
}
CommandRequestWithTakeWhileAndFilterMap::Iterator {
start_key_bound,
order,
take_while,
filter_map,
limit,
skip,
} => (
Expand All @@ -178,11 +186,13 @@ impl CommunicationChannel {
skip,
},
Some(take_while),
Some(filter_map),
),
CommandRequestWithTakeWhile::IteratorKey {
CommandRequestWithTakeWhileAndFilterMap::IteratorKey {
start_key_bound,
order,
take_while,
filter_map,
limit,
skip,
} => (
Expand All @@ -193,6 +203,7 @@ impl CommunicationChannel {
skip,
},
Some(take_while),
Some(filter_map),
),
};
debug!("Dispatching database command: {:?}", new_cmd);
Expand All @@ -214,6 +225,7 @@ impl CommunicationChannel {
Atomics::wait(output_i32_arr, 0, OutputCommand::Waiting as i32).unwrap();
let output_cmd = OutputCommand::try_from(output_i32_arr.get_index(0)).unwrap();
output_i32_arr.set_index(0, 0);
log::trace!("Received output command: {:?}", output_cmd);
match output_cmd {
OutputCommand::OpenDatabaseResponse | OutputCommand::Waiting => unreachable!(),
OutputCommand::RequestTakeWhile => {
Expand All @@ -232,6 +244,25 @@ impl CommunicationChannel {
)?;
continue;
}
OutputCommand::RequestFilterMap => {
let arg = read_command_payload::<Vec<u8>>(output_i32_arr, output_u8_arr)?;
let result = filter_map.as_ref().unwrap()(&arg);

log::trace!(
"Received filter_map request with args {:?}, result {:?}",
arg,
result
);
write_command_with_payload(
InputCommand::ResponseFilterMap as i32,
result,
input_i32_arr,
input_u8_arr,
)?;
log::trace!("Result of RequestFilterMap written");
continue;
}

OutputCommand::DbResponse => {
return read_command_payload::<DbCommandResponse>(
output_i32_arr,
Expand Down Expand Up @@ -297,7 +328,7 @@ impl Storage {
V: AsRef<[u8]>,
{
self.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Put {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Put {
kvs: vec![KV {
key: key.as_ref().to_vec(),
value: value.as_ref().to_vec(),
Expand All @@ -309,7 +340,7 @@ impl Storage {
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>> {
let values = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Read {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Read {
keys: vec![key.as_ref().to_vec()],
})
.map_err(|e| Error::Indexdb(format!("{:?}", e)))?;
Expand All @@ -327,7 +358,7 @@ impl Storage {
}
fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<()> {
self.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Delete {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Delete {
keys: vec![key.as_ref().to_vec()],
})
.map(|_| ())
Expand All @@ -336,19 +367,19 @@ impl Storage {
pub fn is_filter_scripts_empty(&self) -> bool {
let key_prefix = Key::Meta(FILTER_SCRIPTS_KEY).into_vec();

let value =
match self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::IteratorKey {
start_key_bound: key_prefix.clone(),
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)),
limit: 1,
skip: 0,
}) {
Ok(v) => v,
Err(_) => return false,
};
let value = match self.channel.dispatch_database_command(
CommandRequestWithTakeWhileAndFilterMap::IteratorKey {
start_key_bound: key_prefix.clone(),
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)),
filter_map: Box::new(|s| Some(s.to_vec())),
limit: 1,
skip: 0,
},
) {
Ok(v) => v,
Err(_) => return false,
};

if let DbCommandResponse::IteratorKey { keys } = value {
keys.is_empty()
Expand All @@ -361,10 +392,11 @@ impl Storage {
let key_prefix_clone = key_prefix.clone();
let value = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Iterator {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator {
start_key_bound: key_prefix_clone.clone(),
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix_clone)),
filter_map: Box::new(|s| Some(s.to_vec())),
limit: usize::MAX,
skip: 0,
})
Expand Down Expand Up @@ -409,15 +441,18 @@ impl Storage {
let key_prefix_clone = key_prefix.clone();
let remove_keys = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::IteratorKey {
start_key_bound: key_prefix_clone.clone(),
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| {
raw_key.starts_with(&key_prefix_clone)
}),
limit: usize::MAX,
skip: 0,
})
.dispatch_database_command(
CommandRequestWithTakeWhileAndFilterMap::IteratorKey {
start_key_bound: key_prefix_clone.clone(),
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| {
raw_key.starts_with(&key_prefix_clone)
}),
filter_map: Box::new(|s| Some(s.to_vec())),
limit: usize::MAX,
skip: 0,
},
)
.unwrap();
debug!("Received {:?}", remove_keys);
if let DbCommandResponse::IteratorKey { keys } = remove_keys {
Expand Down Expand Up @@ -498,10 +533,11 @@ impl Storage {

let value = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Iterator {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator {
start_key_bound: key_prefix.clone(),
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)),
filter_map: Box::new(|s| Some(s.to_vec())),
limit: usize::MAX,
skip: 0,
})
Expand Down Expand Up @@ -533,10 +569,11 @@ impl Storage {
let key_prefix_clone = key_prefix.clone();
let value = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Iterator {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator {
start_key_bound: key_prefix_clone.clone(),
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix_clone)),
filter_map: Box::new(|s| Some(s.to_vec())),
limit: usize::MAX,
skip: 0,
})
Expand Down Expand Up @@ -575,10 +612,11 @@ impl Storage {

let value = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::IteratorKey {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::IteratorKey {
start_key_bound: key_prefix.clone(),
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)),
filter_map: Box::new(|s| Some(s.to_vec())),
limit: usize::MAX,
skip: 0,
})
Expand Down Expand Up @@ -610,10 +648,11 @@ impl Storage {

let value = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Iterator {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator {
start_key_bound: iter_from,
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix_clone)),
filter_map: Box::new(|s| Some(s.to_vec())),
limit: 1,
skip: 0,
})
Expand Down Expand Up @@ -653,10 +692,11 @@ impl Storage {

let value = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Iterator {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator {
start_key_bound: start_key,
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)),
filter_map: Box::new(|s| Some(s.to_vec())),
limit,
skip: 0,
})
Expand All @@ -677,10 +717,11 @@ impl Storage {

let value = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Iterator {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator {
start_key_bound: key_prefix.clone(),
order: CursorDirection::NextUnique,
take_while: Box::new(move |raw_key: &[u8]| raw_key.starts_with(&key_prefix)),
filter_map: Box::new(|s| Some(s.to_vec())),
limit: usize::MAX,
skip: 0,
})
Expand Down Expand Up @@ -722,7 +763,7 @@ impl Storage {

let value = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Iterator {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator {
start_key_bound: key_prefix.clone(),
order: CursorDirection::PrevUnique,
take_while: Box::new(move |raw_key: &[u8]| {
Expand All @@ -733,6 +774,7 @@ impl Storage {
.expect("stored BlockNumber"),
) >= to_number
}),
filter_map: Box::new(|s| Some(s.to_vec())),
limit: usize::MAX,
skip: 0,
})
Expand Down Expand Up @@ -901,22 +943,24 @@ impl Batch {

fn delete_many(&mut self, keys: Vec<Vec<u8>>) -> Result<()> {
self.comm_arrays
.dispatch_database_command(CommandRequestWithTakeWhile::Delete { keys })
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Delete { keys })
.map(|_| ())
.map_err(|e| Error::Indexdb(format!("{:?}", e)))
}

fn commit(self) -> Result<()> {
if !self.add.is_empty() {
self.comm_arrays
.dispatch_database_command(CommandRequestWithTakeWhile::Put { kvs: self.add })
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Put {
kvs: self.add,
})
.map(|_| ())
.map_err(|e| Error::Indexdb(format!("{:?}", e)))?;
}

if !self.delete.is_empty() {
self.comm_arrays
.dispatch_database_command(CommandRequestWithTakeWhile::Delete {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Delete {
keys: self.delete,
})
.map(|_| ())
Expand All @@ -934,15 +978,17 @@ impl Storage {
start_key_bound: Vec<u8>,
order: CursorDirection,
take_while: Box<dyn Fn(&[u8]) -> bool + Send + 'static>,
filter_map: Box<dyn Fn(&[u8]) -> Option<Vec<u8>> + Send + 'static>,
limit: usize,
skip: usize,
) -> Vec<KV> {
let value = self
.channel
.dispatch_database_command(CommandRequestWithTakeWhile::Iterator {
.dispatch_database_command(CommandRequestWithTakeWhileAndFilterMap::Iterator {
start_key_bound,
order,
take_while,
filter_map,
limit,
skip,
})
Expand Down
Loading