Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 63 additions & 36 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ rangemap = { version = "1.5.1", features = ["serde1"] }
rcgen = { version = "0.11.1", features = ["x509-parser"] }
reqwest = { version = "0.12", default-features = false, features = ["http2", "rustls-tls", "rustls-tls-webpki-roots"] }
rhai = { version = "1.15.1", features = ["sync"] }
rusqlite = { version = "0.33.0", features = ["serde_json", "time", "bundled", "uuid", "array", "load_extension", "column_decltype", "vtab", "functions", "chrono", "series", "trace"] }
rusqlite = { version = "0.38.0", features = ["serde_json", "time", "bundled", "uuid", "array", "load_extension", "column_decltype", "vtab", "functions", "chrono", "series", "trace", "cache", "fallible_uint"] }
rustls = { version = "0.23.0", default-features = false, features = ["ring"] }
seahash = "4.1.0"
serde = "1.0.159"
Expand Down
69 changes: 17 additions & 52 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use corro_types::{
channel::CorroReceiver,
config::AuthzConfig,
pubsub::SubsManager,
sqlite::{unnest_param, INSERT_CRSQL_CHANGES_QUERY},
sqlite::unnest_param,
updates::{match_changes, match_changes_from_db_version},
};

Expand Down Expand Up @@ -1288,7 +1288,15 @@ pub fn process_complete_version<T: Deref<Target = rusqlite::Connection> + Commit

// Insert all the changes in a single statement
// This will return a non zero rowid only if the change impacted the database
let mut stmt = sp.prepare_cached(INSERT_CRSQL_CHANGES_QUERY)?;
let mut stmt = sp.prepare_cached(
r#"
INSERT INTO crsql_changes ("table", pk, cid, val, col_version, db_version, site_id, cl, seq, ts)
SELECT value0, value1, value2, value3, value4, value5, value6, value7, value8, value9
FROM unnest(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-- WARNING: This returns a row BEFORE inserting not after
RETURNING db_version, seq, last_insert_rowid()
"#,
)?;
trace!("inserting {:?} changes into crsql_changes", changes);
let params = params![
unnest_param(changes.iter().map(|c| c.table.as_str())),
Expand All @@ -1306,65 +1314,22 @@ pub fn process_complete_version<T: Deref<Target = rusqlite::Connection> + Commit
.query_map(params, |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
.collect::<rusqlite::Result<Vec<(CrsqlDbVersion, CrsqlSeq, i64)>>>()?;

// Ignore this, it's just for debugging if we hit this very rare VDBE bug in production
let last_rowids_len = last_rowids.len();
if last_rowids_len != len {
// This should never happen, but if it does, i need data for debugging
let query_plan = sp
.prepare(&format!("EXPLAIN QUERY PLAN {INSERT_CRSQL_CHANGES_QUERY}",))?
.query_map(params, |row| {
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
})?
// id, parent, notused, detail
.collect::<rusqlite::Result<Vec<(i32, i32, i32, String)>>>()?;
let vdbe_program = sp
.prepare(&format!("EXPLAIN {INSERT_CRSQL_CHANGES_QUERY}"))?
.query_map(params, |row| {
Ok((
row.get(0)?,
row.get(1)?,
row.get(2)?,
row.get(3)?,
row.get(4)?,
row.get(5)?,
row.get(6)?,
row.get(7)?,
))
})?
.collect::<rusqlite::Result<
Vec<(
i32, // addr
String, // opcode
i32, // p1
i32, // p2
i32, // p3
Option<String>, // p4
Option<i32>, // p5
Option<String>, // comment
)>,
>>()?;
let details = json!({
"changes_len": len,
"returned_rowids_len": last_rowids.len(),
"query_plan": query_plan,
"vdbe_program": vdbe_program,
});
error!("Possible returning statement BUG: {details}");
assert_unreachable!("Possible returning statement BUG:", &details);
}

debug!("successfully inserted {len} changes into crsql_changes");
trace!("last_rowids before shift: {last_rowids:?}");

// RETURNING returns rows BEFORE the insert, not after
// so the rowids we get will be shifted by one
// we need to shift them back
for i in 0..(last_rowids_len - 1) {
last_rowids[i].2 = last_rowids[i + 1].2;
if last_rowids_len > 0 {
for i in 0..(last_rowids_len - 1) {
last_rowids[i].2 = last_rowids[i + 1].2;
}
last_rowids[last_rowids_len - 1].2 = sp
.prepare_cached("SELECT last_insert_rowid()")?
.query_row(params![], |row| row.get(0))?;
}
last_rowids[last_rowids_len - 1].2 = sp
.prepare_cached("SELECT last_insert_rowid()")?
.query_row(params![], |row| row.get(0))?;

trace!("last_rowids after shift: {last_rowids:?}");

Expand Down
4 changes: 2 additions & 2 deletions crates/corro-pg/src/vtab/pg_class.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{marker::PhantomData, os::raw::c_int};

use rusqlite::vtab::{
sqlite3_vtab, sqlite3_vtab_cursor, IndexInfo, VTab, VTabConnection, VTabCursor, Values,
sqlite3_vtab, sqlite3_vtab_cursor, Filters, IndexInfo, VTab, VTabConnection, VTabCursor,
};

#[repr(C)]
Expand Down Expand Up @@ -90,7 +90,7 @@ unsafe impl VTabCursor for PgClassTableCursor<'_> {
&mut self,
_idx_num: c_int,
_idx_str: Option<&str>,
_args: &Values<'_>,
_args: &Filters<'_>,
) -> rusqlite::Result<()> {
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-pg/src/vtab/pg_database.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{marker::PhantomData, os::raw::c_int, sync::Arc};

use rusqlite::vtab::{
sqlite3_vtab, sqlite3_vtab_cursor, IndexInfo, VTab, VTabConnection, VTabCursor, Values,
sqlite3_vtab, sqlite3_vtab_cursor, Filters, IndexInfo, VTab, VTabConnection, VTabCursor,
};

pub struct PgDatabase {
Expand Down Expand Up @@ -117,7 +117,7 @@ unsafe impl VTabCursor for PgDatabaseTableCursor<'_> {
&mut self,
_idx_num: c_int,
_idx_str: Option<&str>,
_args: &Values<'_>,
_args: &Filters<'_>,
) -> rusqlite::Result<()> {
self.row_id = 0;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-pg/src/vtab/pg_namespace.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{marker::PhantomData, os::raw::c_int};

use rusqlite::vtab::{
sqlite3_vtab, sqlite3_vtab_cursor, IndexInfo, VTab, VTabConnection, VTabCursor, Values,
sqlite3_vtab, sqlite3_vtab_cursor, Filters, IndexInfo, VTab, VTabConnection, VTabCursor,
};

const PG_NAMESPACES: &[(i64, &str, i64, &str)] = &[
Expand Down Expand Up @@ -70,7 +70,7 @@ unsafe impl VTabCursor for PgNamespaceTableCursor<'_> {
&mut self,
_idx_num: c_int,
_idx_str: Option<&str>,
_args: &Values<'_>,
_args: &Filters<'_>,
) -> rusqlite::Result<()> {
self.row_id = 0;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-pg/src/vtab/pg_range.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{marker::PhantomData, os::raw::c_int};

use rusqlite::vtab::{
sqlite3_vtab, sqlite3_vtab_cursor, IndexInfo, VTab, VTabConnection, VTabCursor, Values,
sqlite3_vtab, sqlite3_vtab_cursor, Filters, IndexInfo, VTab, VTabConnection, VTabCursor,
};

#[repr(C)]
Expand Down Expand Up @@ -64,7 +64,7 @@ unsafe impl VTabCursor for PgRangeTableCursor<'_> {
&mut self,
_idx_num: c_int,
_idx_str: Option<&str>,
_args: &Values<'_>,
_args: &Filters<'_>,
) -> rusqlite::Result<()> {
Ok(())
}
Expand Down
Loading
Loading