async-pipeline
async-pipeline allows you to build a complex conditional flow from loosely coupled components.
npm install async-pipeline
Imagine you have a set of processing operations that might be applied in a different order depending on the input and intermediate results. Of course you can build it as Promise chain(s), but it will be hard to read at the very least. And over time, after more additions, there is a good chance it turns into an unreadable mess.
Another much easier approach would be to build the flow based on PubSub, and let processing functions listen and emit messages with payloads. This would let you split the flow into easily tested atomic stages doing one thing at a time, not coupled to other components. Plus, you can have multiple functions listening to the same events and build parallel secondary flows like logging and tracking aside from the main logic.
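To make the PubSub idea concrete, here is a minimal sketch built on Node's plain EventEmitter rather than on async-pipeline itself. The event names (input, parsed, valid, invalid) and the buildHub helper are hypothetical and only serve the illustration.

```javascript
const { EventEmitter } = require('events')

// Hypothetical hub: each stage only knows the events it listens to and emits.
function buildHub () {
  const hub = new EventEmitter()
  const log = []

  // Main flow: parse the raw input, then route it by the intermediate result.
  hub.on('input', raw => hub.emit('parsed', Number(raw)))
  hub.on('parsed', n => hub.emit(n >= 0 ? 'valid' : 'invalid', n))

  // Secondary flow: a logger listening to the same event as the main logic.
  hub.on('parsed', n => log.push(`parsed ${n}`))

  return { hub, log }
}

const { hub, log } = buildHub()
const results = []
hub.on('valid', n => results.push(n))
hub.on('invalid', () => results.push('rejected'))

hub.emit('input', '42')
hub.emit('input', '-1')
```

Each stage can be tested on its own by emitting its input event and asserting on what it emits, and the logger can be removed without touching the main flow.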
async-pipeline helps to build such a message hub by adding a few generic features on top of EventEmitter: transition constraints, the internal events @end and @error, and time tracking based on process.hrtime.
const {inspect} = require('util')
const Pipeline = require('async-pipeline')

new Pipeline({
  // allowed transitions: 'start' is the only entry point
  transitions: {
    'start': ['stage-1:progress', 'stage-1:done'],
    'stage-1:done': ['stage-2:done']
  }
})
  .on('@error', console.error.bind(console, 'ERROR:'))
  .on('@end', dump => {
    // dump is the trace of all events with payloads and timings
    console.log('END:', inspect(dump, {depth: 10}))
  })
  .on('start', function (count) {
    let i = 0
    // report progress every 100 ms, then finish the stage
    while (i++ < count) setTimeout(this.emit, i * 100, 'stage-1:progress', i)
    setTimeout(this.emit, (count + 1) * 100, 'stage-1:done')
  })
  .on('stage-1:done', function () {
    setTimeout(this.emit, 200, 'stage-2:done', 'hello', 'there')
  })
  .on('stage-2:done', function (...args) {
    console.log('after stage-2', ...args)
    this.end()
  })
  .on('stage-1:progress', console.log.bind(console, 'stage1'))
  .start('start', 5)
The code above would output the following:
stage1 1
stage1 2
stage1 3
stage1 4
stage1 5
after stage-2 hello there
END: [ { event: 'start',
    payload: [ 5 ],
    routes:
     [ { event: 'stage-1:progress',
         payload: [ 1 ],
         routes: [],
         time: 107 },
       { event: 'stage-1:progress',
         payload: [ 2 ],
         routes: [],
         time: 200 },
       { event: 'stage-1:progress',
         payload: [ 3 ],
         routes: [],
         time: 305 },
       { event: 'stage-1:progress',
         payload: [ 4 ],
         routes: [],
         time: 402 },
       { event: 'stage-1:progress',
         payload: [ 5 ],
         routes: [],
         time: 503 },
       { event: 'stage-1:done',
         payload: [],
         routes:
          [ { event: 'stage-2:done',
              payload: [ 'hello', 'there' ],
              routes: [],
              time: 806 } ],
         time: 604 } ],
    time: 0 } ]
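The time values in the trace look like millisecond offsets from the start of the flow. How async-pipeline computes them internally is not shown here, so the following is only a sketch of how such offsets can be derived from process.hrtime. The helper names toMs and elapsedMs are hypothetical.

```javascript
// Convert a [seconds, nanoseconds] tuple, as returned by process.hrtime,
// into whole milliseconds.
function toMs ([seconds, nanoseconds]) {
  return Math.round(seconds * 1e3 + nanoseconds / 1e6)
}

// process.hrtime(previous) returns the difference between now and `previous`.
function elapsedMs (startedAt) {
  return toMs(process.hrtime(startedAt))
}

// Taken once when the flow starts; every later event is stamped relative to it.
const startedAt = process.hrtime()
const offset = elapsedMs(startedAt)
```

Because process.hrtime is monotonic, offsets computed this way are not affected by system clock adjustments.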
Call new Pipeline(options) or just Pipeline(options) to get a pipeline instance, where options are:
debug - optional function to print internal logs. The easiest way to get debugging output is to pass {debug: console.log}
contextAPI - true by default. Exposes API controls to the handlers through this, otherwise passes them as the first argument
transitions - optional mapping defining allowed transitions and entry points. Example:
{
  'from-event-0': ['to-event-1', 'to-event-2'],
  'from-event-2': ['to-event-3']
}
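One way to read such a mapping: its keys are the events a flow may start from or continue from, and each array lists the events that may follow. The sketch below checks a transition against the map. Both helpers are hypothetical and are not part of the async-pipeline API, and treating a missing map as "everything allowed" is an assumption.

```javascript
// Example mapping in the same shape as the transitions option.
const transitions = {
  'from-event-0': ['to-event-1', 'to-event-2'],
  'from-event-2': ['to-event-3']
}

// Hypothetical helper: may `to` be emitted while handling `from`?
function isAllowed (map, from, to) {
  if (!map) return true // assumption: no map means no restrictions
  const targets = map[from]
  return Array.isArray(targets) && targets.includes(to)
}

// Hypothetical helper: may the flow be started with `event`?
function isEntryPoint (map, event) {
  return !map || Object.prototype.hasOwnProperty.call(map, event)
}
```

With the example mapping, isAllowed(transitions, 'from-event-0', 'to-event-2') holds, while 'to-event-3' is not reachable from 'from-event-0' and 'to-event-1' is not a valid entry point.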
this.start('event', ...payload) - start the flow with the event event and an optional payload. This will throw if options.transitions is set and has no event key
this.context(dataObject) - defines the data object available to all handlers through this.context()
@all (event, ...payload) - catches all events, emitted right before the normal event. Useful mainly for testing
@end (trace) - fired once any handler calls this.end(). Also automatically triggered on unexpected errors
@error (error, trace) - fired if any handler has thrown or explicitly called this.end(error). This will cause an uncaught exception bubbling up to the process level if no handler is registered
Each handler, except the ones for internal events, has access to the following methods exposed through the call context. Keep in mind that handlers should be defined as function() {}, because arrow functions would forcefully override the context
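The difference between the two handler styles can be seen with Node's plain EventEmitter, which also invokes listeners with the emitter as this. This is a sketch of the general JavaScript behavior, not of async-pipeline internals.

```javascript
const { EventEmitter } = require('events')

const emitter = new EventEmitter()
const seen = {}

// A regular function receives the call context set by the emitter.
emitter.on('ping', function () {
  seen.regular = this === emitter
})

// An arrow function keeps the `this` of the enclosing scope instead.
emitter.on('ping', () => {
  seen.arrow = this === emitter
})

emitter.emit('ping')
```

After the emit, seen.regular is true and seen.arrow is false, which is why methods exposed through the call context are out of reach inside an arrow function.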
this.end([error]) - end the flow and void all eventual events. If error is passed then it's considered an emergency shutdown. In either case it will end up triggering the @end event
this.emit('event', ...payload) - cast an event with an arbitrary payload. This will throw if the emitted event violates options.transitions
this.context([dataObject]) - getter/setter for context data shared across all handlers. This should be used wisely though, to keep handlers uncoupled
this.safe(fn) - decorator for async calls that may throw within a handler. The following would throw to the top level unless wrapped into safe():
pipeline.on('event', function () {
  // a regular function, so that this.safe is available on the call context
  const later = this.safe(() => { throw new Error() })
  setTimeout(later, 0)
})
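The idea behind such a decorator can be sketched in a few lines: wrap the callback so that anything it throws is handed to an error handler instead of escaping to the top level. The makeSafe factory and the onError callback are hypothetical names, and this is not the library's actual implementation.

```javascript
// Hypothetical factory: returns a safe(fn) decorator bound to an error sink.
function makeSafe (onError) {
  return function safe (fn) {
    return function (...args) {
      try {
        return fn.apply(this, args)
      } catch (error) {
        // Route the failure instead of letting it bubble up.
        onError(error)
      }
    }
  }
}

const errors = []
const safe = makeSafe(error => errors.push(error.message))

// The wrapped callback can now be handed to setTimeout or any other async API.
const later = safe(() => { throw new Error('boom') })
later()

const fine = safe((a, b) => a + b)
const sum = fine(1, 2)
```

In a pipeline, the error sink would presumably be whatever routes the error into the @error event.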
Errors thrown from within event handlers will be caught and routed as the @error event payload. However, they will bubble up to the top if no @error handler is defined.
Errors thrown from within handlers of internal events (@end, @error) always bypass @error and propagate to the top to avoid recursive failures. The same applies to failures in start() and on() calls.
If Node's EventEmitter doesn't work for you for whatever reason, you can inject an alternative:
Pipeline = Pipeline.di({EventEmitter: EventEmitterClass})