Skip to content

Commit 38080c6

Browse files
committed
fix(config): allow configuring topo dcs via map, fix pg ssl mode config
1 parent bc5319a commit 38080c6

File tree

15 files changed

+284
-156
lines changed

15 files changed

+284
-156
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/artifacts/config-schema.json

Lines changed: 25 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/docker/template/src/services/edge/rivet-engine.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,17 @@ export function generateDatacenterRivetEngine(
1010
) {
1111
const clickhouseHost =
1212
context.config.networkMode === "host" ? "127.0.0.1" : "clickhouse";
13-
const datacenters = [];
13+
const datacenters: Record<string, any> = {};
1414

1515
for (const dc of context.config.datacenters) {
1616
const serviceHost = context.getServiceHost("rivet-engine", dc.name, 0);
17-
datacenters.push({
18-
name: dc.name,
17+
datacenters[dc.name] = {
1918
datacenter_label: dc.id,
2019
is_leader: dc.id === 1,
2120
peer_url: `http://${serviceHost}:${API_PEER_PORT}`,
2221
public_url: `http://${serviceHost}:${GUARD_PORT}`,
2322
valid_hosts: [`${serviceHost}`, `127.0.0.1`, `localhost`],
24-
});
23+
};
2524
}
2625

2726
// Generate a separate config file for each engine node

engine/packages/api-public/src/health.rs

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -59,66 +59,73 @@ async fn fanout_inner(ctx: ApiCtx) -> Result<FanoutResponse> {
5959
// Require datacenter read permissions to access health status
6060
ctx.auth().await?;
6161

62-
let dcs = &ctx.config().topology().datacenters;
62+
let dcs = ctx
63+
.config()
64+
.topology()
65+
.datacenters
66+
.iter()
67+
.cloned()
68+
.collect::<Vec<_>>();
6369

6470
tracing::debug!(datacenters = dcs.len(), "starting health fanout");
6571

66-
let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| {
67-
let ctx = ctx.clone();
68-
69-
async move {
70-
let start = Instant::now();
71-
72-
if dc.datacenter_label == ctx.config().dc_label() {
73-
// Local datacenter - check directly
74-
let response = HealthResponse {
75-
runtime: "engine".to_string(),
76-
status: "ok".to_string(),
77-
version: env!("CARGO_PKG_VERSION").to_string(),
78-
};
79-
80-
DatacenterHealth {
81-
datacenter_label: dc.datacenter_label,
82-
datacenter_name: dc.name.clone(),
83-
status: HealthStatus::Ok,
84-
rtt_ms: Some(start.elapsed().as_secs_f64() * 1000.0),
85-
response: Some(response),
86-
error: None,
87-
}
88-
} else {
89-
// Remote datacenter - HTTP request
90-
match send_health_checks(&dc).await {
91-
Ok(response) => DatacenterHealth {
72+
let results = futures_util::stream::iter(dcs)
73+
.map(|dc| {
74+
let ctx = ctx.clone();
75+
76+
async move {
77+
let start = Instant::now();
78+
79+
if dc.datacenter_label == ctx.config().dc_label() {
80+
// Local datacenter - check directly
81+
let response = HealthResponse {
82+
runtime: "engine".to_string(),
83+
status: "ok".to_string(),
84+
version: env!("CARGO_PKG_VERSION").to_string(),
85+
};
86+
87+
DatacenterHealth {
9288
datacenter_label: dc.datacenter_label,
9389
datacenter_name: dc.name.clone(),
9490
status: HealthStatus::Ok,
9591
rtt_ms: Some(start.elapsed().as_secs_f64() * 1000.0),
9692
response: Some(response),
9793
error: None,
98-
},
99-
Err(err) => {
100-
tracing::warn!(
101-
?dc.datacenter_label,
102-
?err,
103-
"health check failed for datacenter"
104-
);
105-
106-
DatacenterHealth {
94+
}
95+
} else {
96+
// Remote datacenter - HTTP request
97+
match send_health_checks(&dc).await {
98+
Ok(response) => DatacenterHealth {
10799
datacenter_label: dc.datacenter_label,
108100
datacenter_name: dc.name.clone(),
109-
status: HealthStatus::Error,
101+
status: HealthStatus::Ok,
110102
rtt_ms: Some(start.elapsed().as_secs_f64() * 1000.0),
111-
response: None,
112-
error: Some(err.to_string()),
103+
response: Some(response),
104+
error: None,
105+
},
106+
Err(err) => {
107+
tracing::warn!(
108+
?dc.datacenter_label,
109+
?err,
110+
"health check failed for datacenter"
111+
);
112+
113+
DatacenterHealth {
114+
datacenter_label: dc.datacenter_label,
115+
datacenter_name: dc.name.clone(),
116+
status: HealthStatus::Error,
117+
rtt_ms: Some(start.elapsed().as_secs_f64() * 1000.0),
118+
response: None,
119+
error: Some(err.to_string()),
120+
}
113121
}
114122
}
115123
}
116124
}
117-
}
118-
}))
119-
.buffer_unordered(16)
120-
.collect::<Vec<_>>()
121-
.await;
125+
})
126+
.buffer_unordered(16)
127+
.collect::<Vec<_>>()
128+
.await;
122129

123130
tracing::debug!(results = results.len(), "health fanout completed");
124131

engine/packages/api-public/src/runner_configs/delete.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ pub async fn delete(
3939
async fn delete_inner(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result<DeleteResponse> {
4040
ctx.auth().await?;
4141

42-
let dcs = ctx.config().topology().datacenters.clone();
42+
let dcs = ctx
43+
.config()
44+
.topology()
45+
.datacenters
46+
.iter()
47+
.cloned()
48+
.collect::<Vec<_>>();
4349
futures_util::stream::iter(dcs)
4450
.map(|dc| {
4551
let ctx = ctx.clone();

engine/packages/api-util/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,13 @@ where
112112
A: Fn(u16, I, &mut R),
113113
R: Default + Send + 'static,
114114
{
115-
let dcs = ctx.config().topology().datacenters.clone();
115+
let dcs = ctx
116+
.config()
117+
.topology()
118+
.datacenters
119+
.iter()
120+
.cloned()
121+
.collect::<Vec<_>>();
116122

117123
let results = futures_util::stream::iter(dcs)
118124
.map(|dc| {

engine/packages/config/src/config/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,31 @@ impl Root {
203203
}
204204
}
205205

206+
// Set name property for map topology
207+
if let Some(topology) = &mut self.topology {
208+
match &mut topology.datacenters {
209+
DatacentersRepr::Map(m) => {
210+
for (name, dc) in m {
211+
if !dc.name.is_empty() {
212+
bail!(
213+
"datacenter '{}' cannot have the `name` property set because it is automatically derived from key",
214+
dc.name
215+
);
216+
}
217+
218+
dc.name = name.clone();
219+
}
220+
}
221+
DatacentersRepr::List(l) => {
222+
for (i, dc) in l.iter().enumerate() {
223+
if dc.name.is_empty() {
224+
bail!("datacenter at index {} must have a name", i);
225+
}
226+
}
227+
}
228+
}
229+
}
230+
206231
// Validate force_shutdown_duration is greater than worker and guard shutdown durations
207232
let worker = self.runtime.worker_shutdown_duration();
208233
let guard = self.runtime.guard_shutdown_duration();

engine/packages/config/src/config/topology.rs

Lines changed: 78 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use anyhow::{Context, Result};
22
use schemars::JsonSchema;
33
use serde::{Deserialize, Serialize};
4+
use std::collections::HashMap;
45
use url::Url;
56

67
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
78
#[serde(deny_unknown_fields)]
89
pub struct Topology {
910
/// Must be included in `datacenters`
1011
pub datacenter_label: u16,
11-
/// List of all datacenters, including this datacenter.
12-
pub datacenters: Vec<Datacenter>,
12+
/// Map of all datacenters, including this datacenter.
13+
pub datacenters: DatacentersRepr,
1314
}
1415

1516
impl Topology {
@@ -47,30 +48,88 @@ impl Default for Topology {
4748
fn default() -> Self {
4849
Topology {
4950
datacenter_label: 1,
50-
datacenters: vec![Datacenter {
51-
name: "default".into(),
52-
datacenter_label: 1,
53-
is_leader: true,
54-
public_url: Url::parse(&format!(
55-
"http://127.0.0.1:{}",
56-
crate::defaults::ports::GUARD
57-
))
58-
.unwrap(),
59-
peer_url: Url::parse(&format!(
60-
"http://127.0.0.1:{}",
61-
crate::defaults::ports::API_PEER
62-
))
63-
.unwrap(),
64-
proxy_url: None,
65-
valid_hosts: None,
66-
}],
51+
datacenters: DatacentersRepr::Map(
52+
[(
53+
"default".to_string(),
54+
Datacenter {
55+
name: "default".into(),
56+
datacenter_label: 1,
57+
is_leader: true,
58+
public_url: Url::parse(&format!(
59+
"http://127.0.0.1:{}",
60+
crate::defaults::ports::GUARD
61+
))
62+
.unwrap(),
63+
peer_url: Url::parse(&format!(
64+
"http://127.0.0.1:{}",
65+
crate::defaults::ports::API_PEER
66+
))
67+
.unwrap(),
68+
proxy_url: None,
69+
valid_hosts: None,
70+
},
71+
)]
72+
.into(),
73+
),
74+
}
75+
}
76+
}
77+
78+
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
79+
#[serde(untagged)]
80+
pub enum DatacentersRepr {
81+
Map(HashMap<String, Datacenter>),
82+
/// Deprecated.
83+
List(Vec<Datacenter>),
84+
}
85+
86+
impl DatacentersRepr {
87+
pub fn iter(&self) -> DatacentersIter<'_> {
88+
self.into_iter()
89+
}
90+
91+
pub fn len(&self) -> usize {
92+
match self {
93+
DatacentersRepr::Map(m) => m.len(),
94+
DatacentersRepr::List(l) => l.len(),
95+
}
96+
}
97+
}
98+
99+
pub enum DatacentersIter<'a> {
100+
Map(std::collections::hash_map::Values<'a, String, Datacenter>),
101+
List(std::slice::Iter<'a, Datacenter>),
102+
}
103+
104+
impl<'a> Iterator for DatacentersIter<'a> {
105+
type Item = &'a Datacenter;
106+
107+
fn next(&mut self) -> Option<Self::Item> {
108+
match self {
109+
DatacentersIter::Map(iter) => iter.next(),
110+
DatacentersIter::List(iter) => iter.next(),
111+
}
112+
}
113+
}
114+
115+
impl<'a> IntoIterator for &'a DatacentersRepr {
116+
type Item = &'a Datacenter;
117+
type IntoIter = DatacentersIter<'a>;
118+
119+
fn into_iter(self) -> Self::IntoIter {
120+
match self {
121+
DatacentersRepr::Map(map) => DatacentersIter::Map(map.values()),
122+
DatacentersRepr::List(vec) => DatacentersIter::List(vec.iter()),
67123
}
68124
}
69125
}
70126

71127
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
72128
#[serde(deny_unknown_fields)]
73129
pub struct Datacenter {
130+
/// When configuring `datacenters` via a hashmap this is automatically derived from the key. Required
131+
/// when configuring via list (which is deprecated)
132+
#[serde(default = "String::new")]
74133
pub name: String,
75134
pub datacenter_label: u16,
76135
pub is_leader: bool,

0 commit comments

Comments
 (0)