Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

archan

Package Overview
Dependencies
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

archan - npm Package Compare versions

Comparing version 0.3.0 to 0.4.0

72

index.js
var slice = [].slice
exports = module.exports = function () {
return new Channel()
}
exports = module.exports = Channel
function Channel() {
function Channel(options) {
if (!(this instanceof Channel))
return new Channel(options)
var self = this
options = options || {}
this.concurrency = options.concurrency || Infinity
this.pending = 0

@@ -14,11 +17,16 @@ this.errors = []

this.queue = []
this.drains = []
// to make .shift() a yieldable
this.next = function (done) {
this._next = function (done) {
self.queue.push(done)
}
// yield next drain
this._nextDrain = function (done) {
self.drains.push(done)
}
// passed to async functions
// private!
this.callback = function (err, res) {
this._callback = function (err, res) {
self.pending--

@@ -29,3 +37,6 @@

self.push(err || res)
self._push(err, res)
while (self.drains.length && self.pushable)
self.drains.shift()()
}

@@ -35,9 +46,14 @@ }

Channel.prototype = {
// total number of values to be yielded
get length() {
return this.pending
+ this.values.length
+ this.queue.length
+ this.errors.length
},
get pushable() {
return this.pending < this.concurrency
}
}
// push a value or an error
Channel.prototype._push = function (err, res) {

@@ -52,6 +68,15 @@ if (this.queue.length)

Channel.prototype.unshift = function (val) {
if (!arguments.length)
throw new Error('cannot unshift nothing')
if (val instanceof Error)
return this.errors.unshift(val)
if (arguments.length > 1)
val = slice.call(arguments)
this.values.unshift(val)
}
// .push(err)
// .push(val)
// .push(val...)
Channel.prototype.unshift =
Channel.prototype.push = function (val) {

@@ -69,9 +94,8 @@ // push values

this.pending++
return this.callback
return this._callback
}
// var val = yield* shift()
Channel.prototype.pop =
Channel.prototype.read =
Channel.prototype.shift = function* () {
Channel.prototype.shift = function* (alwaysWait) {
// throw the first error

@@ -84,7 +108,16 @@ if (this.errors.length)

// if pending callbacks, queue
if (this.pending)
return yield this.next
if (this.pending || alwaysWait)
return yield this._next
// return nothing otherwise
}
Channel.prototype.end = function* () {
Channel.prototype.flush = function* (returnValues) {
if (returnValues === false) {
// just execute everything
while (this.length)
yield* this.shift()
return
}
// return the values
var values = []

@@ -94,2 +127,9 @@ while (this.length)

return values
}
Channel.prototype.drain = function* () {
if (this.pushable)
return
yield this._nextDrain
}
{
"name": "archan",
"description": "Array-like generator-based channels",
"version": "0.3.0",
"version": "0.4.0",
"author": {

@@ -6,0 +6,0 @@ "name": "Jonathan Ong",

# Archan [![Build Status](https://travis-ci.org/cojs/archan.png)](https://travis-ci.org/cojs/archan)
Array-like channels for [co](https://github.com/visionmedia/co).
Similar to [chan](https://github.com/brentburgoyne/chan) except it keeps track of the number of pending callbacks and allows a coroutine to exit.
Similar to [chan](https://github.com/brentburgoyne/chan) except it keeps track of the number of pending callbacks and thus allows a coroutine to exit.

@@ -33,15 +33,47 @@ ## Example

### var ch = archan()
```js
var archan = require('archan')
```
Creates a new channel instance.
#### var ch = archan([options])
### var cb = ch.push()
Creates a new channel instance. Options:
- `concurrency: Infinity` - See below for control flow handling
### Array-like Properties
#### ch.length
The number of values in the channel.
Note that if you ever push a "falsey" value in any of your callbacks,
the while loop in the example will not work as expected.
Instead, you should check `ch.length`:
```js
var ch = archan()
setTimeout(ch.push(), 1)
setTimeout(ch.push(), 10)
setTimeout(ch.push(), 100)
setTimeout(ch.push(), 1000)
var count = 0
while (ch.length) {
yield* ch.shift()
count++
}
assert.equal(count, 4)
```
#### var cb = ch.push()
Returns a new callback you pass to asynchronous functions.
```js
fs.readFile('file.txt', ch.push())
var buffer = yield ch.shift()
request('https://github.com', ch.push())
var result = yield ch.shift()
fs.readFile('file.txt', ch.push()) // push a callback
var buffer = yield ch.shift() // yield the result of the callback
request('https://github.com', ch.push()) // push a callback
var result = yield ch.shift() // yield the result of the callback
```

@@ -58,9 +90,9 @@

var stream = fs.createWriteStream('file.txt')
var cb = ch.push()
var cb = ch.push() // create a callback
stream.once('error', cb)
stream.once('finish', cb)
yield* ch.shift()
yield* ch.shift() // wait until the stream is finished
```
### ch.push(val...)
#### ch.push(val...)

@@ -77,11 +109,129 @@ Push a value to the channel synchronously.

### var val = yield* ch.shift()
#### var val = yield* ch.shift([alwaysWait])
Returns the next value in the channel.
The `*` is optional.
If there are no more pending callbacks,
it will return `undefined`.
If `alwaysWait` is `true` and there are no more values,
it will indefinitely wait for the next value just like [chan](https://github.com/brentburgoyne/chan).
Otherwise, `undefined` will be returned immediately.
### var val = yield* ch.read()
Note that if any values were errors,
`.shift()` will throw that error.
See below for error handling.
#### ch.unshift(val...)
Adds a value to the beginning of the channel.
### Control Flow
Since archan keeps track of your callbacks,
it can help you handle control flow much better.
#### ch.pending
The number of pending callbacks in the channel that have not yet returned.
#### ch.concurrency
This is the maximum number of pending callbacks you want to allow at once.
This value only matters when you do `yield* ch.drain()`.
By default, this value is `Infinity`.
#### yield* ch.drain()
If there are too many pending callbacks,
this yields until the next drain event.
Otherwise, it returns immediately.
For example, when saving files from a multipart upload to your server,
you'd want to limit the number of open file descriptors per request.
In this example, we'll limit the number to 5:
```js
var parse = require('co-busboy')
var saveTo = require('save-to')
app.use(function* () {
var ch = archan({
concurrency: 5 // 5 maximum file descriptors
})
var parts = parse(this, {
autoFields: true
})
var part
while (part = yield parts) {
yield* ch.drain()
// save the stream to a file
saveTo(part, 'randomFolder/' + part.filename, ch.push())
}
// wait until all the files are saved
// and all the file descriptors are closed
yield* ch.flush()
// tell the client that everything went okay
this.status = 204
})
```
#### var values = yield* ch.flush([returnValues])
Return all the pending values at once and clear the channel.
This is a shortcut for:
```js
var values = []
while (ch.length) {
values.push(yield* ch.shift())
}
return values
```
If `returnValues` is `false`,
it won't bother collecting the return values and returning it to you,
which is most likely the case if you're just using archan for control flow.
### Error Handling
`ch.shift()` will throw any errors that occured in any of the callbacks or that were pushed to the channel.
These errors will be pushed to the beginning of the channel before any other value.
Thus, if you want to handle errors on a per-shift basis,
you should do the following:
```js
var val
while (ch.length) {
try {
val = yield* ch.shift()
} catch (err) {
// do something with the error
err.status = 400
throw err
}
}
```
or in a single callback:
```js
fs.readFile('file.txt', ch.push())
try {
var text = yield* ch.shift()
} catch (err) {
console.log('this file probably doesn\'t exist')
}
```
### Streams
Channels are pretty similar to readable object streams except:
- Order is not preserved
- Values are executed in parallel instead of in series
#### var val = yield* ch.read()
An alias for `ch.shift()`.

@@ -103,42 +253,52 @@ This alias demonstrates archan's synonymity with [Readable Streams](http://nodejs.org/api/stream.html#stream_class_stream_readable) in object mode:

Note that unlike node's streams,
order is not preserved,
specifically with asynchronous functions.
Note that when passing callbacks,
order is not preserved.
In this case,
order is preserved because values were pushed synchronously.
### ch.length
#### Piping to a Writable Stream
The number of values in the channel,
both pending and ready.
Note that if you ever return a `falsey` value in any of your callbacks,
the while loop in the example will not work as expected.
Instead, you should check `ch.length`:
It's pretty easy to pipe to writable stream,
even with back pressure.
However, pipes won't be included in this repo since there are too many different ways you could want to do this.
Here's a channel that pipes to a stream until it flushes with backpressure:
```js
var stream = new Stream.Writable()
var ch = archan()
setTimeout(ch.push(), 1)
setTimeout(ch.push(), 10)
setTimeout(ch.push(), 100)
setTimeout(ch.push(), 1000)
// add some values
// has to be added before the pipe begins
// otherwise `ch.length === 0`
fs.readFile('file1.txt', ch.push())
fs.readFile('file2.txt', ch.push())
fs.readFile('file3.txt', ch.push())
fs.readFile('file4.txt', ch.push())
var count = 0
while (ch.length) {
yield ch.shift()
count++
}
// the pipe should be in its own coroutine
co(function* (){
while (ch.length) {
if (!stream.write(yield* ch.shift())) {
// needs a drain event, so wait until the next one
yield stream.once.bind(stream, 'drain')
}
}
assert.equal(count, 4)
// end the stream
stream.end()
})() // since we didn't specify an error handler, it'll throw on any errors
console.log('lets do something else in the mean time')
```
### var values = yield* ch.end()
Suppose you want to indefinitely pipe values from a channel to a Writable Stream.
Return all the pending values at once.
At this point, the channel will be empty and you will be able to reuse it.
This is a shortcut for:
```js
var values = []
while (ch.length) {
values.push(yield* ch.shift())
}
co(function* () {
while (true) {
if (!stream.write(yield* ch.shift(true))) {
yield stream.once.bind(stream, 'drain')
}
}
})
```

@@ -145,0 +305,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc