@tanbo/stream
Advanced tools
Comparing version 1.2.6 to 1.2.7
@@ -9,2 +9,3 @@ import { Observable } from './observable'; | ||
complete(): void; | ||
private clean; | ||
} |
@@ -155,2 +155,5 @@ class Subscriber { | ||
} | ||
if (observer instanceof Subscriber) { | ||
return observer; | ||
} | ||
return new Subscriber(observer); | ||
@@ -193,2 +196,5 @@ } | ||
this.subscribers.push(subscriber); | ||
return () => { | ||
this.clean(subscriber); | ||
}; | ||
}); | ||
@@ -205,2 +211,5 @@ Object.defineProperty(this, "subscribers", { | ||
this.subscribe(subscriber); | ||
return () => { | ||
this.clean(subscriber); | ||
}; | ||
}); | ||
@@ -227,2 +236,3 @@ } | ||
}); | ||
this.subscribers = []; | ||
} | ||
@@ -233,3 +243,10 @@ complete() { | ||
}); | ||
this.subscribers = []; | ||
} | ||
clean(subscriber) { | ||
const index = this.subscribers.indexOf(subscriber); | ||
if (index > -1) { | ||
this.subscribers.splice(index, 1); | ||
} | ||
} | ||
} | ||
@@ -815,2 +832,55 @@ | ||
/** | ||
* 让所有数据流按固定间隔时间向后推送 | ||
* @param time 数据间隔时间 | ||
*/ | ||
function intervalTime(time) { | ||
return function (source) { | ||
return new Observable(subscriber => { | ||
const buffer = []; | ||
let canPublish = true; | ||
let isCompleted = false; | ||
let timer = null; | ||
const next = () => { | ||
if (buffer.length > 0) { | ||
subscriber.next(buffer.shift()); | ||
timer = setTimeout(next, time); | ||
} | ||
else { | ||
canPublish = true; | ||
if (isCompleted) { | ||
subscriber.complete(); | ||
} | ||
} | ||
}; | ||
const obs = source.subscribe({ | ||
next(value) { | ||
if (!canPublish) { | ||
buffer.push(value); | ||
return; | ||
} | ||
subscriber.next(value); | ||
canPublish = false; | ||
timer = setTimeout(next, time); | ||
}, | ||
error(error) { | ||
clearTimeout(timer); | ||
subscriber.error(error); | ||
}, | ||
complete() { | ||
if (buffer.length) { | ||
isCompleted = true; | ||
return; | ||
} | ||
clearTimeout(timer); | ||
subscriber.complete(); | ||
} | ||
}); | ||
return () => { | ||
obs.unsubscribe(); | ||
}; | ||
}); | ||
}; | ||
} | ||
/** | ||
* 将源数据转换成另外一种数据,再发送出去 | ||
@@ -1066,2 +1136,64 @@ * @param handle 转换函数 | ||
/** | ||
* 当返回的 Observable 完成后,再切换下一个数据,直接所有数据流完成 | ||
* @param callback | ||
*/ | ||
function switchWhen(callback) { | ||
return function (source) { | ||
const cache = []; | ||
let next = null; | ||
let canToNext = true; | ||
let isCompleted = false; | ||
return new Observable(subscriber => { | ||
function toNext() { | ||
if (!canToNext) { | ||
return; | ||
} | ||
const value = cache.shift(); | ||
if (!value) { | ||
if (isCompleted) { | ||
subscriber.complete(); | ||
} | ||
return; | ||
} | ||
canToNext = false; | ||
next = callback(value).subscribe({ | ||
next(nextValue) { | ||
subscriber.next(nextValue); | ||
if (canToNext) { | ||
next = null; | ||
toNext(); | ||
} | ||
}, | ||
error(e) { | ||
subscriber.error(e); | ||
}, | ||
complete() { | ||
canToNext = true; | ||
next = null; | ||
toNext(); | ||
} | ||
}); | ||
} | ||
const subscription = source.subscribe({ | ||
next(value) { | ||
cache.push(value); | ||
toNext(); | ||
}, | ||
error(e) { | ||
subscriber.error(e); | ||
}, | ||
complete() { | ||
isCompleted = true; | ||
toNext(); | ||
} | ||
}); | ||
return () => { | ||
subscription.unsubscribe(); | ||
next === null || next === void 0 ? void 0 : next.unsubscribe(); | ||
}; | ||
}); | ||
}; | ||
} | ||
/** | ||
* 指定源数据流最多发送几次 | ||
@@ -1164,2 +1296,2 @@ * @param count | ||
export { BehaviorSubject, Observable, Subject, Subscriber, Subscription, auditTime, bufferTime, concat, debounceTime, delay, delayWhen, distinctUntilChanged, empty, every, filter, fromEvent, fromPromise, interval, map, merge, microTask, noop, of, race, retry, sampleTime, share, skip, switchMap, take, tap, throttleTime, throwError, timeout, zip }; | ||
export { BehaviorSubject, Observable, Subject, Subscriber, Subscription, auditTime, bufferTime, concat, debounceTime, delay, delayWhen, distinctUntilChanged, empty, every, filter, fromEvent, fromPromise, interval, intervalTime, map, merge, microTask, noop, of, race, retry, sampleTime, share, skip, switchMap, switchWhen, take, tap, throttleTime, throwError, timeout, zip }; |
@@ -157,2 +157,5 @@ 'use strict'; | ||
} | ||
if (observer instanceof Subscriber) { | ||
return observer; | ||
} | ||
return new Subscriber(observer); | ||
@@ -195,2 +198,5 @@ } | ||
this.subscribers.push(subscriber); | ||
return () => { | ||
this.clean(subscriber); | ||
}; | ||
}); | ||
@@ -207,2 +213,5 @@ Object.defineProperty(this, "subscribers", { | ||
this.subscribe(subscriber); | ||
return () => { | ||
this.clean(subscriber); | ||
}; | ||
}); | ||
@@ -229,2 +238,3 @@ } | ||
}); | ||
this.subscribers = []; | ||
} | ||
@@ -235,3 +245,10 @@ complete() { | ||
}); | ||
this.subscribers = []; | ||
} | ||
clean(subscriber) { | ||
const index = this.subscribers.indexOf(subscriber); | ||
if (index > -1) { | ||
this.subscribers.splice(index, 1); | ||
} | ||
} | ||
} | ||
@@ -817,2 +834,55 @@ | ||
/** | ||
* 让所有数据流按固定间隔时间向后推送 | ||
* @param time 数据间隔时间 | ||
*/ | ||
function intervalTime(time) { | ||
return function (source) { | ||
return new Observable(subscriber => { | ||
const buffer = []; | ||
let canPublish = true; | ||
let isCompleted = false; | ||
let timer = null; | ||
const next = () => { | ||
if (buffer.length > 0) { | ||
subscriber.next(buffer.shift()); | ||
timer = setTimeout(next, time); | ||
} | ||
else { | ||
canPublish = true; | ||
if (isCompleted) { | ||
subscriber.complete(); | ||
} | ||
} | ||
}; | ||
const obs = source.subscribe({ | ||
next(value) { | ||
if (!canPublish) { | ||
buffer.push(value); | ||
return; | ||
} | ||
subscriber.next(value); | ||
canPublish = false; | ||
timer = setTimeout(next, time); | ||
}, | ||
error(error) { | ||
clearTimeout(timer); | ||
subscriber.error(error); | ||
}, | ||
complete() { | ||
if (buffer.length) { | ||
isCompleted = true; | ||
return; | ||
} | ||
clearTimeout(timer); | ||
subscriber.complete(); | ||
} | ||
}); | ||
return () => { | ||
obs.unsubscribe(); | ||
}; | ||
}); | ||
}; | ||
} | ||
/** | ||
* 将源数据转换成另外一种数据,再发送出去 | ||
@@ -1068,2 +1138,64 @@ * @param handle 转换函数 | ||
/** | ||
* 当返回的 Observable 完成后,再切换下一个数据,直接所有数据流完成 | ||
* @param callback | ||
*/ | ||
function switchWhen(callback) { | ||
return function (source) { | ||
const cache = []; | ||
let next = null; | ||
let canToNext = true; | ||
let isCompleted = false; | ||
return new Observable(subscriber => { | ||
function toNext() { | ||
if (!canToNext) { | ||
return; | ||
} | ||
const value = cache.shift(); | ||
if (!value) { | ||
if (isCompleted) { | ||
subscriber.complete(); | ||
} | ||
return; | ||
} | ||
canToNext = false; | ||
next = callback(value).subscribe({ | ||
next(nextValue) { | ||
subscriber.next(nextValue); | ||
if (canToNext) { | ||
next = null; | ||
toNext(); | ||
} | ||
}, | ||
error(e) { | ||
subscriber.error(e); | ||
}, | ||
complete() { | ||
canToNext = true; | ||
next = null; | ||
toNext(); | ||
} | ||
}); | ||
} | ||
const subscription = source.subscribe({ | ||
next(value) { | ||
cache.push(value); | ||
toNext(); | ||
}, | ||
error(e) { | ||
subscriber.error(e); | ||
}, | ||
complete() { | ||
isCompleted = true; | ||
toNext(); | ||
} | ||
}); | ||
return () => { | ||
subscription.unsubscribe(); | ||
next === null || next === void 0 ? void 0 : next.unsubscribe(); | ||
}; | ||
}); | ||
}; | ||
} | ||
/** | ||
* 指定源数据流最多发送几次 | ||
@@ -1184,2 +1316,3 @@ * @param count | ||
exports.interval = interval; | ||
exports.intervalTime = intervalTime; | ||
exports.map = map; | ||
@@ -1196,2 +1329,3 @@ exports.merge = merge; | ||
exports.switchMap = switchMap; | ||
exports.switchWhen = switchWhen; | ||
exports.take = take; | ||
@@ -1198,0 +1332,0 @@ exports.tap = tap; |
@@ -10,2 +10,3 @@ export * from './audit-time'; | ||
export * from './filter'; | ||
export * from './interval-time'; | ||
export * from './map'; | ||
@@ -18,4 +19,5 @@ export * from './micro-task'; | ||
export * from './switch-map'; | ||
export * from './switch-when'; | ||
export * from './take'; | ||
export * from './tap'; | ||
export * from './throttle-time'; |
{ | ||
"name": "@tanbo/stream", | ||
"version": "1.2.6", | ||
"version": "1.2.7", | ||
"description": "A data stream lib", | ||
@@ -5,0 +5,0 @@ "main": "./bundles/index.js", |
104757
45
2896