New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

cluster-client

Package Overview
Dependencies
Maintainers
5
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cluster-client - npm Package Compare versions

Comparing version

to
2.0.0

7

History.md
2.0.0 / 2018-03-06
==================
**others**
* [[`2574eae`](http://github.com/node-modules/cluster-client/commit/2574eae6fdbe603b74a3c27a57e3b545aec54314)] - [BREAKING] feat: migrating from generators to async/await (#36) (zōng yǔ <<gxcsoccer@users.noreply.github.com>>)
* [[`e32f0ef`](http://github.com/node-modules/cluster-client/commit/e32f0eff5fe5dfb4187386d928b2fa8cdc65cfa5)] - chore: release 1.7.1 (xiaochen.gaoxc <<xiaochen.gaoxc@alibaba-inc.com>>),
1.7.1 / 2017-09-21

@@ -3,0 +10,0 @@ ==================

121

lib/client.js
'use strict';
const debug = require('debug')('cluster-client');
const co = require('co');
const is = require('is-type-of');

@@ -49,5 +48,5 @@ const Base = require('sdk-base');

this.ready(false);
this[init]();
this[init]().catch(err => { this.ready(err); });
};
this[init]();
this[init]().catch(err => { this.ready(err); });
}

@@ -72,55 +71,51 @@

*/
[init]() {
co(function* () {
const name = this.options.name;
const port = this.options.port;
let server;
if (this.options.isLeader === true) {
server = yield ClusterServer.create(name, port);
if (!server) {
throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`);
}
} else if (this.options.isLeader === false) {
// wait for leader active
yield ClusterServer.waitFor(port, this.options.maxWaitTime);
} else {
debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port);
server = yield ClusterServer.create(name, port);
async [init]() {
const name = this.options.name;
const port = this.options.port;
let server;
if (this.options.isLeader === true) {
server = await ClusterServer.create(name, port);
if (!server) {
throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`);
}
} else if (this.options.isLeader === false) {
// wait for leader active
await ClusterServer.waitFor(port, this.options.maxWaitTime);
} else {
debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port);
server = await ClusterServer.create(name, port);
}
if (server) {
this[innerClient] = new Leader(Object.assign({ server }, this.options));
debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port);
} else {
this[innerClient] = new Follower(this.options);
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
}
if (server) {
this[innerClient] = new Leader(Object.assign({ server }, this.options));
debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port);
} else {
this[innerClient] = new Follower(this.options);
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
}
// events delegate
utils.delegateEvents(this[innerClient], this);
// events delegate
utils.delegateEvents(this[innerClient], this);
// re init when connection is close
this[innerClient].on('close', this[closeHandler]);
// re init when connection is close
this[innerClient].on('close', this[closeHandler]);
// wait leader/follower ready
yield this[innerClient].ready();
// wait leader/follower ready
await this[innerClient].ready();
// subscribe all
for (const registrations of this[subInfo].values()) {
for (const args of registrations) {
this[innerClient].subscribe(args[0], args[1]);
}
// subscribe all
for (const registrations of this[subInfo].values()) {
for (const args of registrations) {
this[innerClient].subscribe(args[0], args[1]);
}
// publish all
for (const reg of this[pubInfo].values()) {
this[innerClient].publish(reg);
}
}
// publish all
for (const reg of this[pubInfo].values()) {
this[innerClient].publish(reg);
}
if (!this[isReady]) {
this[isReady] = true;
this.ready(true);
}
}.bind(this)).catch(err => {
this.ready(err);
});
if (!this[isReady]) {
this[isReady] = true;
this.ready(true);
}
}

@@ -215,20 +210,18 @@

