Skip to content

Commit 07c61d2

Browse files
committed
dkg: add dkg_join + test
1 parent 51a9a5a commit 07c61d2

20 files changed

+364
-156
lines changed

Diff for: src/cli.rs

+32-4
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,19 @@ use crate::key::Scheme;
77

88
use crate::net::control;
99
use crate::net::control::ControlClient;
10+
use crate::net::dkg_control::DkgControlClient;
1011
use crate::net::protocol;
1112
use crate::net::utils::Address;
1213
use crate::net::utils::ControlListener;
1314
use crate::net::utils::NodeListener;
1415

15-
use anyhow::bail;
16-
use anyhow::Result;
1716
use clap::arg;
1817
use clap::command;
1918
use clap::Parser;
19+
use clap::Subcommand;
20+
21+
use anyhow::bail;
22+
use anyhow::Result;
2023
use energon::drand::schemes::*;
2124

2225
/// Generate the long-term keypair (drand.private, drand.public) for this node, and load it on the drand daemon if it is up and running
@@ -39,7 +42,7 @@ pub struct KeyGenConfig {
3942
}
4043

4144
/// Start the drand daemon.
42-
#[derive(Debug, Parser, Clone)] //TODO: mv Clone to tests
45+
#[derive(Debug, Parser, Clone)]
4346
pub struct Config {
4447
/// Folder to keep all drand cryptographic information, with absolute path.
4548
#[arg(long, default_value_t = FileStore::drand_home())]
@@ -81,6 +84,19 @@ pub struct SyncConfig {
8184
pub follow: bool,
8285
}
8386

87+
#[derive(Subcommand, Clone, Debug)]
88+
/// Commands for interacting with the DKG
89+
pub enum Dkg {
90+
Join {
91+
/// Set the port you want to listen to for control port commands. If not specified, we will use the default value.
92+
#[arg(long, default_value = control::DEFAULT_CONTROL_PORT)]
93+
control: String,
94+
/// Indicates the id for the randomness generation process which will be started
95+
#[arg(long)]
96+
id: String,
97+
},
98+
}
99+
84100
#[derive(Debug, Parser, Clone)]
85101
#[command(name = "git")]
86102
#[command(about = "", long_about = None)]
@@ -114,6 +130,8 @@ pub enum Cmd {
114130
id: String,
115131
},
116132
Sync(SyncConfig),
133+
#[command(subcommand)]
134+
Dkg(Dkg),
117135
}
118136

