Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

cluster-service

Package Overview
Dependencies
Maintainers
1
Versions
66
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

cluster-service - npm Package Compare versions

Comparing version 0.6.2 to 0.7.0

.jshintrc

74

cluster-service.js

@@ -1,6 +0,4 @@

var
cluster = require("cluster"),
colors = require("colors"),
locals = require("./lib/defaults")
;
var cluster = require("cluster");
var colors = require("colors");
var locals = require("./lib/defaults");

@@ -17,37 +15,39 @@ module.exports = exports;

Object.defineProperty(exports, "workers", {
get: require("./lib/workers").get
get: require("./lib/workers").get
});
Object.defineProperty(exports, "isMaster", {
get: function() {
return cluster.isMaster;
}
get: function() {
return cluster.isMaster;
}
});
Object.defineProperty(exports, "isWorker", {
get: function() {
return cluster.isWorker;
}
get: function() {
return cluster.isWorker;
}
});
Object.defineProperty(exports, "options", {
get: function() {
return locals.options;
}
get: function() {
return locals.options;
}
});
Object.defineProperty(exports, "locals", {
get: function() {
return locals;
}
get: function() {
return locals;
}
});
if (cluster.isMaster === true) {
exports.control = require("./lib/control").addControls;
exports.stop = require("./lib/stop");
exports.trigger = require("./lib/trigger");
exports.newWorker = require("./lib/new-worker");
exports.on = require("./lib/on");
exports.control = require("./lib/control").addControls;
exports.stop = require("./lib/stop");
exports.trigger = require("./lib/trigger");
exports.newWorker = require("./lib/new-worker");
exports.on = require("./lib/commands").on;
exports.registerCommands = require("./lib/commands").register;
} else {
exports.on = function() { };
exports.on = function() { };
exports.registerCommands = function() { };
}

@@ -57,11 +57,21 @@

if (cluster.isWorker === true && typeof (cluster.worker.module) === "undefined") {
cluster.worker.module = {}; // intermediate state to prevent 2nd call while async in progress
// load the worker if not already loaded
setTimeout(function() { // async, in case worker loads cluster-service, we need to return before it's avail
cluster.worker.module = require(process.env.worker);
}, 10);
if (
cluster.isWorker === true
&& typeof (cluster.worker.module) === "undefined"
){
// intermediate state to prevent 2nd call while async in progress
cluster.worker.module = {};
// load the worker if not already loaded
// async, in case worker loads cluster-service, we need to return before
// it's avail
setTimeout(function() {
cluster.worker.module = require(process.env.worker);
if (locals.workerReady === undefined && process.env.ready === false) {
// if workerReady not invoked explicitly, inform master worker is ready
exports.workerReady();
}
}, 10);
// start worker monitor to establish two-way relationship with master
require("./lib/workers").monitor();
// start worker monitor to establish two-way relationship with master
require("./lib/workers").monitor();
}

@@ -1,65 +0,75 @@

var
cservice = require("../cluster-service"),
util = require("util"),
locals = null,
options = null
;
var cservice = require("../cluster-service"),
util = require("util"),
locals = null,
options = null;
exports.init = function(l, o) {
locals = l;
options = o;
locals = l;
options = o;
util.inspect.styles.name = "grey";
util.inspect.styles.name = "grey";
cservice.log("CLI is now available. Enter 'help [enter]' for instructions.".info);
process.stdin.resume();
process.stdin.setEncoding('utf8');
process.stdin.on("data", onCommand);
cservice.log(
"CLI is now available. Enter 'help [enter]' for instructions.".info
);
process.stdin.resume();
process.stdin.setEncoding('utf8');
// wait momentarily before attaching CLI. allows workers a little time to output as needed
setTimeout(function() {
process.stdout.write("cservice> ".cservice);
}, 1000);
process.stdin.on("data", onCommand);
// wait momentarily before attaching CLI. allows workers a little time
// to output as needed
setTimeout(function() {
process.stdout.write("cservice> ".cservice);
}, 1000);
};
exports.close = function() {
try {
process.stdin.pause();
} catch (ex) {
}
try {
process.stdin.pause();
} catch (ex) {
}
};
function onCommand(question) {
question = question.replace(/[\r\n]/g, "");
var args = require("./util").getArgsFromQuestion(question, " ");
args = [args[0], onCallback].concat(args.slice(1));
if (!locals.events[args[0]]) {
onCallback("Command " + args[0] + " not found. Try 'help'.");
var args;
question = question.replace(/[\r\n]/g, "");
args = require("./util").getArgsFromQuestion(question, " ");
args = [args[0], onCallback].concat(args.slice(1));
return;
}
try {
cservice.trigger.apply(null, args);
} catch(ex) {
cservice.error("Command Error " + args[0], util.inspect(ex, { depth:null } ), ex.stack || new Error().stack);
if (!locals.events[args[0]]) {
onCallback("Command " + args[0] + " not found. Try 'help'.");
onCallback();
}
return;
}
try {
cservice.trigger.apply(null, args);
} catch (ex) {
cservice.error(
"Command Error " + args[0],
util.inspect(ex, {depth: null}), ex.stack || new Error().stack
);
onCallback();
}
}
function onCallback(err, result) {
delete locals.reason;
if (err) {
cservice.error("Error: ", err, err.stack ? util.inspect(err.stack, { depth:null, colors: true}) : "");
} else if (result) {
cservice.log(util.inspect(result, { depth: null, colors: true }));
}
delete locals.reason;
//cservice.log("");//newline
process.stdout.write("cservice> ".cservice);
if (err) {
cservice.error(
"Error: ",
err,
err.stack
? util.inspect(err.stack, {depth: null, colors: true})
: ""
);
} else if (result) {
cservice.log(util.inspect(result, {depth: null, colors: true}));
}
//cservice.log("");//newline
process.stdout.write("cservice> ".cservice);
}

@@ -1,30 +0,28 @@

var
util = require("util"),
cservice = require("../../cluster-service")
;
var util = require("util"),
cservice = require("../../cluster-service");
module.exports = function(evt, cb, cmd) {
if (cmd !== "now") {
cb("Invalid request, 'now' required. Try help exit");
return;
}
if (cmd !== "now") {
cb("Invalid request, 'now' required. Try help exit");
return;
}
cservice.log("*** FORCEFUL TERMINATION REQUESTED ***".warn);
cservice.log("Exiting now.".warn);
cservice.log("*** FORCEFUL TERMINATION REQUESTED ***".warn);
cservice.log("Exiting now.".warn);
process.exit(0); // exit master
process.exit(0); // exit master
cb(null, "Exiting now.");
cb(null, "Exiting now.");
};
module.exports.more = function(cb) {
cb(null, {
info: "Forcefully exits the service.",
command: "exit now",
"now": "Required. 'now' to force exit."
});
cb(null, {
info: "Forcefully exits the service.",
command: "exit now",
"now": "Required. 'now' to force exit."
});
};
module.exports.control = function(){
return "local";
module.exports.control = function() {
return "local";
};

@@ -1,14 +0,15 @@

var
cluster = require("cluster")
;
var cluster = require("cluster");
module.exports = function(evt, cb) {
cb(null, "OK");
cb(null, "OK");
};
module.exports.more = function(cb) {
cb(null, {
command: "health",
info: "Returns health of service. May be overidden by service to expose app-specific data."
});
cb(null, {
command: "health",
info: [
"Returns health of service.",
"May be overidden by service to expose app-specific data."
].join(' ')
});
};

@@ -1,40 +0,39 @@

var
util = require("util")
;
var util = require("util");
module.exports = function(evt, cb, cmdName) {
var evt_name, cmd, line_idx = 0;
var ret = { };
if (typeof cmdName === "string") {
ret.command = cmdName;
cmd = evt.locals.events[cmdName];
if (!cmd) {
ret.err = "Command not found";
} else {
if (typeof cmd.cb.more === "function") {
cmd.cb.more(function(err, result) {
cb(null, result);
});
return;
} else {
ret.more = "No additional details found.";
}
}
} else { // full listing
ret.more = "Commands (Use 'help [command_name]' for more details)";
ret.commands = [];
for (evt_name in evt.locals.events) {
cmd = evt.locals.events[evt_name];
ret.commands.push(evt_name);
}
}
var evtName, cmd, ret = {};
if (typeof cmdName === "string") {
ret.command = cmdName;
cmd = evt.locals.events[cmdName];
if (!cmd) {
ret.err = "Command not found";
} else {
if (typeof cmd.cb.more === "function") {
cmd.cb.more(function(err, result) {
cb(null, result);
});
return;
} else {
ret.more = "No additional details found.";
}
}
} else { // full listing
ret.more = "Commands (Use 'help [command_name]' for more details)";
ret.commands = [];
for (evtName in evt.locals.events) {
cmd = evt.locals.events[evtName];
if (cmd.cb.visible === false)
continue;
ret.commands.push(evtName);
}
}
cb(null, ret);
cb(null, ret);
};
module.exports.more = function(cb) {
cb(null, {
"command": "help [command_name]",
"command_name": "Optional if you want extended help"
});
cb(null, {
"command": "help [command_name]",
"command_name": "Optional if you want extended help"
});
};

@@ -1,123 +0,134 @@

var
async = require("async"),
util = require("util"),
cservice = require("../../cluster-service")
;
var async = require("async"),
util = require("util"),
cservice = require("../../cluster-service");
module.exports = function(evt, cb, cmd, options) {
var pid = parseInt(cmd);
options = options || {};
options.timeout = parseInt(options.timeout) || 60000;
if (cmd !== "all" && !pid) {
cb("Invalid request. Try help restart");
return;
}
var pid = parseInt(cmd),
originalAutoRestart,
tasks;
options = options || {};
options.timeout = parseInt(options.timeout) || 60000;
if (cmd !== "all" && !pid) {
cb("Invalid request. Try help restart");
return;
}
evt.locals.reason = "restart";
var originalAutoRestart = evt.locals.restartOnFailure;
evt.locals.restartOnFailure = false;
var workers = evt.service.workers;
var tasks = [];
evt.locals.reason = "restart";
originalAutoRestart = evt.locals.restartOnFailure;
evt.locals.restartOnFailure = false;
for (var w in workers) {
var worker = workers[w];
if (pid && worker.process.pid !== pid) {
continue; // cannot kill external processes
}
tasks = [];
tasks.push(getTask(evt, worker, options, (pid ? true : false)));
}
evt.service.workers.forEach(function(worker){
if (pid && worker.process.pid !== pid) {
return; // cannot kill external processes
}
if (tasks.length === 0) {
cb("No workers to restart");
} else {
cservice.log("Restarting workers... timeout: ".warn + options.timeout.toString().info);
tasks.push(getTask(evt, worker, options, (pid ? true : false)));
});
async.series(tasks, function(err) {
evt.locals.restartOnFailure = originalAutoRestart;
if (tasks.length === 0) {
cb("No workers to restart");
} else {
cservice.log(
"Restarting workers... timeout: ".warn + options.timeout.toString().info
);
if (err) {
cb(err);
} else {
cb(null, tasks.length + " workers restarted successfully");
}
});
}
async.series(tasks, function(err) {
evt.locals.restartOnFailure = originalAutoRestart;
if (err) {
cb(err);
} else {
cb(null, tasks.length + " workers restarted successfully");
}
});
}
};
module.exports.more = function(cb) {
cb(null, {
info: "Gracefully restart service, waiting up to timeout before terminating workers.",
command: "restart all|pid { \"option1\": \"value\" }",
"all|pid": "Required. 'all' to force shutdown of all workers, otherwise the pid of the specific worker to restart",
"options": "An object of options.",
"* timeout": "Timeout, in milliseconds, before terminating workers. 0 for infinite wait."
});
cb(null, {
info: [
"Gracefully restart service, waiting up to timeout before terminating",
"workers."
].join(' '),
command: "restart all|pid { \"option1\": \"value\" }",
"all|pid": [
"Required. 'all' to force shutdown of all workers, otherwise the pid of",
"the specific worker to restart"
].join(' '),
"options": "An object of options.",
"* timeout": [
"Timeout, in milliseconds, before terminating workers.",
"0 for infinite wait."
].join(' ')
});
};
function getTask(evt, worker, options, explicitRestart) {
return function(cb) {
return function(cb) {
// kill new worker if takes too long
var newKiller = null;
var newWorker = null;
var exitListener = function() {
if (newKiller) {
clearTimeout(newKiller);
}
};
var w;
// kill new worker if takes too long
var new_killer = null;
var new_worker = null;
if (worker.cservice.restart === false && explicitRestart === false) {
cservice.log(
"Worker process " + worker.process.pid + " immune to restarts"
);
cb();
return;
}
var exit_listener = function() {
if (new_killer) {
clearTimeout(new_killer);
}
};
if (options.timeout > 0) { // start timeout if specified
newKiller = setTimeout(function() {
w = newWorker;
newWorker = null;
if (w) {
w.removeListener("exit", exitListener); // remove temp listener
w.kill("SIGKILL"); // go get'em, killer
}
cb("timed out");
}, options.timeout);
}
if (worker.cservice.restart === false && explicitRestart === false) {
cservice.log("Worker process " + worker.process.pid + " immune to restarts");
cb();
return;
}
if (options.timeout > 0) { // start timeout if specified
new_killer = setTimeout(function() {
var w = new_worker;
new_worker = null;
if (w) {
w.removeListener("exit", exit_listener); // remove temp listener
w.kill("SIGKILL"); // go get'em, killer
}
cb("timed out");
}, options.timeout);
}
// lets start new worker
newWorker = evt.service.newWorker(worker.cservice, function(err, newWorker){
var killer;
newWorker.removeListener("exit", exitListener); // remove temp listener
newWorker = null;
if (newKiller) { // timeout no longer needed
clearTimeout(newKiller);
}
// lets start new worker
var new_worker = evt.service.newWorker(worker.cservice, function(err, new_worker) {
new_worker.removeListener("exit", exit_listener); // remove temp listener
new_worker = null;
if (new_killer) { // timeout no longer needed
clearTimeout(new_killer);
}
// ok, lets stop old worker
killer = null;
if (options.timeout > 0) { // start timeout if specified
killer = setTimeout(function() {
worker.kill("SIGKILL"); // go get'em, killer
}, options.timeout);
}
// ok, lets stop old worker
var killer = null;
if (options.timeout > 0) { // start timeout if specified
killer = setTimeout(function() {
worker.kill("SIGKILL"); // go get'em, killer
}, options.timeout);
}
worker.on("exit", function() {
if (killer) {
clearTimeout(killer);
}
worker.on("exit", function() {
if (killer) {
clearTimeout(killer);
}
// exit complete, fire callback
setTimeout(cb, 100); // slight delay in case other events are piled up
});
// exit complete, fire callback
setTimeout(cb, 100); // slight delay in case other events are piled up
});
if (worker.cservice.onStop === true) {
worker.send({cservice: {cmd: "onWorkerStop"}});
} else {
worker.kill("SIGTERM"); // exit worker
}
if (worker.cservice.onStop === true) {
worker.send({ cservice: { cmd: "onWorkerStop" } });
} else {
worker.kill("SIGTERM"); // exit worker
}
});
};
});
};
}

@@ -1,92 +0,115 @@

var
util = require("util"),
cservice = require("../../cluster-service")
;
/* jshint loopfunc:true */
var util = require("util"),
cservice = require("../../cluster-service");
module.exports = function(evt, cb, cmd, options) {
var pid = parseInt(cmd);
options = options || {};
options.timeout = parseInt(options.timeout) || 60000;
if (cmd !== "all" && !pid) {
cb("Invalid request. Try help shutdown");
return;
}
var pid = parseInt(cmd);
var workersToKill;
var exiting;
evt.locals.reason = "shutdown";
options = options || {};
options.timeout = parseInt(options.timeout) || 60000;
if (cmd !== "all" && !pid) {
cb("Invalid request. Try help shutdown");
return;
}
var workersToKill = 0;
var workers = evt.service.workers;
var exiting = false;
for (var w in workers) {
var worker = workers[w];
if (pid && worker.process.pid !== pid) {
continue; // cannot kill external processes
}
evt.locals.reason = "shutdown";
exiting = true;
workersToKill++;
worker.process.on("exit", getExitHandler(evt, worker, options.timeout > 0 ? setTimeout(getKiller(worker), options.timeout) : null, function() {
workersToKill--;
if (workersToKill === 0) {
// no workers remain
if (evt.service.workers.length === 0) {
evt.locals.reason = "kill";
cservice.log("All workers shutdown. Exiting...".warn);
evt.service.stop(options.timeout, cb);
} else {
cb(null, "Worker shutdown"); // DONE
}
}
}));
if (worker.onWorkerStop === true) { // try the nice way first
worker.send({ cservice: { cmd: "onWorkerStop" } });
} else {
worker.kill("SIGTERM"); // exit worker
}
}
if (exiting === false) {
if (evt.service.workers.length === 0) {
evt.locals.reason = "kill";
cservice.log("All workers shutdown. Exiting...");
evt.service.stop(options.timeout, cb);
} else {
cb("No workers were shutdown");
}
} else {
cservice.log("Killing workers... timeout: ".warn + (options.timeout || 0).toString().info);
}
workersToKill = 0;
exiting = false;
evt.service.workers.forEach(function(worker){
if (pid && worker.process.pid !== pid) {
return; // cannot kill external processes
}
exiting = true;
workersToKill++;
worker.process.on(
"exit",
getExitHandler(
evt
, worker
, options.timeout > 0
? setTimeout(getKiller(worker), options.timeout)
: null
, function() {
workersToKill--;
if (workersToKill === 0) {
// no workers remain
if (evt.service.workers.length === 0) {
evt.locals.reason = "kill";
cservice.log("All workers shutdown. Exiting...".warn);
evt.service.stop(options.timeout, cb);
} else {
cb(null, "Worker shutdown"); // DONE
}
}
}
)
);
if (worker.onWorkerStop === true) { // try the nice way first
worker.send({cservice: {cmd: "onWorkerStop"}});
} else {
worker.kill("SIGTERM"); // exit worker
}
});
if (exiting === false) {
if (evt.service.workers.length === 0) {
evt.locals.reason = "kill";
cservice.log("All workers shutdown. Exiting...");
evt.service.stop(options.timeout, cb);
} else {
cb("No workers were shutdown");
}
} else {
cservice.log(
"Killing workers... timeout: ".warn +
(options.timeout || 0).toString().info
);
}
};
module.exports.more = function(cb) {
cb(null, {
info: "Gracefully shutdown service, waiting up to timeout before terminating workers.",
command: "shutdown all|pid { \"option1\": \"value\" }",
"all|pid": "Required. 'all' to force shutdown of all workers, otherwise the pid of the specific worker to shutdown",
"options": "An object of options.",
"* timeout": "Timeout, in milliseconds, before terminating workers. 0 for infinite wait."
});
cb(null, {
info: [
"Gracefully shutdown service, waiting up to timeout before terminating",
"workers."
].join(' '),
command: "shutdown all|pid { \"option1\": \"value\" }",
"all|pid": [
"Required. 'all' to force shutdown of all workers, otherwise the pid of",
"the specific worker to shutdown"
].join(' '),
"options": "An object of options.",
"* timeout": [
"Timeout, in milliseconds, before terminating workers. 0 for infinite",
"wait."
].join(' ')
});
};
module.exports.control = function(){
return "local";
module.exports.control = function() {
return "local";
};
function getKiller(worker) {
return function() {
worker.kill("SIGKILL"); // go get'em, killer
};
return function() {
worker.kill("SIGKILL"); // go get'em, killer
};
}
function getExitHandler(evt, worker, killer, cb) {
return function() {
if (killer) {
clearTimeout(killer);
killer = null;
}
cb();
};
return function() {
if (killer) {
clearTimeout(killer);
killer = null;
}
cb();
};
}

@@ -1,82 +0,92 @@

var
async = require("async"),
util = require("util"),
cservice = require("../../cluster-service")
;
var async = require("async"),
util = require("util"),
cservice = require("../../cluster-service");
module.exports = function(evt, cb, workerPath, options) {
options = options || {};
options.cwd = options.cwd || process.cwd();
options.count = parseInt(options.count) || 1;
options.timeout = parseInt(options.timeout) || 60000;
options.worker = workerPath;
if (typeof workerPath !== "string" || options.count < 1) {
cb("Invalid request. Try help start");
return;
}
var tasks;
var i;
options = options || {};
options.cwd = options.cwd || process.cwd();
options.count = parseInt(options.count) || 1;
options.timeout = parseInt(options.timeout) || 60000;
options.worker = workerPath;
if (typeof workerPath !== "string" || options.count < 1) {
cb("Invalid request. Try help start");
return;
}
evt.locals.reason = "start";
evt.locals.restartOnFailure = false;
var tasks = [];
evt.locals.reason = "start";
evt.locals.restartOnFailure = false;
cservice.log("Starting workers... timeout: " + (options.timeout || 0));
tasks = [];
for (var i = 0; i < options.count; i++) {
tasks.push(getTask(evt, options));
}
async.series(tasks, function(err) {
if (err) {
cb(err);
} else {
cb(null, tasks.length + " workers started successfully");
}
});
cservice.log("Starting workers... timeout: " + (options.timeout || 0));
for (i = 0; i < options.count; i++) {
tasks.push(getTask(evt, options));
}
async.series(tasks, function(err) {
if (err) {
cb(err);
} else {
cb(null, tasks.length + " workers started successfully");
}
});
};
module.exports.more = function(cb) {
cb(null, {
info: "Gracefully start service, one worker at a time.",
command: "start workerPath { \"option1\": \"value\" }",
"workerPath": "Path of worker file (i.e. /workers/worker) to start, absolute path, or relative to cwd.",
"options": "An object of options.",
"* cwd": "Path to set as the current working directory. If not provided, existing cwd will be used.",
"* count": "The number of workers to start, or 1 if not specified.",
"* timeout": "Timeout, in milliseconds, before terminating replaced workers. 0 for infinite wait.",
"* ready": "If false, will wait for workerReady event before assuming success."
});
cb(null, {
info: "Gracefully start service, one worker at a time.",
command: "start workerPath { \"option1\": \"value\" }",
"workerPath": [
"Path of worker file (i.e. /workers/worker) to start, absolute path, or",
"relative to cwd."
].join(' '),
"options": "An object of options.",
"* cwd": [
"Path to set as the current working directory. If not provided, existing",
"cwd will be used."
].join(' '),
"* count": "The number of workers to start, or 1 if not specified.",
"* timeout": [
"Timeout, in milliseconds, before terminating replaced workers. 0 for",
"infinite wait."
].join(' '),
"* ready":
"If false, will wait for workerReady event before assuming success."
});
};
function getTask(evt, options) {
return function(cb) {
return function(cb) {
// kill new worker if takes too long
var newKiller = null;
var newWorker;
var exitListener = function() {
if (newKiller) {
clearTimeout(newKiller);
}
};
// kill new worker if takes too long
var new_killer = null;
if (options.timeout > 0) { // start timeout if specified
newKiller = setTimeout(function() {
newWorker.removeListener("exit", exitListener); // remove temp listener
newWorker.kill("SIGKILL"); // go get'em, killer
cb("timed out");
}, options.timeout);
}
var exit_listener = function() {
if (new_killer) {
clearTimeout(new_killer);
}
};
// lets start new worker
newWorker = evt.service.newWorker(options, function(err, newWorker) {
newWorker.removeListener("exit", exitListener); // remove temp listener
if (newKiller) { // timeout no longer needed
clearTimeout(newKiller);
}
if (options.timeout > 0) { // start timeout if specified
new_killer = setTimeout(function() {
new_worker.removeListener("exit", exit_listener); // remove temp listener
new_worker.kill("SIGKILL"); // go get'em, killer
cb("timed out");
}, options.timeout);
}
cb(err);
// lets start new worker
var new_worker = evt.service.newWorker(options, function(err, new_worker) {
new_worker.removeListener("exit", exit_listener); // remove temp listener
if (new_killer) { // timeout no longer needed
clearTimeout(new_killer);
}
cb(err);
});
};
});
};
}

@@ -1,118 +0,131 @@

var
async = require("async"),
util = require("util"),
extend = require("extend"),
cservice = require("../../cluster-service")
;
var async = require("async"),
util = require("util"),
extend = require("extend"),
cservice = require("../../cluster-service");
module.exports = function(evt, cb, cmd, workerPath, options) {
var pid = parseInt(cmd);
options = options || {};
options.timeout = parseInt(options.timeout) || 60000;
options.worker = workerPath;
if (typeof workerPath !== "string" || (cmd !== "all" && !pid)) {
cb("Invalid request. Try help upgrade");
return;
}
var pid = parseInt(cmd);
var originalAutoRestart;
var tasks;
var workerOptions;
evt.locals.reason = "upgrade";
var originalAutoRestart = evt.locals.restartOnFailure;
evt.locals.restartOnFailure = false;
var workers = evt.service.workers;
var tasks = [];
options = options || {};
options.timeout = parseInt(options.timeout) || 60000;
options.worker = workerPath;
if (typeof workerPath !== "string" || (cmd !== "all" && !pid)) {
cb("Invalid request. Try help upgrade");
return;
}
for (var w in workers) {
var worker = workers[w];
if (pid && worker.process.pid !== pid) {
continue; // cannot kill external processes
}
evt.locals.reason = "upgrade";
originalAutoRestart = evt.locals.restartOnFailure;
evt.locals.restartOnFailure = false;
// use original worker options as default, by overwrite using new options
var workerOptions = extend(true, {}, worker.cservice, options);
tasks = [];
tasks.push(getTask(evt, worker, workerOptions));
}
if (tasks.length === 0) {
cb("No workers to upgrade");
} else {
cservice.log("Upgrading workers... timeout: " + (options.timeout || 0));
evt.service.workers.forEach(function(worker){
if (pid && worker.process.pid !== pid) {
return; // cannot kill external processes
}
async.series(tasks, function(err) {
evt.locals.restartOnFailure = originalAutoRestart;
// use original worker options as default, by overwrite using new options
workerOptions = extend(true, {}, worker.cservice, options);
if (err) {
cb(err);
} else {
cb(null, tasks.length + " workers upgraded successfully");
}
});
}
tasks.push(getTask(evt, worker, workerOptions));
});
if (tasks.length === 0) {
cb("No workers to upgrade");
} else {
cservice.log("Upgrading workers... timeout: " + (options.timeout || 0));
async.series(tasks, function(err) {
evt.locals.restartOnFailure = originalAutoRestart;
if (err) {
cb(err);
} else {
cb(null, tasks.length + " workers upgraded successfully");
}
});
}
};
module.exports.more = function(cb) {
cb(null, {
info: "Gracefully upgrade service, one worker at a time.",
command: "upgrade all|pid workerPath { \"option1\": \"value\" }",
"all|pid": "Required. 'all' to force shutdown of all workers, otherwise the pid of the specific worker to upgrade",
"workerPath": "Path of worker file (i.e. /workers/worker) to start, absolute path, or relative to cwd.",
"options": "An object of options.",
"* cwd": "Path to set as the current working directory. If not provided, existing cwd will be used.",
"* timeout": "Timeout, in milliseconds, before terminating replaced workers. 0 for infinite wait."
});
cb(null, {
info: "Gracefully upgrade service, one worker at a time.",
command: "upgrade all|pid workerPath { \"option1\": \"value\" }",
"all|pid": [
"Required. 'all' to force shutdown of all workers, otherwise the pid of",
"the specific worker to upgrade"
].join(' '),
"workerPath":[
"Path of worker file (i.e. /workers/worker) to start, absolute path, or",
"relative to cwd."
].join(' '),
"options": "An object of options.",
"* cwd":[
"Path to set as the current working directory. If not provided, existing",
"cwd will be used."
].join(' '),
"* timeout": [
"Timeout, in milliseconds, before terminating replaced workers. 0 for",
"infinite wait."
].join(' ')
});
};
function getTask(evt, worker, options) {
return function(cb) {
return function(cb) {
// kill new worker if takes too long
var new_killer = null;
// kill new worker if takes too long
var newKiller = null;
var newWorker;
var exitListener = function() {
if (newKiller) {
clearTimeout(newKiller);
}
};
var killer;
var exit_listener = function() {
if (new_killer) {
clearTimeout(new_killer);
}
};
if (options.timeout > 0) { // start timeout if specified
newKiller = setTimeout(function() {
newWorker.removeListener("exit", exitListener);// remove temp listener
newWorker.kill("SIGKILL"); // go get'em, killer
cb("timed out");
}, options.timeout);
}
if (options.timeout > 0) { // start timeout if specified
new_killer = setTimeout(function() {
new_worker.removeListener("exit", exit_listener); // remove temp listener
new_worker.kill("SIGKILL"); // go get'em, killer
cb("timed out");
}, options.timeout);
}
// lets start new worker
newWorker = evt.service.newWorker(options, function(err, newWorker) {
newWorker.removeListener("exit", exitListener); // remove temp listener
if (newKiller) { // timeout no longer needed
clearTimeout(newKiller);
}
// lets start new worker
var new_worker = evt.service.newWorker(options, function(err, new_worker) {
new_worker.removeListener("exit", exit_listener); // remove temp listener
if (new_killer) { // timeout no longer needed
clearTimeout(new_killer);
}
// ok, lets stop old worker
killer = null;
if (options.timeout > 0) { // start timeout if specified
killer = setTimeout(function() {
worker.kill("SIGKILL"); // go get'em, killer
}, options.timeout);
}
// ok, lets stop old worker
var killer = null;
if (options.timeout > 0) { // start timeout if specified
killer = setTimeout(function() {
worker.kill("SIGKILL"); // go get'em, killer
}, options.timeout);
}
worker.on("exit", function() {
if (killer) {
clearTimeout(killer);
}
worker.on("exit", function() {
if (killer) {
clearTimeout(killer);
}
// exit complete, fire callback
setTimeout(cb, 250); // slight delay in case other events are piled up
});
// exit complete, fire callback
setTimeout(cb, 250); // slight delay in case other events are piled up
});
if (worker.cservice.onStop === true) {
worker.send({cservice: {cmd: "onWorkerStop"}});
} else {
worker.kill("SIGTERM"); // exit worker
}
if (worker.cservice.onStop === true) {
worker.send({ cservice: { cmd: "onWorkerStop" } });
} else {
worker.kill("SIGTERM"); // exit worker
}
});
};
});
};
}

@@ -1,15 +0,13 @@

var
cservice = require("../../cluster-service")
;
var cservice = require("../../cluster-service");
module.exports = function(evt, cb) {
var pkg = require("../../package.json");
cb(null, pkg.version);
var pkg = require("../../package.json");
cb(null, pkg.version);
};
module.exports.more = function(cb) {
cb(null, {
info: "Get version of cluster-service.",
command: "version"
});
cb(null, {
info: "Get version of cluster-service.",
command: "version"
});
};

@@ -1,76 +0,75 @@

var
async = require("async"),
extend = require("extend")
;
var async = require("async"),
extend = require("extend");
module.exports = function(evt, cb, cmd) {
processDetails(evt.service.workers, function(err, workers) {
var ret = { };
cmd = cmd || "simple";
switch (cmd) {
case "details":
ret.workers = workers;
break;
default:
ret.workers = workerSummary(workers);
break;
}
processDetails(evt.service.workers, function(err, workers) {
var ret = {};
cmd = cmd || "simple";
switch (cmd) {
case "details":
ret.workers = workers;
break;
default:
ret.workers = workerSummary(workers);
break;
}
cb(err, ret);
});
cb(err, ret);
});
};
module.exports.more = function(cb) {
cb(null, {
command: "workers [simple|details]",
info: "Returns list of active worker processes.",
"simple|details": "Defaults to 'simple'.",
"* simple": "Simple overview of running workers.",
"* details": "Full details of running workers."
});
cb(null, {
command: "workers [simple|details]",
info: "Returns list of active worker processes.",
"simple|details": "Defaults to 'simple'.",
"* simple": "Simple overview of running workers.",
"* details": "Full details of running workers."
});
};
function processDetails(workers, cb) {
var tasks = [];
for (var i = 0; i < workers.length; i++) {
var w = workers[i];
tasks.push(getProcessDetails(w));
}
async.parallel(tasks, function(err, results) {
cb(err, workers);
});
var tasks = [], i, w;
for (i = 0; i < workers.length; i++) {
w = workers[i];
tasks.push(getProcessDetails(w));
}
async.parallel(tasks, function(err, results) {
cb(err, workers);
});
}
function getProcessDetails(worker) {
return function(cb) {
worker.once("message", function(msg) {
if (!msg || !msg.processDetails) {
cb("processDetails not returned");
return; // end
}
return function(cb) {
worker.once("message", function(msg) {
if (!msg || !msg.processDetails) {
cb("processDetails not returned");
return; // end
}
worker.processDetails = msg.processDetails;
cb(null, worker);
});
// todo! timeout needed? perhaps.
worker.process.send({ cservice: "processDetails" });
};
worker.processDetails = msg.processDetails;
cb(null, worker);
});
// todo! timeout needed? perhaps.
worker.process.send({cservice: "processDetails"});
};
}
function workerSummary(workers) {
var ret = [];
for (var i = 0; i < workers.length; i++) {
var w = workers[i];
ret.push({
id: w.id,
pid: w.pid,
state: w.state,
worker: w.cservice.worker,
cwd: w.cservice.cwd,
process: w.processDetails
});
}
return ret;
var ret = [], i, w;
for (i = 0; i < workers.length; i++) {
w = workers[i];
ret.push({
id: w.id,
pid: w.pid,
state: w.state,
worker: w.cservice.worker,
cwd: w.cservice.cwd,
process: w.processDetails
});
}
return ret;
}
var _controls = {};
var levels = {
"remote": 10, // anyone with credentials can access
"local": 20, // anyone locally with credentials can access
"inproc": 30, // CLI of the master process
"disabled": 99 // disabled
"remote": 10, // anyone with credentials can access
"local": 20, // anyone locally with credentials can access
"inproc": 30, // CLI of the master process
"disabled": 99 // disabled
};
function setControls(controls){
_controls = {};
return addControls(controls);
function setControls(controls) {
_controls = {};
return addControls(controls);
}
function addControls(controls){
for(var control in controls){
if (levels[controls[control]] === undefined){
throw(controls[control] + " is not a valid control level.");
}
_controls[control] = levels[controls[control]];
}
return _controls;
function addControls(controls) {
var control;
for (control in controls) {
if (levels[controls[control]] === undefined) {
throw(controls[control] + " is not a valid control level.");
}
_controls[control] = levels[controls[control]];
}
return _controls;
}
function authorize(name, control){
if (_controls[name]){
return control >= _controls[name];
}
// We default to "remote" which is full access
return control >= levels.remote;
function authorize(name, control) {
if (_controls[name]) {
return control >= _controls[name];
}
// We default to "remote" which is full access
return control >= levels.remote;
}

@@ -32,0 +33,0 @@

@@ -1,41 +0,39 @@

var
os = require("os")
;
var os = require("os");
module.exports = exports = {
firstTime: true,
events: {},
workers: {},
state: 0, // 0-not running, 1-starting, 2-running
isAttached: false, // attached to CLI over REST
workerReady: false,
restartOnFailure: true,
options: {
host: "localhost",
port: 11987,
accessKey: undefined,
workers: undefined,
workerCount: os.cpus().length,
restartDelayMs: 100,
allowHttpGet: false, // useful for testing -- not safe for production use
restartsPerMinute: 10, // not yet supported
cli: true,
silent: false,
log: console.log,
error: console.error,
debug: console.debug,
json: false, // output as JSON
restartOnMemUsage: undefined,
restartOnUpTime: undefined,
colors: {
cservice: "grey",
success: "green",
error: "red",
data: "cyan",
warn: "yellow",
info: "magenta",
debug: "grey"
},
error: console.error
}
firstTime: true,
events: {},
workers: {},
state: 0, // 0-not running, 1-starting, 2-running
isAttached: false, // attached to CLI over REST
workerReady: undefined,
restartOnFailure: true,
options: {
host: "localhost",
port: 11987,
accessKey: undefined,
workers: undefined,
workerCount: os.cpus().length,
restartDelayMs: 100,
allowHttpGet: false, // useful for testing -- not safe for production use
restartsPerMinute: 10, // not yet supported
cli: false,
silent: false,
log: console.log,
error: console.error,
debug: console.debug,
json: false, // output as JSON
restartOnMemUsage: undefined,
restartOnUpTime: undefined,
commands: undefined,
colors: {
cservice: "grey",
success: "green",
error: "red",
data: "cyan",
warn: "yellow",
info: "magenta",
debug: "grey"
}
}
};

@@ -1,22 +0,26 @@

var
util = require("util"),
http = require("http"),
querystring = require("querystring"),
locals = null,
options = null,
cservice = require("../cluster-service")
;
var util = require("util"),
http = require("http"),
querystring = require("querystring"),
locals = null,
options = null,
cservice = require("../cluster-service");
exports.init = function(l, o) {
locals = l;
options = o;
locals = l;
options = o;
cservice.log("Service already running. Attached CLI to master service. Enter 'help [enter]' for help.".info);
cservice.log([
"Service already running. Attached CLI to master service.",
"Enter 'help [enter]' for help."
]
.join(' ')
.info
);
if (!options || options.silentMode !== true) {
process.stdin.resume();
process.stdin.setEncoding('utf8');
process.stdin.on("data", onCommand);
process.stdout.write("cservice> ".cservice);
}
if (!options || options.silentMode !== true) {
process.stdin.resume();
process.stdin.setEncoding('utf8');
process.stdin.on("data", onCommand);
process.stdout.write("cservice> ".cservice);
}
};

@@ -27,59 +31,74 @@

function onCommand(question, cb) {
question = question.replace(/[\r\n]/g, "");
var split = question.split(" ");
if (split[0] === "exit") {
cservice.log("Exiting CLI ONLY.".yellow);
process.kill(process.pid, "SIGKILL"); // exit by force
return;
}
var qs = querystring.stringify({
cmd: question,
accessKey: options.accessKey
});
var url = "http://" + (options.host || "localhost") + ":" + (options.port || 11987) + "/cli"
+ "?" + qs
;
cservice.log("Running remote command: ".warn + url.replace(/accessKey=.*/i, "accessKey={ACCESS_KEY}").data);
var body = "", err;
http.request(
{
host: options.host || "localhost",
port: options.port || 11987,
path: "/cli?" + qs,
method: "POST"
}, function(res) {
res.setEncoding('utf8');
res.on("data", function(chunk) {
body += chunk;
});
res.on("end", function() {
if (res.statusCode !== 200 && body) {
err = body;
}
onCallback(err, body, res, cb);
});
var split, qs, url, body, err;
question = question.replace(/[\r\n]/g, "");
split = question.split(" ");
if (split[0] === "exit") {
cservice.log("Exiting CLI ONLY.".yellow);
process.kill(process.pid, "SIGKILL"); // exit by force
return;
}
qs = querystring.stringify({
cmd: question,
accessKey: options.accessKey
});
url = "http://"
+ (options.host || "localhost")
+ ":"
+ (options.port || 11987)
+ "/cli"
+ "?"
+ qs;
cservice.log(
"Running remote command: ".warn
+ url.replace(/accessKey=.*/i, "accessKey={ACCESS_KEY}").data
);
body = "";
http.request(
{
host: options.host || "localhost",
port: options.port || 11987,
path: "/cli?" + qs,
method: "POST"
}
, function(res) {
res.setEncoding('utf8');
res.on("data", function(chunk) {
body += chunk;
});
res.on("end", function() {
if (res.statusCode !== 200 && body) {
err = body;
}
).on("error", function(err) {
body = err;
onCallback(err, body, null, cb);
}).end();
onCallback(err, body, res, cb);
});
}
).on("error", function(err) {
body = err;
onCallback(err, body, null, cb);
}
).end();
}
function onCallback(err, result, res, cb) {
if (err) {
cservice.error("Error: ", err);
result = { statusCode: res ? res.statusCode : "unknown", error: err };
} else if (result) {
if (typeof result === "string" && (result.indexOf("{") === 0 || result.indexOf("[") === 0)) {
result = JSON.parse(result); // deserialize
}
}
cservice.log(util.inspect(result, { depth: null, colors: true }));
if (err) {
cservice.error("Error: ", err);
result = {statusCode: res ? res.statusCode : "unknown", error: err};
} else if (result) {
if (
typeof result === "string"
&& (result.indexOf("{") === 0 || result.indexOf("[") === 0)
) {
result = JSON.parse(result); // deserialize
}
}
cservice.log(util.inspect(result, {depth: null, colors: true}));
if (!options || options.silentMode !== true) {
//cservice.log("");//newline
process.stdout.write("cservice> ".cservice);
}
cb && cb(err, result);
if (!options || options.silentMode !== true) {
//cservice.log("");//newline
process.stdout.write("cservice> ".cservice);
}
if(cb){
cb(err, result);
}
}

@@ -1,134 +0,165 @@

var
cservice = require("../cluster-service"),
util = require("util"),
http = require("http"),
https = require("https"),
querystring = require("querystring"),
control = require("./control"),
locals = {},
options = null,
server = null
;
var cservice = require("../cluster-service"),
util = require("util"),
http = require("http"),
https = require("https"),
querystring = require("querystring"),
control = require("./control"),
locals = {},
options = null,
server = null;
exports.init = function(l, o, cb) {
locals = l;
options = o;
locals = l;
options = o;
if (options.ssl) { // HTTPS
server = locals.http = https.createServer(options.ssl, processRequest);
} else { // HTTP
server = locals.http = http.createServer(processRequest);
}
server.on("error", cb);
if (options.ssl) { // HTTPS
server = locals.http = https.createServer(options.ssl, processRequest);
} else { // HTTP
server = locals.http = http.createServer(processRequest);
}
server.listen(options.port, options.host, cb);
server.on("error", cb);
server.listen(options.port, options.host, cb);
};
exports.close = function() {
try {
server.close();
} catch (ex) {
}
try {
server.close();
} catch (ex) {
}
};
function processRequest(req, res) {
try {
cservice.log("API: " + req.url.replace(/accessKey=.*/i, "accessKey={ACCESS_KEY}").data);
var qsIdx, qs, question;
try {
cservice.log(
"API: "
+ req.url.replace(/accessKey=.*/i, "accessKey={ACCESS_KEY}").data
);
if (req.url.indexOf("/cli?") !== 0) {
res.writeHead(404);
res.end("Page Not Found");
return;
}
if (req.method !== "POST" && cservice.options.allowHttpGet !== true) {
res.writeHead(405);
res.end("Method Not Allowed");
return;
}
if (req.url.indexOf("/cli?") !== 0) {
res.writeHead(404);
res.end("Page Not Found");
return;
}
var qs_idx = req.url.indexOf("?");
var qs = querystring.parse(req.url.substr(qs_idx + 1));
if (!qs.accessKey || qs.accessKey !== options.accessKey) {
res.writeHead(401);
res.end("Not authorized");
return;
}
var question = qs.cmd || "?";
if (req.method !== "POST" && cservice.options.allowHttpGet !== true) {
res.writeHead(405);
res.end("Method Not Allowed");
return;
}
onCommand(req, res, question);
} catch (ex) {
cservice.error("Woops, an ERROR!".error, util.inspect(ex, { depth: null }), util.inspect(ex.stack || new Error().stack, { depth: null }));
}
qsIdx = req.url.indexOf("?");
qs = querystring.parse(req.url.substr(qsIdx + 1));
if (!qs.accessKey || qs.accessKey !== options.accessKey) {
res.writeHead(401);
res.end("Not authorized");
return;
}
question = qs.cmd || "?";
onCommand(req, res, question);
} catch (ex) {
cservice.error(
"Woops, an ERROR!".error,
util.inspect(ex, {depth: null}),
util.inspect(ex.stack || new Error().stack, {depth: null})
);
}
}
function onCommand(req, res, question) {
var args = require("./util").getArgsFromQuestion(question, " ");
args = [args[0], function(err, result) { onCallback(req, res, err, result); }].concat(args.slice(1));
var args = require("./util").getArgsFromQuestion(question, " ");
var controlLevel;
var isAuthorized;
if (!locals.events[args[0]]) {
cservice.error("Command " + args[0].cyan + " not found".error);
res.writeHead(404);
res.end("Not found. Try /help");
return;
}
args = [args[0], function(err, result) {
onCallback(req, res, err, result);
}].concat(args.slice(1));
var controlLevel = control.levels.remote;
if (req.connection.remoteAddress === "127.0.0.1"){
controlLevel = control.levels.local;
}
if (!locals.events[args[0]]) {
cservice.error("Command " + args[0].cyan + " not found".error);
res.writeHead(404);
res.end("Not found. Try /help");
return;
}
var isAuthorized = control.authorize(args[0], controlLevel);
controlLevel = control.levels.remote;
if (req.connection.remoteAddress === "127.0.0.1") {
controlLevel = control.levels.local;
}
if (!isAuthorized){
res.writeHead(401);
res.end("Not authorized to execute '" + args[0] + "' remotely");
return;
}
isAuthorized = control.authorize(args[0], controlLevel);
try {
cservice.trigger.apply(null, args);
} catch(ex) {
res.writeHead(400);
res.end(JSON.stringify({ex: ex, stack: ex.stack || new Error().stack, more: "Error. Try /help"}));
}
if (!isAuthorized) {
res.writeHead(401);
res.end("Not authorized to execute '" + args[0] + "' remotely");
return;
}
try {
cservice.trigger.apply(null, args);
} catch (ex) {
res.writeHead(400);
res.end(JSON.stringify(
{
ex: ex,
stack: ex.stack || new Error().stack,
more: "Error. Try /help"
}
));
}
}
function onCallback(req, res, err, result) {
try {
delete locals.reason;
if (err) { // should do nothing if response already sent
res.writeHead(400);
res.end(err);
} else {
if (result) {
var body;
try
{
var body = JSON.stringify(result, function (key, val) {
if (key[0] === "_") {
return undefined;
} else {
return val;
}
});
res.writeHead(200, { "Content-Type": "text/json; charset=UTF-8", "Content-Length": body.length });
res.end(body);
} catch (ex) {
err = util.inspect(ex, {depth:null});
res.writeHead(400);
res.end(JSON.stringify({ error: err }));
cservice.error("Woops, an ERROR!".error, err, util.inspect(ex.stack || new Error().stack, { depth: null }));
}
} else {
res.writeHead(200);
res.end("No data");
}
}
} catch (ex) {
cservice.error("Woops, an ERROR!".error, util.inspect(ex, { depth: null }), util.inspect(ex.stack || new Error().stack, { depth: null }));
}
var body;
try {
delete locals.reason;
if (err) { // should do nothing if response already sent
res.writeHead(400);
res.end(err);
} else {
if (result) {
try
{
body = JSON.stringify(result, function(key, val) {
if (key[0] === "_") {
return undefined;
} else {
return val;
}
});
res.writeHead(
200,
{
"Content-Type": "text/json; charset=UTF-8",
"Content-Length": body.length
}
);
res.end(body);
} catch (ex) {
err = util.inspect(ex, {depth: null});
res.writeHead(400);
res.end(JSON.stringify({error: err}));
cservice.error(
"Woops, an ERROR!".error,
err,
util.inspect(ex.stack || new Error().stack, {depth: null})
);
}
} else {
res.writeHead(200);
res.end("No data");
}
}
} catch (ex) {
cservice.error(
"Woops, an ERROR!".error,
util.inspect(ex, {depth: null}),
util.inspect(ex.stack || new Error().stack, {depth: null})
);
}
}

@@ -1,4 +0,2 @@

var
cservice = require("../cluster-service")
;
var cservice = require("../cluster-service");

@@ -8,9 +6,11 @@ module.exports = exports = legacySupport;

function legacySupport(options) {
if (options.worker) {
cservice.log("Option `worker` has been deprecated. Use `workers` instead.".warn);
options.workers = options.worker;
delete options.worker;
}
if (options.worker) {
cservice.log(
"Option `worker` has been deprecated. Use `workers` instead.".warn
);
options.workers = options.worker;
delete options.worker;
}
}
legacySupport(cservice.options);

@@ -1,8 +0,8 @@

var
cservice = require("../cluster-service"),
cluster = require("cluster"),
httpserver = require("./http-server"),
async = require("async"),
startRequests = [] // queued start requests
;
/* jshint loopfunc:true */
var cservice = require("../cluster-service"),
cluster = require("cluster"),
httpserver = require("./http-server"),
async = require("async"),
path = require("path"),
startRequests = []; // queued start requests

@@ -12,157 +12,208 @@ exports.start = startMaster;

function startMaster(options, cb) {
options = options || {};
options.workerCount = options.workerCount || 1;
var workersRemaining;
var workersForked;
var workers;
var i;
var workerName;
var worker;
var workerCount;
if (cservice.locals.state === 0) { // one-time initializers
cservice.locals.state = 1; // starting
require("./commands/version")({}, function(err, ver) {
cservice.log("cluster-service v".info + ver.data + " starting...".info);
});
/*process.on("uncaughtException", function(err) {
cservice.log("uncaughtException", util.inspect(err));
});*/
// queue up our request
startRequests.push(function() {
startMaster(options, cb);
});
startListener(options, function(err) {
if (err) {
cservice.locals.isAttached = true;
options = options || {};
options.workerCount = options.workerCount || 1;
// start the http client
require("./http-client").init(cservice.locals, options);
} else { // we're the single-master
cservice.locals.isAttached = false;
if (cservice.locals.state === 0) { // one-time initializers
cservice.locals.state = 1; // starting
cluster.setupMaster({ silent: (options.silent === true) });
cluster.on("online", function(worker) {
cservice.trigger("workerStart", worker);
});
require("./commands/version")({}, function(err, ver) {
cservice.log("cluster-service v".info + ver.data + " starting...".info);
});
cluster.on("exit", function(worker, code, signal) {
cservice.trigger("workerExit", worker);
/*process.on("uncaughtException", function(err) {
cservice.log("uncaughtException", util.inspect(err));
});*/
// do not restart if there is a reason, or disabled
if (typeof (cservice.locals.reason) === "undefined" && worker.suicide !== true && cservice.locals.restartOnFailure === true) {
setTimeout(function() {
// lets replace lost worker.
cservice.newWorker(worker.cservice);
}, options.restartDelayMs);
}
});
// queue up our request
startRequests.push(function() {
startMaster(options, cb);
});
// start monitor
monitorWorkers();
startListener(options, function(err) {
var i;
if (err) {
cservice.locals.isAttached = true;
if (options.cli === true) {
// wire-up CLI
require("./cli").init(cservice.locals, options);
}
}
// start the http client
require("./http-client").init(cservice.locals, options);
} else { // we're the single-master
cservice.locals.isAttached = false;
cservice.locals.state = 2; // running
cluster.setupMaster({silent: (options.silent === true)});
// now that listener is ready, process queued start requests
for (var i = 0; i < startRequests.length; i++) {
startRequests[i](); // execute
}
startRequests = [];
});
} else if (cservice.locals.state === 1) { // if still starting, queue requests
startRequests.push(function() {
startMaster(options, cb);
});
} else if (cservice.locals.isAttached === false && options.workers !== null) { // if we're NOT attached, we can spawn the workers now
// fork it, i'm out of here
var workersRemaining = 0;
var workersForked = 0;
var workers = typeof options.workers === "string" ? { main: { worker: options.workers } } : options.workers;
var i;
for (var workerName in workers) {
var worker = workers[workerName];
var workerCount = worker.count || options.workerCount;
workersRemaining += workerCount;
workersForked += workerCount;
for (i = 0; i < workerCount; i++) {
cservice.newWorker(worker, function(err) {
workersRemaining--;
if (err) {
workersRemaining = 0; // callback now
}
if (workersRemaining === 0) {
cb && cb(err);
}
});
}
}
if (workersForked === 0) { // if no forking took place, make sure cb is invoked
cb && cb();
}
} else { // nothing else to do
cb && cb();
}
};
cluster.on("online", function(worker) {
cservice.trigger("workerStart", worker);
});
cluster.on("exit", function(worker, code, signal) {
cservice.trigger("workerExit", worker);
// do not restart if there is a reason, or disabled
if (
typeof (cservice.locals.reason) === "undefined"
&& worker.suicide !== true
&& cservice.locals.restartOnFailure === true
) {
setTimeout(function() {
// lets replace lost worker.
cservice.newWorker(worker.cservice);
}, options.restartDelayMs);
}
});
// start monitor
monitorWorkers();
if (options.cli === true) {
// wire-up CLI
require("./cli").init(cservice.locals, options);
}
}
cservice.locals.state = 2; // running
// now that listener is ready, process queued start requests
for (i = 0; i < startRequests.length; i++) {
startRequests[i](); // execute
}
startRequests = [];
});
} else if (cservice.locals.state === 1) { // if still starting, queue requests
startRequests.push(function() {
startMaster(options, cb);
});
// if we're NOT attached, we can spawn the workers now
} else if (cservice.locals.isAttached === false && options.workers !== null) {
// fork it, i'm out of here
workersRemaining = 0;
workersForked = 0;
workers = typeof options.workers === "string"
? {main: {worker: options.workers}}
: options.workers;
for (workerName in workers) {
worker = workers[workerName];
workerCount = worker.count || options.workerCount;
workersRemaining += workerCount;
workersForked += workerCount;
for (i = 0; i < workerCount; i++) {
cservice.newWorker(worker, function(err) {
workersRemaining--;
if (err) {
workersRemaining = 0; // callback now
}
if (workersRemaining === 0) {
if (typeof options.master === "string") {
require(path.resolve(options.master));
}
if(cb){
cb(err);
}
}
});
}
}
// if no forking took place, make sure cb is invoked
if (workersForked === 0) {
if(cb){
cb();
}
}
} else { // nothing else to do
if(cb){
cb();
}
}
}
function startListener(options, cb) {
if (typeof options.accessKey !== "string") { // in-proc mode only
cservice.log("LOCAL ONLY MODE. Run with 'accessKey' option to enable communication channel.".info);
cb();
return;
}
httpserver.init(cservice.locals, options, function(err) {
if (!err) {
cservice.log(("Listening at " + ((options.ssl ? "https://" : "http://") + options.host + ":" + options.port + "/cli").data).info);
}
if (typeof options.accessKey !== "string") { // in-proc mode only
cservice.log(
[
"LOCAL ONLY MODE. Run with 'accessKey' option to enable communication",
"channel."
]
.join(' ')
.info
);
cb();
return;
}
cb(err);
});
httpserver.init(cservice.locals, options, function(err) {
if (!err) {
cservice.log(
("Listening at "
+ (
(options.ssl ? "https://" : "http://")
+ options.host
+ ":"
+ options.port
+ "/cli"
)
.data
)
.info
);
}
cb(err);
});
}
function monitorWorkers() {
if (cservice.options.restartOnMemUsage || cservice.options.restartOnUpTime) {
setTimeout(onMonitorWorkers, 20000).unref(); // do not hold server open
}
if (cservice.options.restartOnMemUsage || cservice.options.restartOnUpTime) {
setTimeout(onMonitorWorkers, 20000).unref(); // do not hold server open
}
}
function onMonitorWorkers() {
cservice.trigger("workers", function(err, results) {
if (err || !results || !results.workers) {
// nothing we can do about it at this time
setTimeout(onMonitorWorkers, 60000).unref(); // do not hold server open
return;
}
var workers = results.workers;
var restarts = [];
var memUsage = cservice.options.restartOnMemUsage;
var upTime = cservice.options.restartOnUpTime;
for (var i = 0; i < workers.length; i++) {
var w = workers[i];
if (
(memUsage && w.process.memoryUsage.rss > memUsage)
||
(upTime && w.process.uptime > upTime)
) {
restarts.push(getWorkerToRestart(w));
}
}
if (restarts.length > 0) {
async.series(restarts, function(err, results) {
setTimeout(onMonitorWorkers, 20000).unref(); // do not hold server open
});
} else {
setTimeout(onMonitorWorkers, 30000).unref(); // do not hold server open
}
}, "simple");
cservice.trigger("workers", function(err, results) {
var workers;
var restarts;
var memUsage;
var upTime;
var i;
var w;
if (err || !results || !results.workers) {
// nothing we can do about it at this time
setTimeout(onMonitorWorkers, 60000).unref(); // do not hold server open
return;
}
workers = results.workers;
restarts = [];
memUsage = cservice.options.restartOnMemUsage;
upTime = cservice.options.restartOnUpTime;
for (i = 0; i < workers.length; i++) {
w = workers[i];
if (
(memUsage && w.process.memoryUsage.rss > memUsage)
||
(upTime && w.process.uptime > upTime)
) {
restarts.push(getWorkerToRestart(w));
}
}
if (restarts.length > 0) {
async.series(restarts, function(err, results) {
setTimeout(onMonitorWorkers, 20000).unref(); // do not hold server open
});
} else {
setTimeout(onMonitorWorkers, 30000).unref(); // do not hold server open
}
}, "simple");
}
function getWorkerToRestart(worker) {
return function(cb) {
cservice.trigger("restart", cb, worker.pid);
};
return function(cb) {
cservice.trigger("restart", cb, worker.pid);
};
}

@@ -1,8 +0,6 @@

var
cservice = require("../cluster-service"),
cluster = require("cluster"),
path = require("path"),
fs = require("fs"),
extend = require("extend")
;
var cservice = require("../cluster-service"),
cluster = require("cluster"),
path = require("path"),
fs = require("fs"),
extend = require("extend");

@@ -12,49 +10,66 @@ module.exports = exports = newWorker;

function newWorker(options, cb) {
options = extend(true, {}, {
worker: "./worker.js",
ready: true,
count: undefined,
restart: true,
cwd: undefined,
onStop: false
}, options);
if (options.worker.indexOf(".") === 0 || (options.worker.indexOf("//") !== 0 && options.worker.indexOf(":\\") < 0)) {
// resolve if not absolute
options.worker = path.resolve(options.worker);
}
if (fs.existsSync(options.worker) === false && fs.existsSync(options.worker + ".js") === false) {
cb && cb("Worker not found: '" + options.worker + "'. Set 'workers' option to proper path.");
return null;
}
options.cwd = options.cwd || process.cwd();
options.onReady = cb;
if (options.wasReady === false) {
options.ready = false; // preserve preference between restarts, etc
}
var worker = cluster.fork(options);
worker.cservice = options;
worker.on("message", onMessageFromWorker);
if (worker.cservice.ready === true && typeof cb === "function") {
cb(null, worker);
}
return worker;
};
var worker;
options = extend(true, {}, {
worker: "./worker.js",
ready: false,
count: undefined,
restart: true,
cwd: undefined,
onStop: false
}, options);
if (
options.worker.indexOf(".") === 0
|| (options.worker.indexOf("//") !== 0
&& options.worker.indexOf(":\\") < 0)
) {
// resolve if not absolute
options.worker = path.resolve(options.worker);
}
if (
fs.existsSync(options.worker) === false
&& fs.existsSync(options.worker + ".js") === false
) {
if(cb){
cb(
"Worker not found: '"
+ options.worker
+ "'. Set 'workers' option to proper path."
);
}
return null;
}
options.cwd = options.cwd || process.cwd();
options.onReady = cb;
if (options.wasReady === false) {
options.ready = false; // preserve preference between restarts, etc
}
worker = cluster.fork(options);
worker.cservice = options;
worker.on("message", onMessageFromWorker);
if (options.ready === true && typeof cb === "function") {
cb(null, worker);
}
return worker;
}
function onMessageFromWorker(msg) {
var worker = this;
if (!msg || !msg.cservice || !msg.cservice.cmd) {
return; // ignore invalid cluster-service messages
}
switch (msg.cservice.cmd) {
case "workerReady":
if (worker.cservice.ready === false) {
worker.cservice.wasReady = false; // preserve preference between restarts, etc
worker.cservice.ready = true;
worker.cservice.onStop = (msg.cservice.onStop === true);
worker.cservice.onReady && worker.cservice.onReady(null, worker);
}
break;
};
var worker = this;
if (!msg || !msg.cservice || !msg.cservice.cmd) {
return; // ignore invalid cluster-service messages
}
switch (msg.cservice.cmd) {
case "workerReady":
if (worker.cservice.ready === false) {
// preserve preference between restarts, etc
worker.cservice.wasReady = false;
worker.cservice.ready = true;
worker.cservice.onStop = (msg.cservice.onStop === true);
if(typeof worker.cservice.onReady === "function"){
worker.cservice.onReady(null, worker);
}
}
break;
}
}

@@ -1,70 +0,89 @@

var
util = require("util"),
http = require("http"),
querystring = require("querystring"),
options = null,
cservice = require("../cluster-service")
;
var util = require("util"),
http = require("http"),
querystring = require("querystring"),
options = null,
cservice = require("../cluster-service");
exports.start = function(o, cb) {
options = o;
var cmd;
options = o;
var cmd = options.run;
delete options.run;
run(cmd, cb);
cmd = options.run;
delete options.run;
run(cmd, cb);
};
function run(question, cb) {
var split = question.split(" ");
var qs = querystring.stringify({
cmd: question,
accessKey: options.accessKey
});
var url = "http://" + (options.host || "localhost") + ":" + (options.port || 11987) + "/cli"
+ "?" + qs
;
cservice.log("Running remote command: ".warn + url.replace(/accessKey=.*/i, "accessKey={ACCESS_KEY}").data);
var body = "", err;
http.request(
{
host: options.host || "localhost",
port: options.port || 11987,
path: "/cli?" + qs,
method: "POST"
}, function(res) {
res.setEncoding('utf8');
res.on("data", function(chunk) {
body += chunk;
});
res.on("end", function() {
if (res.statusCode !== 200 && body) {
err = body;
}
if (err) {
cservice.error("Error: ", err);
body = { statusCode: res ? res.statusCode : "no response", error: err };
} else if (typeof body === "string" && (body.indexOf("{") === 0 || body.indexOf("[") === 0)) {
body = JSON.parse(body); // deserialize
}
if (options.json === true) {
cservice.results(JSON.stringify(body));
} else {
cservice.results(util.inspect(body, { depth: null, colors: true }));
}
cb && cb(err, body);
});
var qs = querystring.stringify({
cmd: question,
accessKey: options.accessKey
});
var body = "", err;
var url = "http://"
+ (options.host || "localhost")
+ ":"
+ (options.port || 11987)
+ "/cli"
+ "?" + qs
;
cservice.log(
"Running remote command: ".warn
+ url.replace(/accessKey=.*/i, "accessKey={ACCESS_KEY}").data
);
http.request({
host: options.host || "localhost",
port: options.port || 11987,
path: "/cli?" + qs,
method: "POST"
}
, function(res) {
res.setEncoding('utf8');
res.on("data", function(chunk) {
body += chunk;
});
res.on("end", function() {
if (res.statusCode !== 200 && body) {
err = body;
}
).on("error", function(err) {
body = err;
if (err) {
cservice.error("Error: ", err);
body = {statusCode: res ? res.statusCode : "no response", error: err};
} else if (
typeof body === "string"
&& (
body.indexOf("{") === 0
|| body.indexOf("[") === 0
)
) {
body = JSON.parse(body); // deserialize
}
if (options.json === true) {
cservice.results(JSON.stringify(body));
} else {
cservice.results(util.inspect(body, {depth: null, colors: true}));
}
if (options.json === true) {
cservice.results(JSON.stringify(body));
} else {
cservice.results(util.inspect(body, { depth: null, colors: true }));
}
if (cb) {
cb(err, body);
}
});
}
).on(
"error"
, function(err) {
body = err;
cb && cb(err, body);
}).end();
if (options.json === true) {
cservice.results(JSON.stringify(body));
} else {
cservice.results(util.inspect(body, {depth: null, colors: true}));
}
if (cb) {
cb(err, body);
}
}
)
.end();
}

@@ -1,9 +0,7 @@

var
cservice = require("../cluster-service"),
cluster = require("cluster"),
fs = require("fs"),
path = require("path"),
colors = require("colors"),
extend = require("extend")
;
var cservice = require("../cluster-service"),
cluster = require("cluster"),
fs = require("fs"),
path = require("path"),
colors = require("colors"),
extend = require("extend");

@@ -15,72 +13,87 @@ module.exports = exports = start;

function start(options, masterCb) {
if (cluster.isWorker === true) {
// ignore starts if not master. do NOT invoke masterCb, as that is reserved for master callback
return;
}
var argv;
if (cluster.isWorker === true) {
// ignore starts if not master. do NOT invoke masterCb, as that is
// reserved for master callback
if (arguments.length === 0) {
var argv = require("optimist").argv;
return;
}
options = argv; // use command-line arguments instead
prepArgs(options);
masterCb = masterCallback;
}
options = options || {};
if ("config" in options) {
options = JSON.parse(fs.readFileSync(options.config));
}
cservice.locals.options = extend(true, cservice.locals.options, options);
if ("workers" in options) { // overwrite workers if provided
cservice.locals.options.workers = options.workers;
}
options = cservice.locals.options;
if (typeof options.workers === "undefined") {
// only define default worker if worker is undefined (null is reserved for "no worker")
options.workers = "./worker.js"; // default worker to execute
}
if (typeof options.workers === "string") {
options.workers = { main: { worker: options.workers, ready: !(options.ready == 'false') } };
delete options.ready;
}
colors.setTheme(options.colors);
if (arguments.length === 0) {
argv = require("optimist").argv;
require("./legacy");
if (options.run) {
require("./run").start(options, function(err, result) {
if (masterCb && masterCb(err, result) === false) {
return; // do not exit if cb returns false
}
process.exit(1); // graceful exit
});
} else {
require("./master").start(options, masterCb);
}
options = argv; // use command-line arguments instead
if (!("cli" in options)) {
options.cli = true; // auto-enable cli if run from command-line
}
prepArgs(options);
masterCb = masterCallback;
}
options = options || {};
if ("config" in options) {
options = JSON.parse(fs.readFileSync(options.config));
}
cservice.locals.options = extend(true, cservice.locals.options, options);
if ("workers" in options) { // overwrite workers if provided
cservice.locals.options.workers = options.workers;
}
options = cservice.locals.options;
if (typeof options.workers === "undefined") {
// only define default worker if worker is undefined (null is reserved
// for "no worker")
options.workers = "./worker.js"; // default worker to execute
}
if (typeof options.workers === "string") {
options.workers = {
main: {
worker: options.workers,
ready: options.ready !== 'false'
}
};
delete options.ready;
}
if (options.commands) {
cservice.registerCommands(options.commands);
}
colors.setTheme(options.colors);
require("./legacy");
if (options.run) {
require("./run").start(options, function(err, result) {
if (masterCb && masterCb(err, result) === false) {
return; // do not exit if cb returns false
}
process.exit(1); // graceful exit
});
} else {
require("./master").start(options, masterCb);
}
}
function masterCallback(err) {
if (err) {
cservice.error(err);
cservice.log("Startup failed, exiting...".warn);
process.exit(1); // graceful exit
}
if (err) {
cservice.error(err);
cservice.log("Startup failed, exiting...".warn);
process.exit(1); // graceful exit
}
}
function prepArgs(options) {
if (options._ && options._.length > 0) {
var ext = path.extname(options._[0]).toLowerCase();
if (ext === ".js") { // if js file, use as worker
options.workers = options._[0]
} else if (ext === ".json") { // if json file, use as config
options.config = options._[0];
} else { // otherwise assume it is a command to execute
options.run = options._[0];
if (options.json === true) {
options.cli = false;
}
}
}
var ext;
if (options._ && options._.length > 0) {
ext = path.extname(options._[0]).toLowerCase();
if (ext === ".js") { // if js file, use as worker
options.workers = options._[0];
} else if (ext === ".json") { // if json file, use as config
options.config = options._[0];
} else { // otherwise assume it is a command to execute
options.run = options._[0];
if (options.json === true) {
options.cli = false;
}
}
}
}

@@ -1,4 +0,2 @@

var
cservice = require("../cluster-service")
;
var cservice = require("../cluster-service");

@@ -8,25 +6,31 @@ module.exports = exports = stop;

function stop(timeout, cb) {
if (cservice.locals.state === 0) {
cb && cb();
return;
}
if (cservice.locals.state === 0) {
if(cb){
cb();
}
return;
}
if (cservice.workers.length > 0) { // issue shutdown
cservice.trigger("shutdown", function() {
require("./http-server").close();
if (cservice.options.cli === true) {
process.exit(1);
} else {
cb && cb();
}
}, "all", timeout);
} else { // gracefully shutdown
require("./http-server").close();
cservice.locals.state = 0;
if (cservice.options.cli === true) {
process.exit(1);
} else {
cb && cb();
}
}
if (cservice.workers.length > 0) { // issue shutdown
cservice.trigger("shutdown", function() {
require("./http-server").close();
if (cservice.options.cli === true) {
process.exit(1);
} else {
if(cb){
cb();
}
}
}, "all", timeout);
} else { // gracefully shutdown
require("./http-server").close();
cservice.locals.state = 0;
if (cservice.options.cli === true) {
process.exit(1);
} else {
if(cb){
cb();
}
}
}
}

@@ -1,4 +0,2 @@

var
cservice = require("../cluster-service")
;
var cservice = require("../cluster-service");

@@ -8,20 +6,23 @@ module.exports = exports = trigger;

function trigger(eventName) {
var evt = cservice.locals.events[eventName];
if (!evt) {
if (typeof arguments[1] === "function") { // invoke callback if provided instead of throwing
arguments[1]("Event " + eventName + " not found");
} else {
throw new Error("Event " + eventName + " not found");
}
}
var evt = cservice.locals.events[eventName];
var args;
var i;
if (!evt) {
// invoke callback if provided instead of throwing
if (typeof arguments[1] === "function") {
arguments[1]("Event " + eventName + " not found");
} else {
throw new Error("Event " + eventName + " not found");
}
}
var args = [evt]; // event is always first arg
if (arguments.length > 1) { // grab custom args
for (var i = 1; i < arguments.length; i++) {
args.push(arguments[i]);
}
}
args = [evt]; // event is always first arg
if (arguments.length > 1) { // grab custom args
for (i = 1; i < arguments.length; i++) {
args.push(arguments[i]);
}
}
//exports.log("trigger." + eventName + ".args=" + args.length);
// invoke event callback
return evt.cb.apply(null, args);
// invoke event callback
return evt.cb.apply(null, args);
}

@@ -1,12 +0,10 @@

var
cservice = require("../cluster-service")
;
var cservice = require("../cluster-service");
/*
* question - Question to split apart.
Ex: prop1 "prop #2" { "prop": 3 } [ "prop #4" ] 5
Ex: prop1 "prop #2" { "prop": 3 } [ "prop #4" ] 5
* delimiter - What splits the properties? Can be one or more characters.
* return - An array of arguments.
Ex: [ "prop1", "prop #2", { "prop": 3 }, [ "prop #4" ], 5 ]
*/
Ex: [ "prop1", "prop #2", { "prop": 3 }, [ "prop #4" ], 5 ]
*/
exports.getArgsFromQuestion = getArgsFromQuestion;

@@ -19,161 +17,188 @@ exports.debug = debug;

function debug() {
if (cservice.options.cli === true && cservice.options.debug) {
process.stdout.clearLine && process.stdout.clearLine();
process.stdout.cursorTo && process.stdout.cursorTo(0);
var args;
var i;
if (cservice.options.cli === true && cservice.options.debug) {
if(process.stdout.clearLine){
process.stdout.clearLine();
}
if(process.stdout.cursorTo){
process.stdout.cursorTo(0);
}
var args = Array.prototype.slice.call(arguments);
for (var i = 0; i < args.length; i++) {
if (typeof args[i] === "string") {
args[i] = args[i].debug;
}
}
if (args.length > 0 && typeof args[0] === "string" && args[0][0] === "{") {
cservice.options.debug("cservice:".cservice);
} else {
args = ["cservice: ".cservice].concat(args);
}
cservice.options.debug.apply(this, args);
args = Array.prototype.slice.call(arguments);
for (i = 0; i < args.length; i++) {
if (typeof args[i] === "string") {
args[i] = args[i].debug;
}
}
};
if (args.length > 0 && typeof args[0] === "string" && args[0][0] === "{") {
cservice.options.debug("cservice:".cservice);
} else {
args = ["cservice: ".cservice].concat(args);
}
cservice.options.debug.apply(this, args);
}
}
function log() {
if (cservice.options.cli === true && cservice.options.log) {
process.stdout.clearLine && process.stdout.clearLine();
process.stdout.cursorTo && process.stdout.cursorTo(0);
var args;
if (cservice.options.cli === true && cservice.options.log) {
if(process.stdout.clearLine){
process.stdout.clearLine();
}
if(process.stdout.cursorTo){
process.stdout.cursorTo(0);
}
var args = Array.prototype.slice.call(arguments);
if (args.length > 0 && typeof args[0] === "string" && args[0][0] === "{") {
cservice.options.log("cservice:".cservice);
} else {
args = ["cservice: ".cservice].concat(args);
}
cservice.options.log.apply(this, args);
}
};
args = Array.prototype.slice.call(arguments);
if (args.length > 0 && typeof args[0] === "string" && args[0][0] === "{") {
cservice.options.log("cservice:".cservice);
} else {
args = ["cservice: ".cservice].concat(args);
}
cservice.options.log.apply(this, args);
}
}
function error() {
if (cservice.options.cli === true && cservice.options.error) {
process.stdout.clearLine && process.stdout.clearLine();
process.stdout.cursorTo && process.stdout.cursorTo(0);
var args;
var i;
if (cservice.options.cli === true && cservice.options.error) {
if(process.stdout.clearLine){
process.stdout.clearLine();
}
if(process.stdout.cursorTo){
process.stdout.cursorTo(0);
}
var args = Array.prototype.slice.call(arguments);
for (var i = 0; i < args.length; i++) {
if (typeof args[i] === "string") {
args[i] = args[i].error;
}
}
if (args.length > 0 && typeof args[0] === "string" && args[0][0] === "{") {
cservice.options.error("cservice:".cservice);
} else {
args = ["cservice: ".cservice].concat(args);
}
cservice.options.error.apply(this, args);
}
};
args = Array.prototype.slice.call(arguments);
for (i = 0; i < args.length; i++) {
if (typeof args[i] === "string") {
args[i] = args[i].error;
}
}
if (args.length > 0 && typeof args[0] === "string" && args[0][0] === "{") {
cservice.options.error("cservice:".cservice);
} else {
args = ["cservice: ".cservice].concat(args);
}
cservice.options.error.apply(this, args);
}
}
function results() {
cservice.options.log &&
cservice.options.log.apply(this, arguments);
};
if(cservice.options.log){
cservice.options.log.apply(this, arguments);
}
}
function getArgsFromQuestion(question, delimiter) {
// OLD WAY - simply breaks args by delimiter
//var split = question.split(" ");
//var args = [split[0], onCallback].concat(split.slice(1));
// parser needs to be smarter, to account for various data types:
// single word strings: hello
// phrases: "hello world"
// numbers: 1 or 1.3
// JSON: [] or { "a": { "b": "hello \"world\"" } }
var arg = [], args = [], stringOpen = false, jsonLevel = 0, arrayLevel = 0, i, isDelim, c, cprev, cnext;
// OLD WAY - simply breaks args by delimiter
//var split = question.split(" ");
//var args = [split[0], onCallback].concat(split.slice(1));
for (i = 0; i < question.length; i++) {
cprev = i > 0 ? question[i - 1] : "";
c = question[i];
cnext = (i < question.length - 1) ? question[i + 1] : "";
isDelim = (c === delimiter);
if (stringOpen === true) { // processing quotted string
if (c === "\"" && cprev !== "\\") { // closer
// close string
stringOpen = false;
// add string arg, even if empty
args.push(getArgFromValue(arg.join("")));
// reset arg
arg = [];
} else { // just another char
arg.push(c);
}
} else if (jsonLevel > 0) { // processing JSON object
if (c === "}" && cprev !== "\\") { // closer
jsonLevel--;
} else if (c === "{" && cprev !== "\\") { // opener
jsonLevel++;
}
// parser needs to be smarter, to account for various data types:
// single word strings: hello
// phrases: "hello world"
// numbers: 1 or 1.3
// JSON: [] or { "a": { "b": "hello \"world\"" } }
var arg = []
, args = []
, stringOpen = false
, jsonLevel = 0
, arrayLevel = 0
, i
, isDelim
, c
, cprev
, cnext;
arg.push(c);
for (i = 0; i < question.length; i++) {
cprev = i > 0 ? question[i - 1] : "";
c = question[i];
cnext = (i < question.length - 1) ? question[i + 1] : "";
isDelim = (c === delimiter);
if (stringOpen === true) { // processing quotted string
if (c === "\"" && cprev !== "\\") { // closer
// close string
stringOpen = false;
// add string arg, even if empty
args.push(getArgFromValue(arg.join("")));
// reset arg
arg = [];
} else { // just another char
arg.push(c);
}
} else if (jsonLevel > 0) { // processing JSON object
if (c === "}" && cprev !== "\\") { // closer
jsonLevel--;
} else if (c === "{" && cprev !== "\\") { // opener
jsonLevel++;
}
if (jsonLevel === 0) { // closed
args.push(getArgFromValue(arg.join("")));
// reset arg
arg = [];
}
} else if (arrayLevel > 0) { // processing JSON object
if (c === "]" && cprev !== "\\") { // closer
arrayLevel--;
} else if (c === "[" && cprev !== "\\") { // opener
arrayLevel++;
}
arg.push(c);
arg.push(c);
if (jsonLevel === 0) { // closed
args.push(getArgFromValue(arg.join("")));
// reset arg
arg = [];
}
} else if (arrayLevel > 0) { // processing JSON object
if (c === "]" && cprev !== "\\") { // closer
arrayLevel--;
} else if (c === "[" && cprev !== "\\") { // opener
arrayLevel++;
}
if (arrayLevel === 0) { // closed
args.push(getArgFromValue(arg.join("")));
// reset arg
arg = [];
}
} else { // processing basic arg
if (c === delimiter) { // delimiter
if (arg.length > 0) { // if arg, add it
args.push(getArgFromValue(arg.join("")));
// reset arg
arg = [];
}
} else if (c === "{" && arg.length === 0) { // JSON opener
jsonLevel++;
arg.push(c);
} else if (c === "[" && arg.length === 0) { // Array opener
arrayLevel++;
arg.push(c);
} else if (c === "\"" && arg.length === 0) { // string opener
stringOpen = true;
} else { // add it
arg.push(c);
}
}
}
arg.push(c);
if (arg.length > 0) { // if arg remains, add it too
args.push(getArgFromValue(arg.join("")));
}
if (arrayLevel === 0) { // closed
args.push(getArgFromValue(arg.join("")));
// reset arg
arg = [];
}
} else { // processing basic arg
if (c === delimiter) { // delimiter
if (arg.length > 0) { // if arg, add it
args.push(getArgFromValue(arg.join("")));
// reset arg
arg = [];
}
} else if (c === "{" && arg.length === 0) { // JSON opener
jsonLevel++;
arg.push(c);
} else if (c === "[" && arg.length === 0) { // Array opener
arrayLevel++;
arg.push(c);
} else if (c === "\"" && arg.length === 0) { // string opener
stringOpen = true;
} else { // add it
arg.push(c);
}
}
}
return args;
};
if (arg.length > 0) { // if arg remains, add it too
args.push(getArgFromValue(arg.join("")));
}
return args;
}
function getArgFromValue(val) {
try {
// \" tags should be standard quotes after parsed
val = val.replace(/\\\"/g, '"');
try {
// \" tags should be standard quotes after parsed
val = val.replace(/\\\"/g, '"');
// try to process as JSON first
// Typical use cases:
// 1 - number
// 1.3 - number
// [] - array
// { "a": { } } - object
return JSON.parse(val);
} catch (ex) {
return val; // use as-is
}
// try to process as JSON first
// Typical use cases:
// 1 - number
// 1.3 - number
// [] - array
// { "a": { } } - object
return JSON.parse(val);
} catch (ex) {
return val; // use as-is
}
}

@@ -1,6 +0,4 @@

var
cservice = require("../cluster-service"),
cluster = require("cluster"),
onWorkerStop = null
;
var cservice = require("../cluster-service"),
cluster = require("cluster"),
onWorkerStop = null;

@@ -10,34 +8,45 @@ module.exports = exports = workerReady;

function workerReady(options, forceIsWorker) {
if (cluster.isMaster === true && forceIsWorker !== true) {
return; // ignore if coming from master
}
if (cluster.isMaster === true && forceIsWorker !== true) {
return; // ignore if coming from master
}
if (cservice.locals.workerReady === true) {
return; // ignore dup calls
}
if (cservice.locals.workerReady === true) {
return; // ignore dup calls
}
cservice.locals.workerReady = true;
if (options === false) {
cservice.locals.workerReady = false;
options = options || {};
return; // do not continue
}
onWorkerStop = options.onWorkerStop;
process.on("message", onMessageFromMaster);
cservice.locals.workerReady = true;
// allow worker to inform the master when ready to speed up initialization
process.send({ cservice: { cmd: "workerReady", onStop: (typeof options.onWorkerStop === "function") } });
options = options || {};
onWorkerStop = options.onWorkerStop;
process.on("message", onMessageFromMaster);
// allow worker to inform the master when ready to speed up initialization
process.send({
cservice: {
cmd: "workerReady",
onStop: (typeof options.onWorkerStop === "function")
}
});
}
function onMessageFromMaster(msg) {
if (!msg || !msg.cservice || !msg.cservice.cmd) {
return; // ignore invalid cluster-service messages
}
if (!msg || !msg.cservice || !msg.cservice.cmd) {
return; // ignore invalid cluster-service messages
}
switch (msg.cservice.cmd) {
case "onWorkerStop":
if (typeof onWorkerStop === "function") {
onWorkerStop();
}
break;
};
switch (msg.cservice.cmd) {
case "onWorkerStop":
if (typeof onWorkerStop === "function") {
onWorkerStop();
}
break;
}
}

@@ -1,4 +0,2 @@

var
cluster = require("cluster")
;
var cluster = require("cluster");

@@ -10,37 +8,65 @@ exports.get = get;

function get() {
var workers = [];
var cworkers = cluster.workers;
for (var k in cworkers) {
var worker = cworkers[k];
worker.pid = worker.process.pid;
workers.push(worker);
}
var workers = [];
var cworkers = cluster.workers;
var k;
var worker;
for (k in cworkers) {
worker = cworkers[k];
worker.pid = worker.process.pid;
workers.push(worker);
}
return workers;
workers.send=send;
return workers;
}
// i hate O(N) lookups, but not hit hard enough to worry about optimizing at this point. freshness is more important
// i hate O(N) lookups, but not hit hard enough to worry about optimizing at
// this point. freshness is more important
function getByPID(pid) {
var workers = get();
for (var i = 0; i < workers.length; i++) {
var worker = workers[i];
if (worker.pid === pid) {
return worker;
}
}
// else return undefined
var workers = get();
var i;
var worker;
for (i = 0; i < workers.length; i++) {
worker = workers[i];
if (worker.pid === pid) {
return worker;
}
}
// else return undefined
}
function monitor() {
process.on("message", function(msg) {
if (!msg || typeof msg.cservice !== "string") {
return; // end
}
process.on("message", function(msg) {
if (!msg || typeof msg.cservice !== "string") {
return; // end
}
switch (msg.cservice) {
case "processDetails":
process.send({ cservice: "processDetails", processDetails: { memoryUsage: process.memoryUsage(), title: process.title, uptime: process.uptime(), hrtime: process.hrtime() } });
break;
}
});
switch (msg.cservice) {
case "processDetails":
process.send({
cservice: "processDetails",
processDetails: {
memoryUsage: process.memoryUsage(),
title: process.title,
uptime: process.uptime(),
hrtime: process.hrtime()
}
});
break;
}
});
}
/**
* This is shorthand for:
* <pre>
* module.workers.forEach(function(worker){...});
* </pre>
*/
function send(){
this.forEach(function(worker){
//worker.send.apply(worker, [].slice.apply(arguments));
});
}
{
"name": "cluster-service",
"version": "0.6.2",
"version": "0.7.0",
"author": {

@@ -11,8 +11,13 @@ "name": "Aaron Silvas",

"scripts": {
"test": "istanbul cover ./node_modules/mocha/bin/_mocha -- --ui bdd -R spec -t 5000 -d"
"lint" : "npm run-script lint-src && npm run-script lint-test",
"lint-src": "jshint bin lib cluster-service.js",
"lint-test": "jshint --config .test-jshintrc test",
"cover":"istanbul cover ./node_modules/mocha/bin/_mocha -- --ui bdd -R spec -t 5000 -d",
"test-devel":"./node_modules/.bin/mocha bdd -R spec -t 5000 test/*.js test/workers/*.js",
"test": "npm run-script lint && npm run-script cover"
},
"dependencies": {
"async": ">=0.2.x",
"optimist": ">=0.6.0",
"colors": ">=0.6.2",
"optimist": ">=0.6.0",
"colors": ">=0.6.2",
"extend": ">=1.1.x"

@@ -23,3 +28,5 @@ },

"request": ">=2.21.0",
"istanbul": "~0.1.43"
"istanbul": "~0.1.43",
"sinon": "1.7.3",
"jshint": "2.3.x"
},

@@ -47,16 +54,16 @@ "repository": {

"remote access",
"multi process",
"master",
"child",
"process",
"monitor",
"monitoring",
"continous integration",
"healthcheck",
"heartbeat",
"health check",
"heart beat",
"REST",
"resilient"
"multi process",
"master",
"child",
"process",
"monitor",
"monitoring",
"continous integration",
"healthcheck",
"heartbeat",
"health check",
"heart beat",
"REST",
"resilient"
]
}
# cluster-service
[![Build Status](https://travis-ci.org/godaddy/node-cluster-service.png)](https://travis-ci.org/godaddy/node-cluster-service) [![NPM version](https://badge.fury.io/js/cluster-service.png)](http://badge.fury.io/js/cluster-service) [![Dependency Status](https://gemnasium.com/godaddy/node-cluster-service.png)](https://gemnasium.com/godaddy/node-cluster-service)
[![Build Status](https://travis-ci.org/godaddy/node-cluster-service.png)](https://travis-ci.org/godaddy/node-cluster-service) [![NPM version](https://badge.fury.io/js/cluster-service.png)](http://badge.fury.io/js/cluster-service) [![Dependency Status](https://gemnasium.com/godaddy/node-cluster-service.png)](https://gemnasium.com/godaddy/node-cluster-service) [![Bitdeli Badge](https://d2weczhvl823v0.cloudfront.net/godaddy/node-cluster-service/trend.png)](https://bitdeli.com/free "Bitdeli Badge")

@@ -118,3 +118,3 @@

forked based on value of ```workerCount```. An object indicates one or more worker objects:
```{ "worker1": { worker: "worker1.js", cwd: process.cwd(), count: 2, ready: true, restart: true } }```.
```{ "worker1": { worker: "worker1.js", cwd: process.cwd(), count: 2, ready: false, restart: true } }```.
This option is automatically set if run via command-line ```cservice "worker.js"``` if

@@ -124,2 +124,3 @@ the ```.js``` extension is detected.

Allows CLI & REST interfaces.
* master - An optional module to execute for the master process only, once ```start``` has been completed.
* config - A filename to the configuration to load. Useful to keep options from having to be inline.

@@ -135,4 +136,4 @@ This option is automatically set if run via command-line ```cservice "config.json"``` if

workers do not impact availability, such as task queues, and can be run as a single instance.
* cli (default: true) - Enable the command line interface. Can be disabled for background
services, or test cases.
* cli (default: false) - Enable the command line interface. Can be disabled for background
services, or test cases. Note: As of v0.7 and later, this defaults to true if run from command-line.
* ssl - If provided, will bind using HTTPS by passing this object as the

@@ -151,2 +152,7 @@ [TLS options](http://nodejs.org/api/tls.html#tls_tls_createserver_options_secureconnectionlistener).

(in seconds), the process will be restarted gracefully. Only one worker will be restarted at a time.
* commands - A single directory, an array of directories, or a comma-delimited list of directories
may be provided to auto-register commands found in the provided folders that match the ".js"
extension. If the module exposes the "id" property, that will be the name of the command,
otherwise the filename (minus the extension) will be used as the name of the command. If relative
paths are provided, they will be resolved from process.cwd().

@@ -215,4 +221,9 @@

You may optionally register one more directories of commands via the ```commands``` option:
cservice.start({ commands: "./commands,../some_more_commands" });
The above example allows you to skip manually registering each command via ```cservice.on```.
## Cluster Events

@@ -229,12 +240,9 @@

By default, when a process is started successfully without it exiting, it is assumed to be "running".
By default, when a process is started successfully without exiting, it is assumed to be "running".
This behavior is not always desired however, and may optionally be controlled by:
Indicate the worker is NOT ready, via ```ready``` option:
require("cluster-service").start({ workers: { "async": { worker: "async_worker.js", ready: false } } });
Have the worker inform the master once it is actually ready:
// worker.js
require("cluster-service").workerReady(false); // we're NOT ready!
setTimeout(funtion() {

@@ -241,0 +249,0 @@ // dumb example of async support

var exit = require('../lib/commands/exit');
var assert = require("assert");
describe('[Exit cmd]', function(){
it('Invalid request if cmd not equal to now', function(done){
var evt = {};
var cmd = "Not now";
var cb = function(message){
assert.equal(message, "Invalid request, 'now' required. Try help exit");
done();
};
exit(evt, cb, cmd);
});
it('Calls process exit', function(done){
var evt = {};
var cmd = "now";
process.exit = function(input){
assert.equal(input, 0);
};
var real_log = console.log;
console.log = function(msg1, msg2){
assert.equal(msg1, "*** FORCEFUL TERMINATION REQUESTED ***");
assert.equal(msg2, "Exiting now.");
};
var cb = function(nullObj, message){
assert.equal(nullObj, null);
assert.equal(message, "Exiting now.");
console.log = real_log;
done();
};
exit(evt, cb, cmd);
});
it('more', function(done){
var callback = function(nullObj, obj){
assert.equal(nullObj, null);
assert.equal(obj.info, "Forcefully exits the service.");
assert.equal(obj.command, "exit now");
assert.equal(obj["now"], "Required. 'now' to force exit.");
done();
};
describe('[Exit cmd]', function() {
it('Invalid request if cmd not equal to now', function(done) {
var evt = {};
var cmd = "Not now";
var cb = function(message) {
assert.equal(message, "Invalid request, 'now' required. Try help exit");
done();
};
exit(evt, cb, cmd);
});
exit.more(callback);
});
})
it('Calls process exit', function(done) {
var evt = {};
var cmd = "now";
process.exit = function(input) {
assert.equal(input, 0);
};
var realLog = console.log;
console.log = function(msg1, msg2) {
assert.equal(msg1, "*** FORCEFUL TERMINATION REQUESTED ***");
assert.equal(msg2, "Exiting now.");
};
var cb = function(nullObj, message) {
assert.equal(nullObj, null);
assert.equal(message, "Exiting now.");
console.log = realLog;
done();
};
exit(evt, cb, cmd);
});
it('more', function(done) {
var callback = function(nullObj, obj) {
assert.equal(nullObj, null);
assert.equal(obj.info, "Forcefully exits the service.");
assert.equal(obj.command, "exit now");
assert.equal(obj.now, "Required. 'now' to force exit.");
done();
};
exit.more(callback);
});
});
var cmd = require('../lib/commands/health');
var assert = require("assert");
describe('[Health cmd]', function(){
it('Issue command', function(done){
cmd({}, function(nullObj, data){
assert.equal(nullObj, null);
assert.equal(data, "OK");
done();
});
});
describe('[Health cmd]', function() {
it('Issue command', function(done) {
cmd({}, function(nullObj, data) {
assert.equal(nullObj, null);
assert.equal(data, "OK");
done();
});
});
it('more', function(done){
var callback = function(nullObj, data){
assert.equal(nullObj, null);
assert.equal(data.info, "Returns health of service. May be overidden by service to expose app-specific data.");
assert.equal(data.command, "health");
done();
};
it('more', function(done) {
var callback = function(nullObj, data) {
assert.equal(nullObj, null);
assert.equal(
data.info,
[
"Returns health of service. May be overidden by service to expose",
"app-specific data."
].join(' ')
);
assert.equal(data.command, "health");
done();
};
cmd.more(callback);
});
})
cmd.more(callback);
});
});

@@ -0,1 +1,2 @@

/* jshint camelcase:false */
var cmd = require('../lib/commands/help');

@@ -6,45 +7,83 @@ var assert = require("assert");

cservice.log = function() {};
cservice.isWorker && it("WORKER", function(done) { });
cservice.isMaster && describe('[Help cmd]', function(){
it('Start', function(done){
cservice.start({ workers: null, workerCount: 1, accessKey: "123", cli: false }, function() {
assert.equal(cservice.workers.length, 0, "0 worker expected, but " + cservice.workers.length + " found");
done();
});
});
it('Help all', function(done){
cservice.trigger("help", function(err, data) {
assert.equal(err, null);
assert.equal(data.more, "Commands (Use 'help [command_name]' for more details)");
done();
});
});
if(cservice.isWorker){
it("WORKER", function(done) {});
} else {
describe('[Help cmd]', function() {
it('Start', function(done) {
cservice.start(
{
workers: null,
workerCount: 1,
accessKey: "123",
cli: false
},
function() {
assert.equal(
cservice.workers.length,
0,
"0 worker expected, but " + cservice.workers.length + " found"
);
done();
}
);
});
it('Help health', function(done){
cservice.trigger("help", function(err, data) {
assert.equal(err, null);
assert.equal(data.info, "Returns health of service. May be overidden by service to expose app-specific data.");
assert.equal(data.command, "health");
done();
}, "health");
});
it('more', function(done){
var callback = function(nullObj, obj){
assert.equal(nullObj, null);
assert.equal(obj.command_name, "Optional if you want extended help");
assert.equal(obj.command, "help [command_name]");
done();
};
it('Help all', function(done) {
cservice.trigger("help", function(err, data) {
assert.equal(err, null);
assert.equal(
data.more,
"Commands (Use 'help [command_name]' for more details)"
);
assert.ok(data.commands);
var invisible_commands = data.commands.filter(
function(el) {
return (el === "workerStart" || el === "workerExit");
}
);
assert.equal(invisible_commands.length, 0,
"Expected to find 0 invisible commands, but found " +
invisible_commands.length
);
done();
});
});
cmd.more(callback);
});
it('Help health', function(done) {
cservice.trigger("help", function(err, data) {
assert.equal(err, null);
assert.equal(
data.info,
[
"Returns health of service. May be overidden by service to expose",
"app-specific data."
].join(' ')
);
assert.equal(data.command, "health");
done();
}, "health");
});
it('Stop', function(done){
cservice.stop(30000, function() {
assert.equal(cservice.workers.length, 0, "0 workers expected, but " + cservice.workers.length + " found");
done();
});
});
})
it('more', function(done) {
var callback = function(nullObj, obj) {
assert.equal(nullObj, null);
assert.equal(obj.command_name, "Optional if you want extended help");
assert.equal(obj.command, "help [command_name]");
done();
};
cmd.more(callback);
});
it('Stop', function(done) {
cservice.stop(30000, function() {
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
done();
});
});
});
}
var cmd = require('../lib/commands/version');
var assert = require("assert");
describe('[Version cmd]', function(){
it('Get version', function(done){
var pkg = require("../package.json");
var evt = {};
var cb = function(err, data){
assert.ifError(err);
assert.equal(data, pkg.version);
done();
};
cmd(evt, cb);
});
})
describe('[Version cmd]', function() {
it('Get version', function(done) {
var pkg = require("../package.json");
var evt = {};
var cb = function(err, data) {
assert.ifError(err);
assert.equal(data, pkg.version);
done();
};
cmd(evt, cb);
});
});

@@ -6,36 +6,55 @@ var cservice = require("../cluster-service");

cservice.log = function() {};
cservice.isWorker && it("WORKER", function(done) { });
cservice.isMaster && describe('[Workers cmd]', function(){
it('Start', function(done){
cservice.start({ workers: null, workerCount: 1, accessKey: "123", cli: false }, function() {
assert.equal(cservice.workers.length, 0, "0 worker expected, but " + cservice.workers.length + " found");
done();
});
});
if(cservice.isWorker){
it("WORKER", function(done) {});
} else {
describe('[Workers cmd]', function() {
it('Start', function(done) {
cservice.start(
{
workers: null,
workerCount: 1,
accessKey: "123",
cli: false
},
function() {
assert.equal(
cservice.workers.length,
0,
"0 worker expected, but " + cservice.workers.length + " found"
);
done();
}
);
});
it('Test workers command', function(done){
cservice.trigger("workers", function(err, data) {
assert.equal(err, null);
assert.equal(data.workers.length, 0);
done();
});
});
it('more', function(done){
var callback = function(nullObj, obj){
assert.equal(nullObj, null);
assert.equal(obj.info, "Returns list of active worker processes.");
assert.equal(obj.command, "workers [simple|details]");
done();
};
it('Test workers command', function(done) {
cservice.trigger("workers", function(err, data) {
assert.equal(err, null);
assert.equal(data.workers.length, 0);
done();
});
});
workers.more(callback);
});
it('more', function(done) {
var callback = function(nullObj, obj) {
assert.equal(nullObj, null);
assert.equal(obj.info, "Returns list of active worker processes.");
assert.equal(obj.command, "workers [simple|details]");
done();
};
it('Stop', function(done){
cservice.stop(30000, function() {
assert.equal(cservice.workers.length, 0, "0 workers expected, but " + cservice.workers.length + " found");
done();
});
});
})
workers.more(callback);
});
it('Stop', function(done) {
cservice.stop(30000, function() {
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
done();
});
});
});
}
var control = require("../lib/control");
var assert = require("assert");
describe('Control', function(){
describe('levels', function(){
it('should have inproc, local and remote', function(done){
assert.notEqual(control.levels.inproc, undefined, "control.levels.inproc should exist");
assert.notEqual(control.levels.local, undefined, "control.levels.local should exist");
assert.notEqual(control.levels.remote, undefined, "control.levels.remote should exist");
done();
});
});
describe('Control', function() {
describe('levels', function() {
it('should have inproc, local and remote', function(done) {
assert.notEqual(
control.levels.inproc,
undefined,
"control.levels.inproc should exist"
);
assert.notEqual(
control.levels.local,
undefined,
"control.levels.local should exist"
);
assert.notEqual(
control.levels.remote,
undefined,
"control.levels.remote should exist"
);
done();
});
});
describe('levels', function(){
it('should have hierarchy', function(done){
assert.equal(control.levels.inproc > control.levels.local, true, "control.levels.inproc should be greater than control.levels.local");
assert.equal(control.levels.inproc > control.levels.remote, true, "control.levels.inproc should be greater than control.levels.remote");
assert.equal(control.levels.local > control.levels.remote, true, "control.levels.local should be greater than control.levels.remote");
done();
});
});
describe('levels', function() {
it('should have hierarchy', function(done) {
assert.equal(
control.levels.inproc > control.levels.local,
true,
"control.levels.inproc should be greater than control.levels.local"
);
assert.equal(
control.levels.inproc > control.levels.remote,
true,
"control.levels.inproc should be greater than control.levels.remote"
);
assert.equal(
control.levels.local > control.levels.remote,
true,
"control.levels.local should be greater than control.levels.remote"
);
done();
});
});
describe('setControls', function(){
it('returns controls', function(done){
var controls = control.setControls({ "test" : "inproc" });
assert.equal(controls["test"], control.levels.inproc);
done();
});
});
describe('setControls', function() {
it('returns controls', function(done) {
var controls = control.setControls({"test": "inproc"});
assert.equal(controls.test, control.levels.inproc);
done();
});
});
describe('setControls', function(){
it('should throw if level does not exist', function(done){
assert.throws(function(){control.setControls({"test" : "does not exist"})});
done();
});
});
describe('setControls', function() {
it('should throw if level does not exist', function(done) {
assert.throws(function() {
control.setControls({"test": "does not exist"});
});
done();
});
});
describe('addControls', function(){
it('should add to controls', function(done){
control.setControls({ "test" : "inproc" });
var controls = control.addControls({ "test2" : "local" });
assert.equal(controls["test"], control.levels.inproc);
assert.equal(controls["test2"], control.levels.local);
done();
});
});
describe('addControls', function() {
it('should add to controls', function(done) {
control.setControls({"test": "inproc"});
var controls = control.addControls({"test2": "local"});
assert.equal(controls.test, control.levels.inproc);
assert.equal(controls.test2, control.levels.local);
done();
});
});
describe('addControls', function(){
it('should override existing controls', function(done){
control.setControls({ "test" : "inproc" });
var controls = control.addControls({ "test" : "local" });
assert.equal(controls["test"], control.levels.local);
done();
});
});
describe('addControls', function() {
it('should override existing controls', function(done) {
control.setControls({"test": "inproc"});
var controls = control.addControls({"test": "local"});
assert.equal(controls.test, control.levels.local);
done();
});
});
describe('authorize', function(){
it('should authorize for exact match', function(done){
control.setControls({"test" : "inproc"});
var isAuthorized = control.authorize("test", control.levels.inproc);
assert.equal(isAuthorized, true, "isAuthorized should be true.");
done();
});
});
describe('authorize', function() {
it('should authorize for exact match', function(done) {
control.setControls({"test": "inproc"});
var isAuthorized = control.authorize("test", control.levels.inproc);
assert.equal(isAuthorized, true, "isAuthorized should be true.");
done();
});
});
describe('authorize', function(){
it('should authorize inproc if allowed control is local', function(done){
control.setControls({"test" : "local"});
var isAuthorized = control.authorize("test", control.levels.inproc);
assert.equal(isAuthorized, true, "isAuthorized should be true.");
done();
});
});
describe('authorize', function() {
it('should authorize inproc if allowed control is local', function(done) {
control.setControls({"test": "local"});
var isAuthorized = control.authorize("test", control.levels.inproc);
assert.equal(isAuthorized, true, "isAuthorized should be true.");
done();
});
});
describe('authorize', function(){
it('default is remote and should authorize', function(done){
control.setControls({});
var isAuthorized = control.authorize("test", control.levels.local);
assert.equal(isAuthorized, true, "isAuthorized should be true.");
done();
});
});
describe('authorize', function() {
it('default is remote and should authorize', function(done) {
control.setControls({});
var isAuthorized = control.authorize("test", control.levels.local);
assert.equal(isAuthorized, true, "isAuthorized should be true.");
done();
});
});
});

@@ -9,84 +9,127 @@ var cservice = require("../cluster-service");

cservice.results = function() {};
cservice.isWorker && it("WORKER", function(done) { });
cservice.isMaster && describe('[REST Server]', function(){
it('Start worker', function(done){
cservice.start({ workers: null, workerCount: 0, accessKey: "123", cli: false }, function() {
assert.equal(cservice.workers.length, 0, "0 worker expected, but " + cservice.workers.length + " found");
done();
});
});
httpclient.init(cservice.locals, extend(cservice.options, { accessKey: "123", silentMode: true }));
it('Health check', function(done){
httpclient.execute("health", function(err, result) {
assert.equal(result, "\"OK\"", "Expected OK. result=" + result + ", err=" + err);
done();
}
);
});
it('Bad command', function(done){
httpclient.execute("x98s7df987sdf", function(err, result) {
assert.equal(err, "Not found. Try /help", "Expected 'Not found. Try /help'. result=" + result + ", err=" + err);
done();
}
);
});
if(cservice.isWorker){
it("WORKER", function(done) {});
} else {
describe('[REST Server]', function() {
it('Start worker', function(done) {
cservice.start(
{
workers: null,
workerCount: 0,
accessKey: "123",
cli: false
},
function() {
assert.equal(
cservice.workers.length,
0,
"0 worker expected, but " + cservice.workers.length + " found"
);
done();
}
);
});
var disabledCmd = function (evt, cb) {
cb(null, "You shouldn't be able to see my data");
};
disabledCmd.control = function() {
return "disabled";
};
httpclient.init(
cservice.locals,
extend(cservice.options, {accessKey: "123", silentMode: true})
);
it('Health check', function(done) {
httpclient.execute("health", function(err, result) {
assert.equal(
result, "\"OK\"", "Expected OK. result=" + result + ", err=" + err
);
done();
}
);
});
it('Run cmd', function(done) {
cservice.start({ run: "health" }, function(err, result) {
assert.ifError(err);
assert.equal(result, "\"OK\"", "Expected OK, but received: " + result);
done();
return false;
});
});
it('Command authorization', function(done){
cservice.on("disabledCmd", disabledCmd);
var url = "http://localhost:11987/cli?cmd=disabledCmd&accessKey=123";
request.post(url, function (err, res, result) {
assert.equal(result, "Not authorized to execute 'disabledCmd' remotely");
done();
});
});
it('Bad command', function(done) {
httpclient.execute("x98s7df987sdf", function(err, result) {
assert.equal(
err,
"Not found. Try /help", "Expected 'Not found. Try /help'. result="
+ result
+ ", err=" + err
);
done();
}
);
});
it('Request authorization', function(done){
cservice.on("disabledCmd", disabledCmd);
var url = "http://localhost:11987/cli?cmd=disabledCmd&accessKey=BAD";
request.post(url, function (err, res, result) {
assert.equal(result, "Not authorized");
done();
});
});
var disabledCmd = function(evt, cb) {
cb(null, "You shouldn't be able to see my data");
};
disabledCmd.control = function() {
return "disabled";
};
it('Method Not Allowed', function(done){
var url = "http://localhost:11987/cli?cmd=health&accessKey=123";
request.get(url, function (err, res, result) {
assert.equal(result, "Method Not Allowed", "Expected 'Method Not Allowed'. result=" + result + ", err=" + err);
done();
});
});
it('Run cmd', function(done) {
cservice.start({run: "health"}, function(err, result) {
assert.ifError(err);
assert.equal(result, "\"OK\"", "Expected OK, but received: " + result);
done();
return false;
});
});
it('Page Not Found', function(done){
var url = "http://localhost:11987/BADCLI?cmd=health&accessKey=123";
request.post(url, function (err, res, result) {
assert.equal(result, "Page Not Found", "Expected 'Page Not Found'. result=" + result + ", err=" + err);
done();
});
});
it('Stop workers', function(done){
cservice.stop(30000, function() {
assert.equal(cservice.workers.length, 0, "0 workers expected, but " + cservice.workers.length + " found");
done();
});
});
})
it('Command authorization', function(done) {
cservice.on("disabledCmd", disabledCmd);
var url = "http://localhost:11987/cli?cmd=disabledCmd&accessKey=123";
request.post(url, function(err, res, result) {
assert.equal(
result,
"Not authorized to execute 'disabledCmd' remotely"
);
done();
});
});
it('Request authorization', function(done) {
cservice.on("disabledCmd", disabledCmd);
var url = "http://localhost:11987/cli?cmd=disabledCmd&accessKey=BAD";
request.post(url, function(err, res, result) {
assert.equal(result, "Not authorized");
done();
});
});
it('Method Not Allowed', function(done) {
var url = "http://localhost:11987/cli?cmd=health&accessKey=123";
request.get(url, function(err, res, result) {
assert.equal(
result,
"Method Not Allowed", "Expected 'Method Not Allowed'. result="
+ result
+ ", err=" + err
);
done();
});
});
it('Page Not Found', function(done) {
var url = "http://localhost:11987/BADCLI?cmd=health&accessKey=123";
request.post(url, function(err, res, result) {
assert.equal(
result,
"Page Not Found", "Expected 'Page Not Found'. result="
+ result
+ ", err=" + err
);
done();
});
});
it('Stop workers', function(done) {
cservice.stop(30000, function() {
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
done();
});
});
});
}

@@ -5,62 +5,102 @@ var cservice = require("../cluster-service");

cservice.log = function() {};
cservice.isWorker && it("WORKER", function(done) { });
cservice.isMaster && describe('[Restart]', function(){
it('Start workers', function(done){
assert.equal(cservice.workers.length, 0, "0 workers expected, but " + cservice.workers.length + " found");
cservice.start({ workers: { basic2: { worker: "./test/workers/basic2", count: 2, ready: false } }, accessKey: "123", cli: false }, function() {
assert.equal(cservice.workers.length, 2, "2 workers expected, but " + cservice.workers.length + " found");
done();
});
});
it('Bad input #1', function(done){
cservice.trigger("restart", function(err) {
assert.equal(err, "Invalid request. Try help restart");
done();
}, "GG"
);
});
if(cservice.isWorker){
it("WORKER", function(done) {});
} else {
describe('[Restart]', function() {
it('Start workers', function(done) {
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
cservice.start(
{
workers: {
basic2: {
worker: "./test/workers/basic2",
count: 2,
ready: false
}
},
accessKey: "123",
cli: false
},
function() {
assert.equal(
cservice.workers.length,
2,
"2 workers expected, but " + cservice.workers.length + " found"
);
done();
}
);
});
it('Bad input #2', function(done){
cservice.trigger("restart", function(err) {
assert.equal(err, "Invalid request. Try help restart");
done();
}, 0
);
});
it('Bad input #1', function(done) {
cservice.trigger("restart", function(err) {
assert.equal(err, "Invalid request. Try help restart");
done();
}, "GG"
);
});
it('Restart without timeout', function(done){
cservice.trigger("restart", function() {
assert.equal(cservice.workers.length, 2, "2 workers expected, but " + cservice.workers.length + " found");
done();
}, "all", { timeout: 30000 } // with timeout
);
});
it('Bad input #2', function(done) {
cservice.trigger("restart", function(err) {
assert.equal(err, "Invalid request. Try help restart");
done();
}, 0
);
});
it('Restart with timeout', function(done){
cservice.trigger("restart", function(err) {
assert.equal(err, "timed out");
setTimeout(function() {
assert.equal(cservice.workers.length, 2, "2 workers expected, but " + cservice.workers.length + " found");
done();
}, 1000);
}, "all", { timeout: 1 } // with timeout
);
});
it('Stop workers', function(done){
cservice.stop(30000, function() {
assert.equal(cservice.workers.length, 0, "0 workers expected, but " + cservice.workers.length + " found");
done();
});
});
it('Restart with no workers', function(done){
cservice.trigger("restart", function(err) {
assert.equal(err, "No workers to restart");
assert.equal(cservice.workers.length, 0, "0 workers expected, but " + cservice.workers.length + " found");
done();
}, "all"
);
});
})
it('Restart without timeout', function(done) {
cservice.trigger("restart", function() {
assert.equal(
cservice.workers.length,
2,
"2 workers expected, but " + cservice.workers.length + " found"
);
done();
}, "all", {timeout: 30000} // with timeout
);
});
it('Restart with timeout', function(done) {
cservice.trigger("restart", function(err) {
assert.equal(err, "timed out");
setTimeout(function() {
assert.equal(
cservice.workers.length,
2,
"2 workers expected, but " + cservice.workers.length + " found"
);
done();
}, 1000);
}, "all", {timeout: 1} // with timeout
);
});
it('Stop workers', function(done) {
cservice.stop(30000, function() {
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
done();
});
});
it('Restart with no workers', function(done) {
cservice.trigger("restart", function(err) {
assert.equal(err, "No workers to restart");
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
done();
}, "all");
});
});
}

@@ -5,100 +5,244 @@ var cservice = require("../cluster-service");

cservice.log = function() {};
cservice.isWorker && it("WORKER", function(done) { });
cservice.isMaster && describe('[Start & Stop]', function(){
it('Start worker', function(done){
assert.equal(cservice.workers.length, 0, "0 workers expected, but " + cservice.workers.length + " found");
cservice.start({ config: "./test/workers/basic.json" }, function() {
assert.equal(cservice.workers.length, 1, "1 worker expected, but " + cservice.workers.length + " found");
done();
});
});
cservice.log = function() {
};
if(cservice.isWorker){
it("WORKER", function(done) {});
} else {
describe('[Start & Stop]', function() {
it('Start worker', function(done) {
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
cservice.start({config: "./test/workers/basic.json"}, function() {
assert.equal(
cservice.workers.length,
1,
"1 worker expected, but " + cservice.workers.length + " found"
);
done();
});
});
it("start.prepArgs.js", function(done) {
var o = { _: [ "Server.JS" ] };
start.prepArgs(o);
assert.equal(o.workers, "Server.JS", "Expected 'Server.JS', got " + o.workers);
done();
});
it("start.prepArgs.js", function(done) {
var o = {_: ["Server.JS"]};
start.prepArgs(o);
assert.equal(
o.workers,
"Server.JS", "Expected 'Server.JS', got " + o.workers
);
done();
});
it("start.prepArgs.json", function(done) {
var o = { _: [ "Config.JSon" ] };
start.prepArgs(o);
assert.equal(o.config, "Config.JSon", "Expected 'Config.JSon', got " + o.config);
done();
});
it("start.prepArgs.json", function(done) {
var o = {_: ["Config.JSon"]};
start.prepArgs(o);
assert.equal(
o.config,
"Config.JSon", "Expected 'Config.JSon', got " + o.config
);
done();
});
it("start.prepArgs.run", function(done) {
var o = { _: [ "some command" ] };
start.prepArgs(o);
assert.equal(o.run, "some command", "Expected 'some command', got " + o.run);
assert.ifError(o.json);
done();
});
it("start.prepArgs.run", function(done) {
var o = {_: ["some command"]};
start.prepArgs(o);
assert.equal(
o.run,
"some command", "Expected 'some command', got "+o.run
);
assert.ifError(o.json);
done();
});
it("start.prepArgs.run.json", function(done) {
var o = { _: [ "some command" ], json: true };
start.prepArgs(o);
assert.equal(o.run, "some command", "Expected 'some command', got " + o.run);
assert.equal(o.cli, false, "CLI should be disabled");
done();
});
it('Add 2nd worker', function(done){
cservice.trigger("start", function(err, result) {
assert.equal(cservice.workers.length, 2, "2 workers expected, but " + cservice.workers.length + " found");
done();
}, "./test/workers/basic", { ready: false, count: 1, timeout: 10000 });
});
it("start.prepArgs.run.json", function(done) {
var o = {_: ["some command"], json: true};
start.prepArgs(o);
assert.equal(
o.run,
"some command", "Expected 'some command', got "+o.run
);
assert.equal(o.cli, false, "CLI should be disabled");
done();
});
it('Timeout on new worker', function(done){
cservice.trigger("start", function(err, result) {
assert.equal(err, "timed out");
done();
}, "./test/workers/longInit", { ready: false, count: 1, timeout: 1000 });
});
it('Add 2nd worker', function(done) {
cservice.trigger("start", function(err, result) {
assert.equal(
cservice.workers.length,
2,
"2 workers expected, but " + cservice.workers.length + " found"
);
done();
}, "./test/workers/basic", {ready: false, count: 1, timeout: 10000});
});
it('Start help', function(done){
cservice.trigger("help", function(err, result) {
assert.equal(result.info, "Gracefully start service, one worker at a time.");
done();
}, "start");
});
it('Bad worker start', function(done){
cservice.trigger("start", function(err, result) {
assert.equal(err, "Invalid request. Try help start");
done();
}, null, { count: 1, timeout: 1000 });
});
it('Restart workers', function(done){
cservice.trigger("restart", function() {
assert.equal(cservice.workers.length, 2, "2 workers expected, but " + cservice.workers.length + " found");
done();
}, "all"
);
});
it('Upgrade workers', function(done){
cservice.trigger("upgrade", function() {
assert.equal(cservice.workers.length, 2, "2 workers expected, but " + cservice.workers.length + " found");
done();
}, "all", "./test/workers/basic2", { ready: false }
);
});
it('Timeout on new worker', function(done) {
cservice.trigger("start", function(err, result) {
assert.equal(err, "timed out");
done();
}, "./test/workers/longInit", {ready: false, count: 1, timeout: 100});
});
it('Stop workers', function(done){
cservice.stop(30000, function() {
assert.equal(cservice.workers.length, 0, "0 workers expected, but " + cservice.workers.length + " found");
done();
});
});
it('Start help', function(done) {
cservice.trigger("help", function(err, result) {
assert.equal(
result.info,
"Gracefully start service, one worker at a time."
);
done();
}, "start");
});
it('Stop an already stopped service', function(done){
cservice.stop(30000, function() {
assert.equal(cservice.workers.length, 0, "0 workers expected, but " + cservice.workers.length + " found");
done();
});
});
})
it('Bad worker start', function(done) {
cservice.trigger("start", function(err, result) {
assert.equal(err, "Invalid request. Try help start");
done();
}, null, {count: 1, timeout: 1000});
});
it('Restart workers', function(done) {
cservice.trigger("restart", function() {
assert.equal(
cservice.workers.length,
2,
"2 workers expected, but " + cservice.workers.length + " found"
);
done();
}, "all"
);
});
it('Upgrade workers', function(done) {
cservice.trigger("upgrade", function() {
assert.equal(
cservice.workers.length,
2,
"2 workers expected, but " + cservice.workers.length + " found"
);
done();
}, "all", "./test/workers/basic2", {ready: false}
);
});
it('Stop workers', function(done) {
cservice.stop(30000, function() {
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
done();
});
});
it('Stop an already stopped service', function(done) {
cservice.stop(30000, function() {
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
done();
});
});
it('Start legacy async worker', function(done) {
var startTime = new Date().getTime();
cservice.start(
{
workers:
{
legacyReady: {
worker: "./test/workers/legacyReady.js",
ready: false,
count: 1
}
}
},
function() {
assert.equal(
cservice.workers.length,
1,
"1 worker expected, but " + cservice.workers.length + " found"
);
var diffTime = (new Date().getTime() - startTime);
assert.ok(diffTime >= 1000,
"Legacy workerReady logic should have taken >= 1000ms, " +
"but returned in " + diffTime + "ms"
);
done();
}
);
});
it('Start legacy async worker with ready set to true', function(done) {
var startTime = new Date().getTime();
cservice.start(
{
workers:
{
legacyReady: {
worker: "./test/workers/legacyReady.js",
ready: true,
count: 1
}
}
},
function() {
assert.equal(
cservice.workers.length,
2,
"2 workers expected, but " + cservice.workers.length + " found"
);
var diffTime = (new Date().getTime() - startTime);
assert.ok(diffTime < 1000,
"Legacy workerReady logic should have taken < 1000ms, " +
"but returned in " + diffTime + "ms"
);
done();
}
);
});
it('Start inline async worker', function(done) {
var startTime = new Date().getTime();
cservice.start(
{
workers:
{
inlineReady: {
worker: "./test/workers/inlineReady.js",
count: 1
}
}
},
function() {
assert.equal(
cservice.workers.length,
3,
"3 workers expected, but " + cservice.workers.length + " found"
);
var diffTime = (new Date().getTime() - startTime);
assert.ok(diffTime >= 1000,
"Inline workerReady logic should have taken >= 1000ms, " +
"but returned in " + diffTime + "ms"
);
done();
}
);
});
it('Stop workers', function(done) {
cservice.stop(30000, function() {
assert.equal(
cservice.workers.length,
0,
"0 workers expected, but " + cservice.workers.length + " found"
);
done();
});
});
});
}
var assert = require("assert");
var util = require("../lib/util");
describe('Util funcs', function(){
describe('getArgsFromQuestion', function(){
it('Strings', function(done){
var args = util.getArgsFromQuestion("health { \"check\": true, \"nested\": { } } \"arg #3\" [\"arg #4\"]", " ");
assert.equal(args.length, 4);
assert.equal(args[1].check, true);
done();
});
});
})
describe('Util funcs', function() {
describe('getArgsFromQuestion', function() {
it('Strings', function(done) {
var args = util.getArgsFromQuestion(
"health { \"check\": true, \"nested\": { } } \"arg #3\" [\"arg #4\"]",
" "
);
assert.equal(args.length, 4);
assert.equal(args[1].check, true);
done();
});
});
});
var cservice = require("../cluster-service");
var assert = require("assert");
var sinon = require("sinon");
var stats = {
onWorkerReady: 0,
onWorkerStop: 0
onWorkerReady: 0,
onWorkerStop: 0
};

@@ -11,47 +12,101 @@ var workerStopped = 0;

cservice.log = function() {};
cservice.isWorker && it("WORKER", function(done) { });
cservice.isMaster && describe('[Works]', function(){
before(function(done) {
process.send = fakeSend;
done();
});
if(cservice.isWorker){
it("WORKER", function(done) {});
} else {
describe('[Works]', function() {
before(function(done) {
process.send = fakeSend;
after(function() {
delete process.send;
});
it('workerReady.isWorker', function(done){
workerReady({ onWorkerStop: onWorkerStop }, true);
assert.equal(stats.onWorkerReady, 1, "1 onWorkerReady expected, but " + stats.onWorkerReady + " detected");
workerReady({ onWorkerStop: onWorkerStop }, true);
assert.equal(stats.onWorkerReady, 1, "1 onWorkerReady expected, but " + stats.onWorkerReady + " detected");
done();
});
done();
});
it('workerReady.ifMaster', function(done){
workerReady({ onWorkerStop: onWorkerStop });
assert.equal(stats.onWorkerReady, 1, "1 onWorkerReady expected, but " + stats.onWorkerReady + " detected");
done();
});
it('onWorkerStop', function(done){
process.emit("message", { cservice: { cmd: 'onWorkerStop' } });
assert.equal(stats.onWorkerStop, 1, "1 onWorkerStop expected, but " + stats.onWorkerStop + " detected");
done();
});
it('BAD onWorkerStop', function(done){
process.emit("message", { cservice: { } });
assert.equal(stats.onWorkerStop, 1, "1 onWorkerStop expected, but " + stats.onWorkerStop + " detected");
done();
});
})
after(function() {
delete process.send;
});
it('workerReady.isWorker', function(done) {
workerReady({onWorkerStop: onWorkerStop}, true);
assert.equal(
stats.onWorkerReady,
1,
"1 onWorkerReady expected, but " + stats.onWorkerReady + " detected"
);
workerReady({onWorkerStop: onWorkerStop}, true);
assert.equal(
stats.onWorkerReady,
1,
"1 onWorkerReady expected, but " + stats.onWorkerReady + " detected"
);
done();
});
it('workerReady.ifMaster', function(done) {
workerReady({onWorkerStop: onWorkerStop});
assert.equal(
stats.onWorkerReady,
1,
"1 onWorkerReady expected, but " + stats.onWorkerReady + " detected"
);
done();
});
it('onWorkerStop', function(done) {
process.emit("message", {cservice: {cmd: 'onWorkerStop'}});
assert.equal(
stats.onWorkerStop,
1,
"1 onWorkerStop expected, but " + stats.onWorkerStop + " detected"
);
done();
});
it('BAD onWorkerStop', function(done) {
process.emit("message", {cservice: {}});
assert.equal(
stats.onWorkerStop,
1,
"1 onWorkerStop expected, but " + stats.onWorkerStop + " detected"
);
done();
});
describe("#send", function(){
var original;
beforeEach(function(){
original = [];
cservice.workers.map(function(worker){
var stub = sinon.stub();
original.push(worker);
stub.pid = worker.pid;
return stub;
});
});
afterEach(function(){
cservice.workers.map(original.shift);
});
it("sends the message to all workers", function() {
var workersCalled = 0;
//act
cservice.workers.send({boo:true});
//assert
cservice.workers.map(function(stub){
sinon.calledWith(stub, sinon.match({boo:true}));
workersCalled+=1;
});
assert.equal(workersCalled, original.length);
});
});
});
}
function fakeSend(o) {
stats.onWorkerReady++;
stats.onWorkerReady++;
}
function onWorkerStop() {
stats.onWorkerStop++;
}
stats.onWorkerStop++;
}

@@ -1,5 +0,3 @@

var
cservice = require("../../cluster-service")
;
var cservice = require("../../cluster-service");
cservice.workerReady();
cservice.workerReady();
{
"workers": {
"basic": { "worker": "./test/workers/basic.js", "count": 1 }
}, "accessKey": "123", "cli": false
"workers": {
"basic": {"worker": "./test/workers/basic.js", "count": 1}
}, "accessKey": "123", "cli": false
}

@@ -1,7 +0,5 @@

var
cservice = require("../../cluster-service")
;
var cservice = require("../../cluster-service");
setTimeout(function() {
cservice.workerReady();
cservice.workerReady();
}, 100);
{
"workers": {
"basic2": { "worker": "./test/workers/basic2.js", "ready": false }
}, "accessKey": "123", "cli": false
"workers": {
"basic2": {"worker": "./test/workers/basic2.js", "ready": false}
}, "accessKey": "123", "cli": false
}

@@ -1,7 +0,5 @@

var
cservice = require("../../cluster-service")
;
var cservice = require("../../cluster-service");
setTimeout(function() {
cservice.workerReady();
cservice.workerReady();
}, 10000);

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc