Skip to content

Commit 82ad436

Browse files
committed
feat: add cursor for postgres and rocksdb
1 parent 8b06023 commit 82ad436

File tree

7 files changed

+246
-13
lines changed

7 files changed

+246
-13
lines changed

Diff for: src/store/memstore/cursor.rs

-2
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ use super::MemStore;
55
use crate::store::{Beacon, BeaconCursor, StorageError, Store};
66

77
pub struct MemDbCursor<'a> {
8-
// beacon_id is the id of the beacon
98
pos: usize,
10-
// db is the database connection
119
store: &'a MemStore,
1210
}
1311

Diff for: src/store/memstore/mod.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,10 @@ mod tests {
158158
fn test_memstore() {
159159
let rt = tokio::runtime::Runtime::new().unwrap();
160160
rt.block_on(async {
161-
let store = MemStore::new(true);
161+
let mut store = MemStore::new(true);
162162
test_store(&store).await;
163163

164-
drop(store);
165-
166-
let store = MemStore::new(false);
164+
store.requires_previous = false;
167165

168166
test_cursor(&store).await;
169167
});

Diff for: src/store/postgres/cursor.rs

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use tonic::async_trait;
2+
3+
use super::PgStore;
4+
5+
use crate::store::{Beacon, BeaconCursor, StorageError, Store};
6+
7+
pub struct PgCursor<'a> {
8+
pos: usize,
9+
store: &'a PgStore,
10+
}
11+
12+
impl<'a> PgCursor<'a> {
13+
pub fn new(store: &'a PgStore) -> Self {
14+
Self { pos: 0, store }
15+
}
16+
}
17+
18+
#[async_trait]
19+
impl BeaconCursor for PgCursor<'_> {
20+
async fn first(&mut self) -> Result<Option<Beacon>, StorageError> {
21+
match self.store.first().await {
22+
Ok(beacon) => {
23+
self.pos = 0;
24+
Ok(Some(beacon))
25+
}
26+
Err(StorageError::NotFound) => Ok(None),
27+
Err(e) => Err(e),
28+
}
29+
}
30+
31+
async fn next(&mut self) -> Result<Option<Beacon>, StorageError> {
32+
match self.store.get_at_position(self.pos + 1).await {
33+
Ok(beacon) => {
34+
self.pos += 1;
35+
Ok(Some(beacon))
36+
}
37+
Err(StorageError::NotFound) => Ok(None),
38+
Err(e) => Err(e),
39+
}
40+
}
41+
42+
async fn seek(&mut self, round: u64) -> Result<Option<Beacon>, StorageError> {
43+
match self.store.get_beacon(round).await {
44+
Ok(beacon) => {
45+
self.pos = self.store.get_pos(beacon.round).await?;
46+
Ok(Some(beacon))
47+
}
48+
Err(StorageError::NotFound) => Ok(None),
49+
Err(e) => Err(e),
50+
}
51+
}
52+
53+
async fn last(&mut self) -> Result<Option<Beacon>, StorageError> {
54+
match self.store.last().await {
55+
Ok(beacon) => {
56+
self.pos = self.store.len().await? - 1;
57+
Ok(Some(beacon))
58+
}
59+
Err(StorageError::NotFound) => Ok(None),
60+
Err(e) => Err(e),
61+
}
62+
}
63+
}

Diff for: src/store/postgres/mod.rs

+48-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use super::{Beacon, StorageError, Store};
2+
use cursor::PgCursor;
23
use sqlx::{PgPool, Row};
34
use tonic::async_trait;
45

56
#[cfg(test)]
67
mod db_container;
78

9+
pub mod cursor;
10+
811
pub struct PgStore {
912
// db is the database connection
1013
db: PgPool,
@@ -62,10 +65,44 @@ impl PgStore {
6265

6366
Ok(beacon)
6467
}
68+
69+
async fn get_at_position(&self, pos: usize) -> Result<Beacon, StorageError> {
70+
let row = sqlx::query("SELECT * FROM beacon_details WHERE beacon_id = $1 ORDER BY round ASC LIMIT 1 OFFSET $2")
71+
.bind(self.beacon_id)
72+
.bind(pos as i64)
73+
.fetch_one(&self.db)
74+
.await
75+
.map_err(|e| e.into())?;
76+
let mut beacon = Beacon {
77+
previous_sig: vec![],
78+
round: row.try_get::<i64, _>("round").map_err(|e| e.into())? as u64,
79+
signature: row.try_get("signature").map_err(|e| e.into())?,
80+
};
81+
82+
if beacon.round > 0 && self.requires_previous {
83+
let prev = self.get_beacon(beacon.round - 1).await?;
84+
beacon.previous_sig = prev.signature;
85+
}
86+
87+
Ok(beacon)
88+
}
89+
90+
async fn get_pos(&self, round: u64) -> Result<usize, StorageError> {
91+
let count =
92+
sqlx::query("SELECT COUNT(*) FROM beacon_details WHERE beacon_id = $1 AND round < $2")
93+
.bind(self.beacon_id)
94+
.bind(round as i64)
95+
.fetch_one(&self.db)
96+
.await
97+
.map_err(|e| e.into())?
98+
.get::<i64, _>("count");
99+
Ok(count as usize)
100+
}
65101
}
66102

