Skip to content
This repository was archived by the owner on Jul 30, 2025. It is now read-only.

Commit 19b59b9

Browse files
committed
create processor
1 parent edf8bbe commit 19b59b9

File tree

3 files changed

+111
-0
lines changed

3 files changed

+111
-0
lines changed

apps/nextra/next.config.mjs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,11 @@ export default withBundleAnalyzer(
407407
destination: "/en/build/indexer/indexer-sdk/documentation/setup",
408408
permanent: true,
409409
},
410+
{
411+
source: "/indexer/indexer-sdk/documentation/create-processor",
412+
destination: "/en/build/indexer/indexer-sdk/documentation/create-processor",
413+
permanent: true,
414+
},
410415
{
411416
source: "/indexer/indexer-sdk/documentation/steps",
412417
destination: "/en/build/indexer/indexer-sdk/documentation/steps",

apps/nextra/pages/en/build/indexer/indexer-sdk/documentation/_meta.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
export default {
22
setup: { title: "Initial Setup" },
3+
"create-processor": {
4+
title: "Create a Processor",
5+
},
36
steps: {
47
title: "Creating a Step",
58
},
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
---
2+
title: "Creating a Processor"
3+
---
4+
import { Callout } from 'nextra/components'
5+
import { IndexerBetaNotice } from '@components/index';
6+
7+
# Creating a Processor
8+
9+
This guide will walk you through setting up the basic template for a new processor.
10+
11+
## Pre-requisites
12+
You've already set up your environment and have the Indexer SDK `aptos-indexer-sdk` and `aptos-indexer-sdk-server-framework` installed.
13+
If you haven't, follow the [Indexer SDK installation guide](./installation.mdx).
14+
15+
Creating and running a processor will require several pieces:
16+
1. `IndexerProcessorConfig`
17+
2. `ProcessorConfig`
18+
3. The processor itself. This is where you'll define a processor's config, the processor setup, and the steps that will be run to index transactions.
19+
4. `main.rs` - The main file that will run the processor.
20+
21+
The next section goes through each of these pieces more explicitly and provides code examples.
22+
23+
## `IndexerProcessorConfig`
24+
The `IndexerProcessorConfig` defines the base configuration for all processors that you'll be running.
25+
It should include configuration for things that are shared across multiple processors, like the database configuration and [Transaction Stream](./txn-stream.mdx) configuration.
26+
27+
To setup the configuration for your processor and make it work with `ServerArgs`, you'll need to define `IndexerProcessorConfig` that implements the `RunnableConfig` trait.
28+
[`ServerArgs`](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk-server-framework/src/lib.rs#L26) (provided by the `aptos-indexer-sdk-server-framework` crate) parses the `config.yaml` file and bootstraps a server with all the common pieces to run a processor.
29+
It also triggers a run method, which can be invoked in `main.rs`.
30+
31+
For basic cases, you can copy the [`IndexerProcessorConfig` from the `aptos-indexer-processor-example`](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/config/indexer_processor_config.rs) repository and modify it to fit your needs.
32+
33+
## `ProcessorConfig`
34+
`ProcessorConfig` is an enum that contains all the individual processor configs.
35+
It's used by `IndexerProcessorConfig.run()` to map the processor name to the right `ProcessorConfig`.
36+
37+
You can see a basic example of a `ProcessorConfig` [here](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/config/processor_config.rs).
38+
An example of a more complex setup that includes multiple processors and configurations is [`aptos-indexer-processors`](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/config/processor_config.rs#L84).
39+
40+
## How to create a processor
41+
Now that you've got the configuration pieces set up, the next step is to create the processor.
42+
The processor is represented by a struct and is usually named `{PROCESSOR_NAME}_Processor`, like `EventsProcessor` or `TokenV2Processor`, depending on the type of data it's indexing.
43+
```rust
44+
pub struct EventsProcessor {
45+
pub config: IndexerProcessorConfig,
46+
pub db_pool: ArcDbPool,
47+
}
48+
```
49+
The processor's constructor should be defined like so:
50+
```rust
51+
pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
52+
// Processor setup code here, if needed
53+
}
54+
```
55+
It takes in the `IndexerProcessorConfig` that you've defined and performs any setup required to instantiate the processor.
56+
Next, your processor needs to implement the [`ProcessorTrait`](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/a56b641a6aaca60092fcc9bbd98252f3cd703299/aptos-indexer-processors-sdk/sdk/src/traits/processor_trait.rs#L4).
57+
```rust
58+
#[async_trait::async_trait]
59+
impl ProcessorTrait for EventsProcessor {
60+
fn name(&self) -> &'static str {
61+
self.config.processor_config.name()
62+
}
63+
64+
async fn run_processor(&self) -> Result<()> {
65+
// Processor logic here
66+
}
67+
}
68+
```
69+
The `run_processor` method is the most important method in the processor.
70+
71+
If you're using a migration-based database, like PostgreSQL, running the migrations can go inside of `run_processor`.
72+
This is also where we implement logic to determine the appropriate starting version for the processor, establish a connection to [Transaction Stream](../../txn-stream.mdx) to verify the chain ID, and validate the processor's configuration.
73+
74+
`run_processor` also contains the instantiation of the processor's `Step`s and the specification of how these `Step`s are connected together by channels.
75+
76+
```rust
77+
// Instantiate processor steps
78+
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
79+
starting_version: Some(starting_version),
80+
..self.config.transaction_stream_config.clone()
81+
})
82+
.await?;
83+
// ... Instantiate the rest of your processor's steps ...
84+
85+
// Connect processor steps
86+
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
87+
transaction_stream.into_runnable_step(),
88+
)
89+
.connect_to(extractor_step.into_runnable_step(), channel_size)
90+
.connect_to(storer_step.into_runnable_step(), channel_size)
91+
.connect_to(version_tracker_step.into_runnable_step(), channel_size)
92+
.end_and_return_output_receiver(channel_size);
93+
94+
// Read the results from the output of the last step
95+
loop {
96+
match buffer_receiver.recv().await {
97+
// Do something with th output
98+
}
99+
}
100+
```
101+
102+
You can see a full example of a processor that indexes raw Aptos events in [`aptos-indexer-processor-example`](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/processors/events_processor.rs).
103+
As a reference, you can also see all of the processors that make up the [Indexer API](../../aptos-hosted.mdx) in [`aptos-indexer-processors`](https://github.com/aptos-labs/aptos-indexer-processors/tree/main/rust/sdk-processor/src/processors).

0 commit comments

Comments
 (0)