Skip to content

Commit

Permalink
close pending locks when database is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Jan 23, 2025
1 parent 421acd6 commit 8226d1e
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 39 deletions.
19 changes: 11 additions & 8 deletions cpp/ConnectionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,19 @@ std::future<void> ConnectionState::refreshSchema() {
}

void ConnectionState::close() {
// prevent any new work from being queued
isClosed = true;
{
// Now signal the thread to stop and notify it
std::unique_lock<std::mutex> g(workQueueMutex);
// prevent any new work from being queued
isClosed = true;
}

// Wait for the work queue to empty
waitFinished();

{
// Now signal the thread to stop and notify it
std::lock_guard<std::mutex> g(workQueueMutex);
std::unique_lock<std::mutex> g(workQueueMutex);
threadDone = true;
workQueueConditionVariable.notify_all();
}
Expand All @@ -81,12 +85,11 @@ void ConnectionState::close() {
}

void ConnectionState::queueWork(std::function<void(sqlite3 *)> task) {
if (isClosed) {
throw std::runtime_error("Connection is not open. Connection has been closed before queueing work.");
}

{
std::lock_guard<std::mutex> g(workQueueMutex);
std::unique_lock<std::mutex> g(workQueueMutex);
if (isClosed) {
throw std::runtime_error("Connection is not open. Connection has been closed before queueing work.");
}
workQueue.push(task);
}

Expand Down
8 changes: 1 addition & 7 deletions cpp/sqliteBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,6 @@ SQLiteOPResult sqliteRequestLock(std::string const dbName,

ConnectionPool *connection = dbMap[dbName];

if (connection == nullptr) {
return SQLiteOPResult{
.type = SQLiteOk,

};
}

switch (lockType) {
case ConcurrentLockType::ReadLock:
connection->readLock(contextId);
Expand All @@ -155,6 +148,7 @@ SQLiteOPResult sqliteRequestLock(std::string const dbName,

return SQLiteOPResult{
.type = SQLiteOk,

};
}

Expand Down
57 changes: 43 additions & 14 deletions src/setup-open.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import {
ISQLite,
ConcurrentLockType,
QuickSQLiteConnection,
ContextLockID,
ISQLite,
LockContext,
LockOptions,
TransactionContext,
UpdateCallback,
SQLBatchTuple,
OpenOptions,
QueryResult
QueryResult,
QuickSQLiteConnection,
SQLBatchTuple,
TransactionContext,
UpdateCallback
} from './types';

import { enhanceQueryResult } from './utils';
import { DBListenerManagerInternal } from './DBListenerManager';
import { LockHooks } from './lock-hooks';
import { enhanceQueryResult } from './utils';

type LockCallbackRecord = {
callback: (context: LockContext) => Promise<any>;
Expand All @@ -39,11 +39,16 @@ const getRequestId = () => {
const LockCallbacks: Record<ContextLockID, LockCallbackRecord> = {};
let proxy: ISQLite;

/**
* Creates a unique identifier for all a database's lock requests
*/
const lockKey = (dbName: string, id: ContextLockID) => `${dbName}:${id}`;

/**
* Closes the context in JS and C++
*/
function closeContextLock(dbName: string, id: ContextLockID) {
delete LockCallbacks[id];
delete LockCallbacks[lockKey(dbName, id)];

// This is configured by the setupOpen function
proxy.releaseLock(dbName, id);
Expand All @@ -59,7 +64,11 @@ global.onLockContextIsAvailable = async (dbName: string, lockId: ContextLockID)
// Don't hold C++ bridge side up waiting to complete
setImmediate(async () => {
try {
const record = LockCallbacks[lockId];
const key = lockKey(dbName, lockId);
const record = LockCallbacks[key];
// clear record after fetching, the hash should only contain pending requests
delete LockCallbacks[key];

if (record?.timeout) {
clearTimeout(record.timeout);
}
Expand Down Expand Up @@ -116,12 +125,12 @@ export function setupOpen(QuickSQLite: ISQLite) {
// Wrap the callback in a promise that will resolve to the callback result
return new Promise<T>((resolve, reject) => {
// Add callback to the queue for timing
const record = (LockCallbacks[id] = {
const key = lockKey(dbName, id);
const record = (LockCallbacks[key] = {
callback: async (context: LockContext) => {
try {
await hooks?.lockAcquired?.();
const res = await callback(context);

closeContextLock(dbName, id);
resolve(res);
} catch (ex) {
Expand All @@ -134,18 +143,19 @@ export function setupOpen(QuickSQLite: ISQLite) {
} as LockCallbackRecord);

try {
// throws if lock could not be requested
QuickSQLite.requestLock(dbName, id, type);
const timeout = options?.timeoutMs;
if (timeout) {
record.timeout = setTimeout(() => {
// The callback won't be executed
delete LockCallbacks[id];
delete LockCallbacks[key];
reject(new Error(`Lock request timed out after ${timeout}ms`));
}, timeout);
}
} catch (ex) {
// Remove callback from the queue
delete LockCallbacks[id];
delete LockCallbacks[key];
reject(ex);
}
});
Expand Down Expand Up @@ -224,7 +234,26 @@ export function setupOpen(QuickSQLite: ISQLite) {

// Return the concurrent connection object
return {
close: () => QuickSQLite.close(dbName),
close: () => {
QuickSQLite.close(dbName);
// Reject any pending lock requests
Object.entries(LockCallbacks).forEach(([key, record]) => {
const recordDBName = key.split(':')[0];
if (dbName !== recordDBName) {
return;
}
// A bit of a hack, let the callbacks run with an execute method that will fail
record
.callback({
execute: async () => {
throw new Error('Connection is closed');
}
})
.catch(() => {});

delete LockCallbacks[key];
});
},
refreshSchema: () => QuickSQLite.refreshSchema(dbName),
execute: (sql: string, args?: any[]) => writeLock((context) => context.execute(sql, args)),
readLock,
Expand Down
4 changes: 0 additions & 4 deletions tests/android/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
// Top-level build file where you can add configuration options common to all sub-projects/modules.

task printEnv() {
System.out.println( System.getenv().toString())
}

buildscript {
ext {
buildToolsVersion = findProperty('android.buildToolsVersion') ?: '35.0.0'
Expand Down
65 changes: 59 additions & 6 deletions tests/tests/sqlite/rawQueries.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -651,29 +651,82 @@ export function registerBaseTests() {
});

it('Should handle multiple closes', async () => {
// populate test data
const dbName = 'test-close';

// populate test data
const db = open(dbName);
await db.execute('CREATE TABLE IF NOT EXISTS t1(id INTEGER PRIMARY KEY, c TEXT)');
await db.execute('DELETE FROM t1');
// Bulk insert 50000 rows without using a transaction
// // Bulk insert 50000 rows without using a transaction
const bulkInsertCommands: SQLBatchTuple[] = [];
for (let i = 0; i < 50000; i++) {
bulkInsertCommands.push(['INSERT INTO t1(id, c) VALUES(?, ?)', [i + 1, `value${i + 1}`]]);
}
await db.executeBatch(bulkInsertCommands);
db.close();

for (let i = 1; i < 1_000; i++) {
for (let i = 1; i < 100; i++) {
const db = open(dbName, {
numReadConnections: NUM_READ_CONNECTIONS
});

// don't await this on purpose
const pExecute = db.execute(`SELECT * FROM t1} `);
// ensure a regular query works
const pExecute = await db.execute(`SELECT * FROM t1 `);
expect(pExecute.rows?.length).to.equal(50000);

// Queue a bunch of write locks, these will fail due to the db being closed
// before they are accepted.
const tests = [
db.execute(`SELECT * FROM t1 `),
db.execute(`SELECT * FROM t1 `),
db.execute(`SELECT * FROM t1 `),
db.execute(`SELECT * FROM t1 `)
];

db.close();

const results = await Promise.allSettled(tests);
expect(results.map((r) => r.status)).deep.equal(Array(tests.length).fill('rejected'));
}
});

it('Should wait for locks before close', async () => {
const dbName = 'test-lock-close';

// populate test data
const db = open(dbName);
await db.execute('CREATE TABLE IF NOT EXISTS t1(id INTEGER PRIMARY KEY, c TEXT)');
await db.execute('DELETE FROM t1');
// // Bulk insert 50000 rows without using a transaction
const bulkInsertCommands: SQLBatchTuple[] = [];
for (let i = 0; i < 50000; i++) {
bulkInsertCommands.push(['INSERT INTO t1(id, c) VALUES(?, ?)', [i + 1, `value${i + 1}`]]);
}
await db.executeBatch(bulkInsertCommands);
db.close();

for (let i = 1; i < 10; i++) {
const db = open(dbName, {
numReadConnections: NUM_READ_CONNECTIONS
});

const promises: Promise<QueryResult>[] = [];
// ensure a regular query
db.writeLock(async (tx) => {
await tx.execute(`SELECT * FROM t1 `);
// Don't await these
promises.push(
tx.execute(`SELECT * FROM t1 `),
tx.execute(`SELECT * FROM t1 `),
tx.execute(`SELECT * FROM t1 `),
tx.execute(`SELECT * FROM t1 `)
);
});

db.close();

await expect(pExecute).to.eventually.rejectedWith('is not open');
const results = await Promise.all(promises);
expect(results.map((r) => r.rows?.length)).deep.equal(Array(promises.length).fill(50000));
}
});
});
Expand Down

0 comments on commit 8226d1e

Please sign in to comment.