[close]() {
return co(function* () {
try {
// close after ready, in case of innerClient is initializing
yield this.ready();
} catch (err) {
// ignore
}
async [close]() {
try {
// close after ready, in case of innerClient is initializing
await this.ready();
} catch (err) {
// ignore
}
const client = this[innerClient];
if (client) {
// prevent re-initializing
client.removeListener('close', this[closeHandler]);
if (client.close) {
yield utils.callFn(client.close.bind(client));
}
const client = this[innerClient];
if (client) {
// prevent re-initializing
client.removeListener('close', this[closeHandler]);
if (client.close) {
await utils.callFn(client.close.bind(client));
}
}.bind(this));
}
}

@@ -235,0 +228,0 @@ }

@@ -197,3 +197,4 @@ 'use strict';

const descriptor = Reflect.getOwnPropertyDescriptor(proto, key);
if (descriptor.value && is.generatorFunction(descriptor.value)) {
if (descriptor.value &&
(is.generatorFunction(descriptor.value) || is.asyncFunction(descriptor.value))) {
this.delegate(key);

@@ -200,0 +201,0 @@ }

@@ -56,3 +56,2 @@ 'use strict';

this._closeHandler = this._handleClose.bind(this);
this._handleConnection = this._handleConnection.bind(this);

@@ -63,3 +62,5 @@

this._server.once('close', () => { this.emit('server_closed'); });
this.on('server_closed', this._closeHandler);
this.on('server_closed', () => {
this._handleClose().catch(err => { this.emit('error', err); });
});

@@ -252,3 +253,3 @@ // maxIdleTime is 3 times of heartbeatInterval

name: this.options.name,
logger: this.options.logger,
logger: this.logger,
transcode: this.options.transcode,

@@ -399,3 +400,3 @@ requestTimeout: this.options.requestTimeout,

* _handleClose() {
async _handleClose() {
debug('[Leader:%s] leader server is closed', this.options.name);

@@ -407,3 +408,3 @@ // close the real client

// support common function, generatorFunction, and function returning a promise
yield utils.callFn(this._realClient[originClose].bind(this._realClient));
await utils.callFn(this._realClient[originClose].bind(this._realClient));
}

@@ -416,24 +417,22 @@ }

close() {
async close() {
this._closeByUser = true;
return co(function* () {
debug('[Leader:%s] try to close leader', this.options.name);
// 1. stop listening to server channel
this._server.removeListener(`${this.options.name}_connection`, this._handleConnection);
debug('[Leader:%s] try to close leader', this.options.name);
// 1. stop listening to server channel
this._server.removeListener(`${this.options.name}_connection`, this._handleConnection);
// 2. close all mock connections
for (const conn of this._connections.values()) {
if (conn.isMock) {
conn.emit('close');
}
// 2. close all mock connections
for (const conn of this._connections.values()) {
if (conn.isMock) {
conn.emit('close');
}
}
// 3. close server
// CANNOT close server directly by server.close(), other cluster clients may be using it
this.removeAllListeners('server_closed');
yield ClusterServer.close(this.options.name, this._server);
// 3. close server
// CANNOT close server directly by server.close(), other cluster clients may be using it
this.removeAllListeners('server_closed');
await ClusterServer.close(this.options.name, this._server);
// 5. close real client
yield this._handleClose();
}.bind(this));
// 5. close real client
await this._handleClose();
}

@@ -440,0 +439,0 @@ }

@@ -6,2 +6,3 @@ 'use strict';

const Base = require('sdk-base');
const sleep = require('mz-modules/sleep');
const Packet = require('./protocol/packet');

@@ -23,6 +24,4 @@

const sleep = timeout => cb => setTimeout(cb, timeout);
function claimServer(port) {
return cb => {
return new Promise((resolve, reject) => {
const server = net.createServer();

@@ -38,3 +37,3 @@ server.listen({

debug('listen %s error: %s', port, err);
cb(err);
reject(err);
}

@@ -46,9 +45,9 @@

debug('listen %s success', port);
cb(null, server);
resolve(server);
});
};
});
}
function tryToConnect(port) {
return cb => {
return new Promise(resolve => {
const socket = net.connect(port, '127.0.0.1');

@@ -59,3 +58,3 @@ debug('try to connecting %s', port);

success = true;
cb(null, true);
resolve(true);
// disconnect

@@ -69,5 +68,5 @@ socket.end();

if (success) return;
cb(null, false);
resolve(false);
});
};
});
}

