---
title: "Migrate to Indexer SDK"
---

# Migrate to Indexer SDK

This guide explains how to migrate a legacy custom processor (one written in the [old way](https://github.com/aptos-labs/aptos-indexer-processors/blob/aptos-indexer-processors-v1.20.0/rust/processor/src/processors/events_processor.rs)) to the Indexer SDK.

## 1. Clone the example repo
We use the example events processor in `aptos-indexer-processor-example` as the starting point for the migration.
```bash
git clone https://github.com/aptos-labs/aptos-indexer-processor-example.git
```

## 2. Migrate your processor config
Previously, you would create a branch of `aptos-indexer-processors` and update the processor config to include your custom processor.
This legacy approach made it very difficult to upgrade your processor.
To address this, the SDK no longer depends on `aptos-indexer-processors`.
As a result, you'll need to define your own `IndexerProcessorConfig` and `ProcessorConfig` structs.

The `IndexerProcessorConfig` defines the base configuration for all processors that you'll be running.
The `ProcessorConfig` is an enum that contains all the individual processor configs.

Update the following files in your project:
- [`ProcessorConfig`](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/config/processor_config.rs): Replace `EventsProcessor` with your processor.
- [`IndexerProcessorConfig`](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/config/indexer_processor_config.rs): Update the `.run()` method to include your processor.
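As a rough illustration, the `ProcessorConfig` enum follows this shape. This is a simplified, self-contained sketch: the real enum in `processor_config.rs` derives serde traits so the `type` field in `config.yaml` selects the variant, and `MyCustomProcessor` is a hypothetical variant standing in for yours.

```rust
// Simplified sketch of the ProcessorConfig enum pattern. The real enum
// carries per-processor configuration and derives serde traits; this
// sketch only models the variant-to-name mapping.
#[derive(Clone, Debug, PartialEq)]
pub enum ProcessorConfig {
    EventsProcessor,
    // Hypothetical: replace EventsProcessor with (or add) your own variant.
    MyCustomProcessor,
}

impl ProcessorConfig {
    /// The snake_case string used as `type` in config.yaml.
    pub fn name(&self) -> &'static str {
        match self {
            ProcessorConfig::EventsProcessor => "events_processor",
            ProcessorConfig::MyCustomProcessor => "my_custom_processor",
        }
    }
}
```

The `type` string in your `config.yaml` should match the name your variant maps to.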

If you'd like to read more about configuration in the SDK, take a look at the [Create a Processor](../documentation/create-processor.mdx) guide.

## 3. Migrate processing logic to steps
In the old way, you defined your processor's logic by implementing `ProcessorTrait`'s `process_transactions` method.

Here is the example events processor written the old way:
```rust
#[async_trait]
impl ProcessorTrait for EventsProcessor {
    async fn process_transactions(
        ...
    ) -> anyhow::Result<ProcessingResult> {
        // Extract events from transactions
        let events: Vec<EventModel> = process_events(transactions);

        // Store the events in the database
        let tx_result = insert_to_db(
            self.get_pool(),
            self.name(),
            start_version,
            end_version,
            &events,
            &self.per_table_chunk_sizes,
        )
        .await;

        return tx_result;
    }
}

async fn insert_to_db(
    conn: ArcDbPool,
    name: &'static str,
    start_version: u64,
    end_version: u64,
    events: &[EventModel],
    per_table_chunk_sizes: &AHashMap<String, usize>,
) -> Result<(), diesel::result::Error> {
    tracing::trace!(
        name = name,
        start_version = start_version,
        end_version = end_version,
        "Inserting to db",
    );
    execute_in_chunks(
        conn,
        insert_events_query,
        events,
        get_config_table_chunk_size::<EventModel>("events", per_table_chunk_sizes),
    )
    .await?;
    Ok(())
}
```

With the SDK, we've introduced the concept of steps, which represent independent units of processing logic.
In the `EventsProcessor` example, extracting the events and storing them in the database break down naturally into two steps.
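Conceptually, the old `process_transactions` body splits into two functions — an extract step and a store step — that the SDK then chains together. Here is a self-contained sketch of that decomposition; the types are simplified stand-ins, and the real steps implement the SDK's `Processable` trait as shown below.

```rust
// Simplified stand-ins for the real types.
struct Transaction { events: Vec<String> }
struct EventModel { payload: String }

// Step 1 (extractor): pull events out of a batch of transactions.
fn extract(txns: &[Transaction]) -> Vec<EventModel> {
    txns.iter()
        .flat_map(|t| t.events.iter().cloned().map(|payload| EventModel { payload }))
        .collect()
}

// Step 2 (storer): persist the events; stubbed here to return the row
// count. In the real storer this is `execute_in_chunks(...)` against
// Postgres.
fn store(events: Vec<EventModel>) -> usize {
    events.len()
}
```

Because each step only depends on its input and output types, you can test, reorder, and reuse them independently — which is the point of the decomposition.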

To migrate your processor to the SDK, you'll need to define these steps in your processor.
You can use the `EventsExtractor` and `EventsStorer` steps in the example as a starting point for defining your own steps.

Make the following changes to [`events_extractor.rs`](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/processors/events/events_extractor.rs):
```rust
// TODO: Update the step name
pub struct EventsExtractor
where
    Self: Sized + Send + 'static, {}

#[async_trait]
impl Processable for EventsExtractor {
    type Input = Vec<Transaction>;
    // TODO: Update the output type.
    // This should be the data model you're extracting from the transactions
    type Output = Vec<EventModel>;
    type RunType = AsyncRunType;

    async fn process(
        &mut self,
        item: TransactionContext<Vec<Transaction>>,
    ) -> Result<Option<TransactionContext<Vec<EventModel>>>, ProcessorError> {
        // TODO: Update extraction logic.
        // This should be the same as the extraction logic in the old `process_transactions` method
        let events = item
            .data
            .par_iter()
            .map(|txn| process_events(txn))
            .flatten()
            .collect::<Vec<EventModel>>();

        Ok(Some(TransactionContext {
            data: events,
            metadata: item.metadata,
        }))
    }
}
```
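Note that, like the storer below, the extractor also needs `AsyncStep` and `NamedStep` implementations (see the bottom of the storer snippet for the pattern). As a self-contained sketch of the naming piece — `NamedStep` is redeclared locally here just so the snippet compiles; in your project, implement the SDK's trait instead:

```rust
// Local stand-in for the SDK's NamedStep trait, for illustration only.
pub trait NamedStep {
    fn name(&self) -> String;
}

pub struct EventsExtractor;

// The step name is what appears in the SDK's logs (e.g. the
// "Spawning processing task" lines shown later in this guide).
impl NamedStep for EventsExtractor {
    fn name(&self) -> String {
        "EventsExtractor".to_string()
    }
}
```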

Make the following changes to [`events_storer.rs`](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/processors/events/events_storer.rs):
```rust
pub struct EventsStorer
where
    Self: Sized + Send + 'static,
{
    conn_pool: ArcDbPool,
    processor_config: DefaultProcessorConfig,
}

impl EventsStorer {
    pub fn new(conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig) -> Self {
        Self {
            conn_pool,
            processor_config,
        }
    }
}

#[async_trait]
// TODO: Update step name
impl Processable for EventsStorer {
    // TODO: Update input type for the step.
    // The input type should match the output type of the extractor step.
    type Input = Vec<EventModel>;
    type Output = ();
    type RunType = AsyncRunType;

    async fn process(
        &mut self,
        events: TransactionContext<Vec<EventModel>>,
    ) -> Result<Option<TransactionContext<()>>, ProcessorError> {
        let per_table_chunk_sizes: AHashMap<String, usize> = AHashMap::new();
        let execute_res = execute_in_chunks(
            self.conn_pool.clone(),
            // TODO: Update this to the insertion query of your old processor
            insert_events_query,
            &events.data,
            get_config_table_chunk_size::<EventModel>("events", &per_table_chunk_sizes),
        )
        .await;
        match execute_res {
            Ok(_) => Ok(Some(TransactionContext {
                data: (),
                metadata: events.metadata,
            })),
            Err(e) => Err(ProcessorError::DBStoreError {
                message: format!(
                    "Failed to store events versions {} to {}: {:?}",
                    events.metadata.start_version, events.metadata.end_version, e,
                ),
                query: None,
            }),
        }
    }
}

impl AsyncStep for EventsStorer {}

impl NamedStep for EventsStorer {
    fn name(&self) -> String {
        "EventsStorer".to_string()
    }
}
```

## 4. Migrate your processor
Now that we've migrated the processing logic to steps, we also need to migrate the processor to instantiate the steps and connect them together.
In [`events_processor.rs`](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/processors/events/events_processor.rs), make the following changes:

```rust
// TODO: Update processor name
pub struct EventsProcessor {
    pub config: IndexerProcessorConfig,
    pub db_pool: ArcDbPool,
    // If you have any other fields in your processor, add them here.
    // You can instantiate them accordingly in the processor's `new` method.
}
```

In the `run_processor` method, update the code to use the steps you created in [Step 3](#3-migrate-processing-logic-to-steps).
```rust
pub async fn run_processor(self) -> Result<()> {
    {...}

    // Define processor steps
    let transaction_stream_config = self.config.transaction_stream_config.clone();
    let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
        starting_version: Some(starting_version),
        ..transaction_stream_config
    })
    .await?;
    // TODO: Replace the next 2 lines with your steps
    let events_extractor = EventsExtractor {};
    let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config.clone());

    let version_tracker = VersionTrackerStep::new(
        get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
        DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
    );

    // Connect processor steps together
    let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
        transaction_stream.into_runnable_step(),
    )
    // TODO: Replace the next 2 lines with your steps
    .connect_to(events_extractor.into_runnable_step(), 10)
    .connect_to(events_storer.into_runnable_step(), 10)
    .connect_to(version_tracker.into_runnable_step(), 10)
    .end_and_return_output_receiver(10);

    {...}
}
```
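The `10` arguments to `connect_to` are channel buffer sizes between steps. As a self-contained sketch of the idea — the SDK does this wiring for you on its own async runtime; this version uses plain `std` threads and bounded channels:

```rust
use std::sync::mpsc;
use std::thread;

// Roughly, each `connect_to(step, n)` places a bounded channel of size n
// between two concurrently running steps.
fn pipeline_sketch() -> Vec<usize> {
    let (tx1, rx1) = mpsc::sync_channel::<Vec<u64>>(10); // stream -> extractor
    let (tx2, rx2) = mpsc::sync_channel::<usize>(10); // extractor -> next step

    // "Extractor" step: stands in for process_events over each batch.
    let extractor = thread::spawn(move || {
        for batch in rx1 {
            tx2.send(batch.len()).unwrap();
        }
    });

    // "Stream" step: feed two transaction batches, then close the channel.
    tx1.send(vec![1, 2, 3]).unwrap();
    tx1.send(vec![4]).unwrap();
    drop(tx1);

    let out: Vec<usize> = rx2.iter().collect();
    extractor.join().unwrap();
    out
}
```

Bounded channels give the pipeline backpressure: if the storer falls behind, the extractor blocks instead of buffering transactions without limit.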

## 5. Update your `config.yaml`
`IndexerProcessorConfig` reworks the format of the `config.yaml` file.
Use the example [`config.yaml`](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/config.yaml) as a reference:

```yaml
health_check_port: 8085
server_config:
  processor_config:
    # TODO: Update with processor type
    type: "events_processor"
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
    # TODO: Update auth token
    auth_token: "AUTH_TOKEN"
    # TODO: Update with processor name
    request_name_header: "events-processor"
  db_config:
    # TODO: Update with your database connection string
    postgres_connection_string: postgresql://postgres:@localhost:5432/example
  # backfill_config:
  #   backfill_alias: "events_processor_backfill_1"
```

## 6. Run your migrated processor
```bash
cd ~/{DIRECTORY_OF_PROJECT}/aptos-indexer-processor-example
cargo run --release -- -c config.yaml
```

In your terminal, you should start to see logs like this:
```json
{"timestamp":"2025-01-13T21:23:21.785452Z","level":"INFO","message":"[Transaction Stream] Successfully connected to GRPC stream","stream_address":"https://grpc.mainnet.aptoslabs.com/","connection_id":"ec67ecc4-e041-4f17-a2e2-441e7ff21487","start_version":2186504987,"filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/transaction-stream/src/transaction_stream.rs","line_number":349,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785664Z","level":"INFO","message":"Spawning polling task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}
{"timestamp":"2025-01-13T21:23:21.785693Z","level":"INFO","message":"Spawning processing task","step_name":"TransactionStreamStep","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(23)"}
{"timestamp":"2025-01-13T21:23:21.785710Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetExtractor","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785912Z","level":"INFO","message":"Spawning processing task","step_name":"FungibleAssetStorer","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/async_step.rs","line_number":87,"threadName":"tokio-runtime-worker","threadId":"ThreadId(4)"}
{"timestamp":"2025-01-13T21:23:21.785978Z","level":"INFO","message":"Spawning polling task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":112,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}
{"timestamp":"2025-01-13T21:23:21.786018Z","level":"INFO","message":"Spawning processing task","step_name":"VersionTrackerStep: ()","filename":"/Users/reneetso/.cargo/git/checkouts/aptos-indexer-processor-sdk-2f3940a333c8389d/e6867c5/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs","line_number":204,"threadName":"tokio-runtime-worker","threadId":"ThreadId(14)"}
```

## 7. Backfilling with the SDK
With the SDK, we've made some improvements to the backfilling process.
There are two options for backfilling:
1. You can keep following the old way: run a second instance of the processor and update `starting_version` to the backfill version.
2. You can use the SDK's new backfill support, which tracks the progress of a backfill and lets you start and stop it as needed.
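For option 1, the second (backfill) instance is just another copy of the processor whose config pins `starting_version`. A sketch of the relevant fragment (the `request_ending_version` field name is an assumption based on the SDK's `TransactionStreamConfig`; verify it against your SDK version):

```yaml
# Option 1 sketch: a second instance of the same processor, with the
# stream pinned to the range you want to backfill.
server_config:
  transaction_stream_config:
    starting_version: 0 # TODO: set to the backfill start version
    # Optionally bound where the backfill stops (field name assumed):
    # request_ending_version: 1000000
```

The drawback of this approach is that the second instance shares the processor's single `processor_status` row, so the backfill's progress can clobber the main instance's tracked version — which is what option 2's `backfill_alias` avoids.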
If you'd like to use the new backfilling process, update your `config.yaml` like so:
```yaml
health_check_port: 8085
server_config:
  processor_config:
    # TODO: Update with processor type
    type: "events_processor"
  transaction_stream_config:
    indexer_grpc_data_service_address: "https://grpc.testnet.aptoslabs.com:443"
    # TODO: Update with backfill version
    starting_version: {backfill version}
    # TODO: Update auth token
    auth_token: "AUTH_TOKEN"
    # TODO: Update with processor name
    request_name_header: "events-processor"
  db_config:
    # TODO: Update with your database connection string
    postgres_connection_string: postgresql://postgres:@localhost:5432/example
  backfill_config:
    # TODO: Update with your backfill alias. This should be unique for each backfill.
    backfill_alias: "events_processor_backfill_1"
```