Skip to content

Commit 99e7a2c

Browse files
committed
feat(indexer): Introduce optimistic indexing
1 parent 371adab commit 99e7a2c

File tree

9 files changed

+348
-144
lines changed

9 files changed

+348
-144
lines changed

crates/iota-indexer/src/apis/write_api.rs

Lines changed: 107 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,68 @@
22
// Modifications Copyright (c) 2024 IOTA Stiftung
33
// SPDX-License-Identifier: Apache-2.0
44

5+
use std::{collections::HashSet, time::Duration};
6+
7+
use anyhow::Context;
58
use async_trait::async_trait;
9+
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper};
10+
use downcast::Any;
611
use fastcrypto::{encoding::Base64, traits::ToFromBytes};
712
use iota_json_rpc::IotaRpcModule;
8-
use iota_json_rpc_api::{error_object_from_rpc, WriteApiClient, WriteApiServer};
13+
use iota_json_rpc_api::{WriteApiClient, WriteApiServer, error_object_from_rpc};
914
use iota_json_rpc_types::{
1015
DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse,
1116
IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
1217
};
1318
use iota_open_rpc::Module;
14-
use iota_rest_api::{client::TransactionExecutionResponse, ExecuteTransactionQueryParameters};
19+
use iota_rest_api::{ExecuteTransactionQueryParameters, client::TransactionExecutionResponse};
1520
use iota_types::{
1621
base_types::IotaAddress,
1722
effects::TransactionEffectsAPI,
23+
execution_status::ExecutionStatus,
1824
full_checkpoint_content::CheckpointTransaction,
1925
iota_serde::BigInt,
2026
quorum_driver_types::ExecuteTransactionRequestType,
2127
signature::GenericSignature,
2228
transaction::{Transaction, TransactionData},
2329
};
24-
use jsonrpsee::{core::RpcResult, http_client::HttpClient, RpcModule};
30+
use jsonrpsee::{RpcModule, core::RpcResult, http_client::HttpClient};
2531

2632
use crate::{
27-
errors::IndexerError, indexer_reader::IndexerReader,
28-
types::IotaTransactionBlockResponseWithOptions,
33+
errors::IndexerError,
34+
handlers::{TransactionObjectChangesToCommit, checkpoint_handler::CheckpointHandler},
35+
indexer_reader::IndexerReader,
36+
metrics::IndexerMetrics,
37+
models::transactions::{StoredTransaction, TxInsertionOrder},
38+
run_query_async,
39+
schema::tx_insertion_order,
40+
spawn_read_only_blocking,
41+
store::{IndexerStore, PgIndexerStore},
42+
types::{IndexedDeletedObject, IndexedObject, IotaTransactionBlockResponseWithOptions},
2943
};
3044

3145
pub(crate) struct WriteApi {
3246
fullnode: HttpClient,
3347
fullnode_rest_client: iota_rest_api::Client,
3448
inner: IndexerReader,
49+
store: PgIndexerStore,
50+
metrics: IndexerMetrics,
3551
}
3652

3753
impl WriteApi {
3854
pub fn new(
3955
fullnode_client: HttpClient,
4056
fullnode_rest_client: iota_rest_api::Client,
4157
inner: IndexerReader,
58+
store: PgIndexerStore,
59+
metrics: IndexerMetrics,
4260
) -> Self {
4361
Self {
4462
fullnode: fullnode_client,
4563
fullnode_rest_client,
4664
inner,
65+
store,
66+
metrics,
4767
}
4868
}
4969

@@ -101,6 +121,7 @@ impl WriteApi {
101121
};
102122

103123
println!("{:#?}", full_tx_data);
124+
self.optimistically_index_transaction(&full_tx_data).await?;
104125

105126
// TODO: optimistically index the TX, so we can read it in the lines below right
106127
// away, without looping
@@ -127,6 +148,84 @@ impl WriteApi {
127148
}
128149
.into())
129150
}
151+
152+
async fn optimistically_index_transaction(
153+
&self,
154+
full_tx_data: &CheckpointTransaction,
155+
) -> Result<(), IndexerError> {
156+
let assigned_insertion_order: TxInsertionOrder = {
157+
let tx_digest_bytes = full_tx_data.transaction.digest().inner().to_vec();
158+
159+
// // how do we return info about failed txs if we don't index them?
160+
// if *full_tx_data.effects.status() == ExecutionStatus::Success {
161+
162+
self.store
163+
.persist_tx_insertion_order(vec![TxInsertionOrder {
164+
insertion_order: -1, // ignored value
165+
tx_digest: tx_digest_bytes.clone(),
166+
}])
167+
.await?;
168+
169+
let pool = self.inner.get_pool();
170+
run_query_async!(&pool, |conn| {
171+
tx_insertion_order::table
172+
.select(TxInsertionOrder::as_select())
173+
.filter(tx_insertion_order::tx_digest.eq(tx_digest_bytes))
174+
.first::<TxInsertionOrder>(conn)
175+
})?
176+
};
177+
178+
let object_changes = {
179+
let indexed_eventually_removed_objects = full_tx_data
180+
.removed_object_refs_post_version()
181+
.map(|obj_ref| IndexedDeletedObject {
182+
object_id: obj_ref.0,
183+
object_version: obj_ref.1.into(),
184+
checkpoint_sequence_number: 0,
185+
})
186+
.collect::<Vec<_>>();
187+
188+
let changed_objects = full_tx_data
189+
.output_objects
190+
.iter()
191+
.map(|o| {
192+
crate::handlers::checkpoint_handler::try_extract_df_kind(o)
193+
.map(|df_kind| IndexedObject::from_object(0, o.clone(), df_kind))
194+
})
195+
.collect::<Result<Vec<_>, _>>()?;
196+
197+
let output_objects_ids = changed_objects
198+
.iter()
199+
.map(|o| o.object.id())
200+
.collect::<HashSet<_>>();
201+
assert!(
202+
indexed_eventually_removed_objects
203+
.iter()
204+
.all(|o| !output_objects_ids.contains(&o.object_id))
205+
);
206+
207+
TransactionObjectChangesToCommit {
208+
changed_objects,
209+
deleted_objects: indexed_eventually_removed_objects,
210+
}
211+
};
212+
213+
let (indexed_tx, tx_indices, indexed_events, events_indices, indexed_displays) =
214+
CheckpointHandler::index_transaction(
215+
&full_tx_data,
216+
assigned_insertion_order.insertion_order.try_into().unwrap(),
217+
0,
218+
0,
219+
&self.metrics,
220+
)
221+
.await?;
222+
223+
self.store.persist_objects(vec![object_changes]).await?; // this will insert, as well as remove from 'objects' table, is it ok?
224+
self.store.persist_displays(indexed_displays).await?;
225+
self.store
226+
.persist_optimistic_transaction(StoredTransaction::from(&indexed_tx).into())
227+
.await
228+
}
130229
}
131230

132231
#[async_trait]
@@ -143,7 +242,9 @@ impl WriteApiServer for WriteApi {
143242
.fullnode
144243
.execute_transaction_block(tx_bytes, signatures, options.clone(), request_type)
145244
.await
146-
.map_err(error_object_from_rpc)?,
245+
.map_err(error_object_from_rpc)?, // should this be the default option? What if
246+
// fullnode returns that the tx is locally
247+
// executed?
147248
Some(ExecuteTransactionRequestType::WaitForLocalExecution) => {
148249
self.execute_and_optimistically_index_tx_effects(
149250
tx_bytes,

crates/iota-indexer/src/db.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,15 @@ pub mod setup_postgres {
307307
let store = PgIndexerStore::new(blocking_cp, indexer_metrics.clone());
308308
return Indexer::start_writer(&indexer_config, store, indexer_metrics).await;
309309
} else if indexer_config.rpc_server_worker {
310-
return Indexer::start_reader(&indexer_config, &registry, db_url.to_string()).await;
310+
let store = PgIndexerStore::new(blocking_cp, indexer_metrics.clone());
311+
return Indexer::start_reader(
312+
&indexer_config,
313+
store,
314+
&registry,
315+
db_url.to_string(),
316+
indexer_metrics,
317+
)
318+
.await;
311319
} else if indexer_config.analytical_worker {
312320
let store = PgIndexerAnalyticalStore::new(blocking_cp);
313321
return Indexer::start_analytical_worker(store, indexer_metrics.clone()).await;

0 commit comments

Comments
 (0)