Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

streamops

Package Overview
Dependencies
Maintainers
0
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

streamops - npm Package Compare versions

Comparing version 0.1.11 to 0.1.12

tests/streaming.operators.bufferBetween.test.js

2

package.json
{
"name": "streamops",
"version": "0.1.11",
"version": "0.1.12",
"main": "./src/main.js",

@@ -5,0 +5,0 @@ "scripts": {

@@ -1,2 +0,2 @@

module.exports = {
const operators =module.exports = {

@@ -156,2 +156,79 @@ map: function(fn) {

},
waitUntil: function(conditions) {
if (typeof conditions !== 'function' && !Array.isArray(conditions) && (typeof conditions !== 'object' || conditions === null)) {
throw new Error('Invalid condition type');
}
return function* (input) {
this.buffer = this.buffer || [];
this.buffer.push(input);
const isReady = () => {
if (typeof conditions === 'function') {
return conditions(this.buffer);
}
if (Array.isArray(conditions)) {
return conditions.every(field => this.buffer.some(item => field in item));
}
if (typeof conditions === 'object') {
return Object.entries(conditions).every(([key, value]) =>
this.buffer.some(item => item[key] === value)
);
}
return false;
};
if (isReady()) {
const result = this.buffer;
this.buffer = [];
yield result;
}
};
},
bufferBetween: function(startToken, endToken, mapFn = null) {
return function* (input) {
this.buffer = this.buffer || '';
this.buffering = this.buffering || false;
let currentChunk = this.buffer + input;
let startIndex, endIndex;
while (currentChunk.length > 0) {
if (!this.buffering) {
startIndex = currentChunk.indexOf(startToken);
if (startIndex !== -1) {
if (startIndex > 0) {
yield currentChunk.slice(0, startIndex);
}
this.buffering = true;
currentChunk = currentChunk.slice(startIndex);
} else {
yield currentChunk;
currentChunk = '';
}
} else {
endIndex = currentChunk.indexOf(endToken, startToken.length);
if (endIndex !== -1) {
let content = currentChunk.slice(0, endIndex + endToken.length);
if (mapFn) {
yield mapFn(content);
} else {
yield content;
}
this.buffering = false;
currentChunk = currentChunk.slice(endIndex + endToken.length);
} else {
break;
}
}
}
this.buffer = currentChunk;
};
}
};

@@ -14,2 +14,12 @@ const operators = require('./operators');

bufferBetween(startToken, endToken) {
this.pipeline.push(operators.bufferBetween(startToken, endToken));
return this;
}
waitUntil(condition) {
this.pipeline.push(operators.waitUntil(condition));
return this;
}
filter(predicate) {

@@ -16,0 +26,0 @@ this.pipeline.push(operators.filter(predicate));

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc