Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
535 changes: 275 additions & 260 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ clap = { version = "4.5.35", features = ["derive"] }
const_format = { version = "0.2.34" }
criterion = { version = "0.7.0", features = ["async_tokio"] }
ctrlc = { version = "3.4", features = ["termination"] }
exn = { version = "0.1.0-alpha.5" }
exn = { version = "0.1.0" }
fastimer = { version = "0.9.0" }
fastrace = { version = "0.7.9" }
fastrace-opentelemetry = { version = "0.13.0" }
Expand All @@ -69,15 +69,15 @@ jemallocator = { version = "0.5.4" }
jiff = { version = "0.2", features = ["serde"] }
local-ip-address = { version = "0.6.3" }
log = { version = "0.4.27", features = ["kv"] }
logforth = { version = "0.26.0", features = [
logforth = { version = "0.27.0", features = [
"colored",
"append-fastrace",
"append-opentelemetry",
"append-rolling-file",
"diagnostic-fastrace",
"layout-json",
] }
mea = { version = "0.3.11" }
mea = { version = "0.4.2" }
mixtrics = { version = "0.2", features = ["opentelemetry_0_30"] }
opentelemetry = { version = "0.30.0", features = ["trace", "metrics"] }
opentelemetry-otlp = { version = "0.30.0", features = [
Expand All @@ -103,14 +103,14 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
sha2 = { version = "0.10.8" }
shadow-rs = { version = "1.0.0", default-features = false }
sysinfo = { version = "0.36.1" }
sysinfo = { version = "0.37.0" }
tempfile = { version = "3.19.1" }
test-harness = { version = "0.3.0" }
thiserror = { version = "2.0" }
tokio = { version = "1.44.2" }
toml_edit = { version = "0.23.2" }
unindent = { version = "0.2.4" }
uuid = { version = "1.16.0", features = ["v4", "serde"] }
uuid = { version = "1.16.0", features = ["v7", "serde"] }

[workspace.lints.rust]
unknown_lints = "deny"
Expand Down
1 change: 1 addition & 0 deletions cmd/percas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ percas-version = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
toml_edit = { workspace = true, features = ["serde"] }
uuid = { workspace = true }

[dev-dependencies]
sealed_test = { workspace = true }
Expand Down
43 changes: 35 additions & 8 deletions cmd/percas/src/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use percas_metrics::GlobalMetrics;
use percas_server::PercasContext;
use percas_server::server::make_acceptor_and_advertise_addr;
use percas_server::telemetry;
use uuid::Uuid;

use crate::Error;
use crate::config::LoadConfigResult;
Expand All @@ -44,15 +45,25 @@ use crate::config::load_config;
pub struct CommandStart {
#[clap(short, long, help = "Path to config file", value_hint = ValueHint::FilePath)]
config_file: PathBuf,
/// The service name used for telemetry; default to 'scopedb'.
#[clap(short = 's', long = "service-name")]
service_name: Option<String>,
}

impl CommandStart {
pub fn run(self) -> Result<(), Error> {
let LoadConfigResult { config, warnings } = load_config(self.config_file)?;

let node_id = Uuid::now_v7();
let service_name = self.service_name.unwrap_or("percas".to_string()).leak();

let telemetry_runtime = make_telemetry_runtime();
let mut drop_guards =
telemetry::init(&telemetry_runtime, "percas", config.telemetry.clone());
let mut drop_guards = telemetry::init(
&telemetry_runtime,
service_name,
node_id,
config.telemetry.clone(),
);
drop_guards.push(Box::new(telemetry_runtime));
for warning in warnings {
log::warn!("{warning}");
Expand All @@ -61,7 +72,12 @@ impl CommandStart {

let server_runtime = make_server_runtime();
let gossip_runtime = make_gossip_runtime();
server_runtime.block_on(run_server(&server_runtime, &gossip_runtime, config))
server_runtime.block_on(run_server(
&server_runtime,
&gossip_runtime,
node_id,
config,
))
}
}

Expand Down Expand Up @@ -135,7 +151,12 @@ impl From<&ServerConfig> for FlattenConfig {
}
}

async fn run_server(server_rt: &Runtime, gossip_rt: &Runtime, config: Config) -> Result<(), Error> {
async fn run_server(
server_rt: &Runtime,
gossip_rt: &Runtime,
node_id: uuid::Uuid,
config: Config,
) -> Result<(), Error> {
let make_error = || Error("failed to start server".to_string());

let engine = FoyerEngine::try_new(
Expand Down Expand Up @@ -167,8 +188,14 @@ async fn run_server(server_rt: &Runtime, gossip_rt: &Runtime, config: Config) ->
ServerMode::Cluster => {
let advertise_addr = advertise_addr.to_string();
let shutdown_rx = shutdown_rx.clone();
let (proxy, futs) =
run_gossip_proxy(gossip_rt, shutdown_rx, flatten_config, advertise_addr).await?;
let (proxy, futs) = run_gossip_proxy(
gossip_rt,
shutdown_rx,
flatten_config,
node_id,
advertise_addr,
)
.await?;
(Some(proxy), futs)
}
};
Expand Down Expand Up @@ -196,6 +223,7 @@ async fn run_gossip_proxy(
gossip_rt: &Runtime,
shutdown_rx: ShutdownRecv,
flatten_config: FlattenConfig,
node_id: Uuid,
advertise_addr: String,
) -> Result<(Proxy, Vec<GossipFuture>), Error> {
let make_error = || Error("failed to start gossip proxy".to_string());
Expand Down Expand Up @@ -232,8 +260,7 @@ async fn run_gossip_proxy(
node
} else {
let node = NodeInfo::init(
None,
"percas".to_string(),
node_id,
cluster_id,
advertise_addr.clone(),
advertise_peer_addr,
Expand Down
15 changes: 2 additions & 13 deletions crates/cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::ClusterError;
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PersistentNodeInfo {
pub node_id: Uuid,
pub node_name: String,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leiysky I noticed that the node_name is never used and actually always "percas" now.

The node_id should be necessary to identify a node. Also I'd like to make node_id a settable string, like ScopeDB's node ID. But it may be incompatible with current runnning instance.

Removing node_name field wouldn't be a break since an unknown field is simply ignored.

pub cluster_id: String,
pub incarnation: u64,
}
Expand All @@ -38,7 +37,6 @@ impl From<NodeInfo> for PersistentNodeInfo {
fn from(node_info: NodeInfo) -> Self {
Self {
node_id: node_info.node_id,
node_name: node_info.node_name,
cluster_id: node_info.cluster_id,
incarnation: node_info.incarnation,
}
Expand Down Expand Up @@ -76,24 +74,16 @@ impl PersistentNodeInfo {
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeInfo {
pub node_id: Uuid,
pub node_name: String,
pub cluster_id: String,
pub advertise_addr: String,
pub advertise_peer_addr: String,
pub incarnation: u64,
}

impl NodeInfo {
pub fn init(
node_id: Option<Uuid>,
node_name: String,
cluster_id: String,
addr: String,
peer_addr: String,
) -> Self {
pub fn init(node_id: Uuid, cluster_id: String, addr: String, peer_addr: String) -> Self {
Self {
node_id: node_id.unwrap_or_else(Uuid::new_v4),
node_name,
node_id,
cluster_id,
advertise_addr: addr,
advertise_peer_addr: peer_addr,
Expand All @@ -113,7 +103,6 @@ impl NodeInfo {
if let Some(info) = PersistentNodeInfo::load(path)? {
Ok(Some(Self {
node_id: info.node_id,
node_name: info.node_name,
cluster_id: info.cluster_id,
advertise_addr,
advertise_peer_addr,
Expand Down
2 changes: 1 addition & 1 deletion crates/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ percas-core = { workspace = true }
percas-metrics = { workspace = true }
poem = { workspace = true }
scopeguard = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }

[lints]
workspace = true
10 changes: 8 additions & 2 deletions crates/server/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use percas_core::TracesConfig;
pub fn init(
rt: &Runtime,
service_name: &'static str,
node_id: uuid::Uuid,
config: TelemetryConfig,
) -> Vec<Box<dyn Send + Sync + 'static>> {
let mut drop_guards = vec![];
Expand All @@ -41,7 +42,7 @@ pub fn init(
if let Some(traces) = &config.traces {
drop_guards.extend(init_traces(rt, service_name, traces));
}
drop_guards.extend(init_logs(rt, service_name, &config));
drop_guards.extend(init_logs(rt, service_name, node_id, &config));
drop_guards
}

Expand Down Expand Up @@ -131,9 +132,14 @@ fn init_traces(
fn init_logs(
rt: &Runtime,
service_name: &'static str,
node_id: uuid::Uuid,
config: &TelemetryConfig,
) -> Vec<Box<dyn Send + Sync + 'static>> {
let static_diagnostic = StaticDiagnostic::default();
let static_diagnostic = {
let mut static_diagnostic = StaticDiagnostic::default();
static_diagnostic.insert("node_id", node_id.to_string());
static_diagnostic
};

let mut drop_guards: Vec<Box<dyn Send + Sync + 'static>> = Vec::new();
let mut builder = logforth::builder();
Expand Down
1 change: 1 addition & 0 deletions tests/toolkit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ percas-core = { workspace = true }
percas-server = { workspace = true }
regex = { workspace = true }
tempfile = { workspace = true }
uuid = { workspace = true }

[lints]
workspace = true
7 changes: 5 additions & 2 deletions tests/toolkit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@ impl TestServerState {
}
}

pub fn start_test_server(_test_name: &str, rt: &Runtime) -> Option<TestServerState> {
pub fn start_test_server(test_name: &str, rt: &Runtime) -> Option<TestServerState> {
let service_name = format!("testkit:harness:{test_name}").leak();

let mut drop_guard = Vec::<DropGuard>::new();
drop_guard.extend(
telemetry::init(
rt,
"percas",
service_name,
uuid::Uuid::now_v7(),
TelemetryConfig {
logs: LogsConfig::disabled(),
traces: None,
Expand Down