Skip to content

Commit 8b6b928

Browse files
committed
feat(indexer): Introduce optimistic indexing
1 parent 80e89c0 commit 8b6b928

File tree

15 files changed

+1299
-127
lines changed

15 files changed

+1299
-127
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iota-indexer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ move-binary-format.workspace = true
5757
move-bytecode-utils.workspace = true
5858
move-core-types.workspace = true
5959
telemetry-subscribers.workspace = true
60+
rand = "0.8.5"
6061

6162
[features]
6263
pg_integration = []

crates/iota-indexer/src/apis/write_api.rs

Lines changed: 326 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,334 @@
22
// Modifications Copyright (c) 2024 IOTA Stiftung
33
// SPDX-License-Identifier: Apache-2.0
44

5+
use std::collections::HashSet;
6+
57
use async_trait::async_trait;
6-
use fastcrypto::encoding::Base64;
8+
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper};
9+
use fastcrypto::{encoding::Base64, traits::ToFromBytes};
710
use iota_json_rpc::IotaRpcModule;
811
use iota_json_rpc_api::{WriteApiClient, WriteApiServer, error_object_from_rpc};
912
use iota_json_rpc_types::{
1013
DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse,
1114
IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
1215
};
1316
use iota_open_rpc::Module;
17+
use iota_rest_api::{ExecuteTransactionQueryParameters, client::TransactionExecutionResponse};
1418
use iota_types::{
15-
base_types::IotaAddress, iota_serde::BigInt, quorum_driver_types::ExecuteTransactionRequestType,
19+
base_types::IotaAddress,
20+
effects::TransactionEffectsAPI,
21+
full_checkpoint_content::CheckpointTransaction,
22+
iota_serde::BigInt,
23+
quorum_driver_types::ExecuteTransactionRequestType,
24+
signature::GenericSignature,
25+
transaction::{Transaction, TransactionData},
1626
};
1727
use jsonrpsee::{RpcModule, core::RpcResult, http_client::HttpClient};
1828

19-
use crate::types::IotaTransactionBlockResponseWithOptions;
29+
use crate::{
30+
errors::IndexerError,
31+
handlers::{TransactionObjectChangesToCommit, checkpoint_handler::CheckpointHandler},
32+
indexer_reader::IndexerReader,
33+
metrics::IndexerMetrics,
34+
models::{
35+
event_indices::OptimisticEventIndices,
36+
events::StoredEvent,
37+
transactions::{StoredTransaction, TxInsertionOrder},
38+
tx_indices::OptimisticTxIndices,
39+
},
40+
run_query_async,
41+
schema::tx_insertion_order,
42+
spawn_read_only_blocking,
43+
store::{IndexerStore, PgIndexerStore},
44+
types::{
45+
EventIndex, IndexedDeletedObject, IndexedObject, IotaTransactionBlockResponseWithOptions,
46+
TxIndex,
47+
},
48+
};
2049

2150
pub(crate) struct WriteApi {
2251
fullnode: HttpClient,
52+
fullnode_rest_client: iota_rest_api::Client,
53+
inner: IndexerReader,
54+
store: PgIndexerStore,
55+
metrics: IndexerMetrics,
2356
}
2457

2558
impl WriteApi {
26-
pub fn new(fullnode_client: HttpClient) -> Self {
59+
pub fn new(
60+
fullnode_client: HttpClient,
61+
fullnode_rest_client: iota_rest_api::Client,
62+
inner: IndexerReader,
63+
store: PgIndexerStore,
64+
metrics: IndexerMetrics,
65+
) -> Self {
2766
Self {
2867
fullnode: fullnode_client,
68+
fullnode_rest_client,
69+
inner,
70+
store,
71+
metrics,
72+
}
73+
}
74+
75+
async fn execute_and_optimistically_index_tx_effects(
76+
&self,
77+
tx_bytes: Base64,
78+
signatures: Vec<Base64>,
79+
options: Option<IotaTransactionBlockResponseOptions>,
80+
) -> RpcResult<IotaTransactionBlockResponse> {
81+
let tx_data: TransactionData =
82+
bcs::from_bytes(&tx_bytes.to_vec().map_err(IndexerError::FastCrypto)?)
83+
.map_err(IndexerError::Bcs)?;
84+
let mut sigs = Vec::new();
85+
for sig in signatures {
86+
sigs.push(
87+
GenericSignature::from_bytes(&sig.to_vec().map_err(IndexerError::FastCrypto)?)
88+
.map_err(IndexerError::FastCrypto)?,
89+
);
90+
}
91+
let transaction = Transaction::from_generic_sig_data(tx_data, sigs);
92+
93+
// TODO: shouldn't return type below be from rust-sdk types? Is this type
94+
// correct?
95+
let result = self
96+
.fullnode_rest_client
97+
.execute_transaction(
98+
&ExecuteTransactionQueryParameters {
99+
events: true,
100+
balance_changes: false,
101+
input_objects: true,
102+
output_objects: true,
103+
},
104+
&transaction,
105+
)
106+
.await
107+
.map_err(|e| IndexerError::Generic(e.to_string()))?;
108+
109+
// println!("RESULT: {:#?}", result);
110+
111+
let TransactionExecutionResponse {
112+
effects,
113+
finality: _,
114+
events,
115+
balance_changes: _,
116+
input_objects,
117+
output_objects,
118+
} = result;
119+
120+
let full_tx_data = CheckpointTransaction {
121+
transaction,
122+
effects,
123+
events,
124+
input_objects: input_objects
125+
.expect("Input objects should always be present in the response"),
126+
output_objects: output_objects
127+
.expect("Output objects should always be present in the response"),
128+
};
129+
130+
// println!("FULLTXDATA: {:#?}", full_tx_data);
131+
self.optimistically_index_transaction(&full_tx_data).await?;
132+
133+
let tx_block_response = {
134+
self.inner
135+
.multi_get_transaction_block_response_in_blocking_task(
136+
vec![*full_tx_data.effects.transaction_digest()],
137+
options.clone().unwrap_or_default(),
138+
)
139+
.await?
140+
.pop()
141+
.expect("We just indexed this transaction is it should be there")
142+
};
143+
144+
Ok(IotaTransactionBlockResponseWithOptions {
145+
response: tx_block_response,
146+
options: options.unwrap_or_default(),
29147
}
148+
.into())
149+
}
150+
151+
async fn optimistically_index_transaction(
152+
&self,
153+
full_tx_data: &CheckpointTransaction,
154+
) -> Result<(), IndexerError> {
155+
let assigned_insertion_order: TxInsertionOrder = {
156+
let tx_digest_bytes = full_tx_data.transaction.digest().inner().to_vec();
157+
158+
// // how do we return info about failed txs if we don't index them?
159+
// if *full_tx_data.effects.status() == ExecutionStatus::Success {
160+
161+
self.store
162+
.persist_tx_insertion_order(vec![TxInsertionOrder {
163+
insertion_order: -1, // ignored value
164+
tx_digest: tx_digest_bytes.clone(),
165+
}])
166+
.await?;
167+
168+
let pool = self.inner.get_pool();
169+
run_query_async!(&pool, |conn| {
170+
tx_insertion_order::table
171+
.select(TxInsertionOrder::as_select())
172+
.filter(tx_insertion_order::tx_digest.eq(tx_digest_bytes))
173+
.first::<TxInsertionOrder>(conn)
174+
})?
175+
};
176+
177+
let object_changes = {
178+
let indexed_eventually_removed_objects = full_tx_data
179+
.removed_object_refs_post_version()
180+
.map(|obj_ref| IndexedDeletedObject {
181+
object_id: obj_ref.0,
182+
object_version: obj_ref.1.into(),
183+
checkpoint_sequence_number: 0,
184+
})
185+
.collect::<Vec<_>>();
186+
187+
let changed_objects = full_tx_data
188+
.output_objects
189+
.iter()
190+
.map(|o| {
191+
crate::handlers::checkpoint_handler::try_extract_df_kind(o)
192+
.map(|df_kind| IndexedObject::from_object(0, o.clone(), df_kind))
193+
})
194+
.collect::<Result<Vec<_>, _>>()?;
195+
196+
let output_objects_ids = changed_objects
197+
.iter()
198+
.map(|o| o.object.id())
199+
.collect::<HashSet<_>>();
200+
assert!(
201+
indexed_eventually_removed_objects
202+
.iter()
203+
.all(|o| !output_objects_ids.contains(&o.object_id))
204+
);
205+
206+
TransactionObjectChangesToCommit {
207+
changed_objects,
208+
deleted_objects: indexed_eventually_removed_objects,
209+
}
210+
};
211+
212+
let (indexed_tx, tx_indices, indexed_events, events_indices, indexed_displays) =
213+
CheckpointHandler::index_transaction(
214+
full_tx_data,
215+
assigned_insertion_order.insertion_order.try_into().unwrap(),
216+
0,
217+
0,
218+
&self.metrics,
219+
)
220+
.await?;
221+
222+
// TODO: we write common tables here, we need to make sure we do not overwrite
223+
// objects coming from checkpoint sync
224+
// TODO: both insert and delete need to be handled
225+
self.store.persist_objects(vec![object_changes]).await?; // this will insert, as well as remove from 'objects' table, is it ok?
226+
// TODO: potential race conditions here also need to be handled
227+
self.store.persist_displays(indexed_displays).await?;
228+
229+
self.store
230+
.persist_optimistic_transaction(StoredTransaction::from(&indexed_tx).into())
231+
.await?;
232+
self.store
233+
.persist_optimistic_events(
234+
indexed_events
235+
.into_iter()
236+
.map(StoredEvent::from)
237+
.map(Into::into)
238+
.collect(),
239+
)
240+
.await?;
241+
self.store
242+
.persist_optimistic_event_indices(
243+
Self::optimistic_event_indices_from_indexed_event_indices(events_indices),
244+
)
245+
.await?;
246+
self.store
247+
.persist_optimistic_tx_indices(Self::optimistic_tx_indices_from_indexed_tx_indices(
248+
tx_indices,
249+
))
250+
.await?;
251+
252+
Ok(())
253+
}
254+
255+
fn optimistic_event_indices_from_indexed_event_indices(
256+
event_indices: Vec<EventIndex>,
257+
) -> OptimisticEventIndices {
258+
let (
259+
event_emit_packages,
260+
event_emit_modules,
261+
event_senders,
262+
event_struct_packages,
263+
event_struct_modules,
264+
event_struct_names,
265+
event_struct_instantiations,
266+
) = event_indices.into_iter().map(|i| i.split()).fold(
267+
(
268+
Vec::new(),
269+
Vec::new(),
270+
Vec::new(),
271+
Vec::new(),
272+
Vec::new(),
273+
Vec::new(),
274+
Vec::new(),
275+
),
276+
|(
277+
mut event_emit_packages,
278+
mut event_emit_modules,
279+
mut event_senders,
280+
mut event_struct_packages,
281+
mut event_struct_modules,
282+
mut event_struct_names,
283+
mut event_struct_instantiations,
284+
),
285+
index| {
286+
event_emit_packages.push(index.0);
287+
event_emit_modules.push(index.1);
288+
event_senders.push(index.2);
289+
event_struct_packages.push(index.3);
290+
event_struct_modules.push(index.4);
291+
event_struct_names.push(index.5);
292+
event_struct_instantiations.push(index.6);
293+
(
294+
event_emit_packages,
295+
event_emit_modules,
296+
event_senders,
297+
event_struct_packages,
298+
event_struct_modules,
299+
event_struct_names,
300+
event_struct_instantiations,
301+
)
302+
},
303+
);
304+
305+
(
306+
event_emit_packages.into_iter().map(Into::into).collect(),
307+
event_emit_modules.into_iter().map(Into::into).collect(),
308+
event_senders.into_iter().map(Into::into).collect(),
309+
event_struct_packages.into_iter().map(Into::into).collect(),
310+
event_struct_modules.into_iter().map(Into::into).collect(),
311+
event_struct_names.into_iter().map(Into::into).collect(),
312+
event_struct_instantiations
313+
.into_iter()
314+
.map(Into::into)
315+
.collect(),
316+
)
317+
}
318+
319+
fn optimistic_tx_indices_from_indexed_tx_indices(tx_index: TxIndex) -> OptimisticTxIndices {
320+
let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, _, kinds) =
321+
tx_index.split();
322+
323+
(
324+
senders.into_iter().map(Into::into).collect(),
325+
recipients.into_iter().map(Into::into).collect(),
326+
input_objects.into_iter().map(Into::into).collect(),
327+
changed_objects.into_iter().map(Into::into).collect(),
328+
pkgs.into_iter().map(Into::into).collect(),
329+
mods.into_iter().map(Into::into).collect(),
330+
funs.into_iter().map(Into::into).collect(),
331+
kinds.into_iter().map(Into::into).collect(),
332+
)
30333
}
31334
}
32335

