+56
-16
| var slice = [].slice | ||
| exports = module.exports = function () { | ||
| return new Channel() | ||
| } | ||
| exports = module.exports = Channel | ||
| function Channel() { | ||
| function Channel(options) { | ||
| if (!(this instanceof Channel)) | ||
| return new Channel(options) | ||
| var self = this | ||
| options = options || {} | ||
| this.concurrency = options.concurrency || Infinity | ||
| this.pending = 0 | ||
@@ -14,11 +17,16 @@ this.errors = [] | ||
| this.queue = [] | ||
| this.drains = [] | ||
| // to make .shift() a yieldable | ||
| this.next = function (done) { | ||
| this._next = function (done) { | ||
| self.queue.push(done) | ||
| } | ||
| // yield next drain | ||
| this._nextDrain = function (done) { | ||
| self.drains.push(done) | ||
| } | ||
| // passed to async functions | ||
| // private! | ||
| this.callback = function (err, res) { | ||
| this._callback = function (err, res) { | ||
| self.pending-- | ||
@@ -29,3 +37,6 @@ | ||
| self.push(err || res) | ||
| self._push(err, res) | ||
| while (self.drains.length && self.pushable) | ||
| self.drains.shift()() | ||
| } | ||
@@ -35,9 +46,14 @@ } | ||
| Channel.prototype = { | ||
| // total number of values to be yielded | ||
| get length() { | ||
| return this.pending | ||
| + this.values.length | ||
| + this.queue.length | ||
| + this.errors.length | ||
| }, | ||
| get pushable() { | ||
| return this.pending < this.concurrency | ||
| } | ||
| } | ||
| // push a value or an error | ||
| Channel.prototype._push = function (err, res) { | ||
@@ -52,6 +68,15 @@ if (this.queue.length) | ||
| Channel.prototype.unshift = function (val) { | ||
| if (!arguments.length) | ||
| throw new Error('cannot unshift nothing') | ||
| if (val instanceof Error) | ||
| return this.errors.unshift(val) | ||
| if (arguments.length > 1) | ||
| val = slice.call(arguments) | ||
| this.values.unshift(val) | ||
| } | ||
| // .push(err) | ||
| // .push(val) | ||
| // .push(val...) | ||
| Channel.prototype.unshift = | ||
| Channel.prototype.push = function (val) { | ||
@@ -69,9 +94,8 @@ // push values | ||
| this.pending++ | ||
| return this.callback | ||
| return this._callback | ||
| } | ||
| // var val = yield* shift() | ||
| Channel.prototype.pop = | ||
| Channel.prototype.read = | ||
| Channel.prototype.shift = function* () { | ||
| Channel.prototype.shift = function* (alwaysWait) { | ||
| // throw the first error | ||
@@ -84,7 +108,16 @@ if (this.errors.length) | ||
| // if pending callbacks, queue | ||
| if (this.pending) | ||
| return yield this.next | ||
| if (this.pending || alwaysWait) | ||
| return yield this._next | ||
| // return nothing otherwise | ||
| } | ||
| Channel.prototype.end = function* () { | ||
| Channel.prototype.flush = function* (returnValues) { | ||
| if (returnValues === false) { | ||
| // just execute everything | ||
| while (this.length) | ||
| yield* this.shift() | ||
| return | ||
| } | ||
| // return the values | ||
| var values = [] | ||
@@ -94,2 +127,9 @@ while (this.length) | ||
| return values | ||
| } | ||
| Channel.prototype.drain = function* () { | ||
| if (this.pushable) | ||
| return | ||
| yield this._nextDrain | ||
| } |
+1
-1
| { | ||
| "name": "archan", | ||
| "description": "Array-like generator-based channels", | ||
| "version": "0.3.0", | ||
| "version": "0.4.0", | ||
| "author": { | ||
@@ -6,0 +6,0 @@ "name": "Jonathan Ong", |
+203
-43
| # Archan [](https://travis-ci.org/cojs/archan) | ||
| Array-like channels for [co](https://github.com/visionmedia/co). | ||
| Similar to [chan](https://github.com/brentburgoyne/chan) except it keeps track of the number of pending callbacks and allows a coroutine to exit. | ||
| Similar to [chan](https://github.com/brentburgoyne/chan) except it keeps track of the number of pending callbacks and thus allows a coroutine to exit. | ||
@@ -33,15 +33,47 @@ ## Example | ||
| ### var ch = archan() | ||
| ```js | ||
| var archan = require('archan') | ||
| ``` | ||
| Creates a new channel instance. | ||
| #### var ch = archan([options]) | ||
| ### var cb = ch.push() | ||
| Creates a new channel instance. Options: | ||
| - `concurrency: Infinity` - See below for control flow handling | ||
| ### Array-like Properties | ||
| #### ch.length | ||
| The number of values in the channel. | ||
| Note that if you ever push a "falsey" value in any of your callbacks, | ||
| the while loop in the example will not work as expected. | ||
| Instead, you should check `ch.length`: | ||
| ```js | ||
| var ch = archan() | ||
| setTimeout(ch.push(), 1) | ||
| setTimeout(ch.push(), 10) | ||
| setTimeout(ch.push(), 100) | ||
| setTimeout(ch.push(), 1000) | ||
| var count = 0 | ||
| while (ch.length) { | ||
| yield* ch.shift() | ||
| count++ | ||
| } | ||
| assert.equal(count, 4) | ||
| ``` | ||
| #### var cb = ch.push() | ||
| Returns a new callback you pass to asynchronous functions. | ||
| ```js | ||
| fs.readFile('file.txt', ch.push()) | ||
| var buffer = yield ch.shift() | ||
| request('https://github.com', ch.push()) | ||
| var result = yield ch.shift() | ||
| fs.readFile('file.txt', ch.push()) // push a callback | ||
| var buffer = yield ch.shift() // yield the result of the callback | ||
| request('https://github.com', ch.push()) // push a callback | ||
| var result = yield ch.shift() // yield the result of the callback | ||
| ``` | ||
@@ -58,9 +90,9 @@ | ||
| var stream = fs.createWriteStream('file.txt') | ||
| var cb = ch.push() | ||
| var cb = ch.push() // create a callback | ||
| stream.once('error', cb) | ||
| stream.once('finish', cb) | ||
| yield* ch.shift() | ||
| yield* ch.shift() // wait until the stream is finished | ||
| ``` | ||
| ### ch.push(val...) | ||
| #### ch.push(val...) | ||
@@ -77,11 +109,129 @@ Push a value to the channel synchronously. | ||
| ### var val = yield* ch.shift() | ||
| #### var val = yield* ch.shift([alwaysWait]) | ||
| Returns the next value in the channel. | ||
| The `*` is optional. | ||
| If there are no more pending callbacks, | ||
| it will return `undefined`. | ||
| If `alwaysWait` is `true` and there are no more values, | ||
| it will indefinitely wait for the next value just like [chan](https://github.com/brentburgoyne/chan). | ||
| Otherwise, `undefined` will be returned immediately. | ||
| ### var val = yield* ch.read() | ||
| Note that if any values were errors, | ||
| `.shift()` will throw that error. | ||
| See below for error handling. | ||
| #### ch.unshift(val...) | ||
| Adds a value to the beginning of the channel. | ||
| ### Control Flow | ||
| Since archan keeps track of your callbacks, | ||
| it can help you handle control flow much better. | ||
| #### ch.pending | ||
| The number of pending callbacks in the channel that have not yet returned. | ||
| #### ch.concurrency | ||
| This is the maximum number of pending callbacks you want to allow at once. | ||
| This value only matters when you do `yield* ch.drain()`. | ||
| By default, this value is `Infinity`. | ||
| #### yield* ch.drain() | ||
| If there are too many pending callbacks, | ||
| this yields until the next drain event. | ||
| Otherwise, it returns immediately. | ||
| For example, when saving files from a multipart upload to your server, | ||
| you'd want to limit the number of open file descriptors per request. | ||
| In this example, we'll limit the number to 5: | ||
| ```js | ||
| var parse = require('co-busboy') | ||
| var saveTo = require('save-to') | ||
| app.use(function* () { | ||
| var ch = archan({ | ||
| concurrency: 5 // 5 maximum file descriptors | ||
| }) | ||
| var parts = parse(this, { | ||
| autoFields: true | ||
| }) | ||
| var part | ||
| while (part = yield parts) { | ||
| yield* ch.drain() | ||
| // save the stream to a file | ||
| saveTo(part, 'randomFolder/' + part.filename, ch.push()) | ||
| } | ||
| // wait until all the files are saved | ||
| // and all the file descriptors are closed | ||
| yield* ch.flush() | ||
| // tell the client that everything went okay | ||
| this.status = 204 | ||
| }) | ||
| ``` | ||
| #### var values = yield* ch.flush([returnValues]) | ||
| Return all the pending values at once and clear the channel. | ||
| This is a shortcut for: | ||
| ```js | ||
| var values = [] | ||
| while (ch.length) { | ||
| values.push(yield* ch.shift()) | ||
| } | ||
| return values | ||
| ``` | ||
| If `returnValues` is `false`, | ||
| it won't bother collecting the return values and returning it to you, | ||
| which is most likely the case if you're just using archan for control flow. | ||
| ### Error Handling | ||
| `ch.shift()` will throw any errors that occured in any of the callbacks or that were pushed to the channel. | ||
| These errors will be pushed to the beginning of the channel before any other value. | ||
| Thus, if you want to handle errors on a per-shift basis, | ||
| you should do the following: | ||
| ```js | ||
| var val | ||
| while (ch.length) { | ||
| try { | ||
| val = yield* ch.shift() | ||
| } catch (err) { | ||
| // do something with the error | ||
| err.status = 400 | ||
| throw err | ||
| } | ||
| } | ||
| ``` | ||
| or in a single callback: | ||
| ```js | ||
| fs.readFile('file.txt', ch.push()) | ||
| try { | ||
| var text = yield* ch.shift() | ||
| } catch (err) { | ||
| console.log('this file probably doesn\'t exist') | ||
| } | ||
| ``` | ||
| ### Streams | ||
| Channels are pretty similar to readable object streams except: | ||
| - Order is not preserved | ||
| - Values are executed in parallel instead of in series | ||
| #### var val = yield* ch.read() | ||
| An alias for `ch.shift()`. | ||
@@ -103,42 +253,52 @@ This alias demonstrates archan's synonymity with [Readable Streams](http://nodejs.org/api/stream.html#stream_class_stream_readable) in object mode: | ||
| Note that unlike node's streams, | ||
| order is not preserved, | ||
| specifically with asynchronous functions. | ||
| Note that when passing callbacks, | ||
| order is not preserved. | ||
| In this case, | ||
| order is preserved because values were pushed synchronously. | ||
| ### ch.length | ||
| #### Piping to a Writable Stream | ||
| The number of values in the channel, | ||
| both pending and ready. | ||
| Note that if you ever return a `falsey` value in any of your callbacks, | ||
| the while loop in the example will not work as expected. | ||
| Instead, you should check `ch.length`: | ||
| It's pretty easy to pipe to writable stream, | ||
| even with back pressure. | ||
| However, pipes won't be included in this repo since there are too many different ways you could want to do this. | ||
| Here's a channel that pipes to a stream until it flushes with backpressure: | ||
| ```js | ||
| var stream = new Stream.Writable() | ||
| var ch = archan() | ||
| setTimeout(ch.push(), 1) | ||
| setTimeout(ch.push(), 10) | ||
| setTimeout(ch.push(), 100) | ||
| setTimeout(ch.push(), 1000) | ||
| // add some values | ||
| // has to be added before the pipe begins | ||
| // otherwise `ch.length === 0` | ||
| fs.readFile('file1.txt', ch.push()) | ||
| fs.readFile('file2.txt', ch.push()) | ||
| fs.readFile('file3.txt', ch.push()) | ||
| fs.readFile('file4.txt', ch.push()) | ||
| var count = 0 | ||
| while (ch.length) { | ||
| yield ch.shift() | ||
| count++ | ||
| } | ||
| // the pipe should be in its own coroutine | ||
| co(function* (){ | ||
| while (ch.length) { | ||
| if (!stream.write(yield* ch.shift())) { | ||
| // needs a drain event, so wait until the next one | ||
| yield stream.once.bind(stream, 'drain') | ||
| } | ||
| } | ||
| assert.equal(count, 4) | ||
| // end the stream | ||
| stream.end() | ||
| })() // since we didn't specify an error handler, it'll throw on any errors | ||
| console.log('lets do something else in the mean time') | ||
| ``` | ||
| ### var values = yield* ch.end() | ||
| Suppose you want to indefinitely pipe values from a channel to a Writable Stream. | ||
| Return all the pending values at once. | ||
| At this point, the channel will be empty and you will be able to reuse it. | ||
| This is a shortcut for: | ||
| ```js | ||
| var values = [] | ||
| while (ch.length) { | ||
| values.push(yield* ch.shift()) | ||
| } | ||
| co(function* () { | ||
| while (true) { | ||
| if (!stream.write(yield* ch.shift(true))) { | ||
| yield stream.once.bind(stream, 'drain') | ||
| } | ||
| } | ||
| }) | ||
| ``` | ||
@@ -145,0 +305,0 @@ |
12083
73.93%107
44.59%324
97.56%