Skip to content

Commit

Permalink
fix(ext/node): implement DatabaseSync#applyChangeset() (#27967)
Browse files Browse the repository at this point in the history
https://nodejs.org/api/sqlite.html#databaseapplychangesetchangeset-options

```js
const sourceDb = new DatabaseSync(':memory:');
const targetDb = new DatabaseSync(':memory:');

sourceDb.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)');
targetDb.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)');

const session = sourceDb.createSession();

const insert = sourceDb.prepare('INSERT INTO data (key, value) VALUES (?, ?)');
insert.run(1, 'hello');
insert.run(2, 'world');

const changeset = session.changeset();
targetDb.applyChangeset(changeset);
// Now that the changeset has been applied, targetDb contains the same data as sourceDb. 
```
  • Loading branch information
littledivy authored Feb 6, 2025
1 parent bc85548 commit ece384c
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 0 deletions.
125 changes: 125 additions & 0 deletions ext/node/ops/sqlite/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

use std::cell::Cell;
use std::cell::RefCell;
use std::ffi::c_char;
use std::ffi::c_void;
use std::ffi::CStr;
use std::ffi::CString;
use std::ptr::null;
use std::rc::Rc;

use deno_core::op2;
use deno_core::serde_v8;
use deno_core::v8;
use deno_core::GarbageCollected;
use deno_core::OpState;
use deno_permissions::PermissionsContainer;
Expand Down Expand Up @@ -41,6 +46,13 @@ impl Default for DatabaseSyncOptions {
}
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct ApplyChangesetOptions<'a> {
filter: Option<serde_v8::Value<'a>>,
on_conflict: Option<serde_v8::Value<'a>>,
}

pub struct DatabaseSync {
conn: Rc<RefCell<Option<rusqlite::Connection>>>,
options: DatabaseSyncOptions,
Expand Down Expand Up @@ -197,6 +209,119 @@ impl DatabaseSync {
})
}

// Applies a changeset to the database.
//
// This method is a wrapper around `sqlite3changeset_apply()`.
#[reentrant]
fn apply_changeset<'a>(
&self,
scope: &mut v8::HandleScope<'a>,
#[buffer] changeset: &[u8],
#[serde] options: Option<ApplyChangesetOptions<'a>>,
) -> Result<bool, SqliteError> {
struct HandlerCtx<'a, 'b> {
scope: &'a mut v8::HandleScope<'b>,
confict: Option<v8::Local<'b, v8::Function>>,
filter: Option<v8::Local<'b, v8::Function>>,
}

// Conflict handler callback for `sqlite3changeset_apply()`.
unsafe extern "C" fn conflict_handler(
p_ctx: *mut c_void,
e_conflict: i32,
_: *mut libsqlite3_sys::sqlite3_changeset_iter,
) -> i32 {
let ctx = &mut *(p_ctx as *mut HandlerCtx);

if let Some(conflict) = &mut ctx.confict {
let recv = v8::undefined(ctx.scope).into();
let args = [v8::Integer::new(ctx.scope, e_conflict).into()];

let ret = conflict.call(ctx.scope, recv, &args).unwrap();
return ret
.int32_value(ctx.scope)
.unwrap_or(libsqlite3_sys::SQLITE_CHANGESET_ABORT);
}

libsqlite3_sys::SQLITE_CHANGESET_ABORT
}

// Filter handler callback for `sqlite3changeset_apply()`.
unsafe extern "C" fn filter_handler(
p_ctx: *mut c_void,
z_tab: *const c_char,
) -> i32 {
let ctx = &mut *(p_ctx as *mut HandlerCtx);

if let Some(filter) = &mut ctx.filter {
let tab = CStr::from_ptr(z_tab).to_str().unwrap();

let recv = v8::undefined(ctx.scope).into();
let args = [v8::String::new(ctx.scope, tab).unwrap().into()];

let ret = filter.call(ctx.scope, recv, &args).unwrap();
return ret.boolean_value(ctx.scope) as i32;
}

1
}

let db = self.conn.borrow();
let db = db.as_ref().ok_or(SqliteError::AlreadyClosed)?;

// It is safe to use scope in the handlers because they are never
// called after the call to `sqlite3changeset_apply()`.
let mut ctx = HandlerCtx {
scope,
confict: None,
filter: None,
};

if let Some(options) = options {
if let Some(filter) = options.filter {
let filter_cb: v8::Local<v8::Function> = filter
.v8_value
.try_into()
.map_err(|_| SqliteError::InvalidCallback("filter"))?;
ctx.filter = Some(filter_cb);
}

if let Some(on_conflict) = options.on_conflict {
let on_conflict_cb: v8::Local<v8::Function> = on_conflict
.v8_value
.try_into()
.map_err(|_| SqliteError::InvalidCallback("onConflict"))?;
ctx.confict = Some(on_conflict_cb);
}
}

// SAFETY: lifetime of the connection is guaranteed by reference
// counting.
let raw_handle = unsafe { db.handle() };

// SAFETY: `changeset` points to a valid memory location and its
// length is correct. `ctx` is stack allocated and its lifetime is
// longer than the call to `sqlite3changeset_apply()`.
unsafe {
let r = libsqlite3_sys::sqlite3changeset_apply(
raw_handle,
changeset.len() as i32,
changeset.as_ptr() as *mut _,
Some(filter_handler),
Some(conflict_handler),
&mut ctx as *mut _ as *mut c_void,
);

if r == libsqlite3_sys::SQLITE_OK {
return Ok(true);
} else if r == libsqlite3_sys::SQLITE_ABORT {
return Ok(false);
}

Err(SqliteError::ChangesetApplyFailed)
}
}

// Creates and attaches a session to the database.
//
// This method is a wrapper around `sqlite3session_create()` and
Expand Down
6 changes: 6 additions & 0 deletions ext/node/ops/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,10 @@ pub enum SqliteError {
#[class(range)]
#[error("The value of column {0} is too large to be represented as a JavaScript number: {1}")]
NumberTooLarge(i32, i64),
#[class(generic)]
#[error("Failed to apply changeset")]
ChangesetApplyFailed,
#[class(type)]
#[error("Invalid callback: {0}")]
InvalidCallback(&'static str),
}
29 changes: 29 additions & 0 deletions tests/unit_node/sqlite_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,32 @@ Deno.test({
}
},
});

Deno.test("[node/sqlite] applyChangeset across databases", () => {
const sourceDb = new DatabaseSync(":memory:");
const targetDb = new DatabaseSync(":memory:");

sourceDb.exec("CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)");
targetDb.exec("CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)");

const session = sourceDb.createSession();

const insert = sourceDb.prepare(
"INSERT INTO data (key, value) VALUES (?, ?)",
);
insert.run(1, "hello");
insert.run(2, "world");

const changeset = session.changeset();
targetDb.applyChangeset(changeset, {
filter(e) {
return e === "data";
},
});

const stmt = targetDb.prepare("SELECT * FROM data");
assertEquals(stmt.all(), [
{ key: 1, value: "hello", __proto__: null },
{ key: 2, value: "world", __proto__: null },
]);
});

0 comments on commit ece384c

Please sign in to comment.