Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 84 additions & 6 deletions lib/base/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const Packets = require('../packets/index.js');
const Commands = require('../commands/index.js');
const ConnectionConfig = require('../connection_config.js');
const CharsetToEncoding = require('../constants/charset_encodings.js');
const { ER_UNKNOWN_STMT_HANDLER } = require('../constants/errors.js');
const {
traceCallback,
tracePromise,
Expand All @@ -38,6 +39,10 @@ const {
connectChannel,
} = require('../tracing.js');

const returnNull = function () {
return null;
};

let _connectionId = 0;

let convertNamedPlaceholders = null;
Expand Down Expand Up @@ -801,11 +806,8 @@ class BaseConnection extends EventEmitter {
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
// skip execute command if prepare failed
executeCommand.start = function () {
return null;
};
executeCommand.next = returnNull;
errorCb(err);
executeCommand.emit('end');
return;
}
executeCommand.statement = stmt;
Expand All @@ -814,13 +816,49 @@ class BaseConnection extends EventEmitter {
this.addCommand(executeCommand);
};

// We need to intercept and retry prepareAndExecute if we had a stale prepared statement in the cache that the server already released
const key = BaseConnection.statementKey(options);
const cacheHasKey = this._statements.has(key);

if (executeCommand.onResult) {
// Callback mode: traceCallback wraps the callback with tracing lifecycle, or calls through directly when no subscribers are registered
const origExecCb = executeCommand.onResult;
traceCallback(
executeChannel,
(wrappedCb) => {
executeCommand.onResult = wrappedCb;
if (cacheHasKey) {
executeCommand.onResult = (err, ...rest) => {
if (err && err.errno === ER_UNKNOWN_STMT_HANDLER) {
const origEmit = executeCommand.emit.bind(executeCommand);
executeCommand.emit = (eventName, ...rest) => {
if (eventName === 'end') {
// Intercept the 'end' event that will be emitted after this 'error' event is emitted
executeCommand.emit = origEmit;
return false;
}
// In this case there currently will not be any other events emitted before 'end', but leaving
// this here in case that changes in the future...
return origEmit(eventName, ...rest);
};

// Listeners may have been added to this execute command, so we reuse it
executeCommand.next = null;

// We know that the statement does not exist on the server, so there is no need to close it
executeCommand.statement.close = returnNull;
this._statements.delete(key);

executeCommand.onResult = wrappedCb;
prepareAndExecute(wrappedCb);
return;
}

wrappedCb(err, ...rest);
};
} else {
executeCommand.onResult = wrappedCb;
}

prepareAndExecute(wrappedCb);
},
0,
Expand All @@ -837,7 +875,47 @@ class BaseConnection extends EventEmitter {
null,
origExecCb
);
} else if (shouldTrace(executeChannel)) {

return executeCommand;
}

if (cacheHasKey) {
const origEmit = executeCommand.emit.bind(executeCommand);
executeCommand.emit = (eventName, firstArg, ...rest) => {
if (
eventName === 'error' &&
firstArg &&
firstArg.errno === ER_UNKNOWN_STMT_HANDLER
) {
executeCommand.emit = (eventName, ...rest) => {
if (eventName === 'end') {
// Intercept the 'end' event that will be emitted after this 'error' event is emitted
executeCommand.emit = origEmit;
return false;
}
// In this case there currently will not be any other events emitted before 'end', but leaving
// this here in case that changes in the future...
return origEmit(eventName, ...rest);
};

// Listeners may have been added to this execute command, so we reuse it
executeCommand.next = null;

// We know that the statement does not exist on the server, so there is no need to close it
executeCommand.statement.close = returnNull;
this._statements.delete(key);

prepareAndExecute((err) => {
executeCommand.emit('error', err);
});
return false;
}

return origEmit(eventName, firstArg, ...rest);
};
}

if (shouldTrace(executeChannel)) {
// Event-emitter mode: tracePromise wraps the async lifecycle
tracePromise(
executeChannel,
Expand Down
Loading
Loading