Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 45 additions & 9 deletions src/hooks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,39 @@ export const runAndTriggerHooks = async (hook: EventEmitter, cb: () => unknown)
try {
const result = await Promise.resolve(cb());

setImmediate(() => {
hook.emit('commit');

hook.emit('end', undefined);
hook.removeAllListeners();
// Store the promises returned by commit handlers
const commitPromises: Promise<unknown>[] = [];

// promiseCollector is an internal callback emitted with each event.
// This acts as a channel between the receiver (.once() listener) to send
// promises back to the emitter.
const promiseCollector = (promise: Promise<unknown>) => {
commitPromises.push(promise);
};

// Create a promise that will be resolved when all commit handlers are executed
const commitPromise = new Promise<void>((resolve, reject) => {
setImmediate(() => {
// Emit the 'commit' event and collect promises
hook.emit('commit', promiseCollector);

// Once all handlers have been called, resolve the promise
Promise.all(commitPromises)
.then(() => resolve())
.catch((err) => {
reject(err);
})
.finally(() => {
// Always clean up after commit handlers, regardless of success/failure
hook.emit('end', undefined);
hook.removeAllListeners();
});
});
});

// Wait for all commit handlers to complete
await commitPromise;

return result;
} catch (err) {
setImmediate(() => {
Expand All @@ -43,7 +69,7 @@ export const runAndTriggerHooks = async (hook: EventEmitter, cb: () => unknown)
}
};

export const createEventEmitterInNewContext = (context: StorageDriver) => {
export const createEventEmitterInNewContext = () => {
const options = getTransactionalOptions();

const emitter = new EventEmitter();
Expand All @@ -52,7 +78,7 @@ export const createEventEmitterInNewContext = (context: StorageDriver) => {
};

export const runInNewHookContext = async (context: StorageDriver, cb: () => unknown) => {
const hook = createEventEmitterInNewContext(context);
const hook = createEventEmitterInNewContext();

return await context.run(() => {
setHookInContext(context, hook);
Expand All @@ -61,8 +87,18 @@ export const runInNewHookContext = async (context: StorageDriver, cb: () => unkn
});
};

export const runOnTransactionCommit = (cb: () => void) => {
getTransactionalContextHook().once('commit', cb);
export const runOnTransactionCommit = (cb: () => void | Promise<unknown>) => {
getTransactionalContextHook().once(
'commit',
(promiseCollector: (promise: Promise<unknown>) => void) => {
const result = cb();
// If the original callback returns a promise, we need to collect it
if (result && typeof result.then === 'function') {
promiseCollector(result);
}
return result;
},
);
};

export const runOnTransactionRollback = (cb: (e: Error) => void) => {
Expand Down
2 changes: 1 addition & 1 deletion src/transactions/wrap-in-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export interface WrapInTransactionOptions {
name?: string | symbol;
}

export const wrapInTransaction = <Fn extends (this: any, ...args: any[]) => ReturnType<Fn>>(
export const wrapInTransaction = <Fn extends (this: unknown, ...args: unknown[]) => ReturnType<Fn>>(
fn: Fn,
options?: WrapInTransactionOptions,
) => {
Expand Down
77 changes: 77 additions & 0 deletions tests/simple.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,83 @@ describe('Transactional', () => {
expect(rollbackSpy).not.toHaveBeenCalled();
expect(completeSpy).toHaveBeenCalledTimes(1);
});

it('should run async "runOnTransactionCommit" hook and wait for it to complete', async () => {
const userRepository = new UserRepository(dataSource);
let asyncOperationComplete = false;

await runInTransaction(async () => {
await userRepository.createUser('John Doe');

runOnTransactionCommit(async () => {
await sleep(100);
asyncOperationComplete = true;
});
});

// The transaction should have waited for the async hook to complete
expect(asyncOperationComplete).toBe(true);
});

it('should run multiple async "runOnTransactionCommit" hooks in parallel', async () => {
const userRepository = new UserRepository(dataSource);
const results: number[] = [];
const start = Date.now();

await runInTransaction(async () => {
await userRepository.createUser('John Doe');

// Add three async hooks that take different times to complete
runOnTransactionCommit(async () => {
await sleep(100);
results.push(1);
});

runOnTransactionCommit(async () => {
await sleep(200);
results.push(2);
});

runOnTransactionCommit(async () => {
await sleep(50);
results.push(3);
});
});

// All hooks should have completed
expect(results).toContain(1);
expect(results).toContain(2);
expect(results).toContain(3);

// Total time should be approximately the longest hook (200ms) plus some overhead
// Rather than sequential (350ms)
const elapsed = Date.now() - start;
expect(elapsed).toBeLessThan(300);
});

it('should bubble up rejection from async "runOnTransactionCommit" hook but transaction should commit', async () => {
const userRepository = new UserRepository(dataSource);
const expectedError = new Error('Async hook error');

// Should reject with our hook error but the transaction should still commit
// since the side effects are run post commit
await expect(
async () => {
await runInTransaction(async () => {
await userRepository.createUser('John Doe');

runOnTransactionCommit(async () => {
throw expectedError;
});
})

}).rejects.toThrow();

const user = await userRepository.findUserByName('John Doe');
expect(user).not.toBeNull();
expect(user?.name).toBe('John Doe');
});

});

describe('Isolation', () => {
Expand Down