119137
impl CLI {
@@ -128,6 +146,9 @@ impl CLI {
128146
Cmd::Load { control, id } => load_cmd(&control, &id).await?,
129147
Cmd::Stop { control, id } => stop_cmd(&control, id.as_deref()).await?,
130148
Cmd::Sync(config) => sync_cmd(config).await?,
149+
Cmd::Dkg(dkg) => match dkg {
150+
Dkg::Join { control, id } => join_cmd(&control, &id).await?,
151+
},
131152
}
132153

133154
Ok(())
@@ -200,7 +221,7 @@ async fn load_cmd(control_port: &str, beacon_id: &str) -> Result<()> {
200221
Ok(())
201222
}
202223

203-
pub async fn stop_cmd(control_port: &str, beacon_id: Option<&str>) -> anyhow::Result<()> {
224+
async fn stop_cmd(control_port: &str, beacon_id: Option<&str>) -> anyhow::Result<()> {
204225
let mut conn = ControlClient::new(control_port).await?;
205226

206227
match conn.shutdown(beacon_id).await {
@@ -229,3 +250,10 @@ async fn sync_cmd(config: SyncConfig) -> Result<()> {
229250

230251
Ok(())
231252
}
253+
254+
async fn join_cmd(control_port: &str, beacon_id: &str) -> Result<()> {
255+
let mut client = DkgControlClient::new(control_port).await?;
256+
client.dkg_join(beacon_id).await?;
257+
258+
Ok(())
259+
}

Diff for: src/core/beacon.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::key::toml::Toml;
99
use crate::key::ConversionError;
1010
use crate::key::Scheme;
1111

12-
use crate::dkg::dkg_handler::DkgActions;
12+
use crate::dkg::dkg_handler::Actions;
1313
use crate::dkg::dkg_handler::DkgHandler;
1414
use crate::store::ChainStore;
1515
use crate::store::NewStore;
@@ -48,7 +48,7 @@ pub enum BeaconCmd {
4848
IdentityRequest(Callback<IdentityResponse, ConversionError>),
4949
Sync(Callback<Arc<ChainStore>, &'static str>),
5050
ChainInfo(Callback<ChainInfoPacket, &'static str>),
51-
DkgActions(DkgActions),
51+
DkgActions(Actions),
5252
}
5353

5454
#[derive(thiserror::Error, Debug)]

Diff for: src/dkg/actions_active.rs

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
//! This module contains all the DKG actions that require user interaction:
2+
//! creating a network, accepting or rejecting a DKG, getting the status, etc.
3+
//! Both leader and follower interactions are contained herein
4+
5+
use super::dkg_handler::ActionsError;
6+
use super::state::State;
7+
8+
use crate::key::group::Group;
9+
use crate::key::keys::Identity;
10+
use crate::key::Scheme;
11+
12+
use crate::protobuf::dkg::DkgStatusResponse;
13+
use crate::protobuf::dkg::JoinOptions;
14+
use crate::transport::dkg::Command;
15+
16+
use tracing::info;
17+
18+
pub(super) async fn command<S: Scheme>(
19+
cmd: Command,
20+
state: &mut State<S>,
21+
me: &Identity<S>,
22+
) -> Result<(), ActionsError> {
23+
info!("running DKG command: {cmd}, beaconID: {}", state.id());
24+
25+
match cmd {
26+
Command::Join(join_options) => start_join(me, state, join_options).await,
27+
_ => super::dkg_handler::todo_request(&cmd),
28+
}
29+
}
30+
31+
pub(super) async fn start_join<S: Scheme>(
32+
me: &Identity<S>,
33+
state: &mut State<S>,
34+
options: JoinOptions,
35+
) -> Result<(), ActionsError> {
36+
let prev_group: Option<Group<S>> = if state.epoch() > 1 {
37+
// TODO: reshape
38+
drop(options);
39+
panic!("start_join: epoch can not be bigger than 1, reshape is not implemented yet")
40+
} else {
41+
None
42+
};
43+
44+
state
45+
.joined(me, prev_group)
46+
.map_err(ActionsError::DBState)?;
47+
// joiners don't need to gossip anything
48+
49+
Ok(())
50+
}
51+
52+
/// TODO: this method should make request to dkg.db
53+
pub(super) fn status<S: Scheme>(state: &State<S>) -> Result<DkgStatusResponse, ActionsError> {
54+
state.status().map_err(ActionsError::DBState)
55+
}
File renamed without changes.

Diff for: src/dkg/dkg_handler.rs

+64-66
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,85 @@
1-
use super::state_machine::db_state::DBState;
2-
use super::state_machine::db_state::DBStateError;
1+
use std::fmt::Display;
2+
3+
use super::actions_active as active;
4+
use super::state::DBStateError;
5+
use super::state::State;
36

47
use crate::core::beacon::BeaconProcess;
58
use crate::key::keys::Identity;
69
use crate::key::Scheme;
710

811
use crate::protobuf::dkg::packet::Bundle;
912
use crate::protobuf::dkg::DkgStatusResponse;
10-
use crate::transport::dkg::DkgCommand;
13+
use crate::transport::dkg::Command;
1114
use crate::transport::dkg::GossipData;
1215
use crate::transport::dkg::GossipPacket;
1316

1417
use tokio::sync::mpsc;
1518
use tokio::sync::oneshot;
1619
use tracing::debug;
1720
use tracing::error;
21+
use tracing::warn;
22+
23+
/// Callback for DKG actions
24+
pub struct Callback<T> {
25+
inner: oneshot::Sender<Result<T, ActionsError>>,
26+
}
27+
28+
/// For diagnostic purposes, this should never happen
29+
const ERR_SEND: &str = "dkg callback receiver is dropped";
1830

19-
/// Callback for all dkg actions
20-
pub type DkgCallback<T> = tokio::sync::oneshot::Sender<Result<T, DkgActionsError>>;
31+
impl<T> Callback<T> {
32+
/// Sends a response back through the callback channel.
33+
fn reply(self, result: Result<T, ActionsError>) {
34+
if let Err(err) = &result {
35+
error!("failed to proceed dkg request: {err}")
36+
}
37+
if self.inner.send(result).is_err() {
38+
error!("{ERR_SEND}");
39+
};
40+
}
41+
}
2142

22-
/// For diagnostic purposes, should be never possible
23-
const ERR_SEND: &str = "dkg receiver is dropped";
43+
impl<T> From<oneshot::Sender<Result<T, ActionsError>>> for Callback<T> {
44+
fn from(tx: oneshot::Sender<Result<T, ActionsError>>) -> Self {
45+
Self { inner: tx }
46+
}
47+
}
2448

2549
pub struct DkgHandler {
26-
sender: mpsc::Sender<DkgActions>,
50+
sender: mpsc::Sender<Actions>,
2751
}
2852

29-
pub enum DkgActions {
30-
Gossip(GossipPacket, DkgCallback<()>),
31-
Shutdown(DkgCallback<()>),
32-
Command(DkgCommand, DkgCallback<()>),
33-
Broadcast(Bundle, DkgCallback<()>),
34-
Status(DkgCallback<DkgStatusResponse>),
53+
pub enum Actions {
54+
Gossip(GossipPacket, Callback<()>),
55+
Shutdown(Callback<()>),
56+
Command(Command, Callback<()>),
57+
Broadcast(Bundle, Callback<()>),
58+
Status(Callback<DkgStatusResponse>),
3559
}
3660

3761
#[derive(thiserror::Error, Debug)]
38-
pub enum DkgActionsError {
62+
pub enum ActionsError {
3963
#[error("db state error: {0}")]
4064
DBState(#[from] DBStateError),
4165
#[error("TODO: this dkg action is not implemented yet")]
4266
TODO,
4367
}
4468

4569
impl DkgHandler {
46-
pub async fn new_action(&self, action: DkgActions) {
70+
pub async fn new_action(&self, action: Actions) {
4771
if self.sender.send(action).await.is_err() {
4872
error!("{ERR_SEND}")
4973
}
5074
}
5175

5276
pub fn fresh_install<S: Scheme>(bp: BeaconProcess<S>) -> Self {
53-
let (sender, mut receiver) = mpsc::channel::<DkgActions>(1);
77+
let (sender, mut receiver) = mpsc::channel::<Actions>(1);
5478
let tracker = bp.tracker().clone();
5579

5680
tracker.spawn(async move {
5781
let me = bp.keypair.public_identity();
58-
let mut db_state = DBState::<S>::new_fresh(bp.id());
82+
let mut db_state = State::<S>::new_fresh(bp.id());
5983

6084
while let Some(actions) = receiver.recv().await {
6185
handle_actions(&mut db_state, me, actions).await;
@@ -68,59 +92,33 @@ impl DkgHandler {
6892
}
6993
}
7094

71-
async fn handle_actions<S: Scheme>(db: &mut DBState<S>, me: &Identity<S>, actions: DkgActions) {
72-
match actions {
73-
DkgActions::Shutdown(callback) => todo_request(callback, "shutdown"),
74-
DkgActions::Status(callback) => status(db, callback),
75-
DkgActions::Command(command, callback) => handle_command(command, callback).await,
76-
DkgActions::Gossip(gossip, callback) => handle_gossip(db, me, gossip, callback).await,
77-
DkgActions::Broadcast(_bundle, callback) => todo_request(callback, "broadcast"),
95+
async fn handle_actions<S: Scheme>(db: &mut State<S>, me: &Identity<S>, request: Actions) {
96+
match request {
97+
Actions::Shutdown(callback) => callback.reply(todo_request("shutdown")),
98+
Actions::Status(callback) => callback.reply(active::status(db)),
99+
Actions::Command(cmd, callback) => callback.reply(active::command(cmd, db, me).await),
100+
Actions::Gossip(packet, callback) => callback.reply(gossip(db, me, packet).await),
101+
Actions::Broadcast(bundle, callback) => callback.reply(todo_request(&bundle.to_string())),
78102
}
79103
}
80104

81-
/// Dkg status request
82-
fn status<S: Scheme>(db: &mut DBState<S>, callback: DkgCallback<DkgStatusResponse>) {
83-
if callback
84-
.send(db.status().map_err(|err| {
85-
error!("dkg status request, id: {}, error: {err}", db.id());
86-
err.into()
87-
}))
88-
.is_err()
89-
{
90-
error!("dkg status request, id: {}, error: {ERR_SEND}", db.id());
91-
};
92-
}
93-
94-
async fn handle_command(dkg_command: DkgCommand, callback: DkgCallback<()>) {
95-
let _metadata = &dkg_command.metadata;
96-
todo_request(callback, dkg_command.command.to_string().as_str())
97-
}
98-
99-
async fn handle_gossip<S: Scheme>(
100-
db: &mut DBState<S>,
105+
async fn gossip<S: Scheme>(
106+
db: &mut State<S>,
101107
me: &Identity<S>,
102-
gossip: crate::transport::dkg::GossipPacket,
103-
callback: oneshot::Sender<Result<(), DkgActionsError>>,
104-
) {
105-
let metadata = &gossip.metadata;
106-
match gossip.data {
107-
GossipData::Proposal(proposal) => {
108-
if callback
109-
.send(db.proposed(me, proposal, metadata).map_err(|err| {
110-
error!("received dkg proposal, id: {}, error: {err}", db.id());
111-
err.into()
112-
}))
113-
.is_err()
114-
{
115-
error!("received dkg proposal, id: {}, error: {ERR_SEND}", db.id());
116-
};
117-
}
118-
_ => todo_request(callback, gossip.data.to_string().as_str()),
108+
packet: GossipPacket,
109+
) -> Result<(), ActionsError> {
110+
let meta = &packet.metadata;
111+
match packet.data {
112+
GossipData::Proposal(proposal) => db.proposed(me, proposal, meta)?,
113+
_ => todo_request(&packet.data)?,
119114
}
115+
116+
Ok(())
120117
}
121118

122-
fn todo_request<T>(callback: DkgCallback<T>, kind: &str) {
123-
if callback.send(Err(DkgActionsError::TODO)).is_err() {
124-
error!("failed to proceed {kind}, error: {ERR_SEND}",);
125-
};
119+
/// Temporary tracker for unfinished logic
120+
pub fn todo_request<D: Display + ?Sized>(kind: &D) -> Result<(), ActionsError> {
121+
warn!("received TODO request: {kind}");
122+
123+
Err(ActionsError::TODO)
126124
}

Diff for: src/dkg/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
1+
pub mod actions_active;
2+
pub(super) mod convert;
13
pub mod dkg_handler;
2-
pub mod state_machine;
4+
pub mod state;
5+
pub mod status;

0 commit comments

Comments
 (0)