From ece384c094eae0a22c8f862fe1deeab45f937813 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Thu, 6 Feb 2025 08:52:39 +0530 Subject: [PATCH] fix(ext/node): implement `DatabaseSync#applyChangeset()` (#27967) https://nodejs.org/api/sqlite.html#databaseapplychangesetchangeset-options ```js const sourceDb = new DatabaseSync(':memory:'); const targetDb = new DatabaseSync(':memory:'); sourceDb.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)'); targetDb.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)'); const session = sourceDb.createSession(); const insert = sourceDb.prepare('INSERT INTO data (key, value) VALUES (?, ?)'); insert.run(1, 'hello'); insert.run(2, 'world'); const changeset = session.changeset(); targetDb.applyChangeset(changeset); // Now that the changeset has been applied, targetDb contains the same data as sourceDb. ``` --- ext/node/ops/sqlite/database.rs | 125 ++++++++++++++++++++++++++++++++ ext/node/ops/sqlite/mod.rs | 6 ++ tests/unit_node/sqlite_test.ts | 29 ++++++++ 3 files changed, 160 insertions(+) diff --git a/ext/node/ops/sqlite/database.rs b/ext/node/ops/sqlite/database.rs index 73063b6276ba90..e935a634cef6df 100644 --- a/ext/node/ops/sqlite/database.rs +++ b/ext/node/ops/sqlite/database.rs @@ -2,11 +2,16 @@ use std::cell::Cell; use std::cell::RefCell; +use std::ffi::c_char; +use std::ffi::c_void; +use std::ffi::CStr; use std::ffi::CString; use std::ptr::null; use std::rc::Rc; use deno_core::op2; +use deno_core::serde_v8; +use deno_core::v8; use deno_core::GarbageCollected; use deno_core::OpState; use deno_permissions::PermissionsContainer; @@ -41,6 +46,13 @@ impl Default for DatabaseSyncOptions { } } +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ApplyChangesetOptions<'a> { + filter: Option>, + on_conflict: Option>, +} + pub struct DatabaseSync { conn: Rc>>, options: DatabaseSyncOptions, @@ -197,6 +209,119 @@ impl DatabaseSync { }) } + // Applies a changeset to the database. + // + // This method is a wrapper around `sqlite3changeset_apply()`. + #[reentrant] + fn apply_changeset<'a>( + &self, + scope: &mut v8::HandleScope<'a>, + #[buffer] changeset: &[u8], + #[serde] options: Option>, + ) -> Result { + struct HandlerCtx<'a, 'b> { + scope: &'a mut v8::HandleScope<'b>, + confict: Option>, + filter: Option>, + } + + // Conflict handler callback for `sqlite3changeset_apply()`. + unsafe extern "C" fn conflict_handler( + p_ctx: *mut c_void, + e_conflict: i32, + _: *mut libsqlite3_sys::sqlite3_changeset_iter, + ) -> i32 { + let ctx = &mut *(p_ctx as *mut HandlerCtx); + + if let Some(conflict) = &mut ctx.confict { + let recv = v8::undefined(ctx.scope).into(); + let args = [v8::Integer::new(ctx.scope, e_conflict).into()]; + + let ret = conflict.call(ctx.scope, recv, &args).unwrap(); + return ret + .int32_value(ctx.scope) + .unwrap_or(libsqlite3_sys::SQLITE_CHANGESET_ABORT); + } + + libsqlite3_sys::SQLITE_CHANGESET_ABORT + } + + // Filter handler callback for `sqlite3changeset_apply()`. + unsafe extern "C" fn filter_handler( + p_ctx: *mut c_void, + z_tab: *const c_char, + ) -> i32 { + let ctx = &mut *(p_ctx as *mut HandlerCtx); + + if let Some(filter) = &mut ctx.filter { + let tab = CStr::from_ptr(z_tab).to_str().unwrap(); + + let recv = v8::undefined(ctx.scope).into(); + let args = [v8::String::new(ctx.scope, tab).unwrap().into()]; + + let ret = filter.call(ctx.scope, recv, &args).unwrap(); + return ret.boolean_value(ctx.scope) as i32; + } + + 1 + } + + let db = self.conn.borrow(); + let db = db.as_ref().ok_or(SqliteError::AlreadyClosed)?; + + // It is safe to use scope in the handlers because they are never + // called after the call to `sqlite3changeset_apply()`. + let mut ctx = HandlerCtx { + scope, + confict: None, + filter: None, + }; + + if let Some(options) = options { + if let Some(filter) = options.filter { + let filter_cb: v8::Local = filter + .v8_value + .try_into() + .map_err(|_| SqliteError::InvalidCallback("filter"))?; + ctx.filter = Some(filter_cb); + } + + if let Some(on_conflict) = options.on_conflict { + let on_conflict_cb: v8::Local = on_conflict + .v8_value + .try_into() + .map_err(|_| SqliteError::InvalidCallback("onConflict"))?; + ctx.confict = Some(on_conflict_cb); + } + } + + // SAFETY: lifetime of the connection is guaranteed by reference + // counting. + let raw_handle = unsafe { db.handle() }; + + // SAFETY: `changeset` points to a valid memory location and its + // length is correct. `ctx` is stack allocated and its lifetime is + // longer than the call to `sqlite3changeset_apply()`. + unsafe { + let r = libsqlite3_sys::sqlite3changeset_apply( + raw_handle, + changeset.len() as i32, + changeset.as_ptr() as *mut _, + Some(filter_handler), + Some(conflict_handler), + &mut ctx as *mut _ as *mut c_void, + ); + + if r == libsqlite3_sys::SQLITE_OK { + return Ok(true); + } else if r == libsqlite3_sys::SQLITE_ABORT { + return Ok(false); + } + + Err(SqliteError::ChangesetApplyFailed) + } + } + // Creates and attaches a session to the database. // // This method is a wrapper around `sqlite3session_create()` and diff --git a/ext/node/ops/sqlite/mod.rs b/ext/node/ops/sqlite/mod.rs index d3c273a66347d9..05e2e256b8af16 100644 --- a/ext/node/ops/sqlite/mod.rs +++ b/ext/node/ops/sqlite/mod.rs @@ -58,4 +58,10 @@ pub enum SqliteError { #[class(range)] #[error("The value of column {0} is too large to be represented as a JavaScript number: {1}")] NumberTooLarge(i32, i64), + #[class(generic)] + #[error("Failed to apply changeset")] + ChangesetApplyFailed, + #[class(type)] + #[error("Invalid callback: {0}")] + InvalidCallback(&'static str), } diff --git a/tests/unit_node/sqlite_test.ts b/tests/unit_node/sqlite_test.ts index ec54780ae92980..5cfec0d5e43fef 100644 --- a/tests/unit_node/sqlite_test.ts +++ b/tests/unit_node/sqlite_test.ts @@ -152,3 +152,32 @@ Deno.test({ } }, }); + +Deno.test("[node/sqlite] applyChangeset across databases", () => { + const sourceDb = new DatabaseSync(":memory:"); + const targetDb = new DatabaseSync(":memory:"); + + sourceDb.exec("CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)"); + targetDb.exec("CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)"); + + const session = sourceDb.createSession(); + + const insert = sourceDb.prepare( + "INSERT INTO data (key, value) VALUES (?, ?)", + ); + insert.run(1, "hello"); + insert.run(2, "world"); + + const changeset = session.changeset(); + targetDb.applyChangeset(changeset, { + filter(e) { + return e === "data"; + }, + }); + + const stmt = targetDb.prepare("SELECT * FROM data"); + assertEquals(stmt.all(), [ + { key: 1, value: "hello", __proto__: null }, + { key: 2, value: "world", __proto__: null }, + ]); +});