Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mysql2

Package Overview
Dependencies
Maintainers
3
Versions
191
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mysql2 - npm Package Compare versions

Comparing version 3.11.4 to 3.11.5-canary.d5a76e6c

lib/base/connection.js

38

index.js

@@ -5,10 +5,8 @@ 'use strict';

const Connection = require('./lib/connection.js');
const ConnectionConfig = require('./lib/connection_config.js');
const parserCache = require('./lib/parsers/parser_cache');
const parserCache = require('./lib/parsers/parser_cache.js');
exports.createConnection = function(opts) {
return new Connection({ config: new ConnectionConfig(opts) });
};
const Connection = require('./lib/connection.js');
exports.createConnection = require('./lib/create_connection.js');
exports.connect = exports.createConnection;

@@ -20,12 +18,8 @@ exports.Connection = Connection;

const PoolCluster = require('./lib/pool_cluster.js');
const createPool = require('./lib/create_pool.js');
const createPoolCluster = require('./lib/create_pool_cluster.js');
exports.createPool = function(config) {
const PoolConfig = require('./lib/pool_config.js');
return new Pool({ config: new PoolConfig(config) });
};
exports.createPool = createPool;
exports.createPoolCluster = function(config) {
const PoolCluster = require('./lib/pool_cluster.js');
return new PoolCluster(config);
};
exports.createPoolCluster = createPoolCluster;

@@ -38,3 +32,3 @@ exports.createQuery = Connection.createQuery;

exports.createServer = function(handler) {
exports.createServer = function (handler) {
const Server = require('./lib/server.js');

@@ -48,3 +42,3 @@ const s = new Server();

exports.PoolConnection = require('./lib/pool_connection');
exports.PoolConnection = require('./lib/pool_connection.js');
exports.authPlugins = require('./lib/auth_plugins');

@@ -58,3 +52,3 @@ exports.escape = SqlString.escape;

'createConnectionPromise',
() => require('./promise.js').createConnection
() => require('./promise.js').createConnection,
);

@@ -64,3 +58,3 @@

'createPoolPromise',
() => require('./promise.js').createPool
() => require('./promise.js').createPool,
);

@@ -70,3 +64,3 @@

'createPoolClusterPromise',
() => require('./promise.js').createPoolCluster
() => require('./promise.js').createPoolCluster,
);

@@ -77,15 +71,15 @@

