Comparing version 0.0.3 to 0.1.0-alpha
@@ -6,73 +6,111 @@ /* vim: set expandtab tabstop=2 shiftwidth=2 foldmethod=marker: */ | ||
var util = require('util'); | ||
var mysql = require('mysql'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var Connection = require(__dirname + '/connection.js'); | ||
var Emitter = require('events').EventEmitter; | ||
var Pool = require(__dirname + '/pool.js'); | ||
var Queue = require('safequeue'); | ||
var READONLY = 1; | ||
var WRITABLE = 2; | ||
var MAX_QUEUED_SIZE = 1000; | ||
var SLAVE_JUDGE_SQL = 'SHOW VARIABLES LIKE "READ_ONLY"'; | ||
var READONLY = 1; | ||
var WRITABLE = 2; | ||
var _QueueTimeoutError = function (msg) { | ||
var e = new Error(msg); | ||
e.name = 'QueueTimeout'; | ||
return e; | ||
}; | ||
var _QueueIsFullError = function (msg) { | ||
var e = new Error(msg); | ||
e.name = 'QueueIsFull'; | ||
return e; | ||
}; | ||
/* {{{ private function _readonly() */ | ||
var _readonly = function (sql) { | ||
return sql.match(/^(SELECT|SHOW|DESC|DESCRIBE|KILL)\s+/i) ? true : false; | ||
}; | ||
/* }}} */ | ||
var QueryError = function (name, msg) { | ||
var e = new Error(msg || name); | ||
e.name = name; | ||
return e; | ||
/* {{{ private function _remove() */ | ||
var _remove = function (a, o) { | ||
var i = a.indexOf(o); | ||
if (i > -1) { | ||
a.splice(i, 1); | ||
} | ||
return a; | ||
}; | ||
/* }}} */ | ||
exports.create = function (options) { | ||
var _options = { | ||
'maxconnection' : 4, /**< 单机允许的最大连接数 */ | ||
'maxqueuedsql' : 1000, /**< 允许排队的最大SQL数 */ | ||
}; | ||
for (var i in options) { | ||
_options[i] = options[i]; | ||
} | ||
/** | ||
* @ 心跳SQL | ||
*/ | ||
var hbquery = 'SHOW VARIABLES LIKE "READ_ONLY"'; | ||
/* {{{ private function hbparse() */ | ||
/** | ||
* @ 心跳连接 | ||
* parse heartbeat result | ||
* | ||
* @return Integer 0 : offline; 1 : readonly; 3 : writable | ||
*/ | ||
var c_heart = {}; | ||
var hbparse = function (res) { | ||
if (!res || !res.length) { | ||
return 0; | ||
} | ||
var s = READONLY; | ||
if (((res.shift() || {}).Value + '').match(/^(off)$/i)) { | ||
s |= WRITABLE; | ||
} | ||
return s; | ||
}; | ||
/* }}} */ | ||
var Cluster = function () { | ||
Emitter.call(this); | ||
}; | ||
util.inherits(Cluster, Emitter); | ||
/** | ||
* @ 工作连接 | ||
* @ 连接池列表 | ||
*/ | ||
var c_query = []; | ||
var backups = {}; | ||
/** | ||
* @ 空闲写连接 | ||
* @ 读写列表 | ||
*/ | ||
var w_stack = []; | ||
var rwlists = []; | ||
/** | ||
* @ 空闲读连接 | ||
* @ 读写队列 | ||
*/ | ||
var r_stack = []; | ||
var rwqueue = Queue.create({'timeout' : 0, 'maxitem' : MAX_QUEUED_SIZE}); | ||
rwqueue.on('timeout', function (item, timeout) { | ||
(item[2])(_QueueTimeoutError('Query stays in the queue more than ' + timeout + ' ms')); | ||
}); | ||
/** | ||
* @ 写请求 | ||
* @ 只读列表 | ||
*/ | ||
var w_queue = []; | ||
var rolists = []; | ||
/** | ||
* @ 读请求 | ||
* @ 只读队列 | ||
*/ | ||
var r_queue = []; | ||
var roqueue = Queue.create({'timeout' : 0, 'maxitem' : MAX_QUEUED_SIZE}); | ||
roqueue.on('timeout', function (item, timeout) { | ||
(item[2])(_QueueTimeoutError('Query stays in the queue more than ' + timeout + ' ms')); | ||
}); | ||
/* {{{ private function _forkconn() */ | ||
var _forkconn = function (s) { | ||
var m, c; | ||
for (var i in c_heart) { | ||
m = c_heart[i]; | ||
if (m.n < _options.maxconnection && (s & m.s)) { | ||
c = m.c.clone(); | ||
c.on('error', function (e) {}); | ||
c._status = m.s; | ||
c_heart[i].n++; | ||
return c; | ||
} | ||
/* {{{ public prototype setHeartBeatQuery() */ | ||
Cluster.prototype.setHeartBeatQuery = function (sql, parser) { | ||
Object.keys(backups).forEach(function (i) { | ||
backups[i].setHeartBeatQuery(sql); | ||
}); | ||
hbquery = sql; | ||
if ('function' === (typeof parser)) { | ||
hbparse = parser; | ||
} | ||
@@ -82,203 +120,89 @@ }; | ||
/* {{{ private function _getwconn() */ | ||
var _getwconn = function () { | ||
var i; | ||
do { | ||
i = w_stack.pop(); | ||
if (i && c_query[i]) { | ||
if (c_query[i]._status & WRITABLE) { | ||
return i; | ||
} | ||
r_stack.push(i); | ||
/* {{{ private function checkQueue() */ | ||
var checkQueue = function (queue, pool, max) { | ||
var max = (~~max) || 4; | ||
while (max > 0) { | ||
var s = queue.shift(); | ||
if (!s || !s.length) { | ||
return; | ||
} | ||
} while (i); | ||
/** | ||
* XXX: 实质上连接还没有建立好 | ||
*/ | ||
var m = _forkconn(WRITABLE); | ||
if (!m) { | ||
return -1; | ||
pool.query(s[0], s[1], s[2]); | ||
max--; | ||
} | ||
return c_query.push(m) - 1; | ||
}; | ||
/* }}} */ | ||
/* {{{ private function _getrconn() */ | ||
var _getrconn = function () { | ||
var i; | ||
do { | ||
i = r_stack.pop(); | ||
if (i && c_query[i]) { | ||
return i; | ||
} | ||
} while (i); | ||
var m = _forkconn(READONLY); | ||
if (!m) { | ||
return -1; | ||
if (queue.size() > 0) { | ||
process.nextTick(function () { | ||
checkQueue(queue, pool, max); | ||
}); | ||
} | ||
return c_query.push(m) - 1; | ||
}; | ||
/* }}} */ | ||
/* {{{ private function _nextsql() */ | ||
var _nextsql = function (m, i) { | ||
var q; | ||
/* {{{ public prototype addserver() */ | ||
Cluster.prototype.addserver = function (config) { | ||
var p = Pool.create(options, config); | ||
var i = p._name(); | ||
backups[i] = p; | ||
if ((WRITABLE & m._status) > 0) { | ||
q = w_queue.shift() || r_queue.shift(); | ||
if (!q) { | ||
w_stack.push(i); | ||
} | ||
} else { | ||
q = r_queue.shift(); | ||
if (!q) { | ||
r_stack.push(i); | ||
} | ||
} | ||
if (!q) { | ||
return; | ||
} | ||
m.query(q[0], null, function (error, res) { | ||
(q[1])(error, res); | ||
_nextsql(m, i); | ||
var _self = this; | ||
p.on('error', function (e) { | ||
_self.emit('error', e); | ||
}); | ||
}; | ||
/* }}} */ | ||
/* {{{ private function _check_queued_query() */ | ||
var _check_queued_query = function () { | ||
var i = -1; | ||
while (w_queue.length > 0 || r_queue.length > 0) { | ||
i = _getwconn(); | ||
if (i < 0) { | ||
break; | ||
p.setHeartBeatQuery(hbquery); | ||
p.on('state', function (res) { | ||
var s = hbparse(res); | ||
if (!s) { | ||
rolists = _remove(rolists, i); | ||
rwlists = _remove(rwlists, i); | ||
return; | ||
} | ||
_nextsql(c_query[i], i); | ||
} | ||
while (r_queue.length > 0) { | ||
i = _getrconn(); | ||
if (i < 0) { | ||
break; | ||
if (rolists.indexOf(i) < 0) { | ||
checkQueue(roqueue, p); | ||
rolists.push(i); | ||
} | ||
_nextsql(c_query[i], i); | ||
} | ||
if ((WRITABLE & s) && rwlists.indexOf(i) < 0) { | ||
checkQueue(rwqueue, p); | ||
rwlists.push(i); | ||
} | ||
}); | ||
}; | ||
/* }}} */ | ||
/* {{{ private function _querywithouttimeout() */ | ||
var _querywithouttimeout = function (sql, callback) { | ||
/** | ||
* @ 读计数器 | ||
*/ | ||
var rocount = 0; | ||
var w = _readonly(sql) ? false : true; | ||
var i = _getwconn(); | ||
if (!w && i < 0) { | ||
i = _getrconn(); | ||
} | ||
/** | ||
* @ 写计数器 | ||
*/ | ||
var rwcount = 0; | ||
if (i > -1) { | ||
c_query[i].query(sql, null, function (error, res) { | ||
callback(error, res); | ||
_nextsql(c_query[i], i); | ||
}); | ||
} else if (w) { | ||
w_queue.push([sql, callback]); | ||
} else { | ||
r_queue.push([sql, callback]); | ||
} | ||
}; | ||
/* }}} */ | ||
/* {{{ public function query() */ | ||
Cluster.prototype.query = function (sql, tmout, callback) { | ||
var Cluster = function () { | ||
EventEmitter.call(this); | ||
}; | ||
util.inherits(Cluster, EventEmitter); | ||
/* {{{ public prototype addserver() */ | ||
Cluster.prototype.addserver = function (config) { | ||
var m = Connection.create(config); | ||
var i = m._name; | ||
var _self = this; | ||
m.on('error', function (e) { | ||
_self.emit('error', e); | ||
}); | ||
if (c_heart[i]) { | ||
m.close(); | ||
return _self; | ||
} | ||
c_heart[i] = { | ||
'n' : 0, /**< 连接数 */ | ||
's' : 0, /**< 状态 */ | ||
'c' : m, /**< 心跳 */ | ||
if ('function' === (typeof tmout)) { | ||
callback = tmout; | ||
tmout = 0; | ||
}; | ||
m.on('close', function () { | ||
delete c_heart[i]; | ||
_self.addserver(config); | ||
_self.emit('notice', util.format('hearbeat for "%s" closed, try to reconnect', i)); | ||
}); | ||
(function heartbeat () { | ||
m.query(SLAVE_JUDGE_SQL, 100, function (e, r) { | ||
var s = e ? 0 : READONLY; | ||
if (((r && r.shift() || {}).Value + '').match(/^(off)$/i)) { | ||
s |= WRITABLE; | ||
if (!_readonly(sql.sql || sql)) { | ||
if (rwlists.length < 1) { | ||
if (rwqueue.push([sql, tmout, callback], tmout) < 0) { | ||
callback(_QueueIsFullError('Too many queries queued')); | ||
} | ||
var n = 0; | ||
c_query.forEach(function (c, i) { | ||
if (c._name === m._name) { | ||
c_query[i]._status = s; | ||
n++; | ||
} | ||
}); | ||
c_heart[i].n = n; | ||
if (s !== c_heart[i].s) { | ||
_self.emit('notice', util.format('hearbeat for "%s" state changed to %d', i, s)); | ||
} else { | ||
backups[rwlists[(++rwcount) % rwlists.length]].query(sql, tmout, callback); | ||
} | ||
} else { | ||
if (rolists.length < 1) { | ||
if (roqueue.push([sql, tmout, callback], tmout) < 0) { | ||
callback(_QueueIsFullError('Too many queries queued')); | ||
} | ||
c_heart[i].s = s; | ||
if ((s & READONLY) > 0) { | ||
_check_queued_query(); | ||
} | ||
setTimeout(heartbeat, 1000); | ||
}); | ||
})(); | ||
return _self; | ||
}; | ||
/* }}} */ | ||
/* {{{ public prototype query() */ | ||
Cluster.prototype.query = function (sql, timeout, callback) { | ||
if ('function' === (typeof timeout)) { | ||
callback = timeout; | ||
timeout = 0; | ||
} else { | ||
backups[rolists[(++rocount) % rolists.length]].query(sql, tmout, callback); | ||
} | ||
} | ||
if (!timeout || timeout < 0) { | ||
return _querywithouttimeout(sql, callback); | ||
} | ||
var _self = this; | ||
var tmout = setTimeout(function () { | ||
callback(QueryError('QueryTimeout', 'Mysql query timeout after ' + timeout + ' ms.')); | ||
callback = function (error, res) { | ||
_self.emit('timeout', error, res, sql); | ||
} | ||
}, timeout); | ||
_querywithouttimeout(sql, function (error, res) { | ||
clearTimeout(tmout); | ||
tmout = null; | ||
callback(error, res); | ||
}); | ||
}; | ||
@@ -288,4 +212,3 @@ /* }}} */ | ||
return new Cluster(); | ||
}; | ||
@@ -16,10 +16,70 @@ /* vim: set expandtab tabstop=2 shiftwidth=2 foldmethod=marker: */ | ||
/** | ||
* 0 : 未连接 | ||
* 1 : 正在连接 | ||
* 2 : 连接成功 | ||
* -1: 准备断开 | ||
*/ | ||
this._flag = 0; | ||
this._conn = null; | ||
this._conn = mysql.createConnection(options); | ||
this._name = util.format('%s@%s:%d', options.user, options.host, options.port); | ||
this.connect(options); | ||
this.connect((options && options.timeout) ? options.timeout : 100); | ||
}; | ||
util.inherits(Connection, EventEmitter); | ||
Connection.prototype.connect = function (timeout) { | ||
if (this._flag > 0) { | ||
return true; | ||
} | ||
var _self = this; | ||
/** | ||
* @ 连接超时 | ||
*/ | ||
var tmout = (~~timeout) || 100; | ||
var timer = setTimeout(function () { | ||
_self._flag = 0; | ||
// XXX: this make "Aborted_connects" ++ in mysql server | ||
_self._conn._socket.end(); | ||
_self.emit('error', _self._error('ConnectTimeout', | ||
'Connect to mysql server timeout after ' + tmout + ' ms')); | ||
}, tmout); | ||
_self._conn.removeAllListeners(); | ||
['error', 'close', 'end'].forEach(function (i) { | ||
_self._conn.on(i, function (e) { | ||
if (_self._flag < 1) { | ||
return; | ||
} | ||
if (e) { | ||
e = _self._error(e); | ||
} | ||
if ((e && e.fatal) || 'error' !== i) { | ||
_self._flag = 0; | ||
_self._conn.end(); | ||
_self.emit('close', e); | ||
} | ||
_self.emit('error', e); | ||
}); | ||
}); | ||
_self._flag = 1; | ||
_self._conn.connect(function (e) { | ||
clearTimeout(timer); | ||
timer = null; | ||
if (!e) { | ||
_self._flag = 2; | ||
} else { | ||
if (1 === _self._flag) { | ||
_self.emit('error', _self._error(e)); | ||
} | ||
_self._flag = 0; | ||
} | ||
}); | ||
}; | ||
Connection.prototype._error = function (name, msg) { | ||
@@ -38,102 +98,33 @@ var e; | ||
Connection.prototype.clone = function () { | ||
return new Connection(this._conn.config); | ||
}; | ||
Connection.prototype.close = function (callback) { | ||
callback = callback || function () {}; | ||
var _self = this; | ||
if (_self._flag < 1) { | ||
if (this._flag < 1) { | ||
return callback(); | ||
} | ||
var _self = this; | ||
_self._flag = -1; | ||
_self._conn.end(function (error) { | ||
_self.emit('close'); | ||
callback(error); | ||
_self.emit('close'); | ||
}); | ||
}; | ||
Connection.prototype.connect = function (options) { | ||
Connection.prototype.query = function (sql, timeout, callback) { | ||
var _self = this; | ||
if (_self._flag > 0) { | ||
return; | ||
if (!timeout || timeout < 1) { | ||
return this._conn.query(sql, callback); | ||
} | ||
/** | ||
* @ 连接超时 | ||
*/ | ||
var tmout = options.timeout || 100; | ||
/** | ||
* @ 重连延迟 | ||
*/ | ||
var delay = 20; | ||
/** | ||
* onconnect() | ||
*/ | ||
/* {{{ */ | ||
var onconnect = function (e) { | ||
_self._flag = e ? 0 : 1; | ||
_self.emit('state', _self._flag); | ||
if (e) { | ||
delay = Math.min(10000, 2 * delay); | ||
setTimeout(_connect, delay); | ||
_self.emit('error', _self._error(e)); | ||
return; | ||
} | ||
delay = 20; | ||
_self._conn.on('error', function (error) { | ||
if (_self._flag < 0 || !error.fatal || 'PROTOCOL_CONNECTION_LOST' !== error.code) { | ||
return; | ||
} | ||
_connect(); | ||
}); | ||
}; | ||
/* }}} */ | ||
/* {{{ */ | ||
var _connect = function () { | ||
var timer = setTimeout(function () { | ||
// XXX: this make "Aborted_connects" ++ in mysql server | ||
_self._conn._socket.end(); | ||
}, tmout); | ||
_self._conn = mysql.createConnection(options); | ||
_self._conn.connect(function (error) { | ||
clearTimeout(timer); | ||
timer = null; | ||
onconnect(error); | ||
}); | ||
}; | ||
/* }}} */ | ||
_connect(); | ||
}; | ||
Connection.prototype.query = function (sql, timeout, callback) { | ||
if (!timeout || timeout < 0) { | ||
this._conn.query(sql, callback); | ||
return; | ||
} | ||
var _self = this; | ||
var tmout = setTimeout(function () { | ||
var timer = setTimeout(function () { | ||
callback(_self._error('QueryTimeout', 'Mysql query timeout after ' + timeout + ' ms')); | ||
callback = function (e, r) { | ||
_self.emit('timeout', e, r, sql); | ||
_self.emit('late', e, r, sql); | ||
}; | ||
}, timeout); | ||
_self._conn.query(sql, function (error, res) { | ||
clearTimeout(tmout); | ||
tmout = null; | ||
clearTimeout(timer); | ||
timer = null; | ||
callback(error ? _self._error(error) : null, res); | ||
@@ -140,0 +131,0 @@ }); |
247
lib/pool.js
@@ -5,16 +5,20 @@ /* vim: set expandtab tabstop=2 shiftwidth=2 foldmethod=marker: */ | ||
var Util = require('util'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var util = require('util'); | ||
var Emitter = require('events').EventEmitter; | ||
var Queue = require('safequeue'); | ||
var Connection = require(__dirname + '/connection.js'); | ||
var READONLY = 1; | ||
var WRITABLE = 2; | ||
var HEARTBEAT_TIMEOUT = 100; | ||
var SLAVE_JUDGE_QUERY = 'SHOW VARIABLES LIKE "READ_ONLY"'; | ||
var _QueueTimeoutError = function (msg) { | ||
var e = new Error(msg); | ||
e.name = 'QueueTimeout'; | ||
return e; | ||
}; | ||
exports.create = function (options, config) { | ||
var _options = { | ||
'maxconnections' : 4, | ||
'maxconnections' : 4, | ||
'maxidletime' : 60000, | ||
@@ -27,60 +31,71 @@ }; | ||
/** | ||
* @ 心跳连接 | ||
* @ 连接数组 | ||
*/ | ||
var c_heart = null; | ||
var conns = []; | ||
/** | ||
* @ 心跳状态 | ||
* @ 重连暂停 | ||
*/ | ||
var c_state = 0; | ||
var pause = 50; | ||
/** | ||
* @ 工作连接 | ||
* @ 心跳SQL | ||
*/ | ||
var _worker = {}; | ||
var hbsql = 'SHOW VARIABLES LIKE "READ_ONLY"'; | ||
/** | ||
* @ 连接标识符 | ||
* @ 心跳计时器 | ||
*/ | ||
var c_index = 0; | ||
var tbeat = null; | ||
/** | ||
* @ 空闲连接 | ||
* @ 空闲计时器 | ||
*/ | ||
var c_queue = []; | ||
var timer = {}; | ||
/* {{{ private function _reboot() */ | ||
/** | ||
* @ 断开定时器 | ||
* @ 重启心跳连接 | ||
*/ | ||
var c_timer = {}; | ||
var MysqlPool = function () { | ||
EventEmitter.call(this); | ||
this._init(); | ||
var _reboot = function (o) { | ||
var c = conns.shift(); | ||
if (!c) { | ||
return; | ||
} | ||
o.emit('state'); | ||
setTimeout(function () { | ||
c.removeAllListeners(); | ||
startup(o); | ||
}, pause); | ||
pause = pause + pause; | ||
}; | ||
Util.inherits(MysqlPool, EventEmitter); | ||
/* }}} */ | ||
/* {{{ private prototype _init() */ | ||
MysqlPool.prototype._init = function () { | ||
var _self = this; | ||
c_heart = Connection.create(config); | ||
c_heart.on('error', function (e) { | ||
_self.emit('error', e); | ||
/* {{{ private function startup() */ | ||
var startup = function (o) { | ||
var c = Connection.create(config); | ||
var s = ''; | ||
conns.unshift(c); | ||
['error', 'close'].forEach(function (i) { | ||
c.once(i, function (e) { | ||
_reboot(o); | ||
if (e) { | ||
o.emit('error', e); | ||
} | ||
}); | ||
}); | ||
c_heart.on('close', function () { | ||
_self._init(); | ||
}); | ||
(function heartbeat() { | ||
c_heart.query(SLAVE_JUDGE_QUERY, HEARTBEAT_TIMEOUT, function (e, r) { | ||
setTimeout(heartbeat, 10 * HEARTBEAT_TIMEOUT); | ||
var s = e ? 0 : READONLY; | ||
if (((r && r.shift() || {}).Value + '').match(/^(off)$/i)) { | ||
s |= WRITABLE; | ||
c.query(hbsql, HEARTBEAT_TIMEOUT, function (e, r) { | ||
tbeat = setTimeout(heartbeat, 10 * HEARTBEAT_TIMEOUT); | ||
if (e) { | ||
_reboot(o); | ||
o.emit('error', e); | ||
return; | ||
} | ||
if (s !== c_state) { | ||
c_state = s; | ||
_self.emit('state', c_state); | ||
pause = 50; | ||
var t = JSON.stringify(r); | ||
if (t !== s) { | ||
s = t; | ||
o.emit('state', r); | ||
} | ||
@@ -92,29 +107,80 @@ }); | ||
/* {{{ public prototype getconn() */ | ||
/* {{{ private function _remove() */ | ||
var _remove = function (c, o) { | ||
var i = conns.indexOf(c); | ||
if (i < 0) { | ||
return; | ||
} | ||
c.close(); | ||
conns.splice(i, 1); | ||
i = o._stack.indexOf(i); | ||
if (i > -1) { | ||
o._stack.splice(i, 1); | ||
} | ||
}; | ||
/* }}} */ | ||
/* {{{ private function execute() */ | ||
var execute = function (c, o, s) { | ||
c.query(s[0], s[1], function (e, r) { | ||
(s[2])(e, r); | ||
if (e && e.fatal) { | ||
c.close(); | ||
return; | ||
} | ||
s = o._queue.shift(); | ||
if (!s) { | ||
release(o, c); | ||
} else { | ||
execute(c, o, s); | ||
} | ||
}); | ||
}; | ||
/* }}} */ | ||
/* {{{ private function _wakeup() */ | ||
/** | ||
* @ 获取一个连接 | ||
* | ||
* @ access public | ||
* wake up a pool to execute query | ||
*/ | ||
MysqlPool.prototype.getconn = function () { | ||
var i; | ||
while (c_queue.length) { | ||
i = c_queue.pop(); | ||
if (i && _worker[i]) { | ||
if (c_timer[i]) { | ||
clearTimeout(c_timer[i]); | ||
delete c_timer[i]; | ||
var _wakeup = function (o) { | ||
var m = 1 + _options.maxconnections + o._stack.length - conns.length; | ||
while (m && o._queue.size()) { | ||
var s = o._queue.shift(); | ||
var i, c; | ||
do { | ||
i = o._stack.pop(); | ||
if (i && conns[i]) { | ||
c = conns[i]; | ||
if (timer[i]) { | ||
clearTimeout(timer[i]); | ||
delete timer[i]; | ||
} | ||
} | ||
} while (i && !c); | ||
return [i, _worker[i]]; | ||
if (!c) { | ||
c = Connection.create(config); | ||
conns.push(c); | ||
['error', 'close'].forEach(function (k) { | ||
c.once(k, function (e) { | ||
_remove(c, o); | ||
}); | ||
}); | ||
} | ||
execute(c, o, s); | ||
m--; | ||
} | ||
}; | ||
/* }}} */ | ||
if ((c_state & READONLY) && Object.keys(_worker).length < _options.maxconnections) { | ||
var m = c_heart.clone(); | ||
m.on('error', function (e) {}); | ||
c_index = (++c_index) % 65535; | ||
_worker[c_index] = m; | ||
return [c_index, m]; | ||
/* {{{ private function release() */ | ||
var release = function (o, c) { | ||
var i = conns.indexOf(c); | ||
// XXX: we use conns[0] to heartbeat | ||
if (i > 0) { | ||
o._stack.push(i); | ||
timer[i] = setTimeout(function () { | ||
_remove(c, o); | ||
}, _options.maxidletime); | ||
} | ||
@@ -124,15 +190,42 @@ }; | ||
/* {{{ public prototype release() */ | ||
var Mysql = function () { | ||
Emitter.call(this); | ||
startup(this); | ||
/** | ||
* @ 执行队列 | ||
*/ | ||
var _name = this._name(); | ||
this._queue = Queue.create({'timeout' : 0, 'maxitem' : 0}); | ||
this._queue.on('timeout', function (item, tmout, pos) { | ||
(item[2])(_QueueTimeoutError(util.format( | ||
'Query stays in the queue more than %d ms (%s)', tmout, _name))); | ||
}); | ||
/** | ||
* @ 空闲连接 | ||
*/ | ||
this._stack = []; | ||
}; | ||
util.inherits(Mysql, Emitter); | ||
/* {{{ public prototype _name() */ | ||
Mysql.prototype._name = function () { | ||
return conns[0]._name; | ||
}; | ||
/* }}} */ | ||
/* {{{ public prototype query() */ | ||
/** | ||
* @ 释放一个连接 | ||
* Get one connection and run sql | ||
* | ||
* @ access public | ||
* @ param {String|Object} sql | ||
* @ param {Function} cb | ||
*/ | ||
MysqlPool.prototype.release = function (i) { | ||
if (_worker[i]) { | ||
c_timer[i] = setTimeout(function () { | ||
c_timer[i].close(function () { | ||
delete _worker[i]; | ||
}); | ||
}, _options.maxidletime); | ||
Mysql.prototype.query = function (sql, tmout, cb) { | ||
var n = this._queue.push([sql, tmout, cb], tmout); | ||
if (1 === n) { | ||
_wakeup(this); | ||
} | ||
@@ -142,4 +235,10 @@ }; | ||
return new MysqlPool(); | ||
/* {{{ public prototype setHeartBeatQuery() */ | ||
Mysql.prototype.setHeartBeatQuery = function (sql) { | ||
hbsql = sql; | ||
}; | ||
/* }}} */ | ||
return new Mysql(); | ||
}; | ||
{ | ||
"name": "easymysql", | ||
"version": "0.0.3", | ||
"author": "Aleafs Zhang (zhangxc83@gmail.com)", | ||
"contributors": [], | ||
"homepage": "git@github.com:aleafs/mysql-cluster.git", | ||
"description": "Mysql client in cluster mode, based on node-mysql", | ||
"keywords": [ "mysql", "cluster" ], | ||
"dependencies": { | ||
"mysql" : "=2.0.0-alpha3" | ||
}, | ||
"engines": { | ||
"node": ">=0.6.9" | ||
}, | ||
"devDependencies": { | ||
"mocha" : ">=0.9.0", | ||
"should" : ">=0.4.2", | ||
"interceptor" : ">=0.0.2", | ||
"visionmedia-jscoverage" : ">=1.0.0" | ||
}, | ||
"main": "./index.js", | ||
"scripts": { | ||
"test": "make test" | ||
} | ||
"version": "0.1.0-alpha", | ||
"author": "Aleafs Zhang (zhangxc83@gmail.com)", | ||
"contributors": [], | ||
"homepage": "git@github.com:aleafs/mysql-cluster.git", | ||
"description": "Mysql client in cluster mode, based on node-mysql", | ||
"keywords": [ "mysql", "cluster" ], | ||
"dependencies": { | ||
"mysql" : "=2.0.0-alpha4", | ||
"safequeue" : ">=0.0.1" | ||
}, | ||
"engines": { | ||
"node": ">=0.6.9" | ||
}, | ||
"devDependencies": { | ||
"clone" : ">=0.1.1", | ||
"mocha" : ">=0.9.0", | ||
"should" : ">=0.4.2", | ||
"interceptor" : ">=0.0.5", | ||
"rewire" : ">=1.0.3", | ||
"visionmedia-jscoverage" : ">=1.0.0" | ||
}, | ||
"main": "./index.js", | ||
"scripts": { | ||
"test": "make test" | ||
} | ||
} |
@@ -1,2 +0,2 @@ | ||
[![Build Status](https://secure.travis-ci.org/aleafs/easymysql.png?branch=master)](http://travis-ci.org/aleafs/easymysql) | ||
[![Build Status](https://secure.travis-ci.org/aleafs/easymysql.png?branch=develop)](http://travis-ci.org/aleafs/easymysql) | ||
@@ -3,0 +3,0 @@ Click **[here](http://aleafs.github.com/coverage/easymysql.html)** to get the details of test coverage. |
/* vim: set expandtab tabstop=2 shiftwidth=2 foldmethod=marker: */ | ||
var Clone = require('clone'); | ||
var config = { | ||
@@ -7,3 +9,4 @@ 'host' : '127.0.0.1', | ||
'user' : 'root', | ||
'password' : '' | ||
'password' : '', | ||
'timeout' : 1000, /**< connect timeout in ms */ | ||
}; | ||
@@ -18,2 +21,3 @@ try { | ||
exports.istravis = process.env.CI ? true : false; | ||
exports.config = config; | ||
@@ -31,3 +35,92 @@ exports.extend = function (a) { | ||
exports.istravis = process.env.CI ? true : false; | ||
var util = require('util'); | ||
var Emitter = require('events').EventEmitter; | ||
exports.mockConnection = function () { | ||
/** | ||
* @ 请求过的SQL | ||
*/ | ||
var __queries = []; | ||
/** | ||
* @ 伪造的数据 | ||
*/ | ||
var __Results = []; | ||
/** | ||
* @ 连接对象 | ||
*/ | ||
var __Objects = []; | ||
/* {{{ private mocked Connection() */ | ||
var Connection = function () { | ||
Emitter.call(this); | ||
this._name = 'test'; | ||
}; | ||
util.inherits(Connection, Emitter); | ||
Connection.prototype.connect = function () { | ||
}; | ||
Connection.prototype.close = function () { | ||
this.emit('close'); | ||
}; | ||
Connection.prototype.query = function (sql, tmout, callback) { | ||
var n = __queries.push(sql); | ||
var r = [], e = null; | ||
for (var i = 0; i < __Results.length; i++) { | ||
var m = (__Results[i])(sql); | ||
if (m && m.length) { | ||
r = m[0]; | ||
e = m[1]; | ||
break; | ||
} | ||
} | ||
if (tmout < 10) { | ||
process.nextTick(function () { | ||
callback(e, r); | ||
}); | ||
} else { | ||
setTimeout(function () { | ||
callback(e, r); | ||
}, 3 + tmout); | ||
} | ||
}; | ||
/* }}} */ | ||
var _me = {}; | ||
_me.create = function () { | ||
var c = new Connection(); | ||
__Objects.push(c); | ||
return c; | ||
}; | ||
_me.makesureCleanAllData = function () { | ||
__queries = []; | ||
__Results = []; | ||
__Objects = []; | ||
}; | ||
_me.__mockQueryResult = function (p, res, e) { | ||
__Results.push(function (s) { | ||
if (s.match(new RegExp(p))) { | ||
return [Clone(res), Clone(e)]; | ||
} | ||
}); | ||
}; | ||
_me.__emitEvent = function (i, evt) { | ||
var c = __Objects[i]; | ||
if (!(c instanceof Connection)) { | ||
return; | ||
} | ||
var a = Array.prototype.slice.call(arguments, 1); | ||
c.emit.apply(c, a); | ||
}; | ||
return _me; | ||
}; | ||
@@ -9,9 +9,7 @@ /* vim: set expandtab tabstop=2 shiftwidth=2 foldmethod=marker: */ | ||
var LIBPATH = process.env.MYSQL_CLUSTER_COV ? 'lib-cov' : 'lib'; | ||
var Connection = require(util.format('%s/../%s/connection.js', __dirname, LIBPATH)); | ||
var getBlocker = function (port) { | ||
var Connection = require(__dirname + '/../lib/connection.js'); | ||
var getBlocker = function (port, cb) { | ||
var cfg = Common.extend(); | ||
var _me = interceptor.create(util.format('%s:%d', cfg.host, cfg.port || 3306)); | ||
_me.listen(port); | ||
_me.listen(port, cb); | ||
return _me; | ||
@@ -23,87 +21,37 @@ }; | ||
}; | ||
describe('mysql connection', function () { | ||
/* {{{ should_reconnect_works_fine() */ | ||
it('should_reconnect_works_fine', function (done) { | ||
var blocker = getBlocker(33061); | ||
blocker.block(); | ||
var config = Common.extend({ | ||
'host' : 'localhost', | ||
'port' : 33061 | ||
}); | ||
var _me = Connection.create(config); | ||
_me.on('error', function (e) { | ||
e.message.should.include(getAddress(config)); | ||
}); | ||
_me.query('SHOW DATABASES', 25, function (error, res) { | ||
error.should.have.property('name', 'QueryTimeout'); | ||
error.message.should.include(getAddress(config)); | ||
blocker.open(); | ||
_me.on('state', function (code) { | ||
if (code < 1) { | ||
return; | ||
} | ||
_me.query('SHOW DATABASES', 20, function (error, res) { | ||
should.ok(!error); | ||
JSON.stringify(res).should.include('{"Database":"mysql"}'); | ||
_me.close(function () { | ||
blocker.close(); | ||
done(); | ||
}); | ||
}); | ||
/* {{{ should_connnect_error_works_fine() */ | ||
it('should_connnect_error_works_fine', function (done) { | ||
var _me = Connection.create({'host' : 'localhost', 'port' : 80}); | ||
_me.on('error', function (e) { | ||
e.should.have.property('name', 'MysqlError'); | ||
e.message.should.include('@localhost:80'); | ||
_me.close(done); | ||
}); | ||
}); | ||
}); | ||
/* }}} */ | ||
/* {{{ should_mysql_restart_wroks_fine() */ | ||
it('should_mysql_restart_wroks_fine', function (done) { | ||
var blocker = getBlocker(33061); | ||
var config = Common.extend({ | ||
'host' : 'localhost', | ||
'port' : 33061 | ||
/* {{{ should_query_timeout_works_fine() */ | ||
it('should_query_timeout_works_fine', function (done) { | ||
var _me = Connection.create(Common.config); | ||
_me.on('late', function (e, r) { | ||
should.ok(!e); | ||
r.should.includeEql({'SLEEP(0.02)' : '0'}); | ||
_me.close(done); | ||
}); | ||
var _me = Connection.create(config); | ||
_me.on('error', function (e) { | ||
e.message.should.include(getAddress(config)); | ||
}); | ||
var state = 1; | ||
_me.on('state', function (mode) { | ||
mode.should.eql(state); | ||
var now = Date.now(); | ||
_me.query('SELECT SLEEP(0.01)', 0, function (e, r) { | ||
should.ok(!e); | ||
(Date.now() - now).should.above(9); | ||
r.should.includeEql({'SLEEP(0.01)' : '0'}); | ||
}); | ||
_me.query('SHOW ENGINES', 10, function (error, res) { | ||
should.ok(!error); | ||
blocker.close(); | ||
state = 0; | ||
var expect = JSON.stringify(res); | ||
setTimeout(function () { | ||
_me.query('SHOW ENGINES', 10, function (error, res) { | ||
error.should.have.property('name', 'QueryTimeout'); | ||
blocker = getBlocker(33061); | ||
state = 1; | ||
setTimeout(function () { | ||
_me.query('SHOW ENGINES', 1000, function (error, res) { | ||
should.ok(!error); | ||
JSON.stringify(res).should.eql(expect); | ||
_me.close(function () { | ||
blocker.close(); | ||
done(); | ||
}); | ||
}); | ||
}, 20); | ||
}); | ||
}, 20); | ||
var now = Date.now(); | ||
_me.query('SELECT SLEEP(0.02)', 15, function (e, r) { | ||
e.should.have.property('name', 'QueryTimeout'); | ||
e.message.should.include(getAddress(Common.config)); | ||
(Date.now() - now).should.below(20); | ||
}); | ||
@@ -113,21 +61,35 @@ }); | ||
/* {{{ should_query_timeout_works_fine() */ | ||
it('should_query_timeout_works_fine', function (done) { | ||
/* {{{ should_multi_connect_works_fine() */ | ||
it('should_multi_connect_works_fine', function (done) { | ||
var _me = Connection.create(Common.config); | ||
for (var i = 0; i < 10; i++) { | ||
_me.connect(Common.config.timeout); | ||
} | ||
_me.on('error', function (e) { | ||
e.message.should.include(getAddress(Common.config)); | ||
(true).should.eql(false); | ||
}); | ||
_me.on('timeout', function (error, res, sql) { | ||
_me.connect(); /**< 没有实际意义,纯粹为了覆盖率 */ | ||
should.ok(!error); | ||
sql.should.eql('SELECT SLEEP(0.02)'); | ||
_me.query('SHOW DATABASES', 1000, function (e, r) { | ||
should.ok(!e); | ||
r.should.includeEql({'Database' : 'mysql'}); | ||
_me.close(done); | ||
}); | ||
}); | ||
/* }}} */ | ||
_me.query('SELECT SLEEP(0.02)', 10, function (error, res) { | ||
error.should.have.property('name', 'QueryTimeout'); | ||
error.message.should.include('Mysql query timeout after 10 ms'); | ||
error.message.should.include(getAddress(Common.config)); | ||
/* {{{ should_connect_timeout_works_fine() */ | ||
it('should_connect_timeout_works_fine', function (done) { | ||
var blocker = getBlocker(33061, function () { | ||
var _config = Common.extend({ | ||
'host' : 'localhost', 'port' : 33061, 'timeout' : 50, | ||
}); | ||
blocker.blocking = true; | ||
//blocker.block(); | ||
var _me = Connection.create(_config); | ||
_me.on('error', function (e) { | ||
e.should.have.property('name', 'ConnectTimeout'); | ||
e.message.should.include(getAddress(_config)); | ||
blocker.close(); | ||
_me.close(done); | ||
}); | ||
}); | ||
@@ -137,37 +99,52 @@ }); | ||
/* {{{ should_auth_fail_works_fine() */ | ||
it('should_auth_fail_works_fine', function (done) { | ||
/* {{{ should_got_server_restart_event() */ | ||
it('should_got_server_restart_event', function (done) { | ||
var blocker = getBlocker(33061, function () { | ||
var _config = Common.extend({ | ||
'host' : 'localhost', 'port' : 33061 | ||
}); | ||
if (Common.istravis) { | ||
return done(); | ||
} | ||
blocker.open(); | ||
var _me = Connection.create(_config); | ||
var _me = Connection.create(Common.extend({'user' : 'i_am_not_exists'})); | ||
var _events = []; | ||
['error', 'close'].forEach(function (i) { | ||
_me.on(i, function (e) { | ||
if (e) { | ||
e.message.should.include(getAddress(_config)); | ||
} | ||
_events.push([i].concat(Array.prototype.slice.call(arguments, 0))); | ||
}); | ||
}); | ||
var err = 0; | ||
_me.on('error', function (e) { | ||
err++; | ||
}); | ||
_me.query('SHOW DATABASES', 100, function (error, res) { | ||
should.ok(!error); | ||
res.should.includeEql({'Database' : 'mysql'}); | ||
var afterClosed = function () { | ||
_events.should.eql([[ | ||
'close', { | ||
'fatal' : true, 'code' : 'PROTOCOL_CONNECTION_LOST', 'name' : 'MysqlError'} | ||
], [ | ||
'error', { | ||
'fatal' : true, 'code' : 'PROTOCOL_CONNECTION_LOST', 'name' : 'MysqlError'} | ||
]]); | ||
_me.query('SHOW DATABASES', 100, function (error, res) { | ||
should.ok(error); | ||
error.should.have.property('code', 'PROTOCOL_ENQUEUE_AFTER_QUIT'); | ||
error.message.should.include(getAddress(_config)); | ||
_me.close(done); | ||
}); | ||
}; | ||
_me.query('SHOW DATABASES', 100, function (error, res) { | ||
error.should.have.property('code', 'ER_ACCESS_DENIED_ERROR'); | ||
setTimeout(function () { | ||
err.should.eql(2); | ||
_me.close(done); | ||
}, 100); | ||
}); | ||
}); | ||
/* }}} */ | ||
blocker.outArr[0].once('close', function () { | ||
setTimeout(afterClosed, 20); | ||
}); | ||
/* {{{ should_error_message_contains_config() */ | ||
it('should_error_message_contains_config', function (done) { | ||
var _me = Connection.create(Common.config); | ||
_me.on('error', function (e) { | ||
e.message.should.include(getAddress(Common.config)); | ||
/** | ||
* XXX: server 端关闭 | ||
*/ | ||
_events = []; | ||
blocker.close(); | ||
}); | ||
}); | ||
_me.query('SELECT I_AM_NOT_DEFINED(123)', 10, function (error, res) { | ||
error.should.have.property('name', 'MysqlError'); | ||
error.message.should.include(getAddress(Common.config)); | ||
_me.close(done); | ||
}); | ||
}); | ||
@@ -174,0 +151,0 @@ /* }}} */ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
31793
15
1019
11
2
6
1
+ Addedsafequeue@>=0.0.1
+ Addedsafequeue@0.0.2(transitive)
Updatedmysql@=2.0.0-alpha4