Skip to content

Commit b92fc3b

Browse files
authored
create index for smart contract (#498)
* feat: add doc store * feat: add event processor * feat: remove chain id * feat: add get contract sync status * feat: add get sync status * feat: add get database api * feat: add get database * feat: remove ut * fix: resolve the code style error
1 parent 9b607de commit b92fc3b

13 files changed

+510
-44
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ members = [
1010
"src/storage",
1111
"src/node",
1212
"src/sdk",
13+
"src/event",
1314
"src/cmd"
1415
]
1516

src/event/Cargo.toml

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "db3-event"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
ethers = { workspace = true , features=["ws"]}
8+
tracing = "0.1"
9+
db3-error={path="../error", version="0.1.0"}
10+
db3-crypto={path="../crypto", version="0.1.0"}
11+
serde_json = { workspace=true}
12+
db3-storage={path="../storage", version="0.1.0"}
13+
ethabi = { version = "18.0.0", default-features = false, features = ["full-serde", "rlp"] }
14+
hex = "0.4.3"
15+
[dev-dependencies]
16+
tokio = { version = "1.17.0", features = ["full"] }

src/event/src/event_processor.rs

+209
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
//
2+
// event_processor.rs
3+
// Copyright (C) 2023 db3.network Author imotai <[email protected]>
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
//
17+
18+
use db3_crypto::db3_address::DB3Address;
19+
use db3_error::{DB3Error, Result};
20+
use db3_storage::db_store_v2::DBStoreV2;
21+
use ethabi::{Log as EthLog, Token};
22+
use ethers::abi::RawLog;
23+
use ethers::types::Address;
24+
use ethers::types::Filter;
25+
use ethers::{
26+
core::abi::Abi,
27+
providers::{Middleware, Provider, StreamExt, Ws},
28+
};
29+
use std::collections::HashSet;
30+
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
31+
use std::sync::Arc;
32+
use tracing::{info, warn};
33+
34+
#[derive(Debug)]
35+
pub struct EventProcessorConfig {
36+
pub evm_node_url: String,
37+
pub db_addr: String,
38+
pub abi: String,
39+
pub target_events: HashSet<String>,
40+
pub contract_addr: String,
41+
}
42+
43+
pub struct EventProcessor {
44+
config: EventProcessorConfig,
45+
provider: Arc<Provider<Ws>>,
46+
running: Arc<AtomicBool>,
47+
db_store: DBStoreV2,
48+
block_number: Arc<AtomicU64>,
49+
event_number: Arc<AtomicU64>,
50+
}
51+
52+
unsafe impl Sync for EventProcessor {}
53+
unsafe impl Send for EventProcessor {}
54+
impl EventProcessor {
55+
pub async fn new(config: EventProcessorConfig, db_store: DBStoreV2) -> Result<Self> {
56+
info!("new event processor with config {:?}", config);
57+
let provider = Provider::<Ws>::connect(&config.evm_node_url)
58+
.await
59+
.map_err(|e| DB3Error::StoreEventError(format!("{e}")))?;
60+
let provider_arc = Arc::new(provider);
61+
Ok(Self {
62+
config,
63+
provider: provider_arc,
64+
running: Arc::new(AtomicBool::new(false)),
65+
db_store,
66+
block_number: Arc::new(AtomicU64::new(0)),
67+
event_number: Arc::new(AtomicU64::new(0)),
68+
})
69+
}
70+
71+
pub fn close(&self) {
72+
self.running.store(false, Ordering::Relaxed);
73+
info!(
74+
"stop the event processor for db {}",
75+
self.config.db_addr.as_str()
76+
);
77+
}
78+
79+
pub fn get_config<'a>(&'a self) -> &'a EventProcessorConfig {
80+
&self.config
81+
}
82+
83+
pub fn get_block_number(&self) -> u64 {
84+
self.block_number.load(Ordering::Relaxed)
85+
}
86+
87+
pub fn get_event_number(&self) -> u64 {
88+
self.event_number.load(Ordering::Relaxed)
89+
}
90+
91+
pub async fn start(&self) -> Result<()> {
92+
let abi: Abi = serde_json::from_str(self.config.abi.as_str())
93+
.map_err(|e| DB3Error::StoreEventError(format!("{e}")))?;
94+
info!(
95+
"event processor for contract {}",
96+
self.config.contract_addr.as_str()
97+
);
98+
self.running
99+
.store(true, std::sync::atomic::Ordering::Relaxed);
100+
let address = self
101+
.config
102+
.contract_addr
103+
.parse::<Address>()
104+
.map_err(|e| DB3Error::StoreEventError(format!("{e}")))?;
105+
let db_addr = DB3Address::from_hex(self.config.db_addr.as_str())
106+
.map_err(|e| DB3Error::StoreEventError(format!("{e}")))?;
107+
108+
let filter = Filter::new().address(address);
109+
let mut stream = self
110+
.provider
111+
.subscribe_logs(&filter)
112+
.await
113+
.map_err(|e| DB3Error::StoreEventError(format!("{e}")))?;
114+
while let Some(log) = stream.next().await {
115+
if !self.running.load(Ordering::Relaxed) {
116+
info!(
117+
"stop event processor for contract {}",
118+
self.config.contract_addr.as_str()
119+
);
120+
break;
121+
}
122+
if let Some(number) = log.block_number {
123+
self.block_number.store(number.as_u64(), Ordering::Relaxed)
124+
}
125+
for e in abi.events() {
126+
// verify
127+
let event_signature = log
128+
.topics
129+
.get(0)
130+
.ok_or(DB3Error::StoreEventError(format!("")))?;
131+
132+
if event_signature != &e.signature() {
133+
continue;
134+
}
135+
if !self.config.target_events.contains(e.name.as_str()) {
136+
continue;
137+
}
138+
let raw_log = RawLog {
139+
topics: log.topics.clone(),
140+
data: log.data.to_vec(),
141+
};
142+
if let Ok(log_entry) = e.parse_log(raw_log) {
143+
let json_value = Self::log_to_doc(&log_entry);
144+
match serde_json::to_string(&json_value) {
145+
Ok(value) => {
146+
let values = vec![value.to_string()];
147+
if let Err(e) = self.db_store.add_docs(
148+
&db_addr,
149+
&DB3Address::ZERO,
150+
e.name.as_str(),
151+
&values,
152+
) {
153+
warn!("fail to write json doc {} for {e}", value.as_str());
154+
} else {
155+
self.event_number.fetch_add(1, Ordering::Relaxed);
156+
}
157+
}
158+
Err(e) => {
159+
warn!("fail to convert to json for {e}");
160+
}
161+
}
162+
break;
163+
}
164+
}
165+
}
166+
Ok(())
167+
}
168+
169+
fn log_to_doc(log: &EthLog) -> serde_json::Value {
170+
let mut doc = serde_json::Map::new();
171+
for log_param in &log.params {
172+
doc.insert(
173+
log_param.name.to_string(),
174+
Self::param_to_value(&log_param.value),
175+
);
176+
}
177+
serde_json::Value::Object(doc)
178+
}
179+
180+
fn param_to_value(param: &Token) -> serde_json::Value {
181+
match param {
182+
Token::Address(addr) => {
183+
serde_json::value::Value::String(format!("0x{}", hex::encode(addr.as_bytes())))
184+
}
185+
Token::String(value) => serde_json::value::Value::String(value.to_string()),
186+
Token::Uint(value) | Token::Int(value) => {
187+
serde_json::value::Value::String(value.to_string())
188+
}
189+
_ => todo!(),
190+
}
191+
}
192+
}
193+
194+
#[cfg(test)]
195+
mod tests {
196+
use super::*;
197+
#[tokio::test]
198+
async fn test_event_processor() {
199+
// let contract_abi: &str = r#"[{"constant":true,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"guy","type":"address"},{"name":"wad","type":"uint256"}],"name":"approve","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"totalSupply","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"src","type":"address"},{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transferFrom","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"wad","type":"uint256"}],"name":"withdraw","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"decimals","outputs":[{"name":"","type":"uint8"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"symbol","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transfer","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[],"name":"deposit","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"},{"name":"","type":"address"}],"name":"allowance","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"payable":true,"stateMutability":"payable","type":"fallback"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"guy","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Transfer","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Deposit","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Withdrawal","type":"event"}]"#;
200+
// let config = EventProcessorConfig {
201+
// evm_node_url: "wss://polygon-mainnet.g.alchemy.com/v2/EH9ZSJ0gS7a1DEIohAWMbhP33lK6qHj9"
202+
// .to_string(),
203+
// db_addr: "0xaaaaa".to_string(),
204+
// abi: contract_abi.to_string(),
205+
// target_events: Has,
206+
// contract_addr: "0x0d500B1d8E8eF31E21C99d1Db9A6444d3ADf1270".to_string(),
207+
// };
208+
}
209+
}

src/event/src/lib.rs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
//
2+
// lib.rs
3+
// Copyright (C) 2023 db3.network Author imotai <[email protected]>
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
//
17+
18+
pub mod event_processor;

src/node/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ db3-error={path="../error", version="0.1.0"}
2424
db3-cmd={path="../cmd", version="0.1.0"}
2525
db3-sdk={path="../sdk", version="0.1.0"}
2626
db3-session={path="../session", version="0.1.0"}
27+
db3-event={path="../event", version="0.1.0"}
2728
ethers = { workspace = true }
2829
tonic = { workspace = true }
2930
tonic-web = { workspace = true }

0 commit comments

Comments
 (0)