Research
Security News
Malicious npm Packages Inject SSH Backdoors via Typosquatted Libraries
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.
task-serializer
Advanced tools
Serialize tasks/promises for integrated control. Option for limiting number of concurrent tasks.
copyright 2020 craigphicks ISC license
The TaskSerializer
module can serialize tasks/promises for integrated control - Tasks/promises can be added immediately as they are produced and then be made available to a consumer when they have resolved and the consumer is ready to read them.
Optionally, the number of concurrently running tasks is limited to a user parameter. In that special case, only functions (and their args) may be added, and each function will be executed when a space is available. Trying to add promises will throw an Error.
There are 4 different classes exported from the module:
AsyncIter
NextSymbol
WaitAll
Callbacks
See Essential Information for a discussion of their behavior and differences.
The module is not dependent upon NodeJS, so can be used in browser code.
To make the examples more readable some shared functions are used. They are listed at the end of the examples.
One of those shared functions is the async function producer()
. It inputs the tasks by calling install.addTask(...)
staggered over time, followed by install.addEnd()
. Some of those tasks throw Errors
, others resolve normally.
All the below example code is available in the usage-examples
subdirectory of the installed node module, e.g., node_modules/task-serializer/usage-examples
.
AsyncIter
usage example
'use strict';
const {AsyncIter}=require('task-serializer');
const {producer}=require('./demo-lib.js');
/**
 * Drain an async-iterable instance, printing each value it yields as JSON.
 *
 * If iterating throws, the error's message is printed and a new `for await`
 * pass is started over the same object, so later results are still consumed.
 * The function returns once a pass finishes without throwing.
 *
 * @param {AsyncIterable<*>} ai - the async iterable to read from
 * @returns {Promise<void>}
 */
async function consumer(ai) {
  for (;;) {
    try {
      for await (const value of ai) {
        console.log(` ${JSON.stringify(value)}`);
      }
      // The iterable reported completion: nothing more to read.
      return;
    } catch (err) {
      console.log(' error: ' + err.message);
    }
  }
}
/**
 * Run the AsyncIter demo: start the shared producer and the consumer together
 * on one AsyncIter instance and wait for both to finish.
 *
 * @returns {Promise<void>}
 */
async function main() {
  // The documented constructor signature is `({concurrentTaskLimit=0}={})`;
  // a bare number carries no `concurrentTaskLimit` property, so the limit of 2
  // has to be passed inside an options object to take effect.
  const ai = new AsyncIter({ concurrentTaskLimit: 2 });
  await Promise.all([producer(ai), consumer(ai)]);
}
// Start the demo and print a one-line verdict once it settles.
main().then(
  () => { console.log('success'); },
).catch(
  (err) => { console.log('failure ' + err.message); },
);
NextSymbol
usage example
'use strict';
const {NextSymbol}=require('task-serializer');
const {makepr,producer}=require('./demo-lib.js');
// Deferred (promise plus its resolver, built by makepr) standing in for an
// unrelated event source. consumer() swaps in a fresh one each time it fires.
var somethingElse=makepr();
// Every 300 ms, resolve whichever deferred is current with the string
// "somethingElse". consumer() clears this timer when it sees the all-read symbol.
var iv=setInterval(()=>{somethingElse.resolve("somethingElse");},300);
/**
 * Read results from `ts` while also reacting to the module-level
 * `somethingElse` deferred, select-style.
 *
 * Each pass races `somethingElse.promise` against `ts.nextSymbol()` and
 * dispatches on the winner:
 *  - the string "somethingElse": print it and install a fresh deferred;
 *  - the task-resolved symbol: print the resolved value;
 *  - the task-rejected symbol: print the rejected value's message;
 *  - the all-read symbol: stop looping and clear the `iv` interval.
 * Any other value is ignored and the loop continues.
 *
 * @param {object} ts - instance exposing nextSymbol(), the symbol*() accessors
 *   and getTaskResolvedValue()/getTaskRejectedValue()
 * @returns {Promise<void>}
 */
