Skip to content

refactor(indexer): Rework config and cli arguments #6517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions crates/iota-graphql-rpc/src/data/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ mod tests {
use diesel::QueryDsl;
use iota_framework::BuiltInFramework;
use iota_indexer::{
db::{get_pool_connection, new_connection_pool, reset_database},
db::{ConnectionPoolConfig, get_pool_connection, new_connection_pool, reset_database},
models::objects::StoredObject,
schema::objects,
types::IndexedObject,
Expand All @@ -216,11 +216,11 @@ mod tests {
#[test]
fn test_query_cost() {
let connection_config = ConnectionConfig::default();
let pool = new_connection_pool(
&connection_config.db_url,
Some(connection_config.db_pool_size),
)
.unwrap();
let connection_pool_config = ConnectionPoolConfig {
pool_size: connection_config.db_pool_size,
..Default::default()
};
let pool = new_connection_pool(&connection_config.db_url, &connection_pool_config).unwrap();
let mut conn = get_pool_connection(&pool).unwrap();
reset_database(&mut conn).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion crates/iota-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use iota_graphql_rpc_client::simple_client::SimpleClient;
pub use iota_indexer::handlers::objects_snapshot_handler::SnapshotLagConfig;
pub use iota_indexer::config::SnapshotLagConfig;
use iota_indexer::{
errors::IndexerError,
store::{PgIndexerStore, indexer_store::IndexerStore},
Expand Down
2 changes: 1 addition & 1 deletion crates/iota-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ backoff.workspace = true
bcs.workspace = true
cached.workspace = true
chrono.workspace = true
clap.workspace = true
clap = { workspace = true, features = ["env"] }
diesel = { workspace = true, features = ["chrono", "r2d2", "serde_json", "postgres"] }
diesel_migrations = "2.2.0"
downcast = "0.11.0"
Expand Down
289 changes: 289 additions & 0 deletions crates/iota-indexer/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{net::SocketAddr, path::PathBuf};

use clap::{Args, Parser, Subcommand};
use iota_names::config::IotaNamesConfig;
use iota_types::base_types::{IotaAddress, ObjectID};
use url::Url;

use crate::db::ConnectionPoolConfig;

#[derive(Parser, Clone, Debug)]
#[clap(
name = "IOTA indexer",
about = "An off-fullnode service serving data from IOTA protocol"
)]
pub struct IndexerConfig {
#[clap(long, alias = "db-url")]
pub database_url: Url,

#[clap(flatten)]
pub connection_pool_config: ConnectionPoolConfig,

#[clap(long, default_value = "0.0.0.0:9184")]
pub metrics_address: SocketAddr,

#[command(subcommand)]
pub command: Command,
}

#[derive(Args, Debug, Clone)]
pub struct IotaNamesOptions {
#[arg(default_value_t = IotaNamesConfig::default().package_address)]
#[arg(long = "iota-names-package-address")]
pub package_address: IotaAddress,
#[arg(default_value_t = IotaNamesConfig::default().object_id)]
#[arg(long = "iota-names-object-id")]
pub object_id: ObjectID,
#[arg(default_value_t = IotaNamesConfig::default().payments_package_address)]
#[arg(long = "iota-names-payments-package-address")]
pub payments_package_address: IotaAddress,
#[arg(default_value_t = IotaNamesConfig::default().registry_id)]
#[arg(long = "iota-names-registry-id")]
pub registry_id: ObjectID,
#[arg(default_value_t = IotaNamesConfig::default().reverse_registry_id)]
#[arg(long = "iota-names-reverse-registry-id")]
pub reverse_registry_id: ObjectID,
}

impl From<IotaNamesOptions> for IotaNamesConfig {
fn from(options: IotaNamesOptions) -> Self {
let IotaNamesOptions {
package_address,
object_id,
payments_package_address,
registry_id,
reverse_registry_id,
} = options;
Self {
package_address,
object_id,
payments_package_address,
registry_id,
reverse_registry_id,
}
}
}

impl From<IotaNamesConfig> for IotaNamesOptions {
fn from(config: IotaNamesConfig) -> Self {
let IotaNamesConfig {
package_address,
object_id,
payments_package_address,
registry_id,
reverse_registry_id,
} = config;
Self {
package_address,
object_id,
payments_package_address,
registry_id,
reverse_registry_id,
}
}
}

impl Default for IotaNamesOptions {
fn default() -> Self {
IotaNamesConfig::default().into()
}
}

#[derive(Args, Debug, Clone)]
pub struct JsonRpcConfig {
#[command(flatten)]
pub iota_names_options: IotaNamesOptions,

#[clap(long, default_value = "0.0.0.0:9000")]
pub rpc_address: SocketAddr,

#[clap(long)]
pub rpc_client_url: String,
}

#[derive(Args, Debug, Default, Clone)]
#[group(required = true, multiple = true)]
pub struct IngestionSources {
#[arg(long)]
pub data_ingestion_path: Option<PathBuf>,

#[arg(long)]
pub remote_store_url: Option<Url>,

#[arg(long)]
pub rpc_client_url: Option<Url>,
}

#[derive(Args, Debug, Clone)]
pub struct IngestionConfig {
#[clap(flatten)]
pub sources: IngestionSources,

#[arg(
long,
default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE,
env = "DOWNLOAD_QUEUE_SIZE",
)]
pub checkpoint_download_queue_size: usize,

#[arg(
long,
default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT,
env = "INGESTION_READER_TIMEOUT_SECS",
)]
pub checkpoint_download_timeout: u64,

/// Limit indexing parallelism on big checkpoints to avoid OOMing by
/// limiting the total size of the checkpoint download queue.
#[arg(
long,
default_value_t = Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES,
env = "CHECKPOINT_PROCESSING_BATCH_DATA_LIMIT",
)]
pub checkpoint_download_queue_size_bytes: usize,
}

impl IngestionConfig {
pub const DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE: usize = 200;
pub const DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES: usize = 20_000_000;
pub const DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT: u64 = 20;
}

impl Default for IngestionConfig {
fn default() -> Self {
Self {
sources: Default::default(),
checkpoint_download_queue_size: Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE,
checkpoint_download_timeout: Self::DEFAULT_CHECKPOINT_DOWNLOAD_TIMEOUT,
checkpoint_download_queue_size_bytes:
Self::DEFAULT_CHECKPOINT_DOWNLOAD_QUEUE_SIZE_BYTES,
}
}
}

#[derive(Subcommand, Clone, Debug)]
pub enum Command {
Indexer {
#[command(flatten)]
ingestion_config: IngestionConfig,
#[command(flatten)]
snapshot_config: SnapshotLagConfig,
#[command(flatten)]
pruning_options: PruningOptions,
},
JsonRpcService(JsonRpcConfig),
ResetDatabase {
#[clap(long)]
force: bool,
},
AnalyticalWorker,
}

#[derive(Args, Default, Debug, Clone)]
pub struct PruningOptions {
#[arg(long, env = "EPOCHS_TO_KEEP")]
pub epochs_to_keep: Option<u64>,
}

#[derive(Args, Debug, Clone)]
pub struct SnapshotLagConfig {
#[arg(
long = "object-snapshot-min-checkpoint-lag",
default_value_t = Self::DEFAULT_MIN_LAG,
env = "OBJECTS_SNAPSHOT_MIN_CHECKPOINT_LAG",
)]
pub snapshot_min_lag: usize,

#[arg(
long = "objects-snapshot-sleep-duration",
default_value_t = Self::DEFAULT_SLEEP_DURATION_SEC,
)]
pub sleep_duration: u64,
}

impl SnapshotLagConfig {
pub const DEFAULT_MIN_LAG: usize = 300;
pub const DEFAULT_SLEEP_DURATION_SEC: u64 = 5;
}

impl Default for SnapshotLagConfig {
fn default() -> Self {
SnapshotLagConfig {
snapshot_min_lag: Self::DEFAULT_MIN_LAG,
sleep_duration: Self::DEFAULT_SLEEP_DURATION_SEC,
}
}
}

#[cfg(test)]
mod test {
use tap::Pipe;

use super::*;

fn parse_args<'a, T>(args: impl IntoIterator<Item = &'a str>) -> Result<T, clap::error::Error>
where
T: clap::Args + clap::FromArgMatches,
{
clap::Command::new("test")
.no_binary_name(true)
.pipe(T::augment_args)
.try_get_matches_from(args)
.and_then(|matches| T::from_arg_matches(&matches))
}

#[test]
fn name_service() {
parse_args::<IotaNamesOptions>(["--name-service-registry-id=0x1"]).unwrap();
parse_args::<IotaNamesOptions>([
"--name-service-package-address",
"0x0000000000000000000000000000000000000000000000000000000000000001",
])
.unwrap();
parse_args::<IotaNamesOptions>(["--name-service-reverse-registry-id=0x1"]).unwrap();
parse_args::<IotaNamesOptions>([
"--name-service-registry-id=0x1",
"--name-service-package-address",
"0x0000000000000000000000000000000000000000000000000000000000000002",
"--name-service-reverse-registry-id=0x3",
])
.unwrap();
parse_args::<IotaNamesOptions>([]).unwrap();
}

#[test]
fn ingestion_sources() {
parse_args::<IngestionSources>(["--data-ingestion-path=/tmp/foo"]).unwrap();
parse_args::<IngestionSources>(["--remote-store-url=http://example.com"]).unwrap();
parse_args::<IngestionSources>(["--rpc-client-url=http://example.com"]).unwrap();

parse_args::<IngestionSources>([
"--data-ingestion-path=/tmp/foo",
"--remote-store-url=http://example.com",
"--rpc-client-url=http://example.com",
])
.unwrap();

// At least one must be present
parse_args::<IngestionSources>([]).unwrap_err();
}

#[test]
fn json_rpc_config() {
parse_args::<JsonRpcConfig>(["--rpc-client-url=http://example.com"]).unwrap();

// Can include name service options and bind address
parse_args::<JsonRpcConfig>([
"--rpc-address=127.0.0.1:8080",
"--rpc-client-url=http://example.com",
])
.unwrap();

// fullnode rpc url must be present
parse_args::<JsonRpcConfig>([]).unwrap_err();
}
}
Loading