Comparing version 0.1.11 to 0.1.12
{ | ||
"name": "streamops", | ||
"version": "0.1.11", | ||
"version": "0.1.12", | ||
"main": "./src/main.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -1,2 +0,2 @@ | ||
module.exports = { | ||
const operators =module.exports = { | ||
@@ -156,2 +156,79 @@ map: function(fn) { | ||
}, | ||
waitUntil: function(conditions) { | ||
if (typeof conditions !== 'function' && !Array.isArray(conditions) && (typeof conditions !== 'object' || conditions === null)) { | ||
throw new Error('Invalid condition type'); | ||
} | ||
return function* (input) { | ||
this.buffer = this.buffer || []; | ||
this.buffer.push(input); | ||
const isReady = () => { | ||
if (typeof conditions === 'function') { | ||
return conditions(this.buffer); | ||
} | ||
if (Array.isArray(conditions)) { | ||
return conditions.every(field => this.buffer.some(item => field in item)); | ||
} | ||
if (typeof conditions === 'object') { | ||
return Object.entries(conditions).every(([key, value]) => | ||
this.buffer.some(item => item[key] === value) | ||
); | ||
} | ||
return false; | ||
}; | ||
if (isReady()) { | ||
const result = this.buffer; | ||
this.buffer = []; | ||
yield result; | ||
} | ||
}; | ||
}, | ||
bufferBetween: function(startToken, endToken, mapFn = null) { | ||
return function* (input) { | ||
this.buffer = this.buffer || ''; | ||
this.buffering = this.buffering || false; | ||
let currentChunk = this.buffer + input; | ||
let startIndex, endIndex; | ||
while (currentChunk.length > 0) { | ||
if (!this.buffering) { | ||
startIndex = currentChunk.indexOf(startToken); | ||
if (startIndex !== -1) { | ||
if (startIndex > 0) { | ||
yield currentChunk.slice(0, startIndex); | ||
} | ||
this.buffering = true; | ||
currentChunk = currentChunk.slice(startIndex); | ||
} else { | ||
yield currentChunk; | ||
currentChunk = ''; | ||
} | ||
} else { | ||
endIndex = currentChunk.indexOf(endToken, startToken.length); | ||
if (endIndex !== -1) { | ||
let content = currentChunk.slice(0, endIndex + endToken.length); | ||
if (mapFn) { | ||
yield mapFn(content); | ||
} else { | ||
yield content; | ||
} | ||
this.buffering = false; | ||
currentChunk = currentChunk.slice(endIndex + endToken.length); | ||
} else { | ||
break; | ||
} | ||
} | ||
} | ||
this.buffer = currentChunk; | ||
}; | ||
} | ||
}; |
@@ -14,2 +14,12 @@ const operators = require('./operators'); | ||
bufferBetween(startToken, endToken) { | ||
this.pipeline.push(operators.bufferBetween(startToken, endToken)); | ||
return this; | ||
} | ||
waitUntil(condition) { | ||
this.pipeline.push(operators.waitUntil(condition)); | ||
return this; | ||
} | ||
filter(predicate) { | ||
@@ -16,0 +26,0 @@ this.pipeline.push(operators.filter(predicate)); |
95008
24
3142