67103
#[async_trait]
68104
impl Store for PgStore {
105+
type Cursor<'a> = PgCursor<'a>;
69106
async fn len(&self) -> Result<usize, StorageError> {
70107
let count = sqlx::query("SELECT COUNT(*) FROM beacon_details WHERE beacon_id = $1")
71108
.bind(self.beacon_id)
@@ -172,6 +209,10 @@ impl Store for PgStore {
172209
.map_err(|e| e.into())?;
173210
Ok(())
174211
}
212+
213+
fn cursor(&self) -> PgCursor<'_> {
214+
PgCursor::new(self)
215+
}
175216
}
176217

177218
#[allow(clippy::from_over_into)]
@@ -189,7 +230,7 @@ impl Into<StorageError> for sqlx::Error {
189230
mod tests {
190231
use sqlx::postgres::PgPoolOptions;
191232

192-
use crate::store::testing::test_store;
233+
use crate::store::testing::{test_cursor, test_store};
193234

194235
use super::*;
195236

@@ -217,6 +258,7 @@ mod tests {
217258
#[test]
218259
fn test_pg() {
219260
let rt = tokio::runtime::Runtime::new().unwrap();
261+
220262
rt.block_on(async {
221263
let (pg_id, pg_ip) = db_container::start_pg().await.unwrap();
222264
let testdata = PgTestData { pg_id, pg_ip };
@@ -230,11 +272,15 @@ mod tests {
230272
.await
231273
.unwrap();
232274

233-
let store = PgStore::new(pool, true, "beacon_name")
275+
let mut store = PgStore::new(pool, true, "beacon_name")
234276
.await
235277
.expect("Failed to create store");
236278

237279
test_store(&store).await;
280+
281+
store.requires_previous = false;
282+
283+
test_cursor(&store).await;
238284
});
239285
}
240286
}

Diff for: src/store/rocksdb/cursor.rs

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use tonic::async_trait;
2+
3+
use super::RocksStore;
4+
5+
use crate::store::{Beacon, BeaconCursor, StorageError, Store};
6+
7+
pub struct RocksCursor<'a> {
8+
pos: usize,
9+
store: &'a RocksStore,
10+
}
11+
12+
impl<'a> RocksCursor<'a> {
13+
pub fn new(store: &'a RocksStore) -> Self {
14+
Self { pos: 0, store }
15+
}
16+
}
17+
18+
#[async_trait]
19+
impl BeaconCursor for RocksCursor<'_> {
20+
async fn first(&mut self) -> Result<Option<Beacon>, StorageError> {
21+
match self.store.first().await {
22+
Ok(beacon) => {
23+
self.pos = 0;
24+
Ok(Some(beacon))
25+
}
26+
Err(StorageError::NotFound) => Ok(None),
27+
Err(e) => Err(e),
28+
}
29+
}
30+
31+
async fn next(&mut self) -> Result<Option<Beacon>, StorageError> {
32+
match self.store.get_at_position(self.pos + 1).await {
33+
Ok(beacon) => {
34+
self.pos += 1;
35+
Ok(Some(beacon))
36+
}
37+
Err(StorageError::NotFound) => Ok(None),
38+
Err(e) => Err(e),
39+
}
40+
}
41+
42+
async fn seek(&mut self, round: u64) -> Result<Option<Beacon>, StorageError> {
43+
match self.store.get_beacon(round).await {
44+
Ok(beacon) => {
45+
self.pos = self.store.get_pos(beacon.round).await?;
46+
Ok(Some(beacon))
47+
}
48+
Err(StorageError::NotFound) => Ok(None),
49+
Err(e) => Err(e),
50+
}
51+
}
52+
53+
async fn last(&mut self) -> Result<Option<Beacon>, StorageError> {
54+
match self.store.last().await {
55+
Ok(beacon) => {
56+
self.pos = self.store.len().await? - 1;
57+
Ok(Some(beacon))
58+
}
59+
Err(StorageError::NotFound) => Ok(None),
60+
Err(e) => Err(e),
61+
}
62+
}
63+
}

Diff for: src/store/rocksdb/mod.rs

+54-5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
use std::path::Path;
22
use std::path::PathBuf;
33

4+
use cursor::RocksCursor;
45
use rocksdb::Options;
56
use tonic::async_trait;
67

78
use super::{Beacon, StorageError, Store};
89

10+
pub mod cursor;
11+
912
pub fn open_rocksdb<P: AsRef<Path>>(path: P) -> Result<rocksdb::DB, StorageError> {
1013
let mut option = Options::default();
1114
option.create_if_missing(true);
@@ -42,6 +45,10 @@ impl RocksStore {
4245
round.to_be_bytes()
4346
}
4447

48+
fn decode_key(&self, key: &[u8]) -> u64 {
49+
u64::from_be_bytes(key[..8].try_into().unwrap())
50+
}
51+
4552
async fn get_beacon(&self, round: u64) -> Result<Beacon, StorageError> {
4653
let signature = self.db.get(self.key(round)).map_err(|e| e.into())?;
4754

@@ -58,10 +65,48 @@ impl RocksStore {
5865

5966
Ok(beacon)
6067
}
68+
69+
async fn get_at_position(&self, pos: usize) -> Result<Beacon, StorageError> {
70+
let (round, signature) = self
71+
.db
72+
.iterator(rocksdb::IteratorMode::Start)
73+
.nth(pos)
74+
.ok_or(StorageError::NotFound)?
75+
.map_err(|e| e.into())?;
76+
77+
let mut beacon = Beacon {
78+
round: self.decode_key(&round),
79+
signature: signature.to_vec(),
80+
previous_sig: vec![],
81+
};
82+
83+
if beacon.round > 0 && self.requires_previous {
84+
let prev = self.get_beacon(beacon.round - 1).await?;
85+
beacon.previous_sig = prev.signature;
86+
}
87+
88+
Ok(beacon)
89+
}
90+
91+
async fn get_pos(&self, round: u64) -> Result<usize, StorageError> {
92+
let mut pos = 0;
93+
94+
for data in self.db.iterator(rocksdb::IteratorMode::Start) {
95+
let (k, _) = data.map_err(|e| e.into())?;
96+
let r = u64::from_be_bytes(k[..8].try_into().unwrap());
97+
if r == round {
98+
break;
99+
}
100+
pos += 1;
101+
}
102+
103+
Ok(pos)
104+
}
61105
}
62106

63107
#[async_trait]
64108
impl Store for RocksStore {
109+
type Cursor<'a> = RocksCursor<'a>;
65110
async fn len(&self) -> Result<usize, StorageError> {
66111
let res = self.db.iterator(rocksdb::IteratorMode::End).count();
67112
Ok(res)
@@ -85,7 +130,7 @@ impl Store for RocksStore {
85130
None => return Err(StorageError::NotFound),
86131
};
87132

88-
let round = u64::from_be_bytes(key[..8].try_into().unwrap());
133+
let round = self.decode_key(&key);
89134

90135
let mut beacon = Beacon {
91136
round,
@@ -148,8 +193,8 @@ impl Store for RocksStore {
148193
Ok(())
149194
}
150195

151-
async fn cursor(&self) -> Result<Box<dyn BeaconCursor>, StorageError> {
152-
Ok(Box::new(MemDbCursor::new(self)))
196+
fn cursor(&self) -> RocksCursor<'_> {
197+
RocksCursor::new(self)
153198
}
154199
}
155200

@@ -172,7 +217,7 @@ mod tests {
172217

173218
use tempfile::tempdir;
174219

175-
use crate::store::testing::test_store;
220+
use crate::store::testing::{test_cursor, test_store};
176221

177222
use super::*;
178223

@@ -183,11 +228,15 @@ mod tests {
183228
let tmp_dir = tempdir().unwrap();
184229
let path = tmp_dir.path();
185230

186-
let store = RocksStore::new(path.into(), true, "beacon_name".to_owned())
231+
let mut store = RocksStore::new(path.into(), true, "beacon_name".to_owned())
187232
.expect("Failed to create store");
188233

189234
test_store(&store).await;
190235

236+
store.requires_previous = false;
237+
238+
test_cursor(&store).await;
239+
191240
tmp_dir.close().unwrap();
192241
});
193242
}

0 commit comments

Comments
 (0)