Comparing version 0.1.14 to 0.1.15
{ | ||
"name": "streamops", | ||
"version": "0.1.14", | ||
"version": "0.1.15", | ||
"main": "./src/main.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
class Dam {} | ||
module.exports.Dam = Dam; | ||
@@ -36,5 +35,7 @@ const operators = module.exports.operators = { | ||
return function* (input) { | ||
if (this.count === undefined) this.count = 0; | ||
if (this.count < n) { | ||
this.count++; | ||
if (this.takeCount === undefined) { | ||
this.takeCount = 0; | ||
} | ||
if (this.takeCount < n) { | ||
this.takeCount++; | ||
yield input; | ||
@@ -55,29 +56,20 @@ } | ||
batch: function(size) { | ||
batch: function(size, {yieldIncomplete = true} = {}) { | ||
return function* (input) { | ||
this.buffer = this.buffer || []; | ||
this.buffer.push(input); | ||
if (this.buffer.length === size) { | ||
yield this.buffer; | ||
this.buffer = []; | ||
if (!this.batchBuffer) { | ||
this.batchBuffer = []; | ||
} | ||
}; | ||
}, | ||
debounce: function(ms) { | ||
return function* (input) { | ||
const now = Date.now(); | ||
if (!this.lastYield || (now - this.lastYield) >= ms) { | ||
this.lastYield = now; | ||
yield input; | ||
} | ||
}; | ||
}, | ||
if (input !== undefined) { | ||
this.batchBuffer.push(input); | ||
throttle: function(ms) { | ||
return function* (input) { | ||
const now = Date.now(); | ||
if (!this.lastYield || (now - this.lastYield) >= ms) { | ||
this.lastYield = now; | ||
yield input; | ||
if (this.batchBuffer.length >= size) { | ||
const batch = this.batchBuffer.slice(0, size); | ||
this.batchBuffer = this.batchBuffer.slice(size); | ||
yield batch; | ||
} | ||
} else if (yieldIncomplete && this.batchBuffer.length > 0) { | ||
// Yield remaining items when stream ends | ||
yield this.batchBuffer; | ||
this.batchBuffer = []; | ||
} | ||
@@ -123,2 +115,3 @@ }; | ||
mergeAggregate: function(options = {}) { | ||
@@ -167,7 +160,7 @@ const { | ||
} | ||
return function* (input) { | ||
this.buffer = this.buffer || []; | ||
this.buffer.push(input); | ||
const isReady = () => { | ||
@@ -177,3 +170,3 @@ if (typeof conditions === 'function') { | ||
} | ||
if (Array.isArray(conditions)) { | ||
@@ -188,6 +181,6 @@ return conditions.every(field => this.buffer.some(item => field in item)); | ||
} | ||
return false; | ||
}; | ||
if (isReady()) { | ||
@@ -245,2 +238,2 @@ const result = this.buffer; | ||
dam: () => new Dam | ||
}; | ||
}; |
@@ -54,12 +54,2 @@ const {operators} = require('./operators'); | ||
debounce(ms) { | ||
this.pipeline.push(operators.debounce(ms)); | ||
return this; | ||
} | ||
throttle(ms) { | ||
this.pipeline.push(operators.throttle(ms)); | ||
return this; | ||
} | ||
mergeAggregate(ms) { | ||
@@ -66,0 +56,0 @@ this.pipeline.push(operators.mergeAggregate(ms)); |
106480
28
3549