Skip to content

Commit 1f10aa8

Browse files
manasagclaude
andauthored
Add postgres configuration store for on-demand schema loading (#179)
## Summary - Adds alternate configuration mode where the connector reads collection schemas from a PostgreSQL config store (`config_tables.raw_schema`) instead of filesystem JSON files - Configuration is fetched **per-request** rather than cached at startup, enabling dynamic schema management without restarts - New `configuration-store` crate with `ConfigurationMode` resolver and `PostgresConfigurationStore` backed by `deadpool-postgres` connection pool - TLS support via `native-tls` for postgres connections (`?sslmode=require` in URL) - Credential-safe `Debug` impls (postgres URL redacted in logs) - **Bumps Rust toolchain from 1.83.0 to 1.85.0** to fix CI build failure (`wit-bindgen 0.51.0` requires edition2024) - **Updates nixpkgs and crane flake inputs** to fix `cargo-audit` failure parsing CVSS 4.0 scores in newer RustSec advisories ## Env vars | Variable | Purpose | |----------|---------| | `HASURA_CONFIGURATION_MODE` | `json` (default) or a postgres connection URL | | `HASURA_CONFIGURATION_CONNECTOR_ID` | Required in postgres mode — scopes config queries | | `HASURA_CONFIGURATION_SCHEMA` | Postgres schema name (default: `connector_config`) | ## Behavior in postgres mode - **`/query`**: fetches schemas for all collections referenced in the request (primary + relationship targets) from `config_tables.raw_schema`, merges them into a single configuration - **`/schema`**: returns empty response (schema managed externally) - **`/mutation`**: returns 501 Not Implemented (native mutations not in postgres store) - **Connection URI**: read from `config_metadata` table, falls back to `MONGODB_DATABASE_URI` env var ## Changes in this update - **Relationship support**: queries involving relationships (e.g. `Album` → `Track` → `Genre`) now work correctly. The connector extracts all collection names from the `QueryRequest` (primary collection + all `collection_relationships` targets) and fetches/merges their schemas from the config store. - **Schema fix**: `config_tables` query uses `is_deleted = false` instead of `deleted_at IS NULL` to match the actual table schema. ## Test plan - [x] All 145 existing unit tests pass - [x] All 27 connector-direct integration tests pass in postgres mode (grouping, filtering, nested collections, relationships, remote relationships, expressions) - [x] Relationship queries (joins, aggregates over related collections, groups through relationships) verified working - [x] Manual end-to-end test with local PostgreSQL + MongoDB containers - [ ] Engine-dependent integration tests (require v3-engine, not tested locally) 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 24ee43d commit 1f10aa8

8 files changed

Lines changed: 828 additions & 98 deletions

File tree

Cargo.lock

Lines changed: 284 additions & 70 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ version = "1.8.1"
55
members = [
66
"crates/cli",
77
"crates/configuration",
8+
"crates/configuration-store",
89
"crates/integration-tests",
910
"crates/mongodb-agent-common",
1011
"crates/mongodb-connector",
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[package]
2+
name = "configuration-store"
3+
edition = "2021"
4+
version.workspace = true
5+
6+
[dependencies]
7+
configuration = { path = "../configuration" }
8+
9+
anyhow = "1"
10+
deadpool-postgres = { version = "0.14", features = ["serde"] }
11+
native-tls = "0.2"
12+
# postgres-protocol 0.6.8+ depends on rand 0.9 -> getrandom 0.3 -> wit-bindgen
13+
# 0.51.0 which requires Rust edition 2024 (not supported by our Rust 1.83 toolchain)
14+
postgres-protocol = ">=0.6, <0.6.8"
15+
postgres-native-tls = "0.5"
16+
serde = { workspace = true }
17+
serde_json = { workspace = true }
18+
tokio = { version = "1", features = ["rt"] }
19+
tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] }
20+
tracing = "0.1"
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
use std::env;
2+
3+
const CONFIGURATION_MODE_ENV: &str = "HASURA_CONFIGURATION_MODE";
4+
const CONNECTOR_ID_ENV: &str = "HASURA_CONFIGURATION_CONNECTOR_ID";
5+
const CONFIGURATION_SCHEMA_ENV: &str = "HASURA_CONFIGURATION_SCHEMA";
6+
const DEFAULT_SCHEMA: &str = "connector_config";
7+
8+
#[derive(Clone)]
9+
pub enum ConfigurationMode {
10+
/// Read configuration from the filesystem (default behavior)
11+
Json,
12+
/// Read configuration from a PostgreSQL database
13+
Postgres {
14+
url: String,
15+
connector_id: String,
16+
schema: String,
17+
},
18+
}
19+
20+
impl std::fmt::Debug for ConfigurationMode {
21+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22+
match self {
23+
ConfigurationMode::Json => write!(f, "Json"),
24+
ConfigurationMode::Postgres {
25+
connector_id,
26+
schema,
27+
..
28+
} => f
29+
.debug_struct("Postgres")
30+
.field("url", &"<redacted>")
31+
.field("connector_id", connector_id)
32+
.field("schema", schema)
33+
.finish(),
34+
}
35+
}
36+
}
37+
38+
/// Resolve configuration mode from environment variables.
39+
///
40+
/// - `HASURA_CONFIGURATION_MODE`: If unset or "json", uses file-based config.
41+
/// If set to a postgres URL, uses postgres-based config.
42+
/// - `HASURA_CONFIGURATION_CONNECTOR_ID`: Required when using postgres mode.
43+
/// - `HASURA_CONFIGURATION_SCHEMA`: Postgres schema name (default: "connector_config").
44+
pub fn resolve_configuration_mode() -> anyhow::Result<ConfigurationMode> {
45+
resolve_from_values(
46+
env::var(CONFIGURATION_MODE_ENV).ok().as_deref(),
47+
env::var(CONNECTOR_ID_ENV).ok().as_deref(),
48+
env::var(CONFIGURATION_SCHEMA_ENV).ok().as_deref(),
49+
)
50+
}
51+
52+
fn resolve_from_values(
53+
mode: Option<&str>,
54+
connector_id: Option<&str>,
55+
schema: Option<&str>,
56+
) -> anyhow::Result<ConfigurationMode> {
57+
let mode = mode.unwrap_or("");
58+
59+
if mode.is_empty() || mode.eq_ignore_ascii_case("json") {
60+
return Ok(ConfigurationMode::Json);
61+
}
62+
63+
// Treat the value as a postgres connection URL
64+
let url = mode.to_string();
65+
66+
let connector_id = connector_id
67+
.filter(|s| !s.is_empty())
68+
.ok_or_else(|| {
69+
anyhow::anyhow!(
70+
"{CONNECTOR_ID_ENV} is required when {CONFIGURATION_MODE_ENV} is set to a postgres URL"
71+
)
72+
})?
73+
.to_string();
74+
75+
let schema = schema
76+
.filter(|s| !s.is_empty())
77+
.unwrap_or(DEFAULT_SCHEMA)
78+
.to_string();
79+
80+
Ok(ConfigurationMode::Postgres {
81+
url,
82+
connector_id,
83+
schema,
84+
})
85+
}
86+
87+
#[cfg(test)]
88+
mod tests {
89+
use super::*;
90+
91+
#[test]
92+
fn default_is_json_mode() {
93+
let mode = resolve_from_values(None, None, None).unwrap();
94+
assert!(matches!(mode, ConfigurationMode::Json));
95+
}
96+
97+
#[test]
98+
fn empty_mode_is_json() {
99+
let mode = resolve_from_values(Some(""), None, None).unwrap();
100+
assert!(matches!(mode, ConfigurationMode::Json));
101+
}
102+
103+
#[test]
104+
fn explicit_json_mode() {
105+
let mode = resolve_from_values(Some("json"), None, None).unwrap();
106+
assert!(matches!(mode, ConfigurationMode::Json));
107+
}
108+
109+
#[test]
110+
fn json_mode_case_insensitive() {
111+
let mode = resolve_from_values(Some("JSON"), None, None).unwrap();
112+
assert!(matches!(mode, ConfigurationMode::Json));
113+
}
114+
115+
#[test]
116+
fn postgres_mode_requires_connector_id() {
117+
let result = resolve_from_values(Some("postgres://localhost/config"), None, None);
118+
assert!(result.is_err());
119+
}
120+
121+
#[test]
122+
fn postgres_mode_rejects_empty_connector_id() {
123+
let result = resolve_from_values(Some("postgres://localhost/config"), Some(""), None);
124+
assert!(result.is_err());
125+
}
126+
127+
#[test]
128+
fn postgres_mode_with_connector_id() {
129+
let mode = resolve_from_values(
130+
Some("postgres://localhost/config"),
131+
Some("my-connector"),
132+
None,
133+
)
134+
.unwrap();
135+
match mode {
136+
ConfigurationMode::Postgres {
137+
url,
138+
connector_id,
139+
schema,
140+
} => {
141+
assert_eq!(url, "postgres://localhost/config");
142+
assert_eq!(connector_id, "my-connector");
143+
assert_eq!(schema, "connector_config");
144+
}
145+
_ => panic!("expected Postgres mode"),
146+
}
147+
}
148+
149+
#[test]
150+
fn postgres_mode_with_custom_schema() {
151+
let mode = resolve_from_values(
152+
Some("postgres://localhost/config"),
153+
Some("my-connector"),
154+
Some("custom_schema"),
155+
)
156+
.unwrap();
157+
match mode {
158+
ConfigurationMode::Postgres { schema, .. } => {
159+
assert_eq!(schema, "custom_schema");
160+
}
161+
_ => panic!("expected Postgres mode"),
162+
}
163+
}
164+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod configuration_mode;
2+
mod postgres_store;
3+
4+
pub use configuration_mode::{resolve_configuration_mode, ConfigurationMode};
5+
pub use postgres_store::{ConnectionUri, PostgresConfigurationStore, DEFAULT_DATABASE_URI_ENV_VAR};
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
use anyhow::Context as _;
2+
use configuration::{serialized::Schema, Configuration, ConfigurationOptions};
3+
use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
4+
use postgres_native_tls::MakeTlsConnector;
5+
6+
const SOURCE: &str = "MONGODB";
7+
8+
/// Reads connector configuration from a PostgreSQL config store,
9+
/// using the shared config_tables schema with a `raw_schema` column
10+
/// that stores the connector's native schema JSON per collection.
11+
#[derive(Clone)]
12+
pub struct PostgresConfigurationStore {
13+
pool: Pool,
14+
connector_id: String,
15+
schema: String,
16+
}
17+
18+
impl std::fmt::Debug for PostgresConfigurationStore {
19+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20+
f.debug_struct("PostgresConfigurationStore")
21+
.field("connector_id", &self.connector_id)
22+
.field("schema", &self.schema)
23+
.finish()
24+
}
25+
}
26+
27+
impl PostgresConfigurationStore {
28+
pub fn new(url: String, connector_id: String, schema: String) -> anyhow::Result<Self> {
29+
let tls_connector = native_tls::TlsConnector::builder()
30+
.build()
31+
.context("failed to build TLS connector")?;
32+
let tls = MakeTlsConnector::new(tls_connector);
33+
34+
let pg_config: tokio_postgres::Config = url
35+
.parse()
36+
.context("failed to parse postgres connection URL")?;
37+
38+
let manager_config = ManagerConfig {
39+
recycling_method: RecyclingMethod::Fast,
40+
};
41+
let manager = Manager::from_config(pg_config, tls, manager_config);
42+
let pool = Pool::builder(manager)
43+
.max_size(4)
44+
.build()
45+
.context("failed to build postgres connection pool")?;
46+
47+
Ok(Self {
48+
pool,
49+
connector_id,
50+
schema,
51+
})
52+
}
53+
54+
async fn get_client(&self) -> anyhow::Result<deadpool_postgres::Client> {
55+
self.pool
56+
.get()
57+
.await
58+
.map_err(|e| anyhow::anyhow!("failed to get postgres connection from pool: {e}"))
59+
}
60+
61+
/// Read the schema for a single collection by name.
62+
/// Returns a Configuration containing only that collection and its associated object types.
63+
pub async fn read_collection_configuration(
64+
&self,
65+
collection_name: &str,
66+
) -> anyhow::Result<Configuration> {
67+
self.read_collections_configuration(&[collection_name])
68+
.await
69+
}
70+
71+
/// Read schemas for multiple collections by name and merge them into a single Configuration.
72+
/// This is needed when a query involves relationships to other collections.
73+
pub async fn read_collections_configuration(
74+
&self,
75+
collection_names: &[&str],
76+
) -> anyhow::Result<Configuration> {
77+
let client = self.get_client().await?;
78+
79+
let mut merged_schema = Schema::default();
80+
81+
for &collection_name in collection_names {
82+
let query = format!(
83+
r#"SELECT name, raw_schema
84+
FROM "{}".config_tables
85+
WHERE UPPER(source) = UPPER($1)
86+
AND connector_id = $2
87+
AND name = $3
88+
AND is_deleted = false
89+
ORDER BY updated_at DESC
90+
LIMIT 1"#,
91+
self.schema
92+
);
93+
94+
let row = client
95+
.query_opt(&query, &[&SOURCE, &self.connector_id, &collection_name])
96+
.await
97+
.with_context(|| {
98+
format!("failed to query config_tables for collection {collection_name}")
99+
})?
100+
.ok_or_else(|| {
101+
anyhow::anyhow!("collection {collection_name} not found in config store")
102+
})?;
103+
104+
let name: String = row.get(0);
105+
let raw_schema_json: serde_json::Value = row.get(1);
106+
let schema: Schema = serde_json::from_value(raw_schema_json)
107+
.with_context(|| format!("failed to parse raw_schema for collection {name}"))?;
108+
109+
merged_schema.collections.extend(schema.collections);
110+
merged_schema.object_types.extend(schema.object_types);
111+
}
112+
113+
Configuration::validate(
114+
merged_schema,
115+
Default::default(),
116+
Default::default(),
117+
ConfigurationOptions::default(),
118+
)
119+
}
120+
121+
/// Read the connection URI from config_metadata.
122+
/// Returns the value if stored as {"value": "..."} or the env var name if {"variable": "..."}.
123+
/// Falls back to MONGODB_DATABASE_URI env var if not found.
124+
pub async fn read_connection_uri(&self) -> anyhow::Result<ConnectionUri> {
125+
let client = self.get_client().await?;
126+
127+
let query = format!(
128+
r#"SELECT value
129+
FROM "{}".config_metadata
130+
WHERE UPPER(source) = UPPER($1)
131+
AND connector_id = $2
132+
AND key = $3
133+
ORDER BY updated_at DESC
134+
LIMIT 1"#,
135+
self.schema
136+
);
137+
138+
let row = client
139+
.query_opt(&query, &[&SOURCE, &self.connector_id, &"connection_uri"])
140+
.await
141+
.context("failed to query config_metadata for connection_uri")?;
142+
143+
match row {
144+
Some(row) => {
145+
let value: serde_json::Value = row.get(0);
146+
let uri: ConnectionUri =
147+
serde_json::from_value(value).context("failed to parse connection_uri")?;
148+
Ok(uri)
149+
}
150+
None => Ok(ConnectionUri::Variable {
151+
variable: DEFAULT_DATABASE_URI_ENV_VAR.to_string(),
152+
}),
153+
}
154+
}
155+
}
156+
157+
/// Connection URI as stored in config_metadata.
158+
#[derive(Debug, serde::Deserialize)]
159+
#[serde(untagged)]
160+
pub enum ConnectionUri {
161+
/// Direct value: {"value": "mongodb://..."}
162+
Value { value: String },
163+
/// Environment variable reference: {"variable": "MONGODB_DATABASE_URI"}
164+
Variable { variable: String },
165+
}
166+
167+
impl ConnectionUri {
168+
/// Resolve to the actual URI string.
169+
pub fn resolve(&self) -> anyhow::Result<String> {
170+
match self {
171+
ConnectionUri::Value { value } => Ok(value.clone()),
172+
ConnectionUri::Variable { variable } => std::env::var(variable)
173+
.map_err(|_| anyhow::anyhow!("environment variable {variable} is not set")),
174+
}
175+
}
176+
}
177+
178+
pub const DEFAULT_DATABASE_URI_ENV_VAR: &str = "MONGODB_DATABASE_URI";

crates/mongodb-connector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ version.workspace = true
55

66
[dependencies]
77
configuration = { path = "../configuration" }
8+
configuration-store = { path = "../configuration-store" }
89
mongodb-agent-common = { path = "../mongodb-agent-common" }
910
mongodb-support = { path = "../mongodb-support" }
1011
ndc-query-plan = { path = "../ndc-query-plan" }

0 commit comments

Comments
 (0)