
Security News
Meet Socket at Black Hat Europe and BSides London 2025
Socket is heading to London! Stop by our booth or schedule a meeting to see what we've been working on.
@e2fyi/streams
Advanced tools
Nodejs stream library for various use cases: e.g. Auto-tagging object streams, streaming to mongoDb via mongoose models, etc.
This NodeJS library provides custom NodeJS streams for specific use cases.
Currently, the following streams are available:
DocumentTagger: A Transform stream to tag an auto-increment field to each object in the stream. Can also mutate the stream objects through a function or default Object.
MongooseStream: A Transform stream which will bulk write the objects in the stream to mongodb via a mongoose model.
DocumentTagger, MongooseStream.Using CommonJS module
// importing DocumentTagger and MongooseStream
const {DocumentTagger, MongooseStream} = require('@e2fyi/streams');
The API documentation is also available at https://e2fyi.github.io/streams.
stream.Transform
stream.Transform
ObjectObjectstream.TransformTransform Object stream (objectMode=true) to tag with an autoIncrement id. An object or function can be optionally provided to mutate each object in the stream.
Kind: static class of @e2fyi/streams
Extends: stream.Transform
Create a new DocumentTagger stream.
| Param | Type | Description |
|---|---|---|
| opts | DocumentTaggerSettings | Settings for the stream. |
Example
const docTagger = new DocumentTagger({autoIncrement: 'id', mutate: { project: 'test' }});
someReadableStreamFromArray([{text: 'abc'}, {text: 'efg'}])
.pipe(docTagger)
.pipe(process.stdout);
// stdout >
// {"text": "abc", "id": 0, "project": "test"}
// {"text": "efg", "id": 1, "project": "test"}
stream.TransformA custom NodeJS Transform stream to mongo via mongoose.
Kind: static class of @e2fyi/streams
Extends: stream.Transform
Create a Transform stream which bulkWrite to mongo based on the itemWaterMark. model (mongoose Model) is a required field.
| Param | Type | Description |
|---|---|---|
| opts | MongooseStreamSettings | Configuration for MongoStream. Default value for itemWaterMark is 50. |
Example
var stream2mongo = new MongooseStream({mode: SomeMongooseModel});
someReadableStreamFromArray([{text: 'abc'}, {text: 'efg'}])
.pipe(stream2mongo) // writes to mongo (while stream are also passthrough)
.pipe(response); // stream same results back to some request
ObjectSettings for DocumentTagger.
Kind: static typedef of @e2fyi/streams
Properties
| Name | Type | Description |
|---|---|---|
| autoIncrement | String | The auto-increment field to tag onto the stream object. |
| mutate | function | Object | Function or Object to mutate the stream. If an Object is provided, each stream object will be mutated with the Object.assign(streamObj, mutateObj). |
ObjectSettings for MongooseStream.
Kind: static typedef of @e2fyi/streams
Properties
| Name | Type | Default | Description |
|---|---|---|---|
| itemWaterMark | Number | 50 | The number of item collected before writing to mongodb. |
| model | mongoose.Model | mongoose Model. |
Unit testing
npm test
Build documentation
npm run build
FAQs
Nodejs stream library for various use cases: e.g. Auto-tagging object streams, streaming to mongoDb via mongoose models, etc.
We found that @e2fyi/streams demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
Socket is heading to London! Stop by our booth or schedule a meeting to see what we've been working on.

Security News
OWASP’s 2025 Top 10 introduces Software Supply Chain Failures as a new category, reflecting rising concern over dependency and build system risks.

Research
/Security News
Socket researchers discovered nine malicious NuGet packages that use time-delayed payloads to crash applications and corrupt industrial control systems.