Skip to content

Commit efcbb59

Browse files
committed
refactor: simplify code, improve tests
1 parent fa29701 commit efcbb59

File tree

14 files changed

+172
-172
lines changed

14 files changed

+172
-172
lines changed

Diff for: src/core/beacon.rs

+25-87
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,13 @@ use crate::protobuf::drand::ChainInfoPacket;
1212
use crate::protobuf::drand::IdentityResponse;
1313
use crate::store::ChainStore;
1414
use crate::store::NewStore;
15-
use crate::store::StorageConfig;
1615

1716
use tracing::debug;
1817
use tracing::error;
19-
use tracing::info;
2018
use tracing::info_span;
2119
use tracing::Span;
2220

2321
use energon::drand::traits::BeaconDigest;
24-
use std::io::ErrorKind;
2522
use std::sync::Arc;
2623
use tokio::sync::mpsc;
2724
use tokio_util::task::TaskTracker;
@@ -86,68 +83,36 @@ impl BeaconID {
8683
pub struct BeaconProcess<S: Scheme>(Arc<InnerNode<S>>);
8784

8885
impl<S: Scheme> BeaconProcess<S> {
89-
fn from_arc(node: Arc<InnerNode<S>>) -> Self {
90-
Self(node)
91-
}
92-
93-
pub fn tracker(&self) -> &TaskTracker {
94-
&self.tracker
95-
}
96-
97-
pub fn run(fs: FileStore, pair: PairToml, id: &str) -> Result<BeaconHandler, FileStoreError> {
86+
fn new(fs: FileStore, pair: PairToml, id: &str) -> Result<Self, FileStoreError> {
9887
let keypair: Pair<S> = Toml::toml_decode(&pair).ok_or(FileStoreError::TomlError)?;
99-
let (tx, mut rx) = mpsc::channel::<BeaconCmd>(30);
100-
101-
let chain: Option<ChainHandler> = match fs.load_group() {
102-
// TODO: load share, db
103-
Ok(groupfile) => Some(ChainHandler::new(groupfile)),
104-
Err(err) => match err {
105-
// Ignore IO error `NotFound`.
106-
FileStoreError::IO(error) => {
107-
if error.kind() == ErrorKind::NotFound {
108-
info!("beacon id [{id}]: will run as fresh install -> expect to run DKG.");
109-
None
110-
} else {
111-
return Err(FileStoreError::IO(error));
112-
}
113-
}
114-
_ => return Err(err),
115-
},
116-
};
117-
11888
let span = info_span!(
11989
"",
12090
id = format!("{}.{id}", keypair.public_identity().address())
12191
);
12292

123-
let db_path = fs.db_path();
124-
125-
let beacon_node = Self(Arc::new(InnerNode {
93+
let process = Self(Arc::new(InnerNode {
12694
beacon_id: BeaconID::new(id),
12795
fs,
12896
keypair,
12997
tracker: TaskTracker::new(),
13098
span,
13199
}));
132100

133-
let node = Self::from_arc(beacon_node.0.clone());
134-
beacon_node.tracker().spawn(async move {
135-
// Attempt to load database
136-
let mut store = if chain.is_some() {
137-
let store = ChainStore::new(
138-
StorageConfig {
139-
path: Some(db_path.clone()), // TODO: postgres config
140-
..Default::default()
141-
},
142-
S::Beacon::is_chained(),
143-
)
144-
.await
145-
.unwrap();
146-
Some(Arc::new(store))
147-
} else {
148-
None
149-
};
101+
Ok(process)
102+
}
103+
104+
pub fn tracker(&self) -> &TaskTracker {
105+
&self.tracker
106+
}
150107

108+
pub fn run(fs: FileStore, pair: PairToml, id: &str) -> Result<BeaconHandler, FileStoreError> {
109+
let chain_handler = ChainHandler::try_init(&fs, id)?;
110+
let chain_store = Arc::new(ChainStore::new(&fs.db_path(), S::Beacon::is_chained())?);
111+
let node = Self::new(fs, pair, id)?;
112+
let tracker = node.tracker().to_owned();
113+
114+
let (tx, mut rx) = mpsc::channel::<BeaconCmd>(5);
115+
tracker.spawn(async move {
151116
while let Some(cmd) = rx.recv().await {
152117
match cmd {
153118
BeaconCmd::Shutdown(callback) => {
@@ -161,50 +126,23 @@ impl<S: Scheme> BeaconProcess<S> {
161126
error!("failed to send identity responce, {err}")
162127
}
163128
}
164-
BeaconCmd::Sync(callback) => match &store {
165-
Some(store) => {
166-
if callback.send(Ok(Arc::clone(store))).is_err() {
167-
error!("failed to proceed sync request")
168-
}
169-
}
170-
None => {
171-
let new_store = Arc::new(
172-
ChainStore::new(
173-
StorageConfig {
174-
path: Some(db_path.clone()), // TODO: postgres config
175-
..Default::default()
176-
},
177-
S::Beacon::is_chained(),
178-
)
179-
.await
180-
.unwrap(),
181-
);
182-
store = Some(new_store.clone());
183-
184-
if callback.send(Ok(new_store)).is_err() {
185-
error!("failed to send chain_info, receiver is dropped")
186-
}
187-
}
188-
},
189-
BeaconCmd::ChainInfo(callback) => match chain.as_ref() {
190-
Some(chain_handler) => {
191-
if callback.send(Ok(chain_handler.chain_info())).is_err() {
192-
error!("failed to send chain_info, receiver is dropped")
193-
}
129+
BeaconCmd::Sync(callback) => {
130+
if callback.send(Ok(Arc::clone(&chain_store))).is_err() {
131+
error!("failed to proceed sync request")
194132
}
195-
None => {
196-
if callback.send(Err("no dkg group setup yet")).is_err() {
197-
error!("failed to send chain_info, receiver is dropped")
198-
}
133+
}
134+
BeaconCmd::ChainInfo(callback) => {
135+
if callback.send(Ok(chain_handler.chain_info())).is_err() {
136+
error!("failed to send chain_info, receiver is dropped")
199137
}
200-
},
138+
}
201139
}
202140
}
203141
});
204142

205143
Ok(BeaconHandler {
206144
beacon_id: BeaconID::new(id),
207-
tracker: beacon_node.tracker().clone(),
145+
tracker,
208146
tx,
209147
})
210148
}

Diff for: src/core/chain.rs

+36-6
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,48 @@
1+
use crate::key::store::FileStore;
2+
use crate::key::store::FileStoreError;
13
use crate::protobuf::drand::ChainInfoPacket;
4+
use crate::protobuf::drand::Metadata;
25
use crate::transport::drand::GroupPacket;
6+
use std::io::ErrorKind;
7+
use tracing::info;
38

49
pub struct ChainHandler {
5-
groupfile: Box<GroupPacket>,
10+
group: Box<GroupPacket>,
611
}
712

813
impl ChainHandler {
9-
pub fn new(groupfile: GroupPacket) -> Self {
10-
Self {
11-
groupfile: groupfile.into(),
12-
}
14+
/// Attempt to initialize ChainHandler.
15+
#[allow(clippy::field_reassign_with_default)]
16+
pub fn try_init(fs: &FileStore, beacon_id: &str) -> Result<Self, FileStoreError> {
17+
let handler = match fs.load_group() {
18+
// to unblock early tests
19+
Ok(group) => Self {
20+
group: group.into(),
21+
},
22+
Err(err) => match err {
23+
FileStoreError::IO(error) => {
24+
// NotFound error is expected for groupfile loading and means that node is fresh.
25+
if error.kind() == ErrorKind::NotFound {
26+
info!(
27+
"beacon id [{beacon_id}]: will run as fresh install -> expect to run DKG.");
28+
let mut group = GroupPacket::default();
29+
30+
group.metadata = Metadata::mimic_version(beacon_id, &[]);
31+
Self {
32+
group: group.into(),
33+
}
34+
} else {
35+
return Err(FileStoreError::IO(error));
36+
}
37+
}
38+
_ => return Err(err),
39+
},
40+
};
41+
42+
Ok(handler)
1343
}
1444

1545
pub fn chain_info(&self) -> ChainInfoPacket {
16-
self.groupfile.get_chain_info()
46+
self.group.get_chain_info()
1747
}
1848
}

Diff for: src/core/daemon.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,16 @@ pub struct Daemon {
4444
}
4545

4646
impl Daemon {
47-
pub async fn new(c: &Config) -> Result<Arc<Self>, DaemonError> {
47+
pub async fn new(config: &Config) -> Result<Arc<Self>, DaemonError> {
4848
let tracker: TaskTracker = TaskTracker::new();
4949
let token: CancellationToken = CancellationToken::new();
5050

51-
let logger = Logger::register_node(&c.private_listen);
52-
debug!(parent: &logger.span, "DrandDaemon initializing: private_listen: {}, control_port: {}, folder: {}", c.private_listen, c.control, c.folder);
51+
let logger = Logger::register_node(&config.private_listen);
52+
debug!(parent: &logger.span,
53+
"DrandDaemon initializing: private_listen: {}, control_port: {}, folder: {}",
54+
config.private_listen, config.control, config.folder);
5355

54-
let (multibeacon_path, beacons) = MultiBeacon::new(&c.folder, c.id.as_deref())?;
56+
let (multibeacon_path, beacons) = MultiBeacon::new(config)?;
5557
let daemon = Arc::new(Self {
5658
tracker,
5759
token,

Diff for: src/core/multibeacon.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::beacon::BeaconCmd;
22
use super::beacon::BeaconID;
33
use super::beacon::BeaconProcess;
44

5+
use crate::cli::Config;
56
use crate::key::store::FileStore;
67
use crate::key::store::FileStoreError;
78
use crate::key::Scheme;
@@ -61,10 +62,10 @@ pub struct MultiBeacon(ArcSwapAny<Arc<Vec<BeaconHandler>>>);
6162
impl MultiBeacon {
6263
/// This call is success only if *all* detected storages has minimal valid structure.
6364
/// Succesfull value contains a turple with valid absolute path to multibeacon folder.
64-
pub fn new(folder: &str, id: Option<&str>) -> Result<(PathBuf, Self), FileStoreError> {
65-
let (multibeacon_path, fstores) = FileStore::read_multibeacon_folder(folder)?;
65+
pub fn new(config: &Config) -> Result<(PathBuf, Self), FileStoreError> {
66+
let (multibeacon_path, fstores) = FileStore::read_multibeacon_folder(&config.folder)?;
6667
let mut handlers = vec![];
67-
match id {
68+
match &config.id {
6869
// Non-empty value means request to load single beacon id
6970
Some(id) => {
7071
let fs = fstores

Diff for: src/key/store.rs

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub enum FileStoreError {
4949
BeaconNotFound,
5050
#[error("beacon id is failed to init, unknown scheme")]
5151
FailedInitID,
52+
#[error("chain_store error: {0}")]
53+
ChainStore(String),
5254
}
5355

5456
/// FileStore holds absolute path of **beacon_id** and abstracts the

Diff for: src/net/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
pub mod control;
22
pub mod dkg_control;
3+
pub mod pool;
34
pub mod protocol;
45
pub mod public;
5-
pub mod utils;
6-
pub mod pool;
6+
pub mod utils;

Diff for: src/net/public.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl PublicClient {
119119
}
120120

121121
pub async fn chain_info(&mut self, beacon_id: &str) -> anyhow::Result<ChainInfoPacket> {
122-
let metadata = Some(Metadata::mimic_version(2, 0, 4, beacon_id, &[]));
122+
let metadata = Some(Metadata::mimic_version(beacon_id, &[]));
123123
let request = ChainInfoRequest { metadata };
124124
let responce = self.client.chain_info(request).await?.into_inner();
125125

Diff for: src/net/utils.rs

+5-10
Original file line numberDiff line numberDiff line change
@@ -168,18 +168,13 @@ impl Metadata {
168168
Ok(metadata)
169169
}
170170

171-
pub fn mimic_version(
172-
major: u32,
173-
minor: u32,
174-
patch: u32,
175-
beacon_id: &str,
176-
chain_hash: &[u8],
177-
) -> Self {
171+
/// Bypass go-version check. This is weird and should be aligned.
172+
pub fn mimic_version(beacon_id: &str, chain_hash: &[u8]) -> Self {
178173
Metadata {
179174
node_version: Some(NodeVersion {
180-
major,
181-
minor,
182-
patch,
175+
major: 2,
176+
minor: 1,
177+
patch: 0,
183178
prerelease: String::new(),
184179
}),
185180
beacon_id: beacon_id.into(),

Diff for: src/store/memstore/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use cursor::MemDbCursor;
22
use std::collections::BTreeMap;
3+
use std::path::Path;
34
use tokio::sync::RwLock;
45
use tonic::async_trait;
56

6-
use super::{Beacon, NewStore, StorageConfig, StorageError, Store};
7+
use super::{Beacon, NewStore, StorageError, Store};
78

89
pub mod cursor;
910

@@ -62,9 +63,8 @@ impl MemStore {
6263
}
6364
}
6465

65-
#[async_trait]
6666
impl NewStore for MemStore {
67-
async fn new(_config: StorageConfig, requires_previous: bool) -> Result<Self, StorageError> {
67+
fn new(_path: &Path, requires_previous: bool) -> Result<Self, StorageError> {
6868
Ok(MemStore {
6969
data: RwLock::new(BTreeMap::new()),
7070
requires_previous,
@@ -162,7 +162,7 @@ mod tests {
162162
fn test_memstore() {
163163
let rt = tokio::runtime::Runtime::new().unwrap();
164164
rt.block_on(async {
165-
let mut store = MemStore::new(StorageConfig::default(), true).await.unwrap();
165+
let mut store = MemStore::new(Path::new("unused"), true).unwrap();
166166
test_store(&store).await;
167167

168168
store.requires_previous = false;

0 commit comments

Comments
 (0)