Skip to content

Commit 01bd319

Browse files
committed
dkg: add initial handler with requests
1 parent e58d9dd commit 01bd319

File tree

10 files changed

+364
-55
lines changed

10 files changed

+364
-55
lines changed

Diff for: src/core/beacon.rs

+33-7
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@ use crate::key::toml::PairToml;
88
use crate::key::toml::Toml;
99
use crate::key::ConversionError;
1010
use crate::key::Scheme;
11-
use crate::protobuf::drand::ChainInfoPacket;
12-
use crate::protobuf::drand::IdentityResponse;
11+
12+
use crate::dkg::dkg_handler::DkgActions;
13+
use crate::dkg::dkg_handler::DkgHandler;
1314
use crate::store::ChainStore;
1415
use crate::store::NewStore;
1516

17+
use crate::protobuf::drand::ChainInfoPacket;
18+
use crate::protobuf::drand::IdentityResponse;
19+
1620
use tracing::debug;
1721
use tracing::error;
1822
use tracing::info_span;
@@ -44,14 +48,15 @@ pub enum BeaconCmd {
4448
IdentityRequest(Callback<IdentityResponse, ConversionError>),
4549
Sync(Callback<Arc<ChainStore>, &'static str>),
4650
ChainInfo(Callback<ChainInfoPacket, &'static str>),
51+
DkgActions(DkgActions),
4752
}
4853

4954
#[derive(thiserror::Error, Debug)]
5055
/// For diagnostic, should not be possible.
5156
#[error("callback receiver has already been dropped")]
5257
struct SendError;
5358

54-
#[derive(PartialEq, Eq, Clone)]
59+
#[derive(PartialEq, Eq)]
5560
pub struct BeaconID {
5661
inner: Arc<str>,
5762
}
@@ -101,10 +106,6 @@ impl<S: Scheme> BeaconProcess<S> {
101106
Ok(process)
102107
}
103108

104-
pub fn tracker(&self) -> &TaskTracker {
105-
&self.tracker
106-
}
107-
108109
pub fn run(fs: FileStore, pair: PairToml, id: &str) -> Result<BeaconHandler, FileStoreError> {
109110
let chain_handler = ChainHandler::try_init(&fs, id)?;
110111
let chain_store = Arc::new(ChainStore::new(&fs.db_path(), S::Beacon::is_chained())?);
@@ -113,6 +114,8 @@ impl<S: Scheme> BeaconProcess<S> {
113114

114115
let (tx, mut rx) = mpsc::channel::<BeaconCmd>(5);
115116
tracker.spawn(async move {
117+
let dkg_handler = DkgHandler::fresh_install(node.clone());
118+
116119
while let Some(cmd) = rx.recv().await {
117120
match cmd {
118121
BeaconCmd::Shutdown(callback) => {
@@ -136,6 +139,7 @@ impl<S: Scheme> BeaconProcess<S> {
136139
error!("failed to send chain_info, receiver is dropped")
137140
}
138141
}
142+
BeaconCmd::DkgActions(action) => dkg_handler.new_action(action).await,
139143
}
140144
}
141145
});
@@ -173,6 +177,14 @@ impl<S: Scheme> BeaconProcess<S> {
173177

174178
Ok(())
175179
}
180+
181+
pub fn tracker(&self) -> &TaskTracker {
182+
&self.tracker
183+
}
184+
185+
pub fn id(&self) -> &BeaconID {
186+
&self.beacon_id
187+
}
176188
}
177189

178190
impl<S: Scheme> std::ops::Deref for BeaconProcess<S> {
@@ -182,3 +194,17 @@ impl<S: Scheme> std::ops::Deref for BeaconProcess<S> {
182194
&self.0
183195
}
184196
}
197+
198+
impl<S: Scheme> Clone for BeaconProcess<S> {
199+
fn clone(&self) -> Self {
200+
Self(Arc::clone(&self.0))
201+
}
202+
}
203+
204+
impl Clone for BeaconID {
205+
fn clone(&self) -> Self {
206+
Self {
207+
inner: Arc::clone(&self.inner),
208+
}
209+
}
210+
}

Diff for: src/dkg/dkg_handler.rs

+126
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use super::state_machine::db_state::DBState;
2+
use super::state_machine::db_state::DBStateError;
3+
4+
use crate::core::beacon::BeaconProcess;
5+
use crate::key::keys::Identity;
6+
use crate::key::Scheme;
7+
8+
use crate::protobuf::dkg::packet::Bundle;
9+
use crate::protobuf::dkg::DkgStatusResponse;
10+
use crate::transport::dkg::DkgCommand;
11+
use crate::transport::dkg::GossipData;
12+
use crate::transport::dkg::GossipPacket;
13+
14+
use tokio::sync::mpsc;
15+
use tokio::sync::oneshot;
16+
use tracing::debug;
17+
use tracing::error;
18+
19+
/// Callback for all dkg actions
20+
pub type DkgCallback<T> = tokio::sync::oneshot::Sender<Result<T, DkgActionsError>>;
21+
22+
/// For diagnostic purposes, should be never possible
23+
const ERR_SEND: &str = "dkg receiver is dropped";
24+
25+
pub struct DkgHandler {
26+
sender: mpsc::Sender<DkgActions>,
27+
}
28+
29+
pub enum DkgActions {
30+
Gossip(GossipPacket, DkgCallback<()>),
31+
Shutdown(DkgCallback<()>),
32+
Command(DkgCommand, DkgCallback<()>),
33+
Broadcast(Bundle, DkgCallback<()>),
34+
Status(DkgCallback<DkgStatusResponse>),
35+
}
36+
37+
#[derive(thiserror::Error, Debug)]
38+
pub enum DkgActionsError {
39+
#[error("db state error: {0}")]
40+
DBState(#[from] DBStateError),
41+
#[error("TODO: this dkg action is not implemented yet")]
42+
TODO,
43+
}
44+
45+
impl DkgHandler {
46+
pub async fn new_action(&self, action: DkgActions) {
47+
if self.sender.send(action).await.is_err() {
48+
error!("{ERR_SEND}")
49+
}
50+
}
51+
52+
pub fn fresh_install<S: Scheme>(bp: BeaconProcess<S>) -> Self {
53+
let (sender, mut receiver) = mpsc::channel::<DkgActions>(1);
54+
let tracker = bp.tracker().clone();
55+
56+
tracker.spawn(async move {
57+
let me = bp.keypair.public_identity();
58+
let mut db_state = DBState::<S>::new_fresh(bp.id());
59+
60+
while let Some(actions) = receiver.recv().await {
61+
handle_actions(&mut db_state, me, actions).await;
62+
}
63+
64+
debug!("dkg handler is shutting down")
65+
});
66+
67+
Self { sender }
68+
}
69+
}
70+
71+
async fn handle_actions<S: Scheme>(db: &mut DBState<S>, me: &Identity<S>, actions: DkgActions) {
72+
match actions {
73+
DkgActions::Shutdown(callback) => todo_request(callback, "shutdown"),
74+
DkgActions::Status(callback) => status(db, callback),
75+
DkgActions::Command(command, callback) => handle_command(command, callback).await,
76+
DkgActions::Gossip(gossip, callback) => handle_gossip(db, me, gossip, callback).await,
77+
DkgActions::Broadcast(_bundle, callback) => todo_request(callback, "broadcast"),
78+
}
79+
}
80+
81+
/// Dkg status request
82+
fn status<S: Scheme>(db: &mut DBState<S>, callback: DkgCallback<DkgStatusResponse>) {
83+
if callback
84+
.send(db.status().map_err(|err| {
85+
error!("dkg status request, id: {}, error: {err}", db.id());
86+
err.into()
87+
}))
88+
.is_err()
89+
{
90+
error!("dkg status request, id: {}, error: {ERR_SEND}", db.id());
91+
};
92+
}
93+
94+
async fn handle_command(dkg_command: DkgCommand, callback: DkgCallback<()>) {
95+
let _metadata = &dkg_command.metadata;
96+
todo_request(callback, dkg_command.command.to_string().as_str())
97+
}
98+
99+
async fn handle_gossip<S: Scheme>(
100+
db: &mut DBState<S>,
101+
me: &Identity<S>,
102+
gossip: crate::transport::dkg::GossipPacket,
103+
callback: oneshot::Sender<Result<(), DkgActionsError>>,
104+
) {
105+
let metadata = &gossip.metadata;
106+
match gossip.data {
107+
GossipData::Proposal(proposal) => {
108+
if callback
109+
.send(db.proposed(me, proposal, metadata).map_err(|err| {
110+
error!("received dkg proposal, id: {}, error: {err}", db.id());
111+
err.into()
112+
}))
113+
.is_err()
114+
{
115+
error!("received dkg proposal, id: {}, error: {ERR_SEND}", db.id());
116+
};
117+
}
118+
_ => todo_request(callback, gossip.data.to_string().as_str()),
119+
}
120+
}
121+
122+
fn todo_request<T>(callback: DkgCallback<T>, kind: &str) {
123+
if callback.send(Err(DkgActionsError::TODO)).is_err() {
124+
error!("failed to proceed {kind}, error: {ERR_SEND}",);
125+
};
126+
}

Diff for: src/dkg/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
pub mod dkg_handler;
12
pub mod state_machine;

Diff for: src/dkg/state_machine/db_state.rs

+12-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::status::StateError;
22
use super::status::Status;
33

4+
use crate::core::beacon::BeaconID;
45
use crate::key::group::minimum_t;
56
use crate::key::group::Group;
67
use crate::key::keys::Identity;
@@ -17,6 +18,7 @@ use crate::transport::dkg::Timestamp;
1718

1819
use crate::net::utils::Seconds;
1920
use std::time::SystemTime;
21+
use tracing::debug;
2022

2123
#[derive(thiserror::Error, Debug)]
2224
pub enum DBStateError {
@@ -106,7 +108,7 @@ pub enum DBStateError {
106108
ReceivedRejection,
107109
#[error("dkg state error: {0}")]
108110
InvalidStateChange(#[from] StateError),
109-
#[error("dkg: conversion: {0}")]
111+
#[error("conversion: {0}")]
110112
ConversionError(#[from] ConversionError),
111113
#[error("the key's scheme may not match the beacon's scheme")]
112114
InvalidKeyScheme,
@@ -115,7 +117,7 @@ pub enum DBStateError {
115117
#[derive(PartialEq)]
116118
pub struct DBState<S: Scheme> {
117119
// Parameters
118-
beacon_id: String,
120+
beacon_id: BeaconID,
119121
epoch: u32,
120122
state: Status,
121123
threshold: u32,
@@ -137,6 +139,10 @@ pub struct DBState<S: Scheme> {
137139
}
138140

139141
impl<S: Scheme> DBState<S> {
142+
pub fn id(&self) -> &BeaconID {
143+
&self.beacon_id
144+
}
145+
140146
pub fn show_status(&self) -> &super::status::Status {
141147
&self.state
142148
}
@@ -152,7 +158,7 @@ impl<S: Scheme> DBState<S> {
152158
}
153159

154160
/// Returns default DBState representation which is used only at fresh state.
155-
pub fn new_fresh(beacon_id: &str) -> Self {
161+
pub fn new_fresh(beacon_id: &BeaconID) -> Self {
156162
Self {
157163
beacon_id: beacon_id.to_owned(),
158164
genesis_time: Timestamp {
@@ -223,7 +229,7 @@ impl<S: Scheme> DBState<S> {
223229
// TODO: Complete entry should be updated if dkg is finished.
224230
complete: None,
225231
current: Some(DkgEntry {
226-
beacon_id: beacon_id.to_owned(),
232+
beacon_id: beacon_id.to_string(),
227233
state: *state as u32,
228234
epoch: *epoch,
229235
threshold: *threshold,
@@ -312,6 +318,7 @@ impl<S: Scheme> DBState<S> {
312318
return Err(DBStateError::SelfMissingFromProposal);
313319
}
314320

321+
debug!("received proposal is valid");
315322
*self = proposed;
316323

317324
Ok(())
@@ -363,7 +370,7 @@ impl<S: Scheme> TryFrom<ProposalTerms> for DBState<S> {
363370
} = value;
364371

365372
let state = DBState::<S> {
366-
beacon_id,
373+
beacon_id: BeaconID::new(beacon_id),
367374
epoch,
368375
state: Status::Proposed,
369376
threshold,

0 commit comments

Comments
 (0)