streamfilter
Advanced tools
Comparing version 1.0.4 to 1.0.5
{ | ||
"name": "streamfilter", | ||
"version": "1.0.4", | ||
"version": "1.0.5", | ||
"description": "Filtering streams.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
115
src/index.js
@@ -33,9 +33,5 @@ 'use strict'; | ||
} else if(options.restore) { | ||
if(options.passthrough) { | ||
_this.restore.write(chunk, encoding, done); | ||
} else { | ||
_this.restore.__programPush(chunk, encoding, function() { | ||
done(); | ||
}); | ||
} | ||
_this._restoreManager.programPush(chunk, encoding, function() { | ||
done(); | ||
}); | ||
} else { | ||
@@ -52,3 +48,5 @@ done(); | ||
if(!options.passthrough) { | ||
this.restore.push(null); | ||
this._restoreManager.programPush(null, {}.undef, function() { | ||
done(); | ||
}); | ||
} else if(this._restoreStreamCallback) { | ||
@@ -65,63 +63,68 @@ this._restoreStreamCallback(); | ||
if(options.passthrough) { | ||
this.restore = new stream.Transform(options); | ||
this.restore._transform = function streamFilterRestoreTransform(chunk, encoding, done) { | ||
_this.restore.push(chunk, encoding); | ||
done(); | ||
this.restore = new stream.Duplex(options); | ||
this._restoreManager = createReadStreamBackpressureManager(this.restore); | ||
this.restore._write = function streamFilterRestoreWrite(chunk, encoding, done) { | ||
_this._restoreManager.programPush(chunk, encoding, done); | ||
}; | ||
this.restore._flush = function streamFilterRestoreFlush(done) { | ||
_this._restoreStreamCallback = done; | ||
this.restore.on('finish', function streamFilterRestoreFinish() { | ||
_this._restoreStreamCallback = function() { | ||
_this._restoreManager.programPush(null, {}.undef, function() {}); | ||
}; | ||
if(_this._filterStreamEnded) { | ||
done(); | ||
_this._restoreStreamCallback(); | ||
} | ||
}; | ||
}); | ||
} else { | ||
this.restore = new stream.Readable(options); | ||
this.restore.__waitPush = true; | ||
this.restore.__programmedPush = null; | ||
this._restoreManager = createReadStreamBackpressureManager(this.restore); | ||
} | ||
} | ||
} | ||
this.restore.__programPush = function streamFilterRestoreProgramPush(chunk, encoding, done) { | ||
if(_this.restore.__programmedPush) { | ||
_this.restore.emit('error', new Error('Not supposed to happen!')); | ||
} | ||
_this.restore.__programmedPush = [chunk, encoding, done]; | ||
// Need to be async to avoid nested push attempts | ||
setImmediate(_this.restore.__attemptPush.bind(_this.restore)); | ||
_this.restore.emit('readable'); | ||
_this.restore.emit('drain'); | ||
}; | ||
util.inherits(StreamFilter, stream.Transform); | ||
this.restore.__attemptPush = function streamFilterRestoreAttemptPush() { | ||
var cb = null; | ||
// Utils to manage readable stream backpressure | ||
function createReadStreamBackpressureManager(readableStream) { | ||
var manager = { | ||
waitPush: true, | ||
programmedPushs: [], | ||
programPush: function programPush(chunk, encoding, done) { | ||
// Store the current write | ||
manager.programmedPushs.push([chunk, encoding, done]); | ||
// Need to be async to avoid nested push attempts | ||
// Programm a push attempt | ||
setImmediate(manager.attemptPush); | ||
// Let's say we're ready for a read | ||
readableStream.emit('readable'); | ||
readableStream.emit('drain'); | ||
}, | ||
attemptPush: function attemptPush() { | ||
var nextPush; | ||
if(_this.restore.__waitPush) { | ||
if(_this.restore.__programmedPush) { | ||
cb = _this.restore.__programmedPush[2]; | ||
_this.restore.__waitPush = _this.restore.push( | ||
_this.restore.__programmedPush[0], | ||
_this.restore.__programmedPush[1] | ||
); | ||
_this.restore.__programmedPush = null; | ||
cb(); | ||
} | ||
} else { | ||
setImmediate(function() { | ||
// Need to be async to avoid nested push attempts | ||
_this.restore.emit('readable'); | ||
}); | ||
if(manager.waitPush) { | ||
if(manager.programmedPushs.length) { | ||
nextPush = manager.programmedPushs.shift(); | ||
manager.waitPush = readableStream.push(nextPush[0], nextPush[1]); | ||
(nextPush[2])(); | ||
} | ||
}; | ||
} else { | ||
setImmediate(function() { | ||
// Need to be async to avoid nested push attempts | ||
readableStream.emit('readable'); | ||
}); | ||
} | ||
}, | ||
}; | ||
this.restore._read = function streamFilterRestoreRead() { | ||
_this.restore.__waitPush = true; | ||
// Need to be async to avoid nested push attempts | ||
setImmediate(_this.restore.__attemptPush.bind(this)); | ||
}; | ||
} | ||
} | ||
// Patch the readable stream to manage reads | ||
readableStream._read = function streamFilterRestoreRead() { | ||
manager.waitPush = true; | ||
// Need to be async to avoid nested push attempts | ||
setImmediate(manager.attemptPush); | ||
}; | ||
return manager; | ||
} | ||
util.inherits(StreamFilter, stream.Transform); | ||
module.exports = StreamFilter; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
30612
516