@@ -37,13 +340,26 @@ impl WriteApiServer for WriteApi {
37340
tx_bytes: Base64,
38341
signatures: Vec<Base64>,
39342
options: Option<IotaTransactionBlockResponseOptions>,
40-
request_type: Option<ExecuteTransactionRequestType>,
343+
_request_type: Option<ExecuteTransactionRequestType>,
41344
) -> RpcResult<IotaTransactionBlockResponse> {
42-
let iota_transaction_response = self
43-
.fullnode
44-
.execute_transaction_block(tx_bytes, signatures, options.clone(), request_type)
45-
.await
46-
.map_err(error_object_from_rpc)?;
345+
let request_type = Some(ExecuteTransactionRequestType::WaitForLocalExecution); // force local execution, for testing purposes
346+
let iota_transaction_response = match request_type {
347+
None | Some(ExecuteTransactionRequestType::WaitForEffectsCert) => self
348+
.fullnode
349+
.execute_transaction_block(tx_bytes, signatures, options.clone(), request_type)
350+
.await
351+
.map_err(error_object_from_rpc)?, // should this be the default option? What if
352+
// fullnode returns that the tx is locally
353+
// executed?
354+
Some(ExecuteTransactionRequestType::WaitForLocalExecution) => {
355+
self.execute_and_optimistically_index_tx_effects(
356+
tx_bytes,
357+
signatures,
358+
options.clone(),
359+
)
360+
.await?
361+
}
362+
};
47363
Ok(IotaTransactionBlockResponseWithOptions {
48364
response: iota_transaction_response,
49365
options: options.unwrap_or_default(),

0 commit comments

Comments
 (0)