exports.__defineGetter__('Charsets', () =>
require('./lib/constants/charsets.js')
require('./lib/constants/charsets.js'),
);
exports.__defineGetter__('CharsetToEncoding', () =>
require('./lib/constants/charset_encodings.js')
require('./lib/constants/charset_encodings.js'),
);
exports.setMaxParserCache = function(max) {
exports.setMaxParserCache = function (max) {
parserCache.setMaxCache(max);
};
exports.clearParserCache = function() {
exports.clearParserCache = function () {
parserCache.clearCache();
};

@@ -1,948 +0,12 @@

// This file was modified by Oracle on June 1, 2021.
// The changes involve new logic to handle an additional ERR Packet sent by
// the MySQL server when the connection is closed unexpectedly.
// Modifications copyright (c) 2021, Oracle and/or its affiliates.
// This file was modified by Oracle on June 17, 2021.
// The changes involve logic to ensure the socket connection is closed when
// there is a fatal error.
// Modifications copyright (c) 2021, Oracle and/or its affiliates.
// This file was modified by Oracle on September 21, 2021.
// The changes involve passing additional authentication factor passwords
// to the ChangeUser Command instance.
// Modifications copyright (c) 2021, Oracle and/or its affiliates.
'use strict';
const Net = require('net');
const Tls = require('tls');
const Timers = require('timers');
const EventEmitter = require('events').EventEmitter;
const Readable = require('stream').Readable;
const Queue = require('denque');
const SqlString = require('sqlstring');
const { createLRU } = require('lru.min');
const BaseConnection = require('./base/connection.js');
const PacketParser = require('./packet_parser.js');
const Packets = require('./packets/index.js');
const Commands = require('./commands/index.js');
const ConnectionConfig = require('./connection_config.js');
const CharsetToEncoding = require('./constants/charset_encodings.js');
let _connectionId = 0;
let convertNamedPlaceholders = null;
class Connection extends EventEmitter {
constructor(opts) {
super();
this.config = opts.config;
// TODO: fill defaults
// if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
// if host is given, connect to host:3306
// TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
// if there is no host/port and no socketPath parameters?
if (!opts.config.stream) {
if (opts.config.socketPath) {
this.stream = Net.connect(opts.config.socketPath);
} else {
this.stream = Net.connect(
opts.config.port,
opts.config.host
);
// Optionally enable keep-alive on the socket.
if (this.config.enableKeepAlive) {
this.stream.on('connect', () => {
this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
});
}
// Enable TCP_NODELAY flag. This is needed so that the network packets
// are sent immediately to the server
this.stream.setNoDelay(true);
}
// if stream is a function, treat it as "stream agent / factory"
} else if (typeof opts.config.stream === 'function') {
this.stream = opts.config.stream(opts);
} else {
this.stream = opts.config.stream;
}
this._internalId = _connectionId++;
this._commands = new Queue();
this._command = null;
this._paused = false;
this._paused_packets = new Queue();
this._statements = createLRU({
max: this.config.maxPreparedStatements,
onEviction: function(_, statement) {
statement.close();
}
});
this.serverCapabilityFlags = 0;
this.authorized = false;
this.sequenceId = 0;
this.compressedSequenceId = 0;
this.threadId = null;
this._handshakePacket = null;
this._fatalError = null;
this._protocolError = null;
this._outOfOrderPackets = [];
this.clientEncoding = CharsetToEncoding[this.config.charsetNumber];
this.stream.on('error', this._handleNetworkError.bind(this));
// see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
this.packetParser = new PacketParser(p => {
this.handlePacket(p);
});
this.stream.on('data', data => {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.packetParser.execute(data);
});
this.stream.on('end', () => {
// emit the end event so that the pooled connection can close the connection
this.emit('end');
});
this.stream.on('close', () => {
// we need to set this flag everywhere where we want connection to close
if (this._closing) {
return;
}
if (!this._protocolError) {
// no particular error message before disconnect
this._protocolError = new Error(
'Connection lost: The server closed the connection.'
);
this._protocolError.fatal = true;
this._protocolError.code = 'PROTOCOL_CONNECTION_LOST';
}
this._notifyError(this._protocolError);
});
let handshakeCommand;
if (!this.config.isServer) {
handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
handshakeCommand.on('end', () => {
// this happens when handshake finishes early either because there was
// some fatal error or the server sent an error packet instead of
// an hello packet (for example, 'Too many connections' error)
if (!handshakeCommand.handshake || this._fatalError || this._protocolError) {
return;
}
this._handshakePacket = handshakeCommand.handshake;
this.threadId = handshakeCommand.handshake.connectionId;
this.emit('connect', handshakeCommand.handshake);
});
handshakeCommand.on('error', err => {
this._closing = true;
this._notifyError(err);
});
this.addCommand(handshakeCommand);
}
// in case there was no initial handshake but we need to read sting, assume it utf-8
// most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
// will be overwritten with actual encoding value as soon as server handshake packet is received
this.serverEncoding = 'utf8';
if (this.config.connectTimeout) {
const timeoutHandler = this._handleTimeoutError.bind(this);
this.connectTimeout = Timers.setTimeout(
timeoutHandler,
this.config.connectTimeout
);
}
}
class Connection extends BaseConnection {
promise(promiseImpl) {
const PromiseConnection = require('../promise').PromiseConnection;
const PromiseConnection = require('./promise/connection.js');
return new PromiseConnection(this, promiseImpl);
}
_addCommandClosedState(cmd) {
const err = new Error(
"Can't add new command when connection is in closed state"
);
err.fatal = true;
if (cmd.onResult) {
cmd.onResult(err);
} else {
this.emit('error', err);
}
}
_handleFatalError(err) {
err.fatal = true;
// stop receiving packets
this.stream.removeAllListeners('data');
this.addCommand = this._addCommandClosedState;
this.write = () => {
this.emit('error', new Error("Can't write in closed state"));
};
this._notifyError(err);
this._fatalError = err;
}
_handleNetworkError(err) {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
// Do not throw an error when a connection ends with a RST,ACK packet
if (err.code === 'ECONNRESET' && this._closing) {
return;
}
this._handleFatalError(err);
}
_handleTimeoutError() {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.stream.destroy && this.stream.destroy();
const err = new Error('connect ETIMEDOUT');
err.errorno = 'ETIMEDOUT';
err.code = 'ETIMEDOUT';
err.syscall = 'connect';
this._handleNetworkError(err);
}
// notify all commands in the queue and bubble error as connection "error"
// called on stream error or unexpected termination
_notifyError(err) {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
// prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET
if (this._fatalError) {
return;
}
let command;
// if there is no active command, notify connection
// if there are commands and all of them have callbacks, pass error via callback
let bubbleErrorToConnection = !this._command;
if (this._command && this._command.onResult) {
this._command.onResult(err);
this._command = null;
// connection handshake is special because we allow it to be implicit
// if error happened during handshake, but there are others commands in queue
// then bubble error to other commands and not to connection
} else if (
!(
this._command &&
this._command.constructor === Commands.ClientHandshake &&
this._commands.length > 0
)
) {
bubbleErrorToConnection = true;
}
while ((command = this._commands.shift())) {
if (command.onResult) {
command.onResult(err);
} else {
bubbleErrorToConnection = true;
}
}
// notify connection if some comands in the queue did not have callbacks
// or if this is pool connection ( so it can be removed from pool )
if (bubbleErrorToConnection || this._pool) {
this.emit('error', err);
}
// close connection after emitting the event in case of a fatal error
if (err.fatal) {
this.close();
}
}
write(buffer) {
const result = this.stream.write(buffer, err => {
if (err) {
this._handleNetworkError(err);
}
});
if (!result) {
this.stream.emit('pause');
}
}
// http://dev.mysql.com/doc/internals/en/sequence-id.html
//
// The sequence-id is incremented with each packet and may wrap around.
// It starts at 0 and is reset to 0 when a new command
// begins in the Command Phase.
// http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
_resetSequenceId() {
this.sequenceId = 0;
this.compressedSequenceId = 0;
}
_bumpCompressedSequenceId(numPackets) {
this.compressedSequenceId += numPackets;
this.compressedSequenceId %= 256;
}
_bumpSequenceId(numPackets) {
this.sequenceId += numPackets;
this.sequenceId %= 256;
}
writePacket(packet) {
const MAX_PACKET_LENGTH = 16777215;
const length = packet.length();
let chunk, offset, header;
if (length < MAX_PACKET_LENGTH) {
packet.writeHeader(this.sequenceId);
if (this.config.debug) {
// eslint-disable-next-line no-console
console.log(
`${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
);
// eslint-disable-next-line no-console
console.log(
`${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`
);
}
this._bumpSequenceId(1);
this.write(packet.buffer);
} else {
if (this.config.debug) {
// eslint-disable-next-line no-console
console.log(
`${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`
);
// eslint-disable-next-line no-console
console.log(
`${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
);
}
for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) {
chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH);
if (chunk.length === MAX_PACKET_LENGTH) {
header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]);
} else {
header = Buffer.from([
chunk.length & 0xff,
(chunk.length >> 8) & 0xff,
(chunk.length >> 16) & 0xff,
this.sequenceId
]);
}
this._bumpSequenceId(1);
this.write(header);
this.write(chunk);
}
}
}
// 0.11+ environment
startTLS(onSecure) {
if (this.config.debug) {
// eslint-disable-next-line no-console
console.log('Upgrading connection to TLS');
}
const secureContext = Tls.createSecureContext({
ca: this.config.ssl.ca,
cert: this.config.ssl.cert,
ciphers: this.config.ssl.ciphers,
key: this.config.ssl.key,
passphrase: this.config.ssl.passphrase,
minVersion: this.config.ssl.minVersion,
maxVersion: this.config.ssl.maxVersion
});
const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
const verifyIdentity = this.config.ssl.verifyIdentity;
const servername = this.config.host;
let secureEstablished = false;
this.stream.removeAllListeners('data');
const secureSocket = Tls.connect({
rejectUnauthorized,
requestCert: rejectUnauthorized,
checkServerIdentity: verifyIdentity
? Tls.checkServerIdentity
: function() { return undefined; },
secureContext,
isServer: false,
socket: this.stream,
servername
}, () => {
secureEstablished = true;
if (rejectUnauthorized) {
if (typeof servername === 'string' && verifyIdentity) {
const cert = secureSocket.getPeerCertificate(true);
const serverIdentityCheckError = Tls.checkServerIdentity(servername, cert);
if (serverIdentityCheckError) {
onSecure(serverIdentityCheckError);
return;
}
}
}
onSecure();
});
// error handler for secure socket
secureSocket.on('error', err => {
if (secureEstablished) {
this._handleNetworkError(err);
} else {
onSecure(err);
}
});
secureSocket.on('data', data => {
this.packetParser.execute(data);
});
this.write = buffer => secureSocket.write(buffer);
}
protocolError(message, code) {
// Starting with MySQL 8.0.24, if the client closes the connection
// unexpectedly, the server will send a last ERR Packet, which we can
// safely ignore.
// https://dev.mysql.com/worklog/task/?id=12999
if (this._closing) {
return;
}
const err = new Error(message);
err.fatal = true;
err.code = code || 'PROTOCOL_ERROR';
this.emit('error', err);
}
get fatalError() {
return this._fatalError;
}
handlePacket(packet) {
if (this._paused) {
this._paused_packets.push(packet);
return;
}
if (this.config.debug) {
if (packet) {
// eslint-disable-next-line no-console
console.log(
` raw: ${packet.buffer
.slice(packet.offset, packet.offset + packet.length())
.toString('hex')}`
);
// eslint-disable-next-line no-console
console.trace();
const commandName = this._command
? this._command._commandName
: '(no command)';
const stateName = this._command
? this._command.stateName()
: '(no command)';
// eslint-disable-next-line no-console
console.log(
`${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`
);
}
}
if (!this._command) {
const marker = packet.peekByte();
// If it's an Err Packet, we should use it.
if (marker === 0xff) {
const error = Packets.Error.fromPacket(packet);
this.protocolError(error.message, error.code);
} else {
// Otherwise, it means it's some other unexpected packet.
this.protocolError(
'Unexpected packet while no commands in the queue',
'PROTOCOL_UNEXPECTED_PACKET'
);
}
this.close();
return;
}
if (packet) {
// Note: when server closes connection due to inactivity, Err packet ER_CLIENT_INTERACTION_TIMEOUT from MySQL 8.0.24, sequenceId will be 0
if (this.sequenceId !== packet.sequenceId) {
const err = new Error(
`Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`
);
err.expected = this.sequenceId;
err.received = packet.sequenceId;
this.emit('warn', err); // REVIEW
// eslint-disable-next-line no-console
console.error(err.message);
}
this._bumpSequenceId(packet.numPackets);
}
try {
if (this._fatalError) {
// skip remaining packets after client is in the error state
return;
}
const done = this._command.execute(packet, this);
if (done) {
this._command = this._commands.shift();
if (this._command) {
this.sequenceId = 0;
this.compressedSequenceId = 0;
this.handlePacket();
}
}
} catch (err) {
this._handleFatalError(err);
this.stream.destroy();
}
}
addCommand(cmd) {
// this.compressedSequenceId = 0;
// this.sequenceId = 0;
if (this.config.debug) {
const commandName = cmd.constructor.name;
// eslint-disable-next-line no-console
console.log(`Add command: ${commandName}`);
cmd._commandName = commandName;
}
if (!this._command) {
this._command = cmd;
this.handlePacket();
} else {
this._commands.push(cmd);
}
return cmd;
}
format(sql, values) {
if (typeof this.config.queryFormat === 'function') {
return this.config.queryFormat.call(
this,
sql,
values,
this.config.timezone
);
}
const opts = {
sql: sql,
values: values
};
this._resolveNamedPlaceholders(opts);
return SqlString.format(
opts.sql,
opts.values,
this.config.stringifyObjects,
this.config.timezone
);
}
escape(value) {
return SqlString.escape(value, false, this.config.timezone);
}
escapeId(value) {
return SqlString.escapeId(value, false);
}
raw(sql) {
return SqlString.raw(sql);
}
_resolveNamedPlaceholders(options) {
let unnamed;
if (this.config.namedPlaceholders || options.namedPlaceholders) {
if (Array.isArray(options.values)) {
// if an array is provided as the values, assume the conversion is not necessary.
// this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled.
return
}
if (convertNamedPlaceholders === null) {
convertNamedPlaceholders = require('named-placeholders')();
}
unnamed = convertNamedPlaceholders(options.sql, options.values);
options.sql = unnamed[0];
options.values = unnamed[1];
}
}
query(sql, values, cb) {
let cmdQuery;
if (sql.constructor === Commands.Query) {
cmdQuery = sql;
} else {
cmdQuery = Connection.createQuery(sql, values, cb, this.config);
}
this._resolveNamedPlaceholders(cmdQuery);
const rawSql = this.format(cmdQuery.sql, cmdQuery.values !== undefined ? cmdQuery.values : []);
cmdQuery.sql = rawSql;
return this.addCommand(cmdQuery);
}
pause() {
this._paused = true;
this.stream.pause();
}
resume() {
let packet;
this._paused = false;
while ((packet = this._paused_packets.shift())) {
this.handlePacket(packet);
// don't resume if packet handler paused connection
if (this._paused) {
return;
}
}
this.stream.resume();
}
// TODO: named placeholders support
prepare(options, cb) {
if (typeof options === 'string') {
options = { sql: options };
}
return this.addCommand(new Commands.Prepare(options, cb));
}
unprepare(sql) {
let options = {};
if (typeof sql === 'object') {
options = sql;
} else {
options.sql = sql;
}
const key = Connection.statementKey(options);
const stmt = this._statements.get(key);
if (stmt) {
this._statements.delete(key);
stmt.close();
}
return stmt;
}
execute(sql, values, cb) {
let options = {
infileStreamFactory: this.config.infileStreamFactory
};
if (typeof sql === 'object') {
// execute(options, cb)
options = {
...options,
...sql,
sql: sql.sql,
values: sql.values
};
if (typeof values === 'function') {
cb = values;
} else {
options.values = options.values || values;
}
} else if (typeof values === 'function') {
// execute(sql, cb)
cb = values;
options.sql = sql;
options.values = undefined;
} else {
// execute(sql, values, cb)
options.sql = sql;
options.values = values;
}
this._resolveNamedPlaceholders(options);
// check for values containing undefined
if (options.values) {
//If namedPlaceholder is not enabled and object is passed as bind parameters
if (!Array.isArray(options.values)) {
throw new TypeError(
'Bind parameters must be array if namedPlaceholders parameter is not enabled'
);
}
options.values.forEach(val => {
//If namedPlaceholder is not enabled and object is passed as bind parameters
if (!Array.isArray(options.values)) {
throw new TypeError(
'Bind parameters must be array if namedPlaceholders parameter is not enabled'
);
}
if (val === undefined) {
throw new TypeError(
'Bind parameters must not contain undefined. To pass SQL NULL specify JS null'
);
}
if (typeof val === 'function') {
throw new TypeError(
'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first'
);
}
});
}
const executeCommand = new Commands.Execute(options, cb);
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
// skip execute command if prepare failed, we have main
// combined callback here
executeCommand.start = function() {
return null;
};
if (cb) {
cb(err);
} else {
executeCommand.emit('error', err);
}
executeCommand.emit('end');
return;
}
executeCommand.statement = stmt;
});
this.addCommand(prepareCommand);
this.addCommand(executeCommand);
return executeCommand;
}
changeUser(options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
const charsetNumber = options.charset
? ConnectionConfig.getCharsetNumber(options.charset)
: this.config.charsetNumber;
return this.addCommand(
new Commands.ChangeUser(
{
user: options.user || this.config.user,
// for the purpose of multi-factor authentication, or not, the main
// password (used for the 1st authentication factor) can also be
// provided via the "password1" option
password: options.password || options.password1 || this.config.password || this.config.password1,
password2: options.password2 || this.config.password2,
password3: options.password3 || this.config.password3,
passwordSha1: options.passwordSha1 || this.config.passwordSha1,
database: options.database || this.config.database,
timeout: options.timeout,
charsetNumber: charsetNumber,
currentConfig: this.config
},
err => {
if (err) {
err.fatal = true;
}
if (callback) {
callback(err);
}
}
)
);
}
// transaction helpers
beginTransaction(cb) {
return this.query('START TRANSACTION', cb);
}
commit(cb) {
return this.query('COMMIT', cb);
}
rollback(cb) {
return this.query('ROLLBACK', cb);
}
ping(cb) {
return this.addCommand(new Commands.Ping(cb));
}
_registerSlave(opts, cb) {
return this.addCommand(new Commands.RegisterSlave(opts, cb));
}
_binlogDump(opts, cb) {
return this.addCommand(new Commands.BinlogDump(opts, cb));
}
// currently just alias to close
destroy() {
this.close();
}
close() {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this._closing = true;
this.stream.end();
this.addCommand = this._addCommandClosedState;
}
createBinlogStream(opts) {
// TODO: create proper stream class
// TODO: use through2
let test = 1;
const stream = new Readable({ objectMode: true });
stream._read = function() {
return {
data: test++
};
};
this._registerSlave(opts, () => {
const dumpCmd = this._binlogDump(opts);
dumpCmd.on('event', ev => {
stream.push(ev);
});
dumpCmd.on('eof', () => {
stream.push(null);
// if non-blocking, then close stream to prevent errors
if (opts.flags && opts.flags & 0x01) {
this.close();
}
});
// TODO: pipe errors as well
});
return stream;
}
connect(cb) {
if (!cb) {
return;
}
if (this._fatalError || this._protocolError) {
return cb(this._fatalError || this._protocolError);
}
if (this._handshakePacket) {
return cb(null, this);
}
let connectCalled = 0;
function callbackOnce(isErrorHandler) {
return function(param) {
if (!connectCalled) {
if (isErrorHandler) {
cb(param);
} else {
cb(null, param);
}
}
connectCalled = 1;
};
}
this.once('error', callbackOnce(true));
this.once('connect', callbackOnce(false));
}
// ===================================
// outgoing server connection methods
// ===================================
writeColumns(columns) {
this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
columns.forEach(column => {
this.writePacket(
Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding)
);
});
this.writeEof();
}
// row is array of columns, not hash
writeTextRow(column) {
this.writePacket(
Packets.TextRow.toPacket(column, this.serverConfig.encoding)
);
}
writeBinaryRow(column) {
this.writePacket(
Packets.BinaryRow.toPacket(column, this.serverConfig.encoding)
);
}
writeTextResult(rows, columns, binary=false) {
this.writeColumns(columns);
rows.forEach(row => {
const arrayRow = new Array(columns.length);
columns.forEach(column => {
arrayRow.push(row[column.name]);
});
if(binary) {
this.writeBinaryRow(arrayRow);
}
else this.writeTextRow(arrayRow);
});
this.writeEof();
}
writeEof(warnings, statusFlags) {
this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
}
writeOk(args) {
if (!args) {
args = { affectedRows: 0 };
}
this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding));
}
writeError(args) {
// if we want to send error before initial hello was sent, use default encoding
const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8';
this.writePacket(Packets.Error.toPacket(args, encoding));
}
serverHandshake(args) {
this.serverConfig = args;
this.serverConfig.encoding =
CharsetToEncoding[this.serverConfig.characterSet];
return this.addCommand(new Commands.ServerHandshake(args));
}
// ===============================================================
end(callback) {
if (this.config.isServer) {
this._closing = true;
const quitCmd = new EventEmitter();
setImmediate(() => {
this.stream.end();
quitCmd.emit('end');
});
return quitCmd;
}
// trigger error if more commands enqueued after end command
const quitCmd = this.addCommand(new Commands.Quit(callback));
this.addCommand = this._addCommandClosedState;
return quitCmd;
}
static createQuery(sql, values, cb, config) {
let options = {
rowsAsArray: config.rowsAsArray,
infileStreamFactory: config.infileStreamFactory
};
if (typeof sql === 'object') {
// query(options, cb)
options = {
...options,
...sql,
sql: sql.sql,
values: sql.values
};
if (typeof values === 'function') {
cb = values;
} else if (values !== undefined) {
options.values = values;
}
} else if (typeof values === 'function') {
// query(sql, cb)
cb = values;
options.sql = sql;
options.values = undefined;
} else {
// query(sql, values, cb)
options.sql = sql;
options.values = values;
}
return new Commands.Query(options, cb);
}
static statementKey(options) {
return (
`${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`
);
}
}
module.exports = Connection;
'use strict';
const Connection = require('../index.js').Connection;
const BasePoolConnection = require('./base/pool_connection.js');
class PoolConnection extends Connection {
constructor(pool, options) {
super(options);
this._pool = pool;
// The last active time of this connection
this.lastActiveTime = Date.now();
// When a fatal error occurs the connection's protocol ends, which will cause
// the connection to end as well, thus we only need to watch for the end event
// and we will be notified of disconnects.
// REVIEW: Moved to `once`
this.once('end', () => {
this._removeFromPool();
});
this.once('error', () => {
this._removeFromPool();
});
}
release() {
if (!this._pool || this._pool._closed) {
return;
}
// update last active time
this.lastActiveTime = Date.now();
this._pool.releaseConnection(this);
}
class PoolConnection extends BasePoolConnection {
promise(promiseImpl) {
const PromisePoolConnection = require('../promise').PromisePoolConnection;
const PromisePoolConnection = require('./promise/pool_connection.js');
return new PromisePoolConnection(this, promiseImpl);
}
end() {
const err = new Error(
'Calling conn.end() to release a pooled connection is ' +
'deprecated. In next version calling conn.end() will be ' +
'restored to default conn.end() behavior. Use ' +
'conn.release() instead.'
);
this.emit('warn', err);
// eslint-disable-next-line no-console
console.warn(err.message);
this.release();
}
destroy() {
this._removeFromPool();
super.destroy();
}
_removeFromPool() {
if (!this._pool || this._pool._closed) {
return;
}
const pool = this._pool;
this._pool = null;
pool._removeConnection(this);
}
}
PoolConnection.statementKey = Connection.statementKey;
module.exports = PoolConnection;
// TODO: Remove this when we are removing PoolConnection#end
PoolConnection.prototype._realEnd = Connection.prototype.end;
'use strict';
const process = require('process');
const mysql = require('../index.js');
const BasePool = require('./base/pool.js');
const EventEmitter = require('events').EventEmitter;
const PoolConnection = require('./pool_connection.js');
const Queue = require('denque');
const Connection = require('./connection.js');
function spliceConnection(queue, connection) {
const len = queue.length;
for (let i = 0; i < len; i++) {
if (queue.get(i) === connection) {
queue.removeOne(i);
break;
}
}
}
class Pool extends EventEmitter {
constructor(options) {
super();
this.config = options.config;
this.config.connectionConfig.pool = this;
this._allConnections = new Queue();
this._freeConnections = new Queue();
this._connectionQueue = new Queue();
this._closed = false;
if (this.config.maxIdle < this.config.connectionLimit) {
// create idle connection timeout automatically release job
this._removeIdleTimeoutConnections();
}
}
class Pool extends BasePool {
promise(promiseImpl) {
const PromisePool = require('../promise').PromisePool;
const PromisePool = require('./promise/pool.js');
return new PromisePool(this, promiseImpl);
}
getConnection(cb) {
if (this._closed) {
return process.nextTick(() => cb(new Error('Pool is closed.')));
}
let connection;
if (this._freeConnections.length > 0) {
connection = this._freeConnections.pop();
this.emit('acquire', connection);
return process.nextTick(() => cb(null, connection));
}
if (
this.config.connectionLimit === 0 ||
this._allConnections.length < this.config.connectionLimit
) {
connection = new PoolConnection(this, {
config: this.config.connectionConfig
});
this._allConnections.push(connection);
return connection.connect(err => {
if (this._closed) {
return cb(new Error('Pool is closed.'));
}
if (err) {
return cb(err);
}
this.emit('connection', connection);
this.emit('acquire', connection);
return cb(null, connection);
});
}
if (!this.config.waitForConnections) {
return process.nextTick(() => cb(new Error('No connections available.')));
}
if (
this.config.queueLimit &&
this._connectionQueue.length >= this.config.queueLimit
) {
return cb(new Error('Queue limit reached.'));
}
this.emit('enqueue');
return this._connectionQueue.push(cb);
}
releaseConnection(connection) {
let cb;
if (!connection._pool) {
// The connection has been removed from the pool and is no longer good.
if (this._connectionQueue.length) {
cb = this._connectionQueue.shift();
process.nextTick(this.getConnection.bind(this, cb));
}
} else if (this._connectionQueue.length) {
cb = this._connectionQueue.shift();
process.nextTick(cb.bind(null, null, connection));
} else {
this._freeConnections.push(connection);
this.emit('release', connection);
}
}
end(cb) {
this._closed = true;
clearTimeout(this._removeIdleTimeoutConnectionsTimer);
if (typeof cb !== 'function') {
cb = function(err) {
if (err) {
throw err;
}
};
}
let calledBack = false;
let closedConnections = 0;
let connection;
const endCB = function(err) {
if (calledBack) {
return;
}
if (err || ++closedConnections >= this._allConnections.length) {
calledBack = true;
cb(err);
return;
}
}.bind(this);
if (this._allConnections.length === 0) {
endCB();
return;
}
for (let i = 0; i < this._allConnections.length; i++) {
connection = this._allConnections.get(i);
connection._realEnd(endCB);
}
}
query(sql, values, cb) {
const cmdQuery = Connection.createQuery(
sql,
values,
cb,
this.config.connectionConfig
);
if (typeof cmdQuery.namedPlaceholders === 'undefined') {
cmdQuery.namedPlaceholders = this.config.connectionConfig.namedPlaceholders;
}
this.getConnection((err, conn) => {
if (err) {
if (typeof cmdQuery.onResult === 'function') {
cmdQuery.onResult(err);
} else {
cmdQuery.emit('error', err);
}
return;
}
try {
conn.query(cmdQuery).once('end', () => {
conn.release();
});
} catch (e) {
conn.release();
throw e;
}
});
return cmdQuery;
}
execute(sql, values, cb) {
// TODO construct execute command first here and pass it to connection.execute
// so that polymorphic arguments logic is there in one place
if (typeof values === 'function') {
cb = values;
values = [];
}
this.getConnection((err, conn) => {
if (err) {
return cb(err);
}
try {
conn.execute(sql, values, cb).once('end', () => {
conn.release();
});
} catch (e) {
conn.release();
return cb(e);
}
});
}
_removeConnection(connection) {
// Remove connection from all connections
spliceConnection(this._allConnections, connection);
// Remove connection from free connections
spliceConnection(this._freeConnections, connection);
this.releaseConnection(connection);
}
_removeIdleTimeoutConnections() {
if (this._removeIdleTimeoutConnectionsTimer) {
clearTimeout(this._removeIdleTimeoutConnectionsTimer);
}
this._removeIdleTimeoutConnectionsTimer = setTimeout(() => {
try {
while (
this._freeConnections.length > this.config.maxIdle ||
(this._freeConnections.length > 0 &&
Date.now() - this._freeConnections.get(0).lastActiveTime >
this.config.idleTimeout)
) {
this._freeConnections.get(0).destroy();
}
} finally {
this._removeIdleTimeoutConnections();
}
}, 1000);
}
format(sql, values) {
return mysql.format(
sql,
values,
this.config.connectionConfig.stringifyObjects,
this.config.connectionConfig.timezone
);
}
escape(value) {
return mysql.escape(
value,
this.config.connectionConfig.stringifyObjects,
this.config.connectionConfig.timezone
);
}
escapeId(value) {
return mysql.escapeId(value, false);
}
}
module.exports = Pool;
{
"name": "mysql2",
"version": "3.11.4",
"version": "3.11.5-canary.d5a76e6c",
"description": "fast mysql driver. Implements core protocol, prepared statements, ssl and compression in native JS",

@@ -5,0 +5,0 @@ "main": "index.js",

'use strict';
const core = require('./index.js');
const SqlString = require('sqlstring');
const EventEmitter = require('events').EventEmitter;
const parserCache = require('./lib/parsers/parser_cache.js');
const PoolCluster = require('./lib/pool_cluster.js');
const createConnection = require('./lib/create_connection.js');
const createPool = require('./lib/create_pool.js');
const createPoolCluster = require('./lib/create_pool_cluster.js');
const PromiseConnection = require('./lib/promise/connection.js');
const PromisePool = require('./lib/promise/pool.js');
const makeDoneCb = require('./lib/promise/make_done_cb.js');
const PromisePoolConnection = require('./lib/promise/pool_connection.js');
const inheritEvents = require('./lib/promise/inherit_events.js');
function makeDoneCb(resolve, reject, localErr) {
return function (err, rows, fields) {
if (err) {
localErr.message = err.message;
localErr.code = err.code;
localErr.errno = err.errno;
localErr.sql = err.sql;
localErr.sqlState = err.sqlState;
localErr.sqlMessage = err.sqlMessage;
reject(localErr);
} else {
resolve([rows, fields]);
}
};
}
function inheritEvents(source, target, events) {
const listeners = {};
target
.on('newListener', eventName => {
if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) {
source.on(
eventName,
(listeners[eventName] = function () {
const args = [].slice.call(arguments);
args.unshift(eventName);
target.emit.apply(target, args);
})
);
}
})
.on('removeListener', eventName => {
if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) {
source.removeListener(eventName, listeners[eventName]);
delete listeners[eventName];
}
});
}
class PromisePreparedStatementInfo {
constructor(statement, promiseImpl) {
this.statement = statement;
this.Promise = promiseImpl;
}
execute(parameters) {
const s = this.statement;
const localErr = new Error();
return new this.Promise((resolve, reject) => {
const done = makeDoneCb(resolve, reject, localErr);
if (parameters) {
s.execute(parameters, done);
} else {
s.execute(done);
}
});
}
close() {
return new this.Promise(resolve => {
this.statement.close();
resolve();
});
}
}
class PromiseConnection extends EventEmitter {
constructor(connection, promiseImpl) {
super();
this.connection = connection;
this.Promise = promiseImpl || Promise;
inheritEvents(connection, this, [
'error',
'drain',
'connect',
'end',
'enqueue'
]);
}
release() {
this.connection.release();
}
query(query, params) {
const c = this.connection;
const localErr = new Error();
if (typeof params === 'function') {
throw new Error(
'Callback function is not available with promise clients.'
);
}
return new this.Promise((resolve, reject) => {
const done = makeDoneCb(resolve, reject, localErr);
if (params !== undefined) {
c.query(query, params, done);
} else {
c.query(query, done);
}
});
}
execute(query, params) {
const c = this.connection;
const localErr = new Error();
if (typeof params === 'function') {
throw new Error(
'Callback function is not available with promise clients.'
);
}
return new this.Promise((resolve, reject) => {
const done = makeDoneCb(resolve, reject, localErr);
if (params !== undefined) {
c.execute(query, params, done);
} else {
c.execute(query, done);
}
});
}
end() {
return new this.Promise(resolve => {
this.connection.end(resolve);
});
}
beginTransaction() {
const c = this.connection;
const localErr = new Error();
return new this.Promise((resolve, reject) => {
const done = makeDoneCb(resolve, reject, localErr);
c.beginTransaction(done);
});
}
commit() {
const c = this.connection;
const localErr = new Error();
return new this.Promise((resolve, reject) => {
const done = makeDoneCb(resolve, reject, localErr);
c.commit(done);
});
}
rollback() {
const c = this.connection;
const localErr = new Error();
return new this.Promise((resolve, reject) => {
const done = makeDoneCb(resolve, reject, localErr);
c.rollback(done);
});
}
ping() {
const c = this.connection;
const localErr = new Error();
return new this.Promise((resolve, reject) => {
c.ping(err => {
if (err) {
localErr.message = err.message;
localErr.code = err.code;
localErr.errno = err.errno;
localErr.sqlState = err.sqlState;
localErr.sqlMessage = err.sqlMessage;
reject(localErr);
} else {
resolve(true);
}
});
});
}
connect() {
const c = this.connection;
const localErr = new Error();
return new this.Promise((resolve, reject) => {
c.connect((err, param) => {
if (err) {
localErr.message = err.message;
localErr.code = err.code;
localErr.errno = err.errno;
localErr.sqlState = err.sqlState;
localErr.sqlMessage = err.sqlMessage;
reject(localErr);
} else {
resolve(param);
}
});
});
}
prepare(options) {
const c = this.connection;
const promiseImpl = this.Promise;
const localErr = new Error();
return new this.Promise((resolve, reject) => {
c.prepare(options, (err, statement) => {
if (err) {
localErr.message = err.message;
localErr.code = err.code;
localErr.errno = err.errno;
localErr.sqlState = err.sqlState;
localErr.sqlMessage = err.sqlMessage;
reject(localErr);
} else {
const wrappedStatement = new PromisePreparedStatementInfo(
statement,
promiseImpl
);
resolve(wrappedStatement);
}
});
});
}
changeUser(options) {
const c = this.connection;
const localErr = new Error();
return new this.Promise((resolve, reject) => {
c.changeUser(options, err => {
if (err) {
localErr.message = err.message;
localErr.code = err.code;
localErr.errno = err.errno;
localErr.sqlState = err.sqlState;
localErr.sqlMessage = err.sqlMessage;
reject(localErr);
} else {
resolve();
}
});
});
}
get config() {
return this.connection.config;
}
get threadId() {
return this.connection.threadId;
}
}
function createConnection(opts) {
const coreConnection = core.createConnection(opts);
function createConnectionPromise(opts) {
const coreConnection = createConnection(opts);
const createConnectionErr = new Error();

@@ -258,4 +23,4 @@ const thePromise = opts.Promise || Promise;

'no Promise implementation available.' +
'Use promise-enabled node version or pass userland Promise' +
" implementation as parameter, for example: { Promise: require('bluebird') }"
'Use promise-enabled node version or pass userland Promise' +
" implementation as parameter, for example: { Promise: require('bluebird') }",
);

@@ -267,3 +32,3 @@ }

});
coreConnection.once('error', err => {
coreConnection.once('error', (err) => {
createConnectionErr.message = err.message;

@@ -281,135 +46,4 @@ createConnectionErr.code = err.code;

// patching PromiseConnection
// create facade functions for prototype functions on "Connection" that are not yet
// implemented with PromiseConnection
// proxy synchronous functions only
(function (functionsToWrap) {
for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) {
const func = functionsToWrap[i];
if (
typeof core.Connection.prototype[func] === 'function' &&
PromiseConnection.prototype[func] === undefined
) {
PromiseConnection.prototype[func] = (function factory(funcName) {
return function () {
return core.Connection.prototype[funcName].apply(
this.connection,
arguments
);
};
})(func);
}
}
})([
// synchronous functions
'close',
'createBinlogStream',
'destroy',
'escape',
'escapeId',
'format',
'pause',
'pipe',
'resume',
'unprepare'
]);
class PromisePoolConnection extends PromiseConnection {
constructor(connection, promiseImpl) {
super(connection, promiseImpl);
}
destroy() {
return core.PoolConnection.prototype.destroy.apply(
this.connection,
arguments
);
}
}
class PromisePool extends EventEmitter {
constructor(pool, thePromise) {
super();
this.pool = pool;
this.Promise = thePromise || Promise;
inheritEvents(pool, this, ['acquire', 'connection', 'enqueue', 'release']);
}
getConnection() {
const corePool = this.pool;
return new this.Promise((resolve, reject) => {
corePool.getConnection((err, coreConnection) => {
if (err) {
reject(err);
} else {
resolve(new PromisePoolConnection(coreConnection, this.Promise));
}
});
});
}
releaseConnection(connection) {
if (connection instanceof PromisePoolConnection) connection.release();
}
query(sql, args) {
const corePool = this.pool;
const localErr = new Error();
if (typeof args === 'function') {
throw new Error(
'Callback function is not available with promise clients.'
);
}
return new this.Promise((resolve, reject) => {
const done = makeDoneCb(resolve, reject, localErr);
if (args !== undefined) {
corePool.query(sql, args, done);
} else {
corePool.query(sql, done);
}
});
}
execute(sql, args) {
const corePool = this.pool;
const localErr = new Error();
if (typeof args === 'function') {
throw new Error(
'Callback function is not available with promise clients.'
);
}
return new this.Promise((resolve, reject) => {
const done = makeDoneCb(resolve, reject, localErr);
if (args) {
corePool.execute(sql, args, done);
} else {
corePool.execute(sql, done);
}
});
}
end() {
const corePool = this.pool;
const localErr = new Error();
return new this.Promise((resolve, reject) => {
corePool.end(err => {
if (err) {
localErr.message = err.message;
localErr.code = err.code;
localErr.errno = err.errno;
localErr.sqlState = err.sqlState;
localErr.sqlMessage = err.sqlMessage;
reject(localErr);
} else {
resolve();
}
});
});
}
}
function createPool(opts) {
const corePool = core.createPool(opts);
function createPromisePool(opts) {
const corePool = createPool(opts);
const thePromise = opts.Promise || Promise;

@@ -419,4 +53,4 @@ if (!thePromise) {

'no Promise implementation available.' +
'Use promise-enabled node version or pass userland Promise' +
" implementation as parameter, for example: { Promise: require('bluebird') }"
'Use promise-enabled node version or pass userland Promise' +
" implementation as parameter, for example: { Promise: require('bluebird') }",
);

@@ -428,24 +62,2 @@ }

(function (functionsToWrap) {
for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) {
const func = functionsToWrap[i];
if (
typeof core.Pool.prototype[func] === 'function' &&
PromisePool.prototype[func] === undefined
) {
PromisePool.prototype[func] = (function factory(funcName) {
return function () {
return core.Pool.prototype[funcName].apply(this.pool, arguments);
};
})(func);
}
}
})([
// synchronous functions
'escape',
'escapeId',
'format'
]);
class PromisePoolCluster extends EventEmitter {

@@ -462,9 +74,13 @@ constructor(poolCluster, thePromise) {

return new this.Promise((resolve, reject) => {
corePoolCluster.getConnection(pattern, selector, (err, coreConnection) => {
if (err) {
reject(err);
} else {
resolve(new PromisePoolConnection(coreConnection, this.Promise));
}
});
corePoolCluster.getConnection(
pattern,
selector,
(err, coreConnection) => {
if (err) {
reject(err);
} else {
resolve(new PromisePoolConnection(coreConnection, this.Promise));
}
},
);
});

@@ -478,3 +94,3 @@ }

throw new Error(
'Callback function is not available with promise clients.'
'Callback function is not available with promise clients.',
);

@@ -493,3 +109,3 @@ }

throw new Error(
'Callback function is not available with promise clients.'
'Callback function is not available with promise clients.',
);

@@ -506,3 +122,3 @@ }

this.poolCluster.of(pattern, selector),
this.Promise
this.Promise,
);

@@ -515,3 +131,3 @@ }

