Comparing version 0.3.0 to 0.4.0
72
index.js
var slice = [].slice | ||
exports = module.exports = function () { | ||
return new Channel() | ||
} | ||
exports = module.exports = Channel | ||
function Channel() { | ||
function Channel(options) { | ||
if (!(this instanceof Channel)) | ||
return new Channel(options) | ||
var self = this | ||
options = options || {} | ||
this.concurrency = options.concurrency || Infinity | ||
this.pending = 0 | ||
@@ -14,11 +17,16 @@ this.errors = [] | ||
this.queue = [] | ||
this.drains = [] | ||
// to make .shift() a yieldable | ||
this.next = function (done) { | ||
this._next = function (done) { | ||
self.queue.push(done) | ||
} | ||
// yield next drain | ||
this._nextDrain = function (done) { | ||
self.drains.push(done) | ||
} | ||
// passed to async functions | ||
// private! | ||
this.callback = function (err, res) { | ||
this._callback = function (err, res) { | ||
self.pending-- | ||
@@ -29,3 +37,6 @@ | ||
self.push(err || res) | ||
self._push(err, res) | ||
while (self.drains.length && self.pushable) | ||
self.drains.shift()() | ||
} | ||
@@ -35,9 +46,14 @@ } | ||
Channel.prototype = { | ||
// total number of values to be yielded | ||
get length() { | ||
return this.pending | ||
+ this.values.length | ||
+ this.queue.length | ||
+ this.errors.length | ||
}, | ||
get pushable() { | ||
return this.pending < this.concurrency | ||
} | ||
} | ||
// push a value or an error | ||
Channel.prototype._push = function (err, res) { | ||
@@ -52,6 +68,15 @@ if (this.queue.length) | ||
Channel.prototype.unshift = function (val) { | ||
if (!arguments.length) | ||
throw new Error('cannot unshift nothing') | ||
if (val instanceof Error) | ||
return this.errors.unshift(val) | ||
if (arguments.length > 1) | ||
val = slice.call(arguments) | ||
this.values.unshift(val) | ||
} | ||
// .push(err) | ||
// .push(val) | ||
// .push(val...) | ||
Channel.prototype.unshift = | ||
Channel.prototype.push = function (val) { | ||
@@ -69,9 +94,8 @@ // push values | ||
this.pending++ | ||
return this.callback | ||
return this._callback | ||
} | ||
// var val = yield* shift() | ||
Channel.prototype.pop = | ||
Channel.prototype.read = | ||
Channel.prototype.shift = function* () { | ||
Channel.prototype.shift = function* (alwaysWait) { | ||
// throw the first error | ||
@@ -84,7 +108,16 @@ if (this.errors.length) | ||
// if pending callbacks, queue | ||
if (this.pending) | ||
return yield this.next | ||
if (this.pending || alwaysWait) | ||
return yield this._next | ||
// return nothing otherwise | ||
} | ||
Channel.prototype.end = function* () { | ||
Channel.prototype.flush = function* (returnValues) { | ||
if (returnValues === false) { | ||
// just execute everything | ||
while (this.length) | ||
yield* this.shift() | ||
return | ||
} | ||
// return the values | ||
var values = [] | ||
@@ -94,2 +127,9 @@ while (this.length) | ||
return values | ||
} | ||
Channel.prototype.drain = function* () { | ||
if (this.pushable) | ||
return | ||
yield this._nextDrain | ||
} |
{ | ||
"name": "archan", | ||
"description": "Array-like generator-based channels", | ||
"version": "0.3.0", | ||
"version": "0.4.0", | ||
"author": { | ||
@@ -6,0 +6,0 @@ "name": "Jonathan Ong", |
246
README.md
# Archan [![Build Status](https://travis-ci.org/cojs/archan.png)](https://travis-ci.org/cojs/archan) | ||
Array-like channels for [co](https://github.com/visionmedia/co). | ||
Similar to [chan](https://github.com/brentburgoyne/chan) except it keeps track of the number of pending callbacks and allows a coroutine to exit. | ||
Similar to [chan](https://github.com/brentburgoyne/chan) except it keeps track of the number of pending callbacks and thus allows a coroutine to exit. | ||
@@ -33,15 +33,47 @@ ## Example | ||
### var ch = archan() | ||
```js | ||
var archan = require('archan') | ||
``` | ||
Creates a new channel instance. | ||
#### var ch = archan([options]) | ||
### var cb = ch.push() | ||
Creates a new channel instance. Options: | ||
- `concurrency: Infinity` - See below for control flow handling | ||
### Array-like Properties | ||
#### ch.length | ||
The number of values in the channel. | ||
Note that if you ever push a "falsey" value in any of your callbacks, | ||
the while loop in the example will not work as expected. | ||
Instead, you should check `ch.length`: | ||
```js | ||
var ch = archan() | ||
setTimeout(ch.push(), 1) | ||
setTimeout(ch.push(), 10) | ||
setTimeout(ch.push(), 100) | ||
setTimeout(ch.push(), 1000) | ||
var count = 0 | ||
while (ch.length) { | ||
yield* ch.shift() | ||
count++ | ||
} | ||
assert.equal(count, 4) | ||
``` | ||
#### var cb = ch.push() | ||
Returns a new callback you pass to asynchronous functions. | ||
```js | ||
fs.readFile('file.txt', ch.push()) | ||
var buffer = yield ch.shift() | ||
request('https://github.com', ch.push()) | ||
var result = yield ch.shift() | ||
fs.readFile('file.txt', ch.push()) // push a callback | ||
var buffer = yield ch.shift() // yield the result of the callback | ||
request('https://github.com', ch.push()) // push a callback | ||
var result = yield ch.shift() // yield the result of the callback | ||
``` | ||
@@ -58,9 +90,9 @@ | ||
var stream = fs.createWriteStream('file.txt') | ||
var cb = ch.push() | ||
var cb = ch.push() // create a callback | ||
stream.once('error', cb) | ||
stream.once('finish', cb) | ||
yield* ch.shift() | ||
yield* ch.shift() // wait until the stream is finished | ||
``` | ||
### ch.push(val...) | ||
#### ch.push(val...) | ||
@@ -77,11 +109,129 @@ Push a value to the channel synchronously. | ||
### var val = yield* ch.shift() | ||
#### var val = yield* ch.shift([alwaysWait]) | ||
Returns the next value in the channel. | ||
The `*` is optional. | ||
If there are no more pending callbacks, | ||
it will return `undefined`. | ||
If `alwaysWait` is `true` and there are no more values, | ||
it will indefinitely wait for the next value just like [chan](https://github.com/brentburgoyne/chan). | ||
Otherwise, `undefined` will be returned immediately. | ||
### var val = yield* ch.read() | ||
Note that if any values were errors, | ||
`.shift()` will throw that error. | ||
See below for error handling. | ||
#### ch.unshift(val...) | ||
Adds a value to the beginning of the channel. | ||
### Control Flow | ||
Since archan keeps track of your callbacks, | ||
it can help you handle control flow much better. | ||
#### ch.pending | ||
The number of pending callbacks in the channel that have not yet returned. | ||
#### ch.concurrency | ||
This is the maximum number of pending callbacks you want to allow at once. | ||
This value only matters when you do `yield* ch.drain()`. | ||
By default, this value is `Infinity`. | ||
#### yield* ch.drain() | ||
If there are too many pending callbacks, | ||
this yields until the next drain event. | ||
Otherwise, it returns immediately. | ||
For example, when saving files from a multipart upload to your server, | ||
you'd want to limit the number of open file descriptors per request. | ||
In this example, we'll limit the number to 5: | ||
```js | ||
var parse = require('co-busboy') | ||
var saveTo = require('save-to') | ||
app.use(function* () { | ||
var ch = archan({ | ||
concurrency: 5 // 5 maximum file descriptors | ||
}) | ||
var parts = parse(this, { | ||
autoFields: true | ||
}) | ||
var part | ||
while (part = yield parts) { | ||
yield* ch.drain() | ||
// save the stream to a file | ||
saveTo(part, 'randomFolder/' + part.filename, ch.push()) | ||
} | ||
// wait until all the files are saved | ||
// and all the file descriptors are closed | ||
yield* ch.flush() | ||
// tell the client that everything went okay | ||
this.status = 204 | ||
}) | ||
``` | ||
#### var values = yield* ch.flush([returnValues]) | ||
Return all the pending values at once and clear the channel. | ||
This is a shortcut for: | ||
```js | ||
var values = [] | ||
while (ch.length) { | ||
values.push(yield* ch.shift()) | ||
} | ||
return values | ||
``` | ||
If `returnValues` is `false`, | ||
it won't bother collecting the return values and returning it to you, | ||
which is most likely the case if you're just using archan for control flow. | ||
### Error Handling | ||
`ch.shift()` will throw any errors that occured in any of the callbacks or that were pushed to the channel. | ||
These errors will be pushed to the beginning of the channel before any other value. | ||
Thus, if you want to handle errors on a per-shift basis, | ||
you should do the following: | ||
```js | ||
var val | ||
while (ch.length) { | ||
try { | ||
val = yield* ch.shift() | ||
} catch (err) { | ||
// do something with the error | ||
err.status = 400 | ||
throw err | ||
} | ||
} | ||
``` | ||
or in a single callback: | ||
```js | ||
fs.readFile('file.txt', ch.push()) | ||
try { | ||
var text = yield* ch.shift() | ||
} catch (err) { | ||
console.log('this file probably doesn\'t exist') | ||
} | ||
``` | ||
### Streams | ||
Channels are pretty similar to readable object streams except: | ||
- Order is not preserved | ||
- Values are executed in parallel instead of in series | ||
#### var val = yield* ch.read() | ||
An alias for `ch.shift()`. | ||
@@ -103,42 +253,52 @@ This alias demonstrates archan's synonymity with [Readable Streams](http://nodejs.org/api/stream.html#stream_class_stream_readable) in object mode: | ||
Note that unlike node's streams, | ||
order is not preserved, | ||
specifically with asynchronous functions. | ||
Note that when passing callbacks, | ||
order is not preserved. | ||
In this case, | ||
order is preserved because values were pushed synchronously. | ||
### ch.length | ||
#### Piping to a Writable Stream | ||
The number of values in the channel, | ||
both pending and ready. | ||
Note that if you ever return a `falsey` value in any of your callbacks, | ||
the while loop in the example will not work as expected. | ||
Instead, you should check `ch.length`: | ||
It's pretty easy to pipe to writable stream, | ||
even with back pressure. | ||
However, pipes won't be included in this repo since there are too many different ways you could want to do this. | ||
Here's a channel that pipes to a stream until it flushes with backpressure: | ||
```js | ||
var stream = new Stream.Writable() | ||
var ch = archan() | ||
setTimeout(ch.push(), 1) | ||
setTimeout(ch.push(), 10) | ||
setTimeout(ch.push(), 100) | ||
setTimeout(ch.push(), 1000) | ||
// add some values | ||
// has to be added before the pipe begins | ||
// otherwise `ch.length === 0` | ||
fs.readFile('file1.txt', ch.push()) | ||
fs.readFile('file2.txt', ch.push()) | ||
fs.readFile('file3.txt', ch.push()) | ||
fs.readFile('file4.txt', ch.push()) | ||
var count = 0 | ||
while (ch.length) { | ||
yield ch.shift() | ||
count++ | ||
} | ||
// the pipe should be in its own coroutine | ||
co(function* (){ | ||
while (ch.length) { | ||
if (!stream.write(yield* ch.shift())) { | ||
// needs a drain event, so wait until the next one | ||
yield stream.once.bind(stream, 'drain') | ||
} | ||
} | ||
assert.equal(count, 4) | ||
// end the stream | ||
stream.end() | ||
})() // since we didn't specify an error handler, it'll throw on any errors | ||
console.log('lets do something else in the mean time') | ||
``` | ||
### var values = yield* ch.end() | ||
Suppose you want to indefinitely pipe values from a channel to a Writable Stream. | ||
Return all the pending values at once. | ||
At this point, the channel will be empty and you will be able to reuse it. | ||
This is a shortcut for: | ||
```js | ||
var values = [] | ||
while (ch.length) { | ||
values.push(yield* ch.shift()) | ||
} | ||
co(function* () { | ||
while (true) { | ||
if (!stream.write(yield* ch.shift(true))) { | ||
yield stream.once.bind(stream, 'drain') | ||
} | ||
} | ||
}) | ||
``` | ||
@@ -145,0 +305,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
12083
107
324