batch-cluster
Advanced tools
Comparing version 4.3.0 to 5.0.0
@@ -20,2 +20,13 @@ # Changelog | ||
## v5.0.0 | ||
- 💔 The `rejectTaskOnStderr` API, which was added in v4.1.0 and applied to all | ||
tasks for a given `BatchCluster` instance, proved to be a poor decision, and | ||
has been removed. The `Parser` API, which is task-specific, now receives | ||
**both** stdin and stderr streams. Parsers then have the necessary context to | ||
decide what to do on a per task or per task-type basis. | ||
- 🐞 In previous versions, batch processes were recycled if any task had any | ||
type of error. This version allows pids to live even if they emit data to | ||
stderr. | ||
## v4.3.0 | ||
@@ -22,0 +33,0 @@ |
/// <reference types="node" /> | ||
import { ChildProcess } from "child_process"; | ||
import { BatchClusterEmitter } from "./BatchClusterEmitter"; | ||
import { BatchClusterOptions } from "./BatchClusterOptions"; | ||
import { BatchProcessOptions } from "./BatchProcessOptions"; | ||
import { Task } from "./Task"; | ||
export { kill, pidExists, pids } from "./Pids"; | ||
export { BatchClusterOptions } from "./BatchClusterOptions"; | ||
export { BatchProcessOptions } from "./BatchProcessOptions"; | ||
export { Deferred } from "./Deferred"; | ||
export * from "./Logger"; | ||
export { Task, Parser } from "./Task"; | ||
export { Parser } from "./Parser"; | ||
export { kill, pidExists, pids } from "./Pids"; | ||
export { Task } from "./Task"; | ||
/** | ||
@@ -20,124 +26,2 @@ * These are required parameters for a given BatchCluster. | ||
/** | ||
* `BatchProcessOptions` have no reasonable defaults, as they are specific | ||
* to the API of the command that BatchCluster is spawning. | ||
* | ||
* All fields must be set. | ||
*/ | ||
export interface BatchProcessOptions { | ||
/** | ||
* Low-overhead command to verify the child batch process has started. | ||
* Will be invoked immediately after spawn. This command must return | ||
* before any tasks will be given to a given process. | ||
*/ | ||
versionCommand: string; | ||
/** | ||
* Expected text to print if a command passes. Cannot be blank. Strings will | ||
* be interpreted as a regular expression fragment. | ||
*/ | ||
pass: string | RegExp; | ||
/** | ||
* Expected text to print if a command fails. Cannot be blank. Strings will be | ||
* interpreted as a regular expression fragment. | ||
*/ | ||
fail: string | RegExp; | ||
/** | ||
* Command to end the child batch process. If not provided, stdin will be | ||
* closed to signal to the child process that it may terminate, and if it | ||
* does not shut down within `endGracefulWaitTimeMillis`, it will be | ||
* SIGHUP'ed. | ||
*/ | ||
exitCommand?: string; | ||
} | ||
/** | ||
* These parameter values have somewhat sensible defaults, but can be | ||
* overridden for a given BatchCluster. | ||
*/ | ||
export declare class BatchClusterOptions { | ||
/** | ||
* No more than `maxProcs` child processes will be run at a given time | ||
* to serve pending tasks. | ||
* | ||
* Defaults to 1. | ||
*/ | ||
readonly maxProcs: number; | ||
/** | ||
* Child processes will be recycled when they reach this age. | ||
* | ||
* If this value is set to 0, child processes will not "age out". | ||
* | ||
* This value must not be less than `spawnTimeoutMillis` or | ||
* `taskTimeoutMillis`. | ||
* | ||
* Defaults to 5 minutes. | ||
*/ | ||
readonly maxProcAgeMillis: number; | ||
/** | ||
* This is the minimum interval between calls to `this.onIdle`, which | ||
* runs pending tasks and shuts down old child processes. | ||
* | ||
* Must be > 0. Defaults to 5 seconds. | ||
*/ | ||
readonly onIdleIntervalMillis: number; | ||
/** | ||
* If the initial `versionCommand` fails for new spawned processes more | ||
* than this rate, end this BatchCluster and throw an error, because | ||
* something is terribly wrong. | ||
* | ||
* If this backstop didn't exist, new (failing) child processes would be | ||
* created indefinitely. | ||
* | ||
* Must be >= 0. Defaults to 10. | ||
*/ | ||
readonly maxReasonableProcessFailuresPerMinute: number; | ||
/** | ||
* Spawning new child processes and servicing a "version" task must not | ||
* take longer than `spawnTimeoutMillis` before the process is considered | ||
* failed, and need to be restarted. Be pessimistic here--windows can | ||
* regularly take several seconds to spin up a process, thanks to | ||
* antivirus shenanigans. | ||
* | ||
* Must be >= 100ms. Defaults to 15 seconds. | ||
*/ | ||
readonly spawnTimeoutMillis: number; | ||
/** | ||
* If commands take longer than this, presume the underlying process is dead | ||
* and we should fail the task. | ||
* | ||
* This should be set to something on the order of seconds. | ||
* | ||
* Must be >= 10ms. Defaults to 10 seconds. | ||
*/ | ||
readonly taskTimeoutMillis: number; | ||
/** | ||
* Processes will be recycled after processing `maxTasksPerProcess` | ||
* tasks. Depending on the commands and platform, batch mode commands | ||
* shouldn't exhibit unduly memory leaks for at least tens if not | ||
* hundreds of tasks. Setting this to a low number (like less than 10) | ||
* will impact performance markedly, due to OS process start/stop | ||
* maintenance. Setting this to a very high number (> 1000) may result in | ||
* more memory being consumed than necessary. | ||
* | ||
* Must be >= 0. Defaults to 500 | ||
*/ | ||
readonly maxTasksPerProcess: number; | ||
/** | ||
* When `this.end()` is called, or Node broadcasts the `beforeExit` | ||
* event, this is the milliseconds spent waiting for currently running | ||
* tasks to finish before sending kill signals to child processes. | ||
* | ||
* Setting this value to 0 means child processes will immediately receive | ||
* a kill signal to shut down. Any pending requests may be interrupted. | ||
* Must be >= 0. Defaults to 500ms. | ||
*/ | ||
readonly endGracefulWaitTimeMillis: number; | ||
/** | ||
* Some tools emit non-fatal warnings to stderr. If this predicate returns | ||
* false, the task will not be rejected. | ||
* | ||
* This defaults to a function that always returns true, which makes all | ||
* stderr writes reject tasks. | ||
*/ | ||
readonly rejectTaskOnStderr: (task: Task<any>, error: string | Error) => boolean; | ||
} | ||
/** | ||
* BatchCluster instances manage 0 or more homogenious child processes, and | ||
@@ -152,4 +36,3 @@ * provide the main interface for enqueing `Task`s via `enqueueTask`. | ||
*/ | ||
export declare class BatchCluster { | ||
private readonly emitter; | ||
export declare class BatchCluster extends BatchClusterEmitter { | ||
private readonly _tasksPerProc; | ||
@@ -168,28 +51,2 @@ private readonly opts; | ||
private readonly exitListener; | ||
/** | ||
* Emitted when a child process has an error when spawning | ||
*/ | ||
on(event: "startError", listener: (err: Error) => void): void; | ||
/** | ||
* Emitted when tasks receive data, which may be partial chunks from the task | ||
* stream. | ||
*/ | ||
on(event: "taskData", listener: (data: Buffer | string, task: Task<any> | undefined) => void): void; | ||
/** | ||
* Emitted when a task has an error | ||
*/ | ||
on(event: "taskError", listener: (err: Error, task: Task<any>) => void): void; | ||
/** | ||
* Emitted when a child process has an error during shutdown | ||
*/ | ||
on(event: "endError", listener: (err: Error) => void): void; | ||
/** | ||
* Emitted when this instance is in the process of ending. | ||
*/ | ||
on(event: "beforeEnd", listener: () => void): void; | ||
/** | ||
* Emitted when this instance has ended. No child processes should remain at | ||
* this point. | ||
*/ | ||
on(event: "end", listener: () => void): void; | ||
readonly ended: boolean; | ||
@@ -196,0 +53,0 @@ /** |
"use strict"; | ||
var __assign = (this && this.__assign) || function () { | ||
__assign = Object.assign || function(t) { | ||
for (var s, i = 1, n = arguments.length; i < n; i++) { | ||
s = arguments[i]; | ||
for (var p in s) if (Object.prototype.hasOwnProperty.call(s, p)) | ||
t[p] = s[p]; | ||
} | ||
return t; | ||
var __extends = (this && this.__extends) || (function () { | ||
var extendStatics = function (d, b) { | ||
extendStatics = Object.setPrototypeOf || | ||
({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) || | ||
function (d, b) { for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; }; | ||
return extendStatics(d, b); | ||
} | ||
return function (d, b) { | ||
extendStatics(d, b); | ||
function __() { this.constructor = d; } | ||
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); | ||
}; | ||
return __assign.apply(this, arguments); | ||
}; | ||
})(); | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
@@ -52,3 +54,2 @@ return new (P || (P = Promise))(function (resolve, reject) { | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var events_1 = require("events"); | ||
var _p = require("process"); | ||
@@ -58,2 +59,4 @@ var timers_1 = require("timers"); | ||
var Async_1 = require("./Async"); | ||
var BatchClusterEmitter_1 = require("./BatchClusterEmitter"); | ||
var BatchClusterOptions_1 = require("./BatchClusterOptions"); | ||
var BatchProcess_1 = require("./BatchProcess"); | ||
@@ -64,2 +67,7 @@ var Logger_1 = require("./Logger"); | ||
var Rate_1 = require("./Rate"); | ||
var BatchClusterOptions_2 = require("./BatchClusterOptions"); | ||
exports.BatchClusterOptions = BatchClusterOptions_2.BatchClusterOptions; | ||
var Deferred_1 = require("./Deferred"); | ||
exports.Deferred = Deferred_1.Deferred; | ||
__export(require("./Logger")); | ||
var Pids_2 = require("./Pids"); | ||
@@ -69,139 +77,5 @@ exports.kill = Pids_2.kill; | ||
exports.pids = Pids_2.pids; | ||
var Deferred_1 = require("./Deferred"); | ||
exports.Deferred = Deferred_1.Deferred; | ||
__export(require("./Logger")); | ||
var Task_1 = require("./Task"); | ||
exports.Task = Task_1.Task; | ||
/** | ||
* These parameter values have somewhat sensible defaults, but can be | ||
* overridden for a given BatchCluster. | ||
*/ | ||
var BatchClusterOptions = /** @class */ (function () { | ||
function BatchClusterOptions() { | ||
/** | ||
* No more than `maxProcs` child processes will be run at a given time | ||
* to serve pending tasks. | ||
* | ||
* Defaults to 1. | ||
*/ | ||
this.maxProcs = 1; | ||
/** | ||
* Child processes will be recycled when they reach this age. | ||
* | ||
* If this value is set to 0, child processes will not "age out". | ||
* | ||
* This value must not be less than `spawnTimeoutMillis` or | ||
* `taskTimeoutMillis`. | ||
* | ||
* Defaults to 5 minutes. | ||
*/ | ||
this.maxProcAgeMillis = 5 * 60 * 1000; | ||
/** | ||
* This is the minimum interval between calls to `this.onIdle`, which | ||
* runs pending tasks and shuts down old child processes. | ||
* | ||
* Must be > 0. Defaults to 5 seconds. | ||
*/ | ||
this.onIdleIntervalMillis = 5000; | ||
/** | ||
* If the initial `versionCommand` fails for new spawned processes more | ||
* than this rate, end this BatchCluster and throw an error, because | ||
* something is terribly wrong. | ||
* | ||
* If this backstop didn't exist, new (failing) child processes would be | ||
* created indefinitely. | ||
* | ||
* Must be >= 0. Defaults to 10. | ||
*/ | ||
this.maxReasonableProcessFailuresPerMinute = 10; | ||
/** | ||
* Spawning new child processes and servicing a "version" task must not | ||
* take longer than `spawnTimeoutMillis` before the process is considered | ||
* failed, and need to be restarted. Be pessimistic here--windows can | ||
* regularly take several seconds to spin up a process, thanks to | ||
* antivirus shenanigans. | ||
* | ||
* Must be >= 100ms. Defaults to 15 seconds. | ||
*/ | ||
this.spawnTimeoutMillis = 15000; | ||
/** | ||
* If commands take longer than this, presume the underlying process is dead | ||
* and we should fail the task. | ||
* | ||
* This should be set to something on the order of seconds. | ||
* | ||
* Must be >= 10ms. Defaults to 10 seconds. | ||
*/ | ||
this.taskTimeoutMillis = 10000; | ||
/** | ||
* Processes will be recycled after processing `maxTasksPerProcess` | ||
* tasks. Depending on the commands and platform, batch mode commands | ||
* shouldn't exhibit unduly memory leaks for at least tens if not | ||
* hundreds of tasks. Setting this to a low number (like less than 10) | ||
* will impact performance markedly, due to OS process start/stop | ||
* maintenance. Setting this to a very high number (> 1000) may result in | ||
* more memory being consumed than necessary. | ||
* | ||
* Must be >= 0. Defaults to 500 | ||
*/ | ||
this.maxTasksPerProcess = 500; | ||
/** | ||
* When `this.end()` is called, or Node broadcasts the `beforeExit` | ||
* event, this is the milliseconds spent waiting for currently running | ||
* tasks to finish before sending kill signals to child processes. | ||
* | ||
* Setting this value to 0 means child processes will immediately receive | ||
* a kill signal to shut down. Any pending requests may be interrupted. | ||
* Must be >= 0. Defaults to 500ms. | ||
*/ | ||
this.endGracefulWaitTimeMillis = 500; | ||
/** | ||
* Some tools emit non-fatal warnings to stderr. If this predicate returns | ||
* false, the task will not be rejected. | ||
* | ||
* This defaults to a function that always returns true, which makes all | ||
* stderr writes reject tasks. | ||
*/ | ||
this.rejectTaskOnStderr = function () { return true; }; | ||
} | ||
return BatchClusterOptions; | ||
}()); | ||
exports.BatchClusterOptions = BatchClusterOptions; | ||
function verifyOptions(opts) { | ||
function toRe(s) { | ||
return s instanceof RegExp | ||
? s | ||
: new RegExp("^((?:[\\s\\S]*[\\n\\r]+)?)" + s + "[\\n\\r]*$"); | ||
} | ||
var result = __assign({}, new BatchClusterOptions(), opts, { passRE: toRe(opts.pass), failRE: toRe(opts.fail) }); | ||
var errors = []; | ||
function notBlank(fieldName) { | ||
var v = result[fieldName]; | ||
if (v.trim().length === 0) { | ||
errors.push(fieldName + " must not be blank"); | ||
} | ||
} | ||
function gte(fieldName, value) { | ||
var v = result[fieldName]; | ||
if (v < value) { | ||
errors.push(fieldName + " must be greater than or equal to " + value); | ||
} | ||
} | ||
notBlank("versionCommand"); | ||
notBlank("pass"); | ||
notBlank("fail"); | ||
gte("spawnTimeoutMillis", 100); | ||
gte("taskTimeoutMillis", 10); | ||
gte("maxTasksPerProcess", 1); | ||
gte("maxProcs", 1); | ||
gte("maxProcAgeMillis", Math.max(result.spawnTimeoutMillis, result.taskTimeoutMillis)); | ||
gte("onIdleIntervalMillis", 0); | ||
gte("endGracefulWaitTimeMillis", 0); | ||
gte("maxReasonableProcessFailuresPerMinute", 0); | ||
if (errors.length > 0) { | ||
throw new Error("BatchCluster was given invalid options: " + errors.join(", ")); | ||
} | ||
return result; | ||
} | ||
/** | ||
* BatchCluster instances manage 0 or more homogenious child processes, and | ||
@@ -216,17 +90,17 @@ * provide the main interface for enqueing `Task`s via `enqueueTask`. | ||
*/ | ||
var BatchCluster = /** @class */ (function () { | ||
var BatchCluster = /** @class */ (function (_super) { | ||
__extends(BatchCluster, _super); | ||
function BatchCluster(opts) { | ||
var _this = this; | ||
this.emitter = new events_1.EventEmitter(); | ||
this._tasksPerProc = new Mean_1.Mean(); | ||
this._procs = []; | ||
this.tasks = []; | ||
this.startErrorRate = new Rate_1.Rate(); | ||
this._spawnedProcs = 0; | ||
this._ended = false; | ||
this._internalErrorCount = 0; | ||
this.beforeExitListener = function () { return _this.end(true); }; | ||
this.exitListener = function () { return _this.end(false); }; | ||
this.onIdle = Async_1.serial(function () { return __awaiter(_this, void 0, void 0, function () { | ||
var beforeProcLen, minStart, readyProcs, execNextTask, proc_1; | ||
var _this = _super.call(this) || this; | ||
_this._tasksPerProc = new Mean_1.Mean(); | ||
_this._procs = []; | ||
_this.tasks = []; | ||
_this.startErrorRate = new Rate_1.Rate(); | ||
_this._spawnedProcs = 0; | ||
_this._ended = false; | ||
_this._internalErrorCount = 0; | ||
_this.beforeExitListener = function () { return _this.end(true); }; | ||
_this.exitListener = function () { return _this.end(false); }; | ||
_this.onIdle = Async_1.serial(function () { return __awaiter(_this, void 0, void 0, function () { | ||
var minStart, readyProcs, execNextTask, proc_1; | ||
var _this = this; | ||
@@ -236,3 +110,2 @@ return __generator(this, function (_a) { | ||
return [2 /*return*/]; | ||
beforeProcLen = this._procs.length; | ||
minStart = Date.now() - this.opts.maxProcAgeMillis; | ||
@@ -252,8 +125,2 @@ Array_1.filterInPlace(this._procs, function (proc) { | ||
readyProcs = this._procs.filter(function (proc) { return proc.ready; }); | ||
Logger_1.logger().trace("BatchCluster.onIdle()", { | ||
beforeProcLen: beforeProcLen, | ||
readyProcs: readyProcs.map(function (ea) { return ea.pid; }), | ||
pendingTasks: this.tasks.slice(0, 3).map(function (ea) { return ea.command; }), | ||
pendingTaskCount: this.tasks.length | ||
}); | ||
execNextTask = function () { | ||
@@ -266,9 +133,3 @@ var idleProc = readyProcs.shift(); | ||
return; | ||
if (idleProc.execTask(task)) { | ||
Logger_1.logger().trace("BatchCluster.onIdle(): submitted " + | ||
task.command + | ||
" to child pid " + | ||
idleProc.pid); | ||
} | ||
else { | ||
if (!idleProc.execTask(task)) { | ||
Logger_1.logger().warn("BatchCluster.onIdle(): execTask for " + | ||
@@ -292,12 +153,11 @@ task.command + | ||
} | ||
Logger_1.logger().trace("BatchCluster.onIdle() finished"); | ||
return [2 /*return*/]; | ||
}); | ||
}); }); | ||
this.opts = verifyOptions(opts); | ||
if (this.opts.onIdleIntervalMillis > 0) { | ||
this.onIdleInterval = timers_1.setInterval(function () { return _this.onIdle(); }, this.opts.onIdleIntervalMillis); | ||
this.onIdleInterval.unref(); // < don't prevent node from exiting | ||
_this.opts = BatchClusterOptions_1.verifyOptions(opts); | ||
if (_this.opts.onIdleIntervalMillis > 0) { | ||
_this.onIdleInterval = timers_1.setInterval(function () { return _this.onIdle(); }, _this.opts.onIdleIntervalMillis); | ||
_this.onIdleInterval.unref(); // < don't prevent node from exiting | ||
} | ||
this.observer = { | ||
_this.observer = { | ||
onIdle: function () { return _this.onIdle(); }, | ||
@@ -309,11 +169,8 @@ onStartError: function (err) { return _this.onStartError(err); }, | ||
onTaskError: function (err, task) { return _this.emitter.emit("taskError", err, task); }, | ||
onInternalError: function (err) { return _this.onInternalError(err); }, | ||
rejectTaskOnStderr: this.opts.rejectTaskOnStderr | ||
onInternalError: function (err) { return _this.onInternalError(err); } | ||
}; | ||
_p.once("beforeExit", this.beforeExitListener); | ||
_p.once("exit", this.exitListener); | ||
_p.once("beforeExit", _this.beforeExitListener); | ||
_p.once("exit", _this.exitListener); | ||
return _this; | ||
} | ||
BatchCluster.prototype.on = function (event, listener) { | ||
this.emitter.on(event, listener); | ||
}; | ||
Object.defineProperty(BatchCluster.prototype, "ended", { | ||
@@ -378,3 +235,2 @@ get: function () { | ||
} | ||
Logger_1.logger().trace("BatchCluster.enqueueTask(" + task.command + ")"); | ||
this.tasks.push(task); | ||
@@ -477,4 +333,4 @@ setTimeout(function () { return _this.onIdle(); }, 1); | ||
return BatchCluster; | ||
}()); | ||
}(BatchClusterEmitter_1.BatchClusterEmitter)); | ||
exports.BatchCluster = BatchCluster; | ||
//# sourceMappingURL=BatchCluster.js.map |
/// <reference types="node" /> | ||
import * as _cp from "child_process"; | ||
import { BatchClusterOptions, BatchProcessOptions } from "./BatchCluster"; | ||
import { BatchProcessObserver } from "./BatchProcessObserver"; | ||
import { InternalBatchProcessOptions } from "./InternalBatchProcessOptions"; | ||
import { Task } from "./Task"; | ||
/** | ||
* This interface decouples BatchProcess from BatchCluster. | ||
*/ | ||
export interface BatchProcessObserver { | ||
onIdle(): void; | ||
onTaskData(data: Buffer | string, task: Task<any> | undefined): void; | ||
onTaskError(error: Error, task: Task<any>): void; | ||
onStartError(error: Error): void; | ||
onInternalError(error: Error): void; | ||
rejectTaskOnStderr: (task: Task<any>, error: string | Error) => boolean; | ||
} | ||
export interface InternalBatchProcessOptions extends BatchProcessOptions, BatchClusterOptions { | ||
readonly passRE: RegExp; | ||
readonly failRE: RegExp; | ||
} | ||
/** | ||
* BatchProcess manages the care and feeding of a single child process. | ||
@@ -41,6 +27,2 @@ */ | ||
/** | ||
* Data from stdout, to be given to _currentTask | ||
*/ | ||
private buff; | ||
/** | ||
* Should be undefined if this instance is not currently processing a task. | ||
@@ -76,2 +58,4 @@ */ | ||
private onExit; | ||
private onStderr; | ||
private onStdout; | ||
private onData; | ||
@@ -78,0 +62,0 @@ private clearCurrentTask; |
@@ -41,4 +41,7 @@ "use strict"; | ||
var Deferred_1 = require("./Deferred"); | ||
var Error_1 = require("./Error"); | ||
var Object_1 = require("./Object"); | ||
var Pids_1 = require("./Pids"); | ||
var Stream_1 = require("./Stream"); | ||
var String_1 = require("./String"); | ||
var Task_1 = require("./Task"); | ||
@@ -66,6 +69,2 @@ /** | ||
this._exited = new Deferred_1.Deferred(); | ||
/** | ||
* Data from stdout, to be given to _currentTask | ||
*/ | ||
this.buff = ""; | ||
this.name = "BatchProcess(" + proc.pid + ")"; | ||
@@ -80,7 +79,5 @@ // don't let node count the child processes as a reason to stay alive | ||
this.proc.stdout.on("error", function (err) { return _this.onError("stdout.error", err); }); | ||
this.proc.stdout.on("data", function (d) { return _this.onData(d); }); | ||
this.proc.stdout.on("data", function (d) { return _this.onStdout(d); }); | ||
this.proc.stderr.on("error", function (err) { return _this.onError("stderr.error", err); }); | ||
this.proc.stderr.on("data", function (err) { | ||
return _this.onError("stderr.data", new Error(cleanError(err))); | ||
}); | ||
this.proc.stderr.on("data", function (err) { return _this.onStderr(err); }); | ||
this.startupTask = new Task_1.Task(opts.versionCommand, function (ea) { return ea; }); | ||
@@ -190,7 +187,3 @@ this.startupTask.promise | ||
if (this._ended || this.currentTask != null) { | ||
this.observer.onInternalError(new Error(this.name + | ||
".execTask(" + | ||
task.command + | ||
"): already working on " + | ||
this.currentTask)); | ||
this.observer.onInternalError(new Error(this.name + ".execTask(" + task.command + "): already working on " + this.currentTask)); | ||
return false; | ||
@@ -200,3 +193,3 @@ } | ||
this.currentTask = task; | ||
var cmd = ensureSuffix(task.command, "\n"); | ||
var cmd = String_1.ensureSuffix(task.command, "\n"); | ||
var timeoutMs = task === this.startupTask | ||
@@ -234,5 +227,5 @@ ? this.opts.spawnTimeoutMillis | ||
if (!firstEnd) return [3 /*break*/, 2]; | ||
cmd = Object_1.map(this.opts.exitCommand, function (ea) { return ensureSuffix(ea, "\n"); }); | ||
cmd = Object_1.map(this.opts.exitCommand, function (ea) { return String_1.ensureSuffix(ea, "\n"); }); | ||
if (!this.proc.stdin.writable) return [3 /*break*/, 2]; | ||
return [4 /*yield*/, end(this.proc.stdin, cmd)]; | ||
return [4 /*yield*/, Stream_1.end(this.proc.stdin, cmd)]; | ||
case 1: | ||
@@ -255,3 +248,3 @@ _a.sent(); | ||
this.clearCurrentTask(); | ||
tryEach([ | ||
Error_1.tryEach([ | ||
function () { return _this.proc.stdin.end(); }, | ||
@@ -314,3 +307,3 @@ function () { return _this.proc.stdout.destroy(); }, | ||
return __awaiter(this, void 0, void 0, function () { | ||
var error, fatal; | ||
var error; | ||
return __generator(this, function (_a) { | ||
@@ -320,3 +313,3 @@ if (task == null) { | ||
} | ||
error = new Error(source + ": " + cleanError(_error.message)); | ||
error = new Error(source + ": " + Error_1.cleanError(_error.message)); | ||
BatchCluster_1.logger().warn(this.name + ".onError()", { | ||
@@ -329,13 +322,4 @@ source: source, | ||
// Error stacks, if set, will not be redefined from a rethrow: | ||
error.stack = cleanError(_error.stack); | ||
error.stack = Error_1.cleanError(_error.stack); | ||
} | ||
if (task != null) { | ||
fatal = source !== "stderr.data" || | ||
this.observer.rejectTaskOnStderr(task, error); | ||
if (!fatal) { | ||
BatchCluster_1.logger().info("Error permitted by observer, will continue with task."); | ||
this.onData(""); | ||
return [2 /*return*/]; | ||
} | ||
} | ||
// clear the task before ending so the onExit from end() doesn't retry the task: | ||
@@ -349,7 +333,2 @@ this.clearCurrentTask(); | ||
if (task != null) { | ||
BatchCluster_1.logger().debug(this.name + ".onError(): task failed", { | ||
command: task.command, | ||
pid: this.pid, | ||
taskCount: this.taskCount | ||
}); | ||
this.observer.onTaskError(error, task); | ||
@@ -360,7 +339,3 @@ if (task.pending) { | ||
else { | ||
this.observer.onInternalError(new Error(this.name + | ||
".onError(): cannot reject task " + | ||
task.command + | ||
" is it is already " + | ||
task.state)); | ||
this.observer.onInternalError(new Error(this.name + ".onError(" + error + ") cannot reject already-fulfilled task.")); | ||
} | ||
@@ -376,55 +351,55 @@ } | ||
}; | ||
BatchProcess.prototype.onData = function (data) { | ||
BatchCluster_1.logger().trace(this.name + ".onData(" + data.toString() + ")"); | ||
this.observer.onTaskData(data, this.currentTask); | ||
this.buff = this.buff + data.toString(); | ||
var pass = this.opts.passRE.exec(this.buff); | ||
if (pass != null) { | ||
BatchCluster_1.logger().trace(this.name + " found PASS"); | ||
this.resolveCurrentTask(pass[1].trim()); | ||
this.observer.onIdle(); | ||
BatchProcess.prototype.onStderr = function (data) { | ||
if (this.currentTask != null) { | ||
this.currentTask.onStderr(data); | ||
this.onData(this.currentTask); | ||
} | ||
else { | ||
var fail = this.opts.failRE.exec(this.buff); | ||
if (fail != null) { | ||
BatchCluster_1.logger().trace(this.name + " found FAIL"); | ||
var err = new Error(cleanError(fail[1]) || "command error"); | ||
this.onError("onData", err); | ||
} | ||
this.observer.onInternalError(new Error("onStderr() with no current task: " + data)); | ||
} | ||
}; | ||
BatchProcess.prototype.onStdout = function (data) { | ||
if (this.currentTask != null) { | ||
this.observer.onTaskData(data, this.currentTask); | ||
this.currentTask.onStdout(data); | ||
this.onData(this.currentTask); | ||
} | ||
else { | ||
this.observer.onInternalError(new Error("onStdout() with no current task: " + data)); | ||
} | ||
}; | ||
BatchProcess.prototype.onData = function (task) { | ||
var pass = this.opts.passRE.exec(task.stdout); | ||
if (pass != null) { | ||
this.resolveCurrentTask(task, pass[1].trim(), task.stderr); | ||
return; | ||
} | ||
var failout = this.opts.failRE.exec(task.stdout); | ||
if (failout != null) { | ||
this.resolveCurrentTask(task, failout[1].trim(), task.stderr); | ||
return; | ||
} | ||
var failerr = this.opts.failRE.exec(task.stderr); | ||
if (failerr != null) { | ||
this.resolveCurrentTask(task, task.stdout, failerr[1].trim()); | ||
} | ||
}; | ||
BatchProcess.prototype.clearCurrentTask = function () { | ||
if (this.currentTaskTimeout != null) { | ||
clearTimeout(this.currentTaskTimeout); | ||
this.currentTaskTimeout = undefined; | ||
} | ||
Object_1.map(this.currentTaskTimeout, function (ea) { return clearTimeout(ea); }); | ||
this.currentTaskTimeout = undefined; | ||
this.currentTask = undefined; | ||
}; | ||
BatchProcess.prototype.resolveCurrentTask = function (result) { | ||
this.buff = ""; | ||
var task = this.currentTask; | ||
BatchProcess.prototype.resolveCurrentTask = function (task, result, stderr) { | ||
this.clearCurrentTask(); | ||
if (task == null) { | ||
if (result.length > 0 && !this._ended) { | ||
this.observer.onInternalError(new Error(this.name + ".resolveCurrentTask(): no current task")); | ||
} | ||
this.end(false, "resolveCurrentTask(no current task)"); | ||
if (task.pending) { | ||
task.resolve(result, stderr); | ||
} | ||
else { | ||
BatchCluster_1.logger().trace(this.name + ".resolveCurrentTask()", { | ||
task: task.command, | ||
result: result | ||
}); | ||
if (task.pending) { | ||
task.resolve(result); | ||
} | ||
else { | ||
this.observer.onInternalError(new Error(this.name + | ||
".resolveCurrentTask(): cannot resolve task " + | ||
task.command + | ||
" as it is already " + | ||
task.state)); | ||
} | ||
this.observer.onIdle(); | ||
this.observer.onInternalError(new Error(this.name + | ||
".resolveCurrentTask(): " + | ||
task.command + | ||
" is already " + | ||
task.state)); | ||
} | ||
this.observer.onIdle(); | ||
}; | ||
@@ -434,28 +409,2 @@ return BatchProcess; | ||
exports.BatchProcess = BatchProcess; | ||
/** | ||
* When we wrap errors, an Error always prefixes the toString() and stack with | ||
* "Error: ", so we can remove that prefix. | ||
*/ | ||
function cleanError(s) { | ||
return String(s) | ||
.trim() | ||
.replace(/^error: /gi, ""); | ||
} | ||
function ensureSuffix(s, suffix) { | ||
return s.endsWith(suffix) ? s : s + suffix; | ||
} | ||
function end(endable, contents) { | ||
return new Promise(function (resolve) { | ||
contents == null ? endable.end(resolve) : endable.end(contents, resolve); | ||
}); | ||
} | ||
function tryEach(arr) { | ||
for (var _i = 0, arr_1 = arr; _i < arr_1.length; _i++) { | ||
var f = arr_1[_i]; | ||
try { | ||
f(); | ||
} | ||
catch (_) { } | ||
} | ||
} | ||
//# sourceMappingURL=BatchProcess.js.map |
@@ -98,2 +98,5 @@ "use strict"; | ||
if (force === void 0) { force = false; } | ||
if (pid == _p.pid || pid == _p.ppid) { | ||
throw new Error("cannot self-terminate"); | ||
} | ||
if (isWin) { | ||
@@ -100,0 +103,0 @@ var args = ["/PID", sanitize(pid), "/T"]; |
@@ -0,8 +1,4 @@ | ||
/// <reference types="node" /> | ||
import { Parser } from "./Parser"; | ||
/** | ||
* Parser implementations convert stdout from the underlying child process to | ||
* a more useable format. This can be a no-op passthrough if no parsing is | ||
* necessary. | ||
*/ | ||
export declare type Parser<T> = (data: string) => T; | ||
/** | ||
* Tasks embody individual jobs given to the underlying child processes. Each | ||
@@ -16,2 +12,4 @@ * instance has a promise that will be resolved or rejected based on the | ||
private readonly d; | ||
private _stdout; | ||
private _stderr; | ||
/** | ||
@@ -31,2 +29,6 @@ * @param {string} command is the value written to stdin to perform the given | ||
toString(): string; | ||
onStdout(buf: string | Buffer): void; | ||
onStderr(buf: string | Buffer): void; | ||
readonly stdout: string; | ||
readonly stderr: string; | ||
/** | ||
@@ -36,3 +38,3 @@ * This is for use by `BatchProcess` only, and will only be called when the | ||
*/ | ||
resolve(data: string): void; | ||
resolve(result: string, stderr: string): void; | ||
/** | ||
@@ -42,3 +44,3 @@ * This is for use by `BatchProcess` only, and will only be called when the | ||
*/ | ||
reject(error: Error): void; | ||
reject(error: Error, source?: string): void; | ||
} |
@@ -21,2 +21,4 @@ "use strict"; | ||
this.d = new Deferred_1.Deferred(); | ||
this._stdout = ""; | ||
this._stderr = ""; | ||
} | ||
@@ -54,2 +56,22 @@ Object.defineProperty(Task.prototype, "promise", { | ||
}; | ||
Task.prototype.onStdout = function (buf) { | ||
this._stdout += buf.toString(); | ||
}; | ||
Task.prototype.onStderr = function (buf) { | ||
this._stderr += buf.toString(); | ||
}; | ||
Object.defineProperty(Task.prototype, "stdout", { | ||
get: function () { | ||
return this._stdout; | ||
}, | ||
enumerable: true, | ||
configurable: true | ||
}); | ||
Object.defineProperty(Task.prototype, "stderr", { | ||
get: function () { | ||
return this._stderr; | ||
}, | ||
enumerable: true, | ||
configurable: true | ||
}); | ||
/** | ||
@@ -59,17 +81,13 @@ * This is for use by `BatchProcess` only, and will only be called when the | ||
*/ | ||
Task.prototype.resolve = function (data) { | ||
Task.prototype.resolve = function (result, stderr) { | ||
try { | ||
var result = this.parser(data); | ||
var parseResult = this.parser(result, stderr); | ||
Logger_1.logger().trace("Task.onData(): resolved", { | ||
command: this.command, | ||
result: result | ||
parseResult: parseResult | ||
}); | ||
this.d.resolve(result); | ||
this.d.resolve(parseResult); | ||
} | ||
catch (error) { | ||
Logger_1.logger().warn("Task.onData(): rejected", { | ||
command: this.command, | ||
error: error | ||
}); | ||
this.d.reject(error); | ||
this.reject(error, "Task.onData(): rejected"); | ||
} | ||
@@ -81,4 +99,5 @@ }; | ||
*/ | ||
Task.prototype.reject = function (error) { | ||
Logger_1.logger().warn("Task.reject()", { | ||
Task.prototype.reject = function (error, source) { | ||
if (source === void 0) { source = "Task.reject()"; } | ||
Logger_1.logger().warn(source, { | ||
cmd: this.command, | ||
@@ -85,0 +104,0 @@ error: error |
{ | ||
"name": "batch-cluster", | ||
"version": "4.3.0", | ||
"version": "5.0.0", | ||
"description": "Manage a cluster of child processes", | ||
@@ -31,8 +31,8 @@ "main": "dist/BatchCluster.js", | ||
"devDependencies": { | ||
"@types/chai": "^4.1.4", | ||
"@types/chai": "^4.1.6", | ||
"@types/chai-as-promised": "^7.1.0", | ||
"@types/chai-string": "^1.4.1", | ||
"@types/mocha": "^5.2.5", | ||
"@types/node": "^10.9.4", | ||
"chai": "^4.1.2", | ||
"@types/node": "^10.12.0", | ||
"chai": "^4.2.0", | ||
"chai-as-promised": "^7.1.1", | ||
@@ -42,12 +42,12 @@ "chai-string": "^1.5.0", | ||
"mocha": "^5.2.0", | ||
"prettier": "^1.14.2", | ||
"prettier": "^1.14.3", | ||
"rimraf": "^2.6.2", | ||
"seedrandom": "^2.4.4", | ||
"serve": "^10.0.1", | ||
"serve": "^10.0.2", | ||
"source-map-support": "^0.5.9", | ||
"timekeeper": "^2.1.2", | ||
"typedoc": "^0.12.0", | ||
"typescript": "^3.0.3", | ||
"wtfnode": "^0.7.1" | ||
"typedoc": "^0.13.0", | ||
"typescript": "^3.1.3", | ||
"wtfnode": "^0.7.3" | ||
} | ||
} |
@@ -8,3 +8,2 @@ # batch-cluster | ||
[](https://ci.appveyor.com/project/mceachen/batch-cluster-js/branch/master) | ||
 | ||
@@ -68,3 +67,3 @@ Many command line tools, like | ||
1. Implement the [Parser](https://batch-cluster.js.org/globals.html#parser) | ||
1. Implement the [Parser](https://batch-cluster.js.org/interfaces/parser) | ||
class to parse results from your child process. | ||
@@ -71,0 +70,0 @@ |
Sorry, the diff of this file is not supported yet
185073
48
2131
78