-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathdata_sources.rs
More file actions
187 lines (171 loc) · 6.16 KB
/
data_sources.rs
File metadata and controls
187 lines (171 loc) · 6.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
use authority_selection_inherents::AuthoritySelectionDataSource;
use pallet_sidechain_rpc::SidechainRpcDataSource;
use partner_chains_data_source_metrics::McFollowerMetrics;
use partner_chains_demo_runtime::AccountId;
use sc_service::error::Error as ServiceError;
use sidechain_mc_hash::McHashDataSource;
use sp_block_participation::inherent_data::BlockParticipationDataSource;
use sp_governed_map::GovernedMapDataSource;
use sp_partner_chains_bridge::TokenBridgeDataSource;
use std::{error::Error, sync::Arc};
pub const DATA_SOURCE_VAR: &str = "CARDANO_DATA_SOURCE";
#[derive(Clone, Debug, PartialEq)]
pub enum DataSourceType {
DbSync,
Mock,
Dolos,
}
impl DataSourceType {
pub fn from_env() -> Result<Self, Box<dyn Error + Send + Sync + 'static>> {
let env_value =
std::env::var(DATA_SOURCE_VAR).map_err(|_| format!("{DATA_SOURCE_VAR} is not set"))?;
env_value.parse().map_err(|err: String| err.into())
}
}
impl std::str::FromStr for DataSourceType {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"db-sync" => Ok(DataSourceType::DbSync),
"mock" => Ok(DataSourceType::Mock),
"dolos" => Ok(DataSourceType::Dolos),
_ => {
Err(format!("Invalid data source type: {}. Valid options: db-sync, mock, dolos", s))
},
}
}
}
impl std::fmt::Display for DataSourceType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DataSourceType::DbSync => write!(f, "db-sync"),
DataSourceType::Mock => write!(f, "mock"),
DataSourceType::Dolos => write!(f, "dolos"),
}
}
}
#[derive(Clone)]
pub struct DataSources {
pub mc_hash: Arc<dyn McHashDataSource + Send + Sync>,
pub authority_selection: Arc<dyn AuthoritySelectionDataSource + Send + Sync>,
pub sidechain_rpc: Arc<dyn SidechainRpcDataSource + Send + Sync>,
pub block_participation: Arc<dyn BlockParticipationDataSource + Send + Sync>,
pub governed_map: Arc<dyn GovernedMapDataSource + Send + Sync>,
pub bridge: Arc<dyn TokenBridgeDataSource<AccountId> + Send + Sync>,
}
pub(crate) async fn create_cached_data_sources(
metrics_opt: Option<McFollowerMetrics>,
) -> std::result::Result<DataSources, ServiceError> {
let data_source_type = DataSourceType::from_env()
.map_err(|err| ServiceError::Application(err.to_string().into()))?;
match data_source_type {
DataSourceType::DbSync => {
create_cached_db_sync_data_sources(metrics_opt).await.map_err(|err| {
ServiceError::Application(
format!("Failed to create db-sync data sources: {err}").into(),
)
})
},
DataSourceType::Mock => create_mock_data_sources().map_err(|err| {
ServiceError::Application(format!("Failed to create mock data sources: {err}").into())
}),
DataSourceType::Dolos => create_dolos_data_sources(metrics_opt).await.map_err(|err| {
ServiceError::Application(format!("Failed to create dolos data sources: {err}").into())
}),
}
}
pub fn create_mock_data_sources()
-> std::result::Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
use partner_chains_mock_data_sources::*;
let block = Arc::new(BlockDataSourceMock::new_from_env()?);
Ok(DataSources {
sidechain_rpc: Arc::new(SidechainRpcDataSourceMock::new(block.clone())),
mc_hash: Arc::new(McHashDataSourceMock::new(block)),
authority_selection: Arc::new(AuthoritySelectionDataSourceMock::new_from_env()?),
block_participation: Arc::new(StakeDistributionDataSourceMock::new()),
governed_map: Arc::new(GovernedMapDataSourceMock::default()),
bridge: Arc::new(TokenBridgeDataSourceMock::new()),
})
}
// TODO Currently uses db-sync for unimplemented Dolos data sources
pub async fn create_dolos_data_sources(
_metrics_opt: Option<McFollowerMetrics>,
) -> std::result::Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?;
let block_dolos = Arc::new(
partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client.clone())
.await?,
);
Ok(DataSources {
sidechain_rpc: Arc::new(
partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new(
dolos_client.clone(),
),
),
mc_hash: Arc::new(partner_chains_dolos_data_sources::McHashDataSourceImpl::new(
block_dolos.clone(),
)),
authority_selection: Arc::new(
partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl::new(
dolos_client.clone(),
),
),
block_participation: Arc::new(
partner_chains_dolos_data_sources::StakeDistributionDataSourceImpl::new(
dolos_client.clone(),
),
),
governed_map: Arc::new(
partner_chains_dolos_data_sources::GovernedMapDataSourceImpl::new(
dolos_client.clone(),
),
),
bridge: Arc::new(partner_chains_dolos_data_sources::TokenBridgeDataSourceImpl::new(
dolos_client,
)),
})
}
pub const CANDIDATES_FOR_EPOCH_CACHE_SIZE: usize = 64;
pub const STAKE_CACHE_SIZE: usize = 100;
pub const GOVERNED_MAP_CACHE_SIZE: u16 = 100;
pub const BRIDGE_TRANSFER_CACHE_LOOKAHEAD: u32 = 1000;
pub async fn create_cached_db_sync_data_sources(
metrics_opt: Option<McFollowerMetrics>,
) -> Result<DataSources, Box<dyn Error + Send + Sync + 'static>> {
use partner_chains_db_sync_data_sources::*;
let pool = partner_chains_db_sync_data_sources::get_connection_from_env().await?;
// block data source is reused between mc_hash and sidechain_rpc to share cache
let block = Arc::new(BlockDataSourceImpl::new_from_env(pool.clone()).await?);
Ok(DataSources {
sidechain_rpc: Arc::new(SidechainRpcDataSourceImpl::new(
block.clone(),
metrics_opt.clone(),
)),
mc_hash: Arc::new(McHashDataSourceImpl::new(block.clone(), metrics_opt.clone())),
authority_selection: Arc::new(
CandidatesDataSourceImpl::new(pool.clone(), metrics_opt.clone())
.await?
.cached(CANDIDATES_FOR_EPOCH_CACHE_SIZE)?,
),
block_participation: Arc::new(StakeDistributionDataSourceImpl::new(
pool.clone(),
metrics_opt.clone(),
STAKE_CACHE_SIZE,
)),
governed_map: Arc::new(
GovernedMapDataSourceCachedImpl::new(
pool.clone(),
metrics_opt.clone(),
GOVERNED_MAP_CACHE_SIZE,
block.clone(),
)
.await?,
),
bridge: Arc::new(CachedTokenBridgeDataSourceImpl::new(
pool,
metrics_opt,
block,
BRIDGE_TRANSFER_CACHE_LOOKAHEAD,
)),
})
}