until-stream
Advanced tools
Comparing version 0.0.0 to 0.1.0
{ | ||
"name": "until-stream", | ||
"version": "0.0.0", | ||
"description": "A through stream that stops piping when a signature is reached ", | ||
"version": "0.1.0", | ||
"description": "A PassThrough stream that stops piping when a pattern is reached ", | ||
"main": "until.js", | ||
@@ -25,9 +25,9 @@ "scripts": { | ||
"tap": "~0.4.0", | ||
"stream-buffers": "~0.2.3" | ||
"stream-buffers": "~0.2.4" | ||
}, | ||
"dependencies": { | ||
"pullstream": "~0.1.0", | ||
"over": "0.0.5", | ||
"buffers": "~0.1.1" | ||
"buffers": "~0.1.1", | ||
"readable-stream": "~0.3.0", | ||
"setimmediate": "~1.0.1" | ||
} | ||
} |
@@ -1,6 +0,71 @@ | ||
until-stream | ||
until-stream [![Build Status](https://travis-ci.org/EvanOxfeld/until-stream.png)](https://travis-ci.org/EvanOxfeld/until-stream) | ||
============ | ||
A node.js stream that stops piping a stream when a signature is reached | ||
Ever wanted to pause a stream when a certain String or | ||
a binary signature is reached? Until-Stream is the | ||
answer. Pipe Until-Stream and automatically stop when | ||
your pattern is reached or call read() until the returned | ||
data matches your pattern. | ||
Current API is very unstable | ||
<pre> | ||
-------------------------------------- | ||
|Stability - API is somewhat unstable| | ||
-------------------------------------- | ||
</pre> | ||
read() and pipe() are implemented with some limitations. | ||
For example, Until-Stream supports piping to only a | ||
single destination stream. | ||
## Installation | ||
```bash | ||
$ npm install until-stream | ||
``` | ||
## Quick Examples | ||
### Pipe | ||
```javascript | ||
var UntilStream = require('until-stream'); | ||
var streamBuffers = require("stream-buffers"); | ||
var us = new UntilStream({ pattern: 'World'}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer(); | ||
sourceStream.put("Hello World"); | ||
var writableStream = new streamBuffers.WritableStreamBuffer(); | ||
sourceStream.pipe(us).pipe(writableStream); | ||
writableStream.once('close', function () { | ||
//writeableStream contains all data before the pattern occurs | ||
var str = writableStream.getContentsAsString('utf8'); // 'Hello ' | ||
//Now the next call to read() returns the pattern | ||
var data = us.read(); // 'World' | ||
}); | ||
``` | ||
### Read | ||
```javascript | ||
var UntilStream = require('until-stream'); | ||
var streamBuffers = require("stream-buffers"); | ||
var us = new UntilStream({ pattern: 'jumps'}); | ||
var sourceStream = new streamBuffers.ReadableStreamBuffer({ chunkSize: 8 }); | ||
sourceStream.put("The quick brown fox jumps over the lazy dog"); | ||
sourceStream.pipe(us); | ||
us.on('readable', function() { | ||
if (us.read() === 'jumps') { | ||
console.log('Pattern reached!'); | ||
} | ||
}); | ||
``` | ||
## License | ||
MIT | ||
222
until.js
'use strict'; | ||
module.exports = Until; | ||
var PullStream = require('pullstream'); | ||
require("setimmediate"); | ||
var PassThrough = require('stream').PassThrough; | ||
var inherits = require("util").inherits; | ||
var Buffers = require('buffers'); | ||
function Until(opts) { | ||
this._bufs = Buffers(); | ||
PullStream.call(this, opts); | ||
if (!PassThrough) { | ||
PassThrough = require('readable-stream/passthrough'); | ||
} | ||
inherits(Until, PullStream); | ||
Until.prototype.pullUntil = function (signature, callback) { | ||
//todo better check | ||
if (typeof signature === "string") { | ||
signature = new Buffer(signature); | ||
} | ||
inherits(Until, PassThrough); | ||
var self = this; | ||
var window = Buffers(); | ||
var output = Buffers(); | ||
var prevData = new Buffer(0); | ||
pullUntilServiceRequest(); | ||
function pullUntilServiceRequest() { | ||
//todo chunk size should be an option | ||
var data = self.pullUpTo(1024); | ||
if (data) { | ||
process(data); | ||
} else { | ||
self.once('readable', function() { | ||
data = self.pullUpTo(1024); | ||
process(data); | ||
}); | ||
} | ||
function Until(opts) { | ||
//todo allow pattern to be set later | ||
if (typeof opts.pattern === "string") { | ||
opts.pattern = new Buffer(opts.pattern); | ||
} else if (!opts.pattern instanceof Buffer) { | ||
throw new Error('Invalid pattern type') | ||
} | ||
function process(data) { | ||
window.push(prevData); | ||
if (data) { | ||
window.push(data); | ||
} | ||
this._opts = opts; | ||
this._buf = Buffers(); | ||
this._flushing = false; | ||
this.unpiping = false; | ||
//todo make sure this is always async | ||
var sigIndex = window.indexOf(signature); | ||
if (sigIndex >= 0) { | ||
processSignature(sigIndex); | ||
} else { | ||
output.push(prevData); | ||
prevData = data; | ||
pullUntilServiceRequest(); | ||
} | ||
} | ||
//todo allow and handle normal PassThrough opts | ||
PassThrough.call(this); | ||
} | ||
function processSignature(sigIndex) { | ||
output.push(window.slice(0, sigIndex)); | ||
self._bufs.push(window.slice(sigIndex + signature.length, window.length)); | ||
callback(null, output.toBuffer()); | ||
Until.prototype.read = function (size) { | ||
if (this.unpiping) { | ||
//either return null from read() until everything is unpiped | ||
// or change flow() within readable-stream.pipe() | ||
return null; | ||
} | ||
}; | ||
Until.prototype.pull = function (len, callback) { | ||
//todo improve this logic | ||
if (typeof len === 'function' && !callback) { | ||
callback = len; | ||
len = undefined; | ||
var rs = this._readableState; | ||
var pattern = this._opts.pattern; | ||
var data; | ||
if (this._buf.indexOf(pattern) === 0 || | ||
(data = PassThrough.prototype.read.call(this, size)) && this._buf.push(data) | ||
&& this._buf.indexOf(pattern) === 0) { | ||
if (rs.pipesCount) { | ||
this.unpiping = true; | ||
endPipes.call(this); | ||
return null; | ||
} | ||
var output = this._buf.slice(0, pattern.length); | ||
this._buf = Buffers([this._buf.slice(pattern.length)]); | ||
return output; | ||
} | ||
if (!this._bufs.length || len === 0) { | ||
if (!len) { | ||
return PullStream.prototype.pull.call(this, callback); | ||
var index = this._buf.indexOf(pattern); | ||
if (index > 0) { | ||
//lop off everything starting with pattern & put it in buffer for next read() | ||
var output = this._buf.slice(0, index); | ||
this._buf = Buffers([this._buf.slice(index)]); | ||
if (rs.pipesCount) { | ||
this.unpiping = true; | ||
process.nextTick(endPipes.bind(this)); | ||
} | ||
return PullStream.prototype.pull.call(this, len, callback); | ||
return output; | ||
} | ||
var self = this; | ||
pullServiceRequest(); | ||
function pullServiceRequest() { | ||
self._serviceRequests = null; | ||
if (self._flushed) { | ||
return callback(new Error('End of Stream')); | ||
if (pattern.length > this._buf.length) { | ||
if (this._flushing) { | ||
var output = this._buf.toBuffer(); | ||
this._buf = Buffers(); | ||
return output; | ||
} | ||
return null; | ||
} | ||
//todo cleanup -> keying off of len = undefined is not cool | ||
var data; | ||
if (self._bufs.length >= len) { | ||
data = self._bufs.slice(0, len); | ||
var slice = self._bufs = self._bufs.slice(len, self._bufs.length); | ||
self._bufs = Buffers(); | ||
self._bufs.push(slice); | ||
} else if (len) { | ||
var streamData = self.read(len - self._bufs.length); | ||
if (streamData) { | ||
data = Buffer.concat([self._bufs.toBuffer(), streamData]); | ||
self._bufs = Buffers(); | ||
} | ||
} else { | ||
var streamData = self.read(); | ||
if (streamData) { | ||
data = Buffer.concat([self._bufs.toBuffer(), streamData]) | ||
} else { | ||
data = self._bufs.toBuffer(); | ||
} | ||
self._bufs = Buffers(); | ||
} | ||
//slice off pattern.length - 1 from the end in case the pattern straddles chunks | ||
var offset = this._buf.length - (pattern.length - 1); | ||
var output = this._buf.slice(0, offset); | ||
output = output.length ? output : null; | ||
this._buf = offset > 0 ? Buffers([this._buf.slice(offset)]) : this._buf; | ||
return output; | ||
}; | ||
if (data) { | ||
process.nextTick(callback.bind(null, null, data)); | ||
} else { | ||
self._serviceRequests = pullServiceRequest; | ||
} | ||
} | ||
Until.prototype.pipe = function(dest, pipeOpts) { | ||
pipeOpts = pipeOpts || {}; | ||
pipeOpts.end = pipeOpts.end || false; | ||
return PassThrough.prototype.pipe.call(this, dest, pipeOpts) | ||
}; | ||
Until.prototype.pullUpTo = function (len) { | ||
if (this._bufs.length >= len) { | ||
var data = this._bufs.slice(0, len); | ||
var slice = this._bufs.slice(len, this._bufs.length); | ||
this._bufs = Buffers(); | ||
this._bufs.push(slice); | ||
return data; | ||
Until.prototype._flush = function(output, cb) { | ||
this._flushing = true; | ||
var rs = this._readableState; | ||
if (rs.length || this._buf.length) { | ||
if (rs.pipesCount > 0) { | ||
writePipes.call(this, this.read()); | ||
} | ||
//allow for I/O | ||
return setImmediate(this._flush.bind(this, output, cb)); | ||
} | ||
endPipes.call(this); | ||
process.nextTick(cb); | ||
} | ||
var data = PullStream.prototype.pullUpTo.call(this, len - this._bufs.length); | ||
if (data) { | ||
data = Buffer.concat([this._bufs.toBuffer(), data]); | ||
} else if (this._bufs.length) { | ||
data = this._bufs.toBuffer(); | ||
function writePipes(data) { | ||
var rs = this._readableState; | ||
switch (rs.pipesCount) { | ||
case 0: | ||
break; | ||
case 1: | ||
rs.pipes.write(data); | ||
break; | ||
default: | ||
//todo | ||
break; | ||
} | ||
this._bufs = Buffers(); | ||
return data; | ||
} | ||
Until.prototype._flush = function (outputFn, callback) { | ||
var self = this; | ||
if (this._readableState.length > 0 || this._bufs.length) { | ||
return setImmediate(self._flush.bind(self, outputFn, callback)); | ||
} | ||
function endPipes() { | ||
var rs = this._readableState; | ||
var pipes = rs.pipes; | ||
var pipesCount = rs.pipesCount; | ||
this.unpipe(); | ||
this.unpiping = false; | ||
this._flushed = true; | ||
if (self._writesFinished) { | ||
self._finish(callback); | ||
} else { | ||
callback(); | ||
switch (pipesCount) { | ||
case 0: | ||
break; | ||
case 1: | ||
pipes.end(); | ||
break; | ||
default: | ||
//todo | ||
break; | ||
} | ||
}; | ||
//todo prepend _bufs in front of pipe | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
12966
10
288
72
1
+ Addedreadable-stream@~0.3.0
+ Addedsetimmediate@~1.0.1
+ Addedreadable-stream@0.3.1(transitive)
- Removedover@0.0.5
- Removedpullstream@~0.1.0
- Removedover@0.0.5(transitive)
- Removedpullstream@0.1.0(transitive)
- Removedreadable-stream@0.2.0(transitive)