Skip to content

Commit eadfef4

Browse files
authored
migrate to fastnear (#358)
* migrate to fastnear * imrovement * clear deadcode * improvement according github comments * reffactoring and improvement * fmt * clippy * add requests_blocks_counters metrics * revert deeted changes for near state indexer
1 parent bb9b352 commit eadfef4

File tree

14 files changed

+272
-426
lines changed

14 files changed

+272
-426
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased](https://github.com/near/read-rpc/compare/main...develop)
99

10+
### What's Changed
11+
* Migrate from lake data to fastnear data
12+
* Add metrics to calculate the number of blocks which fetched from the cache and fastnear
13+
1014
## [0.3.3](https://github.com/near/read-rpc/releases/tag/v0.3.3)
1115

1216
### Supported Nearcore Version

Cargo.lock

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

configuration/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ license.workspace = true
99

1010
[dependencies]
1111
anyhow = "1.0.70"
12-
aws-credential-types = "1.1.4"
13-
aws-sdk-s3 = { version = "1.14.0", features = ["behavior-version-latest"] }
14-
aws-types = "1.1.4"
1512
dotenv = "0.15.0"
1613
google-cloud-storage = "0.23.0"
1714
lazy_static = "1.4.0"

configuration/src/configs/lake.rs

Lines changed: 26 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,55 @@
1-
use aws_sdk_s3::config::StalledStreamProtectionConfig;
1+
use crate::configs::deserialize_optional_data_or_env;
22
use near_lake_framework::near_indexer_primitives::near_primitives;
33
use serde_derive::Deserialize;
44

5-
use crate::configs::{deserialize_optional_data_or_env, required_value_or_panic};
6-
75
#[derive(Debug, Clone)]
86
pub struct LakeConfig {
9-
pub aws_access_key_id: String,
10-
pub aws_secret_access_key: String,
11-
pub aws_default_region: String,
12-
pub aws_bucket_name: String,
7+
pub num_threads: Option<u64>,
138
}
149

1510
impl LakeConfig {
16-
pub async fn s3_config(&self) -> aws_sdk_s3::Config {
17-
let credentials = aws_credential_types::Credentials::new(
18-
&self.aws_access_key_id,
19-
&self.aws_secret_access_key,
20-
None,
21-
None,
22-
"",
23-
);
24-
aws_sdk_s3::Config::builder()
25-
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
26-
.credentials_provider(credentials)
27-
.region(aws_types::region::Region::new(
28-
self.aws_default_region.clone(),
29-
))
30-
.build()
31-
}
32-
3311
pub async fn lake_config(
3412
&self,
3513
start_block_height: near_primitives::types::BlockHeight,
36-
) -> anyhow::Result<near_lake_framework::LakeConfig> {
37-
let config_builder = near_lake_framework::LakeConfigBuilder::default();
14+
chain_id: crate::ChainId,
15+
) -> anyhow::Result<near_lake_framework::FastNearConfig> {
16+
let mut config_builder = near_lake_framework::FastNearConfigBuilder::default();
17+
match chain_id {
18+
crate::ChainId::Mainnet => config_builder = config_builder.mainnet(),
19+
// Testnet is the default chain for other chain_id
20+
_ => config_builder = config_builder.testnet(),
21+
};
22+
if let Some(num_threads) = self.num_threads {
23+
config_builder = config_builder.num_threads(num_threads);
24+
};
3825
Ok(config_builder
39-
.s3_config(self.s3_config().await)
40-
.s3_region_name(&self.aws_default_region)
41-
.s3_bucket_name(&self.aws_bucket_name)
4226
.start_block_height(start_block_height)
43-
.build()
44-
.expect("Failed to build LakeConfig"))
27+
.build()?)
4528
}
4629

47-
pub async fn lake_s3_client(&self) -> near_lake_framework::LakeS3Client {
48-
let s3_config = self.s3_config().await;
49-
near_lake_framework::LakeS3Client::new(aws_sdk_s3::Client::from_conf(s3_config))
30+
pub async fn lake_client(
31+
&self,
32+
chain_id: crate::ChainId,
33+
) -> anyhow::Result<near_lake_framework::FastNearClient> {
34+
let fast_near_endpoint = match chain_id {
35+
crate::ChainId::Mainnet => String::from("https://mainnet.neardata.xyz"),
36+
// Testnet is the default chain for other chain_id
37+
_ => String::from("https://testnet.neardata.xyz"),
38+
};
39+
Ok(near_lake_framework::FastNearClient::new(fast_near_endpoint))
5040
}
5141
}
5242

5343
#[derive(Deserialize, Debug, Clone, Default)]
5444
pub struct CommonLakeConfig {
5545
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
56-
pub aws_access_key_id: Option<String>,
57-
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
58-
pub aws_secret_access_key: Option<String>,
59-
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
60-
pub aws_default_region: Option<String>,
61-
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
62-
pub aws_bucket_name: Option<String>,
46+
pub num_threads: Option<u64>,
6347
}
6448

6549
impl From<CommonLakeConfig> for LakeConfig {
6650
fn from(common_config: CommonLakeConfig) -> Self {
6751
Self {
68-
aws_access_key_id: required_value_or_panic(
69-
"aws_access_key_id",
70-
common_config.aws_access_key_id,
71-
),
72-
aws_secret_access_key: required_value_or_panic(
73-
"aws_secret_access_key",
74-
common_config.aws_secret_access_key,
75-
),
76-
aws_default_region: required_value_or_panic(
77-
"aws_default_region",
78-
common_config.aws_default_region,
79-
),
80-
aws_bucket_name: required_value_or_panic(
81-
"aws_bucket_name",
82-
common_config.aws_bucket_name,
83-
),
52+
num_threads: common_config.num_threads,
8453
}
8554
}
8655
}

configuration/src/default_env_configs.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,18 +120,9 @@ tracked_changes = "${TRACKED_CHANGES}"
120120
121121
### Lake framework configuration
122122
[lake_config]
123-
124-
## Lake framework AWS access key id
125-
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
126-
127-
## Lake framework AWS secret access key
128-
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
129-
130-
## Lake framework AWS default region
131-
aws_default_region = "${AWS_DEFAULT_REGION}"
132-
133-
## Lake framework bucket name
134-
aws_bucket_name = "${AWS_BUCKET_NAME}"
123+
# Number of threads to use for fetching data from fatnear
124+
# Default: 2 * available threads
125+
#num_threads = 8
135126
136127
## Transaction details are stored in the Google Cloud Storage
137128
[tx_details_storage]

rpc-server/src/config.rs

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use futures::executor::block_on;
44
use near_primitives::epoch_manager::{AllEpochConfig, EpochConfig};
55

66
use crate::modules::blocks::{BlocksInfoByFinality, CacheBlock};
7+
use crate::utils;
78

89
static NEARD_VERSION: &str = env!("CARGO_PKG_VERSION");
910
static NEARD_BUILD: &str = env!("BUILD_VERSION");
@@ -20,8 +21,7 @@ pub struct GenesisInfo {
2021
impl GenesisInfo {
2122
pub async fn get(
2223
near_rpc_client: &crate::utils::JsonRpcClient,
23-
s3_client: &near_lake_framework::LakeS3Client,
24-
s3_bucket_name: &str,
24+
fastnear_client: &near_lake_framework::FastNearClient,
2525
) -> Self {
2626
tracing::info!("Get genesis config...");
2727
let genesis_config = near_rpc_client
@@ -32,25 +32,20 @@ impl GenesisInfo {
3232
.await
3333
.expect("Error to get genesis config");
3434

35-
let genesis_block = near_lake_framework::s3::fetchers::fetch_block(
36-
s3_client,
37-
s3_bucket_name,
38-
genesis_config.genesis_height,
39-
)
40-
.await
41-
.expect("Error to get genesis block");
35+
let genesis_block =
36+
near_lake_framework::fastnear::fetchers::fetch_first_block(fastnear_client).await;
4237

4338
Self {
4439
genesis_config,
45-
genesis_block_cache: CacheBlock::from(&genesis_block),
40+
genesis_block_cache: CacheBlock::from(&genesis_block.block),
4641
}
4742
}
4843
}
4944

5045
#[derive(Clone)]
5146
pub struct ServerContext {
52-
/// Lake s3 client
53-
pub s3_client: near_lake_framework::LakeS3Client,
47+
/// Fastnear client
48+
pub fastnear_client: near_lake_framework::FastNearClient,
5449
/// Database manager
5550
pub db_manager: std::sync::Arc<Box<dyn database::ReaderDbManager + Sync + Send + 'static>>,
5651
/// TransactionDetails storage
@@ -61,8 +56,6 @@ pub struct ServerContext {
6156
pub genesis_info: GenesisInfo,
6257
/// Near rpc client
6358
pub near_rpc_client: crate::utils::JsonRpcClient,
64-
/// AWS s3 lake bucket name
65-
pub s3_bucket_name: String,
6659
/// Blocks cache
6760
pub blocks_cache: std::sync::Arc<crate::cache::RwLockLruMemoryCache<u64, CacheBlock>>,
6861
/// Final block info include final_block_cache and current_validators_info
@@ -89,27 +82,36 @@ pub struct ServerContext {
8982
}
9083

9184
impl ServerContext {
92-
pub async fn init(
93-
rpc_server_config: configuration::RpcServerConfig,
94-
near_rpc_client: crate::utils::JsonRpcClient,
95-
) -> anyhow::Result<Self> {
85+
pub async fn init(rpc_server_config: configuration::RpcServerConfig) -> anyhow::Result<Self> {
9686
let contract_code_cache_size_in_bytes =
97-
crate::utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size)
98-
.await;
87+
utils::gigabytes_to_bytes(rpc_server_config.general.contract_code_cache_size).await;
9988
let contract_code_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
10089
contract_code_cache_size_in_bytes,
10190
));
10291

10392
let block_cache_size_in_bytes =
104-
crate::utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await;
93+
utils::gigabytes_to_bytes(rpc_server_config.general.block_cache_size).await;
10594
let blocks_cache = std::sync::Arc::new(crate::cache::RwLockLruMemoryCache::new(
10695
block_cache_size_in_bytes,
10796
));
108-
109-
let blocks_info_by_finality =
110-
std::sync::Arc::new(BlocksInfoByFinality::new(&near_rpc_client, &blocks_cache).await);
111-
112-
let s3_client = rpc_server_config.lake_config.lake_s3_client().await;
97+
let near_rpc_client = utils::JsonRpcClient::new(
98+
rpc_server_config.general.near_rpc_url.clone(),
99+
rpc_server_config.general.near_archival_rpc_url.clone(),
100+
);
101+
// We want to set a custom referer to let NEAR JSON RPC nodes know that we are a read-rpc instance
102+
let near_rpc_client = near_rpc_client.header(
103+
"Referer".to_string(),
104+
rpc_server_config.general.referer_header_value.clone(),
105+
)?;
106+
107+
let fastnear_client = rpc_server_config
108+
.lake_config
109+
.lake_client(rpc_server_config.general.chain_id)
110+
.await?;
111+
112+
let blocks_info_by_finality = std::sync::Arc::new(
113+
BlocksInfoByFinality::new(&near_rpc_client, &fastnear_client).await,
114+
);
113115

114116
let tx_details_storage = tx_details_storage::TxDetailsStorage::new(
115117
rpc_server_config.tx_details_storage.storage_client().await,
@@ -124,12 +126,7 @@ impl ServerContext {
124126
})
125127
.ok();
126128

127-
let genesis_info = GenesisInfo::get(
128-
&near_rpc_client,
129-
&s3_client,
130-
&rpc_server_config.lake_config.aws_bucket_name,
131-
)
132-
.await;
129+
let genesis_info = GenesisInfo::get(&near_rpc_client, &fastnear_client).await;
133130

134131
let default_epoch_config = EpochConfig::from(&genesis_info.genesis_config);
135132
let all_epoch_config = AllEpochConfig::new(
@@ -159,13 +156,12 @@ impl ServerContext {
159156
.inc();
160157

161158
Ok(Self {
162-
s3_client,
159+
fastnear_client,
163160
db_manager: std::sync::Arc::new(Box::new(db_manager)),
164161
tx_details_storage: std::sync::Arc::new(tx_details_storage),
165162
tx_cache_storage,
166163
genesis_info,
167164
near_rpc_client,
168-
s3_bucket_name: rpc_server_config.lake_config.aws_bucket_name.clone(),
169165
blocks_cache,
170166
blocks_info_by_finality,
171167
compiled_contract_code_cache,

0 commit comments

Comments
 (0)