@@ -178,3 +177,3 @@

*/
static* create(name, port) {
static async create(name, port) {
const key = `${name}@${port}`;

@@ -191,3 +190,3 @@ let instance = serverMap.get(port);

try {
const server = yield claimServer(port);
const server = await claimServer(port);
instance = new ClusterServer({ server, port });

@@ -211,3 +210,3 @@ typeSet.add(key);

static* close(name, server) {
static async close(name, server) {
const port = server._port;

@@ -229,3 +228,3 @@

const server = serverMap.get(port);
if (server) yield server.close();
if (server) await server.close();
}

@@ -241,7 +240,7 @@ }

*/
static* waitFor(port, timeout) {
static async waitFor(port, timeout) {
const start = Date.now();
let connect = false;
while (!connect) {
connect = yield tryToConnect(port);
connect = await tryToConnect(port);

@@ -253,3 +252,3 @@ // if timeout, throw error

if (!connect) {
yield sleep(3000);
await sleep(3000);
}

@@ -256,0 +255,0 @@ }

'use strict';
const co = require('co');
const is = require('is-type-of');

@@ -73,13 +74,15 @@ const stringify = require('json-stringify-safe');

*/
exports.callFn = function* (fn, args) {
exports.callFn = async function(fn, args) {
args = args || [];
if (!is.function(fn)) return;
if (is.generatorFunction(fn)) {
return yield fn(...args);
return await co(function* () {
return yield fn(...args);
});
}
const r = fn(...args);
if (is.promise(r)) {
return yield r;
return await r;
}
return r;
};
{
"name": "cluster-client",
"version": "1.7.1",
"version": "2.0.0",
"description": "Sharing Connection among Multi-Process Nodejs",

@@ -17,3 +17,3 @@ "main": "./index.js",

"pkgfiles": "egg-bin pkgfiles --check",
"ci": "egg-bin autod --check && npm run pkgfiles && npm run lint && npm run cov",
"ci": "npm run autod --check && npm run pkgfiles && npm run lint && npm run cov",
"contributors": "contributors -f plain -o AUTHORS"

@@ -36,36 +36,38 @@ },

"dependencies": {
"byte": "^1.2.0",
"byte": "^1.4.0",
"co": "^4.6.0",
"debug": "^3.0.1",
"egg-logger": "^1.6.0",
"debug": "^3.1.0",
"egg-logger": "^1.6.1",
"is-type-of": "^1.2.0",
"json-stringify-safe": "^5.0.1",
"long": "^3.2.0",
"sdk-base": "^3.3.0",
"serialize-json": "^1.0.1",
"tcp-base": "^3.0.0",
"utility": "^1.12.0"
"long": "^4.0.0",
"mz-modules": "^2.1.0",
"sdk-base": "^3.4.0",
"serialize-json": "^1.0.2",
"tcp-base": "^3.1.0",
"utility": "^1.13.1"
},
"devDependencies": {
"address": "^1.0.3",
"autod": "^2.9.0",
"autod": "^3.0.1",
"await-event": "^2.1.0",
"coffee": "^4.1.0",
"contributors": "^0.5.1",
"egg-bin": "^4.3.3",
"egg-bin": "^4.3.7",
"egg-ci": "^1.8.0",
"egg-mock": "^3.12.1",
"eslint": "^4.7.1",
"eslint-config-egg": "^5.1.1",
"egg-mock": "^3.14.1",
"eslint": "^4.18.2",
"eslint-config-egg": "^7.0.0",
"mm": "^2.2.0",
"mz-modules": "^2.0.0",
"mz-modules": "^2.1.0",
"pedding": "^1.1.0",
"spy": "^1.0.0"
"spy": "^1.0.0",
"webstorm-disable-index": "^1.2.0"
},
"engines": {
"node": ">= 6.0.0"
"node": ">= 8.0.0"
},
"ci": {
"version": "6, 8"
"version": "8, 9"
}
}