async function consumer(ts) {
  for (let emptied = false; !emptied;) {
    const next = await Promise.race([
      somethingElse.promise,
      ts.nextSymbol(),
    ]);
    if (next === "somethingElse") {
      console.log(next);
      somethingElse = makepr(); // reset
    } else if (next === ts.symbolTaskResolved()) {
      console.log();
      const value = ts.getTaskResolvedValue();
      console.log("symbolTaskResolved, result=" + value);
    } else if (next === ts.symbolTaskRejected()) {
      const err = ts.getTaskRejectedValue();
      console.log("symbolTaskRejected, message=" + err.message);
    } else if (next === ts.symbolAllRead()) {
      console.log("symbolAllRead");
      emptied = true;
      clearInterval(iv);
    }
  }
}
/**
 * Run the NextSymbol demo: create an instance limited to two concurrent
 * tasks, start the consumer and then the producer on it, and wait for both.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const serializer = new NextSymbol({ concurrentTaskLimit: 2 });
  const consuming = consumer(serializer);
  const producing = producer(serializer);
  await Promise.all([consuming, producing]);
}
// Start the demo and print a one-line verdict once it settles.
main().then(
  () => { console.log('success'); },
).catch(
  (err) => { console.log('failure: ' + err.message); },
);
WaitAll
usage examples
'use strict';
const {WaitAll}=require('task-serializer');
const {producer}=require('./demo-lib.js');
/**
 * Await `ts.waitAll()` and print its result as indented JSON.
 * Anything thrown while waiting or printing is caught, and only its message
 * is printed, so this function itself does not reject for those errors.
 *
 * @param {object} ts - instance exposing waitAll()
 * @returns {Promise<void>}
 */
async function consumer_waitAll(ts) {
  try {
    const results = await ts.waitAll();
    console.log('ts.waitAll() returned');
    console.log(JSON.stringify(results, null, 2));
  } catch (err) {
    console.log(`ts.waitAll() caught ${err.message}`);
  }
}
/**
 * Await `ts.waitAllSettled()`, print its result as indented JSON, then print
 * a completion line. Errors are not caught here; they reject the returned
 * promise.
 *
 * @param {object} ts - instance exposing waitAllSettled()
 * @returns {Promise<void>}
 */
async function consumer_waitAllSettled(ts) {
  const outcomes = await ts.waitAllSettled();
  console.log('ts.waitAllSettled() returned');
  console.log(JSON.stringify(outcomes, null, 2));
  console.log('consumer finished');
}
/**
 * Run the WaitAll demo twice, each time on a fresh instance limited to two
 * concurrent tasks: first reading with consumer_waitAll, then with
 * consumer_waitAllSettled. Each run starts the consumer and the producer
 * together and waits for both.
 *
 * @returns {Promise<void>}
 */
async function main() {
  // The documented option name is `concurrentTaskLimit`; the previously used
  // `concurrentLimit` key is not part of the documented constructor options.
  let waitAll = new WaitAll({ concurrentTaskLimit: 2 });
  await Promise.all([
    consumer_waitAll(waitAll),
    producer(waitAll),
  ]);
  // A new instance is needed for the second run; the first one has been drained.
  waitAll = new WaitAll({ concurrentTaskLimit: 2 });
  await Promise.all([
    consumer_waitAllSettled(waitAll),
    producer(waitAll),
  ]);
}
// Start the demo and print a one-line verdict once it settles.
main().then(
  () => { console.log('success'); },
).catch(
  (err) => { console.log('failure ' + err.message); },
);
Callbacks
usage example
'use strict';
const {Callbacks}=require('task-serializer');
const {producer}=require('./demo-lib.js');
/**
 * Register the three callbacks on `ts` and wait until the empty callback
 * fires.
 *
 * Registration happens synchronously when this function is called. Resolved
 * and rejected values are printed as they arrive; when onEmpty fires, a line
 * is printed and the wait ends, after which a completion line is printed.
 *
 * @param {object} ts - instance exposing onTaskResolved(), onTaskRejected()
 *   and onEmpty()
 * @returns {Promise<void>}
 */
async function consumer(ts) {
  const drained = new Promise((done) => {
    ts.onTaskResolved((value) => {
      console.log(`onTaskResolved ${value}`);
    });
    ts.onTaskRejected((reason) => {
      console.log(`onTaskRejected ${reason}`);
    });
    ts.onEmpty(() => {
      console.log('onEmpty');
      done();
    });
  });
  await drained;
  console.log('consumer finished');
}
/**
 * Run the Callbacks demo: create an instance limited to two concurrent tasks,
 * start the consumer and then the producer on it, and wait for both.
 *
 * @returns {Promise<void>}
 */
async function main() {
  // The documented option name is `concurrentTaskLimit`; the previously used
  // `concurrentLimit` key is not part of the documented constructor options.
  const ts = new Callbacks({ concurrentTaskLimit: 2 });
  await Promise.all([
    consumer(ts), // consumer must initialize first
    producer(ts),
  ]);
}
// Start the demo and print a one-line verdict once it settles.
main().then(
  () => { console.log('success'); },
).catch(
  (err) => { console.log('failure ' + err.message); },
);
demo-lib.js
'use strict';
var {AsyncIter,NextSymbol}=require('task-serializer');
/**
 * Return a promise that resolves (with no value) after `ms` milliseconds.
 *
 * @param {number} ms - delay handed to setTimeout
 * @returns {Promise<void>}
 */
function snooze(ms) {
  return new Promise((wake) => setTimeout(wake, ms));
}
/**
 * Build the array of indices [0, 1, ..., len-1].
 *
 * @param {number} len - number of indices; passed straight to the Array
 *   constructor
 * @returns {number[]}
 */
function range(len) {
  const slots = Array(len);
  return Array.from(slots.keys());
}
/**
 * Create a deferred: a pending promise together with the function that
 * resolves it. No reject function is exposed.
 *
 * @returns {{resolve: function(*): void, promise: Promise<*>}}
 */
function makepr() {
  let release;
  const promise = new Promise((fulfil) => { release = fulfil; });
  // `resolve` is listed first to keep the same key order as before: the
  // executor runs (and stores the resolver) before the promise is assigned.
  return { resolve: release, promise };
}
/**
 * Print the instance's counters on one line: waiting, working, resolved
 * total, rejected total and finished total. For AsyncIter and NextSymbol
 * instances a second line prints the resolved-not-read and rejected-not-read
 * counters.
 *
 * @param {object} ts - instance exposing the getCount*() functions
 * @returns {void}
 */
function logStatus(ts) {
  // The getters are called in the same order as the labels are printed.
  const totals = [
    ['wa', ts.getCountWaiting()],
    ['wo', ts.getCountWorking()],
    ['rest', ts.getCountResolvedTotal()],
    ['rejt', ts.getCountRejectedTotal()],
    ['fint', ts.getCountFinishedTotal()],
  ];
  console.log(totals.map(([label, count]) => `${label}:${count}`).join(','));

  // Only the read-buffered classes checked here expose the not-read counters.
  if (!(ts instanceof AsyncIter) && !(ts instanceof NextSymbol)) {
    return;
  }
  const unreadResolved = ts.getCountResolvedNotRead();
  const unreadRejected = ts.getCountRejectedNotRead();
  console.log(`resnr:${unreadResolved},rejnr:${unreadRejected}`);
}
/**
 * Demo task: print an enter line, then either fail immediately or sleep for
 * `ms` milliseconds, print a leave line and return a summary string.
 *
 * @param {*} id - identifier echoed in the printed lines and in the result
 * @param {number} ms - how long to snooze before finishing
 * @param {boolean} [err=false] - when truthy, throw instead of sleeping
 * @returns {Promise<string>} `task <id>, took <ms>ms`
 * @throws {Error} `task failed id=<id>` when `err` is truthy
 */
async function task(id, ms, err = false) {
  console.log(`-->enter ${id}`);
  if (!err) {
    await snooze(ms);
    console.log(`<--leave ${id}`);
    return `task ${id}, took ${ms}ms`;
  }
  throw new Error(`task failed id=${id}`);
}
/**
 * Feed six demo tasks into `ts`, one every 100 ms, then signal the end.
 *
 * Task i is added as `(task, i, 2**(10-i), fails)`, where `fails` is true for
 * every third task (i = 2 and i = 5). After the last snooze, `ts.addEnd()` is
 * called and a completion line is printed.
 *
 * @param {object} ts - instance exposing addTask() and addEnd()
 * @returns {Promise<void>}
 */
async function producer(ts) {
  for (const i of Array(6).keys()) {
    const duration = 2 ** (10 - i);
    const fails = (i + 1) % 3 === 0;
    ts.addTask(task, i, duration, fails);
    await snooze(100);
  }
  ts.addEnd();
  console.log('producer finished');
}
// Public surface of demo-lib.js, attached to the existing exports object.
Object.assign(module.exports, {
  snooze,
  task,
  range,
  makepr,
  producer,
});
Each of the classes includes these input functions:
addTask(func,...args)
/addTask(promise)
to add tasks/promises.addEnd()
to indicate that no more tasks/promises will be added, thus allowing exit after the pipeline has drained. The output interface of each of those classes differs, and each is suitable for different usage cases. The following table compares some properties of those classes to help decide which is suitable for a given usage case:
property | AsyncIter | NextSymbol | WaitAll | Callbacks |
---|---|---|---|---|
read buffered | yes | yes | yes | no |
continuous vs. batch | cont | cont | batch | cont |
control loop | yes | yes | no | no |
select style | no | yes | N/A | N/A |
where 'property' are as follows:
WaitAll.waitAll
)WaitAll.waitAllSettled
)NextSymbol
usage example.)All rejected tasks/promises are managed so that they don't throw unhandled rejections.
Read-buffered classes prioritize rejected-values over resolved-values, and pass the rejected-values first whenever both are available. The exception to this rule is WaitAll.waitAllSettled()
, which transforms rejected-values into resolved-values.
Each task/promise after being added will go through all of the following milestones in order:
addTask
.concurrentTaskLimit
>0, then a task may be forced to wait before start.concurrentTaskLimit
<=0, that all task/promises are (considered) started when added.resolve(<resolved-value>)
or return <resolved-value>
or
throw `TaskSerializer
module. A rejected-value typically satisfies (<rejected-value> instanceof Error)
, but that is not mandatory.AsyncIter
,NextSymbol
, and WaitAll
. In the case of class Callbacks
, one of the onTaskResolved
/onTaskRejected
callbacks is called immediately when finished is reached, so read and finished are reached simultaneously.The class instance passes through the following milestones, in order:
addEnd
has been called to guarantee no more tasks/promises will be added.addEnd
has been called and all added tasks/promises have reached finished.AsyncIter
, NextSymbol
, and WaitAll
. In the case of Callbacks
, one of the callbacks onTaskResolved
/onTaskRejected
will be called immediately upon reaching finished.There is no active termination method, but there is passive termination when instance
is no longer referenced, or processing is deliberately abandoned.
The following two cases are important:
(/*) 'Deadlocked' means a task/promise is waiting on a promise, but there is no possibility that the waited-on-promise can be resolved. The prototypical example of such a promise is new Promise(()=>{})
. More typically interconnected promises deadlock in a practical example of the Dining Philosopher's Problem. Note that in JS, this will NOT happen to a promise directly or indirectly waiting upon external input, e.g., when waiting upon process.stdin
while process.stdin
is active.
In case of running under nodeJS, there is another important termination case:
In the case of nodeJS, when the internal nodeJS event queue becomes empty, nodeJS may decide that the async function referencing our class instance
is deadlocked, and cause that async function to return. This can be very confusing. However, in the author's experience, the deadlock is always real - i.e., it is due to programmer error.
The nodeJS function process.onBeforeExit()
, can be helpful in diagnosing unexpected deadlock. It sets a callback which will be called when nodeJS diagnoses the whole program as being "in deadlock". Example code using process.onBeforeExit()
can be found under the node module node_modules/task-serializer/usage-examples-nodejs-only/demo-lib.js
.
instance=new <Classname>({concurrentTaskLimit=0}={})
<Classname>
is one of AsyncIter
,Callbacks
,NextSymbol
,or WaitAll
.concurrentTaskLimit
is the integer number of task allowed to run concurrently, unless it is <=0
, in which case there is no limit.instance.addTask(func,...args)
where (func instanceof Function)
, or addTask(promise)
where (promise instanceof Promise)
concurrentTaskLimit>0
,
addTask
will allow only the first form, and passing a promise will throw.func(...args)
will be called in the order passed, when the concurrent task limit constraint allows.concurrentTaskLimit<=0
, either form is allowed.<Classname>
until reaching milestone read. The tasks/promises may reject, and those rejections are handled to prevent unhandled promise rejections.instance.addEnd()
addTask
will not be called again. It is required so the instance knows that when all tasks/promises have reached the finished milestone, the instance has reached the finished-processing milestone.The following are informational functions common to all the classes.
instance.getCountWaiting()
concurrentTaskLimit<=0
, always returns 0
.instance.getCountWorking()
instance.getCountResolvedTotal()
instance.getCountRejectedTotal()
instance.getCountFinishedTotal()
instance.getCountResolvedTotal()
and instance.getCountRejectedTotal()
The following are informational functions available only in the read-buffered classes AsyncIter
and NextSymbol
:
instance.getCountResolvedNotRead()
instance.getCountRejectedNotRead()
instance.getCountFinishedNotRead()
instance.getCountResolvedNotRead()
and instance.getCountRejectedNotRead()
AsyncIter
only APIAsyncIter
usage example for example.const {AsyncIter}=require('task-serializer')
instance=new AsyncIter({concurrentTaskLimit=0}={})
instance.next()
or implicit async for await (iter of instance)
{done:false,value:<resolved-value>}
<rejected-value>
{done:true}
iter
will be the <resolved-value>
<rejected-value>
NextSymbol
only APINextSymbol
usage example for example.const {NextSymbol}=require('task-serializer')
instance=new NextSymbol({concurrentTaskLimit=0}={})
instance.nextSymbol()
Returns a value strictly equal to one of instance.symbolTaskResolved()
, instance.symbolTaskRejected()
, or instance.symbolAllRead()
.
instance.symbolTaskResolved()
: indicates a task/promise resolved-value is ready to be read.instance.symbolTaskRejected()
: indicates a task/promise rejected-value is ready to be read.instance.symbolAllRead()
: indicates the instance milestone all-read has been reached.instance.getTaskResolvedValue()
async instance.nextSymbol()
has returned a value equal to instance.symbolTaskResolved()
instance.getTaskRejectedValue()
async instance.nextSymbol()
has returned a value equal to instance.symbolTaskRejected()
WaitAll
only APIWaitAll
usage examples for example.const {WaitAll}=require('task-serializer')
instance=new WaitAll({concurrentTaskLimit=0}={})
instance.waitAll()
waitAll
, i.e. no requirement to call before or after any instance milestone, although obviously milestone empty is not reached until waitAll
is called.addTask
, then that first one will be returned immediately when waitAll
is called.addTask
is called, waitAll
will immediately return with that error.waitAll
will not return before all the tasks/promises added with addTask
have terminated, and the array of resolved-values will be the order they were added, not the order they were resolved.instance.waitAllSettled()
waitAllSettled()
. Any time from before the first addTask
to after all tasks/promises have terminated (i.e., end-of-processing) is allowed. Measures have been taken to prevent unhandled rejections.waitAllSettled
will return no sooner than end-of-processing. It will return the same value as Promise.allSettled()
would return on an array of all tasks/promises added via addTask
in the order they were added.Callbacks
only APICallbacks
usage example for example.const {Callbacks}=require('task-serializer')
instance=new Callbacks({concurrentTaskLimit=0}={})
instance.onTaskResolved(callback)
instance.onTaskRejected(callback)
instance.onEmpty(callback)
instance.on<*>
function should be called only once per instance. Only one callback per function is actually registered.instance.on<*>
function must be called before the instance has reached the processing milestone, i.e., before the first call to addTask
.FAQs
Serialize tasks/promises for integrated control. Option for limiting number of concurrent tasks.
The npm package task-serializer receives a total of 2 weekly downloads. As such, task-serializer popularity was classified as not popular.
We found that task-serializer demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.
Security News
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
Security News
In this segment of the Risky Business podcast, Feross Aboukhadijeh and Patrick Gray discuss the challenges of tracking malware discovered in open source software.