Skip to content

Commit

Permalink
trigger update table callbacks only if changes are commited inside tr…
Browse files Browse the repository at this point in the history
…ansactions
  • Loading branch information
stevensJourney committed Jan 22, 2024
1 parent 7262d47 commit 8e6d64a
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changeset/chilled-queens-explain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/react-native-quick-sqlite': minor
---

Fixed table change updates to only trigger change updates for changes made in `writeTransaction` and `writeLock`s which have been commited. Added ability to listen to all table change events as they occur. Added listeners for when a transaction has started, been commited or rolled back.
96 changes: 96 additions & 0 deletions src/DBListenerManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import _ from 'lodash';
import { registerUpdateHook } from './table-updates';
import { UpdateCallback, UpdateNotification } from './types';
import { BaseListener, BaseObserver } from './utils/BaseObserver';

export interface DBListenerManagerOptions {
dbName: string;
}

export enum WriteTransactionEventType {
STARTED = 'started',
COMMIT = 'commit',
ROLLBACK = 'rollback'
}

export interface WriteTransactionEvent {
type: WriteTransactionEventType;
}

export interface DBListener extends BaseListener {
/**
* Register a listener to be fired for any table change.
* Changes inside write transactions are reported immediately.
*/
rawTableChange: UpdateCallback;

/**
* Register a listener for when table changes are persisted
* into the DB. Changes during write transactions which are
* rolled back are not reported.
* Any changes during write transactions are buffered and reported
* after commit.
* Table changes are reported individually for now in order to maintain
* API compatibility. These can be batched in future.
*/
tableUpdated: UpdateCallback;

/**
* Listener event triggered whenever a write transaction
* is started, committed or rolled back.
*/
writeTransaction: (event: WriteTransactionEvent) => void;
}

export class DBListenerManager extends BaseObserver<DBListener> {}

export class DBListenerManagerInternal extends DBListenerManager {
private _writeTransactionActive: boolean;
private updateBuffer: UpdateNotification[];

get writeTransactionActive() {
return this._writeTransactionActive;
}

constructor(protected options: DBListenerManagerOptions) {
super();
this._writeTransactionActive = false;
this.updateBuffer = [];
registerUpdateHook(this.options.dbName, (update) => this.handleTableUpdates(update));
}

transactionStarted() {
this._writeTransactionActive = true;
this.iterateListeners((l) => l?.writeTransaction?.({ type: WriteTransactionEventType.STARTED }));
}

transactionCommitted() {
this._writeTransactionActive = false;
// flush updates
const uniqueUpdates = _.uniq(this.updateBuffer);
this.updateBuffer = [];
this.iterateListeners((l) => {
l.writeTransaction?.({ type: WriteTransactionEventType.COMMIT });
uniqueUpdates.forEach((update) => l.tableUpdated?.(update));
});
}

transactionReverted() {
this._writeTransactionActive = false;
// clear updates
this.updateBuffer = [];
this.iterateListeners((l) => l?.writeTransaction?.({ type: WriteTransactionEventType.ROLLBACK }));
}

handleTableUpdates(notification: UpdateNotification) {
// Fire updates for any change
this.iterateListeners((l) => l.rawTableChange?.({ ...notification, pendingCommit: this._writeTransactionActive }));

if (this.writeTransactionActive) {
this.updateBuffer.push(notification);
return;
}

this.iterateListeners((l) => l.tableUpdated?.(notification));
}
}
17 changes: 17 additions & 0 deletions src/lock-hooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* Hooks which can be triggered during the execution of read/write locks
*/
export interface LockHooks {
/**
* Executed after a SQL statement has been executed
*/
execute?: (sql: string, args?: any[]) => Promise<void>;
lockAcquired?: () => Promise<void>;
lockReleased?: () => Promise<void>;
}

export interface TransactionHooks extends LockHooks {
begin?: () => Promise<void>;
commit?: () => Promise<void>;
rollback?: () => Promise<void>;
}
122 changes: 96 additions & 26 deletions src/setup-open.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,26 @@ import {
TransactionContext,
UpdateCallback,
SQLBatchTuple,
OpenOptions
OpenOptions,
QueryResult
} from './types';

