Comparing version 0.0.0 to 1.0.0
@@ -1,75 +0,269 @@ | ||
// Author : Lijian Qiu | ||
// Email : loye.qiu@gmail.com | ||
// Description : Connection | ||
'use strict'; | ||
const assert = require('assert'); | ||
const Base = require('sdk-base'); | ||
const pump = require('pump'); | ||
// const awaitEvent = require('await-event'); | ||
const awaitFirst = require('await-first'); | ||
var util = require('util'); | ||
var stream = require('stream'); | ||
//var WebSocket = require('ws'); | ||
const utils = require('./utils'); | ||
const DEFAULT_ERROR_PREFIX = ''; | ||
var Endpoint = require('./endpoint'); | ||
var ConnectionServer = require('./connection-server'); | ||
class Connection extends Base { | ||
/** | ||
* @constructor | ||
* @param {object} options - | ||
* @param {Socket} options.socket - | ||
* @param {Logger} options.logger - | ||
* @param {Protocol} options.protocol - | ||
* @param {Map} [options.sentReqs] - | ||
* @param {string} [options.url] - | ||
* @param {number} [options.connectTimeout] - | ||
*/ | ||
constructor(options) { | ||
assert(options.logger, '[Connection] options.logger is required'); | ||
assert(options.socket, '[Connection] options.socket is required'); | ||
assert(options.protocol, '[Connection] options.protocol is required'); | ||
assert(!options.socket.destroyed, '[Connection] options.socket should not be destroyed'); | ||
assert(options.protocol.encoder, '[Connection] options.protocol have not encoder impl'); | ||
assert(options.protocol.decoder, '[Connection] options.protocol have not decoder impl'); | ||
super(Object.assign(Connection.defaultOptions(), options, { initMethod: '_init' })); | ||
this._encoder = this.protocol.encoder(this.protocolOptions); | ||
this._decoder = this.protocol.decoder(this.protocolOptions); | ||
this._closed = false; | ||
this._userClosed = false; | ||
this._connected = !this.socket.connecting; | ||
this.bindEvent(); | ||
// @refer https://nodejs.org/en/docs/guides/backpressuring-in-streams/ | ||
pump(this._encoder, this.socket, this._decoder, err => { | ||
this._handleClose(err); | ||
}); | ||
this.buildErrorNames(this.protocol.name || DEFAULT_ERROR_PREFIX); | ||
} | ||
async _init() { | ||
this.url = this.options.url; | ||
if (this._connected) { | ||
if (!this.url) { | ||
this.url = `${this.socket.remoteAddress}:${this.socket.remotePort}`; | ||
} | ||
return; | ||
} | ||
this.socket.setTimeout(this.options.connectTimeout); | ||
const { event } = await awaitFirst(this.socket, [ 'connect', 'timeout', 'error' ]); | ||
if (event === 'timeout') { | ||
const err = new Error('connect timeout(' + this.options.connectTimeout + 'ms), ' + this.url); | ||
err.name = this.SocketConnectTimeoutError; | ||
this.close(); | ||
throw err; | ||
} | ||
if (!this.url) { | ||
this.url = `${this.socket.remoteAddress}:${this.socket.remotePort}`; | ||
} | ||
this.socket.setTimeout(0); | ||
this._connected = true; | ||
} | ||
function Connection(endpoint) { | ||
if (!(endpoint instanceof Endpoint)) endpoint = new Endpoint(endpoint); | ||
bindEvent() { | ||
this._decoder.on('request', req => this.emit('request', req)); | ||
this._decoder.on('heartbeat', hb => this.emit('heartbeat', hb)); | ||
this._decoder.on('response', res => this._handleResponse(res)); | ||
} | ||
stream.Duplex.call(this); | ||
buildErrorNames(errorPrefix) { | ||
this.OneWayEncodeErrorName = `${errorPrefix}OneWayEncodeError`; | ||
this.SocketConnectTimeoutError = `${errorPrefix}SocketConnectTimtoutError`; | ||
this.ResponseEncodeErrorName = `${errorPrefix}ResponseEncodeError`; | ||
this.ResponseTimeoutErrorName = `${errorPrefix}ResponseTimeoutError`; | ||
this.RequestEncodeErrorName = `${errorPrefix}RequestEncodeError`; | ||
this.SocketErrorName = `${errorPrefix}SocketError`; | ||
this.SocketCloseError = `${errorPrefix}SocketCloseError`; | ||
} | ||
this.status = { sent: 0, received: 0 }; | ||
async writeRequest(req) { | ||
const id = utils.nextId(); | ||
const timer = this._requestTimer(id, req); | ||
try { | ||
const p = this._waitResponse(id, req); | ||
this._writeRequest(id, req); | ||
return await p; | ||
} catch (e) { | ||
this._cleanReq(id); | ||
throw e; | ||
} finally { | ||
clearTimeout(timer); | ||
} | ||
} | ||
oneway(req) { | ||
assert(this._encoder.writeRequest, '[Connection] encoder have not impl writeRequest'); | ||
const id = utils.nextId(); | ||
req.oneway = true; | ||
this._encoder.writeRequest(id, req, err => { | ||
if (err) { | ||
err.name = this.OneWayEncodeErrorName; | ||
err.resultCode = '02'; | ||
this.logger.error(err); | ||
} | ||
}); | ||
} | ||
} | ||
util.inherits(Connection, stream.Duplex); | ||
async writeResponse(req, res) { | ||
assert(this._encoder.writeResponse, '[Connection] encoder have not impl writeResponse'); | ||
return new Promise((resolve, reject) => { | ||
this._encoder.writeResponse(req, res, err => { | ||
if (!err) { | ||
return resolve(); | ||
} | ||
err.name = this.ResponseEncodeErrorName; | ||
err.resultCode = '02'; | ||
return reject(err); | ||
}); | ||
}); | ||
} | ||
//function onopen() { | ||
// | ||
//} | ||
// | ||
//function onerror(err) { | ||
// this.emit('error', err); | ||
//} | ||
_requestTimer(id, req) { | ||
const start = Date.now(); | ||
return setTimeout(() => { | ||
const rt = Date.now() - start; | ||
const err = new Error('no response in ' + rt + 'ms, ' + this.url); | ||
err.name = this.ResponseTimeoutErrorName; | ||
err.resultCode = '03'; // 超时 | ||
this._handleRequestError(id, err); | ||
}, req.timeout); | ||
} | ||
(function (proto) { | ||
proto._write = function (chunk, encoding, callback) { | ||
var self = this; | ||
if (!this.connected) { | ||
this.once('connect', function () { | ||
self._write(chunk, encoding, callback); | ||
}); | ||
_writeRequest(id, req) { | ||
assert(this._encoder.writeRequest, '[Connection] encoder have not impl writeRequest'); | ||
this._encoder.writeRequest(id, req, err => { | ||
if (err) { | ||
err.name = this.RequestEncodeErrorName; | ||
err.resultCode = '02'; | ||
process.nextTick(() => { | ||
this._handleRequestError(id, err); | ||
}); | ||
} | ||
}); | ||
} | ||
_waitResponse(id, req) { | ||
const event = 'response_' + id; | ||
let resReject; | ||
const resPromise = new Promise((resolve, reject) => { | ||
resReject = reject; | ||
this.once(event, resolve); | ||
}); | ||
this._sentReqs.set(id, { req, resPromise, resReject }); | ||
return resPromise; | ||
} | ||
async forceClose(err) { | ||
const closePromise = this.await('close'); | ||
if (err) { | ||
this._decoder.emit('error', err); | ||
} else { | ||
var data = typeof chunk === 'string' ? new Buffer(chunk, encoding) : chunk; | ||
this._decoder.end(() => this._decoder.destroy()); | ||
} | ||
await closePromise; | ||
} | ||
async close(err) { | ||
if (this._userClosed) return; | ||
this._userClosed = true; | ||
const closeEvent = this.await('close'); | ||
// await pending request done | ||
await Promise.all( | ||
Array.from(this._sentReqs.values()) | ||
.map(data => data.resPromise) | ||
// catch the error, do noop, writeRequest will handle it | ||
).catch(() => {}); | ||
if (err) { | ||
this._decoder.emit('error', err); | ||
} else { | ||
// flush data | ||
this._decoder.end(); | ||
} | ||
await closeEvent; | ||
} | ||
_cleanReq(id) { | ||
return this._sentReqs.delete(id); | ||
} | ||
_handleResponse(res) { | ||
const id = res.packetId; | ||
if (this._cleanReq(id)) { | ||
this.emit('response_' + id, res); | ||
} else { | ||
this.logger.warn('[Connection] can not find invoke request for response: %j, maybe it\'s timeout.', res); | ||
} | ||
}; | ||
} | ||
proto._read = function (n) { }; | ||
_handleRequestError(id, err) { | ||
if (!this._sentReqs.has(id)) { | ||
return; | ||
} | ||
const { resReject } = this._sentReqs.get(id); | ||
this._cleanReq(id); | ||
return resReject(err); | ||
} | ||
proto.close = function () { | ||
this.status.closing = true; | ||
_handleClose(err) { | ||
if (this._closed) return; | ||
this._closed = true; | ||
if (err) { | ||
if (err.code === 'ECONNRESET') { | ||
this.logger.warn('[Connection] ECONNRESET, %s', this.url); | ||
} else { | ||
err.name = err.name === 'Error' ? this.SocketErrorName : err.name; | ||
err.message = err.message + ', ' + this.url; | ||
this.emit('error', err); | ||
} | ||
} | ||
this._cleanRequest(err); | ||
this._decoder.destroy(); | ||
this.emit('close'); | ||
} | ||
_cleanRequest(err) { | ||
if (!err) { | ||
err = new Error('The socket was closed. ' + this.url); | ||
err.name = this.SocketCloseError; | ||
err.resultCode = '02'; | ||
} | ||
for (const id of this._sentReqs.keys()) { | ||
this._handleRequestError(id, err); | ||
} | ||
} | ||
return this; | ||
}; | ||
get _sentReqs() { | ||
return this.options.sentReqs; | ||
} | ||
})(Connection.prototype); | ||
get protocol() { | ||
return this.options.protocol; | ||
} | ||
get socket() { | ||
return this.options.socket; | ||
} | ||
function listen(endpoint, onconnection) { | ||
var server = new ConnectionServer(endpoint); | ||
typeof onconnection === 'function' && server.on('connect', onconnection); | ||
return server; | ||
} | ||
get logger() { | ||
return this.options.logger; | ||
} | ||
function connect(endpoint, onconnect) { | ||
var conn = new Connection(endpoint); | ||
typeof onconnect === 'function' && conn.on('connect', onconnect); | ||
return conn; | ||
get protocolOptions() { | ||
return { | ||
sentReqs: this._sentReqs, | ||
}; | ||
} | ||
static defaultOptions() { | ||
return { | ||
sentReqs: new Map(), | ||
connectTimeout: 5000, | ||
}; | ||
} | ||
} | ||
//exports | ||
Connection.listen = listen; | ||
Connection.connect = connect; | ||
module.exports = Connection; |
{ | ||
"name": "connection", | ||
"description": "connection", | ||
"version": "0.0.0", | ||
"author": "Lijian Qiu", | ||
"keywords": ["connection", "stream"], | ||
"version": "1.0.0", | ||
"description": "wrap for socket", | ||
"dependencies": { | ||
"await-event": "^2.1.0", | ||
"await-first": "^1.0.0", | ||
"pump": "^3.0.0", | ||
"sdk-base": "^3.5.1" | ||
}, | ||
"files": [ | ||
"lib", | ||
"index.js" | ||
], | ||
"scripts": { | ||
"autod": "autod", | ||
"lint": "eslint . --ext .js", | ||
"cov": "TEST_TIMEOUT=10000 egg-bin cov", | ||
"test": "npm run lint && npm run test-local", | ||
"test-local": "egg-bin test", | ||
"pkgfiles": "egg-bin pkgfiles --check", | ||
"ci": "npm run autod -- --check && npm run pkgfiles && npm run lint && npm run cov", | ||
"contributors": "contributors -f plain -o AUTHORS" | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/loye/connection.git" | ||
"url": "git+https://github.com/node-modules/connection.git" | ||
}, | ||
"license": "MIT" | ||
} | ||
"keywords": [ | ||
"socket", | ||
"tcp", | ||
"connection" | ||
], | ||
"author": "killagu <killa123@126.com>", | ||
"license": "MIT", | ||
"bugs": { | ||
"url": "https://github.com/node-modules/connection/issues" | ||
}, | ||
"homepage": "https://github.com/node-modules/connection#readme", | ||
"devDependencies": { | ||
"autod": "^3.0.1", | ||
"contributors": "^0.5.1", | ||
"egg-bin": "^4.7.1", | ||
"egg-ci": "^1.8.0", | ||
"eslint": "^5.2.0", | ||
"eslint-config-egg": "^7.0.0", | ||
"mm": "^2.2.2", | ||
"mz-modules": "^2.1.0", | ||
"sofa-bolt-node": "^1.1.1" | ||
}, | ||
"engines": { | ||
"node": ">= 8.0.0" | ||
}, | ||
"ci": { | ||
"version": "8, 10, 11" | ||
} | ||
} |
155
README.md
@@ -1,4 +0,153 @@ | ||
connection | ||
========== | ||
# connection | ||
connection | ||
[connection](https://github.com/node-modules/connection) socket wrapper | ||
[![NPM version][npm-image]][npm-url] | ||
[![build status][travis-image]][travis-url] | ||
[![Test coverage][codecov-image]][codecov-url] | ||
[![David deps][david-image]][david-url] | ||
[![Known Vulnerabilities][snyk-image]][snyk-url] | ||
[![npm download][download-image]][download-url] | ||
[npm-image]: https://img.shields.io/npm/v/connection.svg?style=flat-square | ||
[npm-url]: https://npmjs.org/package/connection | ||
[travis-image]: https://img.shields.io/travis/node-modules/connection.svg?style=flat-square | ||
[travis-url]: https://travis-ci.org/node-modules/connection | ||
[codecov-image]: https://codecov.io/gh/node-modules/connection/branch/master/graph/badge.svg | ||
[codecov-url]: https://codecov.io/gh/node-modules/connection | ||
[david-image]: https://img.shields.io/david/node-modules/connection.svg?style=flat-square | ||
[david-url]: https://david-dm.org/node-modules/connection | ||
[snyk-image]: https://snyk.io/test/npm/connection/badge.svg?style=flat-square | ||
[snyk-url]: https://snyk.io/test/npm/connection | ||
[download-image]: https://img.shields.io/npm/dm/connection.svg?style=flat-square | ||
[download-url]: https://npmjs.org/package/connection | ||
## Usage | ||
### Client Socket | ||
```js | ||
const net = require('net'); | ||
const awaitFirst = require('await-first'); | ||
const Connection = require('connection'); | ||
const Decoder = require('sofa-bolt-node/lib/decoder'); | ||
const Encoder = require('sofa-bolt-node/lib/encoder'); | ||
// bolt protocol example | ||
const protocol = { | ||
name: 'Rpc', | ||
encoder: opts => new Encoder(opts), | ||
decoder: opts => new Decoder(opts), | ||
}; | ||
async function createConnection(hostname, port) { | ||
const socket = net.connect(port, hostname); | ||
await awaitFirst(socket, [ 'connect', 'error' ]); | ||
return new Connection({ | ||
logger: console, | ||
socket, | ||
protocol, | ||
}); | ||
} | ||
const conn = await createConnection('127.0.0.1', 12200); | ||
conn.writeRequest({ | ||
targetAppName: 'foo', | ||
args: [ 'peter' ], | ||
serverSignature: 'com.alipay.sofa.rpc.quickstart.HelloService:1.0', | ||
methodName: 'sayHello', | ||
methodArgSigs: [ 'java.lang.String' ], | ||
requestProps: null, | ||
}); | ||
``` | ||
### Server Socket | ||
```js | ||
const Connection = require('connection'); | ||
const server = net.createServer(); | ||
server.listen(port); | ||
server.on('connection', sock => { | ||
const conn = new Connection({ | ||
logger: console, | ||
socket: sock, | ||
protocol, | ||
}); | ||
conn.on('request', req => { | ||
conn.writeResponse(req, { | ||
error: null, | ||
appResponse: 'hello, peter', | ||
responseProps: null, | ||
}); | ||
}); | ||
}); | ||
``` | ||
[More example](./example) | ||
### API | ||
- oneway() - one way call | ||
- async writeRequest(req) - write request and wait response | ||
- async writeResponse(req, res) - write response | ||
- async close() - wait all pending request done and destroy the socket | ||
- async forceClose() - abort all pending request and destroy the socket | ||
- get protocolOptions() - encoder/decoder constructor options, can be overwrite when custom protocol | ||
### Protocol implement | ||
```typescript | ||
interface Request { | ||
/** | ||
* If request is oneway, shoule set to true | ||
*/ | ||
oneway: boolean, | ||
/** | ||
* writeRequest will use the timeout to set the timer | ||
*/ | ||
timeout: number, | ||
/** | ||
* request packet type, request|heartbeat|response | ||
*/ | ||
packetType: string, | ||
} | ||
interface Response { | ||
packetId: number, | ||
} | ||
interface Encoder extends Transform { | ||
/** | ||
* write request to socket | ||
* Connection#writeRequest and Connection#oneway will call the function. | ||
* @param {number} id - the request id | ||
* @param {Object} req - the request object should be encoded | ||
* @param {Function} cb - the encode callback | ||
*/ | ||
writeRequest(id: number, req: object, cb); | ||
/** | ||
* write response to socket | ||
* Connection#writeResponse will call the function. | ||
* @param {Object} req - the request object | ||
* @param {Object} res - the response object should be encoded | ||
* @param {Function} cb - the encode callback | ||
*/ | ||
writeResponse(req: object, res: object, cb); | ||
} | ||
interface Decoder extends Writable { | ||
// events | ||
// - request emit when have request packet | ||
// - heartbeat emit when have heartbeat packet | ||
// - response emit when have response packet | ||
} | ||
interface Protocol { | ||
name: string; | ||
encode(options: any): Encoder; | ||
decode(options: any): Decoder; | ||
} | ||
``` |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No bug tracker
MaintenancePackage does not have a linked bug tracker in package.json.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
No website
QualityPackage does not have a website.
Found 1 instance in 1 package
13806
255
0
0
154
4
9
6
2
+ Addedawait-event@^2.1.0
+ Addedawait-first@^1.0.0
+ Addedpump@^3.0.0
+ Addedsdk-base@^3.5.1
+ Addedawait-event@2.1.0(transitive)
+ Addedawait-first@1.0.0(transitive)
+ Addedco@4.6.0(transitive)
+ Addedcore-util-is@1.0.3(transitive)
+ Addedee-first@1.1.1(transitive)
+ Addedend-of-stream@1.4.4(transitive)
+ Addedis-class-hotfix@0.0.6(transitive)
+ Addedis-type-of@1.4.0(transitive)
+ Addedisstream@0.1.2(transitive)
+ Addedonce@1.4.0(transitive)
+ Addedpump@3.0.2(transitive)
+ Addedsdk-base@3.6.0(transitive)
+ Addedwrappy@1.0.2(transitive)