cluster-client
Advanced tools
Comparing version 3.0.0 to 3.0.1
3.0.1 / 2019-03-01 | ||
================== | ||
**fixes** | ||
* [[`6f9bf8e`](http://github.com/node-modules/cluster-client/commit/6f9bf8e300386f28c2a208aa8c394d441045c03c)] - fix: single mode bugs (#49) (zōng yǔ <<gxcsoccer@users.noreply.github.com>>) | ||
3.0.0 / 2019-02-26 | ||
@@ -3,0 +9,0 @@ ================== |
@@ -156,14 +156,2 @@ 'use strict'; | ||
invoke(methodName, args, callback) { | ||
if (!this._isReady) { | ||
this.ready(err => { | ||
if (err) { | ||
if (callback) { | ||
callback(err); | ||
} | ||
return; | ||
} | ||
this.invoke(methodName, args, callback); | ||
}); | ||
return; | ||
} | ||
let method = this._realClient[methodName]; | ||
@@ -170,0 +158,0 @@ // compatible with generatorFunction |
@@ -20,1 +20,2 @@ 'use strict'; | ||
exports.singleMode = Symbol.for('ClusterClient#singleMode'); | ||
exports.createClient = Symbol.for('ClusterClient#createClient'); |
'use strict'; | ||
const debug = require('debug')('cluster-client'); | ||
const is = require('is-type-of'); | ||
const Base = require('sdk-base'); | ||
const assert = require('assert'); | ||
const utils = require('../utils'); | ||
const Base = require('./base'); | ||
const Leader = require('../leader'); | ||
const Follower = require('../follower'); | ||
const ClusterServer = require('../server'); | ||
// Symbol | ||
const { | ||
@@ -16,30 +15,9 @@ init, | ||
innerClient, | ||
subscribe, | ||
unSubscribe, | ||
publish, | ||
invoke, | ||
subInfo, | ||
pubInfo, | ||
createClient, | ||
closeHandler, | ||
close, | ||
singleMode, | ||
} = require('../symbol'); | ||
class ClusterClient extends Base { | ||
/** | ||
* Share Connection among Multi-Process Mode | ||
* | ||
* @param {Object} options | ||
* - {Number} port - the port | ||
* - {Transcode} transcode - serialze / deseriaze methods | ||
* - {Boolean} isLeader - wehether is leader or follower | ||
* - {Number} maxWaitTime - leader startup max time (ONLY effective on isLeader is true) | ||
* - {Function} createRealClient - to create the real client instance | ||
* @constructor | ||
*/ | ||
constructor(options) { | ||
super(options); | ||
this[subInfo] = new Map(); | ||
this[pubInfo] = new Map(); | ||
this[singleMode] = false; | ||
@@ -52,26 +30,5 @@ this[closeHandler] = () => { | ||
}; | ||
this[init]().catch(err => { this.ready(err); }); | ||
// avoid warning message | ||
this.setMaxListeners(100); | ||
} | ||
get isClusterClientLeader() { | ||
return this[innerClient] && this[innerClient].isLeader; | ||
} | ||
/** | ||
* log instance | ||
* @property {Logger} ClusterClient#[logger] | ||
*/ | ||
get [logger]() { | ||
return this.options.logger; | ||
} | ||
/** | ||
* initialize, to leader or follower | ||
* | ||
* @return {void} | ||
*/ | ||
async [init]() { | ||
async [createClient]() { | ||
const name = this.options.name; | ||
@@ -94,162 +51,10 @@ const port = this.options.port; | ||
if (server) { | ||
this[innerClient] = new Leader(Object.assign({ server }, this.options)); | ||
debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port); | ||
} else { | ||
this[innerClient] = new Follower(this.options); | ||
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port); | ||
return new Leader(Object.assign({ server }, this.options)); | ||
} | ||
// events delegate | ||
utils.delegateEvents(this[innerClient], this); | ||
// re init when connection is close | ||
this[innerClient].on('close', this[closeHandler]); | ||
// wait leader/follower ready | ||
await this[innerClient].ready(); | ||
// subscribe all | ||
for (const key of this[subInfo].keys()) { | ||
const info = this[subInfo].get(key); | ||
const reg = info.reg; | ||
this[innerClient].subscribe(reg, data => { | ||
this[subInfo].set(key, { | ||
reg, | ||
inited: true, | ||
data, | ||
}); | ||
this.emit(key, data); | ||
}); | ||
} | ||
// publish all | ||
for (const reg of this[pubInfo].values()) { | ||
this[innerClient].publish(reg); | ||
} | ||
if (!this[isReady]) { | ||
this[isReady] = true; | ||
this.ready(true); | ||
} | ||
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port); | ||
return new Follower(this.options); | ||
} | ||
/** | ||
* do subscribe | ||
* | ||
* @param {Object} reg - subscription info | ||
* @param {Function} listener - callback function | ||
* @return {void} | ||
*/ | ||
[subscribe](reg, listener) { | ||
assert(is.function(listener), `[ClusterClient:${this.options.name}] subscribe(reg, listener) listener should be a function`); | ||
debug('[ClusterClient:%s] subscribe %j', this.options.name, reg); | ||
const key = this.options.formatKey(reg); | ||
this.on(key, listener); | ||
const info = this[subInfo].get(key); | ||
if (!info) { | ||
this[subInfo].set(key, { | ||
reg, | ||
inited: false, | ||
data: null, | ||
}); | ||
if (this[isReady]) { | ||
this[innerClient].subscribe(reg, data => { | ||
this[subInfo].set(key, { | ||
reg, | ||
inited: true, | ||
data, | ||
}); | ||
this.emit(key, data); | ||
}); | ||
} | ||
} else if (info.inited) { | ||
process.nextTick(() => { | ||
listener(info.data); | ||
}); | ||
} | ||
} | ||
/** | ||
* do unSubscribe | ||
* | ||
* @param {Object} reg - subscription info | ||
* @param {Function} listener - callback function | ||
* @return {void} | ||
*/ | ||
[unSubscribe](reg, listener) { | ||
debug('[ClusterClient:%s] unSubscribe %j', this.options.name, reg); | ||
const key = this.options.formatKey(reg); | ||
if (listener) { | ||
this.removeListener(key, listener); | ||
} else { | ||
this.removeAllListeners(key); | ||
} | ||
if (this.listenerCount(key) === 0) { | ||
this[subInfo].delete(key); | ||
if (this[isReady]) { | ||
this[innerClient].unSubscribe(reg); | ||
} | ||
} | ||
} | ||
/** | ||
* do publish | ||
* | ||
* @param {Object} reg - publish info | ||
* @return {void} | ||
*/ | ||
[publish](reg) { | ||
debug('[ClusterClient:%s] publish %j', this.options.name, reg); | ||
const key = this.options.formatKey(reg); | ||
this[pubInfo].set(key, reg); | ||
if (this[isReady]) { | ||
this[innerClient].publish(reg); | ||
} | ||
} | ||
/** | ||
* invoke a method asynchronously | ||
* | ||
* @param {String} method - the method name | ||
* @param {Array} args - the arguments list | ||
* @param {Function} callback - callback function | ||
* @return {void} | ||
*/ | ||
[invoke](method, args, callback) { | ||
if (!this[isReady]) { | ||
this.ready(err => { | ||
if (err) { | ||
callback && callback(err); | ||
return; | ||
} | ||
this[innerClient].invoke(method, args, callback); | ||
}); | ||
return; | ||
} | ||
debug('[ClusterClient:%s] invoke method: %s, args: %j', this.options.name, method, args); | ||
this[innerClient].invoke(method, args, callback); | ||
} | ||
async [close]() { | ||
try { | ||
// close after ready, in case of innerClient is initializing | ||
await this.ready(); | ||
} catch (err) { | ||
// ignore | ||
} | ||
const client = this[innerClient]; | ||
if (client) { | ||
// prevent re-initializing | ||
client.removeListener('close', this[closeHandler]); | ||
if (client.close) { | ||
await utils.callFn(client.close.bind(client)); | ||
} | ||
} | ||
} | ||
} | ||
module.exports = ClusterClient; |
'use strict'; | ||
const co = require('co'); | ||
const Base = require('./base'); | ||
const is = require('is-type-of'); | ||
const Base = require('sdk-base'); | ||
const assert = require('assert'); | ||
const utils = require('../utils'); | ||
const SdkBase = require('sdk-base'); | ||
const random = require('utility').random; | ||
// Symbol | ||
const { | ||
logger, | ||
isReady, | ||
innerClient, | ||
subscribe, | ||
unSubscribe, | ||
publish, | ||
invoke, | ||
close, | ||
subInfo, | ||
subscribeMethodName, | ||
unSubscribeMethodName, | ||
publishMethodName, | ||
closeByUser, | ||
createClient, | ||
singleMode, | ||
@@ -27,30 +18,27 @@ } = require('../symbol'); | ||
class SingleClient extends Base { | ||
class InnerClient extends SdkBase { | ||
constructor(options = {}) { | ||
super(options); | ||
this[isReady] = false; | ||
this[closeByUser] = false; | ||
this[singleMode] = true; | ||
this[subInfo] = new Map(); | ||
if (_instances.has(options.name)) { | ||
this[innerClient] = _instances.get(options.name); | ||
} else { | ||
this[innerClient] = options.createRealClient(); | ||
_instances.set(options.name, this[innerClient]); | ||
this[innerClient].once('close', () => { | ||
_instances.delete(options.name); | ||
this[logger].info('[cluster#SingleClient] %s is closed.', this.options.name); | ||
}); | ||
} | ||
this[subscribeMethodName] = utils.findMethodName(options.descriptors, 'subscribe'); | ||
this[unSubscribeMethodName] = utils.findMethodName(options.descriptors, 'unSubscribe'); | ||
this[publishMethodName] = utils.findMethodName(options.descriptors, 'publish'); | ||
this._subData = new Map(); // <key, data> | ||
this._subSet = new Set(); | ||
this._subListeners = new Map(); // <key, Array<Function>> | ||
this._transcode = options.transcode; | ||
this._realClient = options.createRealClient(); | ||
this._closeMethodName = utils.findMethodName(options.descriptors, 'close'); | ||
this._subscribeMethodName = utils.findMethodName(options.descriptors, 'subscribe'); | ||
this._publishMethodName = utils.findMethodName(options.descriptors, 'publish'); | ||
this._isReady = false; | ||
this._closeByUser = false; | ||
this._refCount = 1; | ||
if (is.function(this[innerClient].ready)) { | ||
this[innerClient].ready(err => { | ||
// event delegate | ||
utils.delegateEvents(this._realClient, this); | ||
if (is.function(this._realClient.ready)) { | ||
this._realClient.ready(err => { | ||
if (err) { | ||
this.ready(err); | ||
} else { | ||
this[isReady] = true; | ||
this._isReady = true; | ||
this.ready(true); | ||
@@ -60,3 +48,3 @@ } | ||
} else { | ||
this[isReady] = true; | ||
this._isReady = true; | ||
this.ready(true); | ||
@@ -66,50 +54,48 @@ } | ||
get isClusterClientLeader() { | ||
ref() { | ||
this._refCount++; | ||
} | ||
get isLeader() { | ||
return true; | ||
} | ||
/** | ||
* log instance | ||
* @property {Logger} SingleClient#[logger] | ||
*/ | ||
get [logger]() { | ||
return this.options.logger; | ||
formatKey(reg) { | ||
return '$$inner$$__' + this.options.formatKey(reg); | ||
} | ||
/** | ||
* do subscribe | ||
* | ||
* @param {Object} reg - subscription info | ||
* @param {Function} listener - callback function | ||
* @return {void} | ||
*/ | ||
[subscribe](reg, listener) { | ||
if (!this[subscribeMethodName]) return; | ||
subscribe(reg, listener) { | ||
const key = this.formatKey(reg); | ||
const transcode = this._transcode; | ||
const isBroadcast = this.options.isBroadcast; | ||
assert(is.function(listener), `[ClusterClient:${this.options.name}] subscribe(reg, listener) listener should be a function`); | ||
const key = this.options.formatKey(reg); | ||
this.on(key, listener); | ||
const listeners = this._subListeners.get(key) || []; | ||
listeners.push(listener); | ||
this._subListeners.set(key, listeners); | ||
const info = this[subInfo].get(key); | ||
if (!info) { | ||
this[subInfo].set(key, { | ||
reg, | ||
inited: false, | ||
data: null, | ||
}); | ||
this.ready(err => { | ||
if (!err) { | ||
this[innerClient][this[subscribeMethodName]](reg, data => { | ||
this[subInfo].set(key, { | ||
reg, | ||
inited: true, | ||
data, | ||
}); | ||
this.emit(key, data); | ||
}); | ||
if (!this._subSet.has(key)) { | ||
this._subSet.add(key); | ||
this._realClient[this._subscribeMethodName](reg, result => { | ||
const data = transcode.encode(result); | ||
this._subData.set(key, data); | ||
let fns = this._subListeners.get(key); | ||
if (!fns) { | ||
return; | ||
} | ||
const len = fns.length; | ||
// if isBroadcast equal to false, random pick one to notify | ||
if (!isBroadcast) { | ||
fns = [ fns[random(len)] ]; | ||
} | ||
for (const fn of fns) { | ||
fn(transcode.decode(data)); | ||
} | ||
}); | ||
} else if (info.inited) { | ||
} else if (this._subData.has(key) && isBroadcast) { | ||
process.nextTick(() => { | ||
listener(info.data); | ||
const data = this._subData.get(key); | ||
listener(transcode.decode(data)); | ||
}); | ||
@@ -119,68 +105,27 @@ } | ||
/** | ||
* do unSubscribe | ||
* | ||
* @param {Object} reg - subscription info | ||
* @param {Function} listener - callback function | ||
* @return {void} | ||
*/ | ||
[unSubscribe](reg, listener) { | ||
const key = this.options.formatKey(reg); | ||
if (listener) { | ||
this.removeListener(key, listener); | ||
unSubscribe(reg, listener) { | ||
const key = this.formatKey(reg); | ||
if (!listener) { | ||
this._subListeners.delete(key); | ||
} else { | ||
this.removeAllListeners(key); | ||
} | ||
const listeners = this._subListeners.get(key) || []; | ||
const newListeners = []; | ||
if (!this[unSubscribeMethodName]) return; | ||
if (this.listenerCount(key) === 0) { | ||
this[subInfo].delete(key); | ||
if (this[isReady]) { | ||
this[innerClient][this[unSubscribeMethodName]](reg); | ||
for (const fn of listeners) { | ||
if (fn === listener) { | ||
continue; | ||
} | ||
newListeners.push(fn); | ||
} | ||
this._subListeners.set(key, newListeners); | ||
} | ||
} | ||
/** | ||
* do publish | ||
* | ||
* @param {Object} reg - publish info | ||
* @return {void} | ||
*/ | ||
[publish](reg) { | ||
if (!this[publishMethodName]) return; | ||
if (!this[isReady]) { | ||
this.ready(err => { | ||
if (!err) { | ||
this[publish](reg); | ||
} | ||
}); | ||
return; | ||
} | ||
this[innerClient][this[publishMethodName]](reg); | ||
publish(reg) { | ||
this._realClient[this._publishMethodName](reg); | ||
} | ||
/** | ||
* invoke a method asynchronously | ||
* | ||
* @param {String} methodName - the method name | ||
* @param {Array} args - the arguments list | ||
* @param {Function} callback - callback function | ||
* @return {void} | ||
*/ | ||
[invoke](methodName, args, callback) { | ||
if (!this[isReady]) { | ||
this.ready(err => { | ||
if (err) { | ||
callback && callback(err); | ||
return; | ||
} | ||
this[invoke](methodName, args, callback); | ||
}); | ||
return; | ||
} | ||
let method = this[innerClient][methodName]; | ||
invoke(methodName, args, callback) { | ||
let method = this._realClient[methodName]; | ||
// compatible with generatorFunction | ||
@@ -191,34 +136,61 @@ if (is.generatorFunction(method)) { | ||
args.push(callback); | ||
const ret = method.apply(this[innerClient], args); | ||
const ret = method.apply(this._realClient, args); | ||
if (callback && is.promise(ret)) { | ||
ret.then(result => callback(null, result), err => callback(err)) | ||
// to avoid uncaught exception in callback function, then cause unhandledRejection | ||
.catch(err => { | ||
setImmediate(() => { | ||
if (!this[closeByUser]) { | ||
this.emit('error', err); | ||
} | ||
}); | ||
}); | ||
.catch(err => { this._errorHandler(err); }); | ||
} | ||
} | ||
async [close]() { | ||
this[closeByUser] = true; | ||
_instances.delete(this.options.name); | ||
// emit error asynchronously | ||
_errorHandler(err) { | ||
setImmediate(() => { | ||
if (!this._closeByUser) { | ||
this.emit('error', err); | ||
} | ||
}); | ||
} | ||
try { | ||
// close after ready, in case of innerClient is initializing | ||
await this.ready(); | ||
} catch (err) { | ||
// ignore | ||
async close() { | ||
if (this._refCount > 0) { | ||
this._refCount--; | ||
} | ||
if (this._refCount > 0) return; | ||
const client = this[innerClient]; | ||
if (client && client.close) { | ||
await utils.callFn(client.close.bind(client)); | ||
this._closeByUser = true; | ||
if (this._realClient) { | ||
if (this._closeMethodName) { | ||
// support common function, generatorFunction, and function returning a promise | ||
await utils.callFn(this._realClient[this._closeMethodName].bind(this._realClient)); | ||
} | ||
} | ||
this.emit('close'); | ||
} | ||
} | ||
class SingleClient extends Base { | ||
get [singleMode]() { | ||
return true; | ||
} | ||
async [createClient]() { | ||
const options = this.options; | ||
let client; | ||
if (_instances.has(options.name)) { | ||
client = _instances.get(options.name); | ||
client.ref(); | ||
return client; | ||
} | ||
client = new InnerClient(options); | ||
client.once('close', () => { | ||
_instances.delete(options.name); | ||
this[logger].info('[cluster#SingleClient] %s is closed.', options.name); | ||
}); | ||
_instances.set(options.name, client); | ||
return client; | ||
} | ||
} | ||
module.exports = SingleClient; |
{ | ||
"name": "cluster-client", | ||
"version": "3.0.0", | ||
"version": "3.0.1", | ||
"description": "Sharing Connection among Multi-Process Nodejs", | ||
@@ -38,3 +38,3 @@ "main": "./index.js", | ||
"debug": "^4.1.1", | ||
"egg-logger": "^2.3.1", | ||
"egg-logger": "^2.3.2", | ||
"is-type-of": "^1.2.1", | ||
@@ -55,2 +55,3 @@ "json-stringify-safe": "^5.0.1", | ||
"contributors": "^0.5.1", | ||
"detect-port": "^1.3.0", | ||
"egg-bin": "^4.11.0", | ||
@@ -57,0 +58,0 @@ "egg-ci": "^1.8.0", |
23
77285
16
2005
Updatedegg-logger@^2.3.2