Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Potential crash on db close #72

Merged
merged 12 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/purple-drinks-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/react-native-quick-sqlite': minor
---

Improved behaviour for closing a database connection. This should prevent some crash issues.
3 changes: 3 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ jobs:
cd tests
yarn install --frozen-lockfile

- name: Initialize Android Folder
run: mkdir -p ~/.android/avd

- name: create AVD and generate snapshot for caching
if: steps.avd-cache.outputs.cache-hit != 'true'
uses: reactivecircus/[email protected]
Expand Down
18 changes: 17 additions & 1 deletion cpp/ConnectionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ ConnectionPool::ConnectionPool(std::string dbName, std::string docPath,

onContextCallback = nullptr;
isConcurrencyEnabled = maxReads > 0;
isClosed = false;

readConnections = new ConnectionState *[maxReads];
// Open the read connections
Expand Down Expand Up @@ -94,7 +95,14 @@ ConnectionPool::queueInContext(ConnectionLockId contextId,
};
}

state->queueWork(task);
try {
state->queueWork(task);
} catch (const std::exception &e) {
return SQLiteOPResult{
.errorMessage = e.what(),
.type = SQLiteError,
};
}

return SQLiteOPResult{
.type = SQLiteOk,
Expand Down Expand Up @@ -162,6 +170,14 @@ void ConnectionPool::closeContext(ConnectionLockId contextId) {
}

void ConnectionPool::closeAll() {
isClosed = true;
// Stop any callbacks
sqlite3_commit_hook(writeConnection.connection,
NULL, NULL);
sqlite3_rollback_hook(writeConnection.connection,
NULL, NULL);
sqlite3_update_hook(writeConnection.connection,
NULL, NULL);
writeConnection.close();
for (int i = 0; i < maxReads; i++) {
readConnections[i]->close();
Expand Down
2 changes: 2 additions & 0 deletions cpp/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class ConnectionPool {
bool isConcurrencyEnabled;

public:
bool isClosed;

ConnectionPool(std::string dbName, std::string docPath,
unsigned int numReadConnections);
~ConnectionPool();
Expand Down
67 changes: 39 additions & 28 deletions cpp/ConnectionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,17 @@ SQLiteOPResult genericSqliteOpenDb(string const dbName, string const docPath,
ConnectionState::ConnectionState(const std::string dbName,
const std::string docPath, int SQLFlags) {
auto result = genericSqliteOpenDb(dbName, docPath, &connection, SQLFlags);

this->clearLock();
threadDone = false;
thread = new std::thread(&ConnectionState::doWork, this);
if (result.type != SQLiteOk) {
throw std::runtime_error("Failed to open SQLite database: " + result.errorMessage);
}
thread = std::thread(&ConnectionState::doWork, this);
this->clearLock();
}

ConnectionState::~ConnectionState() {
// So threads know it's time to shut down
threadDone = true;

// Wake up all the threads, so they can finish and be joined
workQueueConditionVariable.notify_all();
if (thread->joinable()) {
thread->join();
if (!isClosed) {
close();
}

delete thread;
}

void ConnectionState::clearLock() {
Expand Down Expand Up @@ -64,21 +58,41 @@ std::future<void> ConnectionState::refreshSchema() {
}

void ConnectionState::close() {
{
std::unique_lock<std::mutex> g(workQueueMutex);
// prevent any new work from being queued
isClosed = true;
}

// Wait for the work queue to empty
waitFinished();
// So that the thread can stop (if not already)
threadDone = true;

{
// Now signal the thread to stop and notify it
std::unique_lock<std::mutex> g(workQueueMutex);
threadDone = true;
workQueueConditionVariable.notify_all();
}

// Join the worker thread
if (thread.joinable()) {
thread.join();
}

// Safely close the SQLite connection
sqlite3_close_v2(connection);
}

void ConnectionState::queueWork(std::function<void(sqlite3 *)> task) {
// Grab the mutex
std::lock_guard<std::mutex> g(workQueueMutex);

// Push the request to the queue
workQueue.push(task);
{
std::unique_lock<std::mutex> g(workQueueMutex);
if (isClosed) {
throw std::runtime_error("Connection is not open. Connection has been closed before queueing work.");
}
workQueue.push(task);
}

// Notify one thread that there are requests to process
workQueueConditionVariable.notify_all();
workQueueConditionVariable.notify_all();
}

void ConnectionState::doWork() {
Expand All @@ -104,9 +118,9 @@ void ConnectionState::doWork() {
workQueue.pop();
}

++threadBusy;
threadBusy = true;
task(connection);
--threadBusy;
threadBusy = false;
// Need to notify in order for waitFinished to be updated when
// the queue is empty and not busy
{
Expand All @@ -118,11 +132,8 @@ void ConnectionState::doWork() {

void ConnectionState::waitFinished() {
std::unique_lock<std::mutex> g(workQueueMutex);
if (workQueue.empty()) {
return;
}
workQueueConditionVariable.wait(
g, [&] { return workQueue.empty() && (threadBusy == 0); });
g, [&] { return workQueue.empty() && !threadBusy; });
}

SQLiteOPResult genericSqliteOpenDb(string const dbName, string const docPath,
Expand Down
8 changes: 5 additions & 3 deletions cpp/ConnectionState.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ class ConnectionState {
// Mutex to protect workQueue
std::mutex workQueueMutex;
// Store thread in order to stop it gracefully
std::thread *thread;
std::thread thread;
// This condition variable is used for the threads to wait until there is work
// to do
std::condition_variable_any workQueueConditionVariable;
unsigned int threadBusy;
bool threadDone;
std::atomic<bool> threadBusy{false};
std::atomic<bool> threadDone{false};

public:
std::atomic<bool> isClosed{false};

ConnectionState(const std::string dbName, const std::string docPath,
int SQLFlags);
~ConnectionState();
Expand Down
28 changes: 24 additions & 4 deletions cpp/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,30 @@ void transactionFinalizerHandler(const TransactionCallbackPayload *payload) {
* This function triggers an async invocation to call watch callbacks,
* avoiding holding SQLite up.
*/
invoker->invokeAsync([payload] {

// Make a copy of the payload data, this avoids a potential race condition
// where the async invocation might occur after closing a connection
auto dbName = std::make_shared<std::string>(*payload->dbName);
int event = payload->event;
invoker->invokeAsync([dbName, event] {
try {

ConnectionPool* connection = getConnection(*dbName);
if (connection == nullptr || connection->isClosed) {
return;
}

auto global = runtime->global();
jsi::Function handlerFunction = global.getPropertyAsFunction(
*runtime, "triggerTransactionFinalizerHook");

auto jsiDbName = jsi::String::createFromAscii(*runtime, *payload->dbName);
auto jsiEventType = jsi::Value((int)payload->event);
auto jsiDbName = jsi::String::createFromAscii(*runtime, *dbName);
auto jsiEventType = jsi::Value(event);
handlerFunction.call(*runtime, move(jsiDbName), move(jsiEventType));
} catch (jsi::JSINativeException e) {
std::cout << e.what() << std::endl;
} catch (const std::exception& e) {
std::cout << "Standard Exception: " << e.what() << std::endl;
} catch (...) {
std::cout << "Unknown error" << std::endl;
}
Expand Down Expand Up @@ -384,7 +397,14 @@ void osp::install(jsi::Runtime &rt,
}
};

sqliteQueueInContext(dbName, contextLockId, task);
auto response = sqliteQueueInContext(dbName, contextLockId, task);
if (response.type == SQLiteError) {
auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error");
auto error = errorCtr.callAsConstructor(
rt, jsi::String::createFromUtf8(
rt, response.errorMessage));
reject->asObject(rt).asFunction(rt).call(rt, error);
}
return {};
}));

Expand Down
34 changes: 23 additions & 11 deletions cpp/sqliteBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ SQLiteOPResult generateNotOpenResult(std::string const &dbName) {
};
}

ConnectionPool *getConnection(std::string const dbName) {
if (dbMap.count(dbName) == 0) {
// Connection is already closed
return nullptr;
}

return dbMap[dbName];
}


/**
* Opens SQL database with default settings
*/
Expand All @@ -50,10 +60,18 @@ sqliteOpenDb(string const dbName, string const docPath,
};
}

dbMap[dbName] = new ConnectionPool(dbName, docPath, numReadConnections);
dbMap[dbName]->setOnContextAvailable(contextAvailableCallback);
dbMap[dbName]->setTableUpdateHandler(updateTableCallback);
dbMap[dbName]->setTransactionFinalizerHandler(onTransactionFinalizedCallback);
try {
// Open the database
dbMap[dbName] = new ConnectionPool(dbName, docPath, numReadConnections);
dbMap[dbName]->setOnContextAvailable(contextAvailableCallback);
dbMap[dbName]->setTableUpdateHandler(updateTableCallback);
dbMap[dbName]->setTransactionFinalizerHandler(onTransactionFinalizedCallback);
} catch (const std::exception &e) {
return SQLiteOPResult{
.type = SQLiteError,
.errorMessage = e.what(),
};
}

return SQLiteOPResult{
.type = SQLiteOk,
Expand Down Expand Up @@ -126,13 +144,6 @@ SQLiteOPResult sqliteRequestLock(std::string const dbName,

ConnectionPool *connection = dbMap[dbName];

if (connection == nullptr) {
return SQLiteOPResult{
.type = SQLiteOk,

};
}

switch (lockType) {
case ConcurrentLockType::ReadLock:
connection->readLock(contextId);
Expand All @@ -147,6 +158,7 @@ SQLiteOPResult sqliteRequestLock(std::string const dbName,

return SQLiteOPResult{
.type = SQLiteOk,

};
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/sqliteBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ SQLiteOPResult sqliteCloseDb(string const dbName);

void sqliteCloseAll();

ConnectionPool *getConnection(std::string const dbName);

SQLiteOPResult sqliteRemoveDb(string const dbName, string const docPath);

/**
Expand Down
2 changes: 2 additions & 0 deletions src/DBListenerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export interface DBListener extends BaseListener {
* is started, committed or rolled back.
*/
writeTransaction: (event: WriteTransactionEvent) => void;

closed: () => void;
}

export class DBListenerManager extends BaseObserver<DBListener> {}
Expand Down
Loading
Loading