Skip to content

Commit 13b5362

Browse files
authored
Merge pull request #76 from enviodev/dz/improve-get-events-join-logic
Improve Get Event join logic
2 parents fd505a4 + 0f67fb5 commit 13b5362

File tree

9 files changed

+146
-62
lines changed

9 files changed

+146
-62
lines changed

examples/call_decode_output/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ async fn main() -> anyhow::Result<()> {
7171
{
7272
if !results.is_empty() {
7373
let (balance, _) = results[0].as_uint().unwrap();
74-
println!("ADDRESS {} : {} DAI", address, balance);
74+
println!("ADDRESS {address} : {balance} DAI");
7575
}
7676
}
7777
}
7878
}
7979
}
8080
}
8181
Err(e) => {
82-
eprintln!("Error: {:?}", e);
82+
eprintln!("Error: {e:?}");
8383
}
8484
}
8585
}

examples/watch/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ async fn main() {
1515

1616
let height = client.get_height().await.unwrap();
1717

18-
println!("server height is {}", height);
18+
println!("server height is {height}");
1919

2020
// The query to run
2121
let mut query: Query = serde_json::from_value(serde_json::json!( {

hypersync-client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hypersync-client"
3-
version = "0.18.5"
3+
version = "0.19.0"
44
edition = "2021"
55
description = "client library for hypersync"
66
license = "MPL-2.0"

hypersync-client/src/column_mapping.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ fn binary_to_int_str_array(arr: &BinaryArray<i32>) -> Result<Utf8Array<i32>> {
204204

205205
fn binary_to_int_str(binary: &[u8]) -> Result<String> {
206206
let big_num = I256::try_from_be_slice(binary).context("failed to parse number into I256")?;
207-
Ok(format!("{}", big_num))
207+
Ok(format!("{big_num}"))
208208
}
209209

210210
fn map_to_f64(col: &dyn Array) -> Result<Float64Array> {

hypersync-client/src/lib.rs

Lines changed: 24 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ pub use hypersync_net_types as net_types;
2929
pub use hypersync_schema as schema;
3030

3131
use parse_response::parse_query_response;
32-
use simple_types::Event;
3332
use tokio::sync::mpsc;
3433
use types::{EventResponse, ResponseData};
3534
use url::Url;
@@ -41,6 +40,8 @@ pub use decode::Decoder;
4140
pub use decode_call::CallDecoder;
4241
pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
4342

43+
use crate::simple_types::InternalEventJoinStrategy;
44+
4445
type ArrowChunk = Chunk<Box<dyn Array>>;
4546

4647
/// Internal client to handle http requests and retries.
@@ -151,7 +152,8 @@ impl Client {
151152
) -> Result<EventResponse> {
152153
check_simple_stream_params(&config)?;
153154

154-
add_event_join_fields_to_selection(&mut query);
155+
let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
156+
event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
155157

156158
let mut recv = stream::stream_arrow(self, query, config)
157159
.await
@@ -165,9 +167,9 @@ impl Client {
165167
while let Some(res) = recv.recv().await {
166168
let res = res.context("get response")?;
167169
let res: QueryResponse = QueryResponse::from(&res);
168-
let events: Vec<Event> = res.data.into();
170+
let events = event_join_strategy.join_from_response_data(res.data);
169171

170-
data.push(events);
172+
data.extend(events);
171173

172174
archive_height = res.archive_height;
173175
next_block = res.next_block;
@@ -306,10 +308,9 @@ impl Client {
306308
Ok(res) => return Ok(res),
307309
Err(e) => {
308310
log::error!(
309-
"failed to get chain_id from server, retrying... The error was: {:?}",
310-
e
311+
"failed to get chain_id from server, retrying... The error was: {e:?}"
311312
);
312-
err = err.context(format!("{:?}", e));
313+
err = err.context(format!("{e:?}"));
313314
}
314315
}
315316

@@ -338,10 +339,9 @@ impl Client {
338339
Ok(res) => return Ok(res),
339340
Err(e) => {
340341
log::error!(
341-
"failed to get height from server, retrying... The error was: {:?}",
342-
e
342+
"failed to get height from server, retrying... The error was: {e:?}"
343343
);
344-
err = err.context(format!("{:?}", e));
344+
err = err.context(format!("{e:?}"));
345345
}
346346
}
347347

@@ -374,9 +374,13 @@ impl Client {
374374
/// Add block, transaction and log fields selection to the query, executes it with retries
375375
/// and returns the response.
376376
pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
377-
add_event_join_fields_to_selection(&mut query);
377+
let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
378+
event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
378379
let arrow_response = self.get_arrow(&query).await.context("get data")?;
379-
Ok(EventResponse::from(&arrow_response))
380+
Ok(EventResponse::from_arrow_response(
381+
&arrow_response,
382+
&event_join_strategy,
383+
))
380384
}
381385

382386
/// Executes query once and returns the result in (Arrow, size) format.
@@ -430,10 +434,9 @@ impl Client {
430434
Ok(res) => return Ok(res),
431435
Err(e) => {
432436
log::error!(
433-
"failed to get arrow data from server, retrying... The error was: {:?}",
434-
e
437+
"failed to get arrow data from server, retrying... The error was: {e:?}"
435438
);
436-
err = err.context(format!("{:?}", e));
439+
err = err.context(format!("{e:?}"));
437440
}
438441
}
439442

@@ -493,7 +496,9 @@ impl Client {
493496
) -> Result<mpsc::Receiver<Result<EventResponse>>> {
494497
check_simple_stream_params(&config)?;
495498

496-
add_event_join_fields_to_selection(&mut query);
499+
let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
500+
501+
event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
497502

498503
let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
499504
mpsc::channel(config.concurrency.unwrap_or(10));
@@ -507,7 +512,9 @@ impl Client {
507512
while let Some(resp) = inner_rx.recv().await {
508513
let is_err = resp.is_err();
509514
if tx
510-
.send(resp.map(|r| EventResponse::from(&r)))
515+
.send(
516+
resp.map(|r| EventResponse::from_arrow_response(&r, &event_join_strategy)),
517+
)
511518
.await
512519
.is_err()
513520
|| is_err
@@ -551,29 +558,3 @@ fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
551558

552559
Ok(())
553560
}
554-
555-
fn add_event_join_fields_to_selection(query: &mut Query) {
556-
// Field lists for implementing event based API, these fields are used for joining
557-
// so they should always be added to the field selection.
558-
const BLOCK_JOIN_FIELDS: &[&str] = &["number"];
559-
const TX_JOIN_FIELDS: &[&str] = &["hash"];
560-
const LOG_JOIN_FIELDS: &[&str] = &["transaction_hash", "block_number"];
561-
562-
if !query.field_selection.block.is_empty() {
563-
for field in BLOCK_JOIN_FIELDS.iter() {
564-
query.field_selection.block.insert(field.to_string());
565-
}
566-
}
567-
568-
if !query.field_selection.transaction.is_empty() {
569-
for field in TX_JOIN_FIELDS.iter() {
570-
query.field_selection.transaction.insert(field.to_string());
571-
}
572-
}
573-
574-
if !query.field_selection.log.is_empty() {
575-
for field in LOG_JOIN_FIELDS.iter() {
576-
query.field_selection.log.insert(field.to_string());
577-
}
578-
}
579-
}

hypersync-client/src/parquet_out.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ fn spawn_writer(path: PathBuf) -> Result<(mpsc::Sender<ArrowBatch>, JoinHandle<R
173173
match run_writer(rx, path).await {
174174
Ok(v) => Ok(v),
175175
Err(e) => {
176-
log::error!("failed to run parquet writer: {:?}", e);
176+
log::error!("failed to run parquet writer: {e:?}");
177177
Err(e)
178178
}
179179
}

hypersync-client/src/simple_types.rs

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use hypersync_format::{
66
AccessList, Address, Authorization, BlockNumber, BloomFilter, Data, Hash, LogArgument,
77
LogIndex, Nonce, Quantity, TransactionIndex, TransactionStatus, TransactionType, Withdrawal,
88
};
9+
use hypersync_net_types::FieldSelection;
910
use nohash_hasher::IntMap;
1011
use serde::{Deserialize, Serialize};
1112
use xxhash_rust::xxh3::Xxh3Builder;
@@ -23,8 +24,91 @@ pub struct Event {
2324
pub log: Log,
2425
}
2526

26-
impl From<ResponseData> for Vec<Event> {
27-
fn from(data: ResponseData) -> Self {
27+
// Field lists for implementing event based API, these fields are used for joining
28+
// so they should always be added to the field selection.
29+
const BLOCK_JOIN_FIELD: &str = "number";
30+
const TX_JOIN_FIELD: &str = "hash";
31+
const LOG_JOIN_FIELD_WITH_TX: &str = "transaction_hash";
32+
const LOG_JOIN_FIELD_WITH_BLOCK: &str = "block_number";
33+
34+
enum InternalJoinStrategy {
35+
NotSelected,
36+
OnlyLogJoinField,
37+
FullJoin,
38+
}
39+
40+
/// Internal event join strategy for determining how to join blocks and transactions with logs
41+
pub(crate) struct InternalEventJoinStrategy {
42+
block: InternalJoinStrategy,
43+
transaction: InternalJoinStrategy,
44+
}
45+
46+
impl From<&FieldSelection> for InternalEventJoinStrategy {
47+
fn from(field_selection: &FieldSelection) -> Self {
48+
let block_fields_num = field_selection.block.len();
49+
let transaction_fields_num = field_selection.transaction.len();
50+
51+
Self {
52+
block: if block_fields_num == 0 {
53+
InternalJoinStrategy::NotSelected
54+
} else if block_fields_num == 1 && field_selection.block.contains(BLOCK_JOIN_FIELD) {
55+
InternalJoinStrategy::OnlyLogJoinField
56+
} else {
57+
InternalJoinStrategy::FullJoin
58+
},
59+
transaction: if transaction_fields_num == 0 {
60+
InternalJoinStrategy::NotSelected
61+
} else if transaction_fields_num == 1
62+
&& field_selection.transaction.contains(TX_JOIN_FIELD)
63+
{
64+
InternalJoinStrategy::OnlyLogJoinField
65+
} else {
66+
InternalJoinStrategy::FullJoin
67+
},
68+
}
69+
}
70+
}
71+
72+
impl InternalEventJoinStrategy {
73+
/// Add join fields to field selection based on the event join strategy
74+
pub(crate) fn add_join_fields_to_selection(&self, field_selection: &mut FieldSelection) {
75+
match self.block {
76+
InternalJoinStrategy::NotSelected => (),
77+
InternalJoinStrategy::OnlyLogJoinField => {
78+
field_selection
79+
.log
80+
.insert(LOG_JOIN_FIELD_WITH_BLOCK.to_string());
81+
field_selection.block.remove(BLOCK_JOIN_FIELD);
82+
}
83+
InternalJoinStrategy::FullJoin => {
84+
field_selection
85+
.log
86+
.insert(LOG_JOIN_FIELD_WITH_BLOCK.to_string());
87+
field_selection.block.insert(BLOCK_JOIN_FIELD.to_string());
88+
}
89+
}
90+
91+
match self.transaction {
92+
InternalJoinStrategy::NotSelected => (),
93+
InternalJoinStrategy::OnlyLogJoinField => {
94+
field_selection
95+
.log
96+
.insert(LOG_JOIN_FIELD_WITH_TX.to_string());
97+
field_selection.transaction.remove(TX_JOIN_FIELD);
98+
}
99+
InternalJoinStrategy::FullJoin => {
100+
field_selection
101+
.log
102+
.insert(LOG_JOIN_FIELD_WITH_TX.to_string());
103+
field_selection
104+
.transaction
105+
.insert(TX_JOIN_FIELD.to_string());
106+
}
107+
}
108+
}
109+
110+
/// Join response data into events based on the event join strategy
111+
pub(crate) fn join_from_response_data(&self, data: ResponseData) -> Vec<Event> {
28112
let blocks = data
29113
.blocks
30114
.into_iter()
@@ -48,10 +132,26 @@ impl From<ResponseData> for Vec<Event> {
48132
.into_iter()
49133
.flat_map(|logs| {
50134
logs.into_iter().map(|log| {
51-
let block = blocks.get(&log.block_number.unwrap().into()).cloned();
52-
let transaction = transactions
53-
.get(log.transaction_hash.as_ref().unwrap())
54-
.cloned();
135+
let block = match self.block {
136+
InternalJoinStrategy::NotSelected => None,
137+
InternalJoinStrategy::OnlyLogJoinField => Some(Arc::new(Block {
138+
number: Some(log.block_number.unwrap().into()),
139+
..Block::default()
140+
})),
141+
InternalJoinStrategy::FullJoin => {
142+
blocks.get(&log.block_number.unwrap().into()).cloned()
143+
}
144+
};
145+
let transaction = match self.transaction {
146+
InternalJoinStrategy::NotSelected => None,
147+
InternalJoinStrategy::OnlyLogJoinField => Some(Arc::new(Transaction {
148+
hash: log.transaction_hash.clone(),
149+
..Transaction::default()
150+
})),
151+
InternalJoinStrategy::FullJoin => transactions
152+
.get(log.transaction_hash.as_ref().unwrap())
153+
.cloned(),
154+
};
55155

56156
Event {
57157
transaction,

hypersync-client/src/types.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

33
use crate::{
4-
simple_types::{Block, Event, Log, Trace, Transaction},
4+
simple_types::{Block, Event, InternalEventJoinStrategy, Log, Trace, Transaction},
55
ArrowChunk, FromArrow,
66
};
77
use anyhow::{anyhow, Context, Result};
@@ -38,14 +38,18 @@ pub struct ResponseData {
3838
pub traces: Vec<Vec<Trace>>,
3939
}
4040

41-
impl From<&'_ ArrowResponse> for EventResponse {
42-
fn from(arrow_response: &'_ ArrowResponse) -> Self {
41+
impl EventResponse {
42+
/// Create EventResponse from ArrowResponse with the specified event join strategy
43+
pub(crate) fn from_arrow_response(
44+
arrow_response: &ArrowResponse,
45+
event_join_strategy: &InternalEventJoinStrategy,
46+
) -> Self {
4347
let r: QueryResponse = arrow_response.into();
4448
Self {
4549
archive_height: r.archive_height,
4650
next_block: r.next_block,
4751
total_execution_time: r.total_execution_time,
48-
data: vec![r.data.into()],
52+
data: event_join_strategy.join_from_response_data(r.data),
4953
rollback_guard: r.rollback_guard,
5054
}
5155
}
@@ -114,7 +118,7 @@ pub struct QueryResponse<T = ResponseData> {
114118
/// Alias for Arrow Query response
115119
pub type ArrowResponse = QueryResponse<ArrowResponseData>;
116120
/// Alias for Event oriented, vectorized QueryResponse
117-
pub type EventResponse = QueryResponse<Vec<Vec<Event>>>;
121+
pub type EventResponse = QueryResponse<Vec<Event>>;
118122

119123
/// Arrow chunk with schema
120124
#[derive(Debug, Clone)]

hypersync-client/src/util.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,7 @@ pub fn decode_logs_batch(sig: &str, batch: &ArrowBatch) -> Result<ArrowBatch> {
134134
Err(e) => {
135135
log::trace!(
136136
"failed to decode body of a log, will write null instead. Error was: \
137-
{:?}",
138-
e
137+
{e:?}"
139138
);
140139
None
141140
}

0 commit comments

Comments
 (0)