
Security News
Attackers Are Hunting High-Impact Node.js Maintainers in a Coordinated Social Engineering Campaign
Multiple high-impact npm maintainers confirm they have been targeted in the same social engineering campaign that compromised Axios.
Easy stream transformation and composition for node.js that integrates seamlessly with Q promises
Streamee.js is a set of stream transformers and composers for node.js that integrates seamlessly with Q promises. It can be seen as a mix of async and underscore.js, but for streams!
One of the most useful feature of streams is back-pressure: if the bottom of the stream pipeline is slow (for example the Web client), then the top will automatically push slowly (for example your database and/or Web server). As a result memory and CPU consumption in node are optimal.
Streamee.js allows you to build very easily pipelines that compose and transform streams, so that you can keep back-pressure all along the way in a nice functional programming style. All transformation functions can return Q promises instead of direct values, which makes asynchronous operations less verbose and more functional (less callback hell!).
Example:
var ee = require('streamee');
var stream1 = // Readable stream, for example from a HTTP chunked response, a MongoDB response, ...
var promiseOfStream = // sometimes, because of callbacks, we can only get a Q.Promise[Readable] instead of a Readable
var stream2 = ee.flattenReadable(promiseOfStream) // well, now you can flatten it!
ee.pipeAndRun( // create a pipeline
ee.interleave([stream1, stream2]), // interleave the streams
ee.map(ee.obj, function(obj) { // 'ee.obj' means that we want to handle the chunk as a json object
obj.newField = 'something useful'
return obj; // return directly a value, so this is a sync map
}),
ee.collect(ee.obj, function(obj) { // collect is filter + map
if (obj.intField > 3 && obj.intField < 10) { // filter
return getPromiseOfData(obj); // async map by returning a Q promise
}),
destination // Writable stream, for example a HTTP chunked response towards a Web client or a Websocket connection
);
Inspired from Play Framework Enumeratee.
npm install streamee
Streamee.js uses node 1.0+ streams, so if you use an API that returns node 0.8 streams, you have to wrap them like this:
var stream = require('stream');
var newStream = new stream.Readable().wrap(oldStream);
If each chunk of your stream is a logical independent unit (for example a stream of json strings), you should create an 'objectMode' stream so that node's stream buffers does not automatically concatenate the chunks:
var objectModeStream = new stream.Readable({objectMode: true}).wrap(nonObjectModeStream);
For example, here is a function that returns a chunked http response as an objectMode stream:
var http = require('http');
var Q = require('q');
// GET a http chunked stream (for example a stream of json strings)
function GETstream(url) {
var deferred = Q.defer();
http.get(url, function(res) {
deferred.resolve(new stream.Readable({objectMode: true}).wrap(res));
});
return ee.flattenReadable(deferred.promise); // flatten a Promise[stream.Readable] to a stream.Readable
}
All transformers and composers returned by streamee.js are instances of node 1.0+ Stream.
All transformers (map, filter, collect...) take as first parameter the type in which you want to handle the chunk in the transformation function. ee.bin is buffer (binary data), ee.str is string and ee.obj is an object. If a chunk is not
convertible to the asked type, it will be dropped.
Also, all transformation functions can return either buffer or string or object, as well as Promise[string] or Promise[buffer] or Promise[object].
Default encoding for all transformers is utf8. If a source or a destination has a different encoding, you can use
ee.encode(fromEncoding, toEncoding) at the begin or the end of the stream pipeline.
Map each chunk.
Arguments
Example
var mapper = ee.map(ee.str, function(str) {
return str + ' is mapped to this message';
})
Example with a promise
var request = require('request');
// Helper function that does a http GET request and returns a promise of the response body
function GET(url) {
var deferred = Q.defer();
request(url, function(err, res, body) {
if (!err && res.statusCode == 200) deferred.resolve(body)
else deferred.reject(err);
});
return deferred.promise;
}
var mapper = ee.map(ee.obj, function(obj) {
return GET(obj.url).then(function(body) {
return obj + ' is mapped to ' + body;
});
})
Keep only the chunks that pass the truth test f.
Arguments
Example
var filter = ee.filter(ee.obj, function(obj) {
return obj.aField === 'someValue';
})
Collect is filter + map.
Arguments
Example
var collector = ee.collect(ee.str, function(str) {
if (str.length > 10) return 'We keep ' + str + ' and map it to this message';
})
Re-chunk a string stream according to separator.
Arguments
Example
var splitter = ee.splitStr('\n'); // one chunk is one line
Encode the chunks that were encoded in 'fromEncoding' to 'toEncoding'.
Example
var utf8stream = ee.encode('utf16le', 'utf8');
Interleave the readable streams passed in the array.
Example
var mixedStream = ee.interleave([stream1, stream2]);
Concatenate sequentially the readable streams passed in the array. The resulting stream ends when the last stream of arrayOfReadableStreams ends.
Example
var stream1AndThenStream2 = ee.concatenate([stream1, stream2]);
Flatten a Q.Promise[Readable] to a Readable stream.
Example
var aStream = ee.flattenReadable(promiseOfReadableStream);
Take the streams passed in parameter and sequentially pipe them. Equivalent to stream1.pipe(stream2).pipe(...) ...
Example
ee.pipeAndRun(
srcStream,
ee.map(ee.obj, function(obj) {
var mappedObj = // ...
return mappedObj;
}),
destinationStream
);
This software is licensed under the Apache 2 license, quoted below.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
FAQs
Easy stream transformation and composition for node.js that integrates seamlessly with Q promises
We found that streamee demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
Multiple high-impact npm maintainers confirm they have been targeted in the same social engineering campaign that compromised Axios.

Security News
Axios compromise traced to social engineering, showing how attacks on maintainers can bypass controls and expose the broader software supply chain.

Security News
Node.js has paused its bug bounty program after funding ended, removing payouts for vulnerability reports but keeping its security process unchanged.