import uuid from 'uuid';
import _ from 'lodash';
import { enhanceQueryResult } from './utils';
import { registerUpdateHook } from './table-updates';
import { DBListenerManagerInternal } from './DBListenerManager';
import { LockHooks, TransactionHooks } from './lock-hooks';

type LockCallbackRecord = {
callback: (context: LockContext) => Promise<any>;
timeout?: NodeJS.Timeout;
};

enum TransactionFinalizer {
COMMIT = 'commit',
ROLLBACK = 'rollback'
}

const DEFAULT_READ_CONNECTIONS = 4;

const LockCallbacks: Record<ContextLockID, LockCallbackRecord> = {};
Expand Down Expand Up @@ -90,14 +97,17 @@ export function setupOpen(QuickSQLite: ISQLite) {
numReadConnections: options?.numReadConnections ?? DEFAULT_READ_CONNECTIONS
});

const listenerManager = new DBListenerManagerInternal({ dbName });

/**
* Wraps lock requests and their callbacks in order to resolve the lock
* request with the callback result once triggered from the connection pool.
*/
const requestLock = <T>(
type: ConcurrentLockType,
callback: (context: LockContext) => Promise<T>,
options?: LockOptions
options?: LockOptions,
hooks?: LockHooks
): Promise<T> => {
const id = uuid.v4(); // TODO maybe do this in C++
// Wrap the callback in a promise that will resolve to the callback result
Expand All @@ -106,12 +116,22 @@ export function setupOpen(QuickSQLite: ISQLite) {
const record = (LockCallbacks[id] = {
callback: async (context: LockContext) => {
try {
const res = await callback(context);
await hooks?.lockAcquired?.();
const res = await callback({
...context,
execute: async (sql, args) => {
const result = await context.execute(sql, args);
await hooks?.execute?.(sql, args);
return result;
}
});

// Ensure that we only resolve after locks are freed
_.defer(() => resolve(res));
} catch (ex) {
_.defer(() => reject(ex));
} finally {
_.defer(() => hooks?.lockReleased?.());
}
}
} as LockCallbackRecord);
Expand All @@ -137,15 +157,20 @@ export function setupOpen(QuickSQLite: ISQLite) {
const readLock = <T>(callback: (context: LockContext) => Promise<T>, options?: LockOptions): Promise<T> =>
requestLock(ConcurrentLockType.READ, callback, options);

const writeLock = <T>(callback: (context: LockContext) => Promise<T>, options?: LockOptions): Promise<T> =>
requestLock(ConcurrentLockType.WRITE, callback, options);
const writeLock = <T>(
callback: (context: LockContext) => Promise<T>,
options?: LockOptions,
hooks?: LockHooks
): Promise<T> => requestLock(ConcurrentLockType.WRITE, callback, options, hooks);

const wrapTransaction = async <T>(
context: LockContext,
callback: (context: TransactionContext) => Promise<T>,
defaultFinally: 'commit' | 'rollback' = 'commit'
defaultFinalizer: TransactionFinalizer = TransactionFinalizer.COMMIT,
hooks?: TransactionHooks
) => {
await context.execute('BEGIN TRANSACTION');
await hooks?.begin();
let finalized = false;

const finalizedStatement =
Expand All @@ -158,19 +183,29 @@ export function setupOpen(QuickSQLite: ISQLite) {
return action();
};

const commit = finalizedStatement(() => context.execute('COMMIT'));
const commitAsync = finalizedStatement(() => context.execute('COMMIT'));
const commit = finalizedStatement(async () => {
const result = await context.execute('COMMIT');
await hooks?.commit?.();
return result;
});

const rollback = finalizedStatement(() => context.execute('ROLLBACK'));
const rollbackAsync = finalizedStatement(() => context.execute('ROLLBACK'));
const rollback = finalizedStatement(async () => {
const result = await context.execute('ROLLBACK');
await hooks?.rollback?.();
return result;
});

const wrapExecute =
<T>(method: (sql: string, params?: any[]) => T): ((sql: string, params?: any[]) => T) =>
(sql: string, params?: any[]) => {
<T>(
method: (sql: string, params?: any[]) => Promise<QueryResult>
): ((sql: string, params?: any[]) => Promise<QueryResult>) =>
async (sql: string, params?: any[]) => {
if (finalized) {
throw new Error(`Cannot execute in transaction after it has been finalized with commit/rollback.`);
}
return method(sql, params);
const result = await method(sql, params);
await hooks?.execute?.(sql, params);
return result;
};

try {
Expand All @@ -180,17 +215,17 @@ export function setupOpen(QuickSQLite: ISQLite) {
rollback,
execute: wrapExecute(context.execute)
});
switch (defaultFinally) {
case 'commit':
await commitAsync();
switch (defaultFinalizer) {
case TransactionFinalizer.COMMIT:
await commit();
break;
case 'rollback':
await rollbackAsync();
case TransactionFinalizer.ROLLBACK:
await rollback();
break;
}
return res;
} catch (ex) {
await rollbackAsync();
await rollback();
throw ex;
}
};
Expand All @@ -202,20 +237,55 @@ export function setupOpen(QuickSQLite: ISQLite) {
readLock,
readTransaction: async <T>(callback: (context: TransactionContext) => Promise<T>, options?: LockOptions) =>
readLock((context) => wrapTransaction(context, callback)),
writeLock,
writeLock: async <T>(callback: (context: TransactionContext) => Promise<T>, options?: LockOptions) =>
writeLock(callback, options, {
execute: async (sql) => {
if (!listenerManager.writeTransactionActive) {
// check if starting a transaction
if (sql == 'BEGIN' || sql == 'BEGIN IMMEDIATE') {
listenerManager.transactionStarted();
return;
}
}
// check if finishing a transaction
switch (sql) {
case 'ROLLBACK':
listenerManager.transactionReverted();
break;
case 'COMMIT':
case 'END TRANSACTION':
listenerManager.transactionCommitted();
break;
}
},
lockReleased: async () => {
if (listenerManager.writeTransactionActive) {
// The lock was completed without ending the transaction.
// This should not occur, but do not report these updates
listenerManager.transactionReverted();
}
}
}),
writeTransaction: async <T>(callback: (context: TransactionContext) => Promise<T>, options?: LockOptions) =>
writeLock((context) => wrapTransaction(context, callback), options),
registerUpdateHook: (callback: UpdateCallback) => {
registerUpdateHook(dbName, callback);
},
writeLock(
(context) =>
wrapTransaction(context, callback, TransactionFinalizer.COMMIT, {
begin: async () => listenerManager.transactionStarted(),
commit: async () => listenerManager.transactionCommitted(),
rollback: async () => listenerManager.transactionReverted()
}),
options
),
registerUpdateHook: (callback: UpdateCallback) => listenerManager.registerListener({ tableUpdated: callback }),
delete: () => QuickSQLite.delete(dbName, options?.location),
executeBatch: (commands: SQLBatchTuple[]) =>
writeLock((context) => QuickSQLite.executeBatch(dbName, commands, (context as any)._contextId)),
attach: (dbNameToAttach: string, alias: string, location?: string) =>
QuickSQLite.attach(dbName, dbNameToAttach, alias, location),
detach: (alias: string) => QuickSQLite.detach(dbName, alias),
loadFile: (location: string) =>
writeLock((context) => QuickSQLite.loadFile(dbName, location, (context as any)._contextId))
writeLock((context) => QuickSQLite.loadFile(dbName, location, (context as any)._contextId)),
listenerManager
};
}
};
Expand Down
12 changes: 10 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { DBListenerManager } from './DBListenerManager';

/**
* Object returned by SQL Query executions {
* insertId: Represent the auto-generated row id if applicable
Expand Down Expand Up @@ -76,6 +78,11 @@ export interface UpdateNotification {
opType: RowUpdateType;
table: string;
rowId: number;
/**
* If this change ocurred during a write transaction which has not been
* committed yet.
*/
pendingCommit?: boolean;
}

export type UpdateCallback = (update: UpdateNotification) => void;
Expand Down Expand Up @@ -149,8 +156,9 @@ export type QuickSQLiteConnection = {
executeBatch: (commands: SQLBatchTuple[]) => Promise<BatchQueryResult>;
loadFile: (location: string) => Promise<FileLoadResult>;
/**
* Note that only one listener can be registered per database connection.
* Any new hook registration will override the previous one.
* @deprecated
* Use listenerManager instead
*/
registerUpdateHook(callback: UpdateCallback): void;
listenerManager: DBListenerManager;
};
Loading

0 comments on commit 8e6d64a

Please sign in to comment.