New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@tanbo/stream

Package Overview
Dependencies
Maintainers
0
Versions
36
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@tanbo/stream - npm Package Compare versions

Comparing version 1.2.6 to 1.2.7

bundles/operators/interval-time.d.ts

1

bundles/core/subject.d.ts

@@ -9,2 +9,3 @@ import { Observable } from './observable';

complete(): void;
private clean;
}

@@ -155,2 +155,5 @@ class Subscriber {

}
if (observer instanceof Subscriber) {
return observer;
}
return new Subscriber(observer);

@@ -193,2 +196,5 @@ }

this.subscribers.push(subscriber);
return () => {
this.clean(subscriber);
};
});

@@ -205,2 +211,5 @@ Object.defineProperty(this, "subscribers", {

this.subscribe(subscriber);
return () => {
this.clean(subscriber);
};
});

@@ -227,2 +236,3 @@ }

});
this.subscribers = [];
}

@@ -233,3 +243,10 @@ complete() {

});
this.subscribers = [];
}
clean(subscriber) {
const index = this.subscribers.indexOf(subscriber);
if (index > -1) {
this.subscribers.splice(index, 1);
}
}
}

@@ -815,2 +832,55 @@

/**
* 让所有数据流按固定间隔时间向后推送
* @param time 数据间隔时间
*/
function intervalTime(time) {
return function (source) {
return new Observable(subscriber => {
const buffer = [];
let canPublish = true;
let isCompleted = false;
let timer = null;
const next = () => {
if (buffer.length > 0) {
subscriber.next(buffer.shift());
timer = setTimeout(next, time);
}
else {
canPublish = true;
if (isCompleted) {
subscriber.complete();
}
}
};
const obs = source.subscribe({
next(value) {
if (!canPublish) {
buffer.push(value);
return;
}
subscriber.next(value);
canPublish = false;
timer = setTimeout(next, time);
},
error(error) {
clearTimeout(timer);
subscriber.error(error);
},
complete() {
if (buffer.length) {
isCompleted = true;
return;
}
clearTimeout(timer);
subscriber.complete();
}
});
return () => {
obs.unsubscribe();
};
});
};
}
/**
* 将源数据转换成另外一种数据,再发送出去

@@ -1066,2 +1136,64 @@ * @param handle 转换函数

/**
* 当返回的 Observable 完成后,再切换下一个数据,直接所有数据流完成
* @param callback
*/
function switchWhen(callback) {
return function (source) {
const cache = [];
let next = null;
let canToNext = true;
let isCompleted = false;
return new Observable(subscriber => {
function toNext() {
if (!canToNext) {
return;
}
const value = cache.shift();
if (!value) {
if (isCompleted) {
subscriber.complete();
}
return;
}
canToNext = false;
next = callback(value).subscribe({
next(nextValue) {
subscriber.next(nextValue);
if (canToNext) {
next = null;
toNext();
}
},
error(e) {
subscriber.error(e);
},
complete() {
canToNext = true;
next = null;
toNext();
}
});
}
const subscription = source.subscribe({
next(value) {
cache.push(value);
toNext();
},
error(e) {
subscriber.error(e);
},
complete() {
isCompleted = true;
toNext();
}
});
return () => {
subscription.unsubscribe();
next === null || next === void 0 ? void 0 : next.unsubscribe();
};
});
};
}
/**
* 指定源数据流最多发送几次

@@ -1164,2 +1296,2 @@ * @param count

export { BehaviorSubject, Observable, Subject, Subscriber, Subscription, auditTime, bufferTime, concat, debounceTime, delay, delayWhen, distinctUntilChanged, empty, every, filter, fromEvent, fromPromise, interval, map, merge, microTask, noop, of, race, retry, sampleTime, share, skip, switchMap, take, tap, throttleTime, throwError, timeout, zip };
export { BehaviorSubject, Observable, Subject, Subscriber, Subscription, auditTime, bufferTime, concat, debounceTime, delay, delayWhen, distinctUntilChanged, empty, every, filter, fromEvent, fromPromise, interval, intervalTime, map, merge, microTask, noop, of, race, retry, sampleTime, share, skip, switchMap, switchWhen, take, tap, throttleTime, throwError, timeout, zip };

@@ -157,2 +157,5 @@ 'use strict';

}
if (observer instanceof Subscriber) {
return observer;
}
return new Subscriber(observer);

@@ -195,2 +198,5 @@ }

this.subscribers.push(subscriber);
return () => {
this.clean(subscriber);
};
});

@@ -207,2 +213,5 @@ Object.defineProperty(this, "subscribers", {

this.subscribe(subscriber);
return () => {
this.clean(subscriber);
};
});

@@ -229,2 +238,3 @@ }

});
this.subscribers = [];
}

@@ -235,3 +245,10 @@ complete() {

});
this.subscribers = [];
}
clean(subscriber) {
const index = this.subscribers.indexOf(subscriber);
if (index > -1) {
this.subscribers.splice(index, 1);
}
}
}

@@ -817,2 +834,55 @@

/**
* 让所有数据流按固定间隔时间向后推送
* @param time 数据间隔时间
*/
function intervalTime(time) {
return function (source) {
return new Observable(subscriber => {
const buffer = [];
let canPublish = true;
let isCompleted = false;
let timer = null;
const next = () => {
if (buffer.length > 0) {
subscriber.next(buffer.shift());
timer = setTimeout(next, time);
}
else {
canPublish = true;
if (isCompleted) {
subscriber.complete();
}
}
};
const obs = source.subscribe({
next(value) {
if (!canPublish) {
buffer.push(value);
return;
}
subscriber.next(value);
canPublish = false;
timer = setTimeout(next, time);
},
error(error) {
clearTimeout(timer);
subscriber.error(error);
},
complete() {
if (buffer.length) {
isCompleted = true;
return;
}
clearTimeout(timer);
subscriber.complete();
}
});
return () => {
obs.unsubscribe();
};
});
};
}
/**
* 将源数据转换成另外一种数据,再发送出去

@@ -1068,2 +1138,64 @@ * @param handle 转换函数

/**
* 当返回的 Observable 完成后,再切换下一个数据,直接所有数据流完成
* @param callback
*/
function switchWhen(callback) {
return function (source) {
const cache = [];
let next = null;
let canToNext = true;
let isCompleted = false;
return new Observable(subscriber => {
function toNext() {
if (!canToNext) {
return;
}
const value = cache.shift();
if (!value) {
if (isCompleted) {
subscriber.complete();
}
return;
}
canToNext = false;
next = callback(value).subscribe({
next(nextValue) {
subscriber.next(nextValue);
if (canToNext) {
next = null;
toNext();
}
},
error(e) {
subscriber.error(e);
},
complete() {
canToNext = true;
next = null;
toNext();
}
});
}
const subscription = source.subscribe({
next(value) {
cache.push(value);
toNext();
},
error(e) {
subscriber.error(e);
},
complete() {
isCompleted = true;
toNext();
}
});
return () => {
subscription.unsubscribe();
next === null || next === void 0 ? void 0 : next.unsubscribe();
};
});
};
}
/**
* 指定源数据流最多发送几次

@@ -1184,2 +1316,3 @@ * @param count

exports.interval = interval;
exports.intervalTime = intervalTime;
exports.map = map;

@@ -1196,2 +1329,3 @@ exports.merge = merge;

exports.switchMap = switchMap;
exports.switchWhen = switchWhen;
exports.take = take;

@@ -1198,0 +1332,0 @@ exports.tap = tap;

@@ -10,2 +10,3 @@ export * from './audit-time';

export * from './filter';
export * from './interval-time';
export * from './map';

@@ -18,4 +19,5 @@ export * from './micro-task';

export * from './switch-map';
export * from './switch-when';
export * from './take';
export * from './tap';
export * from './throttle-time';

2

package.json
{
"name": "@tanbo/stream",
"version": "1.2.6",
"version": "1.2.7",
"description": "A data stream lib",

@@ -5,0 +5,0 @@ "main": "./bundles/index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc