Skip to content

Commit 9b7fc5e

Browse files
New delta trino (#82)
* Add trino & delta * up * up * up * up * up * up * clippy fix * update * remove not needed * up * up * up * up * update * update * update * up * refactor * custom table name * improve error * refactor logic * refactor * naming * remove not needed serde trait * format * refactor * refactor error --------- Co-authored-by: quannguyen <[email protected]>
1 parent 1a71d06 commit 9b7fc5e

File tree

21 files changed

+810
-282
lines changed

21 files changed

+810
-282
lines changed

Cargo.lock

Lines changed: 199 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ scylla = "0.10.1"
3737
apollo-parser = "0.7.3"
3838
prometheus = "0.13.3"
3939
warp = "0.3.6"
40+
prusto = "0.5.1"
41+
tokio-retry = "0.3.0"
4042

4143
[dev-dependencies]
4244
async-std = { version = "1.12.0", features = ["attributes"] }

src/components/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ mod subgraph_filter;
88
pub use manifest_loader::ManifestLoader;
99
pub use progress_ctrl::ProgressCtrl;
1010
pub use serializer::Serializer;
11-
pub use source::Source;
11+
pub use source::BlockSource;
1212
pub use subgraph::Subgraph;
1313
pub use subgraph_filter::SubgraphFilter;

src/components/progress_ctrl/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::database::DatabaseAgent;
77
use crate::errors::ProgressCtrlError;
88
use crate::messages::SerializedDataMessage;
99

10+
#[derive(Clone)]
1011
pub struct ProgressCtrl {
1112
db: DatabaseAgent,
1213
recent_block_ptrs: Vec<BlockPtr>,
@@ -31,9 +32,15 @@ impl ProgressCtrl {
3132
Ok(this)
3233
}
3334

34-
fn get_min_start_block(&self) -> u64 {
35+
pub fn get_min_start_block(&self) -> u64 {
3536
let min_start_block = self.sources.iter().filter_map(|s| s.startBlock).min();
36-
min_start_block.unwrap_or(0)
37+
min_start_block.unwrap_or(0).max(
38+
self.recent_block_ptrs
39+
.last()
40+
.cloned()
41+
.map(|b| b.number + 1)
42+
.unwrap_or_default(),
43+
)
3744
}
3845

3946
pub async fn progress_check(

0 commit comments

Comments
 (0)