Socket
Socket
Sign inDemoInstall

@parcel/workers

Package Overview
Dependencies
Maintainers
1
Versions
876
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@parcel/workers - npm Package Compare versions

Comparing version 1.11.0 to 2.0.0-alpha.1

lib/backend.js

25

package.json
{
"name": "@parcel/workers",
"version": "1.11.0",
"version": "2.0.0-alpha.1",
"description": "Blazing fast, zero configuration web application bundler",
"main": "index.js",
"main": "lib/index.js",
"license": "MIT",

@@ -12,3 +12,3 @@ "repository": {

"engines": {
"node": ">= 6.0.0"
"node": ">= 8.0.0"
},

@@ -19,17 +19,12 @@ "publishConfig": {

"scripts": {
"test": "cross-env NODE_ENV=test mocha",
"test-ci": "yarn build && yarn test",
"format": "prettier --write \"./{src,bin,test}/**/*.{js,json,md}\"",
"lint": "eslint . && prettier \"./{src,bin,test}/**/*.{js,json,md}\" --list-different",
"build": "babel src -d lib",
"prepublish": "yarn build"
"test-ci": "yarn test"
},
"devDependencies": {
"mocha": "^5.2.0"
},
"dependencies": {
"@parcel/utils": "^1.11.0",
"@parcel/logger": "^2.0.0-alpha.1",
"@parcel/utils": "^2.0.0-alpha.1",
"chrome-trace-event": "^1.0.2",
"nullthrows": "^1.1.1",
"physical-cpu-count": "^2.0.0"
},
"gitHead": "34eb91e8e6991073e594bff731c333d09b0403b5"
}
"gitHead": "11d21d56b97f1b6ae3cd3671ccbb39adf80c438f"
}

193

src/child.js

@@ -1,44 +0,65 @@

const {errorUtils} = require('@parcel/utils');
// @flow
class Child {
constructor() {
if (!process.send) {
throw new Error('Only create Child instances in a worker!');
}
import type {
CallRequest,
WorkerDataResponse,
WorkerErrorResponse,
WorkerMessage,
WorkerRequest,
WorkerResponse,
ChildImpl
} from './types';
import type {IDisposable} from '@parcel/types';
this.module = undefined;
this.childId = undefined;
import invariant from 'assert';
import nullthrows from 'nullthrows';
import Logger, {patchConsole} from '@parcel/logger';
import {errorToJson, jsonToError} from '@parcel/utils';
import bus from './bus';
import Profiler from './Profiler';
this.callQueue = [];
this.responseQueue = new Map();
this.responseId = 0;
this.maxConcurrentCalls = 10;
type ChildCall = WorkerRequest & {|
resolve: (result: Promise<any> | any) => void,
reject: (error: any) => void
|};
export class Child {
callQueue: Array<ChildCall> = [];
childId: ?number;
maxConcurrentCalls: number = 10;
module: ?any;
responseId = 0;
responseQueue: Map<number, ChildCall> = new Map();
loggerDisposable: IDisposable;
child: ChildImpl;
profiler: ?Profiler;
constructor(ChildBackend: Class<ChildImpl>) {
this.child = new ChildBackend(
this.messageListener.bind(this),
this.handleEnd.bind(this)
);
patchConsole();
// Monitior all logging events inside this child process and forward to
// the main process via the bus.
this.loggerDisposable = Logger.onLog(event => {
bus.emit('logEvent', event);
});
}
messageListener(data) {
if (data === 'die') {
return this.end();
messageListener(message: WorkerMessage): void | Promise<void> {
if (message.type === 'response') {
return this.handleResponse(message);
} else if (message.type === 'request') {
return this.handleRequest(message);
}
let type = data.type;
if (type === 'response') {
return this.handleResponse(data);
} else if (type === 'request') {
return this.handleRequest(data);
}
}
async send(data) {
process.send(data, err => {
if (err && err instanceof Error) {
if (err.code === 'ERR_IPC_CHANNEL_CLOSED') {
// IPC connection closed
// no need to keep the worker running if it can't send or receive data
return this.end();
}
}
});
async send(data: WorkerMessage): Promise<void> {
this.child.send(data);
}
childInit(module, childId) {
childInit(module: string, childId: number): void {
// $FlowFixMe this must be dynamic
this.module = require(module);

@@ -48,19 +69,51 @@ this.childId = childId;

async handleRequest(data) {
let idx = data.idx;
let child = data.child;
let method = data.method;
let args = data.args;
async handleRequest(data: WorkerRequest): Promise<void> {
let {idx, method, args} = data;
let child = nullthrows(data.child);
let result = {idx, child, type: 'response'};
try {
result.contentType = 'data';
if (method === 'childInit') {
result.content = this.childInit(...args, child);
} else {
result.content = await this.module[method](...args);
const responseFromContent = (content: any): WorkerDataResponse => ({
idx,
child,
type: 'response',
contentType: 'data',
content
});
const errorResponseFromError = (e: Error): WorkerErrorResponse => ({
idx,
child,
type: 'response',
contentType: 'error',
content: errorToJson(e)
});
let result;
if (method === 'childInit') {
try {
let [moduleName] = args;
result = responseFromContent(this.childInit(moduleName, child));
} catch (e) {
result = errorResponseFromError(e);
}
} catch (e) {
result.contentType = 'error';
result.content = errorUtils.errorToJson(e);
} else if (method === 'startProfile') {
this.profiler = new Profiler();
try {
result = responseFromContent(await this.profiler.startProfiling());
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'endProfile') {
try {
let res = this.profiler ? await this.profiler.stopProfiling() : null;
result = responseFromContent(res);
} catch (e) {
result = errorResponseFromError(e);
}
} else {
try {
// $FlowFixMe
result = responseFromContent(await this.module[method](...args));
} catch (e) {
result = errorResponseFromError(e);
}
}

@@ -71,10 +124,11 @@

async handleResponse(data) {
let idx = data.idx;
async handleResponse(data: WorkerResponse): Promise<void> {
let idx = nullthrows(data.idx);
let contentType = data.contentType;
let content = data.content;
let call = this.responseQueue.get(idx);
let call = nullthrows(this.responseQueue.get(idx));
if (contentType === 'error') {
call.reject(errorUtils.jsonToError(content));
invariant(typeof content !== 'string');
call.reject(jsonToError(content));
} else {

@@ -91,7 +145,15 @@ call.resolve(content);

// Keep in mind to make sure responses to these calls are JSON.Stringify safe
async addCall(request, awaitResponse = true) {
let call = request;
call.type = 'request';
call.child = this.childId;
call.awaitResponse = awaitResponse;
async addCall(
request: CallRequest,
awaitResponse: boolean = true
): Promise<mixed> {
// $FlowFixMe
let call: ChildCall = {
...request,
type: 'request',
child: this.childId,
awaitResponse,
resolve: () => {},
reject: () => {}
};

@@ -112,3 +174,3 @@ let promise;

async sendRequest(call) {
async sendRequest(call: ChildCall): Promise<void> {
let idx;

@@ -119,7 +181,9 @@ if (call.awaitResponse) {

}
this.send({
idx: idx,
idx,
child: call.child,
type: call.type,
location: call.location,
handle: call.handle,
method: call.method,

@@ -131,3 +195,3 @@ args: call.args,

async processQueue() {
async processQueue(): Promise<void> {
if (!this.callQueue.length) {

@@ -142,10 +206,5 @@ return;

end() {
process.exit();
handleEnd(): void {
this.loggerDisposable.dispose();
}
}
let child = new Child();
process.on('message', child.messageListener.bind(child));
module.exports = child;

@@ -1,28 +0,43 @@

const childProcess = require('child_process');
const {EventEmitter} = require('events');
const {errorUtils} = require('@parcel/utils');
// @flow
const childModule = require.resolve('./child');
import type {FilePath} from '@parcel/types';
import type {WorkerMessage, WorkerImpl, BackendType} from './types';
import EventEmitter from 'events';
import {jsonToError} from '@parcel/utils';
import {getWorkerBackend} from './backend';
export type WorkerCall = {|
method: string,
args: Array<any>,
retries: number,
resolve: (result: Promise<any> | any) => void,
reject: (error: any) => void
|};
type WorkerOpts = {|
forcedKillTime: number,
backend: BackendType
|};
let WORKER_ID = 0;
class Worker extends EventEmitter {
constructor(options) {
super();
export default class Worker extends EventEmitter {
+options: WorkerOpts;
worker: WorkerImpl;
id: number = WORKER_ID++;
this.options = options;
this.id = WORKER_ID++;
calls: Map<number, WorkerCall> = new Map();
exitCode = null;
callId = 0;
this.sendQueue = [];
this.processQueue = true;
ready = false;
stopped = false;
isStopping = false;
this.calls = new Map();
this.exitCode = null;
this.callId = 0;
this.ready = false;
this.stopped = false;
this.isStopping = false;
constructor(options: WorkerOpts) {
super();
this.options = options;
}
async fork(forkModule, bundlerOptions) {
async fork(forkModule: FilePath) {
let filteredArgs = process.execArgv.filter(

@@ -32,21 +47,27 @@ v => !/^--(debug|inspect)/.test(v)

let options = {
execArgv: filteredArgs,
env: process.env,
cwd: process.cwd()
};
for (let i = 0; i < filteredArgs.length; i++) {
let arg = filteredArgs[i];
if (
(arg === '-r' || arg === '--require') &&
filteredArgs[i + 1] === '@parcel/register'
) {
filteredArgs.splice(i, 2);
i--;
}
}
this.child = childProcess.fork(childModule, process.argv, options);
this.child.on('message', data => this.receive(data));
this.child.once('exit', code => {
let onMessage = data => this.receive(data);
let onExit = code => {
this.exitCode = code;
this.emit('exit', code);
});
};
this.child.on('error', err => {
let onError = err => {
this.emit('error', err);
});
};
let WorkerBackend = getWorkerBackend(this.options.backend);
this.worker = new WorkerBackend(filteredArgs, onMessage, onError, onExit);
await this.worker.start();
await new Promise((resolve, reject) => {

@@ -62,50 +83,11 @@ this.call({

await this.init(bundlerOptions);
this.ready = true;
this.emit('ready');
}
async init(bundlerOptions) {
this.ready = false;
return new Promise((resolve, reject) => {
this.call({
method: 'init',
args: [bundlerOptions],
retries: 0,
resolve: (...args) => {
this.ready = true;
this.emit('ready');
resolve(...args);
},
reject
});
});
send(data: WorkerMessage): void {
this.worker.send(data);
}
send(data) {
if (!this.processQueue) {
return this.sendQueue.push(data);
}
let result = this.child.send(data, error => {
if (error && error instanceof Error) {
// Ignore this, the workerfarm handles child errors
return;
}
this.processQueue = true;
if (this.sendQueue.length > 0) {
let queueCopy = this.sendQueue.slice(0);
this.sendQueue = [];
queueCopy.forEach(entry => this.send(entry));
}
});
if (!result || /^win/.test(process.platform)) {
// Queue is handling too much messages throttle it
this.processQueue = false;
}
}
call(call) {
call(call: WorkerCall): void {
if (this.stopped || this.isStopping) {

@@ -127,3 +109,3 @@ return;

receive(data) {
receive(message: WorkerMessage): void {
if (this.stopped || this.isStopping) {

@@ -133,10 +115,10 @@ return;

let idx = data.idx;
let type = data.type;
let content = data.content;
let contentType = data.contentType;
if (message.type === 'request') {
this.emit('request', message);
} else if (message.type === 'response') {
let idx = message.idx;
if (idx == null) {
return;
}
if (type === 'request') {
this.emit('request', data);
} else if (type === 'response') {
let call = this.calls.get(idx);

@@ -148,10 +130,10 @@ if (!call) {

if (contentType === 'error') {
call.reject(errorUtils.jsonToError(content));
if (message.contentType === 'error') {
call.reject(jsonToError(message.content));
} else {
call.resolve(content);
call.resolve(message.content);
}
this.calls.delete(idx);
this.emit('response', data);
this.emit('response', message);
}

@@ -164,14 +146,4 @@ }

if (this.child) {
this.child.send('die');
let forceKill = setTimeout(
() => this.child.kill('SIGINT'),
this.options.forcedKillTime
);
await new Promise(resolve => {
this.child.once('exit', resolve);
});
clearTimeout(forceKill);
if (this.worker) {
await this.worker.stop();
}

@@ -181,3 +153,1 @@ }

}
module.exports = Worker;

@@ -1,8 +0,49 @@

const {EventEmitter} = require('events');
const {errorUtils} = require('@parcel/utils');
const Worker = require('./Worker');
const cpuCount = require('./cpuCount');
// @flow
import type {ErrorWithCode, FilePath} from '@parcel/types';
import type {
CallRequest,
WorkerRequest,
WorkerDataResponse,
WorkerErrorResponse,
BackendType
} from './types';
import nullthrows from 'nullthrows';
import EventEmitter from 'events';
import {
errorToJson,
jsonToError,
prepareForSerialization,
restoreDeserializedObject
} from '@parcel/utils';
import Worker, {type WorkerCall} from './Worker';
import cpuCount from './cpuCount';
import Handle from './Handle';
import {child} from './childState';
import {detectBackend} from './backend';
import Profiler from './Profiler';
import Trace from './Trace';
import fs from 'fs';
import logger from '@parcel/logger';
let shared = null;
let profileId = 1;
type FarmOptions = {|
maxConcurrentWorkers: number,
maxConcurrentCallsPerWorker: number,
forcedKillTime: number,
useLocalWorker: boolean,
warmWorkers: boolean,
workerPath?: FilePath,
backend: BackendType
|};
type HandleFunction = (...args: Array<any>) => Promise<any>;
type WorkerModule = {|
+[string]: (...args: Array<mixed>) => Promise<mixed>
|};
/**

@@ -12,4 +53,14 @@ * workerPath should always be defined inside farmOptions

class WorkerFarm extends EventEmitter {
constructor(options, farmOptions = {}) {
export default class WorkerFarm extends EventEmitter {
callQueue: Array<WorkerCall> = [];
ending: boolean = false;
localWorker: WorkerModule;
options: FarmOptions;
run: HandleFunction;
warmWorkers: number = 0;
workers: Map<number, Worker> = new Map();
handles: Map<number, Handle> = new Map();
profiler: ?Profiler;
constructor(farmOptions: $Shape<FarmOptions> = {}) {
super();

@@ -20,14 +71,8 @@ this.options = {

forcedKillTime: 500,
warmWorkers: true,
useLocalWorker: true
warmWorkers: false,
useLocalWorker: true,
backend: detectBackend(),
...farmOptions
};
if (farmOptions) {
this.options = Object.assign(this.options, farmOptions);
}
this.warmWorkers = 0;
this.workers = new Map();
this.callQueue = [];
if (!this.options.workerPath) {

@@ -37,9 +82,10 @@ throw new Error('Please provide a worker path!');

// $FlowFixMe this must be dynamic
this.localWorker = require(this.options.workerPath);
this.run = this.mkhandle('run');
this.run = this.createHandle('run');
this.init(options);
this.startMaxWorkers();
}
warmupWorker(method, args) {
warmupWorker(method: string, args: Array<any>): void {
// Workers are already stopping

@@ -66,11 +112,9 @@ if (this.ending) {

shouldStartRemoteWorkers() {
shouldStartRemoteWorkers(): boolean {
return (
this.options.maxConcurrentWorkers > 1 ||
process.env.NODE_ENV === 'test' ||
!this.options.useLocalWorker
this.options.maxConcurrentWorkers > 0 || !this.options.useLocalWorker
);
}
mkhandle(method) {
createHandle(method: string): HandleFunction {
return (...args) => {

@@ -87,3 +131,6 @@ // Child process workers are slow to start (~600ms).

return this.localWorker[method](...args, false);
let processedArgs = restoreDeserializedObject(
prepareForSerialization([...args, false])
);
return this.localWorker[method](...processedArgs);
}

@@ -93,3 +140,3 @@ };

onError(error, worker) {
onError(error: ErrorWithCode, worker: Worker) {
// Handle ipc errors

@@ -102,5 +149,8 @@ if (error.code === 'ERR_IPC_CHANNEL_CLOSED') {

startChild() {
let worker = new Worker(this.options);
let worker = new Worker({
forcedKillTime: this.options.forcedKillTime,
backend: this.options.backend
});
worker.fork(this.options.workerPath, this.bundlerOptions);
worker.fork(nullthrows(this.options.workerPath));

@@ -118,3 +168,3 @@ worker.on('request', data => this.processRequest(data, worker));

async stopWorker(worker) {
async stopWorker(worker: Worker): Promise<void> {
if (!worker.stopped) {

@@ -132,3 +182,3 @@ this.workers.delete(worker.id);

worker.calls = null;
worker.calls.clear();

@@ -142,3 +192,3 @@ await worker.stop();

async processQueue() {
async processQueue(): Promise<void> {
if (this.ending || !this.callQueue.length) return;

@@ -165,28 +215,51 @@

async processRequest(data, worker = false) {
let result = {
idx: data.idx,
type: 'response'
};
let method = data.method;
let args = data.args;
let location = data.location;
let awaitResponse = data.awaitResponse;
if (!location) {
async processRequest(
data: {|
location: FilePath
|} & $Shape<WorkerRequest>,
worker?: Worker
): Promise<?string> {
let {method, args, location, awaitResponse, idx, handle} = data;
let mod;
if (handle) {
mod = nullthrows(this.handles.get(handle));
} else if (location) {
// $FlowFixMe this must be dynamic
mod = require(location);
} else {
throw new Error('Unknown request');
}
const mod = require(location);
try {
result.contentType = 'data';
if (method) {
result.content = await mod[method](...args);
} else {
result.content = await mod(...args);
const responseFromContent = (content: any): WorkerDataResponse => ({
idx,
type: 'response',
contentType: 'data',
content
});
const errorResponseFromError = (e: Error): WorkerErrorResponse => ({
idx,
type: 'response',
contentType: 'error',
content: errorToJson(e)
});
let result;
if (method == null) {
try {
result = responseFromContent(await mod(...args));
} catch (e) {
result = errorResponseFromError(e);
}
} catch (e) {
result.contentType = 'error';
result.content = errorUtils.errorToJson(e);
} else {
// ESModule default interop
if (mod.__esModule && !mod[method] && mod.default) {
mod = mod.default;
}
try {
result = responseFromContent(await mod[method](...args));
} catch (e) {
result = errorResponseFromError(e);
}
}

@@ -198,3 +271,6 @@

} else {
return result;
if (result.contentType === 'error') {
throw jsonToError(result.content);
}
return result.content;
}

@@ -204,3 +280,3 @@ }

addCall(method, args) {
addCall(method: string, args: Array<any>): Promise<any> {
if (this.ending) {

@@ -222,3 +298,3 @@ throw new Error('Cannot add a worker call if workerfarm is ending.');

async end() {
async end(): Promise<void> {
this.ending = true;

@@ -232,43 +308,98 @@ await Promise.all(

init(bundlerOptions) {
this.bundlerOptions = bundlerOptions;
if (this.shouldStartRemoteWorkers()) {
this.persistBundlerOptions();
startMaxWorkers(): void {
// Starts workers until the maximum is reached
if (this.workers.size < this.options.maxConcurrentWorkers) {
let toStart = this.options.maxConcurrentWorkers - this.workers.size;
while (toStart--) {
this.startChild();
}
}
}
this.localWorker.init(bundlerOptions);
this.startMaxWorkers();
shouldUseRemoteWorkers(): boolean {
return (
!this.options.useLocalWorker ||
((this.warmWorkers >= this.workers.size || !this.options.warmWorkers) &&
this.options.maxConcurrentWorkers > 0)
);
}
persistBundlerOptions() {
createReverseHandle(fn: () => mixed) {
let handle = new Handle();
this.handles.set(handle.id, fn);
return handle;
}
async startProfile() {
let promises = [];
for (let worker of this.workers.values()) {
worker.init(this.bundlerOptions);
promises.push(
new Promise((resolve, reject) => {
worker.call({
method: 'startProfile',
args: [],
resolve,
reject,
retries: 0
});
})
);
}
this.profiler = new Profiler();
promises.push(this.profiler.startProfiling());
await Promise.all(promises);
}
startMaxWorkers() {
// Starts workers untill the maximum is reached
if (this.workers.size < this.options.maxConcurrentWorkers) {
for (
let i = 0;
i < this.options.maxConcurrentWorkers - this.workers.size;
i++
) {
this.startChild();
}
async endProfile() {
if (!this.profiler) {
return;
}
}
shouldUseRemoteWorkers() {
return (
!this.options.useLocalWorker ||
(this.warmWorkers >= this.workers.size || !this.options.warmWorkers)
);
let promises = [this.profiler.stopProfiling()];
let names = ['Master'];
for (let worker of this.workers.values()) {
names.push('Worker ' + worker.id);
promises.push(
new Promise((resolve, reject) => {
worker.call({
method: 'endProfile',
args: [],
resolve,
reject,
retries: 0
});
})
);
}
var profiles = await Promise.all(promises);
let trace = new Trace();
let filename = `profile-${profileId++}.trace`;
let stream = trace.pipe(fs.createWriteStream(filename));
for (let profile of profiles) {
trace.addCPUProfile(names.shift(), profile);
}
trace.flush();
await new Promise(resolve => {
stream.once('finish', resolve);
});
logger.info(`Wrote profile to ${filename}`);
}
static async getShared(options, farmOptions) {
static async getShared(
farmOptions?: $Shape<FarmOptions>
): Promise<WorkerFarm> {
// Farm options shouldn't be considered safe to overwrite
// and require an entire new instance to be created
if (shared && farmOptions) {
if (
shared &&
farmOptions &&
farmOptions.workerPath !== shared.options.workerPath
) {
await shared.end();

@@ -279,11 +410,5 @@ shared = null;

if (!shared) {
shared = new WorkerFarm(options, farmOptions);
} else if (options) {
shared.init(options);
shared = new WorkerFarm(farmOptions);
}
if (!shared && !options) {
throw new Error('Workerfarm should be initialised using options');
}
return shared;

@@ -298,8 +423,14 @@ }

static async callMaster(request, awaitResponse = true) {
if (WorkerFarm.isWorker()) {
const child = require('./child');
static async callMaster(
request: CallRequest,
awaitResponse: boolean = true
): Promise<mixed> {
if (child) {
return child.addCall(request, awaitResponse);
} else {
return (await WorkerFarm.getShared()).processRequest(request);
// $FlowFixMe
return (await WorkerFarm.getShared()).processRequest({
...request,
awaitResponse
});
}

@@ -309,3 +440,3 @@ }

static isWorker() {
return process.send && require.main.filename === require.resolve('./child');
return !!child;
}

@@ -316,4 +447,12 @@

}
static createReverseHandle(fn: (...args: any[]) => mixed) {
if (WorkerFarm.isWorker()) {
throw new Error(
'Cannot call WorkerFarm.createReverseHandle() from within Worker'
);
}
return nullthrows(shared).createReverseHandle(fn);
}
}
module.exports = WorkerFarm;

@@ -5,7 +5,2 @@ function run(data) {

function init() {
// do nothing
}
exports.run = run;
exports.init = init;

@@ -1,4 +0,2 @@

const WorkerFarm = require(`../../../${
parseInt(process.versions.node, 10) < 8 ? 'lib' : 'src'
}/WorkerFarm`);
const WorkerFarm = require('../../../src/WorkerFarm').default;

@@ -20,7 +18,2 @@ function run() {

function init() {
// Do nothing
}
exports.run = run;
exports.init = init;

@@ -1,4 +0,2 @@

const WorkerFarm = require(`../../../${
parseInt(process.versions.node, 10) < 8 ? 'lib' : 'src'
}/WorkerFarm`);
const WorkerFarm = require('../../../src/WorkerFarm').default;

@@ -12,7 +10,2 @@ function run(a, b) {

function init() {
// Do nothing
}
exports.run = run;
exports.init = init;

@@ -5,7 +5,2 @@ function run() {

function init() {
// do nothing
}
exports.run = run;
exports.init = init;

@@ -1,14 +0,14 @@

const assert = require('assert');
const WorkerFarm = require('../index');
import Logger from '@parcel/logger';
import assert from 'assert';
import WorkerFarm from '../';
describe('WorkerFarm', () => {
describe('WorkerFarm', function() {
this.timeout(10000);
it('Should start up workers', async () => {
let workerfarm = new WorkerFarm(
{},
{
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ping.js')
}
);
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ping.js')
});

@@ -21,10 +21,7 @@ assert.equal(await workerfarm.run(), 'pong');

it('Should handle 1000 requests without any issue', async () => {
let workerfarm = new WorkerFarm(
{},
{
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/echo.js')
}
);
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/echo.js')
});

@@ -40,37 +37,10 @@ let promises = [];

it('Should consistently initialise workers, even after 100 re-inits', async () => {
let options = {
key: 0
};
let workerfarm = new WorkerFarm(options, {
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/init.js')
it('Should warm up workers', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: true,
workerPath: require.resolve('./integration/workerfarm/echo.js')
});
for (let i = 0; i < 100; i++) {
options.key = i;
workerfarm.init(options);
for (let i = 0; i < workerfarm.workers.size; i++) {
assert.equal((await workerfarm.run()).key, options.key);
}
assert.equal(workerfarm.shouldUseRemoteWorkers(), true);
}
await workerfarm.end();
});
it('Should warm up workers', async () => {
let workerfarm = new WorkerFarm(
{},
{
warmWorkers: true,
useLocalWorker: true,
workerPath: require.resolve('./integration/workerfarm/echo.js')
}
);
for (let i = 0; i < 100; i++) {
assert.equal(await workerfarm.run(i), i);

@@ -91,10 +61,7 @@ }

it('Should use the local worker', async () => {
let workerfarm = new WorkerFarm(
{},
{
warmWorkers: true,
useLocalWorker: true,
workerPath: require.resolve('./integration/workerfarm/echo.js')
}
);
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: true,
workerPath: require.resolve('./integration/workerfarm/echo.js')
});

@@ -108,10 +75,7 @@ assert.equal(await workerfarm.run('hello world'), 'hello world');

it('Should be able to use bi-directional communication', async () => {
let workerfarm = new WorkerFarm(
{},
{
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ipc.js')
}
);
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ipc.js')
});

@@ -124,10 +88,7 @@ assert.equal(await workerfarm.run(1, 2), 3);

it('Should be able to handle 1000 bi-directional calls', async () => {
let workerfarm = new WorkerFarm(
{},
{
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ipc.js')
}
);
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ipc.js')
});

@@ -141,11 +102,9 @@ for (let i = 0; i < 1000; i++) {

it('Bi-directional call should return masters pid', async () => {
let workerfarm = new WorkerFarm(
{},
{
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ipc-pid.js')
}
);
it.skip('Bi-directional call should return masters pid', async () => {
// TODO: this test is only good for processes not threads
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ipc-pid.js')
});

@@ -162,10 +121,7 @@ let result = await workerfarm.run();

// This emulates the node.js ipc bug for win32
let workerfarm = new WorkerFarm(
{},
{
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/echo.js')
}
);
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/echo.js')
});

@@ -185,2 +141,89 @@ let bigData = [];

});
it('Forwards stdio from the child process and levels event source', async () => {
let events = [];
let logDisposable = Logger.onLog(event => events.push(event));
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/console.js')
});
await workerfarm.run();
assert.deepEqual(events, [
{
level: 'info',
message: 'one',
type: 'log'
},
{
level: 'info',
message: 'two',
type: 'log'
},
{
level: 'warn',
message: 'three',
type: 'log'
},
{
level: 'error',
message: 'four',
type: 'log'
},
{
level: 'verbose',
message: 'five',
type: 'log'
}
]);
logDisposable.dispose();
await workerfarm.end();
});
it('Forwards logger events to the main process', async () => {
let events = [];
let logDisposable = Logger.onLog(event => events.push(event));
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/logging.js')
});
await workerfarm.run();
// assert.equal(events.length, 2);
assert.deepEqual(events, [
{
level: 'info',
message: 'omg it works',
type: 'log'
},
{
level: 'error',
message: 'errors objects dont work yet',
type: 'log'
}
]);
logDisposable.dispose();
await workerfarm.end();
});
it('Should support reverse handle functions in main process that can be called in workers', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/reverse-handle.js')
});
let handle = workerfarm.createReverseHandle(() => 42);
let result = await workerfarm.run(handle);
assert.equal(result, 42);
await workerfarm.end();
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc