add management API with account parsing capabilities [MTG-1477]#473
Conversation
…ities
- Add new management API service
- Implement account fixer functionality to parse and process accounts
- Add integration tests for management API
- Refactor account processors into a wrapper for better reusability
- Add configuration option for management API port
Summary by CodeRabbit
Walkthrough
The changes introduce several new components and enhancements. In the integration tests, a workspace dependency (
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Test as Integration Test
    participant Client as API Client (reqwest)
    participant API as Management API Server
    participant Handler as AssetFixHandler
    participant Fixer as AssetFixer
    participant Processor as ProcessorsWrapper
    Test->>Client: Initiate asset fix test
    Client->>API: POST /management/fix-account-assets
    API->>Handler: Route request to AssetFixHandler.fix_account_assets()
    Handler->>Fixer: Call fix_accounts() asynchronously
    Fixer->>Processor: Process accounts in batches via process_account_by_type()
    Processor-->>Fixer: Return FixResult (success and failed pubkeys)
    Fixer-->>Handler: Return FixResult
    Handler-->>API: Formulate HTTP response
    API-->>Client: Return response (success/failure)
```
Actionable comments posted: 16
🔭 Outside diff range comments (2)
nft_ingester/src/message_parser.rs (2)
104-134: 🧹 Nitpick (assertive)
Consider improving error handling for unexpected account types
The extracted `parse_account_info` method improves modularity and enables the new asset-fixing functionality. However, it silently returns an empty vector for unrecognized account types (lines 122-124), which could make debugging issues more difficult. Consider adding debug or trace logging for unrecognized account types:

```diff
 } else {
+    debug!(pubkey = %account_info.pubkey, owner = %account_owner, "Skipping unrecognized account type");
     Vec::new()
 }
```
96-99: 🧹 Nitpick (assertive)
Review usage of unwrap to prevent potential panics
There's a use of `unwrap()` which could lead to panics if the underlying operations fail. Consider using proper error handling instead to make the code more robust.

```diff
-let account_info = plerkle_serialization::root_as_account_info(account_info_bytes.as_ref())
-    .map_err(|e| IngesterError::AccountParsingError(e.to_string()))
-    .unwrap();
+let account_info = plerkle_serialization::root_as_account_info(account_info_bytes.as_ref())
+    .map_err(|e| IngesterError::AccountParsingError(e.to_string()))?;
```
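To make the panic-free version concrete, here is a self-contained sketch of `?`-based error propagation. The `IngesterError` variant name matches the review, but `root_as_account_info` here is a simplified stand-in, not the real plerkle API:

```rust
use std::fmt;

// Hypothetical stand-in for the crate's error type.
#[derive(Debug)]
enum IngesterError {
    AccountParsingError(String),
}

impl fmt::Display for IngesterError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            IngesterError::AccountParsingError(msg) => write!(f, "account parsing error: {msg}"),
        }
    }
}

// A fallible parse, mimicking the shape of root_as_account_info.
fn root_as_account_info(bytes: &[u8]) -> Result<u64, String> {
    if bytes.len() >= 8 {
        Ok(u64::from_le_bytes(bytes[..8].try_into().unwrap()))
    } else {
        Err("buffer too short".to_string())
    }
}

// With `?`, a parse failure becomes an Err the caller can handle,
// instead of a panic inside the parser.
fn parse_account(bytes: &[u8]) -> Result<u64, IngesterError> {
    let info = root_as_account_info(bytes)
        .map_err(|e| IngesterError::AccountParsingError(e.to_string()))?;
    Ok(info)
}

fn main() {
    assert!(parse_account(&[1, 0, 0, 0, 0, 0, 0, 0]).is_ok());
    assert!(parse_account(&[1, 2]).is_err()); // no panic, just an Err
}
```

The caller decides whether a parse failure is fatal; the parser itself never aborts the process.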
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (16)
- integration_tests/Cargo.toml (1 hunks)
- integration_tests/src/lib.rs (1 hunks)
- integration_tests/src/management_api_tests.rs (1 hunks)
- nft_ingester/Cargo.toml (1 hunks)
- nft_ingester/src/asset_fixer.rs (1 hunks)
- nft_ingester/src/bin/ingester/main.rs (2 hunks)
- nft_ingester/src/config.rs (1 hunks)
- nft_ingester/src/lib.rs (2 hunks)
- nft_ingester/src/management_api/handlers/asset_fix.rs (1 hunks)
- nft_ingester/src/management_api/handlers/mod.rs (1 hunks)
- nft_ingester/src/management_api/mod.rs (1 hunks)
- nft_ingester/src/management_api/service.rs (1 hunks)
- nft_ingester/src/message_parser.rs (1 hunks)
- nft_ingester/src/processors/accounts_processor.rs (8 hunks)
- nft_ingester/src/processors/mod.rs (1 hunks)
- nft_ingester/src/processors/processors_wrapper.rs (1 hunks)
🧰 Additional context used
🧬 Code Definitions (5)
nft_ingester/src/management_api/mod.rs (1)
  nft_ingester/src/management_api/service.rs (1)
    `start_management_api` (20-60)
nft_ingester/src/bin/ingester/main.rs (1)
  nft_ingester/src/management_api/service.rs (1)
    `start_management_api` (20-60)
nft_ingester/src/processors/accounts_processor.rs (2)
  nft_ingester/src/processors/processors_wrapper.rs (1)
    `build` (32-56)
  nft_ingester/src/processors/account_based/mpl_core_fee_indexing_processor.rs (2)
    `build` (30-38)
    `store_mpl_assets_fee` (73-101)
nft_ingester/src/asset_fixer.rs (2)
  nft_ingester/src/management_api/handlers/asset_fix.rs (1)
    `new` (32-34)
  nft_ingester/src/message_parser.rs (1)
    `new` (52-59)
nft_ingester/src/management_api/handlers/asset_fix.rs (2)
  nft_ingester/src/asset_fixer.rs (1)
    `new` (31-40)
  nft_ingester/src/message_parser.rs (1)
    `new` (52-59)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: lint
- GitHub Check: test
🔇 Additional comments (26)
nft_ingester/src/processors/mod.rs (1)
3-3: Module declaration looks good.
This adds a new public module, `processors_wrapper`, which will be responsible for providing a unified interface for processing different types of accounts, aligning with the new management API functionality.
nft_ingester/Cargo.toml (1)
28-28: LGTM - Appropriate dependency for account decoding.
The addition of `solana-account-decoder` is necessary for the new asset-fixing functionality, which will need to decode Solana account data from RPC responses.
nft_ingester/src/config.rs (1)
188-189: Config parameter correctly defined.
The management API port is properly defined as an optional `u16` with appropriate Clap attributes for command-line and environment-variable configuration. Making it optional is a good choice, as it allows the management API to be disabled by default.
nft_ingester/src/bin/ingester/main.rs (2)
96-99: Good logging for observability.
Proper logging of the management API status and port improves observability and helps with debugging.
479-502: Management API initialization looks good.
The implementation correctly:
- Checks if a port is provided before starting the API
- Appropriately clones necessary dependencies
- Uses the executor to spawn an async task that won't block the main thread
- Passes all required parameters to the management API service
The approach is consistent with how other services are started in this application.
nft_ingester/src/management_api/service.rs (1)
44-47: Restrict management endpoints if needed.
Exposing potentially sensitive endpoints (like `/management/fix-account-assets`) on public interfaces can pose security risks. If these endpoints are internal or require authentication, ensure proper access controls are in place (e.g., restricting access to localhost or using authentication).
nft_ingester/src/management_api/handlers/mod.rs (1)
1-3: Module organization appears clean.
Exporting `AssetFixHandler` at this level helps keep imports tidy in consumer code. Good job.
integration_tests/src/lib.rs (1)
10-11: New test module integration looks good.
The new `management_api_tests` module, gated behind `#[cfg(test)]`, nicely follows the established testing pattern and ensures these tests won't be compiled outside of test mode.
integration_tests/Cargo.toml (1)
35-35: LGTM: Adding reqwest for integration testing of the new management API.
Adding the reqwest crate as a workspace dependency is appropriate for testing the new management API endpoints. This follows the same pattern as other dependencies in the project.
nft_ingester/src/lib.rs (2)
3-3: LGTM: New asset_fixer module aligns with PR objectives.
The introduction of this public module is consistent with the PR's goal of adding asset-fixing capabilities.
18-18: LGTM: New management_api module aligns with PR objectives.
The introduction of this public module is consistent with the PR's goal of adding a management API service.
nft_ingester/src/management_api/mod.rs (1)
1-4: LGTM: Well-structured API module organization.
The module structure follows good practices for web-service organization, with a clear separation between handlers and service implementation. Re-exporting the `start_management_api` function improves API ergonomics by allowing users to access it directly from the `management_api` module.
nft_ingester/src/message_parser.rs (1)
100-102: LGTM: Good refactoring to reuse account parsing logic.
Extracting the call to `parse_account_info` from `parse_account` is a good refactoring that promotes code reuse and makes the account-parsing logic available to other components like the new `AssetFixer`.
nft_ingester/src/processors/processors_wrapper.rs (1)
44-46: ✅ Verification successful
Verify whether `update_rent` requires async invocation.
You are calling `core_fees_processor.update_rent(cancellation_token.child_token());` without `await`. If the method is truly asynchronous, ignoring the result may cause race conditions or unreported errors. Consider adding `.await` or clarifying its operation.

🏁 Script executed:

```bash
#!/bin/bash
# Search for the update_rent definition to confirm whether it is async or sync
rg -A 10 "fn update_rent"
```

Length of output: 1403

Verified: `update_rent` is a synchronous call.
The search confirms that `update_rent` is defined as a non-async function (in `nft_ingester/src/processors/account_based/mpl_core_fee_indexing_processor.rs`) which internally spawns an async task. As a result, it does not require an `await` at the call site in `processors_wrapper.rs`.
nft_ingester/src/asset_fixer.rs (5)
1-16: Structure and Dependencies Check
This file organizes its dependencies and imports in a clear and coherent manner. The usage of `Arc`, `HashMap`, and RPC-related crates aligns well with the intended functionality of fetching accounts and storing them in batches. No issues found at this stage.
30-40: Initiation of `AssetFixer`
The `new` constructor sets up the relevant fields, including instantiating a `MessageParser`. This is a neat initialization approach. Ensure all components (e.g., `ProcessorsWrapper`) share the same configuration (logging levels, etc.) so that debugging is simpler.
48-61: Batch Storage Setup
Initializing a `BatchSaveStorage` with `batch_size` is a well-reasoned approach. Keep an eye on potential large memory usage if `batch_size` is set too high. Would you like to confirm that your chosen `batch_size` is well tested under load by running performance checks? If so, I can provide a shell script that stress-tests the ingester.
141-141: Batch Storage Flush
Flushing batch storage at the end is crucial for ensuring data durability. This is correctly handled with error propagation as a `FixError::Storage`. Good practice!
62-69: ✅ Verification successful
RPC Configuration
Using `Base64Zstd` encoding and a finalized commitment ensures data integrity. Verify externally that all expected account data is correctly decodable (some older accounts might not support certain encodings). Otherwise, this is a valid choice.

🏁 Script executed:

```bash
#!/bin/bash
# Search the codebase for references to Base64Zstd and verify known issues
rg -A10 "Base64Zstd"
```

Length of output: 1708

RPC Configuration – Consistency Verified
The use of `Base64Zstd` encoding with a finalized commitment in `nft_ingester/src/asset_fixer.rs` is consistent with other parts of the codebase (see the integration tests in `integration_tests/src/common.rs`). This configuration should ensure data integrity, but please verify externally that all expected account data, especially from older accounts that may lack support for this encoding, is correctly decodable.
nft_ingester/src/processors/accounts_processor.rs (7)
8-8: New Imports for Fee and Unprocessed Accounts
Adding `CoreAssetFee` and `UnprocessedAccountMessage` provides better clarity on the data structures used in processing. This is a logical step in consolidating account and fee processing logic.
23-23: Wrapper Import
`use super::processors_wrapper::ProcessorsWrapper;` indicates the shift to the unified processors approach. Centralizing processor logic in a single wrapper can enhance maintainability and reduce duplication.
107-107: Refactored Field in `AccountsProcessor`
Replaced multiple processor references with `processors_wrapper`. This reduces the number of fields, streamlining interactions. Good refactor for readability and maintainability.
130-137: Integration with `ProcessorsWrapper::build`
Bundling multiple processors into `ProcessorsWrapper` keeps the build logic consistent. Should some external configuration or additional parameters be needed in the future, you can extend `ProcessorsWrapper::build` in one place.
142-142: Replacement of Processor Fields
Assigning the newly built `processors_wrapper` to `AccountsProcessor` continues the refactor's goal of decoupling and centralizing logic, which looks clean.
207-210: Final Fees Flush
At process exit, fees are once again stored. This is critical for ensuring no pending data is lost. The same recommendation applies: monitor performance if the list is large.
229-233: Unified Account Processing Call
Calling `self.processors_wrapper.process_account_by_type` for account processing centralizes the processing logic. This fosters consistency across different usage scenarios.
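The type-based dispatch that `process_account_by_type` centralizes can be sketched as follows; the enum variants and processor names here are illustrative stand-ins, not the crate's actual types:

```rust
// Hypothetical account variants; the real UnprocessedAccount enum in the
// ingester has many more cases (token accounts, mints, mpl-core assets, ...).
#[derive(Debug, PartialEq)]
enum UnprocessedAccount {
    Token { pubkey: String },
    MplCore { pubkey: String },
    Unknown,
}

// One entry point matches on the account type and routes it to the
// appropriate processor, instead of each call site choosing a processor.
fn process_account_by_type(account: &UnprocessedAccount) -> Result<&'static str, String> {
    match account {
        UnprocessedAccount::Token { .. } => Ok("token processor"),
        UnprocessedAccount::MplCore { .. } => Ok("mpl-core processor"),
        UnprocessedAccount::Unknown => Err("no processor for account type".to_string()),
    }
}

fn main() {
    assert_eq!(
        process_account_by_type(&UnprocessedAccount::Token { pubkey: "abc".into() }),
        Ok("token processor")
    );
    assert!(process_account_by_type(&UnprocessedAccount::Unknown).is_err());
}
```

Keeping the match in one place means adding a new account type touches one function rather than every caller.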
```rust
let asset_fixer =
    Arc::new(AssetFixer::new(rocks_storage, rpc_client, accounts_processor, metrics, 10000));
```
🧹 Nitpick (assertive)
Clarify the hard-coded limit (10000).
The numeric value 10000 passed to the AssetFixer constructor appears to be a "magic number." Consider documenting its purpose or making it configurable to improve maintainability.
agree, pls make it a constant, can be either somewhere in this file or nft_ingester/src/consts.rs
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Check if consts.rs exists
if [ -f "nft_ingester/src/consts.rs" ]; then
  echo "consts.rs exists. Examining its content:"
  cat "nft_ingester/src/consts.rs"
else
  echo "consts.rs does not exist"
fi

# Examine the AssetFixer implementation to understand what the 10000 value is for
cat "nft_ingester/src/asset_fixer.rs" | grep -A 3 -B 3 "batch_size"
```

Length of output: 11310
@armyhaylenko I'll implement your suggestion by adding a constant for the batch size in consts.rs. The 10000 value is used as the batch size parameter when initializing the AssetFixer - it controls how many items are processed in a batch before saving to storage.
🏁 Script executed:

```bash
#!/bin/bash
# Create a patch for adding the constant to consts.rs and using it in service.rs
cat << 'EOF' > batch_size_constant.patch
diff --git a/nft_ingester/src/consts.rs b/nft_ingester/src/consts.rs
index 5a42d2c..bc90a76 100644
--- a/nft_ingester/src/consts.rs
+++ b/nft_ingester/src/consts.rs
@@ -5,6 +5,7 @@ use serde_json::Value;
 pub const RAYDIUM_API_HOST: &str = "https://api-v3.raydium.io";
 pub const VERSION_FILE_PATH: &str = "./VERSION.txt";
 pub const WELLKNOWN_FUNGIBLE_TOKENS: &str = r#"[…]"#;
+// Add a new constant for the asset fixer batch size
+pub const ASSET_FIXER_DEFAULT_BATCH_SIZE: usize = 10000;
EOF

# Show the diff for service.rs to use the constant
cat << 'EOF' > service_rs_update.patch
diff --git a/nft_ingester/src/management_api/service.rs b/nft_ingester/src/management_api/service.rs
index eac4fb6..4fa8fc8 100644
--- a/nft_ingester/src/management_api/service.rs
+++ b/nft_ingester/src/management_api/service.rs
@@ -13,6 +13,7 @@ use tracing::{error, info};
 use crate::{
     asset_fixer::AssetFixer, management_api::handlers::AssetFixHandler,
+    consts::ASSET_FIXER_DEFAULT_BATCH_SIZE,
     processors::processors_wrapper::ProcessorsWrapper,
 };
@@ -36,7 +37,7 @@ pub async fn start_management_api(
         .unwrap();

     let asset_fixer =
-        Arc::new(AssetFixer::new(rocks_storage, rpc_client, accounts_processor, metrics, 10000));
+        Arc::new(AssetFixer::new(rocks_storage, rpc_client, accounts_processor, metrics, ASSET_FIXER_DEFAULT_BATCH_SIZE));

     let asset_fix_handler = Arc::new(AssetFixHandler::new(asset_fixer));
EOF

echo "Proposed changes:"
echo -e "\n1. Add constant to nft_ingester/src/consts.rs:"
cat batch_size_constant.patch
echo -e "\n2. Use constant in nft_ingester/src/management_api/service.rs:"
cat service_rs_update.patch
```

(The unchanged context lines of the first patch, a multi-kilobyte `WELLKNOWN_FUNGIBLE_TOKENS` token list and the `wellknown_fungible_tokens_map` helper, are elided above.)

Length of output: 11767
@armyhaylenko I've prepared a solution to convert the magic number 10000 to a named constant. After examining the code, I found that this value is used as the batch size for processing accounts in the AssetFixer.
Here's the implementation:
- Add a new constant to nft_ingester/src/consts.rs:

```rust
pub const ASSET_FIXER_DEFAULT_BATCH_SIZE: usize = 10000;
```

- Use this constant in nft_ingester/src/management_api/service.rs:

```rust
use crate::{
    asset_fixer::AssetFixer, management_api::handlers::AssetFixHandler,
    consts::ASSET_FIXER_DEFAULT_BATCH_SIZE,
    processors::processors_wrapper::ProcessorsWrapper,
};

// Later in the code:
let asset_fixer = Arc::new(AssetFixer::new(
    rocks_storage,
    rpc_client,
    accounts_processor,
    metrics,
    ASSET_FIXER_DEFAULT_BATCH_SIZE,
));
```

This improves code clarity by naming the magic number appropriately based on its purpose as the batch size for asset processing.
```rust
async fn health_check() -> &'static str {
    "Management API is running"
}
```
🧹 Nitpick (assertive)
Return structured JSON for consistency.
While returning a string literal is fine, it's often more useful to return a small JSON object for health checks (e.g., { "status": "ok" }) to maintain a more consistent response format across diverse services.
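A dependency-free sketch of what a structured payload could look like; in the actual service this would more likely be an axum handler returning `Json(serde_json::json!({"status": "ok"}))`, and the field names below are assumptions:

```rust
// Hand-rolled JSON keeps the sketch std-only; a real handler should use serde.
// The "service" field is a hypothetical addition for multi-service deployments.
fn health_check_body(service: &str) -> String {
    format!(r#"{{"status":"ok","service":"{service}"}}"#)
}

fn main() {
    let body = health_check_body("management-api");
    assert!(body.contains(r#""status":"ok""#));
    println!("{body}");
}
```

Structured responses let callers parse the status programmatically instead of string-matching a prose message.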
```rust
let accounts_processor = ProcessorsWrapper::build(
    cancellation_token.child_token(),
    metrics.clone(),
    postgre_client,
    rpc_client.clone(),
    wellknown_fungible_accounts,
)
.await
.unwrap();
```
🧹 Nitpick (assertive)
Consider graceful error handling rather than unwrapping.
Using .unwrap() here may cause the entire application to terminate if ProcessorsWrapper::build() fails. It also makes it harder to diagnose precise errors. Prefer handling this error gracefully or propagating it with ?, accompanied by proper logging and/or fallback behavior.
```diff
-.await
-.unwrap();
+.await
+?;
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let accounts_processor = ProcessorsWrapper::build(
    cancellation_token.child_token(),
    metrics.clone(),
    postgre_client,
    rpc_client.clone(),
    wellknown_fungible_accounts,
)
.await?;
```
```rust
let cancellation_token = CancellationToken::new();
let metrics = setup.metrics.ingester_metrics.clone();
let rocks_storage = setup.rocks_db.clone();
let postgre_client = setup.db.clone();
let cloned_rpc_client = setup.client.clone();
let well_known_fungible_accounts = HashMap::new();
let port = 3333;
usecase::executor::spawn({
    let cancellation_token = cancellation_token.child_token();
    async move {
        nft_ingester::management_api::start_management_api(
            cancellation_token,
            metrics,
            rocks_storage,
            postgre_client,
            cloned_rpc_client,
            well_known_fungible_accounts,
            port,
        )
        .await
    }
});
```
🧹 Nitpick (assertive)
Consider using a dynamic port or checking port availability before binding.
Using a fixed port (3333) in a test environment can create port conflicts when running multiple tests in parallel or when another service uses the same port. Switching to an ephemeral port or verifying port availability can reduce test flakiness.
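One way to avoid the fixed port, using only the standard library: bind to port 0 so the OS assigns a free port (the helper name is illustrative):

```rust
use std::net::TcpListener;

// Binding to port 0 asks the OS for any currently free port, avoiding
// collisions between tests that would otherwise race for a fixed 3333.
fn ephemeral_port() -> u16 {
    let listener = TcpListener::bind("127.0.0.1:0").expect("failed to bind an ephemeral port");
    // The listener is dropped at the end of this function; a real test would
    // either hand the listener itself to the server under test, or accept the
    // small time-of-check/time-of-use window of reusing just the port number.
    listener.local_addr().expect("no local addr").port()
}

fn main() {
    let port = ephemeral_port();
    assert_ne!(port, 0);
    println!("allocated test port {port}");
}
```

Each test run then gets its own port, so parallel test execution no longer flakes on `AddrInUse`.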
```rust
    .await;
assert!(response.is_ok());

let asset_response = setup
    .das_api
    .get_asset(GetAsset { id: account.to_string(), options: Default::default() })
    .await;
assert!(asset_response.is_ok());
let asset = asset_response.unwrap();
let asset =
    serde_json::from_value::<nft_ingester::api::dapi::rpc_asset_models::Asset>(asset).unwrap();
// assert the owner is present in the response
assert_ne!(asset.ownership.owner, "".to_string());
```
🧹 Nitpick (assertive)
Add assertions to verify the fix operation outcome.
Currently, the test only asserts that the request is successful and that the asset ownership is populated. For better coverage, verify whether the API returns the right fix status and whether no failures were reported in the response.
```rust
// Process accounts in chunks to avoid RPC limits
for chunk in pubkeys.chunks(1000) {
    match self.rpc_client.get_multiple_accounts_with_config(chunk, config.clone()).await {
        Ok(Response { value: accounts, context: ctx }) => {
            for (i, account_opt) in accounts.iter().enumerate() {
                if let Some(account) = account_opt {
                    let pubkey = chunk[i];

                    let account_info = plerkle::AccountInfo {
                        slot: ctx.slot,
                        pubkey,
                        lamports: account.lamports,
                        owner: account.owner,
                        executable: account.executable,
                        rent_epoch: account.rent_epoch,
                        data: account.data.clone(),
                        write_version: 0,
                    };

                    // Transform RPC account into UnprocessedAccountMessage
                    let unprocessed_account_result =
                        self.message_parser.parse_account_info(&account_info);

                    match unprocessed_account_result {
                        Ok(unprocessed_accounts) => {
                            let unprocessed_accounts = unprocessed_accounts
                                .into_iter()
                                .map(|unprocessed_account| UnprocessedAccountMessage {
                                    account: unprocessed_account.unprocessed_account,
                                    key: unprocessed_account.pubkey,
                                    id: String::new(),
                                });
                            for unprocessed_account in unprocessed_accounts {
                                // Process the account through processors wrapper
                                if let Err(e) =
                                    self.accounts_processor.process_account_by_type(
                                        &mut batch_storage,
                                        &unprocessed_account,
                                        &mut core_fees,
                                    )
                                {
                                    error!("Error processing account {}: {}", pubkey, e);
                                    failed_pubkeys.push(pubkey);
                                    continue;
                                }

                                fixed_count += 1;
                                debug!("Successfully processed account {}", pubkey);
                            }
                        },
                        Err(e) => {
                            error!("Error transforming account {}: {}", pubkey, e);
                            failed_pubkeys.push(pubkey);
                        },
                    }
                } else {
                    // Account not found
                    let pubkey = chunk[i];
                    error!("Account {} not found", pubkey);
                    failed_pubkeys.push(pubkey);
                }
            }
        },
        Err(e) => {
            error!("RPC error fetching accounts: {}", e);
            // Add all pubkeys in this chunk to failed list
            failed_pubkeys.extend_from_slice(chunk);
        },
    }
}
```
🧹 Nitpick (assertive)
Chunked RPC Fetch and Account Processing
Overall logic flow:
- Chunking protects against RPC limits.
- Each account is parsed via `MessageParser` and processed by `accounts_processor`.
- Errors are logged, and failed keys are collected.
Possible Enhancements:
- Retries: If an RPC error or intermittent network issue occurs, consider a small retry mechanism before declaring an entire chunk as failed.
- Parallelization: If desired, you could process each chunk concurrently for performance gains, ensuring thread-safe usage of shared state.
```rust
/// Result of the fix operation
pub struct FixResult {
    pub fixed_count: usize,
    pub failed_pubkeys: Vec<Pubkey>,
}
```
🧹 Nitpick (assertive)
Result Structure
`FixResult` is straightforward and conveys the final status effectively. Adding a timestamp or more diagnostic information could be helpful for logging or later analysis, but this is optional.
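One possible shape for that optional diagnostic extension — the `completed_at` field is a hypothetical addition, not part of the PR's API, and `String` stands in for the crate's `Pubkey` so the sketch compiles on its own:

```rust
use std::time::SystemTime;

/// `FixResult` extended with a completion timestamp for logging/analysis.
pub struct FixResult {
    pub fixed_count: usize,
    pub failed_pubkeys: Vec<String>, // the real struct uses `Vec<Pubkey>`
    pub completed_at: SystemTime,    // hypothetical: when the fix run finished
}

fn main() {
    let result = FixResult {
        fixed_count: 42,
        failed_pubkeys: vec![],
        completed_at: SystemTime::now(),
    };
    assert_eq!(result.fixed_count, 42);
    assert!(result.completed_at <= SystemTime::now());
}
```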
```rust
/// Errors that can occur during the fix operation
#[derive(thiserror::Error, Debug)]
pub enum FixError {
    #[error("RPC error: {0}")]
    Rpc(#[from] ClientError),

    #[error("Storage error: {0}")]
    Storage(#[from] StorageError),

    #[error("Other error: {0}")]
    Other(String),
}
```
🧹 Nitpick (assertive)
Error Handling
`FixError` accommodates RPC, storage, or generic errors. This is a clean and extensible design. If future expansion is anticipated, consider more granular error variants for correctness or to guide specific remediation steps.
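For instance, finer-grained variants could let callers decide whether a failure is worth retrying — the variant names below are purely illustrative, and `String` stands in for the crate's `Pubkey`/`ClientError`/`StorageError` types so the sketch stands alone:

```rust
/// Hypothetical finer-grained error variants for the fix operation.
#[derive(Debug)]
pub enum FixError {
    /// The account does not exist on chain — retrying will not help.
    AccountNotFound(String),
    /// Transient RPC failure — a retry may succeed.
    Rpc(String),
    /// Persistent storage failure — likely needs operator attention.
    Storage(String),
    Other(String),
}

fn is_retryable(e: &FixError) -> bool {
    matches!(e, FixError::Rpc(_))
}

fn main() {
    assert!(is_retryable(&FixError::Rpc("timeout".into())));
    assert!(!is_retryable(&FixError::AccountNotFound("abc".into())));
}
```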
```diff
 _ = interval.tick() => {
     self.flush(&mut batch_storage, &mut ack_ids, &mut interval, &mut batch_fill_instant, &source);
-    self.core_fees_processor.store_mpl_assets_fee(&std::mem::take(&mut core_fees)).await;
+    self.processors_wrapper.core_fees_processor.store_mpl_assets_fee(&std::mem::take(&mut core_fees)).await;
```
🧹 Nitpick (assertive)
Storing Fees During Regular Interval Flush
Switching from a direct processor call to `self.processors_wrapper.core_fees_processor` is well-structured. Ensure this call remains lightweight if it happens frequently (e.g., under large loads).
```rust
self.processors_wrapper
    .core_fees_processor
    .store_mpl_assets_fee(&std::mem::take(core_fees))
    .await;
```
🧹 Nitpick (assertive)
Batch Threshold for Fees
Flushing fees once `core_fees.len()` exceeds `fees_batch_size` ensures you don’t hold large amounts in memory. If large spikes of accounts are possible, consider adopting a more dynamic threshold or time-based flush to prevent big data bursts.
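The size-or-age flush condition suggested above can be sketched as a small predicate — thresholds here are illustrative, not the crate's actual configuration values:

```rust
use std::time::{Duration, Instant};

/// Flush when either the batch is large enough or it has been sitting
/// around too long, so neither huge bursts nor stale fees accumulate.
fn should_flush(
    batch_len: usize,
    batch_started: Instant,
    max_len: usize,
    max_age: Duration,
) -> bool {
    batch_len >= max_len || batch_started.elapsed() >= max_age
}

fn main() {
    let started = Instant::now();
    // A small batch that was just started: no flush yet.
    assert!(!should_flush(10, started, 100, Duration::from_secs(5)));
    // A full batch flushes immediately, regardless of age.
    assert!(should_flush(100, started, 100, Duration::from_secs(5)));
}
```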
```rust
usecase::executor::spawn({
    let cancellation_token = cancellation_token.child_token();
    async move {
        nft_ingester::management_api::start_management_api(
            cancellation_token,
            metrics,
            rocks_storage,
            postgre_client,
            cloned_rpc_client,
            well_known_fungible_accounts,
            port,
        )
        .await
    }
});
```
i would advise against using our static executor (joinset) here. you're not performing graceful shutdown in tests anyways, so `tokio::task::spawn` is a better alternative here.
```rust
if let Some(port) = args.management_api_port {
    info!("Starting Management API...");
    let cloned_rocks_storage = primary_rocks_storage.clone();
    let cloned_rpc_client = rpc_client.clone();
    let metrics = metrics_state.ingester_metrics.clone();
    let postgre_client = index_pg_storage.clone();
    let well_known_fungible_accounts = well_known_fungible_accounts.clone();
    usecase::executor::spawn({
        let cancellation_token = cancellation_token.child_token();
        async move {
            nft_ingester::management_api::start_management_api(
                cancellation_token,
                metrics,
                cloned_rocks_storage,
                postgre_client,
                cloned_rpc_client,
                well_known_fungible_accounts,
                port,
            )
            .await
        }
    });
}
```
nit: no need for renames if these clones are moved into the block inside of spawn (L488)
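The clone-inside-the-block pattern the reviewer suggests can be illustrated with `std::thread` (the real code uses the async executor; names here are placeholders):

```rust
use std::sync::Arc;
use std::thread;

fn main() {
    let storage = Arc::new(String::from("rocks"));
    let client = Arc::new(String::from("rpc"));

    // Clones are shadowed inside the block passed to spawn, so no
    // `cloned_*` renames are needed in the enclosing scope, and the
    // originals remain usable afterwards.
    let handle = thread::spawn({
        let storage = Arc::clone(&storage);
        let client = Arc::clone(&client);
        move || format!("{}+{}", storage, client)
    });

    assert_eq!(handle.join().unwrap(), "rocks+rpc");
    // Originals are still accessible here.
    assert_eq!(storage.as_str(), "rocks");
}
```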
```rust
let asset_fixer =
    Arc::new(AssetFixer::new(rocks_storage, rpc_client, accounts_processor, metrics, 10000));
```
agree, pls make it a constant, can be either somewhere in this file or `nft_ingester/src/consts.rs`
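Something along these lines — the constant name is a suggestion, not an identifier that exists in the codebase:

```rust
/// Hypothetical name for the magic number; could live in
/// `nft_ingester/src/consts.rs` as the reviewer suggests.
const ASSET_FIXER_BATCH_SIZE: usize = 10_000;

fn main() {
    // The call site would then read:
    // AssetFixer::new(rocks_storage, rpc_client, accounts_processor, metrics, ASSET_FIXER_BATCH_SIZE)
    assert_eq!(ASSET_FIXER_BATCH_SIZE, 10_000);
}
```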
feat(management-api): add management API with account parsing capabilities
This change introduces a new management API that allows fixing account-based
assets by retrieving them from RPC and processing them through the existing
AccountsProcessor infrastructure. The API includes a health check endpoint
and a fix-account-assets endpoint that accepts a list of account IDs to fix.
The implementation includes:
- a new management API service with a configurable port
- account fixer functionality to parse and process accounts
- integration tests for the management API
- a refactor of the account processors into a wrapper for better
  code organization and reusability
Closes MTG-1477