return new this.Promise((resolve, reject) => {
corePoolCluster.end(err => {
corePoolCluster.end((err) => {
if (err) {

@@ -540,3 +156,3 @@ localErr.message = err.message;

if (
typeof core.PoolCluster.prototype[func] === 'function' &&
typeof PoolCluster.prototype[func] === 'function' &&
PromisePoolCluster.prototype[func] === undefined

@@ -546,3 +162,6 @@ ) {

return function () {
return core.PoolCluster.prototype[funcName].apply(this.poolCluster, arguments);
return PoolCluster.prototype[funcName].apply(
this.poolCluster,
arguments,
);
};

@@ -552,8 +171,6 @@ })(func);

}
})([
'add'
]);
})(['add']);
function createPoolCluster(opts) {
const corePoolCluster = core.createPoolCluster(opts);
function createPromisePoolCluster(opts) {
const corePoolCluster = createPoolCluster(opts);
const thePromise = (opts && opts.Promise) || Promise;

@@ -563,4 +180,4 @@ if (!thePromise) {

'no Promise implementation available.' +
'Use promise-enabled node version or pass userland Promise' +
" implementation as parameter, for example: { Promise: require('bluebird') }"
'Use promise-enabled node version or pass userland Promise' +
" implementation as parameter, for example: { Promise: require('bluebird') }",
);

@@ -571,9 +188,9 @@ }

exports.createConnection = createConnection;
exports.createPool = createPool;
exports.createPoolCluster = createPoolCluster;
exports.escape = core.escape;
exports.escapeId = core.escapeId;
exports.format = core.format;
exports.raw = core.raw;
exports.createConnection = createConnectionPromise;
exports.createPool = createPromisePool;
exports.createPoolCluster = createPromisePoolCluster;
exports.escape = SqlString.escape;
exports.escapeId = SqlString.escapeId;
exports.format = SqlString.format;
exports.raw = SqlString.raw;
exports.PromisePool = PromisePool;

@@ -586,15 +203,15 @@ exports.PromiseConnection = PromiseConnection;

exports.__defineGetter__('Charsets', () =>
require('./lib/constants/charsets.js')
require('./lib/constants/charsets.js'),
);
exports.__defineGetter__('CharsetToEncoding', () =>
require('./lib/constants/charset_encodings.js')
require('./lib/constants/charset_encodings.js'),
);
exports.setMaxParserCache = function(max) {
exports.setMaxParserCache = function (max) {
parserCache.setMaxCache(max);
};
exports.clearParserCache = function() {
exports.clearParserCache = function () {
parserCache.clearCache();
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc