Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

connection

Package Overview
Dependencies
Maintainers
3
Versions
7
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

connection - npm Package Compare versions

Comparing version 0.0.0 to 1.0.0

History.md

292

lib/connection.js

@@ -1,75 +0,269 @@

// Author : Lijian Qiu
// Email : loye.qiu@gmail.com
// Description : Connection
'use strict';
const assert = require('assert');
const Base = require('sdk-base');
const pump = require('pump');
// const awaitEvent = require('await-event');
const awaitFirst = require('await-first');
var util = require('util');
var stream = require('stream');
//var WebSocket = require('ws');
const utils = require('./utils');
const DEFAULT_ERROR_PREFIX = '';
var Endpoint = require('./endpoint');
var ConnectionServer = require('./connection-server');
class Connection extends Base {
/**
* @constructor
* @param {object} options -
* @param {Socket} options.socket -
* @param {Logger} options.logger -
* @param {Protocol} options.protocol -
* @param {Map} [options.sentReqs] -
* @param {string} [options.url] -
* @param {number} [options.connectTimeout] -
*/
constructor(options) {
assert(options.logger, '[Connection] options.logger is required');
assert(options.socket, '[Connection] options.socket is required');
assert(options.protocol, '[Connection] options.protocol is required');
assert(!options.socket.destroyed, '[Connection] options.socket should not be destroyed');
assert(options.protocol.encoder, '[Connection] options.protocol have not encoder impl');
assert(options.protocol.decoder, '[Connection] options.protocol have not decoder impl');
super(Object.assign(Connection.defaultOptions(), options, { initMethod: '_init' }));
this._encoder = this.protocol.encoder(this.protocolOptions);
this._decoder = this.protocol.decoder(this.protocolOptions);
this._closed = false;
this._userClosed = false;
this._connected = !this.socket.connecting;
this.bindEvent();
// @refer https://nodejs.org/en/docs/guides/backpressuring-in-streams/
pump(this._encoder, this.socket, this._decoder, err => {
this._handleClose(err);
});
this.buildErrorNames(this.protocol.name || DEFAULT_ERROR_PREFIX);
}
async _init() {
this.url = this.options.url;
if (this._connected) {
if (!this.url) {
this.url = `${this.socket.remoteAddress}:${this.socket.remotePort}`;
}
return;
}
this.socket.setTimeout(this.options.connectTimeout);
const { event } = await awaitFirst(this.socket, [ 'connect', 'timeout', 'error' ]);
if (event === 'timeout') {
const err = new Error('connect timeout(' + this.options.connectTimeout + 'ms), ' + this.url);
err.name = this.SocketConnectTimeoutError;
this.close();
throw err;
}
if (!this.url) {
this.url = `${this.socket.remoteAddress}:${this.socket.remotePort}`;
}
this.socket.setTimeout(0);
this._connected = true;
}
function Connection(endpoint) {
if (!(endpoint instanceof Endpoint)) endpoint = new Endpoint(endpoint);
bindEvent() {
this._decoder.on('request', req => this.emit('request', req));
this._decoder.on('heartbeat', hb => this.emit('heartbeat', hb));
this._decoder.on('response', res => this._handleResponse(res));
}
stream.Duplex.call(this);
buildErrorNames(errorPrefix) {
this.OneWayEncodeErrorName = `${errorPrefix}OneWayEncodeError`;
this.SocketConnectTimeoutError = `${errorPrefix}SocketConnectTimtoutError`;
this.ResponseEncodeErrorName = `${errorPrefix}ResponseEncodeError`;
this.ResponseTimeoutErrorName = `${errorPrefix}ResponseTimeoutError`;
this.RequestEncodeErrorName = `${errorPrefix}RequestEncodeError`;
this.SocketErrorName = `${errorPrefix}SocketError`;
this.SocketCloseError = `${errorPrefix}SocketCloseError`;
}
this.status = { sent: 0, received: 0 };
async writeRequest(req) {
const id = utils.nextId();
const timer = this._requestTimer(id, req);
try {
const p = this._waitResponse(id, req);
this._writeRequest(id, req);
return await p;
} catch (e) {
this._cleanReq(id);
throw e;
} finally {
clearTimeout(timer);
}
}
oneway(req) {
assert(this._encoder.writeRequest, '[Connection] encoder have not impl writeRequest');
const id = utils.nextId();
req.oneway = true;
this._encoder.writeRequest(id, req, err => {
if (err) {
err.name = this.OneWayEncodeErrorName;
err.resultCode = '02';
this.logger.error(err);
}
});
}
}
util.inherits(Connection, stream.Duplex);
async writeResponse(req, res) {
assert(this._encoder.writeResponse, '[Connection] encoder have not impl writeResponse');
return new Promise((resolve, reject) => {
this._encoder.writeResponse(req, res, err => {
if (!err) {
return resolve();
}
err.name = this.ResponseEncodeErrorName;
err.resultCode = '02';
return reject(err);
});
});
}
//function onopen() {
//
//}
//
//function onerror(err) {
// this.emit('error', err);
//}
_requestTimer(id, req) {
const start = Date.now();
return setTimeout(() => {
const rt = Date.now() - start;
const err = new Error('no response in ' + rt + 'ms, ' + this.url);
err.name = this.ResponseTimeoutErrorName;
err.resultCode = '03'; // 超时
this._handleRequestError(id, err);
}, req.timeout);
}
(function (proto) {
proto._write = function (chunk, encoding, callback) {
var self = this;
if (!this.connected) {
this.once('connect', function () {
self._write(chunk, encoding, callback);
});
_writeRequest(id, req) {
assert(this._encoder.writeRequest, '[Connection] encoder have not impl writeRequest');
this._encoder.writeRequest(id, req, err => {
if (err) {
err.name = this.RequestEncodeErrorName;
err.resultCode = '02';
process.nextTick(() => {
this._handleRequestError(id, err);
});
}
});
}
_waitResponse(id, req) {
const event = 'response_' + id;
let resReject;
const resPromise = new Promise((resolve, reject) => {
resReject = reject;
this.once(event, resolve);
});
this._sentReqs.set(id, { req, resPromise, resReject });
return resPromise;
}
async forceClose(err) {
const closePromise = this.await('close');
if (err) {
this._decoder.emit('error', err);
} else {
var data = typeof chunk === 'string' ? new Buffer(chunk, encoding) : chunk;
this._decoder.end(() => this._decoder.destroy());
}
await closePromise;
}
async close(err) {
if (this._userClosed) return;
this._userClosed = true;
const closeEvent = this.await('close');
// await pending request done
await Promise.all(
Array.from(this._sentReqs.values())
.map(data => data.resPromise)
// catch the error, do noop, writeRequest will handle it
).catch(() => {});
if (err) {
this._decoder.emit('error', err);
} else {
// flush data
this._decoder.end();
}
await closeEvent;
}
_cleanReq(id) {
return this._sentReqs.delete(id);
}
_handleResponse(res) {
const id = res.packetId;
if (this._cleanReq(id)) {
this.emit('response_' + id, res);
} else {
this.logger.warn('[Connection] can not find invoke request for response: %j, maybe it\'s timeout.', res);
}
};
}
proto._read = function (n) { };
_handleRequestError(id, err) {
if (!this._sentReqs.has(id)) {
return;
}
const { resReject } = this._sentReqs.get(id);
this._cleanReq(id);
return resReject(err);
}
proto.close = function () {
this.status.closing = true;
_handleClose(err) {
if (this._closed) return;
this._closed = true;
if (err) {
if (err.code === 'ECONNRESET') {
this.logger.warn('[Connection] ECONNRESET, %s', this.url);
} else {
err.name = err.name === 'Error' ? this.SocketErrorName : err.name;
err.message = err.message + ', ' + this.url;
this.emit('error', err);
}
}
this._cleanRequest(err);
this._decoder.destroy();
this.emit('close');
}
_cleanRequest(err) {
if (!err) {
err = new Error('The socket was closed. ' + this.url);
err.name = this.SocketCloseError;
err.resultCode = '02';
}
for (const id of this._sentReqs.keys()) {
this._handleRequestError(id, err);
}
}
return this;
};
get _sentReqs() {
return this.options.sentReqs;
}
})(Connection.prototype);
get protocol() {
return this.options.protocol;
}
get socket() {
return this.options.socket;
}
function listen(endpoint, onconnection) {
var server = new ConnectionServer(endpoint);
typeof onconnection === 'function' && server.on('connect', onconnection);
return server;
}
get logger() {
return this.options.logger;
}
function connect(endpoint, onconnect) {
var conn = new Connection(endpoint);
typeof onconnect === 'function' && conn.on('connect', onconnect);
return conn;
get protocolOptions() {
return {
sentReqs: this._sentReqs,
};
}
static defaultOptions() {
return {
sentReqs: new Map(),
connectTimeout: 5000,
};
}
}
//exports
Connection.listen = listen;
Connection.connect = connect;
module.exports = Connection;
{
"name": "connection",
"description": "connection",
"version": "0.0.0",
"author": "Lijian Qiu",
"keywords": ["connection", "stream"],
"version": "1.0.0",
"description": "wrap for socket",
"dependencies": {
"await-event": "^2.1.0",
"await-first": "^1.0.0",
"pump": "^3.0.0",
"sdk-base": "^3.5.1"
},
"files": [
"lib",
"index.js"
],
"scripts": {
"autod": "autod",
"lint": "eslint . --ext .js",
"cov": "TEST_TIMEOUT=10000 egg-bin cov",
"test": "npm run lint && npm run test-local",
"test-local": "egg-bin test",
"pkgfiles": "egg-bin pkgfiles --check",
"ci": "npm run autod -- --check && npm run pkgfiles && npm run lint && npm run cov",
"contributors": "contributors -f plain -o AUTHORS"
},
"repository": {
"type": "git",
"url": "https://github.com/loye/connection.git"
"url": "git+https://github.com/node-modules/connection.git"
},
"license": "MIT"
}
"keywords": [
"socket",
"tcp",
"connection"
],
"author": "killagu <killa123@126.com>",
"license": "MIT",
"bugs": {
"url": "https://github.com/node-modules/connection/issues"
},
"homepage": "https://github.com/node-modules/connection#readme",
"devDependencies": {
"autod": "^3.0.1",
"contributors": "^0.5.1",
"egg-bin": "^4.7.1",
"egg-ci": "^1.8.0",
"eslint": "^5.2.0",
"eslint-config-egg": "^7.0.0",
"mm": "^2.2.2",
"mz-modules": "^2.1.0",
"sofa-bolt-node": "^1.1.1"
},
"engines": {
"node": ">= 8.0.0"
},
"ci": {
"version": "8, 10, 11"
}
}

@@ -1,4 +0,153 @@

connection
==========
# connection
connection
[connection](https://github.com/node-modules/connection) socket wrapper
[![NPM version][npm-image]][npm-url]
[![build status][travis-image]][travis-url]
[![Test coverage][codecov-image]][codecov-url]
[![David deps][david-image]][david-url]
[![Known Vulnerabilities][snyk-image]][snyk-url]
[![npm download][download-image]][download-url]
[npm-image]: https://img.shields.io/npm/v/connection.svg?style=flat-square
[npm-url]: https://npmjs.org/package/connection
[travis-image]: https://img.shields.io/travis/node-modules/connection.svg?style=flat-square
[travis-url]: https://travis-ci.org/node-modules/connection
[codecov-image]: https://codecov.io/gh/node-modules/connection/branch/master/graph/badge.svg
[codecov-url]: https://codecov.io/gh/node-modules/connection
[david-image]: https://img.shields.io/david/node-modules/connection.svg?style=flat-square
[david-url]: https://david-dm.org/node-modules/connection
[snyk-image]: https://snyk.io/test/npm/connection/badge.svg?style=flat-square
[snyk-url]: https://snyk.io/test/npm/connection
[download-image]: https://img.shields.io/npm/dm/connection.svg?style=flat-square
[download-url]: https://npmjs.org/package/connection
## Usage
### Client Socket
```js
const net = require('net');
const awaitFirst = require('await-first');
const Connection = require('connection');
const Decoder = require('sofa-bolt-node/lib/decoder');
const Encoder = require('sofa-bolt-node/lib/encoder');
// bolt protocol example
const protocol = {
name: 'Rpc',
encoder: opts => new Encoder(opts),
decoder: opts => new Decoder(opts),
};
async function createConnection(hostname, port) {
const socket = net.connect(port, hostname);
await awaitFirst(socket, [ 'connect', 'error' ]);
return new Connection({
logger: console,
socket,
protocol,
});
}
const conn = await createConnection('127.0.0.1', 12200);
conn.writeRequest({
targetAppName: 'foo',
args: [ 'peter' ],
serverSignature: 'com.alipay.sofa.rpc.quickstart.HelloService:1.0',
methodName: 'sayHello',
methodArgSigs: [ 'java.lang.String' ],
requestProps: null,
});
```
### Server Socket
```js
const Connection = require('connection');
const server = net.createServer();
server.listen(port);
server.on('connection', sock => {
const conn = new Connection({
logger: console,
socket: sock,
protocol,
});
conn.on('request', req => {
conn.writeResponse(req, {
error: null,
appResponse: 'hello, peter',
responseProps: null,
});
});
});
```
[More example](./example)
### API
- oneway() - one way call
- async writeRequest(req) - write request and wait response
- async writeResponse(req, res) - write response
- async close() - wait all pending request done and destroy the socket
- async forceClose() - abort all pending request and destroy the socket
- get protocolOptions() - encoder/decoder constructor options, can be overwrite when custom protocol
### Protocol implement
```typescript
interface Request {
/**
* If request is oneway, shoule set to true
*/
oneway: boolean,
/**
* writeRequest will use the timeout to set the timer
*/
timeout: number,
/**
* request packet type, request|heartbeat|response
*/
packetType: string,
}
interface Response {
packetId: number,
}
interface Encoder extends Transform {
/**
* write request to socket
* Connection#writeRequest and Connection#oneway will call the function.
* @param {number} id - the request id
* @param {Object} req - the request object should be encoded
* @param {Function} cb - the encode callback
*/
writeRequest(id: number, req: object, cb);
/**
* write response to socket
* Connection#writeResponse will call the function.
* @param {Object} req - the request object
* @param {Object} res - the response object should be encoded
* @param {Function} cb - the encode callback
*/
writeResponse(req: object, res: object, cb);
}
interface Decoder extends Writable {
// events
// - request emit when have request packet
// - heartbeat emit when have heartbeat packet
// - response emit when have response packet
}
interface Protocol {
name: string;
encode(options: any): Encoder;
decode(options: any): Decoder;
}
```
.npmignore
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc