New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

batch-cluster

Package Overview
Dependencies
Maintainers
1
Versions
93
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

batch-cluster - npm Package Compare versions

Comparing version 8.1.0 to 9.0.0

22

CHANGELOG.md

@@ -21,5 +21,25 @@ # Changelog

## v9.0.0
- 💔 The `BatchProcessObserver` signature was deleted, as `BatchClusterEmitter` is
now typesafe. Consumers should not have used this signature directly, but in
case anyone did, I bumped the major version.
- ✨ Added `BatchCluster.off` to unregister event listeners provided to `BatchCluster.on`.
- 📦 Private fields and methods now use [the `#` private
prefix](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Classes/Private_class_fields)
rather than the TypeScript `private` modifier.
- 📦 Minor tweaks (fixed several jsdoc errors, simplified some boolean logic,
small reduction in promise chains, ...)
- 📦 Updated development dependencies and rebuild docs
## v8.1.0
- 📦 Added `BatchCluster.procCount` and `BatchCluster.setMaxProcs`, and new `BatchCluster.ChildEndCountType` which includes a new `tooMany` value, which is incremented when `setMaxProcs` is set to a smaller value.
- 📦 Added `BatchCluster.procCount` and `BatchCluster.setMaxProcs`, and new
`BatchCluster.ChildEndCountType` which includes a new `tooMany` value, which
is incremented when `setMaxProcs` is set to a smaller value.
- 📦 Updated development dependencies

@@ -26,0 +46,0 @@

38

dist/BatchCluster.d.ts
/// <reference types="node" />
import child_process from "child_process";
import { BatchClusterEmitter } from "./BatchClusterEmitter";
import { BatchClusterEmitter, BatchClusterEvents } from "./BatchClusterEmitter";
import { AllOpts, BatchClusterOptions } from "./BatchClusterOptions";

@@ -16,3 +16,3 @@ import { WhyNotReady } from "./BatchProcess";

export { Task } from "./Task";
export type { BatchProcessOptions, Parser };
export type { BatchProcessOptions, Parser, BatchClusterEvents, BatchClusterEmitter, };
/**

@@ -40,19 +40,16 @@ * These are required parameters for a given BatchCluster.

*/
export declare class BatchCluster extends BatchClusterEmitter {
private readonly _tasksPerProc;
private readonly logger;
export declare class BatchCluster {
#private;
readonly options: AllOpts;
private readonly _procs;
private _lastSpawnedProcTime;
private _lastPidsCheckTime;
private readonly tasks;
private onIdleInterval;
private readonly startErrorRate;
private _spawnedProcs;
private endPromise?;
private _internalErrorCount;
private readonly _childEndCounts;
readonly emitter: BatchClusterEmitter;
constructor(opts: Partial<BatchClusterOptions> & BatchProcessOptions & ChildProcessFactory);
private readonly beforeExitListener;
private readonly exitListener;
/**
* @see BatchClusterEvents
*/
readonly on: <E extends keyof BatchClusterEvents>(event: E, listener: (...args: BatchClusterEvents[E] extends (...args: infer A) => void ? A : never) => void) => BatchClusterEmitter;
/**
* @see BatchClusterEvents
* @since v9.0.0
*/
readonly off: <E extends keyof BatchClusterEvents>(event: E, listener: (...args: BatchClusterEvents[E] extends (...args: infer A) => void ? A : never) => void) => BatchClusterEmitter;
get ended(): boolean;

@@ -108,4 +105,2 @@ /**

get internalErrorCount(): number;
private emitInternalError;
private emitStartError;
/**

@@ -136,3 +131,2 @@ * Verify that each BatchProcess PID is actually alive.

private onIdle;
private maybeCheckPids;
/**

@@ -143,6 +137,2 @@ * Run maintenance on currently spawned child processes. This method is

vacuumProcs(): void;
private execNextTask;
private cannotLaunchNewChild;
private maybeLaunchNewChild;
private _maybeLaunchNewChild;
}

@@ -12,14 +12,27 @@ "use strict";

};
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
};
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
if (kind === "m") throw new TypeError("Private method is not writable");
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
};
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
var _BatchCluster_instances, _BatchCluster_tasksPerProc, _BatchCluster_logger, _BatchCluster_procs, _BatchCluster_lastSpawnedProcTime, _BatchCluster_lastPidsCheckTime, _BatchCluster_tasks, _BatchCluster_onIdleInterval, _BatchCluster_startErrorRate, _BatchCluster_spawnedProcs, _BatchCluster_endPromise, _BatchCluster_internalErrorCount, _BatchCluster_childEndCounts, _BatchCluster_beforeExitListener, _BatchCluster_exitListener, _BatchCluster_maybeCheckPids, _BatchCluster_execNextTask, _BatchCluster_maybeLaunchNewChild, _BatchCluster_spawnChild;
Object.defineProperty(exports, "__esModule", { value: true });
exports.BatchCluster = exports.Task = exports.pids = exports.pidExists = exports.kill = exports.SimpleParser = exports.Deferred = exports.BatchClusterOptions = void 0;
const events_1 = __importDefault(require("events"));
const process_1 = __importDefault(require("process"));
const timers_1 = __importDefault(require("timers"));
const Array_1 = require("./Array");
const BatchClusterEmitter_1 = require("./BatchClusterEmitter");
const BatchClusterOptions_1 = require("./BatchClusterOptions");
const BatchProcess_1 = require("./BatchProcess");
const Deferred_1 = require("./Deferred");
const Error_1 = require("./Error");
const Mean_1 = require("./Mean");

@@ -51,50 +64,57 @@ const Object_1 = require("./Object");

*/
class BatchCluster extends BatchClusterEmitter_1.BatchClusterEmitter {
class BatchCluster {
constructor(opts) {
super();
this._tasksPerProc = new Mean_1.Mean();
this._procs = [];
this._lastSpawnedProcTime = 0;
this._lastPidsCheckTime = Date.now();
this.tasks = [];
this.startErrorRate = new Rate_1.Rate();
this._spawnedProcs = 0;
this._internalErrorCount = 0;
this._childEndCounts = new Map();
this.beforeExitListener = () => this.end(true);
this.exitListener = () => this.end(false);
const observer = {
onIdle: () => {
this.onIdle();
},
onStartError: (err) => {
this.emitStartError(err);
},
onTaskData: (data, task) => {
this.emitter.emit("taskData", data, task);
},
onTaskResolved: (task, proc) => {
this.emitter.emit("taskResolved", task, proc);
},
onTaskError: (err, task, proc) => {
this.emitter.emit("taskError", err, task, proc);
},
onHealthCheckError: (err, proc) => {
this.emitter.emit("healthCheckError", err, proc);
},
onInternalError: (err) => {
this.emitInternalError(err);
},
};
this.options = (0, BatchClusterOptions_1.verifyOptions)({ ...opts, observer });
_BatchCluster_instances.add(this);
_BatchCluster_tasksPerProc.set(this, new Mean_1.Mean());
_BatchCluster_logger.set(this, void 0);
_BatchCluster_procs.set(this, []);
_BatchCluster_lastSpawnedProcTime.set(this, 0);
_BatchCluster_lastPidsCheckTime.set(this, Date.now());
_BatchCluster_tasks.set(this, []);
_BatchCluster_onIdleInterval.set(this, void 0);
_BatchCluster_startErrorRate.set(this, new Rate_1.Rate());
_BatchCluster_spawnedProcs.set(this, 0);
_BatchCluster_endPromise.set(this, void 0);
_BatchCluster_internalErrorCount.set(this, 0);
_BatchCluster_childEndCounts.set(this, new Map());
this.emitter = new events_1.default();
/**
* @see BatchClusterEvents
*/
this.on = this.emitter.on.bind(this.emitter);
/**
* @see BatchClusterEvents
* @since v9.0.0
*/
this.off = this.emitter.off.bind(this.emitter);
_BatchCluster_beforeExitListener.set(this, () => this.end(true));
_BatchCluster_exitListener.set(this, () => this.end(false));
this.options = (0, BatchClusterOptions_1.verifyOptions)({ ...opts, observer: this.emitter });
this.on("internalError", (error) => {
var _a;
__classPrivateFieldGet(this, _BatchCluster_logger, "f").call(this).error("BatchCluster: INTERNAL ERROR: " + error);
__classPrivateFieldSet(this, _BatchCluster_internalErrorCount, (_a = __classPrivateFieldGet(this, _BatchCluster_internalErrorCount, "f"), _a++, _a), "f");
});
this.on("startError", (error) => {
__classPrivateFieldGet(this, _BatchCluster_logger, "f").call(this).warn("BatchCluster.onStartError(): " + error);
__classPrivateFieldGet(this, _BatchCluster_startErrorRate, "f").onEvent();
if (__classPrivateFieldGet(this, _BatchCluster_startErrorRate, "f").eventsPerMinute >
this.options.maxReasonableProcessFailuresPerMinute) {
this.emitter.emit("endError", new Error(error +
"(start errors/min: " +
__classPrivateFieldGet(this, _BatchCluster_startErrorRate, "f").eventsPerMinute.toFixed(2) +
")"));
this.end();
}
});
if (this.options.onIdleIntervalMillis > 0) {
this.onIdleInterval = timers_1.default.setInterval(() => this.onIdle(), this.options.onIdleIntervalMillis);
this.onIdleInterval.unref(); // < don't prevent node from exiting
__classPrivateFieldSet(this, _BatchCluster_onIdleInterval, timers_1.default.setInterval(() => this.onIdle(), this.options.onIdleIntervalMillis), "f");
__classPrivateFieldGet(this, _BatchCluster_onIdleInterval, "f").unref(); // < don't prevent node from exiting
}
this.logger = this.options.logger;
process_1.default.once("beforeExit", this.beforeExitListener);
process_1.default.once("exit", this.exitListener);
__classPrivateFieldSet(this, _BatchCluster_logger, this.options.logger, "f");
process_1.default.once("beforeExit", __classPrivateFieldGet(this, _BatchCluster_beforeExitListener, "f"));
process_1.default.once("exit", __classPrivateFieldGet(this, _BatchCluster_exitListener, "f"));
}
get ended() {
return this.endPromise != null;
return __classPrivateFieldGet(this, _BatchCluster_endPromise, "f") != null;
}

@@ -108,9 +128,9 @@ /**

end(gracefully = true) {
if (this.endPromise == null) {
if (__classPrivateFieldGet(this, _BatchCluster_endPromise, "f") == null) {
this.emitter.emit("beforeEnd");
(0, Object_1.map)(this.onIdleInterval, timers_1.default.clearInterval);
this.onIdleInterval = undefined;
process_1.default.removeListener("beforeExit", this.beforeExitListener);
process_1.default.removeListener("exit", this.exitListener);
this.endPromise = new Deferred_1.Deferred().observe(this.closeChildProcesses(gracefully)
(0, Object_1.map)(__classPrivateFieldGet(this, _BatchCluster_onIdleInterval, "f"), timers_1.default.clearInterval);
__classPrivateFieldSet(this, _BatchCluster_onIdleInterval, undefined, "f");
process_1.default.removeListener("beforeExit", __classPrivateFieldGet(this, _BatchCluster_beforeExitListener, "f"));
process_1.default.removeListener("exit", __classPrivateFieldGet(this, _BatchCluster_exitListener, "f"));
__classPrivateFieldSet(this, _BatchCluster_endPromise, new Deferred_1.Deferred().observe(this.closeChildProcesses(gracefully)
.catch((err) => {

@@ -121,5 +141,5 @@ this.emitter.emit("endError", err);

this.emitter.emit("end");
}));
})), "f");
}
return this.endPromise;
return __classPrivateFieldGet(this, _BatchCluster_endPromise, "f");
}

@@ -136,7 +156,5 @@ /**

}
this.tasks.push(task);
__classPrivateFieldGet(this, _BatchCluster_tasks, "f").push(task);
setImmediate(() => this.onIdle());
task.promise.then(() => this.onIdle(), () => null // < ignore errors in this promise chain.
);
return task.promise;
return task.promise.finally(() => this.onIdle());
}

@@ -153,3 +171,3 @@ /**

get pendingTaskCount() {
return this.tasks.length;
return __classPrivateFieldGet(this, _BatchCluster_tasks, "f").length;
}

@@ -160,3 +178,3 @@ /**

get meanTasksPerProc() {
return this._tasksPerProc.mean;
return __classPrivateFieldGet(this, _BatchCluster_tasksPerProc, "f").mean;
}

@@ -167,3 +185,3 @@ /**

get spawnedProcCount() {
return this._spawnedProcs;
return __classPrivateFieldGet(this, _BatchCluster_spawnedProcs, "f");
}

@@ -174,3 +192,3 @@ /**

get procCount() {
return this._procs.length;
return __classPrivateFieldGet(this, _BatchCluster_procs, "f").length;
}

@@ -181,3 +199,3 @@ /**

get busyProcCount() {
return this._procs.filter(
return __classPrivateFieldGet(this, _BatchCluster_procs, "f").filter(
// don't count procs that are starting up as "busy":

@@ -190,3 +208,3 @@ (ea) => ea.taskCount > 0 && !ea.exited && !ea.idle).length;

get pendingTasks() {
return this.tasks;
return __classPrivateFieldGet(this, _BatchCluster_tasks, "f");
}

@@ -197,3 +215,3 @@ /**

get currentTasks() {
return this._procs
return __classPrivateFieldGet(this, _BatchCluster_procs, "f")
.map((ea) => ea.currentTask)

@@ -206,22 +224,4 @@ .filter((ea) => ea != null);

get internalErrorCount() {
return this._internalErrorCount;
return __classPrivateFieldGet(this, _BatchCluster_internalErrorCount, "f");
}
emitInternalError(error) {
this.emitter.emit("internalError", error);
this.logger().error("BatchCluster: INTERNAL ERROR: " + error);
this._internalErrorCount++;
}
emitStartError(error) {
this.logger().warn("BatchCluster.onStartError(): " + error);
this.emitter.emit("startError", error);
this.startErrorRate.onEvent();
if (this.startErrorRate.eventsPerMinute >
this.options.maxReasonableProcessFailuresPerMinute) {
this.emitter.emit("endError", new Error(error +
"(start errors/min: " +
this.startErrorRate.eventsPerMinute.toFixed(2) +
")"));
this.end();
}
}
/**

@@ -234,3 +234,3 @@ * Verify that each BatchProcess PID is actually alive.

const arr = [];
for (const proc of [...this._procs]) {
for (const proc of [...__classPrivateFieldGet(this, _BatchCluster_procs, "f")]) {
if (proc != null && !proc.exited && (await proc.running())) {

@@ -247,6 +247,6 @@ arr.push(proc.pid);

var _a;
return (_a = this._childEndCounts.get(why)) !== null && _a !== void 0 ? _a : 0;
return (_a = __classPrivateFieldGet(this, _BatchCluster_childEndCounts, "f").get(why)) !== null && _a !== void 0 ? _a : 0;
}
get childEndCounts() {
return (0, Object_1.fromEntries)([...this._childEndCounts.entries()]);
return (0, Object_1.fromEntries)([...__classPrivateFieldGet(this, _BatchCluster_childEndCounts, "f").entries()]);
}

@@ -258,4 +258,4 @@ /**

async closeChildProcesses(gracefully = true) {
const procs = [...this._procs];
this._procs.length = 0;
const procs = [...__classPrivateFieldGet(this, _BatchCluster_procs, "f")];
__classPrivateFieldGet(this, _BatchCluster_procs, "f").length = 0;
for (const proc of procs) {

@@ -283,16 +283,7 @@ try {

this.vacuumProcs();
while (this.execNextTask()) {
while (__classPrivateFieldGet(this, _BatchCluster_instances, "m", _BatchCluster_execNextTask).call(this)) {
//
}
if (this.tasks.length > 0) {
void this.maybeLaunchNewChild();
}
__classPrivateFieldGet(this, _BatchCluster_instances, "m", _BatchCluster_maybeLaunchNewChild).call(this);
}
maybeCheckPids() {
if (this.options.pidCheckIntervalMillis > 0 &&
this._lastPidsCheckTime + this.options.pidCheckIntervalMillis < Date.now()) {
this._lastPidsCheckTime = Date.now();
void this.pids();
}
}
/**

@@ -304,12 +295,12 @@ * Run maintenance on currently spawned child processes. This method is

vacuumProcs() {
this.maybeCheckPids();
(0, Array_1.filterInPlace)(this._procs, (proc) => {
__classPrivateFieldGet(this, _BatchCluster_instances, "m", _BatchCluster_maybeCheckPids).call(this);
(0, Array_1.filterInPlace)(__classPrivateFieldGet(this, _BatchCluster_procs, "f"), (proc) => {
// Don't bother running procs:
if (!proc.ending && !proc.idle)
return true;
const why = this._procs.length > this.options.maxProcs
const why = __classPrivateFieldGet(this, _BatchCluster_procs, "f").length > this.options.maxProcs
? "tooMany"
: proc.whyNotHealthy; // NOT whyNotReady: we don't care about busy procs
if (why != null) {
this._childEndCounts.set(why, 1 + this.countEndedChildProcs(why));
__classPrivateFieldGet(this, _BatchCluster_childEndCounts, "f").set(why, 1 + this.countEndedChildProcs(why));
void proc.end(true, why);

@@ -320,87 +311,87 @@ }

}
// NOT ASYNC: updates internal state.
execNextTask() {
if (this.tasks.length === 0 || this.ended)
return false;
const readyProc = this._procs.find((ea) => ea.ready);
// no procs are idle and healthy :(
if (readyProc == null) {
return false;
}
const task = this.tasks.shift();
if (task == null) {
this.emitInternalError(new Error("unexpected null task"));
return false;
}
const submitted = readyProc.execTask(task);
if (!submitted) {
// This isn't an internal error: the proc may have needed to run a health
// check. Let's reschedule the task and try again:
this.tasks.push(task);
// We don't want to return false here (it'll stop the onIdle loop) unless
// we actually can't submit the task:
return this.execNextTask();
}
return submitted;
}
exports.BatchCluster = BatchCluster;
_BatchCluster_tasksPerProc = new WeakMap(), _BatchCluster_logger = new WeakMap(), _BatchCluster_procs = new WeakMap(), _BatchCluster_lastSpawnedProcTime = new WeakMap(), _BatchCluster_lastPidsCheckTime = new WeakMap(), _BatchCluster_tasks = new WeakMap(), _BatchCluster_onIdleInterval = new WeakMap(), _BatchCluster_startErrorRate = new WeakMap(), _BatchCluster_spawnedProcs = new WeakMap(), _BatchCluster_endPromise = new WeakMap(), _BatchCluster_internalErrorCount = new WeakMap(), _BatchCluster_childEndCounts = new WeakMap(), _BatchCluster_beforeExitListener = new WeakMap(), _BatchCluster_exitListener = new WeakMap(), _BatchCluster_instances = new WeakSet(), _BatchCluster_maybeCheckPids = function _BatchCluster_maybeCheckPids() {
if (this.options.pidCheckIntervalMillis > 0 &&
__classPrivateFieldGet(this, _BatchCluster_lastPidsCheckTime, "f") + this.options.pidCheckIntervalMillis < Date.now()) {
__classPrivateFieldSet(this, _BatchCluster_lastPidsCheckTime, Date.now(), "f");
void this.pids();
}
cannotLaunchNewChild() {
return (this.ended ||
this.tasks.length === 0 ||
this._procs.length >= this.options.maxProcs ||
this._lastSpawnedProcTime >
Date.now() - this.options.minDelayBetweenSpawnMillis);
}, _BatchCluster_execNextTask = function _BatchCluster_execNextTask() {
if (__classPrivateFieldGet(this, _BatchCluster_tasks, "f").length === 0 || this.ended)
return false;
const readyProc = __classPrivateFieldGet(this, _BatchCluster_procs, "f").find((ea) => ea.ready);
// no procs are idle and healthy :(
if (readyProc == null) {
return false;
}
// NOT ASYNC: updates internal state.
maybeLaunchNewChild() {
if (this.cannotLaunchNewChild())
return;
// prevent other runs:
this._lastSpawnedProcTime = Date.now();
void this._maybeLaunchNewChild();
const task = __classPrivateFieldGet(this, _BatchCluster_tasks, "f").shift();
if (task == null) {
this.emitter.emit("internalError", new Error("unexpected null task"));
return false;
}
async _maybeLaunchNewChild() {
// don't check cannotLaunchNewChild() again here: it'll be true because we
// just set _lastSpawnedProcTime.
if (this.ended || this._procs.length >= this.options.maxProcs)
const submitted = readyProc.execTask(task);
if (!submitted) {
// This isn't an internal error: the proc may have needed to run a health
// check. Let's reschedule the task and try again:
__classPrivateFieldGet(this, _BatchCluster_tasks, "f").push(task);
// We don't want to return false here (it'll stop the onIdle loop) unless
// we actually can't submit the task:
return __classPrivateFieldGet(this, _BatchCluster_instances, "m", _BatchCluster_execNextTask).call(this);
}
return submitted;
}, _BatchCluster_maybeLaunchNewChild = function _BatchCluster_maybeLaunchNewChild() {
if (!this.ended &&
__classPrivateFieldGet(this, _BatchCluster_tasks, "f").length > 0 &&
__classPrivateFieldGet(this, _BatchCluster_procs, "f").length < this.options.maxProcs &&
__classPrivateFieldGet(this, _BatchCluster_lastSpawnedProcTime, "f") + this.options.minDelayBetweenSpawnMillis <=
Date.now()) {
// prevent multiple concurrent spawns:
__classPrivateFieldSet(this, _BatchCluster_lastSpawnedProcTime, Date.now(), "f");
void __classPrivateFieldGet(this, _BatchCluster_instances, "m", _BatchCluster_spawnChild).call(this);
}
}, _BatchCluster_spawnChild =
// must only be called by .#maybeLaunchNewChild()
async function _BatchCluster_spawnChild() {
var _a;
if (this.ended)
return;
try {
const child = await this.options.processFactory();
const pid = child.pid;
if (pid == null) {
this.emitter.emit("childExit", child);
return;
try {
const child = await this.options.processFactory();
const pid = child.pid;
if (pid == null) {
this.emitter.emit("childExit", child);
return;
}
const proc = new BatchProcess_1.BatchProcess(child, this.options);
if (this.ended) {
void proc.end(false, "ended");
return;
}
// Bookkeeping (even if we need to shut down `proc`):
this._spawnedProcs++;
this.emitter.emit("childStart", child);
void proc.exitPromise.then(() => {
this._tasksPerProc.push(proc.taskCount);
this.emitter.emit("childExit", child);
});
// Did we call _mayLaunchNewChild() a couple times in parallel?
if (this._procs.length >= this.options.maxProcs) {
// only vacuum if we're at the limit
this.vacuumProcs();
}
if (this._procs.length >= this.options.maxProcs) {
void proc.end(false, "maxProcs");
return;
}
else {
this._procs.push(proc);
return proc;
}
}
catch (err) {
this.emitter.emit("startError", err);
const proc = new BatchProcess_1.BatchProcess(child, this.options);
if (this.ended) {
void proc.end(false, "ended");
return;
}
// Bookkeeping (even if we need to shut down `proc`):
__classPrivateFieldSet(this, _BatchCluster_spawnedProcs, (_a = __classPrivateFieldGet(this, _BatchCluster_spawnedProcs, "f"), _a++, _a), "f");
this.emitter.emit("childStart", child);
void proc.exitPromise.then(() => {
__classPrivateFieldGet(this, _BatchCluster_tasksPerProc, "f").push(proc.taskCount);
this.emitter.emit("childExit", child);
});
// Did we call _mayLaunchNewChild() a couple times in parallel?
if (__classPrivateFieldGet(this, _BatchCluster_procs, "f").length >= this.options.maxProcs) {
// only vacuum if we're at the limit
this.vacuumProcs();
}
if (__classPrivateFieldGet(this, _BatchCluster_procs, "f").length >= this.options.maxProcs) {
void proc.end(false, "maxProcs");
return;
}
else {
__classPrivateFieldGet(this, _BatchCluster_procs, "f").push(proc);
return proc;
}
}
}
exports.BatchCluster = BatchCluster;
catch (err) {
this.emitter.emit("startError", (0, Error_1.asError)(err));
return;
}
};
//# sourceMappingURL=BatchCluster.js.map
/// <reference types="node" />
import child_process from "child_process";
import events from "events";
import _cp from "child_process";
import { BatchProcess } from "./BatchProcess";
import { Task } from "./Task";
export declare class BatchClusterEmitter {
readonly emitter: events;
/**
* This interface describes the BatchCluster's event names as fields. The type
* of the field describes the event data payload.
*
* See {@link BatchClusterEmitter} for more details.
*/
export interface BatchClusterEvents {
/**
* Emitted when a child process has started
*/
on(event: "childStart", listener: (childProcess: child_process.ChildProcess) => void): void;
childStart: (childProcess: _cp.ChildProcess) => void;
/**
* Emitted when a child process has exitted
*/
on(event: "childExit", listener: (childProcess: child_process.ChildProcess) => void): void;
childExit: (childProcess: _cp.ChildProcess) => void;
/**
* Emitted when a child process has an error when spawning
*/
on(event: "startError", listener: (err: Error) => void): void;
startError: (err: Error) => void;
/**
* Emitted when an internal consistency check fails
*/
on(event: "internalError", listener: (err: Error) => void): void;
internalError: (err: Error) => void;
/**

@@ -28,28 +32,64 @@ * Emitted when tasks receive data, which may be partial chunks from the task

*/
on(event: "taskData", listener: (data: Buffer | string, task: Task | undefined) => void): void;
taskData: (data: Buffer | string, task: Task | undefined, proc: BatchProcess) => void;
/**
* Emitted when a task has been resolved
*/
on(event: "taskResolved", listener: (task: Task, proc: BatchProcess) => void): void;
taskResolved: (task: Task, proc: BatchProcess) => void;
/**
* Emitted when a task times out. Note that a `taskError` event always succeeds these events.
*/
taskTimeout: (timeoutMs: number, task: Task, proc: BatchProcess) => void;
/**
* Emitted when a task has an error
*/
on(event: "taskError", listener: (err: Error, task: Task, proc: BatchProcess) => void): void;
taskError: (err: Error, task: Task, proc: BatchProcess) => void;
/**
* Emitted when a process fails health checks
*/
on(event: "healthCheckError", listener: (err: Error, proc: BatchProcess) => void): void;
healthCheckError: (err: Error, proc: BatchProcess) => void;
/**
* Emitted when a child process has an error during shutdown
*/
on(event: "endError", listener: (err: Error) => void): void;
endError: (err: Error) => void;
/**
* Emitted when this instance is in the process of ending.
*/
on(event: "beforeEnd", listener: () => void): void;
beforeEnd: () => void;
/**
* Emitted when a task is completed, asking for more work to be scheduled, if
* possible.
*/
idle: () => void;
/**
* Emitted when this instance has ended. No child processes should remain at
* this point.
*/
on(event: "end", listener: () => void): void;
end: () => void;
}
declare type Args<E extends keyof BatchClusterEvents> = BatchClusterEvents[E] extends (...args: infer A) => void ? A : never;
/**
* The BatchClusterEmitter signature is built up automatically by the
* {@link BatchClusterEvents} interface, which ensures `.on`, `.off`, and
* `.emit` signatures are all consistent, and include the correct data payloads
* for all of BatchCluster's events.
*
* This approach has some benefits:
*
* - it ensures that on(), off(), and emit() signatures are all consistent,
* - supports editor autocomplete, and
* - offers strong typing,
*
* but has one drawback:
*
* - jsdocs don't list all signatures directly: you have to visit the event
* source interface.
*
* See {@link BatchClusterEvents} for a the list of events and their payload
* signatures
*/
export interface BatchClusterEmitter {
on<E extends keyof BatchClusterEvents>(event: E, listener: (...args: Args<E>) => void): this;
off<E extends keyof BatchClusterEvents>(event: E, listener: (...args: Args<E>) => void): this;
emit<E extends keyof BatchClusterEvents>(event: E, ...args: Args<E>): boolean;
}
export {};
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.BatchClusterEmitter = void 0;
const events_1 = __importDefault(require("events"));
class BatchClusterEmitter {
constructor() {
this.emitter = new events_1.default.EventEmitter();
}
on(event, listener) {
this.emitter.on(event, listener);
}
}
exports.BatchClusterEmitter = BatchClusterEmitter;
//# sourceMappingURL=BatchClusterEmitter.js.map
import { ChildProcessFactory } from "./BatchCluster";
import { BatchProcessObserver } from "./BatchProcessObserver";
import { BatchClusterEmitter } from "./BatchClusterEmitter";
import { BatchProcessOptions } from "./BatchProcessOptions";

@@ -153,5 +153,5 @@ import { InternalBatchProcessOptions } from "./InternalBatchProcessOptions";

export interface WithObserver {
observer: BatchProcessObserver;
observer: BatchClusterEmitter;
}
export declare type AllOpts = BatchClusterOptions & InternalBatchProcessOptions & ChildProcessFactory & WithObserver;
export declare function verifyOptions(opts: Partial<BatchClusterOptions> & BatchProcessOptions & ChildProcessFactory & WithObserver): AllOpts;

@@ -10,2 +10,3 @@ /// <reference types="node" />

export declare class BatchProcess {
#private;
readonly proc: _cp.ChildProcess;

@@ -16,20 +17,4 @@ readonly opts: InternalBatchProcessOptions;

readonly start: number;
private lastHealthCheck;
private healthCheckFailures;
readonly startupTaskId: number;
private readonly logger;
private lastJobFinshedAt;
private dead;
failedTaskCount: number;
private _taskCount;
private _ending;
/**
* Supports non-polling notification of process exit
*/
private readonly resolvedOnExit;
/**
* Should be undefined if this instance is not currently processing a task.
*/
private _currentTask;
private currentTaskTimeout;
constructor(proc: _cp.ChildProcess, opts: InternalBatchProcessOptions);

@@ -39,7 +24,9 @@ get currentTask(): Task | undefined;

/**
* @return {boolean} true if `this.end()` has been requested or the child process has exited.
* @return true if `this.end()` has been requested or the child process has
* exited.
*/
get ending(): boolean;
/**
* @return true if the child process has exited and is no longer in the process table.
* @return true if the child process has exited and is no longer in the
* process table.
*/

@@ -50,3 +37,3 @@ get exited(): boolean;

/**
* @returns true if the process doesn't need to be recycled.
* @return true if the process doesn't need to be recycled.
*/

@@ -70,3 +57,2 @@ get healthy(): boolean;

execTask(task: Task): boolean;
private _execTask;
/**

@@ -80,10 +66,2 @@ * End this child process.

end(gracefully: boolean | undefined, source: string): Promise<void> | undefined;
private _end;
private awaitNotRunning;
private onTimeout;
private onError;
private onExit;
private onStderr;
private onStdout;
private clearCurrentTask;
}
"use strict";
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
if (kind === "m") throw new TypeError("Private method is not writable");
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
};
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
};
var _BatchProcess_instances, _BatchProcess_lastHealthCheck, _BatchProcess_healthCheckFailures, _BatchProcess_logger, _BatchProcess_lastJobFinshedAt, _BatchProcess_dead, _BatchProcess_taskCount, _BatchProcess_ending, _BatchProcess_resolvedOnExit, _BatchProcess_currentTask, _BatchProcess_currentTaskTimeout, _BatchProcess_execTask, _BatchProcess_end, _BatchProcess_awaitNotRunning, _BatchProcess_onTimeout, _BatchProcess_onError, _BatchProcess_onExit, _BatchProcess_onStderr, _BatchProcess_onStdout, _BatchProcess_clearCurrentTask;
Object.defineProperty(exports, "__esModule", { value: true });

@@ -20,17 +32,34 @@ exports.BatchProcess = void 0;

this.opts = opts;
_BatchProcess_instances.add(this);
this.start = Date.now();
this.lastHealthCheck = Date.now();
this.healthCheckFailures = 0;
this.lastJobFinshedAt = Date.now();
_BatchProcess_lastHealthCheck.set(this, Date.now());
_BatchProcess_healthCheckFailures.set(this, 0);
_BatchProcess_logger.set(this, void 0);
_BatchProcess_lastJobFinshedAt.set(this, Date.now()
// Only set to true when `proc.pid` is no longer in the process table.
this.dead = false;
);
// Only set to true when `proc.pid` is no longer in the process table.
_BatchProcess_dead.set(this, false);
this.failedTaskCount = 0;
this._taskCount = -1; // don't count the startupTask
this._ending = false;
_BatchProcess_taskCount.set(this, -1); // don't count the startupTask
_BatchProcess_ending.set(this, false
/**
* Supports non-polling notification of process exit
*/
this.resolvedOnExit = new Deferred_1.Deferred();
);
/**
* Supports non-polling notification of process exit
*/
_BatchProcess_resolvedOnExit.set(this, new Deferred_1.Deferred()
/**
* Should be undefined if this instance is not currently processing a task.
*/
);
/**
* Should be undefined if this instance is not currently processing a task.
*/
_BatchProcess_currentTask.set(this, void 0);
_BatchProcess_currentTaskTimeout.set(this, void 0);
this.name = "BatchProcess(" + proc.pid + ")";
this.logger = opts.logger;
__classPrivateFieldSet(this, _BatchProcess_logger, opts.logger, "f");
// don't let node count the child processes as a reason to stay alive

@@ -42,18 +71,18 @@ this.proc.unref();

this.pid = proc.pid;
this.proc.on("error", (err) => this.onError("proc.error", err));
this.proc.on("close", () => this.onExit("close"));
this.proc.on("exit", () => this.onExit("exit"));
this.proc.on("disconnect", () => this.onExit("disconnect"));
this.proc.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "proc.error", err));
this.proc.on("close", () => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onExit).call(this, "close"));
this.proc.on("exit", () => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onExit).call(this, "exit"));
this.proc.on("disconnect", () => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onExit).call(this, "disconnect"));
const stdin = this.proc.stdin;
if (stdin == null)
throw new Error("Given proc had no stdin");
stdin.on("error", (err) => this.onError("stdin.error", err));
stdin.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "stdin.error", err));
const stdout = this.proc.stdout;
if (stdout == null)
throw new Error("Given proc had no stdout");
stdout.on("error", (err) => this.onError("stdout.error", err));
stdout.on("data", (d) => this.onStdout(d));
stdout.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "stdout.error", err));
stdout.on("data", (d) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onStdout).call(this, d));
(0, Object_1.map)(this.proc.stderr, (stderr) => {
stderr.on("error", (err) => this.onError("stderr.error", err));
stderr.on("data", (err) => this.onStderr(err));
stderr.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "stderr.error", err));
stderr.on("data", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onStderr).call(this, err));
});

@@ -65,25 +94,27 @@ const startupTask = new Task_1.Task(opts.versionCommand, Parser_1.SimpleParser);

// internal bug and the process starts, don't veto because there's a bug:
this.opts.observer.onInternalError(new Error(this.name + " startup task was not submitted"));
this.opts.observer.emit("internalError", new Error(this.name + " startup task was not submitted"));
}
}
get currentTask() {
return this._currentTask;
return __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
}
get taskCount() {
return this._taskCount;
return __classPrivateFieldGet(this, _BatchProcess_taskCount, "f");
}
/**
* @return {boolean} true if `this.end()` has been requested or the child process has exited.
* @return true if `this.end()` has been requested or the child process has
* exited.
*/
get ending() {
return this._ending;
return __classPrivateFieldGet(this, _BatchProcess_ending, "f");
}
/**
* @return true if the child process has exited and is no longer in the process table.
* @return true if the child process has exited and is no longer in the
* process table.
*/
get exited() {
return this.resolvedOnExit.settled;
return __classPrivateFieldGet(this, _BatchProcess_resolvedOnExit, "f").settled;
}
get exitPromise() {
return this.resolvedOnExit.promise;
return __classPrivateFieldGet(this, _BatchProcess_resolvedOnExit, "f").promise;
}

@@ -95,6 +126,6 @@ get whyNotHealthy() {

}
else if (this._ending) {
else if (__classPrivateFieldGet(this, _BatchProcess_ending, "f")) {
return "ending";
}
else if (this.healthCheckFailures > 0) {
else if (__classPrivateFieldGet(this, _BatchProcess_healthCheckFailures, "f") > 0) {
return "unhealthy";

@@ -121,3 +152,3 @@ }

}
else if ((_b = (this.opts.taskTimeoutMillis > 0 && ((_a = this._currentTask) === null || _a === void 0 ? void 0 : _a.runtimeMs))) !== null && _b !== void 0 ? _b : 0 > this.opts.taskTimeoutMillis) {
else if ((_b = (this.opts.taskTimeoutMillis > 0 && ((_a = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f")) === null || _a === void 0 ? void 0 : _a.runtimeMs))) !== null && _b !== void 0 ? _b : 0 > this.opts.taskTimeoutMillis) {
return "timeout";

@@ -130,3 +161,3 @@ }

/**
* @returns true if the process doesn't need to be recycled.
* @return true if the process doesn't need to be recycled.
*/

@@ -143,6 +174,6 @@ get healthy() {

get idle() {
return this._currentTask == null;
return __classPrivateFieldGet(this, _BatchProcess_currentTask, "f") == null;
}
get idleMs() {
return this.idle ? Date.now() - this.lastJobFinshedAt : -1;
return this.idle ? Date.now() - __classPrivateFieldGet(this, _BatchProcess_lastJobFinshedAt, "f") : -1;
}

@@ -153,3 +184,3 @@ /**

async running() {
if (this.dead) {
if (__classPrivateFieldGet(this, _BatchProcess_dead, "f")) {
// this.dead is only set if the process table has said we're dead.

@@ -162,5 +193,5 @@ return false;

// once a PID leaves the process table, it's gone for good:
this.dead = true;
this._ending = true;
this.resolvedOnExit.resolve();
__classPrivateFieldSet(this, _BatchProcess_dead, true, "f");
__classPrivateFieldSet(this, _BatchProcess_ending, true, "f");
__classPrivateFieldGet(this, _BatchProcess_resolvedOnExit, "f").resolve();
}

@@ -178,3 +209,3 @@ return alive;

async ended() {
return this.dead || (await this.notRunning());
return __classPrivateFieldGet(this, _BatchProcess_dead, "f") || (await this.notRunning());
}

@@ -194,75 +225,21 @@ async notEnded() {

this.opts.healthCheckIntervalMillis > 0 &&
Date.now() - this.lastHealthCheck > this.opts.healthCheckIntervalMillis) {
this.lastHealthCheck = Date.now();
Date.now() - __classPrivateFieldGet(this, _BatchProcess_lastHealthCheck, "f") > this.opts.healthCheckIntervalMillis) {
__classPrivateFieldSet(this, _BatchProcess_lastHealthCheck, Date.now(), "f");
const t = new Task_1.Task(this.opts.healthCheckCommand, Parser_1.SimpleParser);
t.promise
.catch((err) => {
var _a;
// console.log("execTask#" + this.pid + ": health check failed", err)
this.opts.observer.onHealthCheckError(err, this);
this.healthCheckFailures++;
this.opts.observer.emit("healthCheckError", err, this);
__classPrivateFieldSet(this, _BatchProcess_healthCheckFailures, (_a = __classPrivateFieldGet(this, _BatchProcess_healthCheckFailures, "f"), _a++, _a), "f");
})
.finally(() => {
this.lastHealthCheck = Date.now();
__classPrivateFieldSet(this, _BatchProcess_lastHealthCheck, Date.now(), "f");
});
this._execTask(t);
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_execTask).call(this, t);
return false;
}
// console.log("running " + task + " on " + this.pid)
return this._execTask(task);
return __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_execTask).call(this, task);
}
_execTask(task) {
var _a;
if (this._ending)
return false;
this._taskCount++;
this._currentTask = task;
const cmd = (0, String_1.ensureSuffix)(task.command, "\n");
const isStartupTask = task.taskId === this.startupTaskId;
const timeoutMs = isStartupTask
? this.opts.spawnTimeoutMillis
: this.opts.taskTimeoutMillis;
if (timeoutMs > 0) {
// logger().trace(this.name + ".execTask(): scheduling timeout", {
// command: task.command,
// timeoutMs,
// pid: this.pid
// })
this.currentTaskTimeout = setTimeout(() => this.onTimeout(task, timeoutMs), timeoutMs);
}
// CAREFUL! If you add a .catch or .finally, the pipeline can emit unhandled
// rejections:
void task.promise.then(() => {
this.clearCurrentTask(task);
this.opts.observer.onTaskResolved(task, this);
}, (err) => {
this.clearCurrentTask(task);
if (isStartupTask) {
this.opts.observer.onStartError(err);
}
else {
this.opts.observer.onTaskError(err, task, this);
}
});
try {
task.onStart(this.opts);
const stdin = (_a = this.proc) === null || _a === void 0 ? void 0 : _a.stdin;
if (stdin == null || stdin.destroyed) {
task.reject(new Error("proc.stdin unexpectedly closed"));
return false;
}
else {
stdin.write(cmd, (err) => {
if (err != null) {
task.reject(err);
}
});
return true;
}
}
catch (err) {
// child process went away. We should too.
this.end(false, "proc.stdin.write(cmd)");
return false;
}
}
/**

@@ -277,163 +254,213 @@ * End this child process.

end(gracefully = true, source) {
if (this._ending) {
if (__classPrivateFieldGet(this, _BatchProcess_ending, "f")) {
return undefined;
}
else {
this._ending = true;
return this._end(gracefully, source);
__classPrivateFieldSet(this, _BatchProcess_ending, true, "f");
return __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_end).call(this, gracefully, source);
}
}
// NOTE: Must only be invoked by this.end(), and only expected to be invoked
// once per instance.
async _end(gracefully = true, source) {
const lastTask = this._currentTask;
this.clearCurrentTask();
// NOTE: We wait on all tasks (even startup tasks) so we can assert that
// BatchCluster is idle (and this proc is idle) when the end promise is
// resolved.
// NOTE: holy crap there are a lot of notes here.
if (lastTask != null) {
try {
// Let's wait for streams to flush, as that may actually allow the task
// to complete successfully. Let's not wait forever, though.
await Promise.race([lastTask.promise, (0, Async_1.delay)(gracefully ? 2000 : 250)]);
}
catch {
//
}
if (lastTask.pending) {
lastTask.reject(new Error(`end() called before task completed (${JSON.stringify({
gracefully,
source,
})})`));
}
}
exports.BatchProcess = BatchProcess;
_BatchProcess_lastHealthCheck = new WeakMap(), _BatchProcess_healthCheckFailures = new WeakMap(), _BatchProcess_logger = new WeakMap(), _BatchProcess_lastJobFinshedAt = new WeakMap(), _BatchProcess_dead = new WeakMap(), _BatchProcess_taskCount = new WeakMap(), _BatchProcess_ending = new WeakMap(), _BatchProcess_resolvedOnExit = new WeakMap(), _BatchProcess_currentTask = new WeakMap(), _BatchProcess_currentTaskTimeout = new WeakMap(), _BatchProcess_instances = new WeakSet(), _BatchProcess_execTask = function _BatchProcess_execTask(task) {
var _a;
var _b;
if (__classPrivateFieldGet(this, _BatchProcess_ending, "f"))
return false;
__classPrivateFieldSet(this, _BatchProcess_taskCount, (_b = __classPrivateFieldGet(this, _BatchProcess_taskCount, "f"), _b++, _b), "f");
__classPrivateFieldSet(this, _BatchProcess_currentTask, task, "f");
const cmd = (0, String_1.ensureSuffix)(task.command, "\n");
const isStartupTask = task.taskId === this.startupTaskId;
const timeoutMs = isStartupTask
? this.opts.spawnTimeoutMillis
: this.opts.taskTimeoutMillis;
if (timeoutMs > 0) {
// logger().trace(this.name + ".execTask(): scheduling timeout", {
// command: task.command,
// timeoutMs,
// pid: this.pid
// })
__classPrivateFieldSet(this, _BatchProcess_currentTaskTimeout, setTimeout(() => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onTimeout).call(this, task, timeoutMs), timeoutMs), "f");
}
// CAREFUL! If you add a .catch or .finally, the pipeline can emit unhandled
// rejections:
void task.promise.then(() => {
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this, task);
this.opts.observer.emit("taskResolved", task, this);
}, (err) => {
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this, task);
if (isStartupTask) {
this.opts.observer.emit("startError", err);
}
const cmd = (0, Object_1.map)(this.opts.exitCommand, (ea) => (0, String_1.ensureSuffix)(ea, "\n"));
// proc cleanup:
(0, Error_1.tryEach)([
() => (0, Stream_1.mapNotDestroyed)(this.proc.stdin, (ea) => ea.end(cmd)),
() => (0, Stream_1.mapNotDestroyed)(this.proc.stdout, (ea) => ea.destroy()),
() => (0, Stream_1.mapNotDestroyed)(this.proc.stderr, (ea) => ea.destroy()),
() => this.proc.disconnect(),
]);
if (this.opts.cleanupChildProcs &&
gracefully &&
this.opts.endGracefulWaitTimeMillis > 0 &&
(await this.running())) {
// Wait for the end command to take effect:
await this.awaitNotRunning(this.opts.endGracefulWaitTimeMillis / 2);
// If it's still running, send the pid a signal:
if ((await this.running()) && this.proc.pid != null)
await (0, Pids_1.kill)(this.proc.pid);
// Wait for the signal handler to work:
await this.awaitNotRunning(this.opts.endGracefulWaitTimeMillis / 2);
else {
this.opts.observer.emit("taskError", err, task, this);
}
if (this.opts.cleanupChildProcs &&
this.proc.pid != null &&
(await this.running())) {
this.logger().warn(this.name + ".end(): force-killing still-running child.");
await (0, Pids_1.kill)(this.proc.pid, true);
});
try {
task.onStart(this.opts);
const stdin = (_a = this.proc) === null || _a === void 0 ? void 0 : _a.stdin;
if (stdin == null || stdin.destroyed) {
task.reject(new Error("proc.stdin unexpectedly closed"));
return false;
}
return this.resolvedOnExit;
else {
stdin.write(cmd, (err) => {
if (err != null) {
task.reject(err);
}
});
return true;
}
}
awaitNotRunning(timeout) {
return (0, Async_1.until)(() => this.notRunning(), timeout);
catch (err) {
// child process went away. We should too.
this.end(false, "proc.stdin.write(cmd)");
return false;
}
onTimeout(task, timeoutMs) {
if (task.pending) {
this.onError("timeout", new Error("waited " + timeoutMs + "ms"), task);
}, _BatchProcess_end =
// NOTE: Must only be invoked by this.end(), and only expected to be invoked
// once per instance.
async function _BatchProcess_end(gracefully = true, source) {
const lastTask = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this);
// NOTE: We wait on all tasks (even startup tasks) so we can assert that
// BatchCluster is idle (and this proc is idle) when the end promise is
// resolved.
// NOTE: holy crap there are a lot of notes here.
if (lastTask != null) {
try {
// Let's wait for streams to flush, as that may actually allow the task
// to complete successfully. Let's not wait forever, though.
await Promise.race([lastTask.promise, (0, Async_1.delay)(gracefully ? 2000 : 250)]);
}
}
onError(source, _error, task) {
if (this._ending) {
// We're ending already, so don't propagate the error.
// This is expected due to race conditions stdin EPIPE and process shutdown.
this.logger().debug(this.name + ".onError() post-end (expected and not propagated)", {
catch {
//
}
if (lastTask.pending) {
lastTask.reject(new Error(`end() called before task completed (${JSON.stringify({
gracefully,
source,
_error,
task,
});
return;
})})`));
}
if (task == null) {
task = this._currentTask;
}
const error = new Error(source + ": " + (0, Error_1.cleanError)(_error.message));
this.logger().warn(this.name + ".onError()", {
}
const cmd = (0, Object_1.map)(this.opts.exitCommand, (ea) => (0, String_1.ensureSuffix)(ea, "\n"));
// proc cleanup:
(0, Error_1.tryEach)([
() => (0, Stream_1.mapNotDestroyed)(this.proc.stdin, (ea) => ea.end(cmd)),
() => (0, Stream_1.mapNotDestroyed)(this.proc.stdout, (ea) => ea.destroy()),
() => (0, Stream_1.mapNotDestroyed)(this.proc.stderr, (ea) => ea.destroy()),
() => this.proc.disconnect(),
]);
if (this.opts.cleanupChildProcs &&
gracefully &&
this.opts.endGracefulWaitTimeMillis > 0 &&
(await this.running())) {
// Wait for the end command to take effect:
await __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_awaitNotRunning).call(this, this.opts.endGracefulWaitTimeMillis / 2);
// If it's still running, send the pid a signal:
if ((await this.running()) && this.proc.pid != null)
await (0, Pids_1.kill)(this.proc.pid);
// Wait for the signal handler to work:
await __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_awaitNotRunning).call(this, this.opts.endGracefulWaitTimeMillis / 2);
}
if (this.opts.cleanupChildProcs &&
this.proc.pid != null &&
(await this.running())) {
__classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".end(): force-killing still-running child.");
await (0, Pids_1.kill)(this.proc.pid, true);
}
return __classPrivateFieldGet(this, _BatchProcess_resolvedOnExit, "f");
}, _BatchProcess_awaitNotRunning = function _BatchProcess_awaitNotRunning(timeout) {
return (0, Async_1.until)(() => this.notRunning(), timeout);
}, _BatchProcess_onTimeout = function _BatchProcess_onTimeout(task, timeoutMs) {
if (task.pending) {
this.opts.observer.emit("taskTimeout", timeoutMs, task, this);
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "timeout", new Error("waited " + timeoutMs + "ms"), task);
}
}, _BatchProcess_onError = function _BatchProcess_onError(source, _error, task) {
if (__classPrivateFieldGet(this, _BatchProcess_ending, "f")) {
// We're ending already, so don't propagate the error.
// This is expected due to race conditions stdin EPIPE and process shutdown.
__classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).debug(this.name + ".onError() post-end (expected and not propagated)", {
source,
task: (0, Object_1.map)(task, (t) => t.command),
error,
_error,
task,
});
if (_error.stack != null) {
// Error stacks, if set, will not be redefined from a rethrow:
error.stack = (0, Error_1.cleanError)(_error.stack);
}
// clear the task before ending so the onExit from end() doesn't retry the task:
this.clearCurrentTask();
void this.end(false, "onError(" + source + ")");
if (task != null && this.taskCount === 1) {
this.logger().warn(this.name + ".onError(): startup task failed: " + error);
this.opts.observer.onStartError(error);
}
if (task != null) {
if (task.pending) {
task.reject(error);
}
else {
this.opts.observer.onInternalError(new Error(`${this.name}.onError(${error}) cannot reject already-fulfilled task.`));
}
}
return;
}
onExit(source) {
this.resolvedOnExit.resolve();
// no need to be graceful, it's just for bookkeeping:
return this.end(false, "onExit(" + source + ")");
if (task == null) {
task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
}
onStderr(data) {
if ((0, String_1.blank)(data))
return;
this.logger().info("onStderr(" + this.pid + "):" + data);
const task = this._currentTask;
if (task != null && task.pending) {
task.onStderr(data);
}
else if (!this._ending) {
this.end(false, "onStderr (no current task)");
// If we're ending and there isn't a task, don't worry about it.
// Otherwise:
this.opts.observer.onInternalError(new Error("onStderr(" +
String(data).trim() +
") no pending currentTask (task: " +
task +
")"));
}
const error = new Error(source + ": " + (0, Error_1.cleanError)(_error.message));
__classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".onError()", {
source,
task: (0, Object_1.map)(task, (t) => t.command),
error,
});
if (_error.stack != null) {
// Error stacks, if set, will not be redefined from a rethrow:
error.stack = (0, Error_1.cleanError)(_error.stack);
}
onStdout(data) {
// logger().debug("onStdout(" + this.pid + "):" + data)
if (data == null)
return;
const task = this._currentTask;
if (task != null && task.pending) {
this.opts.observer.onTaskData(data, task);
task.onStdout(data);
// clear the task before ending so the onExit from end() doesn't retry the task:
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this);
void this.end(false, "onError(" + source + ")");
if (task != null && this.taskCount === 1) {
__classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".onError(): startup task failed: " + error);
this.opts.observer.emit("startError", error);
}
if (task != null) {
if (task.pending) {
task.reject(error);
}
else if (!this._ending) {
this.end(false, "onStdout (no current task)");
// If we're ending and there isn't a task, don't worry about it.
// Otherwise:
this.opts.observer.onInternalError(new Error("onStdout(" + data + ") no pending currentTask (task: " + task + ")"));
else {
this.opts.observer.emit("internalError", new Error(`${this.name}.onError(${error}) cannot reject already-fulfilled task.`));
}
}
clearCurrentTask(task) {
var _a;
setImmediate(() => this.opts.observer.onIdle());
if (task != null && task.taskId !== ((_a = this._currentTask) === null || _a === void 0 ? void 0 : _a.taskId))
return;
(0, Object_1.map)(this.currentTaskTimeout, (ea) => clearTimeout(ea));
this.currentTaskTimeout = undefined;
this._currentTask = undefined;
this.lastJobFinshedAt = Date.now();
}, _BatchProcess_onExit = function _BatchProcess_onExit(source) {
__classPrivateFieldGet(this, _BatchProcess_resolvedOnExit, "f").resolve();
// no need to be graceful, it's just for bookkeeping:
return this.end(false, "onExit(" + source + ")");
}, _BatchProcess_onStderr = function _BatchProcess_onStderr(data) {
if ((0, String_1.blank)(data))
return;
__classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).info("onStderr(" + this.pid + "):" + data);
const task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
if (task != null && task.pending) {
task.onStderr(data);
}
}
exports.BatchProcess = BatchProcess;
else if (!__classPrivateFieldGet(this, _BatchProcess_ending, "f")) {
this.end(false, "onStderr (no current task)");
// If we're ending and there isn't a task, don't worry about it.
// Otherwise:
this.opts.observer.emit("internalError", new Error("onStderr(" +
String(data).trim() +
") no pending currentTask (task: " +
task +
")"));
}
}, _BatchProcess_onStdout = function _BatchProcess_onStdout(data) {
// logger().debug("onStdout(" + this.pid + "):" + data)
if (data == null)
return;
const task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
if (task != null && task.pending) {
this.opts.observer.emit("taskData", data, task, this);
task.onStdout(data);
}
else if (!__classPrivateFieldGet(this, _BatchProcess_ending, "f")) {
this.end(false, "onStdout (no current task)");
// If we're ending and there isn't a task, don't worry about it.
// Otherwise:
this.opts.observer.emit("internalError", new Error("onStdout(" + data + ") no pending currentTask (task: " + task + ")"));
}
}, _BatchProcess_clearCurrentTask = function _BatchProcess_clearCurrentTask(task) {
var _a;
setImmediate(() => this.opts.observer.emit("idle"));
if (task != null && task.taskId !== ((_a = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f")) === null || _a === void 0 ? void 0 : _a.taskId))
return;
(0, Object_1.map)(__classPrivateFieldGet(this, _BatchProcess_currentTaskTimeout, "f"), (ea) => clearTimeout(ea));
__classPrivateFieldSet(this, _BatchProcess_currentTaskTimeout, undefined, "f");
__classPrivateFieldSet(this, _BatchProcess_currentTask, undefined, "f");
__classPrivateFieldSet(this, _BatchProcess_lastJobFinshedAt, Date.now(), "f");
};
//# sourceMappingURL=BatchProcess.js.map

@@ -7,20 +7,21 @@ /**

export declare class Deferred<T> implements PromiseLike<T> {
#private;
readonly [Symbol.toStringTag] = "Deferred";
readonly promise: Promise<T>;
private _resolve;
private _reject;
private state;
constructor();
/**
* @return `true` iff `resolve` has been invoked.
* @return `true` iff neither `resolve` nor `rejected` have been invoked
*/
get pending(): boolean;
/**
* @return `true` iff `resolve` has been invoked.
* @return `true` iff `resolve` has been invoked
*/
get fulfilled(): boolean;
/**
* @return `true` iff `resolve` has been invoked.
* @return `true` iff `rejected` has been invoked
*/
get rejected(): boolean;
/**
* @return `true` iff `resolve` or `rejected` have been invoked
*/
get settled(): boolean;

@@ -27,0 +28,0 @@ then<TResult1 = T, TResult2 = never>(onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | undefined | null, onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null): Promise<TResult1 | TResult2>;

"use strict";
var _a;
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
if (kind === "m") throw new TypeError("Private method is not writable");
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
};
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
};
var _Deferred_resolve, _Deferred_reject, _Deferred_state, _a;
Object.defineProperty(exports, "__esModule", { value: true });

@@ -22,28 +33,33 @@ exports.Deferred = void 0;

this[_a] = "Deferred";
this.state = State.pending;
_Deferred_resolve.set(this, void 0);
_Deferred_reject.set(this, void 0);
_Deferred_state.set(this, State.pending);
this.promise = new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
__classPrivateFieldSet(this, _Deferred_resolve, resolve, "f");
__classPrivateFieldSet(this, _Deferred_reject, reject, "f");
});
}
/**
* @return `true` iff `resolve` has been invoked.
* @return `true` iff neither `resolve` nor `rejected` have been invoked
*/
get pending() {
return this.state === State.pending;
return __classPrivateFieldGet(this, _Deferred_state, "f") === State.pending;
}
/**
* @return `true` iff `resolve` has been invoked.
* @return `true` iff `resolve` has been invoked
*/
get fulfilled() {
return this.state === State.fulfilled;
return __classPrivateFieldGet(this, _Deferred_state, "f") === State.fulfilled;
}
/**
* @return `true` iff `resolve` has been invoked.
* @return `true` iff `rejected` has been invoked
*/
get rejected() {
return this.state === State.rejected;
return __classPrivateFieldGet(this, _Deferred_state, "f") === State.rejected;
}
/**
* @return `true` iff `resolve` or `rejected` have been invoked
*/
get settled() {
return this.fulfilled || this.rejected;
return __classPrivateFieldGet(this, _Deferred_state, "f") !== State.pending;
}

@@ -61,4 +77,4 @@ then(onfulfilled, onrejected) {

else {
this.state = State.fulfilled;
this._resolve(value);
__classPrivateFieldSet(this, _Deferred_state, State.fulfilled, "f");
__classPrivateFieldGet(this, _Deferred_resolve, "f").call(this, value);
return true;

@@ -68,8 +84,12 @@ }

reject(reason) {
if (this.settled) {
const wasSettled = this.settled;
// This isn't great: the wrapped Promise may be in a different state than
// #state: but the caller wanted to reject, so even if it already was
// resolved, let's try to respect that.
__classPrivateFieldSet(this, _Deferred_state, State.rejected, "f");
if (wasSettled) {
return false;
}
else {
this.state = State.rejected;
this._reject(reason);
__classPrivateFieldGet(this, _Deferred_reject, "f").call(this, reason);
return true;

@@ -88,3 +108,3 @@ }

exports.Deferred = Deferred;
_a = Symbol.toStringTag;
_Deferred_resolve = new WeakMap(), _Deferred_reject = new WeakMap(), _Deferred_state = new WeakMap(), _a = Symbol.toStringTag;
async function observe(d, p) {

@@ -91,0 +111,0 @@ try {

@@ -7,1 +7,2 @@ /**

export declare function cleanError(s: any): string;
export declare function asError(err: any): Error;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.cleanError = exports.tryEach = void 0;
exports.asError = exports.cleanError = exports.tryEach = void 0;
const String_1 = require("./String");
/**

@@ -25,2 +26,8 @@ * When we wrap errors, an Error always prefixes the toString() and stack with

exports.cleanError = cleanError;
function asError(err) {
return err instanceof Error
? err
: new Error((0, String_1.blank)(err) ? "(unknown)" : (0, String_1.toS)(err));
}
exports.asError = asError;
//# sourceMappingURL=Error.js.map
export declare class Rate {
#private;
readonly ttlMs: number;
private _eventCount;
private readonly start;
private readonly store;
constructor(ttlMs?: number);

@@ -14,3 +12,2 @@ onEvent(): void;

clear(): this;
private vacuum;
}
"use strict";
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
};
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
if (kind === "m") throw new TypeError("Private method is not writable");
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
};
var _Rate_instances, _Rate_start, _Rate_store, _Rate_eventCount, _Rate_vacuum;
Object.defineProperty(exports, "__esModule", { value: true });

@@ -7,25 +19,27 @@ exports.Rate = void 0;

this.ttlMs = ttlMs;
this._eventCount = 0;
this.start = Date.now();
this.store = [];
_Rate_instances.add(this);
_Rate_start.set(this, Date.now());
_Rate_store.set(this, []);
_Rate_eventCount.set(this, 0);
}
onEvent() {
this._eventCount++;
this.store.push(Date.now());
this.vacuum();
var _a;
__classPrivateFieldSet(this, _Rate_eventCount, (_a = __classPrivateFieldGet(this, _Rate_eventCount, "f"), _a++, _a), "f");
__classPrivateFieldGet(this, _Rate_store, "f").push(Date.now());
__classPrivateFieldGet(this, _Rate_instances, "m", _Rate_vacuum).call(this);
}
get eventCount() {
return this._eventCount;
return __classPrivateFieldGet(this, _Rate_eventCount, "f");
}
get period() {
return Date.now() - this.start;
return Date.now() - __classPrivateFieldGet(this, _Rate_start, "f");
}
get eventsPerMs() {
this.vacuum();
const elapsed = Math.max(1, Date.now() - this.start);
if (elapsed > this.ttlMs) {
return this.store.length / this.ttlMs;
__classPrivateFieldGet(this, _Rate_instances, "m", _Rate_vacuum).call(this);
const elapsedMs = Date.now() - __classPrivateFieldGet(this, _Rate_start, "f");
if (elapsedMs > this.ttlMs) {
return __classPrivateFieldGet(this, _Rate_store, "f").length / this.ttlMs;
}
else {
return this.store.length / elapsed;
return __classPrivateFieldGet(this, _Rate_store, "f").length / Math.max(1, elapsedMs);
}

@@ -40,19 +54,19 @@ }

clear() {
this.store.length = 0;
__classPrivateFieldGet(this, _Rate_store, "f").length = 0;
return this;
}
vacuum() {
const minTime = Date.now() - this.ttlMs;
// If nothing's expired, findIndex should return index 0, so this should
// normally be quite cheap:
const firstGoodIndex = this.store.findIndex((ea) => ea > minTime);
if (firstGoodIndex === -1) {
this.clear();
}
else if (firstGoodIndex > 0) {
this.store.splice(0, firstGoodIndex);
}
}
}
exports.Rate = Rate;
_Rate_start = new WeakMap(), _Rate_store = new WeakMap(), _Rate_eventCount = new WeakMap(), _Rate_instances = new WeakSet(), _Rate_vacuum = function _Rate_vacuum() {
const minTime = Date.now() - this.ttlMs;
// If nothing's expired, findIndex should return index 0, so this should
// normally be quite cheap:
const firstGoodIndex = __classPrivateFieldGet(this, _Rate_store, "f").findIndex((ea) => ea > minTime);
if (firstGoodIndex === -1) {
this.clear();
}
else if (firstGoodIndex > 0) {
__classPrivateFieldGet(this, _Rate_store, "f").splice(0, firstGoodIndex);
}
};
//# sourceMappingURL=Rate.js.map

@@ -11,13 +11,6 @@ /// <reference types="node" />

export declare class Task<T = any> {
#private;
readonly command: string;
readonly parser: Parser<T>;
readonly taskId: number;
private opts?;
private startedAt?;
private settledAt?;
private _passed?;
private _pending;
private readonly d;
private _stdout;
private _stderr;
/**

@@ -30,3 +23,2 @@ * @param {string} command is the value written to stdin to perform the given

constructor(command: string, parser: Parser<T>);
private onSettle;
/**

@@ -43,5 +35,7 @@ * @return the resolution or rejection of this task.

onStderr(buf: string | Buffer): void;
private resolve;
reject(error: Error): void;
/**
* @return true if the wrapped promise was rejected
*/
reject(error: Error): boolean;
}
export {};
"use strict";
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
};
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
if (kind === "m") throw new TypeError("Private method is not writable");
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
};
var _Task_instances, _Task_opts, _Task_startedAt, _Task_parsing, _Task_settledAt, _Task_d, _Task_stdout, _Task_stderr, _Task_onSettle, _Task_resolve;
Object.defineProperty(exports, "__esModule", { value: true });

@@ -22,14 +34,22 @@ exports.Task = void 0;

this.parser = parser;
_Task_instances.add(this);
this.taskId = _taskId++;
this._pending = true;
this.d = new Deferred_1.Deferred();
this._stdout = "";
this._stderr = "";
this.d.promise.then(() => this.onSettle(), () => this.onSettle());
_Task_opts.set(this, void 0);
_Task_startedAt.set(this, void 0);
_Task_parsing.set(this, false);
_Task_settledAt.set(this, void 0);
_Task_d.set(this, new Deferred_1.Deferred());
_Task_stdout.set(this, "");
_Task_stderr.set(this, ""
/**
* @param {string} command is the value written to stdin to perform the given
* task.
* @param {Parser<T>} parser is used to parse resulting data from the
* underlying process to a typed object.
*/
);
// We can't use .finally here, because that creates a promise chain that, if
// rejected, results in an uncaught rejection.
__classPrivateFieldGet(this, _Task_d, "f").promise.then(() => __classPrivateFieldGet(this, _Task_instances, "m", _Task_onSettle).call(this), () => __classPrivateFieldGet(this, _Task_instances, "m", _Task_onSettle).call(this));
}
onSettle() {
var _a;
this._pending = false;
(_a = this.settledAt) !== null && _a !== void 0 ? _a : (this.settledAt = Date.now());
}
/**

@@ -39,23 +59,23 @@ * @return the resolution or rejection of this task.

get promise() {
return this.d.promise;
return __classPrivateFieldGet(this, _Task_d, "f").promise;
}
get pending() {
return this.d.pending;
return __classPrivateFieldGet(this, _Task_d, "f").pending;
}
get state() {
return this.d.pending
return __classPrivateFieldGet(this, _Task_d, "f").pending
? "pending"
: this.d.fulfilled
? "resolved"
: "rejected";
: __classPrivateFieldGet(this, _Task_d, "f").rejected
? "rejected"
: "resolved";
}
onStart(opts) {
this.opts = opts;
this.startedAt = Date.now();
__classPrivateFieldSet(this, _Task_opts, opts, "f");
__classPrivateFieldSet(this, _Task_startedAt, Date.now(), "f");
}
get runtimeMs() {
var _a;
return this.startedAt == null
return __classPrivateFieldGet(this, _Task_startedAt, "f") == null
? undefined
: ((_a = this.settledAt) !== null && _a !== void 0 ? _a : Date.now()) - this.startedAt;
: ((_a = __classPrivateFieldGet(this, _Task_settledAt, "f")) !== null && _a !== void 0 ? _a : Date.now()) - __classPrivateFieldGet(this, _Task_startedAt, "f");
}

@@ -69,35 +89,17 @@ toString() {

}
// private trace({
// meth,
// desc,
// meta,
// }: {
// meth: string
// desc?: string
// meta?: any
// }) {
// this.opts
// ?.logger()
// .trace("Task#" + this.taskId + "." + meth + "() " + toS(desc), meta)
// }
onStdout(buf) {
var _a, _b;
// this.trace({ meth: "onStdout", meta: { buf: buf.toString() } })
this._stdout += buf.toString();
const m = (_a = this.opts) === null || _a === void 0 ? void 0 : _a.passRE.exec(this._stdout);
if (null != m) {
// this.trace({ meth: "onStdout", desc: "found pass!", meta: { m } })
__classPrivateFieldSet(this, _Task_stdout, __classPrivateFieldGet(this, _Task_stdout, "f") + buf.toString(), "f");
const passRE = (_a = __classPrivateFieldGet(this, _Task_opts, "f")) === null || _a === void 0 ? void 0 : _a.passRE;
if (passRE != null && passRE.exec(__classPrivateFieldGet(this, _Task_stdout, "f")) != null) {
// remove the pass token from stdout:
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._stdout = this._stdout.replace(this.opts.passRE, "");
this.resolve(true);
__classPrivateFieldSet(this, _Task_stdout, __classPrivateFieldGet(this, _Task_stdout, "f").replace(passRE, ""), "f");
__classPrivateFieldGet(this, _Task_instances, "m", _Task_resolve).call(this, true);
}
else {
const m2 = (_b = this.opts) === null || _b === void 0 ? void 0 : _b.failRE.exec(this._stdout);
if (null != m2) {
// this.trace({ meth: "onStdout", desc: "found fail!", meta: { m2 } })
const failRE = (_b = __classPrivateFieldGet(this, _Task_opts, "f")) === null || _b === void 0 ? void 0 : _b.failRE;
if (failRE != null && failRE.exec(__classPrivateFieldGet(this, _Task_stdout, "f")) != null) {
// remove the fail token from stdout:
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._stdout = this._stdout.replace(this.opts.failRE, "");
this.resolve(false);
__classPrivateFieldSet(this, _Task_stdout, __classPrivateFieldGet(this, _Task_stdout, "f").replace(failRE, ""), "f");
__classPrivateFieldGet(this, _Task_instances, "m", _Task_resolve).call(this, false);
}

@@ -108,47 +110,50 @@ }

var _a;
// this.trace({ meth: "onStderr", meta: { buf: buf.toString() } })
this._stderr += buf.toString();
const m = (_a = this.opts) === null || _a === void 0 ? void 0 : _a.failRE.exec(this._stderr);
if (null != m) {
// this.trace({ meth: "onStderr", desc: "found fail!", meta: { m } })
__classPrivateFieldSet(this, _Task_stderr, __classPrivateFieldGet(this, _Task_stderr, "f") + buf.toString(), "f");
const failRE = (_a = __classPrivateFieldGet(this, _Task_opts, "f")) === null || _a === void 0 ? void 0 : _a.failRE;
if (failRE != null && failRE.exec(__classPrivateFieldGet(this, _Task_stderr, "f")) != null) {
// remove the fail token from stderr:
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._stderr = this._stderr.replace(this.opts.failRE, "");
this.resolve(false);
__classPrivateFieldSet(this, _Task_stderr, __classPrivateFieldGet(this, _Task_stderr, "f").replace(failRE, ""), "f");
__classPrivateFieldGet(this, _Task_instances, "m", _Task_resolve).call(this, false);
}
}
async resolve(passed) {
var _a, _b, _c, _d;
// fail always wins.
this._passed = ((_a = this._passed) !== null && _a !== void 0 ? _a : true) && passed;
// this.trace({
// meth: "resolve",
// meta: { passed, this_passed: this._passed },
// })
// wait for stderr and stdout to flush:
await (0, Async_1.delay)((_c = (_b = this.opts) === null || _b === void 0 ? void 0 : _b.streamFlushMillis) !== null && _c !== void 0 ? _c : 10, true);
// we're expecting this method may be called concurrently (if there are both
// pass and fail tokens found in stderr and stdout), but we only want to run
// this once, so
if (!this.d.pending || !this._pending)
return;
this._pending = false;
try {
const parseResult = await this.parser(this._stdout, this._stderr, this._passed);
if (this.d.resolve(parseResult)) {
}
else {
(_d = this.opts) === null || _d === void 0 ? void 0 : _d.observer.onInternalError(new Error(this.toString() + " ._resolved() more than once"));
}
/**
* @return true if the wrapped promise was rejected
*/
reject(error) {
return __classPrivateFieldGet(this, _Task_d, "f").reject(error);
}
}
exports.Task = Task;
_Task_opts = new WeakMap(), _Task_startedAt = new WeakMap(), _Task_parsing = new WeakMap(), _Task_settledAt = new WeakMap(), _Task_d = new WeakMap(), _Task_stdout = new WeakMap(), _Task_stderr = new WeakMap(), _Task_instances = new WeakSet(), _Task_onSettle = function _Task_onSettle() {
var _a;
__classPrivateFieldSet(this, _Task_settledAt, (_a = __classPrivateFieldGet(this, _Task_settledAt, "f")) !== null && _a !== void 0 ? _a : Date.now(), "f");
}, _Task_resolve = async function _Task_resolve(passed) {
var _a, _b, _c;
// fail always wins.
passed = !__classPrivateFieldGet(this, _Task_d, "f").rejected && passed;
// this.trace({
// meth: "resolve",
// meta: { passed, this_passed: this._passed },
// })
// wait for stderr and stdout to flush:
await (0, Async_1.delay)((_b = (_a = __classPrivateFieldGet(this, _Task_opts, "f")) === null || _a === void 0 ? void 0 : _a.streamFlushMillis) !== null && _b !== void 0 ? _b : 10, true);
// we're expecting this method may be called concurrently (if there are both
// pass and fail tokens found in stderr and stdout), but we only want to run
// this once, so
if (!this.pending || __classPrivateFieldGet(this, _Task_parsing, "f"))
return;
// Prevent concurrent parsing:
__classPrivateFieldSet(this, _Task_parsing, true, "f");
try {
const parseResult = await this.parser(__classPrivateFieldGet(this, _Task_stdout, "f"), __classPrivateFieldGet(this, _Task_stderr, "f"), passed);
if (__classPrivateFieldGet(this, _Task_d, "f").resolve(parseResult)) {
}
catch (error) {
this.reject(error);
else {
(_c = __classPrivateFieldGet(this, _Task_opts, "f")) === null || _c === void 0 ? void 0 : _c.observer.emit("internalError", new Error(this.toString() + " ._resolved() more than once"));
}
}
reject(error) {
if (this.d.reject(error)) {
}
catch (error) {
this.reject(error);
}
}
exports.Task = Task;
};
//# sourceMappingURL=Task.js.map
{
"name": "batch-cluster",
"version": "8.1.0",
"version": "9.0.0",
"description": "Manage a cluster of child processes",

@@ -39,3 +39,3 @@ "main": "dist/BatchCluster.js",

"@types/chai-string": "^1.4.2",
"@types/mocha": "^9.0.0",
"@types/mocha": "^9.1.0",
"@types/node": "^17.0.10",

@@ -59,4 +59,4 @@ "@typescript-eslint/eslint-plugin": "^5.10.0",

"typedoc": "^0.22.11",
"typescript": "^4.5.4"
"typescript": "^4.5.5"
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc