Skip to content

Commit 3a4d79e

Browse files
committed
feat(indexer): Introduce optimistic indexing
1 parent d8b6af8 commit 3a4d79e

File tree

10 files changed

+1473
-126
lines changed

10 files changed

+1473
-126
lines changed

crates/iota-indexer/src/apis/write_api.rs

Lines changed: 349 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,358 @@
22
// Modifications Copyright (c) 2024 IOTA Stiftung
33
// SPDX-License-Identifier: Apache-2.0
44

5+
use std::collections::HashSet;
6+
57
use async_trait::async_trait;
6-
use fastcrypto::encoding::Base64;
8+
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper};
9+
use fastcrypto::{encoding::Base64, traits::ToFromBytes};
710
use iota_json_rpc::IotaRpcModule;
811
use iota_json_rpc_api::{WriteApiClient, WriteApiServer, error_object_from_rpc};
912
use iota_json_rpc_types::{
1013
DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse,
1114
IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
1215
};
1316
use iota_open_rpc::Module;
17+
use iota_rest_api::{ExecuteTransactionQueryParameters, client::TransactionExecutionResponse};
1418
use iota_types::{
15-
base_types::IotaAddress, iota_serde::BigInt, quorum_driver_types::ExecuteTransactionRequestType,
19+
base_types::IotaAddress,
20+
digests::TransactionDigest,
21+
effects::TransactionEffectsAPI,
22+
full_checkpoint_content::CheckpointTransaction,
23+
iota_serde::BigInt,
24+
quorum_driver_types::ExecuteTransactionRequestType,
25+
signature::GenericSignature,
26+
transaction::{Transaction, TransactionData},
1627
};
1728
use jsonrpsee::{RpcModule, core::RpcResult, http_client::HttpClient};
1829

19-
use crate::types::IotaTransactionBlockResponseWithOptions;
30+
use crate::{
31+
errors::IndexerError,
32+
handlers::{TransactionObjectChangesToCommit, checkpoint_handler::CheckpointHandler},
33+
indexer_reader::IndexerReader,
34+
metrics::IndexerMetrics,
35+
models::{
36+
event_indices::OptimisticEventIndices,
37+
events::StoredEvent,
38+
transactions::{StoredTransaction, TxInsertionOrder},
39+
tx_indices::OptimisticTxIndices,
40+
},
41+
run_query_async,
42+
schema::tx_insertion_order,
43+
spawn_read_only_blocking,
44+
store::{IndexerStore, PgIndexerStore},
45+
types::{
46+
EventIndex, IndexedDeletedObject, IndexedObject, IotaTransactionBlockResponseWithOptions,
47+
TxIndex,
48+
},
49+
};
2050

2151
pub(crate) struct WriteApi {
2252
fullnode: HttpClient,
53+
fullnode_rest_client: iota_rest_api::Client,
54+
inner: IndexerReader,
55+
store: PgIndexerStore,
56+
metrics: IndexerMetrics,
2357
}
2458

2559
impl WriteApi {
26-
pub fn new(fullnode_client: HttpClient) -> Self {
60+
pub fn new(
61+
fullnode_client: HttpClient,
62+
fullnode_rest_client: iota_rest_api::Client,
63+
inner: IndexerReader,
64+
store: PgIndexerStore,
65+
metrics: IndexerMetrics,
66+
) -> Self {
2767
Self {
2868
fullnode: fullnode_client,
69+
fullnode_rest_client,
70+
inner,
71+
store,
72+
metrics,
2973
}
3074
}
75+
76+
async fn execute_and_optimistically_index_tx_effects(
77+
&self,
78+
tx_bytes: Base64,
79+
signatures: Vec<Base64>,
80+
options: Option<IotaTransactionBlockResponseOptions>,
81+
) -> RpcResult<IotaTransactionBlockResponse> {
82+
let tx_data: TransactionData =
83+
bcs::from_bytes(&tx_bytes.to_vec().map_err(IndexerError::FastCrypto)?)
84+
.map_err(IndexerError::Bcs)?;
85+
let mut sigs = Vec::new();
86+
for sig in signatures {
87+
sigs.push(
88+
GenericSignature::from_bytes(&sig.to_vec().map_err(IndexerError::FastCrypto)?)
89+
.map_err(IndexerError::FastCrypto)?,
90+
);
91+
}
92+
let transaction = Transaction::from_generic_sig_data(tx_data, sigs);
93+
94+
// TODO: shouldn't return type below be from rust-sdk types? Is this type
95+
// correct?
96+
let result = self
97+
.fullnode_rest_client
98+
.execute_transaction(
99+
&ExecuteTransactionQueryParameters {
100+
events: true,
101+
balance_changes: false,
102+
input_objects: true,
103+
output_objects: true,
104+
},
105+
&transaction,
106+
)
107+
.await
108+
.map_err(|e| IndexerError::Generic(e.to_string()))?;
109+
110+
// println!("RESULT: {:#?}", result);
111+
112+
let TransactionExecutionResponse {
113+
effects,
114+
finality: _,
115+
events,
116+
balance_changes: _,
117+
input_objects,
118+
output_objects,
119+
} = result;
120+
121+
let tx_digest = match (input_objects, output_objects) {
122+
(Some(input_objects), Some(output_objects)) => {
123+
let full_tx_data = CheckpointTransaction {
124+
transaction,
125+
effects,
126+
events,
127+
input_objects,
128+
output_objects,
129+
};
130+
self.optimistically_index_transaction(&full_tx_data).await?;
131+
*full_tx_data.effects.transaction_digest()
132+
}
133+
(_, _) => {
134+
// todo: this could be simplified probably
135+
let tx_digest = *effects.transaction_digest();
136+
tx_digest
137+
}
138+
};
139+
140+
let tx_block_response = self
141+
.wait_for_and_return_tx_block_response(tx_digest, options.clone())
142+
.await?;
143+
144+
Ok(IotaTransactionBlockResponseWithOptions {
145+
response: tx_block_response,
146+
options: options.unwrap_or_default(),
147+
}
148+
.into())
149+
}
150+
151+
async fn wait_for_and_return_tx_block_response(
152+
&self,
153+
tx_digest: TransactionDigest,
154+
options: Option<IotaTransactionBlockResponseOptions>,
155+
) -> RpcResult<IotaTransactionBlockResponse> {
156+
loop {
157+
let tx_block_response = self
158+
.inner
159+
.multi_get_transaction_block_response_in_blocking_task(
160+
vec![tx_digest],
161+
options.clone().unwrap_or_default(),
162+
)
163+
.await? // the error returned here could also be a reason for retry I suppose (not fully
164+
// indexed tx?)
165+
.pop();
166+
167+
match tx_block_response {
168+
Some(tx_block_response) => return Ok(tx_block_response),
169+
None => continue, // TODO: implement some timeout, and delay between retries
170+
}
171+
}
172+
}
173+
174+
async fn optimistically_index_transaction(
175+
&self,
176+
full_tx_data: &CheckpointTransaction,
177+
) -> Result<(), IndexerError> {
178+
let assigned_insertion_order: TxInsertionOrder = {
179+
let tx_digest_bytes = full_tx_data.transaction.digest().inner().to_vec();
180+
181+
// // how do we return info about failed txs if we don't index them?
182+
// if *full_tx_data.effects.status() == ExecutionStatus::Success {
183+
184+
self.store
185+
.persist_tx_insertion_order(vec![TxInsertionOrder {
186+
insertion_order: -1, // ignored value
187+
tx_digest: tx_digest_bytes.clone(),
188+
}])
189+
.await?;
190+
191+
let pool = self.inner.get_pool();
192+
run_query_async!(&pool, |conn| {
193+
tx_insertion_order::table
194+
.select(TxInsertionOrder::as_select())
195+
.filter(tx_insertion_order::tx_digest.eq(tx_digest_bytes))
196+
.first::<TxInsertionOrder>(conn)
197+
})?
198+
};
199+
200+
let object_changes = {
201+
let indexed_eventually_removed_objects = full_tx_data
202+
.removed_object_refs_post_version()
203+
.map(|obj_ref| IndexedDeletedObject {
204+
object_id: obj_ref.0,
205+
object_version: obj_ref.1.into(),
206+
checkpoint_sequence_number: 0,
207+
})
208+
.collect::<Vec<_>>();
209+
210+
let changed_objects = full_tx_data
211+
.output_objects
212+
.iter()
213+
.map(|o| {
214+
crate::handlers::checkpoint_handler::try_extract_df_kind(o)
215+
.map(|df_kind| IndexedObject::from_object(0, o.clone(), df_kind))
216+
})
217+
.collect::<Result<Vec<_>, _>>()?;
218+
219+
let output_objects_ids = changed_objects
220+
.iter()
221+
.map(|o| o.object.id())
222+
.collect::<HashSet<_>>();
223+
assert!(
224+
indexed_eventually_removed_objects
225+
.iter()
226+
.all(|o| !output_objects_ids.contains(&o.object_id))
227+
);
228+
229+
TransactionObjectChangesToCommit {
230+
changed_objects,
231+
deleted_objects: indexed_eventually_removed_objects,
232+
}
233+
};
234+
235+
let (indexed_tx, tx_indices, indexed_events, events_indices, indexed_displays) =
236+
CheckpointHandler::index_transaction(
237+
full_tx_data,
238+
assigned_insertion_order.insertion_order.try_into().unwrap(),
239+
0,
240+
0,
241+
&self.metrics,
242+
)
243+
.await?;
244+
245+
// TODO: we write common tables here, we need to make sure we do not overwrite
246+
// objects coming from checkpoint sync
247+
// TODO: both insert and delete need to be handled
248+
self.store.persist_objects(vec![object_changes]).await?; // this will insert, as well as remove from 'objects' table, is it ok?
249+
// TODO: potential race conditions here also need to be handled
250+
self.store.persist_displays(indexed_displays).await?;
251+
252+
self.store
253+
.persist_optimistic_transaction(StoredTransaction::from(&indexed_tx).into())
254+
.await?;
255+
self.store
256+
.persist_optimistic_events(
257+
indexed_events
258+
.into_iter()
259+
.map(StoredEvent::from)
260+
.map(Into::into)
261+
.collect(),
262+
)
263+
.await?;
264+
self.store
265+
.persist_optimistic_event_indices(
266+
Self::optimistic_event_indices_from_indexed_event_indices(events_indices),
267+
)
268+
.await?;
269+
self.store
270+
.persist_optimistic_tx_indices(Self::optimistic_tx_indices_from_indexed_tx_indices(
271+
tx_indices,
272+
))
273+
.await?;
274+
275+
Ok(())
276+
}
277+
278+
fn optimistic_event_indices_from_indexed_event_indices(
279+
event_indices: Vec<EventIndex>,
280+
) -> OptimisticEventIndices {
281+
let (
282+
event_emit_packages,
283+
event_emit_modules,
284+
event_senders,
285+
event_struct_packages,
286+
event_struct_modules,
287+
event_struct_names,
288+
event_struct_instantiations,
289+
) = event_indices.into_iter().map(|i| i.split()).fold(
290+
(
291+
Vec::new(),
292+
Vec::new(),
293+
Vec::new(),
294+
Vec::new(),
295+
Vec::new(),
296+
Vec::new(),
297+
Vec::new(),
298+
),
299+
|(
300+
mut event_emit_packages,
301+
mut event_emit_modules,
302+
mut event_senders,
303+
mut event_struct_packages,
304+
mut event_struct_modules,
305+
mut event_struct_names,
306+
mut event_struct_instantiations,
307+
),
308+
index| {
309+
event_emit_packages.push(index.0);
310+
event_emit_modules.push(index.1);
311+
event_senders.push(index.2);
312+
event_struct_packages.push(index.3);
313+
event_struct_modules.push(index.4);
314+
event_struct_names.push(index.5);
315+
event_struct_instantiations.push(index.6);
316+
(
317+
event_emit_packages,
318+
event_emit_modules,
319+
event_senders,
320+
event_struct_packages,
321+
event_struct_modules,
322+
event_struct_names,
323+
event_struct_instantiations,
324+
)
325+
},
326+
);
327+
328+
(
329+
event_emit_packages.into_iter().map(Into::into).collect(),
330+
event_emit_modules.into_iter().map(Into::into).collect(),
331+
event_senders.into_iter().map(Into::into).collect(),
332+
event_struct_packages.into_iter().map(Into::into).collect(),
333+
event_struct_modules.into_iter().map(Into::into).collect(),
334+
event_struct_names.into_iter().map(Into::into).collect(),
335+
event_struct_instantiations
336+
.into_iter()
337+
.map(Into::into)
338+
.collect(),
339+
)
340+
}
341+
342+
fn optimistic_tx_indices_from_indexed_tx_indices(tx_index: TxIndex) -> OptimisticTxIndices {
343+
let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, _, kinds) =
344+
tx_index.split();
345+
346+
(
347+
senders.into_iter().map(Into::into).collect(),
348+
recipients.into_iter().map(Into::into).collect(),
349+
input_objects.into_iter().map(Into::into).collect(),
350+
changed_objects.into_iter().map(Into::into).collect(),
351+
pkgs.into_iter().map(Into::into).collect(),
352+
mods.into_iter().map(Into::into).collect(),
353+
funs.into_iter().map(Into::into).collect(),
354+
kinds.into_iter().map(Into::into).collect(),
355+
)
356+
}
31357
}
32358

33359
#[async_trait]
@@ -37,13 +363,26 @@ impl WriteApiServer for WriteApi {
37363
tx_bytes: Base64,
38364
signatures: Vec<Base64>,
39365
options: Option<IotaTransactionBlockResponseOptions>,
40-
request_type: Option<ExecuteTransactionRequestType>,
366+
_request_type: Option<ExecuteTransactionRequestType>,
41367
) -> RpcResult<IotaTransactionBlockResponse> {
42-
let iota_transaction_response = self
43-
.fullnode
44-
.execute_transaction_block(tx_bytes, signatures, options.clone(), request_type)
45-
.await
46-
.map_err(error_object_from_rpc)?;
368+
let request_type = Some(ExecuteTransactionRequestType::WaitForLocalExecution); // force local execution, for testing purposes
369+
let iota_transaction_response = match request_type {
370+
None | Some(ExecuteTransactionRequestType::WaitForEffectsCert) => self
371+
.fullnode
372+
.execute_transaction_block(tx_bytes, signatures, options.clone(), request_type)
373+
.await
374+
.map_err(error_object_from_rpc)?, // should this be the default option? What if
375+
// fullnode returns that the tx is locally
376+
// executed?
377+
Some(ExecuteTransactionRequestType::WaitForLocalExecution) => {
378+
self.execute_and_optimistically_index_tx_effects(
379+
tx_bytes,
380+
signatures,
381+
options.clone(),
382+
)
383+
.await?
384+
}
385+
};
47386
Ok(IotaTransactionBlockResponseWithOptions {
48387
response: iota_transaction_response,
49388
options: options.unwrap_or_default(),

0 commit comments

Comments
 (0)