streamfilter
Advanced tools
Comparing version 1.0.3 to 1.0.4
{ | ||
"name": "streamfilter", | ||
"version": "1.0.3", | ||
"version": "1.0.4", | ||
"description": "Filtering streams.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -27,3 +27,3 @@ 'use strict'; | ||
this._transform = function(chunk, encoding, done) { | ||
this._transform = function streamFilterTransform(chunk, encoding, done) { | ||
filterCallback(chunk, encoding, function StreamFilterCallback(filter) { | ||
@@ -37,3 +37,5 @@ if(!filter) { | ||
} else { | ||
_this.restore.__programPush(chunk, encoding, done); | ||
_this.restore.__programPush(chunk, encoding, function() { | ||
done(); | ||
}); | ||
} | ||
@@ -46,3 +48,3 @@ } else { | ||
this._flush = function(done) { | ||
this._flush = function streamFilterFlush(done) { | ||
this._filterStreamEnded = true; | ||
@@ -66,3 +68,3 @@ done(); | ||
this.restore._transform = function(chunk, encoding, done) { | ||
this.restore._transform = function streamFilterRestoreTransform(chunk, encoding, done) { | ||
_this.restore.push(chunk, encoding); | ||
@@ -72,3 +74,3 @@ done(); | ||
this.restore._flush = function(done) { | ||
this.restore._flush = function streamFilterRestoreFlush(done) { | ||
_this._restoreStreamCallback = done; | ||
@@ -84,13 +86,19 @@ if(_this._filterStreamEnded) { | ||
this.restore.__programPush = function(chunk, encoding, done) { | ||
this.restore.__programPush = function streamFilterRestoreProgramPush(chunk, encoding, done) { | ||
if(_this.restore.__programmedPush) { | ||
_this.emit('error', new Error('No supposed to happen!')); | ||
_this.restore.emit('error', new Error('Not supposed to happen!')); | ||
} | ||
_this.restore.__programmedPush = [chunk, encoding, done]; | ||
_this.restore.__attemptPush(); | ||
// Need to be async to avoid nested push attempts | ||
setImmediate(_this.restore.__attemptPush.bind(_this.restore)); | ||
_this.restore.emit('readable'); | ||
_this.restore.emit('drain'); | ||
}; | ||
this.restore.__attemptPush = function() { | ||
this.restore.__attemptPush = function streamFilterRestoreAttemptPush() { | ||
var cb = null; | ||
if(_this.restore.__waitPush) { | ||
if(_this.restore.__programmedPush) { | ||
cb = _this.restore.__programmedPush[2]; | ||
_this.restore.__waitPush = _this.restore.push( | ||
@@ -100,9 +108,14 @@ _this.restore.__programmedPush[0], | ||
); | ||
_this.restore.__programmedPush[2](); | ||
_this.restore.__programmedPush = null; | ||
cb(); | ||
} | ||
} else { | ||
setImmediate(function() { | ||
// Need to be async to avoid nested push attempts | ||
_this.restore.emit('readable'); | ||
}); | ||
} | ||
}; | ||
this.restore._read = function() { | ||
this.restore._read = function streamFilterRestoreRead() { | ||
_this.restore.__waitPush = true; | ||
@@ -109,0 +122,0 @@ // Need to be async to avoid nested push attempts |
@@ -0,1 +1,3 @@ | ||
'use strict'; | ||
var assert = require('assert'); | ||
@@ -18,7 +20,12 @@ var Stream = require('stream'); | ||
describe('should work', function() { | ||
it('should work without new', function() { | ||
var createFilter = StreamFilter; | ||
assert(createFilter(function() {}) instanceof StreamFilter); | ||
}); | ||
}); | ||
// Iterating through versions | ||
@@ -85,2 +92,88 @@ StreamTest.versions.forEach(function(version) { | ||
it('with restore option and more than 16 nested objects', function(done) { | ||
var nDone = 0; | ||
var inputStream = StreamTest[version].fromObjects([ | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
object1, object2, object1, object2, object1, object2, object1, object2, | ||
]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(objs.length, 32); | ||
if(2 === ++nDone) { | ||
done(); | ||
} | ||
}); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.equal(objs2.length, 32); | ||
if(2 === ++nDone) { | ||
done(); | ||
} | ||
})); | ||
inputStream.pipe(filter).pipe(outputStream); | ||
}); | ||
it('with restore option and more than 16 objects', function(done) { | ||
var nDone = 0; | ||
var inputStream = StreamTest[version].fromObjects([ | ||
object1, object1, object1, object1, object1, object1, object1, object1, | ||
object1, object1, object1, object1, object1, object1, object1, object1, | ||
object1, object1, object1, object1, object1, object1, object1, object1, | ||
object2, object2, object2, object2, object2, object2, object2, object2, | ||
object2, object2, object2, object2, object2, object2, object2, object2, | ||
object2, object2, object2, object2, object2, object2, object2, object2, | ||
]); | ||
var filter = new StreamFilter(function(obj, unused, cb) { | ||
if(obj === object2) { | ||
return cb(true); | ||
} | ||
return cb(false); | ||
}, { | ||
objectMode: true, | ||
restore: true, | ||
}); | ||
var outputStream = StreamTest[version].toObjects(function(err, objs) { | ||
if(err) { | ||
return done(err); | ||
} | ||
assert.equal(objs.length, 24); | ||
if(2 === ++nDone) { | ||
done(); | ||
} | ||
}); | ||
filter.restore.pipe(StreamTest[version].toObjects(function(err2, objs2) { | ||
if(err2) { | ||
return done(err2); | ||
} | ||
assert.equal(objs2.length, 24); | ||
if(2 === ++nDone) { | ||
done(); | ||
} | ||
})); | ||
inputStream.pipe(filter).pipe(outputStream); | ||
}); | ||
it('with restore and passthrough option in a different pipeline', function(done) { | ||
@@ -87,0 +180,0 @@ var inputStream = StreamTest[version].fromObjects([object1, object2]); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
30572
512