tcp-base
Advanced tools
Comparing version 2.0.0 to 3.0.0
3.0.0 / 2017-04-20 | ||
================== | ||
* fix: invoke oneway after socket closed (#7) | ||
* refactor: [BREAKING-CHANGE] not support reconnect | ||
2.0.0 / 2017-02-17 | ||
@@ -3,0 +9,0 @@ ================== |
157
lib/base.js
@@ -11,2 +11,3 @@ 'use strict'; | ||
noDelay: true, | ||
connectTimeout: 3000, | ||
responseTimeout: 3000, | ||
@@ -16,4 +17,2 @@ heartbeatInterval: 5000, | ||
concurrent: 0, | ||
reConnectTimes: 0, | ||
reConnectInterval: 1000, | ||
logger: console, | ||
@@ -36,5 +35,2 @@ }; | ||
* - {Number} [responseTimeout] - limit the maximum time for waiting a response | ||
* - {Number} [reConnectTimes] - the maximum number of times the client will automatically try to | ||
* reconnect to the remote server if the connection is dropped | ||
* - {Number} [reConnectInterval] - the auto-reconnect interval, defaults to 1s | ||
* - {Logger} [logger] - the logger client | ||
@@ -55,3 +51,2 @@ * @constructor | ||
this.clientId = ++seed; | ||
this._reConnectTimes = this.options.reConnectTimes; | ||
this._heartbeatTimer = null; | ||
@@ -61,2 +56,3 @@ this._socket = null; | ||
this._bodyLength = null; | ||
this._lastError = null; | ||
this._queue = []; | ||
@@ -69,2 +65,7 @@ this._invokes = new Map(); | ||
this._connect(); | ||
this.ready(err => { | ||
if (!err && this.options.needHeartbeat) { | ||
this._startHeartbeat(); | ||
} | ||
}); | ||
} | ||
@@ -185,5 +186,16 @@ | ||
*/ | ||
send(packet, callback) { | ||
callback = callback || noop; | ||
send(packet, callback = noop) { | ||
if (!this._socket) { | ||
const err = new Error(`[TCPBase] The socket was closed. (address: ${this[addressKey]})`); | ||
err.id = packet.id; | ||
err.data = packet.data.toString('base64'); | ||
if (packet.oneway) { | ||
err.oneway = true; | ||
callback(); | ||
this.emit('error', err); | ||
} else { | ||
callback(err); | ||
} | ||
return; | ||
} | ||
if (packet.oneway) { | ||
@@ -194,8 +206,4 @@ this._socket.write(packet.data); | ||
} | ||
if (!this._writable) { | ||
this._queue.push([ packet, callback ]); | ||
if (!this._socket && !this._reConnectTimes) { | ||
this._cleanQueue(); | ||
} | ||
return; | ||
@@ -344,21 +352,16 @@ } | ||
if (!this._socket) { | ||
return; | ||
return Promise.resolve(); | ||
} | ||
this._socket.destroy(); | ||
this._handleClose(err); | ||
this._socket.destroy(err); | ||
return this.await('close'); | ||
} | ||
_handleClose(err) { | ||
_handleClose() { | ||
if (!this._socket) { | ||
return; | ||
} | ||
this._socket.removeAllListeners(); | ||
this._socket = null; | ||
if (err) { | ||
this.emit('error', err); | ||
} | ||
this._cleanInvokes(err); | ||
this._cleanInvokes(this._lastError); | ||
// clean timer | ||
@@ -369,70 +372,76 @@ if (this._heartbeatTimer) { | ||
} | ||
// auto-reconnect | ||
if (this._reConnectTimes) { | ||
setTimeout(() => { | ||
this._reConnectTimes--; | ||
this._connect(() => { | ||
this._reConnectTimes = this.options.reConnectTimes; | ||
// try to recover | ||
this._resume(); | ||
}); | ||
}, this.options.reConnectInterval); | ||
return; | ||
} | ||
this._cleanQueue(err); | ||
this._cleanQueue(this._lastError); | ||
this.emit('close'); | ||
this.removeAllListeners(); | ||
} | ||
_handleReadable() { | ||
this._lastReceiveDataTime = Date.now(); | ||
try { | ||
let remaining = false; | ||
do { | ||
remaining = this._readPacket(); | ||
} while (remaining); | ||
} catch (err) { | ||
this.close(err); | ||
} | ||
} | ||
_connect(done) { | ||
if (!done) { | ||
done = () => this.ready(true); | ||
done = err => { | ||
this.ready(err ? err : true); | ||
}; | ||
} | ||
const socket = this._socket = net.connect(this.options.port, this.options.host); | ||
socket.setNoDelay(this.options.noDelay); | ||
socket.on('readable', () => { | ||
this._lastReceiveDataTime = Date.now(); | ||
try { | ||
let remaining = false; | ||
do { | ||
remaining = this._readPacket(); | ||
} while (remaining); | ||
} catch (err) { | ||
this.close(err); | ||
socket.on('readable', () => { this._handleReadable(); }); | ||
socket.once('close', () => { this._handleClose(); }); | ||
socket.once('error', err => { | ||
err.message += ' (address: ' + this[addressKey] + ')'; | ||
this._lastError = err; | ||
if (err.code === 'ECONNRESET') { | ||
this.logger.warn('[TCPBase] socket is closed by other side while there were still unhandled data in the socket buffer'); | ||
} else { | ||
this.emit('error', err); | ||
} | ||
}); | ||
// receive `end` event that means the other end of the socket sends a FIN packet | ||
socket.once('end', () => { | ||
this.logger.info('[tcp-base] the connection: %s is closed by other side', this[addressKey]); | ||
}); | ||
socket.once('close', () => this._handleClose()); | ||
socket.once('error', err => { | ||
err.message += ' (address: ' + this[addressKey] + ')'; | ||
socket.setTimeout(this.options.connectTimeout, () => { | ||
const err = new Error(`[TCPBase] socket connect timeout (${this.options.connectTimeout}ms)`); | ||
err.name = 'TcpConnectionTimeoutError'; | ||
err.host = this.options.host; | ||
err.port = this.options.port; | ||
this.close(err); | ||
}); | ||
socket.once('connect', done); | ||
socket.once('connect', () => { | ||
// set timeout back to zero after connected | ||
socket.setTimeout(0); | ||
this.emit('connect'); | ||
}); | ||
if (this.options.needHeartbeat) { | ||
this._heartbeatTimer = setInterval(() => { | ||
const duration = this._lastHeartbeatTime - this._lastReceiveDataTime; | ||
if (this._lastReceiveDataTime && duration > this.options.heartbeatInterval) { | ||
const err = new Error(`server ${this[addressKey]} no response in ${duration}ms, maybe the socket is end on the other side.`); | ||
err.name = 'ServerNoResponseError'; | ||
this.close(err); | ||
return; | ||
} | ||
// flow control | ||
if (this._invokes.size > 0 || !this.isOK) { | ||
return; | ||
} | ||
this._lastHeartbeatTime = Date.now(); | ||
this.sendHeartBeat(); | ||
}, this.options.heartbeatInterval); | ||
} | ||
Promise.race([ | ||
this.await('connect'), | ||
this.await('error'), | ||
]).then(done, done); | ||
} | ||
_startHeartbeat() { | ||
this._heartbeatTimer = setInterval(() => { | ||
const duration = this._lastHeartbeatTime - this._lastReceiveDataTime; | ||
if (this._lastReceiveDataTime && duration > this.options.heartbeatInterval) { | ||
const err = new Error(`server ${this[addressKey]} no response in ${duration}ms, maybe the socket is end on the other side.`); | ||
err.name = 'ServerNoResponseError'; | ||
this.close(err); | ||
return; | ||
} | ||
// flow control | ||
if (this._invokes.size > 0 || !this.isOK) { | ||
return; | ||
} | ||
this._lastHeartbeatTime = Date.now(); | ||
this.sendHeartBeat(); | ||
}, this.options.heartbeatInterval); | ||
} | ||
} | ||
module.exports = TCPBase; |
{ | ||
"name": "tcp-base", | ||
"version": "2.0.0", | ||
"version": "3.0.0", | ||
"description": "A base class for tcp client with basic functions", | ||
@@ -10,8 +10,9 @@ "main": "lib/base.js", | ||
"scripts": { | ||
"autod": "autod -w --prefix '^'", | ||
"lint": "eslint --ext .js lib test example", | ||
"autod": "autod", | ||
"lint": "eslint --ext .js .", | ||
"test": "npm run lint && npm run test-local", | ||
"test-local": "egg-bin test", | ||
"cov": "egg-bin cov", | ||
"ci": "npm run lint && npm run cov" | ||
"ci": "npm run lint && npm run cov", | ||
"contributors": "contributors" | ||
}, | ||
@@ -38,9 +39,11 @@ "repository": { | ||
"devDependencies": { | ||
"autod": "^2.7.1", | ||
"egg-bin": "^2.2.0", | ||
"egg-ci": "^1.1.0", | ||
"eslint": "^3.15.0", | ||
"autod": "^2.8.0", | ||
"contributors": "^0.5.1", | ||
"egg-bin": "^3.3.0", | ||
"egg-ci": "^1.6.0", | ||
"eslint": "^3.19.0", | ||
"eslint-config-egg": "^3.2.0", | ||
"mm": "^2.1.0", | ||
"npminstall": "^2.24.0", | ||
"mz-modules": "^1.0.0", | ||
"npminstall": "^2.29.1", | ||
"pedding": "^1.1.0" | ||
@@ -50,4 +53,4 @@ }, | ||
"is-type-of": "^1.0.0", | ||
"sdk-base": "^3.1.0" | ||
"sdk-base": "^3.1.1" | ||
} | ||
} |
17154
399
10
Updatedsdk-base@^3.1.1