Socket
Socket
Sign inDemoInstall

@tracerbench/spawn

Package Overview
Dependencies
4
Maintainers
3
Versions
13
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.1 to 1.0.2

8

dist/newPipeMessageTransport.d.ts
/// <reference types="node" />
import { AttachMessageTransport } from "@tracerbench/message-transport";
export default function newPipeMessageTransport(writeStream: NodeJS.WriteStream, readStream: NodeJS.ReadStream): AttachMessageTransport;
export declare type Write = (data: Buffer) => void;
export declare type EndWrite = () => void;
export declare type OnRead = (chunk: Buffer) => void;
export declare type OnReadEnd = () => void;
export declare type OnClose = (err?: Error) => void;
export declare type AttachProcess = (onRead: OnRead, onReadEnd: OnReadEnd, onClose: OnClose) => [Write, EndWrite];
export default function newPipeMessageTransport(connect: AttachProcess): AttachMessageTransport;
//# sourceMappingURL=newPipeMessageTransport.d.ts.map

37

dist/newPipeMessageTransport.js

@@ -5,4 +5,5 @@ "use strict";

const newTaskQueue_1 = require("./newTaskQueue");
function newPipeMessageTransport(writeStream, readStream) {
function newPipeMessageTransport(connect) {
let attached = false;
let closed = false;
return (onMessage, onClose) => {

@@ -13,10 +14,11 @@ if (attached) {

attached = true;
const [write, endWrite] = connect(onRead, onReadEnd, enqueueClose);
const enqueue = newTaskQueue_1.default();
const splitter = newBufferSplitter_1.default(0 /* NULL */, buffer => enqueue(() => onMessage(buffer.toString("utf8"))));
let closeEnqueued = false;
const enqueueOnClose = (error) => {
if (closeEnqueued) {
const splitter = newBufferSplitter_1.default(0 /* NULL */, split => enqueueMessage(split.toString("utf8")));
return sendMessage;
function enqueueClose(error) {
if (closed) {
return;
}
closeEnqueued = true;
closed = true;
if (error) {

@@ -28,11 +30,16 @@ enqueue(() => onClose(error));

}
};
writeStream.on("error", enqueueOnClose);
readStream.on("error", enqueueOnClose);
readStream.on("close", enqueueOnClose);
readStream.on("data", data => splitter.push(data));
readStream.on("end", () => splitter.flush());
return message => {
writeStream.write(message + "\0");
};
endWrite();
}
function enqueueMessage(message) {
enqueue(() => onMessage(message));
}
function onRead(data) {
splitter.push(data);
}
function onReadEnd() {
splitter.flush();
}
function sendMessage(message) {
write(Buffer.from(message + "\0", "utf8"));
}
};

@@ -39,0 +46,0 @@ }

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const debug = require("debug");
const events_1 = require("events");
const race_cancellation_1 = require("race-cancellation");
const debugCallback = debug("@tracerbench/spawn");
function newProcess(child) {
let hasExited = false;
const [exited, onExit] = race_cancellation_1.oneshot();
let lastError;
const emitter = new events_1.EventEmitter();
const raceExit = race_cancellation_1.newRaceCancellation(exited, "the process exited before the async task using it was completed", "UnexpectedProcessExit");
const [raceExit, cancel] = race_cancellation_1.cancellableRace();
child.on("error", error => {
lastError = error;
debugCallback("child process error %O", error);
onExit(error);
});
child.on("exit", () => {
debugCallback("child process exit");
onExit();
hasExited = true;
emitter.emit("exit");
});
child.on("error", error => {
emitter.emit("error", error);
});
return {

@@ -30,20 +33,50 @@ dispose,

};
async function kill(timeout, raceCancellation) {
function onExit(error) {
if (hasExited) {
return;
}
child.kill("SIGTERM");
try {
await waitForExit(timeout, raceCancellation);
hasExited = true;
if (error) {
cancel(`process exited early: ${error.message}`);
}
catch (e) {
if (!hasExited) {
child.kill("SIGKILL");
if (race_cancellation_1.isCancellation(e, "Timeout" /* Timeout */)) {
return await waitForExit(timeout, raceCancellation);
}
}
throw e;
else {
cancel(`process exited early`);
}
emitter.emit("exit");
}
async function exited(raceCancellation) {
if (lastError) {
throw lastError;
}
if (hasExited) {
return;
}
return await race_cancellation_1.disposablePromise((resolve, reject) => {
child.on("exit", resolve);
child.on("error", reject);
return () => {
child.removeListener("exit", resolve);
child.removeListener("error", reject);
};
}, raceCancellation);
}
async function waitForExit(timeout = 10000, raceCancellation) {
if (hasExited) {
return;
}
const result = await (timeout > 0
? race_cancellation_1.withRaceTimeout(exited, timeout)(raceCancellation)
: exited(raceCancellation));
return race_cancellation_1.throwIfCancelled(result);
}
async function kill(timeout, raceCancellation) {
if (hasExited) {
return;
}
if (child.killed || !child.pid) {
return;
}
child.kill();
await waitForExit(timeout, raceCancellation);
}
async function dispose() {

@@ -57,21 +90,10 @@ if (hasExited) {

catch (e) {
// dispose is in finally, we don't want to cover up error
emitter.emit("error", e);
// dispose is in finally and meant to be safe
// we don't want to cover up error
// just output for debugging
debugCallback("dispose error %O", e);
}
}
async function waitForExit(timeout = 3000, raceCancellation) {
let result;
if (timeout > 0) {
result = await race_cancellation_1.withRaceTimeout(raceTimeout => raceTimeout(exited), timeout)(raceCancellation);
}
else if (raceCancellation !== undefined) {
result = await raceCancellation(exited);
}
else {
result = await exited();
}
return race_cancellation_1.throwIfCancelled(result);
}
}
exports.default = newProcess;
//# sourceMappingURL=newProcess.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const debug = require("debug");
const newPipeMessageTransport_1 = require("./newPipeMessageTransport");
const newProcess_1 = require("./newProcess");
const debugCallback = debug("@tracerbench/spawn");
function newProcessWithPipeMessageTransport(child) {

@@ -9,3 +11,28 @@ const process = newProcess_1.default(child);

return Object.assign(process, {
attach: newPipeMessageTransport_1.default(writeStream, readStream),
attach: newPipeMessageTransport_1.default((onRead, onReadEnd, onClose) => {
child.on("error", onClose);
child.on("exit", onClose);
readStream.on("data", onRead);
readStream.on("end", () => {
debugCallback("read pipe end");
onReadEnd();
});
readStream.on("error", error => {
debugCallback("read pipe error %O", error);
});
writeStream.on("close", () => {
debugCallback("write pipe close");
onClose();
});
writeStream.on("error", (error) => {
debugCallback("write pipe error %O", error);
// writes while the other side is closing can cause EPIPE
// just wait for close to actually happen and ignore it.
if (error && "code" in error && error.code === "EPIPE") {
return;
}
onClose(error);
});
return [data => writeStream.write(data), () => writeStream.end()];
}),
});

@@ -12,0 +39,0 @@ }

{
"name": "@tracerbench/spawn",
"version": "1.0.1",
"version": "1.0.2",
"description": "High level spawn API for spawning process with a connection to the DevTools protocol.",

@@ -34,3 +34,3 @@ "license": "BSD-2-Clause",

},
"gitHead": "e37b27bd4de7c6c49ad7ad8593263e67ad6018a0"
"gitHead": "a63772e4ffbab7f3c273970f3fa176bb2544405b"
}

@@ -6,7 +6,20 @@ import { AttachMessageTransport } from "@tracerbench/message-transport";

export type Write = (data: Buffer) => void;
export type EndWrite = () => void;
export type OnRead = (chunk: Buffer) => void;
export type OnReadEnd = () => void;
export type OnClose = (err?: Error) => void;
export type AttachProcess = (
onRead: OnRead,
onReadEnd: OnReadEnd,
onClose: OnClose,
) => [Write, EndWrite];
export default function newPipeMessageTransport(
writeStream: NodeJS.WriteStream,
readStream: NodeJS.ReadStream,
connect: AttachProcess,
): AttachMessageTransport {
let attached = false;
let closed = false;
return (onMessage, onClose) => {

@@ -18,13 +31,22 @@ if (attached) {

const [write, endWrite] = connect(
onRead,
onReadEnd,
enqueueClose,
);
const enqueue = newTaskQueue();
const splitter = newBufferSplitter(Char.NULL, buffer =>
enqueue(() => onMessage(buffer.toString("utf8"))),
const splitter = newBufferSplitter(Char.NULL, split =>
enqueueMessage(split.toString("utf8")),
);
let closeEnqueued = false;
const enqueueOnClose = (error?: Error) => {
if (closeEnqueued) {
return sendMessage;
function enqueueClose(error?: Error) {
if (closed) {
return;
}
closeEnqueued = true;
closed = true;
if (error) {

@@ -35,13 +57,21 @@ enqueue(() => onClose(error));

}
};
writeStream.on("error", enqueueOnClose);
readStream.on("error", enqueueOnClose);
readStream.on("close", enqueueOnClose);
readStream.on("data", data => splitter.push(data));
readStream.on("end", () => splitter.flush());
endWrite();
}
return message => {
writeStream.write(message + "\0");
};
function enqueueMessage(message: string) {
enqueue(() => onMessage(message));
}
function onRead(data: Buffer) {
splitter.push(data);
}
function onReadEnd() {
splitter.flush();
}
function sendMessage(message: string) {
write(Buffer.from(message + "\0", "utf8"));
}
};

@@ -48,0 +78,0 @@ }

@@ -0,8 +1,7 @@

import debug = require("debug");
import { EventEmitter } from "events";
import {
cancellableRace,
Cancellation,
CancellationKind,
isCancellation,
newRaceCancellation,
oneshot,
disposablePromise,
RaceCancellation,

@@ -15,2 +14,4 @@ throwIfCancelled,

const debugCallback = debug("@tracerbench/spawn");
export default function newProcess(

@@ -20,21 +21,17 @@ child: import("execa").ExecaChildProcess,

let hasExited = false;
const [exited, onExit] = oneshot<void>();
let lastError: Error | undefined;
const emitter = new EventEmitter();
const [raceExit, cancel] = cancellableRace();
const raceExit = newRaceCancellation(
exited,
"the process exited before the async task using it was completed",
"UnexpectedProcessExit",
);
child.on("error", error => {
lastError = error;
debugCallback("child process error %O", error);
onExit(error);
});
child.on("exit", () => {
debugCallback("child process exit");
onExit();
hasExited = true;
emitter.emit("exit");
});
child.on("error", error => {
emitter.emit("error", error);
});
return {

@@ -53,50 +50,81 @@ dispose,

async function kill(timeout?: number, raceCancellation?: RaceCancellation) {
function onExit(error?: Error) {
if (hasExited) {
return;
}
child.kill("SIGTERM");
try {
await waitForExit(timeout, raceCancellation);
} catch (e) {
if (!hasExited) {
child.kill("SIGKILL");
if (isCancellation(e, CancellationKind.Timeout)) {
return await waitForExit(timeout, raceCancellation);
}
}
throw e;
hasExited = true;
if (error) {
cancel(`process exited early: ${error.message}`);
} else {
cancel(`process exited early`);
}
emitter.emit("exit");
}
async function dispose(): Promise<void> {
async function exited(
raceCancellation?: RaceCancellation,
): Promise<void | Cancellation> {
if (lastError) {
throw lastError;
}
if (hasExited) {
return;
}
try {
await kill();
} catch (e) {
// dispose is in finally, we don't want to cover up error
emitter.emit("error", e);
}
return await disposablePromise((resolve, reject) => {
child.on("exit", resolve);
child.on("error", reject);
return () => {
child.removeListener("exit", resolve);
child.removeListener("error", reject);
};
}, raceCancellation);
}
async function waitForExit(
timeout = 3000,
timeout = 10000,
raceCancellation?: RaceCancellation,
) {
let result: void | Cancellation;
if (timeout > 0) {
result = await withRaceTimeout(
raceTimeout => raceTimeout(exited),
timeout,
)(raceCancellation);
} else if (raceCancellation !== undefined) {
result = await raceCancellation(exited);
} else {
result = await exited();
if (hasExited) {
return;
}
const result = await (timeout > 0
? withRaceTimeout(exited, timeout)(raceCancellation)
: exited(raceCancellation));
return throwIfCancelled(result);
}
async function kill(timeout?: number, raceCancellation?: RaceCancellation) {
if (hasExited) {
return;
}
if (child.killed || !child.pid) {
return;
}
child.kill();
await waitForExit(timeout, raceCancellation);
}
async function dispose(): Promise<void> {
if (hasExited) {
return;
}
try {
await kill();
} catch (e) {
// dispose is in finally and meant to be safe
// we don't want to cover up error
// just output for debugging
debugCallback("dispose error %O", e);
}
}
}

@@ -0,1 +1,3 @@

import debug = require("debug");
import { ProcessWithPipeMessageTransport } from "../types";

@@ -6,2 +8,4 @@

const debugCallback = debug("@tracerbench/spawn");
export default function newProcessWithPipeMessageTransport(

@@ -13,3 +17,31 @@ child: import("execa").ExecaChildProcess,

return Object.assign(process, {
attach: createPipeMessageTransport(writeStream, readStream),
attach: createPipeMessageTransport((onRead, onReadEnd, onClose) => {
child.on("error", onClose);
child.on("exit", onClose);
readStream.on("data", onRead);
readStream.on("end", () => {
debugCallback("read pipe end");
onReadEnd();
});
readStream.on("error", error => {
debugCallback("read pipe error %O", error);
});
writeStream.on("close", () => {
debugCallback("write pipe close");
onClose();
});
writeStream.on("error", (error: Error | NodeJS.ErrnoException) => {
debugCallback("write pipe error %O", error);
// writes while the other side is closing can cause EPIPE
// just wait for close to actually happen and ignore it.
if (error && "code" in error && error.code === "EPIPE") {
return;
}
onClose(error);
});
return [data => writeStream.write(data), () => writeStream.end()];
}),
});

@@ -22,7 +54,7 @@ }

return stdio as [
NodeJS.WriteStream,
NodeJS.ReadStream,
NodeJS.ReadStream,
NodeJS.WriteStream,
NodeJS.ReadStream
NodeJS.WritableStream,
NodeJS.ReadableStream,
NodeJS.ReadableStream,
NodeJS.WritableStream,
NodeJS.ReadableStream,
];

@@ -29,0 +61,0 @@ }

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc