Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

streamops

Package Overview
Dependencies
Maintainers
0
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streamops - npm Package Compare versions

Comparing version 0.1.14 to 0.1.15

tests/streaming.operators.batch.test.js

2

package.json
{
"name": "streamops",
"version": "0.1.14",
"version": "0.1.15",
"main": "./src/main.js",

@@ -5,0 +5,0 @@ "scripts": {

class Dam {}
module.exports.Dam = Dam;

@@ -36,5 +35,7 @@ const operators = module.exports.operators = {

return function* (input) {
if (this.count === undefined) this.count = 0;
if (this.count < n) {
this.count++;
if (this.takeCount === undefined) {
this.takeCount = 0;
}
if (this.takeCount < n) {
this.takeCount++;
yield input;

@@ -55,29 +56,20 @@ }

batch: function(size) {
batch: function(size, {yieldIncomplete = true} = {}) {
return function* (input) {
this.buffer = this.buffer || [];
this.buffer.push(input);
if (this.buffer.length === size) {
yield this.buffer;
this.buffer = [];
if (!this.batchBuffer) {
this.batchBuffer = [];
}
};
},
debounce: function(ms) {
return function* (input) {
const now = Date.now();
if (!this.lastYield || (now - this.lastYield) >= ms) {
this.lastYield = now;
yield input;
}
};
},
if (input !== undefined) {
this.batchBuffer.push(input);
throttle: function(ms) {
return function* (input) {
const now = Date.now();
if (!this.lastYield || (now - this.lastYield) >= ms) {
this.lastYield = now;
yield input;
if (this.batchBuffer.length >= size) {
const batch = this.batchBuffer.slice(0, size);
this.batchBuffer = this.batchBuffer.slice(size);
yield batch;
}
} else if (yieldIncomplete && this.batchBuffer.length > 0) {
// Yield remaining items when stream ends
yield this.batchBuffer;
this.batchBuffer = [];
}

@@ -123,2 +115,3 @@ };

mergeAggregate: function(options = {}) {

@@ -167,7 +160,7 @@ const {

}
return function* (input) {
this.buffer = this.buffer || [];
this.buffer.push(input);
const isReady = () => {

@@ -177,3 +170,3 @@ if (typeof conditions === 'function') {

}
if (Array.isArray(conditions)) {

@@ -188,6 +181,6 @@ return conditions.every(field => this.buffer.some(item => field in item));

}
return false;
};
if (isReady()) {

@@ -245,2 +238,2 @@ const result = this.buffer;

dam: () => new Dam
};
};

@@ -54,12 +54,2 @@ const {operators} = require('./operators');

debounce(ms) {
this.pipeline.push(operators.debounce(ms));
return this;
}
throttle(ms) {
this.pipeline.push(operators.throttle(ms));
return this;
}
mergeAggregate(ms) {

@@ -66,0 +56,0 @@ this.pipeline.push(operators.mergeAggregate(ms));

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc