From 19d633cbe3f35cc0960ed0b62190229c655a0a67 Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Tue, 11 Jun 2024 15:54:13 -0400 Subject: [PATCH 1/2] allow destructive schema changes, when flag is provided --- crates/corro-agent/src/agent/tests.rs | 6 +++- crates/corro-agent/src/api/peer.rs | 10 +++++-- crates/corro-agent/src/api/public/mod.rs | 31 ++++++++++++++++++--- crates/corro-agent/src/api/public/pubsub.rs | 2 ++ crates/corro-client/src/lib.rs | 15 ++++++++-- crates/corro-tests/src/lib.rs | 2 +- crates/corro-tpl/src/lib.rs | 2 +- crates/corro-types/src/pubsub.rs | 6 ++-- crates/corro-types/src/schema.rs | 20 +++++++------ crates/corrosion/src/command/agent.rs | 2 +- crates/corrosion/src/command/reload.rs | 14 +++++++--- crates/corrosion/src/main.rs | 14 ++++++++-- 12 files changed, 92 insertions(+), 32 deletions(-) diff --git a/crates/corro-agent/src/agent/tests.rs b/crates/corro-agent/src/agent/tests.rs index c4dcaf944..98f9b9d13 100644 --- a/crates/corro-agent/src/agent/tests.rs +++ b/crates/corro-agent/src/agent/tests.rs @@ -5,7 +5,7 @@ use std::{ time::{Duration, Instant}, }; -use axum::Extension; +use axum::{extract::Query, Extension}; use futures::{future, stream::FuturesUnordered, StreamExt, TryStreamExt}; use hyper::StatusCode; use rand::{ @@ -783,6 +783,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> { // setup the schema, for both nodes let (status_code, _body) = api_v1_db_schema( Extension(ta1.agent.clone()), + Query(None), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -791,6 +792,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> { let (status_code, _body) = api_v1_db_schema( Extension(ta2.agent.clone()), + Query(None), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -1912,6 +1914,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> { // setup the schema, for both nodes let (status_code, _body) = api_v1_db_schema( Extension(ta1.agent.clone()), + Query(None), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -1920,6 +1923,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> { let (status_code, _body) = api_v1_db_schema( Extension(ta2.agent.clone()), + Query(None), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 88b040d13..5d38fe8c2 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1605,7 +1605,7 @@ pub async fn serve_sync( #[cfg(test)] mod tests { - use axum::{Extension, Json}; + use axum::{extract::Query, Extension, Json}; use camino::Utf8PathBuf; use corro_tests::TEST_SCHEMA; use corro_types::{ @@ -1645,8 +1645,12 @@ mod tests { ) .await?; - let (status_code, _res) = - api_v1_db_schema(Extension(agent.clone()), Json(vec![TEST_SCHEMA.to_owned()])).await; + let (status_code, _res) = api_v1_db_schema( + Extension(agent.clone()), + Query(None), + Json(vec![TEST_SCHEMA.to_owned()]), + ) + .await; assert_eq!(status_code, StatusCode::OK); diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index af1b72fa5..59be7aee8 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -3,7 +3,7 @@ use std::{ time::{Duration, Instant}, }; -use axum::{response::IntoResponse, Extension}; +use axum::{extract::Query, response::IntoResponse, Extension}; use bytes::{BufMut, BytesMut}; use compact_str::ToCompactString; use corro_types::{ @@ -19,6 +19,7 @@ use corro_types::{ }; use hyper::StatusCode; use rusqlite::{params_from_iter, ToSql, Transaction}; +use serde::Deserialize; use spawn::spawn_counted; use tokio::{ sync::{ @@ -440,7 +441,11 @@ pub async fn api_v1_queries( } } -async fn execute_schema(agent: &Agent, statements: Vec) -> eyre::Result<()> { +async fn execute_schema( + agent: &Agent, + statements: Vec, + allow_destructive: bool, +) -> eyre::Result<()> { let new_sql: String = statements.join(";"); let partial_schema = parse_sql(&new_sql)?; @@ -465,7 +470,7 @@ async fn execute_schema(agent: &Agent, statements: Vec) -> eyre::Result< block_in_place(|| { let tx = conn.immediate_transaction()?; - apply_schema(&tx, &schema_write, &mut new_schema)?; + apply_schema(&tx, &schema_write, &mut new_schema, allow_destructive)?; for tbl_name in partial_schema.tables.keys() { tx.execute("DELETE FROM __corro_schema WHERE tbl_name = ?", [tbl_name])?; @@ -484,8 +489,15 @@ async fn execute_schema(agent: &Agent, statements: Vec) -> eyre::Result< Ok(()) } +#[derive(Deserialize)] +pub struct SchemaQuery { + #[serde(default)] + allow_destructive: bool, +} + pub async fn api_v1_db_schema( Extension(agent): Extension, + query: Query>, axum::extract::Json(statements): axum::extract::Json>, ) -> (StatusCode, axum::Json) { if statements.is_empty() { @@ -503,7 +515,13 @@ pub async fn api_v1_db_schema( let start = Instant::now(); - if let Err(e) = execute_schema(&agent, statements).await { + if let Err(e) = execute_schema( + &agent, + statements, + query.0.map(|q| q.allow_destructive).unwrap_or(false), + ) + .await + { error!("could not merge schemas: {e}"); return ( StatusCode::INTERNAL_SERVER_ERROR, @@ -644,6 +662,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), + Query(None), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -726,6 +745,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), + Query(None), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -835,6 +855,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), + Query(None), axum::Json(vec![ "CREATE TABLE tests (id BIGINT NOT NULL PRIMARY KEY, foo TEXT);".into(), ]), @@ -866,6 +887,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), + Query(None), axum::Json(vec![ "CREATE TABLE tests2 (id BIGINT NOT NULL PRIMARY KEY, foo TEXT);".into(), "CREATE TABLE tests (id BIGINT NOT NULL PRIMARY KEY, foo TEXT);".into(), @@ -939,6 +961,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), + Query(None), axum::Json(vec![create_stmt.into()]), ) .await; diff --git a/crates/corro-agent/src/api/public/pubsub.rs b/crates/corro-agent/src/api/public/pubsub.rs index 105ecf373..dddfccd4c 100644 --- a/crates/corro-agent/src/api/public/pubsub.rs +++ b/crates/corro-agent/src/api/public/pubsub.rs @@ -865,6 +865,7 @@ async fn forward_bytes_to_body_sender( #[cfg(test)] mod tests { + use axum::extract::Query; use corro_types::{ api::{ChangeId, RowId}, config::Config, @@ -901,6 +902,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), + Query(None), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; diff --git a/crates/corro-client/src/lib.rs b/crates/corro-client/src/lib.rs index 6380ae2ee..d18f08c42 100644 --- a/crates/corro-client/src/lib.rs +++ b/crates/corro-client/src/lib.rs @@ -210,10 +210,18 @@ impl CorrosionApiClient { Ok(serde_json::from_slice(&bytes)?) } - pub async fn schema(&self, statements: &[Statement]) -> Result { + pub async fn schema( + &self, + statements: &[Statement], + allow_destructive: bool, + ) -> Result { + let mut url = format!("http://{}/v1/migrations", self.api_addr); + if allow_destructive { + url += "?allow_destructive=true"; + } let req = hyper::Request::builder() .method(hyper::Method::POST) - .uri(format!("http://{}/v1/migrations", self.api_addr)) + .uri(url) .header(hyper::header::CONTENT_TYPE, "application/json") .header(hyper::header::ACCEPT, "application/json") .body(Body::from(serde_json::to_vec(statements)?))?; @@ -232,6 +240,7 @@ impl CorrosionApiClient { pub async fn schema_from_paths>( &self, schema_paths: &[P], + allow_destructive: bool, ) -> Result, Error> { let mut statements = vec![]; @@ -312,7 +321,7 @@ impl CorrosionApiClient { return Ok(None); } - Ok(Some(self.schema(&statements).await?)) + Ok(Some(self.schema(&statements, allow_destructive).await?)) } } diff --git a/crates/corro-tests/src/lib.rs b/crates/corro-tests/src/lib.rs index 480007b33..8cdc25811 100644 --- a/crates/corro-tests/src/lib.rs +++ b/crates/corro-tests/src/lib.rs @@ -82,7 +82,7 @@ pub async fn launch_test_agent Result Result<(), ApplySchemaError> { if let Some(name) = schema .tables @@ -277,10 +278,11 @@ pub fn apply_schema( .difference(&new_schema.tables.keys().collect::>()) .next() { - // TODO: add options and check flag - return Err(ApplySchemaError::DropTableWithoutDestructiveFlag( - (*name).clone(), - )); + if !allow_destructive { + return Err(ApplySchemaError::DropTableWithoutDestructiveFlag( + (*name).clone(), + )); + } } let mut schema_to_merge = Schema::default(); @@ -417,10 +419,12 @@ pub fn apply_schema( debug!("dropped cols: {dropped_cols:?}"); if let Some(col_name) = dropped_cols.into_iter().next() { - return Err(ApplySchemaError::RemoveColumnWithoutDestructiveFlag( - name.clone(), - col_name.clone(), - )); + if !allow_destructive { + return Err(ApplySchemaError::RemoveColumnWithoutDestructiveFlag( + name.clone(), + col_name.clone(), + )); + } } // 2. check for changed columns diff --git a/crates/corrosion/src/command/agent.rs b/crates/corrosion/src/command/agent.rs index e72fd7693..48889fbfc 100644 --- a/crates/corrosion/src/command/agent.rs +++ b/crates/corrosion/src/command/agent.rs @@ -70,7 +70,7 @@ pub async fn run(config: Config, config_path: &Utf8PathBuf) -> eyre::Result<()> if !config.db.schema_paths.is_empty() { let client = corro_client::CorrosionApiClient::new(*config.api.bind_addr.first().unwrap()); match client - .schema_from_paths(config.db.schema_paths.as_slice()) + .schema_from_paths(config.db.schema_paths.as_slice(), false) .await { Ok(Some(res)) => { diff --git a/crates/corrosion/src/command/reload.rs b/crates/corrosion/src/command/reload.rs index 47d94a684..4d59e6ab3 100644 --- a/crates/corrosion/src/command/reload.rs +++ b/crates/corrosion/src/command/reload.rs @@ -3,10 +3,16 @@ use std::{net::SocketAddr, path::Path}; use corro_client::CorrosionApiClient; use tracing::info; -pub async fn run>(api_addr: SocketAddr, schema_paths: &[P]) -> eyre::Result<()> { +pub async fn run>( + api_addr: SocketAddr, + schema_paths: &[P], + allow_destructive: bool, +) -> eyre::Result<()> { let client = CorrosionApiClient::new(api_addr); - client.schema_from_paths(schema_paths).await?; + client + .schema_from_paths(schema_paths, allow_destructive) + .await?; info!("Successfully reloaded Corrosion's schema from paths!"); Ok(()) } @@ -27,7 +33,7 @@ mod tests { let client = corro_client::CorrosionApiClient::new(ta.agent.api_addr()); client - .schema_from_paths(&ta.agent.config().db.schema_paths) + .schema_from_paths(&ta.agent.config().db.schema_paths, false) .await?; let mut conf = ta.agent.config().as_ref().clone(); @@ -46,7 +52,7 @@ mod tests { println!("conf: {conf:?}"); - run(ta.agent.api_addr(), &conf.db.schema_paths).await?; + run(ta.agent.api_addr(), &conf.db.schema_paths, false).await?; assert!(ta.agent.schema().read().tables.contains_key("blah")); diff --git a/crates/corrosion/src/main.rs b/crates/corrosion/src/main.rs index 908cd46e9..f8c301225 100644 --- a/crates/corrosion/src/main.rs +++ b/crates/corrosion/src/main.rs @@ -443,8 +443,13 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> { } } } - Command::Reload => { - command::reload::run(cli.api_addr()?, &cli.config()?.db.schema_paths).await? + Command::Reload { allow_destructive } => { + command::reload::run( + cli.api_addr()?, + &cli.config()?.db.schema_paths, + *allow_destructive, + ) + .await? } Command::Sync(SyncCommand::Generate) => { let mut conn = AdminConn::connect(cli.admin_path()).await?; @@ -659,7 +664,10 @@ enum Command { }, /// Reload the config - Reload, + Reload { + #[arg(long, default_value = "false")] + allow_destructive: bool, + }, /// Sync-related commands #[command(subcommand)] From 5bef9a45248e01cdccf19bfc9a7dda2077ab356f Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Tue, 11 Jun 2024 16:01:14 -0400 Subject: [PATCH 2/2] use better query param struct --- crates/corro-agent/src/agent/tests.rs | 8 ++++---- crates/corro-agent/src/api/peer.rs | 2 +- crates/corro-agent/src/api/public/mod.rs | 22 ++++++++------------- crates/corro-agent/src/api/public/pubsub.rs | 2 +- 4 files changed, 14 insertions(+), 20 deletions(-) diff --git a/crates/corro-agent/src/agent/tests.rs b/crates/corro-agent/src/agent/tests.rs index 98f9b9d13..d70f22a32 100644 --- a/crates/corro-agent/src/agent/tests.rs +++ b/crates/corro-agent/src/agent/tests.rs @@ -783,7 +783,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> { // setup the schema, for both nodes let (status_code, _body) = api_v1_db_schema( Extension(ta1.agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -792,7 +792,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> { let (status_code, _body) = api_v1_db_schema( Extension(ta2.agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -1914,7 +1914,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> { // setup the schema, for both nodes let (status_code, _body) = api_v1_db_schema( Extension(ta1.agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -1923,7 +1923,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> { let (status_code, _body) = api_v1_db_schema( Extension(ta2.agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 5d38fe8c2..5b84b08d3 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1647,7 +1647,7 @@ mod tests { let (status_code, _res) = api_v1_db_schema( Extension(agent.clone()), - Query(None), + Query(Default::default()), Json(vec![TEST_SCHEMA.to_owned()]), ) .await; diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index 59be7aee8..64a30dc12 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -489,7 +489,7 @@ async fn execute_schema( Ok(()) } -#[derive(Deserialize)] +#[derive(Deserialize, Default)] pub struct SchemaQuery { #[serde(default)] allow_destructive: bool, @@ -497,7 +497,7 @@ pub struct SchemaQuery { pub async fn api_v1_db_schema( Extension(agent): Extension, - query: Query>, + query: Query, axum::extract::Json(statements): axum::extract::Json>, ) -> (StatusCode, axum::Json) { if statements.is_empty() { @@ -515,13 +515,7 @@ pub async fn api_v1_db_schema( let start = Instant::now(); - if let Err(e) = execute_schema( - &agent, - statements, - query.0.map(|q| q.allow_destructive).unwrap_or(false), - ) - .await - { + if let Err(e) = execute_schema(&agent, statements, query.0.allow_destructive).await { error!("could not merge schemas: {e}"); return ( StatusCode::INTERNAL_SERVER_ERROR, @@ -662,7 +656,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -745,7 +739,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await; @@ -855,7 +849,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![ "CREATE TABLE tests (id BIGINT NOT NULL PRIMARY KEY, foo TEXT);".into(), ]), @@ -887,7 +881,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![ "CREATE TABLE tests2 (id BIGINT NOT NULL PRIMARY KEY, foo TEXT);".into(), "CREATE TABLE tests (id BIGINT NOT NULL PRIMARY KEY, foo TEXT);".into(), @@ -961,7 +955,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![create_stmt.into()]), ) .await; diff --git a/crates/corro-agent/src/api/public/pubsub.rs b/crates/corro-agent/src/api/public/pubsub.rs index dddfccd4c..4a7e015b7 100644 --- a/crates/corro-agent/src/api/public/pubsub.rs +++ b/crates/corro-agent/src/api/public/pubsub.rs @@ -902,7 +902,7 @@ mod tests { let (status_code, _body) = api_v1_db_schema( Extension(agent.clone()), - Query(None), + Query(Default::default()), axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), ) .await;