Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

until-stream

Package Overview
Dependencies
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

until-stream - npm Package Compare versions

Comparing version 0.0.0 to 0.1.0

.travis.yml

12

package.json
{
"name": "until-stream",
"version": "0.0.0",
"description": "A through stream that stops piping when a signature is reached ",
"version": "0.1.0",
"description": "A PassThrough stream that stops piping when a pattern is reached ",
"main": "until.js",

@@ -25,9 +25,9 @@ "scripts": {

"tap": "~0.4.0",
"stream-buffers": "~0.2.3"
"stream-buffers": "~0.2.4"
},
"dependencies": {
"pullstream": "~0.1.0",
"over": "0.0.5",
"buffers": "~0.1.1"
"buffers": "~0.1.1",
"readable-stream": "~0.3.0",
"setimmediate": "~1.0.1"
}
}

@@ -1,6 +0,71 @@

until-stream
until-stream [![Build Status](https://travis-ci.org/EvanOxfeld/until-stream.png)](https://travis-ci.org/EvanOxfeld/until-stream)
============
A node.js stream that stops piping a stream when a signature is reached
Ever wanted to pause a stream when a certain String or
a binary signature is reached? Until-Stream is the
answer. Pipe Until-Stream and automatically stop when
your pattern is reached or call read() until the returned
data matches your pattern.
Current API is very unstable
<pre>
--------------------------------------
|Stability - API is somewhat unstable|
--------------------------------------
</pre>
read() and pipe() are implemented with some limitations.
For example, Until-Stream supports piping to only a
single destination stream.
## Installation
```bash
$ npm install until-stream
```
## Quick Examples
### Pipe
```javascript
var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");
var us = new UntilStream({ pattern: 'World'});
var sourceStream = new streamBuffers.ReadableStreamBuffer();
sourceStream.put("Hello World");
var writableStream = new streamBuffers.WritableStreamBuffer();
sourceStream.pipe(us).pipe(writableStream);
writableStream.once('close', function () {
//writeableStream contains all data before the pattern occurs
var str = writableStream.getContentsAsString('utf8'); // 'Hello '
//Now the next call to read() returns the pattern
var data = us.read(); // 'World'
});
```
### Read
```javascript
var UntilStream = require('until-stream');
var streamBuffers = require("stream-buffers");
var us = new UntilStream({ pattern: 'jumps'});
var sourceStream = new streamBuffers.ReadableStreamBuffer({ chunkSize: 8 });
sourceStream.put("The quick brown fox jumps over the lazy dog");
sourceStream.pipe(us);
us.on('readable', function() {
if (us.read() === 'jumps') {
console.log('Pattern reached!');
}
});
```
## License
MIT
'use strict';
module.exports = Until;
var PullStream = require('pullstream');
require("setimmediate");
var PassThrough = require('stream').PassThrough;
var inherits = require("util").inherits;
var Buffers = require('buffers');
function Until(opts) {
this._bufs = Buffers();
PullStream.call(this, opts);
if (!PassThrough) {
PassThrough = require('readable-stream/passthrough');
}
inherits(Until, PullStream);
Until.prototype.pullUntil = function (signature, callback) {
//todo better check
if (typeof signature === "string") {
signature = new Buffer(signature);
}
inherits(Until, PassThrough);
var self = this;
var window = Buffers();
var output = Buffers();
var prevData = new Buffer(0);
pullUntilServiceRequest();
function pullUntilServiceRequest() {
//todo chunk size should be an option
var data = self.pullUpTo(1024);
if (data) {
process(data);
} else {
self.once('readable', function() {
data = self.pullUpTo(1024);
process(data);
});
}
function Until(opts) {
//todo allow pattern to be set later
if (typeof opts.pattern === "string") {
opts.pattern = new Buffer(opts.pattern);
} else if (!opts.pattern instanceof Buffer) {
throw new Error('Invalid pattern type')
}
function process(data) {
window.push(prevData);
if (data) {
window.push(data);
}
this._opts = opts;
this._buf = Buffers();
this._flushing = false;
this.unpiping = false;
//todo make sure this is always async
var sigIndex = window.indexOf(signature);
if (sigIndex >= 0) {
processSignature(sigIndex);
} else {
output.push(prevData);
prevData = data;
pullUntilServiceRequest();
}
}
//todo allow and handle normal PassThrough opts
PassThrough.call(this);
}
function processSignature(sigIndex) {
output.push(window.slice(0, sigIndex));
self._bufs.push(window.slice(sigIndex + signature.length, window.length));
callback(null, output.toBuffer());
Until.prototype.read = function (size) {
if (this.unpiping) {
//either return null from read() until everything is unpiped
// or change flow() within readable-stream.pipe()
return null;
}
};
Until.prototype.pull = function (len, callback) {
//todo improve this logic
if (typeof len === 'function' && !callback) {
callback = len;
len = undefined;
var rs = this._readableState;
var pattern = this._opts.pattern;
var data;
if (this._buf.indexOf(pattern) === 0 ||
(data = PassThrough.prototype.read.call(this, size)) && this._buf.push(data)
&& this._buf.indexOf(pattern) === 0) {
if (rs.pipesCount) {
this.unpiping = true;
endPipes.call(this);
return null;
}
var output = this._buf.slice(0, pattern.length);
this._buf = Buffers([this._buf.slice(pattern.length)]);
return output;
}
if (!this._bufs.length || len === 0) {
if (!len) {
return PullStream.prototype.pull.call(this, callback);
var index = this._buf.indexOf(pattern);
if (index > 0) {
//lop off everything starting with pattern & put it in buffer for next read()
var output = this._buf.slice(0, index);
this._buf = Buffers([this._buf.slice(index)]);
if (rs.pipesCount) {
this.unpiping = true;
process.nextTick(endPipes.bind(this));
}
return PullStream.prototype.pull.call(this, len, callback);
return output;
}
var self = this;
pullServiceRequest();
function pullServiceRequest() {
self._serviceRequests = null;
if (self._flushed) {
return callback(new Error('End of Stream'));
if (pattern.length > this._buf.length) {
if (this._flushing) {
var output = this._buf.toBuffer();
this._buf = Buffers();
return output;
}
return null;
}
//todo cleanup -> keying off of len = undefined is not cool
var data;
if (self._bufs.length >= len) {
data = self._bufs.slice(0, len);
var slice = self._bufs = self._bufs.slice(len, self._bufs.length);
self._bufs = Buffers();
self._bufs.push(slice);
} else if (len) {
var streamData = self.read(len - self._bufs.length);
if (streamData) {
data = Buffer.concat([self._bufs.toBuffer(), streamData]);
self._bufs = Buffers();
}
} else {
var streamData = self.read();
if (streamData) {
data = Buffer.concat([self._bufs.toBuffer(), streamData])
} else {
data = self._bufs.toBuffer();
}
self._bufs = Buffers();
}
//slice off pattern.length - 1 from the end in case the pattern straddles chunks
var offset = this._buf.length - (pattern.length - 1);
var output = this._buf.slice(0, offset);
output = output.length ? output : null;
this._buf = offset > 0 ? Buffers([this._buf.slice(offset)]) : this._buf;
return output;
};
if (data) {
process.nextTick(callback.bind(null, null, data));
} else {
self._serviceRequests = pullServiceRequest;
}
}
Until.prototype.pipe = function(dest, pipeOpts) {
pipeOpts = pipeOpts || {};
pipeOpts.end = pipeOpts.end || false;
return PassThrough.prototype.pipe.call(this, dest, pipeOpts)
};
Until.prototype.pullUpTo = function (len) {
if (this._bufs.length >= len) {
var data = this._bufs.slice(0, len);
var slice = this._bufs.slice(len, this._bufs.length);
this._bufs = Buffers();
this._bufs.push(slice);
return data;
Until.prototype._flush = function(output, cb) {
this._flushing = true;
var rs = this._readableState;
if (rs.length || this._buf.length) {
if (rs.pipesCount > 0) {
writePipes.call(this, this.read());
}
//allow for I/O
return setImmediate(this._flush.bind(this, output, cb));
}
endPipes.call(this);
process.nextTick(cb);
}
var data = PullStream.prototype.pullUpTo.call(this, len - this._bufs.length);
if (data) {
data = Buffer.concat([this._bufs.toBuffer(), data]);
} else if (this._bufs.length) {
data = this._bufs.toBuffer();
function writePipes(data) {
var rs = this._readableState;
switch (rs.pipesCount) {
case 0:
break;
case 1:
rs.pipes.write(data);
break;
default:
//todo
break;
}
this._bufs = Buffers();
return data;
}
Until.prototype._flush = function (outputFn, callback) {
var self = this;
if (this._readableState.length > 0 || this._bufs.length) {
return setImmediate(self._flush.bind(self, outputFn, callback));
}
function endPipes() {
var rs = this._readableState;
var pipes = rs.pipes;
var pipesCount = rs.pipesCount;
this.unpipe();
this.unpiping = false;
this._flushed = true;
if (self._writesFinished) {
self._finish(callback);
} else {
callback();
switch (pipesCount) {
case 0:
break;
case 1:
pipes.end();
break;
default:
//todo
break;
}
};
//todo prepend _bufs in front of pipe
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc