- function mysql.Connection (options)
- function mysql.ConnectionConfig (options)
- function mysql.Pool (options)
- function mysql.PoolCluster (config)
- function mysql.PoolConfig (options)
- function mysql.PoolConnection (pool, options)
- function mysql.PoolNamespace (cluster, pattern, selector)
- function mysql.createConnection (config)
- function mysql.createPool (config)
- function mysql.createPoolCluster (config)
- function mysql.createQuery (sql, values, callback)
- function mysql.escape (value, stringifyObjects, timeZone)
- function mysql.escapeId (value, forbidQualified)
- function mysql.format (sql, values, stringifyObjects, timeZone)
- object mysql.Connection.prototype
- object mysql.Pool.prototype
- object mysql.PoolCluster.prototype
- object mysql.PoolConfig.prototype
- object mysql.PoolConnection.prototype
- object mysql.PoolNamespace.prototype
- object mysql.PoolSelector
- function mysql.Connection.prototype._handleConnectTimeout ()
- function mysql.Connection.prototype._handleNetworkError (err)
- function mysql.Connection.prototype._handleProtocolConnect ()
- function mysql.Connection.prototype._handleProtocolDrain ()
- function mysql.Connection.prototype._handleProtocolEnd (err)
- function mysql.Connection.prototype._handleProtocolEnqueue (sequence)
- function mysql.Connection.prototype._handleProtocolError (err)
- function mysql.Connection.prototype._handleProtocolHandshake (packet)
- function mysql.Connection.prototype._implyConnect ()
- function mysql.Connection.prototype._startTLS (onSecure)
- function mysql.Connection.prototype.beginTransaction (options, callback)
- function mysql.Connection.prototype.changeUser (options, callback)
- function mysql.Connection.prototype.commit (options, callback)
- function mysql.Connection.prototype.connect (options, callback)
- function mysql.Connection.prototype.destroy ()
- function mysql.Connection.prototype.end (options, callback)
- function mysql.Connection.prototype.escape (value)
- function mysql.Connection.prototype.escapeId (value)
- function mysql.Connection.prototype.format (sql, values)
- function mysql.Connection.prototype.pause ()
- function mysql.Connection.prototype.ping (options, callback)
- function mysql.Connection.prototype.query (sql, values, cb)
- function mysql.Connection.prototype.resume ()
- function mysql.Connection.prototype.rollback (options, callback)
- function mysql.Connection.prototype.statistics (options, callback)
- description and source-code
function Connection(options) {
Events.EventEmitter.call(this);
this.config = options.config;
this._socket = options.socket;
this._protocol = new Protocol({config: this.config, connection: this});
this._connectCalled = false;
this.state = 'disconnected';
this.threadId = null;
}
n/a
- description and source-code
function ConnectionConfig(options) {
if (typeof options === 'string') {
options = ConnectionConfig.parseUrl(options);
}
this.host = options.host || 'localhost';
this.port = options.port || 3306;
this.localAddress = options.localAddress;
this.socketPath = options.socketPath;
this.user = options.user || undefined;
this.password = options.password || undefined;
this.database = options.database;
this.connectTimeout = (options.connectTimeout === undefined)
? (10 * 1000)
: options.connectTimeout;
this.insecureAuth = options.insecureAuth || false;
this.supportBigNumbers = options.supportBigNumbers || false;
this.bigNumberStrings = options.bigNumberStrings || false;
this.dateStrings = options.dateStrings || false;
this.debug = options.debug;
this.trace = options.trace !== false;
this.stringifyObjects = options.stringifyObjects || false;
this.timezone = options.timezone || 'local';
this.flags = options.flags || '';
this.queryFormat = options.queryFormat;
this.pool = options.pool || undefined;
this.ssl = (typeof options.ssl === 'string')
? ConnectionConfig.getSSLProfile(options.ssl)
: (options.ssl || false);
this.multipleStatements = options.multipleStatements || false;
this.typeCast = (options.typeCast === undefined)
? true
: options.typeCast;
if (this.timezone[0] === ' ') {
this.timezone = '+' + this.timezone.substr(1);
}
if (this.ssl) {
this.ssl.rejectUnauthorized = this.ssl.rejectUnauthorized !== false;
}
this.maxPacketSize = 0;
this.charsetNumber = (options.charset)
? ConnectionConfig.getCharsetNumber(options.charset)
: options.charsetNumber || Charsets.UTF8_GENERAL_CI;
var defaultFlags = ConnectionConfig.getDefaultFlags(options);
this.clientFlags = ConnectionConfig.mergeFlags(defaultFlags, options.flags);
}
n/a
- description and source-code
function Pool(options) {
EventEmitter.call(this);
this.config = options.config;
this.config.connectionConfig.pool = this;
this._acquiringConnections = [];
this._allConnections = [];
this._freeConnections = [];
this._connectionQueue = [];
this._closed = false;
}
n/a
- description and source-code
function PoolCluster(config) {
EventEmitter.call(this);
config = config || {};
this._canRetry = typeof config.canRetry === 'undefined' ? true : config.canRetry;
this._defaultSelector = config.defaultSelector || 'RR';
this._removeNodeErrorCount = config.removeNodeErrorCount || 5;
this._restoreNodeTimeout = config.restoreNodeTimeout || 0;
this._closed = false;
this._findCaches = Object.create(null);
this._lastId = 0;
this._namespaces = Object.create(null);
this._nodes = Object.create(null);
}
n/a
- description and source-code
function PoolConfig(options) {
if (typeof options === 'string') {
options = ConnectionConfig.parseUrl(options);
}
this.acquireTimeout = (options.acquireTimeout === undefined)
? 10 * 1000
: Number(options.acquireTimeout);
this.connectionConfig = new ConnectionConfig(options);
this.waitForConnections = (options.waitForConnections === undefined)
? true
: Boolean(options.waitForConnections);
this.connectionLimit = (options.connectionLimit === undefined)
? 10
: Number(options.connectionLimit);
this.queueLimit = (options.queueLimit === undefined)
? 0
: Number(options.queueLimit);
}
n/a
- description and source-code
function PoolConnection(pool, options) {
Connection.call(this, options);
this._pool = pool;
if (Events.usingDomains) {
this.domain = pool.domain;
}
this.on('end', this._removeFromPool);
this.on('error', function (err) {
if (err.fatal) {
this._removeFromPool();
}
});
}
n/a
- description and source-code
function PoolNamespace(cluster, pattern, selector) {
this._cluster = cluster;
this._pattern = pattern;
this._selector = new PoolSelector[selector]();
}
n/a
- description and source-code
function createConnection(config) {
var Connection = loadClass('Connection');
var ConnectionConfig = loadClass('ConnectionConfig');
return new Connection({config: new ConnectionConfig(config)});
}
...
This is a node.js driver for mysql. It is written in JavaScript, does not
require compiling, and is 100% MIT licensed.
Here is an example on how to use it:
'''js
var mysql = require('mysql');
var connection = mysql.createConnection({
host : 'localhost',
user : 'me',
password : 'secret',
database : 'my_db'
});
connection.connect();
...
- description and source-code
function createPool(config) {
var Pool = loadClass('Pool');
var PoolConfig = loadClass('PoolConfig');
return new Pool({config: new PoolConfig(config)});
}
...
'''
Unlike 'end()' the 'destroy()' method does not take a callback argument.
#
Rather than creating and managing connections one-by-one, this module also
provides built-in connection pooling using 'mysql.createPool(config)'.
[Read more about connection pooling](https://en.wikipedia.org/wiki/Connection_pool).
Use pool directly.
'''js
var mysql = require('mysql');
var pool = mysql.createPool({
connectionLimit : 10,
...
- description and source-code
function createPoolCluster(config) {
var PoolCluster = loadClass('PoolCluster');
return new PoolCluster(config);
}
...
#
PoolCluster provides multiple hosts connection. (group & retry & selector)
'''js
// create
var poolCluster = mysql.createPoolCluster();
// add configurations (the config is a pool config object)
poolCluster.add(config); // add configuration with automatic name
poolCluster.add('MASTER', masterConfig); // add a named configuration
poolCluster.add('SLAVE1', slave1Config);
poolCluster.add('SLAVE2', slave2Config);
...
- description and source-code
function createQuery(sql, values, callback) {
var Connection = loadClass('Connection');
return Connection.createQuery(sql, values, callback);
}
...
* @param {function} [callback] The callback to use when query is complete
* @return {Query} New query object
* @public
*/
exports.createQuery = function createQuery(sql, values, callback) {
var Connection = loadClass('Connection');
return Connection.createQuery(sql, values, callback);
};
/**
* Escape a value for SQL.
* @param {*} value The value to escape
* @param {boolean} [stringifyObjects=false] Setting if objects should be stringified
* @param {string} [timeZone=local] Setting for time zone to use for Date conversion
...
- description and source-code
function escape(value, stringifyObjects, timeZone) {
var SqlString = loadClass('SqlString');
return SqlString.escape(value, stringifyObjects, timeZone);
}
...
);
'''
#
In order to avoid SQL Injection attacks, you should always escape any user
provided data before using it inside a SQL query. You can do so using the
'mysql.escape()', 'connection.escape()' or 'pool.escape()' methods:
'''js
var userId = 'some user provided value';
var sql = 'SELECT * FROM users WHERE id = ' + connection.escape(userId);
connection.query(sql, function (error, results, fields) {
if (error) throw error;
// ...
...
- description and source-code
function escapeId(value, forbidQualified) {
var SqlString = loadClass('SqlString');
return SqlString.escapeId(value, forbidQualified);
}
...
console.log(query); // SELECT * FROM posts WHERE title='Hello MySQL'
'''
#
If you can't trust an SQL identifier (database / table / column name) because it is
provided by a user, you should escape it with 'mysql.escapeId(identifier)',
'connection.escapeId(identifier)' or 'pool.escapeId(identifier)' like this:
'''js
var sorter = 'date';
var sql = 'SELECT * FROM posts ORDER BY ' + connection.escapeId(sorter);
connection.query(sql, function (error, results, fields) {
if (error) throw error;
...
- description and source-code
function format(sql, values, stringifyObjects, timeZone) {
var SqlString = loadClass('SqlString');
return SqlString.format(sql, values, stringifyObjects, timeZone);
}
...
#
You can use mysql.format to prepare a query with multiple insertion points, utilizing the proper escaping for ids and values. A \
simple example of this follows:
'''js
var sql = "SELECT * FROM ?? WHERE ?? = ?";
var inserts = ['users', 'id', userId];
sql = mysql.format(sql, inserts);
'''
Following this you then have a valid, escaped query that you can then send to the database safely. This is useful if you are loo\
king to prepare the query before actually sending it to the database. As mysql.format is exposed from SqlString.format you also \
have the option (but are not required) to pass in stringifyObject and timezone, allowing you provide a custom means of turning o\
bjects into strings, as well as a location-specific/timezone-aware Date.
#
If you prefer to have another type of query escape format, there's a connection configuration option you can use to define a cus\
tom format function. You can access the connection object if you want to use the built-in '.escape()' or any other connection fu\
nction.
...
- description and source-code
function Connection(options) {
Events.EventEmitter.call(this);
this.config = options.config;
this._socket = options.socket;
this._protocol = new Protocol({config: this.config, connection: this});
this._connectCalled = false;
this.state = 'disconnected';
this.threadId = null;
}
n/a
- description and source-code
function createQuery(sql, values, callback) {
if (sql instanceof Query) {
return sql;
}
var cb = bindToCurrentDomain(callback);
var options = {};
if (typeof sql === 'function') {
cb = bindToCurrentDomain(sql);
return new Query(options, cb);
}
if (typeof sql === 'object') {
for (var prop in sql) {
options[prop] = sql[prop];
}
if (typeof values === 'function') {
cb = bindToCurrentDomain(values);
} else if (values !== undefined) {
options.values = values;
}
return new Query(options, cb);
}
options.sql = sql;
options.values = values;
if (typeof values === 'function') {
cb = bindToCurrentDomain(values);
options.values = undefined;
}
if (cb === undefined && callback !== undefined) {
throw new TypeError('argument callback must be a function when provided');
}
return new Query(options, cb);
}
...
* @param {function} [callback] The callback to use when query is complete
* @return {Query} New query object
* @public
*/
exports.createQuery = function createQuery(sql, values, callback) {
var Connection = loadClass('Connection');
return Connection.createQuery(sql, values, callback);
};
/**
* Escape a value for SQL.
* @param {*} value The value to escape
* @param {boolean} [stringifyObjects=false] Setting if objects should be stringified
* @param {string} [timeZone=local] Setting for time zone to use for Date conversion
...
- description and source-code
function EventEmitter() {
EventEmitter.init.call(this);
}
n/a
- description and source-code
_handleConnectTimeout = function () {
if (this._socket) {
this._socket.setTimeout(0);
this._socket.destroy();
}
var err = new Error('connect ETIMEDOUT');
err.errorno = 'ETIMEDOUT';
err.code = 'ETIMEDOUT';
err.syscall = 'connect';
this._handleNetworkError(err);
}
n/a
- description and source-code
_handleNetworkError = function (err) {
this._protocol.handleNetworkError(err);
}
...
secureContext : secureContext,
isServer : false
});
// error handler for secure socket
secureSocket.on('_tlsError', function(err) {
if (secureEstablished) {
connection._handleNetworkError(err);
} else {
onSecure(err);
}
});
// cleartext <-> protocol
secureSocket.pipe(this._protocol);
...
- description and source-code
_handleProtocolConnect = function () {
this.state = 'connected';
this.emit('connect');
}
n/a
- description and source-code
_handleProtocolDrain = function () {
this.emit('drain');
}
n/a
- description and source-code
_handleProtocolEnd = function (err) {
this.state = 'disconnected';
this.emit('end', err);
}
n/a
- description and source-code
function _handleProtocolEnqueue(sequence) {
this.emit('enqueue', sequence);
}
n/a
- description and source-code
_handleProtocolError = function (err) {
this.state = 'protocol_error';
this.emit('error', err);
}
n/a
- description and source-code
function _handleProtocolHandshake(packet) {
this.state = 'authenticated';
this.threadId = packet.threadId;
}
n/a
- description and source-code
_implyConnect = function () {
if (!this._connectCalled) {
this.connect();
}
}
...
Connection.prototype.changeUser = function changeUser(options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
this._implyConnect();
var charsetNumber = (options.charset)
? ConnectionConfig.getCharsetNumber(options.charset)
: this.config.charsetNumber;
return this._protocol.changeUser({
user : options.user || this.config.user,
...
- description and source-code
function _startTLS(onSecure) {
var connection = this;
var secureContext = tls.createSecureContext({
ca : this.config.ssl.ca,
cert : this.config.ssl.cert,
ciphers : this.config.ssl.ciphers,
key : this.config.ssl.key,
passphrase : this.config.ssl.passphrase
});
this._socket.removeAllListeners('data');
this._protocol.removeAllListeners('data');
var rejectUnauthorized = this.config.ssl.rejectUnauthorized;
var secureEstablished = false;
var secureSocket = new tls.TLSSocket(this._socket, {
rejectUnauthorized : rejectUnauthorized,
requestCert : true,
secureContext : secureContext,
isServer : false
});
secureSocket.on('_tlsError', function(err) {
if (secureEstablished) {
connection._handleNetworkError(err);
} else {
onSecure(err);
}
});
secureSocket.pipe(this._protocol);
this._protocol.on('data', function(data) {
secureSocket.write(data);
});
secureSocket.on('secure', function() {
secureEstablished = true;
onSecure(rejectUnauthorized ? this.ssl.verifyError() : null);
});
secureSocket._start();
}
n/a
- description and source-code
function beginTransaction(options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
options = options || {};
options.sql = 'START TRANSACTION';
options.values = null;
return this.query(options, callback);
}
...
'''
#
Simple transaction support is available at the connection level:
'''js
connection.beginTransaction(function(err) {
if (err) { throw err; }
connection.query('INSERT INTO posts SET title=?', title, function (error, results, fields) {
if (error) {
return connection.rollback(function() {
throw error;
});
}
...
- description and source-code
function changeUser(options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
this._implyConnect();
var charsetNumber = (options.charset)
? ConnectionConfig.getCharsetNumber(options.charset)
: this.config.charsetNumber;
return this._protocol.changeUser({
user : options.user || this.config.user,
password : options.password || this.config.password,
database : options.database || this.config.database,
timeout : options.timeout,
charsetNumber : charsetNumber,
currentConfig : this.config
}, bindToCurrentDomain(callback));
}
...
#
MySQL offers a changeUser command that allows you to alter the current user and
other aspects of the connection without shutting down the underlying socket:
'''js
connection.changeUser({user : 'john'}, function(err) {
if (err) throw err;
});
'''
The available options for this feature are:
* 'user': The name of the new user (defaults to the previous one).
...
- description and source-code
function commit(options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
options = options || {};
options.sql = 'COMMIT';
options.values = null;
return this.query(options, callback);
}
...
connection.query('INSERT INTO log SET data=?', log, function (error, results, fields) {
if (error) {
return connection.rollback(function() {
throw error;
});
}
connection.commit(function(err) {
if (err) {
return connection.rollback(function() {
throw err;
});
}
console.log('success!');
});
...
- description and source-code
function connect(options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
if (!this._connectCalled) {
this._connectCalled = true;
this._socket = (this.config.socketPath)
? Net.createConnection(this.config.socketPath)
: Net.createConnection(this.config.port, this.config.host);
if (Events.usingDomains) {
this._socket.domain = this.domain;
}
var connection = this;
this._protocol.on('data', function(data) {
connection._socket.write(data);
});
this._socket.on('data', function(data) {
connection._protocol.write(data);
});
this._protocol.on('end', function() {
connection._socket.end();
});
this._socket.on('end', function() {
connection._protocol.end();
});
this._socket.on('error', this._handleNetworkError.bind(this));
this._socket.on('connect', this._handleProtocolConnect.bind(this));
this._protocol.on('handshake', this._handleProtocolHandshake.bind(this));
this._protocol.on('unhandledError', this._handleProtocolError.bind(this));
this._protocol.on('drain', this._handleProtocolDrain.bind(this));
this._protocol.on('end', this._handleProtocolEnd.bind(this));
this._protocol.on('enqueue', this._handleProtocolEnqueue.bind(this));
if (this.config.connectTimeout) {
var handleConnectTimeout = this._handleConnectTimeout.bind(this);
this._socket.setTimeout(this.config.connectTimeout, handleConnectTimeout);
this._socket.once('connect', function() {
this.setTimeout(0, handleConnectTimeout);
});
}
}
this._protocol.handshake(options, bindToCurrentDomain(callback));
}
...
var connection = mysql.createConnection({
host : 'localhost',
user : 'me',
password : 'secret',
database : 'my_db'
});
connection.connect();
connection.query('SELECT 1 + 1 AS solution', function (error, results, fields) {
if (error) throw error;
console.log('The solution is: ', results[0].solution);
});
connection.end();
...
- description and source-code
destroy = function () {
this.state = 'disconnected';
this._implyConnect();
this._socket.destroy();
this._protocol.destroy();
}
...
An alternative way to end the connection is to call the 'destroy()' method.
This will cause an immediate termination of the underlying socket.
Additionally 'destroy()' guarantees that no more events or callbacks will be
triggered for the connection.
'''js
connection.destroy();
'''
Unlike 'end()' the 'destroy()' method does not take a callback argument.
#
Rather than creating and managing connections one-by-one, this module also
...
- description and source-code
function end(options, callback) {
var cb = callback;
var opts = options;
if (!callback && typeof options === 'function') {
cb = options;
opts = null;
}
opts = Object.create(opts || null);
if (opts.timeout === undefined) {
opts.timeout = 30000;
}
this._implyConnect();
this._protocol.quit(opts, bindToCurrentDomain(cb));
}
...
connection.connect();
connection.query('SELECT 1 + 1 AS solution', function (error, results, fields) {
if (error) throw error;
console.log('The solution is: ', results[0].solution);
});
connection.end();
'''
From this example, you can learn the following:
* Every method you invoke on a connection is queued and executed in sequence.
* Closing the connection is done using 'end()' which makes sure all remaining
queries are executed before sending a quit packet to the mysql server.
...
- description and source-code
escape = function (value) {
return SqlString.escape(value, false, this.config.timezone);
}
...
);
'''
#
In order to avoid SQL Injection attacks, you should always escape any user
provided data before using it inside a SQL query. You can do so using the
'mysql.escape()', 'connection.escape()' or 'pool.escape()' methods:
'''js
var userId = 'some user provided value';
var sql = 'SELECT * FROM users WHERE id = ' + connection.escape(userId);
connection.query(sql, function (error, results, fields) {
if (error) throw error;
// ...
...
- description and source-code
function escapeId(value) {
return SqlString.escapeId(value, false);
}
...
console.log(query); // SELECT * FROM posts WHERE title='Hello MySQL'
'''
#
If you can't trust an SQL identifier (database / table / column name) because it is
provided by a user, you should escape it with 'mysql.escapeId(identifier)',
'connection.escapeId(identifier)' or 'pool.escapeId(identifier)' like this:
'''js
var sorter = 'date';
var sql = 'SELECT * FROM posts ORDER BY ' + connection.escapeId(sorter);
connection.query(sql, function (error, results, fields) {
if (error) throw error;
...
- description and source-code
format = function (sql, values) {
if (typeof this.config.queryFormat === 'function') {
return this.config.queryFormat.call(this, sql, values, this.config.timezone);
}
return SqlString.format(sql, values, this.config.stringifyObjects, this.config.timezone);
}
...
#
You can use mysql.format to prepare a query with multiple insertion points, utilizing the proper escaping for ids and values. A \
simple example of this follows:
'''js
var sql = "SELECT * FROM ?? WHERE ?? = ?";
var inserts = ['users', 'id', userId];
sql = mysql.format(sql, inserts);
'''
Following this you then have a valid, escaped query that you can then send to the database safely. This is useful if you are loo\
king to prepare the query before actually sending it to the database. As mysql.format is exposed from SqlString.format you also \
have the option (but are not required) to pass in stringifyObject and timezone, allowing you provide a custom means of turning o\
bjects into strings, as well as a location-specific/timezone-aware Date.
#
If you prefer to have another type of query escape format, there's a connection configuration option you can use to define a cus\
tom format function. You can access the connection object if you want to use the built-in '.escape()' or any other connection fu\
nction.
...
- description and source-code
pause = function () {
this._socket.pause();
this._protocol.pause();
}
...
// Handle error, an 'end' event will be emitted after this as well
})
.on('fields', function(fields) {
// the field packets for the rows to follow
})
.on('result', function(row) {
// Pausing the connnection is useful if your processing involves I/O
connection.pause();
processRow(row, function() {
connection.resume();
});
})
.on('end', function() {
// all rows have been received
...
- description and source-code
function ping(options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
this._implyConnect();
this._protocol.ping(options, bindToCurrentDomain(callback));
}
...
#
A ping packet can be sent over a connection using the 'connection.ping' method. This
method will send a ping packet to the server and when the server responds, the callback
will fire. If an error occurred, the callback will fire with an error argument.
'''js
connection.ping(function (err) {
if (err) throw err;
console.log('Server responded to ping');
})
'''
#
...
- description and source-code
function query(sql, values, cb) {
var query = Connection.createQuery(sql, values, cb);
query._connection = this;
if (!(typeof sql === 'object' && 'typeCast' in sql)) {
query.typeCast = this.config.typeCast;
}
if (query.sql) {
query.sql = this.format(query.sql, query.values);
}
this._implyConnect();
return this._protocol._enqueue(query);
}
...
user : 'me',
password : 'secret',
database : 'my_db'
});
connection.connect();
connection.query('SELECT 1 + 1 AS solution', function (error, results, fields) {
if (error) throw error;
console.log('The solution is: ', results[0].solution);
});
connection.end();
'''
...
- description and source-code
resume = function () {
this._socket.resume();
this._protocol.resume();
}
...
// the field packets for the rows to follow
})
.on('result', function(row) {
// Pausing the connnection is useful if your processing involves I/O
connection.pause();
processRow(row, function() {
connection.resume();
});
})
.on('end', function() {
// all rows have been received
});
'''
...
- description and source-code
function rollback(options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
options = options || {};
options.sql = 'ROLLBACK';
options.values = null;
return this.query(options, callback);
}
...
Simple transaction support is available at the connection level:
'''js
connection.beginTransaction(function(err) {
if (err) { throw err; }
connection.query('INSERT INTO posts SET title=?', title, function (error, results, fields) {
if (error) {
return connection.rollback(function() {
throw error;
});
}
var log = 'Post ' + result.insertId + ' added';
connection.query('INSERT INTO log SET data=?', log, function (error, results, fields) {
...
- description and source-code
function statistics(options, callback) {
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
this._implyConnect();
this._protocol.stats(options, bindToCurrentDomain(callback));
}
n/a
- description and source-code
function ConnectionConfig(options) {
if (typeof options === 'string') {
options = ConnectionConfig.parseUrl(options);
}
this.host = options.host || 'localhost';
this.port = options.port || 3306;
this.localAddress = options.localAddress;
this.socketPath = options.socketPath;
this.user = options.user || undefined;
this.password = options.password || undefined;
this.database = options.database;
this.connectTimeout = (options.connectTimeout === undefined)
? (10 * 1000)
: options.connectTimeout;
this.insecureAuth = options.insecureAuth || false;
this.supportBigNumbers = options.supportBigNumbers || false;
this.bigNumberStrings = options.bigNumberStrings || false;
this.dateStrings = options.dateStrings || false;
this.debug = options.debug;
this.trace = options.trace !== false;
this.stringifyObjects = options.stringifyObjects || false;
this.timezone = options.timezone || 'local';
this.flags = options.flags || '';
this.queryFormat = options.queryFormat;
this.pool = options.pool || undefined;
this.ssl = (typeof options.ssl === 'string')
? ConnectionConfig.getSSLProfile(options.ssl)
: (options.ssl || false);
this.multipleStatements = options.multipleStatements || false;
this.typeCast = (options.typeCast === undefined)
? true
: options.typeCast;
if (this.timezone[0] === ' ') {
this.timezone = '+' + this.timezone.substr(1);
}
if (this.ssl) {
this.ssl.rejectUnauthorized = this.ssl.rejectUnauthorized !== false;
}
this.maxPacketSize = 0;
this.charsetNumber = (options.charset)
? ConnectionConfig.getCharsetNumber(options.charset)
: options.charsetNumber || Charsets.UTF8_GENERAL_CI;
var defaultFlags = ConnectionConfig.getDefaultFlags(options);
this.clientFlags = ConnectionConfig.mergeFlags(defaultFlags, options.flags);
}
n/a
- description and source-code
function getCharsetNumber(charset) {
var num = Charsets[charset.toUpperCase()];
if (num === undefined) {
throw new TypeError('Unknown charset \'' + charset + '\'');
}
return num;
}
...
callback = options;
options = {};
}
this._implyConnect();
var charsetNumber = (options.charset)
? ConnectionConfig.getCharsetNumber(options.charset)
: this.config.charsetNumber;
return this._protocol.changeUser({
user : options.user || this.config.user,
password : options.password || this.config.password,
database : options.database || this.config.database,
timeout : options.timeout,
...
- description and source-code
function getDefaultFlags(options) {
var defaultFlags = [
'-COMPRESS',
'-CONNECT_ATTRS',
'+CONNECT_WITH_DB',
'+FOUND_ROWS',
'+IGNORE_SIGPIPE',
'+IGNORE_SPACE',
'+LOCAL_FILES',
'+LONG_FLAG',
'+LONG_PASSWORD',
'+MULTI_RESULTS',
'+ODBC',
'-PLUGIN_AUTH',
'+PROTOCOL_41',
'+PS_MULTI_RESULTS',
'+RESERVED',
'+SECURE_CONNECTION',
'+TRANSACTIONS'
];
if (options && options.multipleStatements) {
defaultFlags.push('+MULTI_STATEMENTS');
}
return defaultFlags;
}
...
this.maxPacketSize = 0;
this.charsetNumber = (options.charset)
? ConnectionConfig.getCharsetNumber(options.charset)
: options.charsetNumber || Charsets.UTF8_GENERAL_CI;
// Set the client flags
var defaultFlags = ConnectionConfig.getDefaultFlags(options);
this.clientFlags = ConnectionConfig.mergeFlags(defaultFlags, options.flags);
}
ConnectionConfig.mergeFlags = function mergeFlags(defaultFlags, userFlags) {
var allFlags = ConnectionConfig.parseFlagList(defaultFlags);
var newFlags = ConnectionConfig.parseFlagList(userFlags);
...
- description and source-code
function getSSLProfile(name) {
if (!SSLProfiles) {
SSLProfiles = require('./protocol/constants/ssl_profiles');
}
var ssl = SSLProfiles[name];
if (ssl === undefined) {
throw new TypeError('Unknown SSL profile \'' + name + '\'');
}
return ssl;
}
...
this.trace = options.trace !== false;
this.stringifyObjects = options.stringifyObjects || false;
this.timezone = options.timezone || 'local';
this.flags = options.flags || '';
this.queryFormat = options.queryFormat;
this.pool = options.pool || undefined;
this.ssl = (typeof options.ssl === 'string')
? ConnectionConfig.getSSLProfile(options.ssl)
: (options.ssl || false);
this.multipleStatements = options.multipleStatements || false;
this.typeCast = (options.typeCast === undefined)
? true
: options.typeCast;
if (this.timezone[0] === ' ') {
...
- description and source-code
function mergeFlags(defaultFlags, userFlags) {
var allFlags = ConnectionConfig.parseFlagList(defaultFlags);
var newFlags = ConnectionConfig.parseFlagList(userFlags);
for (var flag in newFlags) {
if (allFlags[flag] !== false) {
allFlags[flag] = newFlags[flag];
}
}
var flags = 0x0;
for (var flag in allFlags) {
if (allFlags[flag]) {
flags |= ClientConstants['CLIENT_' + flag] || 0x0;
}
}
return flags;
}
...
this.maxPacketSize = 0;
this.charsetNumber = (options.charset)
? ConnectionConfig.getCharsetNumber(options.charset)
: options.charsetNumber || Charsets.UTF8_GENERAL_CI;
// Set the client flags
var defaultFlags = ConnectionConfig.getDefaultFlags(options);
this.clientFlags = ConnectionConfig.mergeFlags(defaultFlags, options.flags);
}
ConnectionConfig.mergeFlags = function mergeFlags(defaultFlags, userFlags) {
var allFlags = ConnectionConfig.parseFlagList(defaultFlags);
var newFlags = ConnectionConfig.parseFlagList(userFlags);
// Merge the new flags
...
- description and source-code
function parseFlagList(flagList) {
var allFlags = Object.create(null);
if (!flagList) {
return allFlags;
}
var flags = !Array.isArray(flagList)
? String(flagList || '').toUpperCase().split(/\s*,+\s*/)
: flagList;
for (var i = 0; i < flags.length; i++) {
var flag = flags[i];
var offset = 1;
var state = flag[0];
if (state === undefined) {
continue;
}
if (state !== '-' && state !== '+') {
offset = 0;
state = '+';
}
allFlags[flag.substr(offset)] = state === '+';
}
return allFlags;
}
...
// Set the client flags
var defaultFlags = ConnectionConfig.getDefaultFlags(options);
this.clientFlags = ConnectionConfig.mergeFlags(defaultFlags, options.flags);
}
ConnectionConfig.mergeFlags = function mergeFlags(defaultFlags, userFlags) {
var allFlags = ConnectionConfig.parseFlagList(defaultFlags);
var newFlags = ConnectionConfig.parseFlagList(userFlags);
// Merge the new flags
for (var flag in newFlags) {
if (allFlags[flag] !== false) {
allFlags[flag] = newFlags[flag];
}
...
- description and source-code
parseUrl = function (url) {
url = urlParse(url, true);
var options = {
host : url.hostname,
port : url.port,
database : url.pathname.substr(1)
};
if (url.auth) {
var auth = url.auth.split(':');
options.user = auth.shift();
options.password = auth.join(':');
}
if (url.query) {
for (var key in url.query) {
var value = url.query[key];
try {
options[key] = JSON.parse(value);
} catch (err) {
options[key] = value;
}
}
}
return options;
}
...
var ClientConstants = require('./protocol/constants/client');
var Charsets = require('./protocol/constants/charsets');
var SSLProfiles = null;
module.exports = ConnectionConfig;
function ConnectionConfig(options) {
if (typeof options === 'string') {
options = ConnectionConfig.parseUrl(options);
}
this.host = options.host || 'localhost';
this.port = options.port || 3306;
this.localAddress = options.localAddress;
this.socketPath = options.socketPath;
this.user = options.user || undefined;
...
- description and source-code
function Pool(options) {
EventEmitter.call(this);
this.config = options.config;
this.config.connectionConfig.pool = this;
this._acquiringConnections = [];
this._allConnections = [];
this._freeConnections = [];
this._connectionQueue = [];
this._closed = false;
}
n/a
- description and source-code
function EventEmitter() {
EventEmitter.init.call(this);
}
n/a
- description and source-code
function _enqueueCallback(callback) {
if (this.config.queueLimit && this._connectionQueue.length >= this.config.queueLimit) {
process.nextTick(function () {
var err = new Error('Queue limit reached.');
err.code = 'POOL_ENQUEUELIMIT';
callback(err);
});
return;
}
var cb = process.domain
? process.domain.bind(callback)
: callback;
this._connectionQueue.push(cb);
this.emit('enqueue');
}
...
var err = new Error('No connections available.');
err.code = 'POOL_CONNLIMIT';
cb(err);
});
return;
}
this._enqueueCallback(cb);
};
Pool.prototype.acquireConnection = function acquireConnection(connection, cb) {
if (connection._pool !== this) {
throw new Error('Connection acquired from wrong pool.');
}
...
- description and source-code
function _needsChangeUser(connection) {
var connConfig = connection.config;
var poolConfig = this.config.connectionConfig;
return connConfig.user !== poolConfig.user
|| connConfig.database !== poolConfig.database
|| connConfig.password !== poolConfig.password
|| connConfig.charsetNumber !== poolConfig.charsetNumber;
}
...
};
Pool.prototype.acquireConnection = function acquireConnection(connection, cb) {
if (connection._pool !== this) {
throw new Error('Connection acquired from wrong pool.');
}
var changeUser = this._needsChangeUser(connection);
var pool = this;
this._acquiringConnections.push(connection);
function onOperationComplete(err) {
spliceConnection(pool._acquiringConnections, connection);
...
- description and source-code
function _purgeConnection(connection, callback) {
var cb = callback || function () {};
if (connection.state === 'disconnected') {
connection.destroy();
}
this._removeConnection(connection);
if (connection.state !== 'disconnected' && !connection._protocol._quitSequence) {
connection._realEnd(cb);
return;
}
process.nextTick(cb);
}
...
if (pool._closed) {
err = new Error('Pool is closed.');
err.code = 'POOL_CLOSED';
}
if (err) {
pool._purgeConnection(connection);
cb(err);
return;
}
pool.emit('connection', connection);
pool.emit('acquire', connection);
cb(null, connection);
...
- description and source-code
_removeConnection = function (connection) {
connection._pool = null;
spliceConnection(this._allConnections, connection);
spliceConnection(this._freeConnections, connection);
this.releaseConnection(connection);
}
...
Pool.prototype._purgeConnection = function _purgeConnection(connection, callback) {
var cb = callback || function () {};
if (connection.state === 'disconnected') {
connection.destroy();
}
this._removeConnection(connection);
if (connection.state !== 'disconnected' && !connection._protocol._quitSequence) {
connection._realEnd(cb);
return;
}
process.nextTick(cb);
...
- description and source-code
function acquireConnection(connection, cb) {
if (connection._pool !== this) {
throw new Error('Connection acquired from wrong pool.');
}
var changeUser = this._needsChangeUser(connection);
var pool = this;
this._acquiringConnections.push(connection);
function onOperationComplete(err) {
spliceConnection(pool._acquiringConnections, connection);
if (pool._closed) {
err = new Error('Pool is closed.');
err.code = 'POOL_CLOSED';
}
if (err) {
pool._connectionQueue.unshift(cb);
pool._purgeConnection(connection);
return;
}
if (changeUser) {
pool.emit('connection', connection);
}
pool.emit('acquire', connection);
cb(null, connection);
}
if (changeUser) {
connection.config = this.config.newConnectionConfig();
connection.changeUser({timeout: this.config.acquireTimeout}, onOperationComplete);
} else {
connection.ping({timeout: this.config.acquireTimeout}, onOperationComplete);
}
}
...
}
var connection;
var pool = this;
if (this._freeConnections.length > 0) {
connection = this._freeConnections.shift();
this.acquireConnection(connection, cb);
return;
}
if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) {
connection = new PoolConnection(this, { config: this.config.newConnectionConfig() });
this._acquiringConnections.push(connection);
...
- description and source-code
end = function (cb) {
this._closed = true;
if (typeof cb !== 'function') {
cb = function (err) {
if (err) throw err;
};
}
var calledBack = false;
var waitingClose = 0;
function onEnd(err) {
if (!calledBack && (err || --waitingClose <= 0)) {
calledBack = true;
cb(err);
}
}
while (this._allConnections.length !== 0) {
waitingClose++;
this._purgeConnection(this._allConnections[0], onEnd);
}
if (waitingClose === 0) {
process.nextTick(onEnd);
}
}
...
connection.connect();
connection.query('SELECT 1 + 1 AS solution', function (error, results, fields) {
if (error) throw error;
console.log('The solution is: ', results[0].solution);
});
connection.end();
'''
From this example, you can learn the following:
* Every method you invoke on a connection is queued and executed in sequence.
* Closing the connection is done using 'end()' which makes sure all remaining
queries are executed before sending a quit packet to the mysql server.
...
- description and source-code
escape = function (value) {
return mysql.escape(value, this.config.connectionConfig.stringifyObjects, this.config.connectionConfig.timezone);
}
...
);
'''
#
In order to avoid SQL Injection attacks, you should always escape any user
provided data before using it inside a SQL query. You can do so using the
'mysql.escape()', 'connection.escape()' or 'pool.escape()' methods:
'''js
var userId = 'some user provided value';
var sql = 'SELECT * FROM users WHERE id = ' + connection.escape(userId);
connection.query(sql, function (error, results, fields) {
if (error) throw error;
// ...
...
- description and source-code
function escapeId(value) {
return mysql.escapeId(value, false);
}
...
console.log(query); // SELECT * FROM posts WHERE title='Hello MySQL'
'''
#
If you can't trust an SQL identifier (database / table / column name) because it is
provided by a user, you should escape it with 'mysql.escapeId(identifier)',
'connection.escapeId(identifier)' or 'pool.escapeId(identifier)' like this:
'''js
var sorter = 'date';
var sql = 'SELECT * FROM posts ORDER BY ' + connection.escapeId(sorter);
connection.query(sql, function (error, results, fields) {
if (error) throw error;
...
- description and source-code
getConnection = function (cb) {
if (this._closed) {
var err = new Error('Pool is closed.');
err.code = 'POOL_CLOSED';
process.nextTick(function () {
cb(err);
});
return;
}
var connection;
var pool = this;
if (this._freeConnections.length > 0) {
connection = this._freeConnections.shift();
this.acquireConnection(connection, cb);
return;
}
if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) {
connection = new PoolConnection(this, { config: this.config.newConnectionConfig() });
this._acquiringConnections.push(connection);
this._allConnections.push(connection);
connection.connect({timeout: this.config.acquireTimeout}, function onConnect(err) {
spliceConnection(pool._acquiringConnections, connection);
if (pool._closed) {
err = new Error('Pool is closed.');
err.code = 'POOL_CLOSED';
}
if (err) {
pool._purgeConnection(connection);
cb(err);
return;
}
pool.emit('connection', connection);
pool.emit('acquire', connection);
cb(null, connection);
});
return;
}
if (!this.config.waitForConnections) {
process.nextTick(function(){
var err = new Error('No connections available.');
err.code = 'POOL_CONNLIMIT';
cb(err);
});
return;
}
this._enqueueCallback(cb);
}
...
var pool = mysql.createPool({
host : 'example.org',
user : 'bob',
password : 'secret',
database : 'my_db'
});
pool.getConnection(function(err, connection) {
// connected! (unless 'err' is set)
});
'''
When you are done with a connection, just call 'connection.release()' and the
connection will return to the pool, ready to be used again by someone else.
...
- description and source-code
query = function (sql, values, cb) {
var query = Connection.createQuery(sql, values, cb);
if (!(typeof sql === 'object' && 'typeCast' in sql)) {
query.typeCast = this.config.connectionConfig.typeCast;
}
if (this.config.connectionConfig.trace) {
query._callSite = new Error();
}
this.getConnection(function (err, conn) {
if (err) {
query.on('error', function () {});
query.end(err);
return;
}
query.once('end', function() {
conn.release();
});
conn.query(query);
});
return query;
}
...
user : 'me',
password : 'secret',
database : 'my_db'
});
connection.connect();
connection.query('SELECT 1 + 1 AS solution', function (error, results, fields) {
if (error) throw error;
console.log('The solution is: ', results[0].solution);
});
connection.end();
'''
...
- description and source-code
function releaseConnection(connection) {
if (this._acquiringConnections.indexOf(connection) !== -1) {
return;
}
if (connection._pool) {
if (connection._pool !== this) {
throw new Error('Connection released to wrong pool');
}
if (this._freeConnections.indexOf(connection) !== -1) {
throw new Error('Connection already released');
} else {
this._freeConnections.push(connection);
this.emit('release', connection);
}
}
if (this._closed) {
this._connectionQueue.splice(0).forEach(function (cb) {
var err = new Error('Pool is closed.');
err.code = 'POOL_CLOSED';
process.nextTick(function () {
cb(err);
});
});
} else if (this._connectionQueue.length) {
this.getConnection(this._connectionQueue.shift());
}
}
...
// Remove connection from all connections
spliceConnection(this._allConnections, connection);
// Remove connection from free connections
spliceConnection(this._freeConnections, connection);
this.releaseConnection(connection);
};
Pool.prototype.escape = function(value) {
return mysql.escape(value, this.config.connectionConfig.stringifyObjects, this.config.connectionConfig.timezone);
};
Pool.prototype.escapeId = function escapeId(value) {
...
- description and source-code
function PoolCluster(config) {
EventEmitter.call(this);
config = config || {};
this._canRetry = typeof config.canRetry === 'undefined' ? true : config.canRetry;
this._defaultSelector = config.defaultSelector || 'RR';
this._removeNodeErrorCount = config.removeNodeErrorCount || 5;
this._restoreNodeTimeout = config.restoreNodeTimeout || 0;
this._closed = false;
this._findCaches = Object.create(null);
this._lastId = 0;
this._namespaces = Object.create(null);
this._nodes = Object.create(null);
}
n/a
- description and source-code
function EventEmitter() {
EventEmitter.init.call(this);
}
n/a
- description and source-code
function _clearFindCaches() {
this._findCaches = Object.create(null);
}
...
this._nodes[nodeId] = {
id : nodeId,
errorCount : 0,
pool : new Pool({config: poolConfig}),
_offlineUntil : 0
};
this._clearFindCaches();
};
PoolCluster.prototype.end = function end(callback) {
var cb = callback !== undefined
? callback
: _cb;
...
- description and source-code
function _decreaseErrorCount(node) {
var errorCount = node.errorCount;
if (errorCount > this._removeNodeErrorCount) {
errorCount = this._removeNodeErrorCount;
}
if (errorCount < 1) {
errorCount = 1;
}
node.errorCount = errorCount - 1;
if (node._offlineUntil) {
node._offlineUntil = 0;
this.emit('online', node.id);
}
}
...
node.pool.getConnection(function (err, connection) {
if (err) {
self._increaseErrorCount(node);
cb(err);
return;
} else {
self._decreaseErrorCount(node);
}
connection._clusterId = node.id;
cb(null, connection);
});
};
...
- description and source-code
function _findNodeIds(pattern, includeOffline) {
var currentTime = 0;
var foundNodeIds = this._findCaches[pattern];
if (foundNodeIds === undefined) {
var expression = patternRegExp(pattern);
var nodeIds = Object.keys(this._nodes);
foundNodeIds = nodeIds.filter(function (id) {
return id.match(expression);
});
this._findCaches[pattern] = foundNodeIds;
}
if (includeOffline) {
return foundNodeIds;
}
return foundNodeIds.filter(function (nodeId) {
var node = this._getNode(nodeId);
if (!node._offlineUntil) {
return true;
}
if (!currentTime) {
currentTime = getMonotonicMilliseconds();
}
return node._offlineUntil <= currentTime;
}, this);
}
...
this._namespaces[key] = new PoolNamespace(this, pattern, selector);
}
return this._namespaces[key];
};
PoolCluster.prototype.remove = function remove(pattern) {
var foundNodeIds = this._findNodeIds(pattern, true);
for (var i = 0; i < foundNodeIds.length; i++) {
var node = this._getNode(foundNodeIds[i]);
if (node) {
this._removeNode(node);
}
...
- description and source-code
_getConnection = function (node, cb) {
var self = this;
node.pool.getConnection(function (err, connection) {
if (err) {
self._increaseErrorCount(node);
cb(err);
return;
} else {
self._decreaseErrorCount(node);
}
connection._clusterId = node.id;
cb(null, connection);
});
}
...
err.code = 'POOL_NOEXIST';
}
cb(err);
return;
}
cluster._getConnection(clusterNode, function(err, connection) {
var retry = err && cluster._canRetry
&& cluster._findNodeIds(namespace._pattern).length !== 0;
if (retry) {
namespace.getConnection(cb);
return;
}
...
- description and source-code
function _getNode(id) {
return this._nodes[id] || null;
}
...
return this._namespaces[key];
};
PoolCluster.prototype.remove = function remove(pattern) {
var foundNodeIds = this._findNodeIds(pattern, true);
for (var i = 0; i < foundNodeIds.length; i++) {
var node = this._getNode(foundNodeIds[i]);
if (node) {
this._removeNode(node);
}
}
};
...
- description and source-code
function _increaseErrorCount(node) {
var errorCount = ++node.errorCount;
if (this._removeNodeErrorCount > errorCount) {
return;
}
if (this._restoreNodeTimeout > 0) {
node._offlineUntil = getMonotonicMilliseconds() + this._restoreNodeTimeout;
this.emit('offline', node.id);
return;
}
this._removeNode(node);
this.emit('remove', node.id);
}
...
};
PoolCluster.prototype._getConnection = function(node, cb) {
var self = this;
node.pool.getConnection(function (err, connection) {
if (err) {
self._increaseErrorCount(node);
cb(err);
return;
} else {
self._decreaseErrorCount(node);
}
connection._clusterId = node.id;
...
- description and source-code
function _removeNode(node) {
delete this._nodes[node.id];
this._clearFindCaches();
node.pool.end(_noop);
}
...
PoolCluster.prototype.remove = function remove(pattern) {
var foundNodeIds = this._findNodeIds(pattern, true);
for (var i = 0; i < foundNodeIds.length; i++) {
var node = this._getNode(foundNodeIds[i]);
if (node) {
this._removeNode(node);
}
}
};
PoolCluster.prototype.getConnection = function(pattern, selector, cb) {
var namespace;
if (typeof pattern === 'function') {
...
- description and source-code
function add(id, config) {
if (this._closed) {
throw new Error('PoolCluster is closed.');
}
var nodeId = typeof id === 'object'
? 'CLUSTER::' + (++this._lastId)
: String(id);
if (this._nodes[nodeId] !== undefined) {
throw new Error('Node ID "' + nodeId + '" is already defined in PoolCluster.');
}
var poolConfig = typeof id !== 'object'
? new PoolConfig(config)
: new PoolConfig(id);
this._nodes[nodeId] = {
id : nodeId,
errorCount : 0,
pool : new Pool({config: poolConfig}),
_offlineUntil : 0
};
this._clearFindCaches();
}
...
PoolCluster provides multiple hosts connection. (group & retry & selector)
'''js
// create
var poolCluster = mysql.createPoolCluster();
// add configurations (the config is a pool config object)
poolCluster.add(config); // add configuration with automatic name
poolCluster.add('MASTER', masterConfig); // add a named configuration
poolCluster.add('SLAVE1', slave1Config);
poolCluster.add('SLAVE2', slave2Config);
// remove configurations
poolCluster.remove('SLAVE2'); // By nodeId
poolCluster.remove('SLAVE*'); // By target group : SLAVE1-2
...
- description and source-code
function end(callback) {
var cb = callback !== undefined
? callback
: _cb;
if (typeof cb !== 'function') {
throw TypeError('callback argument must be a function');
}
if (this._closed) {
process.nextTick(cb);
return;
}
this._closed = true;
var calledBack = false;
var nodeIds = Object.keys(this._nodes);
var waitingClose = 0;
function onEnd(err) {
if (!calledBack && (err || --waitingClose <= 0)) {
calledBack = true;
cb(err);
}
}
for (var i = 0; i < nodeIds.length; i++) {
var nodeId = nodeIds[i];
var node = this._nodes[nodeId];
waitingClose++;
node.pool.end(onEnd);
}
if (waitingClose === 0) {
process.nextTick(onEnd);
}
}
...
connection.connect();
connection.query('SELECT 1 + 1 AS solution', function (error, results, fields) {
if (error) throw error;
console.log('The solution is: ', results[0].solution);
});
connection.end();
'''
From this example, you can learn the following:
* Every method you invoke on a connection is queued and executed in sequence.
* Closing the connection is done using 'end()' which makes sure all remaining
queries are executed before sending a quit packet to the mysql server.
...
- description and source-code
getConnection = function (pattern, selector, cb) {
var namespace;
if (typeof pattern === 'function') {
cb = pattern;
namespace = this.of();
} else {
if (typeof selector === 'function') {
cb = selector;
selector = this._defaultSelector;
}
namespace = this.of(pattern, selector);
}
namespace.getConnection(cb);
}
...
var pool = mysql.createPool({
host : 'example.org',
user : 'bob',
password : 'secret',
database : 'my_db'
});
pool.getConnection(function(err, connection) {
// connected! (unless 'err' is set)
});
'''
When you are done with a connection, just call 'connection.release()' and the
connection will return to the pool, ready to be used again by someone else.
...
- description and source-code
of = function (pattern, selector) {
pattern = pattern || '*';
selector = selector || this._defaultSelector;
selector = selector.toUpperCase();
if (typeof PoolSelector[selector] === 'undefined') {
selector = this._defaultSelector;
}
var key = pattern + selector;
if (typeof this._namespaces[key] === 'undefined') {
this._namespaces[key] = new PoolNamespace(this, pattern, selector);
}
return this._namespaces[key];
}
...
// A pattern can be passed with * as wildcard
poolCluster.getConnection('SLAVE*', 'ORDER', function (err, connection) {});
// The pattern can also be a regular expression
poolCluster.getConnection(/^SLAVE[12]$/, function (err, connection) {});
// of namespace : of(pattern, selector)
poolCluster.of('*').getConnection(function (err, connection) {});
var pool = poolCluster.of('SLAVE*', 'RANDOM');
pool.getConnection(function (err, connection) {});
pool.getConnection(function (err, connection) {});
pool.query(function (error, results, fields) {});
// close all connections
...
- description and source-code
function remove(pattern) {
var foundNodeIds = this._findNodeIds(pattern, true);
for (var i = 0; i < foundNodeIds.length; i++) {
var node = this._getNode(foundNodeIds[i]);
if (node) {
this._removeNode(node);
}
}
}
...
// add configurations (the config is a pool config object)
poolCluster.add(config); // add configuration with automatic name
poolCluster.add('MASTER', masterConfig); // add a named configuration
poolCluster.add('SLAVE1', slave1Config);
poolCluster.add('SLAVE2', slave2Config);
// remove configurations
poolCluster.remove('SLAVE2'); // By nodeId
poolCluster.remove('SLAVE*'); // By target group : SLAVE1-2
// Target Group : ALL(anonymous, MASTER, SLAVE1-2), Selector : round-robin(default)
poolCluster.getConnection(function (err, connection) {});
// Target Group : MASTER, Selector : round-robin
poolCluster.getConnection('MASTER', function (err, connection) {});
...
- description and source-code
function PoolConfig(options) {
if (typeof options === 'string') {
options = ConnectionConfig.parseUrl(options);
}
this.acquireTimeout = (options.acquireTimeout === undefined)
? 10 * 1000
: Number(options.acquireTimeout);
this.connectionConfig = new ConnectionConfig(options);
this.waitForConnections = (options.waitForConnections === undefined)
? true
: Boolean(options.waitForConnections);
this.connectionLimit = (options.connectionLimit === undefined)
? 10
: Number(options.connectionLimit);
this.queueLimit = (options.queueLimit === undefined)
? 0
: Number(options.queueLimit);
}
n/a
- description and source-code
function newConnectionConfig() {
var connectionConfig = new ConnectionConfig(this.connectionConfig);
connectionConfig.clientFlags = this.connectionConfig.clientFlags;
connectionConfig.maxPacketSize = this.connectionConfig.maxPacketSize;
return connectionConfig;
}
...
if (this._freeConnections.length > 0) {
connection = this._freeConnections.shift();
this.acquireConnection(connection, cb);
return;
}
if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) {
connection = new PoolConnection(this, { config: this.config.newConnectionConfig() });
this._acquiringConnections.push(connection);
this._allConnections.push(connection);
connection.connect({timeout: this.config.acquireTimeout}, function onConnect(err) {
spliceConnection(pool._acquiringConnections, connection);
...
- description and source-code
function PoolConnection(pool, options) {
Connection.call(this, options);
this._pool = pool;
if (Events.usingDomains) {
this.domain = pool.domain;
}
this.on('end', this._removeFromPool);
this.on('error', function (err) {
if (err.fatal) {
this._removeFromPool();
}
});
}
n/a
- description and source-code
function Connection(options) {
Events.EventEmitter.call(this);
this.config = options.config;
this._socket = options.socket;
this._protocol = new Protocol({config: this.config, connection: this});
this._connectCalled = false;
this.state = 'disconnected';
this.threadId = null;
}
n/a
- description and source-code
function end(options, callback) {
var cb = callback;
var opts = options;
if (!callback && typeof options === 'function') {
cb = options;
opts = null;
}
opts = Object.create(opts || null);
if (opts.timeout === undefined) {
opts.timeout = 30000;
}
this._implyConnect();
this._protocol.quit(opts, bindToCurrentDomain(cb));
}
...
if (connection.state === 'disconnected') {
connection.destroy();
}
this._removeConnection(connection);
if (connection.state !== 'disconnected' && !connection._protocol._quitSequence) {
connection._realEnd(cb);
return;
}
process.nextTick(cb);
};
Pool.prototype._removeConnection = function(connection) {
...
- description and source-code
function _removeFromPool() {
if (!this._pool || this._pool._closed) {
return;
}
var pool = this._pool;
this._pool = null;
pool._purgeConnection(this);
}
...
// When a fatal error occurs the connection's protocol ends, which will cause
// the connection to end as well, thus we only need to watch for the end event
// and we will be notified of disconnects.
this.on('end', this._removeFromPool);
this.on('error', function (err) {
if (err.fatal) {
this._removeFromPool();
}
});
}
PoolConnection.prototype.release = function release() {
var pool = this._pool;
...
- description and source-code
destroy = function () {
Connection.prototype.destroy.apply(this, arguments);
this._removeFromPool(this);
}
...
An alternative way to end the connection is to call the 'destroy()' method.
This will cause an immediate termination of the underlying socket.
Additionally 'destroy()' guarantees that no more events or callbacks will be
triggered for the connection.
'''js
connection.destroy();
'''
Unlike 'end()' the 'destroy()' method does not take a callback argument.
#
Rather than creating and managing connections one-by-one, this module also
...
- description and source-code
end = function () {
console.warn( 'Calling conn.end() to release a pooled connection is '
+ 'deprecated. In next version calling conn.end() will be '
+ 'restored to default conn.end() behavior. Use '
+ 'conn.release() instead.'
);
this.release();
}
...
connection.connect();
connection.query('SELECT 1 + 1 AS solution', function (error, results, fields) {
if (error) throw error;
console.log('The solution is: ', results[0].solution);
});
connection.end();
'''
From this example, you can learn the following:
* Every method you invoke on a connection is queued and executed in sequence.
* Closing the connection is done using 'end()' which makes sure all remaining
queries are executed before sending a quit packet to the mysql server.
...
- description and source-code
function release() {
var pool = this._pool;
if (!pool || pool._closed) {
return undefined;
}
return pool.releaseConnection(this);
}
...
});
pool.getConnection(function(err, connection) {
// connected! (unless 'err' is set)
});
'''
When you are done with a connection, just call 'connection.release()' and the
connection will return to the pool, ready to be used again by someone else.
'''js
var mysql = require('mysql');
var pool = mysql.createPool(...);
pool.getConnection(function(err, connection) {
...
- description and source-code
function PoolNamespace(cluster, pattern, selector) {
this._cluster = cluster;
this._pattern = pattern;
this._selector = new PoolSelector[selector]();
}
n/a
- description and source-code
function _getClusterNode() {
var foundNodeIds = this._cluster._findNodeIds(this._pattern);
var nodeId;
switch (foundNodeIds.length) {
case 0:
nodeId = null;
break;
case 1:
nodeId = foundNodeIds[0];
break;
default:
nodeId = this._selector(foundNodeIds);
break;
}
return nodeId !== null
? this._cluster._getNode(nodeId)
: null;
}
...
function PoolNamespace(cluster, pattern, selector) {
this._cluster = cluster;
this._pattern = pattern;
this._selector = new PoolSelector[selector]();
}
PoolNamespace.prototype.getConnection = function(cb) {
var clusterNode = this._getClusterNode();
var cluster = this._cluster;
var namespace = this;
if (clusterNode === null) {
var err = null;
if (this._cluster._findNodeIds(this._pattern, true).length !== 0) {
...
- description and source-code
getConnection = function (cb) {
var clusterNode = this._getClusterNode();
var cluster = this._cluster;
var namespace = this;
if (clusterNode === null) {
var err = null;
if (this._cluster._findNodeIds(this._pattern, true).length !== 0) {
err = new Error('Pool does not have online node.');
err.code = 'POOL_NONEONLINE';
} else {
err = new Error('Pool does not exist.');
err.code = 'POOL_NOEXIST';
}
cb(err);
return;
}
cluster._getConnection(clusterNode, function(err, connection) {
var retry = err && cluster._canRetry
&& cluster._findNodeIds(namespace._pattern).length !== 0;
if (retry) {
namespace.getConnection(cb);
return;
}
if (err) {
cb(err);
return;
}
cb(null, connection);
});
}
...
var pool = mysql.createPool({
host : 'example.org',
user : 'bob',
password : 'secret',
database : 'my_db'
});
pool.getConnection(function(err, connection) {
// connected! (unless 'err' is set)
});
'''
When you are done with a connection, just call 'connection.release()' and the
connection will return to the pool, ready to be used again by someone else.
...
- description and source-code
query = function (sql, values, cb) {
var cluster = this._cluster;
var clusterNode = this._getClusterNode();
var query = Connection.createQuery(sql, values, cb);
var namespace = this;
if (clusterNode === null) {
var err = null;
if (this._cluster._findNodeIds(this._pattern, true).length !== 0) {
err = new Error('Pool does not have online node.');
err.code = 'POOL_NONEONLINE';
} else {
err = new Error('Pool does not exist.');
err.code = 'POOL_NOEXIST';
}
process.nextTick(function () {
query.on('error', function () {});
query.end(err);
});
return query;
}
if (!(typeof sql === 'object' && 'typeCast' in sql)) {
query.typeCast = clusterNode.pool.config.connectionConfig.typeCast;
}
if (clusterNode.pool.config.connectionConfig.trace) {
query._callSite = new Error();
}
cluster._getConnection(clusterNode, function (err, conn) {
var retry = err && cluster._canRetry
&& cluster._findNodeIds(namespace._pattern).length !== 0;
if (retry) {
namespace.query(query);
return;
}
if (err) {
query.on('error', function () {});
query.end(err);
return;
}
query.once('end', function() {
conn.release();
});
conn.query(query);
});
return query;
}
...
user : 'me',
password : 'secret',
database : 'my_db'
});
connection.connect();
connection.query('SELECT 1 + 1 AS solution', function (error, results, fields) {
if (error) throw error;
console.log('The solution is: ', results[0].solution);
});
connection.end();
'''
...
- description and source-code
function PoolSelectorOrder() {
return function(clusterIds) {
return clusterIds[0];
};
}
n/a
- description and source-code
function PoolSelectorRandom() {
return function(clusterIds) {
return clusterIds[Math.floor(Math.random() * clusterIds.length)];
};
}
n/a
- description and source-code
function PoolSelectorRoundRobin() {
var index = 0;
return function(clusterIds) {
if (index >= clusterIds.length) {
index = 0;
}
var clusterId = clusterIds[index++];
return clusterId;
};
}
n/a
misc