|
| 1 | +--- |
| 2 | +title: "Creating a Processor" |
| 3 | +--- |
| 4 | +import { Callout } from 'nextra/components' |
| 5 | +import { IndexerBetaNotice } from '@components/index'; |
| 6 | + |
| 7 | +# Creating a Processor |
| 8 | + |
| 9 | +This guide will walk you through setting up the basic template for a new processor. |
| 10 | + |
| 11 | +## Pre-requisites |
| 12 | +You've already set up your environment and have the Indexer SDK `aptos-indexer-sdk` and `aptos-indexer-sdk-server-framework` installed. |
| 13 | +If you haven't, follow the [Indexer SDK installation guide](./setup.mdx). |
| 14 | + |
| 15 | +## Overview |
| 16 | +Creating and running a processor will require several pieces: |
| 17 | +1. `IndexerProcessorConfig` |
| 18 | +2. `ProcessorConfig` |
| 19 | +3. The processor itself. This is where you'll define a processor's config, the processor setup, and the steps that will be run to index transactions. |
| 20 | +4. `main.rs` - The main file that will run the processor. |
| 21 | + |
| 22 | +The next section goes through each of these pieces more explicitly and provides code examples. |
| 23 | + |
| 24 | +## How to define `IndexerProcessorConfig` |
| 25 | +The `IndexerProcessorConfig` defines the base configuration for all processors that you'll be running. |
| 26 | +It should include configuration for things that are shared across multiple processors, like the database configuration and [Transaction Stream](../../txn-stream.mdx) configuration. |
| 27 | + |
| 28 | +[`ServerArgs`](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk-server-framework/src/lib.rs#L26) parses a `config.yaml` file and bootstraps a server with all the common pieces to run a processor. |
| 29 | + |
| 30 | +To setup the configuration for your processor and make it work with `ServerArgs`, you'll need to define a `IndexerProcessorConfig` that implements the [`RunnableConfig`](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk-server-framework/src/lib.rs#L102) trait. |
| 31 | +It also triggers a run method, which can be invoked in `main.rs`. |
| 32 | + |
| 33 | +For basic cases, you can copy the [`IndexerProcessorConfig` from the `aptos-indexer-processor-example`](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/config/indexer_processor_config.rs) repository and modify it to fit your needs. |
| 34 | + |
| 35 | +## How to define `ProcessorConfig` |
| 36 | +`ProcessorConfig` is an enum that contains all the individual processor configs. |
| 37 | +It's used by `IndexerProcessorConfig.run()` to map the processor name to the right `ProcessorConfig`. |
| 38 | + |
| 39 | +You can see a basic example of a `ProcessorConfig` [here](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/config/processor_config.rs). |
| 40 | +An example of a more complex setup that includes multiple processors and configurations is [`aptos-indexer-processors`](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/config/processor_config.rs#L84). |
| 41 | + |
| 42 | +## How to create a processor |
| 43 | +Now that you've got the configuration pieces set up, the next step is to create the processor. |
| 44 | +The processor is represented by a struct and is usually named `{PROCESSOR_NAME}Processor`, like `EventsProcessor` or `TokenV2Processor`, depending on the type of data it's indexing. |
| 45 | +```rust |
| 46 | +pub struct EventsProcessor { |
| 47 | + pub config: IndexerProcessorConfig, |
| 48 | + pub db_pool: ArcDbPool, |
| 49 | +} |
| 50 | +``` |
| 51 | +The processor's constructor should be defined like so: |
| 52 | +```rust |
| 53 | +pub async fn new(config: IndexerProcessorConfig) -> Result<Self> { |
| 54 | + // Processor setup code here, if needed |
| 55 | +} |
| 56 | +``` |
| 57 | +It takes in the `IndexerProcessorConfig` that you've defined and performs any setup required to instantiate the processor. |
| 58 | +Next, your processor needs to implement the [`ProcessorTrait`](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/a56b641a6aaca60092fcc9bbd98252f3cd703299/aptos-indexer-processors-sdk/sdk/src/traits/processor_trait.rs#L4). |
| 59 | +```rust |
| 60 | +#[async_trait::async_trait] |
| 61 | +impl ProcessorTrait for EventsProcessor { |
| 62 | + fn name(&self) -> &'static str { |
| 63 | + self.config.processor_config.name() |
| 64 | + } |
| 65 | + |
| 66 | + async fn run_processor(&self) -> Result<()> { |
| 67 | + // Processor logic here |
| 68 | + } |
| 69 | +} |
| 70 | +``` |
| 71 | +The `run_processor` method is the most important method in the processor. |
| 72 | + |
| 73 | +If you're using a migration-based database, like PostgreSQL, running the migrations can go inside of `run_processor`. |
| 74 | +This is also where we implement logic to determine the appropriate starting version for the processor, verify the chain ID using [Transaction Stream](../../txn-stream.mdx), and validate the processor's configuration. |
| 75 | + |
| 76 | +`run_processor` also contains the instantiation of the processor's `Step`s and the specification of how these `Step`s are connected together by channels. |
| 77 | + |
| 78 | +```rust |
| 79 | +// Instantiate processor steps |
| 80 | +let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { |
| 81 | + starting_version: Some(starting_version), |
| 82 | + ..self.config.transaction_stream_config.clone() |
| 83 | +}) |
| 84 | +.await?; |
| 85 | +// ... Instantiate the rest of your processor's steps ... |
| 86 | + |
| 87 | +// Connect processor steps |
| 88 | +let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( |
| 89 | + transaction_stream.into_runnable_step(), |
| 90 | +) |
| 91 | +.connect_to(extractor_step.into_runnable_step(), channel_size) |
| 92 | +.connect_to(storer_step.into_runnable_step(), channel_size) |
| 93 | +.connect_to(version_tracker_step.into_runnable_step(), channel_size) |
| 94 | +.end_and_return_output_receiver(channel_size); |
| 95 | + |
| 96 | +// Read the results from the output of the last step |
| 97 | +loop { |
| 98 | + match buffer_receiver.recv().await { |
| 99 | + // Do something with th output |
| 100 | + } |
| 101 | +} |
| 102 | +``` |
| 103 | + |
| 104 | +You can see a full example of a processor that indexes raw Aptos events in [`aptos-indexer-processor-example`](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/processors/events_processor.rs). |
| 105 | +As a reference, you can also see all of the processors that make up the [Indexer API](../../aptos-hosted.mdx) in [`aptos-indexer-processors`](https://github.com/aptos-labs/aptos-indexer-processors/tree/main/rust/sdk-processor/src/processors). |
| 106 | + |
| 107 | +## How to define `main.rs` |
| 108 | +You may copy the [`main.rs`](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/main.rs) file from the `aptos-indexer-processor-example`. |
| 109 | + |
| 110 | +These lines of code uses the `ServerArgs` and the `IndexerProcessorConfig` that we've defined earlier: |
| 111 | +```rust |
| 112 | +let args = ServerArgs::parse(); |
| 113 | +args.run::<IndexerProcessorConfig>(tokio::runtime::Handle::current()) |
| 114 | + .await |
| 115 | +``` |
0 commit comments