Socket
Socket
Sign inDemoInstall

tcp-base

Package Overview
Dependencies
9
Maintainers
4
Versions
11
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 2.0.0 to 3.0.0

6

History.md
3.0.0 / 2017-04-20
==================
* fix: invoke oneway after socket closed (#7)
* refactor: [BREAKING-CHANGE] not support reconnect
2.0.0 / 2017-02-17

@@ -3,0 +9,0 @@ ==================

157

lib/base.js

@@ -11,2 +11,3 @@ 'use strict';

noDelay: true,
connectTimeout: 3000,
responseTimeout: 3000,

@@ -16,4 +17,2 @@ heartbeatInterval: 5000,

concurrent: 0,
reConnectTimes: 0,
reConnectInterval: 1000,
logger: console,

@@ -36,5 +35,2 @@ };

* - {Number} [responseTimeout] - limit the maximum time for waiting a response
* - {Number} [reConnectTimes] - the maximum number of times the client will automatically try to
* reconnect to the remote server if the connection is dropped
* - {Number} [reConnectInterval] - the auto-reconnect interval, defaults to 1s
* - {Logger} [logger] - the logger client

@@ -55,3 +51,2 @@ * @constructor

this.clientId = ++seed;
this._reConnectTimes = this.options.reConnectTimes;
this._heartbeatTimer = null;

@@ -61,2 +56,3 @@ this._socket = null;

this._bodyLength = null;
this._lastError = null;
this._queue = [];

@@ -69,2 +65,7 @@ this._invokes = new Map();

this._connect();
this.ready(err => {
if (!err && this.options.needHeartbeat) {
this._startHeartbeat();
}
});
}

@@ -185,5 +186,16 @@

*/
send(packet, callback) {
callback = callback || noop;
send(packet, callback = noop) {
if (!this._socket) {
const err = new Error(`[TCPBase] The socket was closed. (address: ${this[addressKey]})`);
err.id = packet.id;
err.data = packet.data.toString('base64');
if (packet.oneway) {
err.oneway = true;
callback();
this.emit('error', err);
} else {
callback(err);
}
return;
}
if (packet.oneway) {

@@ -194,8 +206,4 @@ this._socket.write(packet.data);

}
if (!this._writable) {
this._queue.push([ packet, callback ]);
if (!this._socket && !this._reConnectTimes) {
this._cleanQueue();
}
return;

@@ -344,21 +352,16 @@ }

if (!this._socket) {
return;
return Promise.resolve();
}
this._socket.destroy();
this._handleClose(err);
this._socket.destroy(err);
return this.await('close');
}
_handleClose(err) {
_handleClose() {
if (!this._socket) {
return;
}
this._socket.removeAllListeners();
this._socket = null;
if (err) {
this.emit('error', err);
}
this._cleanInvokes(err);
this._cleanInvokes(this._lastError);
// clean timer

@@ -369,70 +372,76 @@ if (this._heartbeatTimer) {

}
// auto-reconnect
if (this._reConnectTimes) {
setTimeout(() => {
this._reConnectTimes--;
this._connect(() => {
this._reConnectTimes = this.options.reConnectTimes;
// try to recover
this._resume();
});
}, this.options.reConnectInterval);
return;
}
this._cleanQueue(err);
this._cleanQueue(this._lastError);
this.emit('close');
this.removeAllListeners();
}
_handleReadable() {
this._lastReceiveDataTime = Date.now();
try {
let remaining = false;
do {
remaining = this._readPacket();
} while (remaining);
} catch (err) {
this.close(err);
}
}
_connect(done) {
if (!done) {
done = () => this.ready(true);
done = err => {
this.ready(err ? err : true);
};
}
const socket = this._socket = net.connect(this.options.port, this.options.host);
socket.setNoDelay(this.options.noDelay);
socket.on('readable', () => {
this._lastReceiveDataTime = Date.now();
try {
let remaining = false;
do {
remaining = this._readPacket();
} while (remaining);
} catch (err) {
this.close(err);
socket.on('readable', () => { this._handleReadable(); });
socket.once('close', () => { this._handleClose(); });
socket.once('error', err => {
err.message += ' (address: ' + this[addressKey] + ')';
this._lastError = err;
if (err.code === 'ECONNRESET') {
this.logger.warn('[TCPBase] socket is closed by other side while there were still unhandled data in the socket buffer');
} else {
this.emit('error', err);
}
});
// receive `end` event that means the other end of the socket sends a FIN packet
socket.once('end', () => {
this.logger.info('[tcp-base] the connection: %s is closed by other side', this[addressKey]);
});
socket.once('close', () => this._handleClose());
socket.once('error', err => {
err.message += ' (address: ' + this[addressKey] + ')';
socket.setTimeout(this.options.connectTimeout, () => {
const err = new Error(`[TCPBase] socket connect timeout (${this.options.connectTimeout}ms)`);
err.name = 'TcpConnectionTimeoutError';
err.host = this.options.host;
err.port = this.options.port;
this.close(err);
});
socket.once('connect', done);
socket.once('connect', () => {
// set timeout back to zero after connected
socket.setTimeout(0);
this.emit('connect');
});
if (this.options.needHeartbeat) {
this._heartbeatTimer = setInterval(() => {
const duration = this._lastHeartbeatTime - this._lastReceiveDataTime;
if (this._lastReceiveDataTime && duration > this.options.heartbeatInterval) {
const err = new Error(`server ${this[addressKey]} no response in ${duration}ms, maybe the socket is end on the other side.`);
err.name = 'ServerNoResponseError';
this.close(err);
return;
}
// flow control
if (this._invokes.size > 0 || !this.isOK) {
return;
}
this._lastHeartbeatTime = Date.now();
this.sendHeartBeat();
}, this.options.heartbeatInterval);
}
Promise.race([
this.await('connect'),
this.await('error'),
]).then(done, done);
}
_startHeartbeat() {
this._heartbeatTimer = setInterval(() => {
const duration = this._lastHeartbeatTime - this._lastReceiveDataTime;
if (this._lastReceiveDataTime && duration > this.options.heartbeatInterval) {
const err = new Error(`server ${this[addressKey]} no response in ${duration}ms, maybe the socket is end on the other side.`);
err.name = 'ServerNoResponseError';
this.close(err);
return;
}
// flow control
if (this._invokes.size > 0 || !this.isOK) {
return;
}
this._lastHeartbeatTime = Date.now();
this.sendHeartBeat();
}, this.options.heartbeatInterval);
}
}
module.exports = TCPBase;
{
"name": "tcp-base",
"version": "2.0.0",
"version": "3.0.0",
"description": "A base class for tcp client with basic functions",

@@ -10,8 +10,9 @@ "main": "lib/base.js",

"scripts": {
"autod": "autod -w --prefix '^'",
"lint": "eslint --ext .js lib test example",
"autod": "autod",
"lint": "eslint --ext .js .",
"test": "npm run lint && npm run test-local",
"test-local": "egg-bin test",
"cov": "egg-bin cov",
"ci": "npm run lint && npm run cov"
"ci": "npm run lint && npm run cov",
"contributors": "contributors"
},

@@ -38,9 +39,11 @@ "repository": {

"devDependencies": {
"autod": "^2.7.1",
"egg-bin": "^2.2.0",
"egg-ci": "^1.1.0",
"eslint": "^3.15.0",
"autod": "^2.8.0",
"contributors": "^0.5.1",
"egg-bin": "^3.3.0",
"egg-ci": "^1.6.0",
"eslint": "^3.19.0",
"eslint-config-egg": "^3.2.0",
"mm": "^2.1.0",
"npminstall": "^2.24.0",
"mz-modules": "^1.0.0",
"npminstall": "^2.29.1",
"pedding": "^1.1.0"

@@ -50,4 +53,4 @@ },

"is-type-of": "^1.0.0",
"sdk-base": "^3.1.0"
"sdk-base": "^3.1.1"
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc