Skip to content

Commit

Permalink
Merge pull request #53 from powersync-ja/fix-wal-race-condition
Browse files Browse the repository at this point in the history
Fix journal_mode = WAL race condition
  • Loading branch information
rkistner authored Oct 29, 2024
2 parents 9c933ed + bda8e7b commit 5f702c5
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 34 deletions.
20 changes: 0 additions & 20 deletions cpp/ConnectionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,6 @@ ConnectionPool::ConnectionPool(std::string dbName, std::string docPath,
readConnections[i] = new ConnectionState(
dbName, docPath, SQLITE_OPEN_READONLY | SQLITE_OPEN_FULLMUTEX);
}

if (true == isConcurrencyEnabled) {
// Write connection WAL setup
writeConnection.queueWork([](sqlite3 *db) {
sqliteExecuteLiteralWithDB(db, "PRAGMA journal_mode = WAL;");
sqliteExecuteLiteralWithDB(
db,
"PRAGMA journal_size_limit = 6291456"); // 6Mb 1.5x default checkpoint
// size
// Default to normal on all connections
sqliteExecuteLiteralWithDB(db, "PRAGMA synchronous = NORMAL;");
});

// Read connections WAL setup
for (int i = 0; i < this->maxReads; i++) {
readConnections[i]->queueWork([](sqlite3 *db) {
sqliteExecuteLiteralWithDB(db, "PRAGMA synchronous = NORMAL;");
});
}
}
};

ConnectionPool::~ConnectionPool() {
Expand Down
40 changes: 33 additions & 7 deletions cpp/ConnectionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ ConnectionState::~ConnectionState() {
}

void ConnectionState::clearLock() {
if (!workQueue.empty()) {
waitFinished();
}
waitFinished();
_currentLockId = EMPTY_LOCK_ID;
}

Expand All @@ -47,9 +45,7 @@ bool ConnectionState::matchesLock(const ConnectionLockId &lockId) {
bool ConnectionState::isEmptyLock() { return _currentLockId == EMPTY_LOCK_ID; }

void ConnectionState::close() {
if (!workQueue.empty()) {
waitFinished();
}
waitFinished();
// So that the thread can stop (if not already)
threadDone = true;
sqlite3_close_v2(connection);
Expand Down Expand Up @@ -94,12 +90,18 @@ void ConnectionState::doWork() {
--threadBusy;
// Need to notify in order for waitFinished to be updated when
// the queue is empty and not busy
workQueueConditionVariable.notify_all();
{
std::unique_lock<std::mutex> g(workQueueMutex);
workQueueConditionVariable.notify_all();
}
}
}

void ConnectionState::waitFinished() {
std::unique_lock<std::mutex> g(workQueueMutex);
if (workQueue.empty()) {
return;
}
workQueueConditionVariable.wait(
g, [&] { return workQueue.empty() && (threadBusy == 0); });
}
Expand All @@ -116,5 +118,29 @@ SQLiteOPResult genericSqliteOpenDb(string const dbName, string const docPath,
.errorMessage = sqlite3_errmsg(*db)};
}

// Set journal mode directly when opening.
// This may have some overhead on the main thread,
// but prevents race conditions with multiple connections.
if (sqlOpenFlags & SQLITE_OPEN_READONLY) {
exit = sqlite3_exec(*db, "PRAGMA busy_timeout = 30000;"
// Default to normal on all connections
"PRAGMA synchronous = NORMAL;",
nullptr, nullptr, nullptr
);
} else {
exit = sqlite3_exec(*db, "PRAGMA busy_timeout = 30000;"
"PRAGMA journal_mode = WAL;"
// 6Mb 1.5x default checkpoint size
"PRAGMA journal_size_limit = 6291456;"
// Default to normal on all connections
"PRAGMA synchronous = NORMAL;",
nullptr, nullptr, nullptr
);
}
if (exit != SQLITE_OK) {
return SQLiteOPResult{.type = SQLiteError,
.errorMessage = sqlite3_errmsg(*db)};
}

return SQLiteOPResult{.type = SQLiteOk, .rowsAffected = 0};
}
2 changes: 1 addition & 1 deletion tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"nativewind": "^2.0.11",
"react": "18.2.0",
"react-native": "0.73.4",
"react-native-quick-sqlite": "./..",
"react-native-quick-sqlite": "link:..",
"react-native-safe-area-context": "4.8.2",
"reflect-metadata": "^0.1.13",
"stream-browserify": "^3.0.0",
Expand Down
16 changes: 12 additions & 4 deletions tests/scripts/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ program
'The virtual android device name (the adb device name will be fetched from this)',
DEFAULT_AVD_NAME
)
.option('--device', 'Use a physical device instead of emulator')
.option('--port', 'Port to run Express HTTP server for getting results on.', DEFAULT_PORT)
.action(async (str, options) => {
const opts = options.opts();
const avdName = opts.avdName;
const deviceName = await getADBDeviceName(avdName);
const useDevice = opts.device;
const deviceName = await getADBDeviceName(avdName, useDevice);
if (!deviceName) {
throw new Error(`Could not find adb device with AVD name ${avdName}`);
}
Expand All @@ -38,7 +40,11 @@ program
await spawnP('Reverse Port', `adb`, [`-s`, deviceName, `reverse`, `tcp:${port}`, `tcp:${port}`]);

/** Build and run the Expo app, don't await this, we will await a response. */
spawnP('Build Expo App', `yarn`, [`android`, `-d`, avdName]);
if (!useDevice) {
spawnP('Build Expo App', `yarn`, [`android`, `-d`, avdName]);
} else {
spawnP('Build Expo App', `yarn`, [`android`]);
}

const app = express();
app.use(bodyParser.json());
Expand Down Expand Up @@ -107,7 +113,7 @@ async function spawnP(tag, cmd, args = []) {
});
}

async function getADBDeviceName(avdName) {
async function getADBDeviceName(avdName, useDevice) {
const tag = 'Get ADB Device';
const devicesOutput = await spawnP(tag, 'adb', ['devices']);
const deviceNames = _.chain(devicesOutput.split('\n'))
Expand All @@ -116,7 +122,9 @@ async function getADBDeviceName(avdName) {
.map((line) => line.trim()) // Omit empty results
.filter((line) => !!line)
.value();

if (useDevice) {
return deviceNames[0];
}
// Need to check all devices for their AVD name
for (let deviceName of deviceNames) {
try {
Expand Down
20 changes: 20 additions & 0 deletions tests/tests/sqlite/rawQueries.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,5 +629,25 @@ export function registerBaseTests() {

expect(duration).lessThan(2000);
});

it('Should use WAL', async () => {
for (let i = 0; i < 5; i++) {
let db: QuickSQLiteConnection;
try {
db = open('test-wal' + i, {
numReadConnections: NUM_READ_CONNECTIONS
});

const journalMode = await db.execute('PRAGMA journal_mode');
const journalModeRO = await db.readLock((tx) => tx.execute('PRAGMA journal_mode'));
expect(journalMode.rows.item(0).journal_mode).equals('wal');

expect(journalModeRO.rows.item(0).journal_mode).equals('wal');
} finally {
db?.close();
db?.delete();
}
}
});
});
}
5 changes: 3 additions & 2 deletions tests/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6259,8 +6259,9 @@ react-native-quick-base64@^2.0.5:
dependencies:
base64-js "^1.5.1"

react-native-quick-sqlite@./..:
version "1.1.5"
"react-native-quick-sqlite@link:..":
version "0.0.0"
uid ""

[email protected]:
version "4.8.2"
Expand Down

0 comments on commit 5f702c5

Please sign in to comment.