-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathstart.rs
More file actions
323 lines (276 loc) · 10.7 KB
/
start.rs
File metadata and controls
323 lines (276 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
//! Bothan CLI start subcommand module.
//!
//! Start the Bothan API server with various configuration options.
use std::collections::HashMap;
use std::fs::{File, create_dir_all, read_to_string, write};
use std::io::BufReader;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, bail};
use bothan_api::api::BothanServer;
use bothan_api::config::AppConfig;
use bothan_api::config::ipfs::IpfsAuthentication;
use bothan_api::config::manager::crypto_info::sources::CryptoSourceConfigs;
use bothan_api::config::manager::forex_info::sources::ForexSourceConfigs;
use bothan_api::proto::bothan::v1::{BothanServiceServer, FILE_DESCRIPTOR_SET};
use bothan_api::{REGISTRY_REQUIREMENT, VERSION};
use bothan_core::ipfs::{IpfsClient, IpfsClientBuilder};
use bothan_core::manager::AssetInfoManager;
use bothan_core::manager::asset_info::AssetWorkerOpts;
use bothan_core::monitoring::{Client as MonitoringClient, Signer};
use bothan_core::store::rocksdb::RocksDbStore;
use bothan_core::telemetry;
use bothan_lib::metrics::server::Metrics;
use bothan_lib::registry::{Registry, Valid};
use bothan_lib::store::Store;
use bothan_lib::worker::error::AssetWorkerError;
use clap::Parser;
use reqwest::header::{HeaderName, HeaderValue};
use semver::{Version, VersionReq};
use tonic::transport::Server;
use tonic_reflection::server::Builder as ReflectionBuilder;
use tracing::{debug, error, info};
// Command-line options accepted by the `start` subcommand, parsed through clap's derive API.
// NOTE(review): clap's derive presumably reuses the `///` doc comments below as `--help` text,
// so rewording them would change user-facing output — confirm before editing them.
#[derive(Parser)]
/// CLI arguments for the `start` command.
pub struct StartCli {
/// The configuration file to use with bothan
#[arg(long)]
// Not read anywhere in the visible code: `run` receives an already-built `AppConfig`.
config: Option<PathBuf>,
/// A flag to choose whether to start bothan as a fresh instance or not
#[arg(short, long)]
// Forwarded by `run` to `init_rocks_db_store` as its `reset` argument.
unsafe_reset: bool,
/// Flag to turn on dev mode
#[arg(short, long)]
// Not read in the visible code; it is referenced by the `requires = "dev"` constraint on
// `registry` below.
dev: bool,
/// An optional initial registry state to use on startup to be used for testing purposes only.
#[arg(long, requires = "dev")]
// When set, `run` opens this path and parses it as JSON into the registry handed to the store.
registry: Option<PathBuf>,
}
impl StartCli {
    /// Brings up every component of the node and then serves the gRPC API.
    ///
    /// The steps run in this order: telemetry server, optional registry file, RocksDB store,
    /// IPFS client, monitoring client, Bothan server, reflection service. The returned future
    /// completes only when the gRPC server stops.
    ///
    /// # Errors
    ///
    /// Returns an error if the registry file cannot be opened or parsed, if any component fails
    /// to initialize, or if the gRPC server itself returns an error.
    pub async fn run(&self, app_config: AppConfig) -> anyhow::Result<()> {
        init_telemetry_server(&app_config).await?;

        // Only read a registry from disk when a path was passed on the command line.
        let initial_registry = self
            .registry
            .as_ref()
            .map(|path| -> anyhow::Result<_> {
                let file = File::open(path).with_context(|| "Failed to open registry file")?;
                serde_json::from_reader(BufReader::new(file))
                    .with_context(|| "Failed to parse registry")
            })
            .transpose()?;

        let store = init_rocks_db_store(&app_config, initial_registry, self.unsafe_reset).await?;
        let ipfs = init_ipfs_client(&app_config).await?;
        let monitoring = init_monitoring_client(&app_config).await?;
        let server = init_bothan_server(&app_config, store, ipfs, monitoring).await?;

        let reflection = ReflectionBuilder::configure()
            .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
            .build_v1alpha()
            .with_context(|| "Failed to build reflection service")?;

        // Logged before `serve` is awaited, i.e. before the listener is actually bound.
        info!("server started at {:?}", app_config.grpc.addr);

        let grpc = Server::builder()
            .add_service(reflection)
            .add_service(BothanServiceServer::from_arc(server));
        grpc.serve(app_config.grpc.addr).await?;

        Ok(())
    }
}
/// Opens or creates the RocksDB-backed store at `config.store.path`.
///
/// * Path is not a directory: the directory is created and a new store is made there.
/// * Path is a directory and `reset` is set: a new store is made at the existing path.
/// * Path is a directory and `reset` is not set: the existing store is loaded.
///
/// When `registry` is provided it is written into the store with an empty string as the second
/// argument of `set_registry`.
///
/// # Errors
///
/// Returns an error if the directory cannot be created, the store cannot be created or loaded,
/// or the registry cannot be written.
async fn init_rocks_db_store(
    config: &AppConfig,
    registry: Option<Registry<Valid>>,
    reset: bool,
) -> anyhow::Result<RocksDbStore> {
    let store_dir = &config.store.path;

    let store = if !store_dir.is_dir() {
        // Nothing usable on disk yet: create the directory, then a brand-new store inside it.
        create_dir_all(store_dir).with_context(|| "Failed to create home directory")?;
        let db = RocksDbStore::new(store_dir)?;
        debug!("store created successfully at {:?}", store_dir);
        db
    } else if reset {
        // NOTE(review): the directory is not removed here; presumably `RocksDbStore::new`
        // discards any data already present at the path — confirm.
        let db = RocksDbStore::new(store_dir)?;
        debug!("store reset successfully at {:?}", store_dir);
        db
    } else {
        // Keep whatever is already on disk.
        let db = RocksDbStore::load(store_dir)?;
        debug!("store loaded successfully at {:?}", store_dir);
        db
    };

    // An explicitly supplied registry replaces whatever the store currently holds.
    if let Some(initial_registry) = registry {
        store.set_registry(initial_registry, String::new()).await?;
    }

    Ok(store)
}
async fn init_signer(config: &AppConfig) -> anyhow::Result<Signer> {
if config.monitoring.path.is_file() {
let pk = read_to_string(&config.monitoring.path)
.with_context(|| "Failed to read monitoring key file")?;
Signer::from_hex(&pk).with_context(|| "Failed to parse monitoring key file")
} else {
let signer = Signer::random();
if let Some(parent) = config.monitoring.path.parent() {
create_dir_all(parent).with_context(|| "Failed to create monitoring key directory")?;
}
write(&config.monitoring.path, signer.to_hex().as_bytes())
.with_context(|| "Failed to write monitoring key file")?;
Ok(signer)
}
}
/// Builds the shared monitoring client when monitoring is enabled in the configuration.
///
/// Returns `Ok(None)` when `config.monitoring.enabled` is false. Otherwise a signer is obtained
/// from `init_signer` and a client for `config.monitoring.endpoint` is returned inside an `Arc`.
///
/// # Errors
///
/// Returns an error if the signer cannot be initialized.
async fn init_monitoring_client(
    config: &AppConfig,
) -> anyhow::Result<Option<Arc<MonitoringClient>>> {
    if config.monitoring.enabled {
        let signer = init_signer(config)
            .await
            .with_context(|| "Failed to build signer")?;
        let client = MonitoringClient::new(&config.monitoring.endpoint, signer);
        Ok(Some(Arc::new(client)))
    } else {
        Ok(None)
    }
}
/// Builds the IPFS client for `config.ipfs.endpoint`.
///
/// With `IpfsAuthentication::Header`, the configured key/value pair is parsed into an HTTP
/// header name and value and attached to the builder; with `IpfsAuthentication::None` the
/// builder is used as is.
///
/// # Errors
///
/// Returns an error if the header name or value is invalid, or if the client cannot be built.
async fn init_ipfs_client(config: &AppConfig) -> anyhow::Result<IpfsClient> {
    let builder = IpfsClientBuilder::new(&config.ipfs.endpoint);

    match &config.ipfs.authentication {
        IpfsAuthentication::None => Ok(builder.build()?),
        IpfsAuthentication::Header { key, value } => {
            // Parse the name before the value so an invalid name is reported first.
            let name = HeaderName::from_str(key)?;
            let content = HeaderValue::from_str(value)?;
            Ok(builder.with_header(name, content).build()?)
        }
    }
}
async fn init_bothan_server<S: Store + 'static>(
config: &AppConfig,
store: S,
ipfs_client: IpfsClient,
monitoring_client: Option<Arc<MonitoringClient>>,
) -> anyhow::Result<Arc<BothanServer<S>>> {
let prefix_stale_thresholds = init_prefix_stale_thresholds(
config.manager.crypto.stale_threshold,
config.manager.forex.stale_threshold,
);
let bothan_version =
Version::from_str(VERSION).with_context(|| "Failed to parse bothan version")?;
let registry_version_requirement = VersionReq::from_str(REGISTRY_REQUIREMENT)
.with_context(|| "Failed to parse registry version requirement")?;
let crypto_opts = match init_crypto_opts(&config.manager.crypto.source).await {
Ok(workers) => workers,
Err(e) => {
bail!("failed to initialize workers: {:?}", e);
}
};
let forex_opts = match init_forex_opts(&config.manager.forex.source).await {
Ok(workers) => workers,
Err(e) => {
bail!("failed to initialize workers: {:?}", e);
}
};
let worker_opts = crypto_opts.into_iter().chain(forex_opts).collect();
let manager = match AssetInfoManager::build(
store,
worker_opts,
ipfs_client,
prefix_stale_thresholds,
bothan_version,
registry_version_requirement,
monitoring_client,
)
.await
{
Ok(manager) => manager,
Err(e) => {
bail!("failed to build manager: {:?}", e);
}
};
let manager = Arc::new(manager);
let cloned_manager = manager.clone();
// Only spawn heartbeat if monitoring is enabled
if config.monitoring.enabled {
tokio::spawn(async move {
loop {
// Heartbeat is fixed at 1 minute.
tokio::time::sleep(Duration::from_secs(60)).await;
match cloned_manager.post_heartbeat().await {
Ok(_) => info!("heartbeat sent"),
Err(e) => error!("failed to send heartbeat: {e}"),
}
}
});
}
let metrics = Metrics::new();
Ok(Arc::new(BothanServer::new(manager, metrics)))
}
/// Builds the prefix-to-stale-threshold lookup table.
///
/// The crypto threshold is stored under the `"CS"` prefix and the forex threshold under `"FS"`.
/// NOTE(review): the unit of the thresholds is not visible here — confirm against the consumer.
fn init_prefix_stale_thresholds(
    crypto_stale_threshold: i64,
    forex_stale_threshold: i64,
) -> HashMap<String, i64> {
    let mut thresholds = HashMap::new();
    thresholds.insert(String::from("CS"), crypto_stale_threshold);
    thresholds.insert(String::from("FS"), forex_stale_threshold);
    thresholds
}
/// Collects worker options for every crypto source that is configured.
///
/// Each optional source in `source` is passed to `add_worker_opts`, which registers it under
/// the worker's own name when present. Sources left unset are skipped.
///
/// # Errors
///
/// Propagates any `AssetWorkerError` returned by `add_worker_opts`.
async fn init_crypto_opts(
    source: &CryptoSourceConfigs,
) -> Result<HashMap<String, AssetWorkerOpts>, AssetWorkerError> {
    let mut enabled = HashMap::new();

    add_worker_opts(&mut enabled, &source.binance).await?;
    add_worker_opts(&mut enabled, &source.bitfinex).await?;
    add_worker_opts(&mut enabled, &source.bybit).await?;
    add_worker_opts(&mut enabled, &source.coinbase).await?;
    add_worker_opts(&mut enabled, &source.coingecko).await?;
    add_worker_opts(&mut enabled, &source.coinmarketcap).await?;
    add_worker_opts(&mut enabled, &source.htx).await?;
    add_worker_opts(&mut enabled, &source.kraken).await?;
    add_worker_opts(&mut enabled, &source.okx).await?;
    add_worker_opts(&mut enabled, &source.band_kiwi).await?;
    add_worker_opts(&mut enabled, &source.band_macaw).await?;

    Ok(enabled)
}
/// Collects worker options for every forex source that is configured.
///
/// Each optional source in `source` is passed to `add_worker_opts`, which registers it under
/// the worker's own name when present. Sources left unset are skipped.
///
/// # Errors
///
/// Propagates any `AssetWorkerError` returned by `add_worker_opts`.
async fn init_forex_opts(
    source: &ForexSourceConfigs,
) -> Result<HashMap<String, AssetWorkerOpts>, AssetWorkerError> {
    let mut enabled = HashMap::new();

    add_worker_opts(&mut enabled, &source.band_owlet).await?;
    add_worker_opts(&mut enabled, &source.band_fieldfare).await?;
    add_worker_opts(&mut enabled, &source.band_xenops).await?;

    Ok(enabled)
}
/// Registers one optional source configuration in `workers_opts`.
///
/// When `opts` is `Some`, the configuration is cloned, converted into `AssetWorkerOpts`, logged
/// as enabled, and inserted under the worker's name; an existing entry with that name is
/// replaced. When `opts` is `None`, nothing happens.
///
/// In the visible code this function always returns `Ok(())`.
async fn add_worker_opts<O: Clone + Into<AssetWorkerOpts>>(
    workers_opts: &mut HashMap<String, AssetWorkerOpts>,
    opts: &Option<O>,
) -> Result<(), AssetWorkerError> {
    // A source that is not configured is simply skipped.
    let Some(source_opts) = opts else {
        return Ok(());
    };

    let converted: AssetWorkerOpts = source_opts.clone().into();
    // Copy the name out first so the options can be moved into the map afterwards.
    let key = converted.name().to_string();
    info!("{} worker is enabled", key);
    workers_opts.insert(key, converted);

    Ok(())
}
/// Starts the telemetry server in a background task when telemetry is enabled.
///
/// With telemetry disabled, only an informational message is logged. Otherwise the telemetry
/// registry is initialized and `telemetry::spawn_server` is run on a detached tokio task bound
/// to `config.telemetry.addr`.
///
/// # Errors
///
/// Returns an error if the telemetry registry cannot be initialized.
async fn init_telemetry_server(config: &AppConfig) -> anyhow::Result<()> {
    if !config.telemetry.enabled {
        info!("telemetry disabled");
        return Ok(());
    }

    let registry = telemetry::init_telemetry_registry()?;
    let addr = config.telemetry.addr;
    // The join handle is dropped on purpose: the server task runs detached.
    tokio::spawn(async move {
        telemetry::spawn_server(addr, registry).await;
    });

    Ok(())
}