New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

cluster-client

Package Overview
Dependencies
Maintainers
5
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cluster-client - npm Package Compare versions

Comparing version 3.0.0 to 3.0.1

lib/wrapper/base.js

6

History.md
3.0.1 / 2019-03-01
==================
**fixes**
* [[`6f9bf8e`](http://github.com/node-modules/cluster-client/commit/6f9bf8e300386f28c2a208aa8c394d441045c03c)] - fix: single mode bugs (#49) (zōng yǔ <<gxcsoccer@users.noreply.github.com>>)
3.0.0 / 2019-02-26

@@ -3,0 +9,0 @@ ==================

12

lib/leader.js

@@ -156,14 +156,2 @@ 'use strict';

invoke(methodName, args, callback) {
if (!this._isReady) {
this.ready(err => {
if (err) {
if (callback) {
callback(err);
}
return;
}
this.invoke(methodName, args, callback);
});
return;
}
let method = this._realClient[methodName];

@@ -170,0 +158,0 @@ // compatible with generatorFunction

1

lib/symbol.js

@@ -20,1 +20,2 @@ 'use strict';

exports.singleMode = Symbol.for('ClusterClient#singleMode');
exports.createClient = Symbol.for('ClusterClient#createClient');
'use strict';
const debug = require('debug')('cluster-client');
const is = require('is-type-of');
const Base = require('sdk-base');
const assert = require('assert');
const utils = require('../utils');
const Base = require('./base');
const Leader = require('../leader');
const Follower = require('../follower');
const ClusterServer = require('../server');
// Symbol
const {

@@ -16,30 +15,9 @@ init,

innerClient,
subscribe,
unSubscribe,
publish,
invoke,
subInfo,
pubInfo,
createClient,
closeHandler,
close,
singleMode,
} = require('../symbol');
class ClusterClient extends Base {
/**
* Share Connection among Multi-Process Mode
*
* @param {Object} options
* - {Number} port - the port
* - {Transcode} transcode - serialze / deseriaze methods
* - {Boolean} isLeader - wehether is leader or follower
* - {Number} maxWaitTime - leader startup max time (ONLY effective on isLeader is true)
* - {Function} createRealClient - to create the real client instance
* @constructor
*/
constructor(options) {
super(options);
this[subInfo] = new Map();
this[pubInfo] = new Map();
this[singleMode] = false;

@@ -52,26 +30,5 @@ this[closeHandler] = () => {

};
this[init]().catch(err => { this.ready(err); });
// avoid warning message
this.setMaxListeners(100);
}
get isClusterClientLeader() {
return this[innerClient] && this[innerClient].isLeader;
}
/**
* log instance
* @property {Logger} ClusterClient#[logger]
*/
get [logger]() {
return this.options.logger;
}
/**
* initialize, to leader or follower
*
* @return {void}
*/
async [init]() {
async [createClient]() {
const name = this.options.name;

@@ -94,162 +51,10 @@ const port = this.options.port;

if (server) {
this[innerClient] = new Leader(Object.assign({ server }, this.options));
debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port);
} else {
this[innerClient] = new Follower(this.options);
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
return new Leader(Object.assign({ server }, this.options));
}
// events delegate
utils.delegateEvents(this[innerClient], this);
// re init when connection is close
this[innerClient].on('close', this[closeHandler]);
// wait leader/follower ready
await this[innerClient].ready();
// subscribe all
for (const key of this[subInfo].keys()) {
const info = this[subInfo].get(key);
const reg = info.reg;
this[innerClient].subscribe(reg, data => {
this[subInfo].set(key, {
reg,
inited: true,
data,
});
this.emit(key, data);
});
}
// publish all
for (const reg of this[pubInfo].values()) {
this[innerClient].publish(reg);
}
if (!this[isReady]) {
this[isReady] = true;
this.ready(true);
}
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
return new Follower(this.options);
}
/**
* do subscribe
*
* @param {Object} reg - subscription info
* @param {Function} listener - callback function
* @return {void}
*/
[subscribe](reg, listener) {
assert(is.function(listener), `[ClusterClient:${this.options.name}] subscribe(reg, listener) listener should be a function`);
debug('[ClusterClient:%s] subscribe %j', this.options.name, reg);
const key = this.options.formatKey(reg);
this.on(key, listener);
const info = this[subInfo].get(key);
if (!info) {
this[subInfo].set(key, {
reg,
inited: false,
data: null,
});
if (this[isReady]) {
this[innerClient].subscribe(reg, data => {
this[subInfo].set(key, {
reg,
inited: true,
data,
});
this.emit(key, data);
});
}
} else if (info.inited) {
process.nextTick(() => {
listener(info.data);
});
}
}
/**
* do unSubscribe
*
* @param {Object} reg - subscription info
* @param {Function} listener - callback function
* @return {void}
*/
[unSubscribe](reg, listener) {
debug('[ClusterClient:%s] unSubscribe %j', this.options.name, reg);
const key = this.options.formatKey(reg);
if (listener) {
this.removeListener(key, listener);
} else {
this.removeAllListeners(key);
}
if (this.listenerCount(key) === 0) {
this[subInfo].delete(key);
if (this[isReady]) {
this[innerClient].unSubscribe(reg);
}
}
}
/**
* do publish
*
* @param {Object} reg - publish info
* @return {void}
*/
[publish](reg) {
debug('[ClusterClient:%s] publish %j', this.options.name, reg);
const key = this.options.formatKey(reg);
this[pubInfo].set(key, reg);
if (this[isReady]) {
this[innerClient].publish(reg);
}
}
/**
* invoke a method asynchronously
*
* @param {String} method - the method name
* @param {Array} args - the arguments list
* @param {Function} callback - callback function
* @return {void}
*/
[invoke](method, args, callback) {
if (!this[isReady]) {
this.ready(err => {
if (err) {
callback && callback(err);
return;
}
this[innerClient].invoke(method, args, callback);
});
return;
}
debug('[ClusterClient:%s] invoke method: %s, args: %j', this.options.name, method, args);
this[innerClient].invoke(method, args, callback);
}
async [close]() {
try {
// close after ready, in case of innerClient is initializing
await this.ready();
} catch (err) {
// ignore
}
const client = this[innerClient];
if (client) {
// prevent re-initializing
client.removeListener('close', this[closeHandler]);
if (client.close) {
await utils.callFn(client.close.bind(client));
}
}
}
}
module.exports = ClusterClient;
'use strict';
const co = require('co');
const Base = require('./base');
const is = require('is-type-of');
const Base = require('sdk-base');
const assert = require('assert');
const utils = require('../utils');
const SdkBase = require('sdk-base');
const random = require('utility').random;
// Symbol
const {
logger,
isReady,
innerClient,
subscribe,
unSubscribe,
publish,
invoke,
close,
subInfo,
subscribeMethodName,
unSubscribeMethodName,
publishMethodName,
closeByUser,
createClient,
singleMode,

@@ -27,30 +18,27 @@ } = require('../symbol');

class SingleClient extends Base {
class InnerClient extends SdkBase {
constructor(options = {}) {
super(options);
this[isReady] = false;
this[closeByUser] = false;
this[singleMode] = true;
this[subInfo] = new Map();
if (_instances.has(options.name)) {
this[innerClient] = _instances.get(options.name);
} else {
this[innerClient] = options.createRealClient();
_instances.set(options.name, this[innerClient]);
this[innerClient].once('close', () => {
_instances.delete(options.name);
this[logger].info('[cluster#SingleClient] %s is closed.', this.options.name);
});
}
this[subscribeMethodName] = utils.findMethodName(options.descriptors, 'subscribe');
this[unSubscribeMethodName] = utils.findMethodName(options.descriptors, 'unSubscribe');
this[publishMethodName] = utils.findMethodName(options.descriptors, 'publish');
this._subData = new Map(); // <key, data>
this._subSet = new Set();
this._subListeners = new Map(); // <key, Array<Function>>
this._transcode = options.transcode;
this._realClient = options.createRealClient();
this._closeMethodName = utils.findMethodName(options.descriptors, 'close');
this._subscribeMethodName = utils.findMethodName(options.descriptors, 'subscribe');
this._publishMethodName = utils.findMethodName(options.descriptors, 'publish');
this._isReady = false;
this._closeByUser = false;
this._refCount = 1;
if (is.function(this[innerClient].ready)) {
this[innerClient].ready(err => {
// event delegate
utils.delegateEvents(this._realClient, this);
if (is.function(this._realClient.ready)) {
this._realClient.ready(err => {
if (err) {
this.ready(err);
} else {
this[isReady] = true;
this._isReady = true;
this.ready(true);

@@ -60,3 +48,3 @@ }

} else {
this[isReady] = true;
this._isReady = true;
this.ready(true);

@@ -66,50 +54,48 @@ }

get isClusterClientLeader() {
ref() {
this._refCount++;
}
get isLeader() {
return true;
}
/**
* log instance
* @property {Logger} SingleClient#[logger]
*/
get [logger]() {
return this.options.logger;
formatKey(reg) {
return '$$inner$$__' + this.options.formatKey(reg);
}
/**
* do subscribe
*
* @param {Object} reg - subscription info
* @param {Function} listener - callback function
* @return {void}
*/
[subscribe](reg, listener) {
if (!this[subscribeMethodName]) return;
subscribe(reg, listener) {
const key = this.formatKey(reg);
const transcode = this._transcode;
const isBroadcast = this.options.isBroadcast;
assert(is.function(listener), `[ClusterClient:${this.options.name}] subscribe(reg, listener) listener should be a function`);
const key = this.options.formatKey(reg);
this.on(key, listener);
const listeners = this._subListeners.get(key) || [];
listeners.push(listener);
this._subListeners.set(key, listeners);
const info = this[subInfo].get(key);
if (!info) {
this[subInfo].set(key, {
reg,
inited: false,
data: null,
});
this.ready(err => {
if (!err) {
this[innerClient][this[subscribeMethodName]](reg, data => {
this[subInfo].set(key, {
reg,
inited: true,
data,
});
this.emit(key, data);
});
if (!this._subSet.has(key)) {
this._subSet.add(key);
this._realClient[this._subscribeMethodName](reg, result => {
const data = transcode.encode(result);
this._subData.set(key, data);
let fns = this._subListeners.get(key);
if (!fns) {
return;
}
const len = fns.length;
// if isBroadcast equal to false, random pick one to notify
if (!isBroadcast) {
fns = [ fns[random(len)] ];
}
for (const fn of fns) {
fn(transcode.decode(data));
}
});
} else if (info.inited) {
} else if (this._subData.has(key) && isBroadcast) {
process.nextTick(() => {
listener(info.data);
const data = this._subData.get(key);
listener(transcode.decode(data));
});

@@ -119,68 +105,27 @@ }

/**
* do unSubscribe
*
* @param {Object} reg - subscription info
* @param {Function} listener - callback function
* @return {void}
*/
[unSubscribe](reg, listener) {
const key = this.options.formatKey(reg);
if (listener) {
this.removeListener(key, listener);
unSubscribe(reg, listener) {
const key = this.formatKey(reg);
if (!listener) {
this._subListeners.delete(key);
} else {
this.removeAllListeners(key);
}
const listeners = this._subListeners.get(key) || [];
const newListeners = [];
if (!this[unSubscribeMethodName]) return;
if (this.listenerCount(key) === 0) {
this[subInfo].delete(key);
if (this[isReady]) {
this[innerClient][this[unSubscribeMethodName]](reg);
for (const fn of listeners) {
if (fn === listener) {
continue;
}
newListeners.push(fn);
}
this._subListeners.set(key, newListeners);
}
}
/**
* do publish
*
* @param {Object} reg - publish info
* @return {void}
*/
[publish](reg) {
if (!this[publishMethodName]) return;
if (!this[isReady]) {
this.ready(err => {
if (!err) {
this[publish](reg);
}
});
return;
}
this[innerClient][this[publishMethodName]](reg);
publish(reg) {
this._realClient[this._publishMethodName](reg);
}
/**
* invoke a method asynchronously
*
* @param {String} methodName - the method name
* @param {Array} args - the arguments list
* @param {Function} callback - callback function
* @return {void}
*/
[invoke](methodName, args, callback) {
if (!this[isReady]) {
this.ready(err => {
if (err) {
callback && callback(err);
return;
}
this[invoke](methodName, args, callback);
});
return;
}
let method = this[innerClient][methodName];
invoke(methodName, args, callback) {
let method = this._realClient[methodName];
// compatible with generatorFunction

@@ -191,34 +136,61 @@ if (is.generatorFunction(method)) {

args.push(callback);
const ret = method.apply(this[innerClient], args);
const ret = method.apply(this._realClient, args);
if (callback && is.promise(ret)) {
ret.then(result => callback(null, result), err => callback(err))
// to avoid uncaught exception in callback function, then cause unhandledRejection
.catch(err => {
setImmediate(() => {
if (!this[closeByUser]) {
this.emit('error', err);
}
});
});
.catch(err => { this._errorHandler(err); });
}
}
async [close]() {
this[closeByUser] = true;
_instances.delete(this.options.name);
// emit error asynchronously
_errorHandler(err) {
setImmediate(() => {
if (!this._closeByUser) {
this.emit('error', err);
}
});
}
try {
// close after ready, in case of innerClient is initializing
await this.ready();
} catch (err) {
// ignore
async close() {
if (this._refCount > 0) {
this._refCount--;
}
if (this._refCount > 0) return;
const client = this[innerClient];
if (client && client.close) {
await utils.callFn(client.close.bind(client));
this._closeByUser = true;
if (this._realClient) {
if (this._closeMethodName) {
// support common function, generatorFunction, and function returning a promise
await utils.callFn(this._realClient[this._closeMethodName].bind(this._realClient));
}
}
this.emit('close');
}
}
class SingleClient extends Base {
get [singleMode]() {
return true;
}
async [createClient]() {
const options = this.options;
let client;
if (_instances.has(options.name)) {
client = _instances.get(options.name);
client.ref();
return client;
}
client = new InnerClient(options);
client.once('close', () => {
_instances.delete(options.name);
this[logger].info('[cluster#SingleClient] %s is closed.', options.name);
});
_instances.set(options.name, client);
return client;
}
}
module.exports = SingleClient;
{
"name": "cluster-client",
"version": "3.0.0",
"version": "3.0.1",
"description": "Sharing Connection among Multi-Process Nodejs",

@@ -38,3 +38,3 @@ "main": "./index.js",

"debug": "^4.1.1",
"egg-logger": "^2.3.1",
"egg-logger": "^2.3.2",
"is-type-of": "^1.2.1",

@@ -55,2 +55,3 @@ "json-stringify-safe": "^5.0.1",

"contributors": "^0.5.1",
"detect-port": "^1.3.0",
"egg-bin": "^4.11.0",

@@ -57,0 +58,0 @@ "egg-ci": "^1.8.0",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc