Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/percas/src/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ struct FlattenConfig {
listen_peer_addr: Option<String>,
advertise_peer_addr: Option<String>,
initial_peer_addrs: Option<Vec<String>>,
cluster_id: Option<String>,
}

impl From<&ServerConfig> for FlattenConfig {
Expand All @@ -101,6 +102,7 @@ impl From<&ServerConfig> for FlattenConfig {
listen_peer_addr: None,
advertise_peer_addr: None,
initial_peer_addrs: None,
cluster_id: None,
},
ServerConfig::Cluster {
dir,
Expand All @@ -109,6 +111,7 @@ impl From<&ServerConfig> for FlattenConfig {
listen_peer_addr,
advertise_peer_addr,
initial_advertise_peer_addrs,
cluster_id,
} => FlattenConfig {
mode: ServerMode::Cluster,
dir: dir.clone(),
Expand All @@ -117,6 +120,7 @@ impl From<&ServerConfig> for FlattenConfig {
listen_peer_addr: Some(listen_peer_addr.clone()),
advertise_peer_addr: advertise_peer_addr.clone(),
initial_peer_addrs: initial_advertise_peer_addrs.clone(),
cluster_id: Some(cluster_id.clone()),
},
}
}
Expand Down Expand Up @@ -153,6 +157,9 @@ async fn run_server(rt: &Runtime, config: Config) -> Result<(), Error> {
let initial_peer_addrs = flatten_config.initial_peer_addrs.ok_or_else(|| {
Error("initial peer addresses are required for cluster mode".to_string())
})?;
let cluster_id = flatten_config
.cluster_id
.ok_or_else(|| Error("cluster id is required for cluster mode".to_string()))?;

let current_node = if let Some(mut node) =
NodeInfo::load(&node_file_path(&flatten_config.dir)).change_context_lazy(make_error)?
Expand All @@ -165,6 +172,7 @@ async fn run_server(rt: &Runtime, config: Config) -> Result<(), Error> {
let node = NodeInfo::init(
None,
"percas".to_string(),
cluster_id,
advertise_addr.clone(),
advertise_peer_addr,
);
Expand Down
23 changes: 15 additions & 8 deletions crates/cluster/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ impl GossipState {
log::debug!("received message: {:?}", message);
let result = match message {
Message::Ping(info) => {
if let Some(current) = self.membership.read().unwrap().members().get(&info.id) {
if let Some(current) = self.membership.read().unwrap().members().get(&info.node_id)
{
if current.info.incarnation < info.incarnation {
self.membership.write().unwrap().update_member(MemberState {
info: info.clone(),
Expand All @@ -146,7 +147,8 @@ impl GossipState {
Some(Message::Ack(self.current()))
}
Message::Ack(info) => {
if let Some(current) = self.membership.read().unwrap().members().get(&info.id) {
if let Some(current) = self.membership.read().unwrap().members().get(&info.node_id)
{
if current.info.incarnation < info.incarnation {
self.membership.write().unwrap().update_member(MemberState {
info: info.clone(),
Expand All @@ -161,7 +163,7 @@ impl GossipState {
Message::Sync { members } => {
let snapshot = self.membership.read().unwrap().members().clone();
for member in members {
if let Some(current) = snapshot.get(&member.info.id) {
if let Some(current) = snapshot.get(&member.info.node_id) {
// Update the member state
if current.heartbeat < member.heartbeat
&& current.info.incarnation < member.info.incarnation
Expand Down Expand Up @@ -189,7 +191,12 @@ impl GossipState {
}
};

if self.membership.read().unwrap().is_dead(self.current().id) {
if self
.membership
.read()
.unwrap()
.is_dead(self.current().node_id)
{
log::info!("current node is marked as dead, advancing incarnation");
self.advance_incarnation();
}
Expand Down Expand Up @@ -222,7 +229,7 @@ impl GossipState {
.collect();

for dead_member in &dead_members {
members.remove_member(dead_member.id);
members.remove_member(dead_member.node_id);
}

dead_members
Expand All @@ -232,7 +239,7 @@ impl GossipState {
let message = Message::Ping(self.current());
let do_send = || async {
self.transport
.send(&peer.peer_addr, &message)
.send(&peer.advertise_peer_addr, &message)
.await
.inspect_err(|e| log::error!("failed to send ping message: {:?}", e))
};
Expand All @@ -259,7 +266,7 @@ impl GossipState {
};
let do_send = || async {
self.transport
.send(&peer.peer_addr, &message)
.send(&peer.advertise_peer_addr, &message)
.await
.inspect_err(|e| log::error!("failed to send sync message: {:?}", e))
};
Expand Down Expand Up @@ -329,7 +336,7 @@ impl GossipState {

fn mark_dead(&self, peer: &NodeInfo) {
let mut members = self.membership.write().unwrap();
if let Some(last_seen) = members.members().get(&peer.id).map(|m| m.heartbeat) {
if let Some(last_seen) = members.members().get(&peer.node_id).map(|m| m.heartbeat) {
let member = MemberState {
info: peer.clone(),
status: MemberStatus::Dead,
Expand Down
2 changes: 1 addition & 1 deletion crates/cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Membership {
}

pub fn update_member(&mut self, member: MemberState) {
match self.members.entry(member.info.id) {
match self.members.entry(member.info.node_id) {
Entry::Occupied(mut entry) => {
let current = entry.get_mut();
if current.info.incarnation < member.info.incarnation {
Expand Down
26 changes: 17 additions & 9 deletions crates/cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,28 @@ use crate::ClusterError;

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeInfo {
pub id: Uuid,
pub name: String,
pub addr: String,
pub peer_addr: String,
pub node_id: Uuid,
pub node_name: String,
pub cluster_id: String,
pub advertise_addr: String,
pub advertise_peer_addr: String,
pub incarnation: u64,
}

impl NodeInfo {
pub fn init(id: Option<Uuid>, name: String, addr: String, peer_addr: String) -> Self {
pub fn init(
node_id: Option<Uuid>,
node_name: String,
cluster_id: String,
addr: String,
peer_addr: String,
) -> Self {
Self {
id: id.unwrap_or_else(Uuid::new_v4),
name,
addr,
peer_addr,
node_id: node_id.unwrap_or_else(Uuid::new_v4),
node_name,
cluster_id,
advertise_addr: addr,
advertise_peer_addr: peer_addr,
incarnation: 0,
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/cluster/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ impl Proxy {
false
}) {
if let Some(target) = members.get(&id) {
if target.info.id == self.gossip.current().id {
if target.info.node_id == self.gossip.current().node_id {
return RouteDest::Local;
}

RouteDest::RemoteAddr(target.info.addr.clone())
RouteDest::RemoteAddr(target.info.advertise_addr.clone())
} else {
RouteDest::Local
}
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub enum ServerConfig {
advertise_peer_addr: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
initial_advertise_peer_addrs: Option<Vec<String>>,
#[serde(default = "default_cluster_id")]
cluster_id: String,
},
}

Expand Down Expand Up @@ -81,6 +83,10 @@ fn default_data_dir() -> PathBuf {
PathBuf::from("/var/lib/percas/data")
}

fn default_cluster_id() -> String {
"percas-cluster".to_string()
}

pub fn node_file_path(base_dir: &Path) -> PathBuf {
base_dir.join("node.json")
}
Expand Down
Loading