Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions components/platform/src/DB.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const SQLite3 = require('better-sqlite3');

const { getLogger, getConfig } = require('@pryv/boiler');
const logger = getLogger('platform:db');
const concurrentSafeWrite = require('storage/src/sqliteUtils/concurrentSafeWrite');

class DB {
db;
Expand All @@ -21,10 +22,11 @@ class DB {
mkdirp.sync(basePath);

this.db = new SQLite3(basePath + '/platform-wide.db');
this.db.pragma('journal_mode = WAL');

this.db.prepare('CREATE TABLE IF NOT EXISTS keyValue (key TEXT PRIMARY KEY, value TEXT NOT NULL);').run();
await concurrentSafeWrite.initWALAndConcurrentSafeWriteCapabilities(this.db);

await concurrentSafeWrite.execute(() => {
this.db.prepare('CREATE TABLE IF NOT EXISTS keyValue (key TEXT PRIMARY KEY, value TEXT NOT NULL);').run();
});
this.queries = {};
this.queries.getValueWithKey = this.db.prepare('SELECT key, value FROM keyValue WHERE key = ?');
this.queries.upsertUniqueKeyValue = this.db.prepare('INSERT OR REPLACE INTO keyValue (key, value) VALUES (@key, @value);');
Expand Down Expand Up @@ -52,46 +54,59 @@ class DB {
}

/**
*
* @param {string} key
* @param {string} value
* @returns
*/
set (key, value) {
async set (key, value) {
logger.debug('set', key, value);
return this.queries.upsertUniqueKeyValue.run({ key, value });
let result;
await concurrentSafeWrite.execute(() => {
result = this.queries.upsertUniqueKeyValue.run({ key, value });
});
return result;
}

delete (key) {
/**
* @param {string} key
* @returns
*/
async delete (key) {
logger.debug('delete', key);
return this.queries.deleteWithKey.run(key);
let result;
await concurrentSafeWrite.execute(() => {
result = this.queries.deleteWithKey.run(key);
});
return result;
}

deleteAll () {
async deleteAll () {
logger.debug('deleteAll');
this.queries.deleteAll.run();
await concurrentSafeWrite.execute(() => {
this.queries.deleteAll.run();
});
}

// ----- utilities ------- //

async setUserUniqueField (username, field, value) {
const key = getUserUniqueKey(field, value);
this.set(key, username);
await this.set(key, username);
}

async deleteUserUniqueField (field, value) {
const key = getUserUniqueKey(field, value);
this.delete(key);
await this.delete(key);
}

async setUserIndexedField (username, field, value) {
const key = getUserIndexedKey(username, field);
this.set(key, value);
await this.set(key, value);
}

async deleteUserIndexedField (username, field) {
const key = getUserIndexedKey(username, field);
this.delete(key);
await this.delete(key);
}

async getUserIndexedField (username, field) {
Expand Down
4 changes: 2 additions & 2 deletions components/platform/src/Platform.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ class Platform {

// for tests only - called by repository
async deleteAll () {
this.#db.deleteAll();
await this.#db.deleteAll();
}

/**
* Get if value exists for this unique key (only test on local db)
* Exposes directly a platform db method as it's needed by service_register in dnsLess mode
*/
async getLocalUsersUniqueField (field, value) {
return this.#db.getUsersUniqueField(field, value);
return await this.#db.getUsersUniqueField(field, value);
}

/**
Expand Down
51 changes: 51 additions & 0 deletions components/storage/src/sqliteUtils/concurrentSafeWrite.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* @license
* Copyright (C) 2012–2023 Pryv S.A. https://pryv.com - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited
* Proprietary and confidential
*/

const WAIT_LIST_MS = [1, 2, 5, 10, 15, 20, 25, 25, 25, 50, 50, 100];
const logger = require('@pryv/boiler').getLogger('sqliteConcurentWrites');
const { setTimeout } = require('timers/promises');

module.exports = {
execute,
initWALAndConcurrentSafeWriteCapabilities
};

/**
* Init the given DB in WAL and unsafe mode, as we will take care of managing concurrent writes errors.
*/
async function initWALAndConcurrentSafeWriteCapabilities (db) {
await execute(() => {
db.pragma('journal_mode = WAL');
});
await execute(() => {
db.pragma('busy_timeout = 0'); // We take care of busy timeout ourselves as long as current driver does not go below the second
});
await execute(() => {
db.unsafeMode(true);
});
}

/**
* Executes the given statement function, retrying `retries` times in case of `SQLITE_BUSY`.
* This is CPU intensive, but tests have shown this solution to be efficient.
*/
async function execute (statement, retries = 100) {
for (let i = 0; i < retries; i++) {
try {
statement();
return;
} catch (err) {
if (err.code !== 'SQLITE_BUSY') {
throw err;
}
const waitTime = i > (WAIT_LIST_MS.length - 1) ? 100 : WAIT_LIST_MS[i];
await setTimeout(waitTime);
logger.debug(`SQLITE_BUSY, retrying in ${waitTime} ms`);
}
}
throw new Error(`Failed write action on SQLite after ${retries} retries`);
}
89 changes: 34 additions & 55 deletions components/storage/src/userSQLite/UserDatabase.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Proprietary and confidential
*/

const concurrentSafeWrite = require('../sqliteUtils/concurrentSafeWrite');
const SQLite3 = require('better-sqlite3');
const { Readable } = require('stream');

Expand All @@ -22,8 +23,6 @@ const tables = {

const ALL_EVENTS_TAG = events.ALL_EVENTS_TAG;

const WAIT_LIST_MS = [1, 2, 5, 10, 15, 20, 25, 25, 25, 50, 50, 100];

/**
* TODO: refactor the structure of tables and queries
* (currently not consistent, either internally or with the Mongo code)
Expand Down Expand Up @@ -54,10 +53,7 @@ class UserDatabase {
}

async init () {
this.db.pragma('journal_mode = WAL');
this.db.pragma('busy_timeout = 0'); // We take care of busy timeout ourselves as long as current driver does not go below the second
this.db.unsafeMode(true);

await concurrentSafeWrite.initWALAndConcurrentSafeWriteCapabilities(this.db);
// here we might want to skip DB initialization if version is not null

this.create = {};
Expand All @@ -66,7 +62,7 @@ class UserDatabase {
this.delete = {};

// create all tables
Object.keys(tables).forEach((tableName) => {
for (const tableName of Object.keys(tables)) {
const columnsTypes = [];
const indexes = [];
const columnNames = Object.keys(tables[tableName]);
Expand All @@ -76,20 +72,24 @@ class UserDatabase {
if (column.index) { indexes.push(columnName); }
});

this.db.prepare('CREATE TABLE IF NOT EXISTS events ( ' +
columnsTypes.join(', ') +
');').run();

indexes.forEach((columnName) => {
this.db.prepare(`CREATE INDEX IF NOT EXISTS ${tableName}_${columnName} ON ${tableName}(${columnName})`).run();
await concurrentSafeWrite.execute(() => {
this.db.prepare('CREATE TABLE IF NOT EXISTS events ( ' +
columnsTypes.join(', ') +
');').run();
});

for (const columnName of indexes) {
await concurrentSafeWrite.execute(() => {
this.db.prepare(`CREATE INDEX IF NOT EXISTS ${tableName}_${columnName} ON ${tableName}(${columnName})`).run();
});
}

this.create[tableName] = this.db.prepare(`INSERT INTO ${tableName} (` +
columnNames.join(', ') + ') VALUES (@' +
columnNames.join(', @') + ')');

this.getAll[tableName] = this.db.prepare(`SELECT * FROM ${tableName}`);
});
}

// -- create FTS for streamIds on events
createFTSFor(this.db, 'events', tables.events, ['streamIds']);
Expand All @@ -114,9 +114,9 @@ class UserDatabase {
eventForDb.eventid = eventId;
const update = this.db.prepare(queryString);

await this.concurentSafeWriteStatement(() => {
await concurrentSafeWrite.execute(() => {
const res = update.run(eventForDb);
this.logger.debug('UPDATE events changes:' + res.changes + ' eventId:' + eventId + ' event:' + JSON.stringify(eventForDb));
this.logger.debug(`UPDATE events changes: ${res.changes} eventId: ${eventId} event: ${JSON.stringify(eventForDb)}`);
if (res.changes !== 1) {
throw new Error('Event not found');
}
Expand All @@ -132,14 +132,14 @@ class UserDatabase {
*/
createEventSync (event) {
const eventForDb = eventSchemas.eventToDB(event);
this.logger.debug('(sync) CREATE event:' + JSON.stringify(eventForDb));
this.logger.debug(`(sync) CREATE event: ${JSON.stringify(eventForDb)}`);
this.create.events.run(eventForDb);
}

async createEvent (event) {
const eventForDb = eventSchemas.eventToDB(event);
await this.concurentSafeWriteStatement(() => {
this.logger.debug('(async) CREATE event:' + JSON.stringify(eventForDb));
this.logger.debug(`(async) CREATE event: ${JSON.stringify(eventForDb)}`);
await concurrentSafeWrite.execute(() => {
this.create.events.run(eventForDb);
});
}
Expand All @@ -153,47 +153,47 @@ class UserDatabase {
}

async deleteEventsHistory (eventId) {
await this.concurentSafeWriteStatement(() => {
this.logger.debug('(async) DELETE event history for eventId:' + eventId);
this.logger.debug(`(async) DELETE event history for eventId: ${eventId}`);
await concurrentSafeWrite.execute(() => {
return this.delete.eventsByHeadId.run(eventId);
});
}

async minimizeEventHistory (eventId, fieldsToRemove) {
const minimizeHistoryStatement = `UPDATE events SET ${fieldsToRemove.map(field => `${field} = ${field === 'streamIds' ? '\'' + ALL_EVENTS_TAG + '\'' : 'NULL'}`).join(', ')} WHERE headId = ?`;
await this.concurentSafeWriteStatement(() => {
this.logger.debug('(async) Minimize event history :' + minimizeHistoryStatement);
this.logger.debug(`(async) Minimize event history: ${minimizeHistoryStatement}`);
await concurrentSafeWrite.execute(() => {
this.db.prepare(minimizeHistoryStatement).run(eventId);
});
}

async deleteEvents (params) {
const queryString = prepareEventsDeleteQuery(params);
if (queryString.indexOf('MATCH') > 0) {
this.logger.debug('DELETE events one by one as queryString includes MATCH: ' + queryString);
this.logger.debug(`DELETE events one by one as queryString includes MATCH: ${queryString}`);
// SQLite does not know how to delete with "MATCH" statement
// going by the doddgy task of getting events that matches the query and deleting them one by one
const selectEventsToBeDeleted = prepareEventsGetQuery(params);

for (const event of this.db.prepare(selectEventsToBeDeleted).iterate()) {
await this.concurentSafeWriteStatement(() => {
this.logger.debug(' > DELETE event: ' + event.eventid);
this.logger.debug(` > DELETE event: ${event.eventid}`);
await concurrentSafeWrite.execute(() => {
this.delete.eventById.run(event.eventid);
});
}
return null;
}
// else
let res = null;
await this.concurentSafeWriteStatement(() => {
this.logger.debug('DELETE events: ' + queryString);
this.logger.debug(`DELETE events: ${queryString}`);
await concurrentSafeWrite.execute(() => {
res = this.db.prepare(queryString).run();
});
return res;
}

getOneEvent (eventId) {
this.logger.debug('GET ONE event: ' + eventId);
this.logger.debug(`GET ONE event: ${eventId}`);
const event = this.get.eventById.get(eventId);
if (event == null) return null;
return eventSchemas.eventFromDB(event);
Expand All @@ -202,7 +202,7 @@ class UserDatabase {
getEvents (params) {
const queryString = prepareEventsGetQuery(params);

this.logger.debug('GET Events:' + queryString);
this.logger.debug(`GET Events: ${queryString}`);
const res = this.db.prepare(queryString).all();
if (res != null) {
return res.map(eventSchemas.eventFromDB);
Expand All @@ -212,18 +212,18 @@ class UserDatabase {

getEventsStream (params) {
const queryString = prepareEventsGetQuery(params);
this.logger.debug('GET Events Stream: ' + queryString);
this.logger.debug(`GET Events Stream: ${queryString}`);
const query = this.db.prepare(queryString);
return this.readableEventsStreamForIterator(query.iterate());
}

getEventsDeletionsStream (deletedSince) {
this.logger.debug('GET Events Deletions since: ' + deletedSince);
this.logger.debug(`GET Events Deletions since: ${deletedSince}`);
return this.readableEventsStreamForIterator(this.get.eventsDeletedSince.iterate(deletedSince));
}

getEventsHistory (eventId) {
this.logger.debug('GET Events History for: ' + eventId);
this.logger.debug(`GET Events History for: ${eventId}`);
return this.get.eventHistory.all(eventId).map(eventSchemas.historyEventFromDB);
}

Expand Down Expand Up @@ -254,31 +254,10 @@ class UserDatabase {
close () {
this.db.close();
}

/**
* Will look "retries" times, in case of "SQLITE_BUSY".
* This is CPU intensive, but tests have shown this solution to be efficient
*/
async concurentSafeWriteStatement (statement, retries = 100) {
for (let i = 0; i < retries; i++) {
try {
statement();
return;
} catch (error) {
if (error.code !== 'SQLITE_BUSY') { // ignore
throw error;
}
const waitTime = i > (WAIT_LIST_MS.length - 1) ? 100 : WAIT_LIST_MS[i];
await new Promise((resolve) => setTimeout(resolve, waitTime));
this.logger.debug('SQLITE_BUSY, retrying in ' + waitTime + 'ms');
}
}
throw new Error('Failed write action on Audit after ' + retries + ' retries');
}
}

function prepareEventsDeleteQuery (params) {
if (params.streams) { throw new Error('events DELETE with stream query not supported yet: ' + JSON.stringify(params)); }
if (params.streams) { throw new Error(`Events DELETE with stream query not supported yet: ${JSON.stringify(params)}`); }
return 'DELETE FROM events ' + prepareQuery(params, true);
}

Expand Down
Loading