Security News
PyPI’s New Archival Feature Closes a Major Security Gap
PyPI now allows maintainers to archive projects, improving security and helping users make informed decisions about their dependencies.
@tanbo/stream
Advanced tools
##安装
npm install @tanbo/stream
最基础的数据流类,每一次订阅产生一个新的数据流。
import { Stream } from '@tanbo/stream';
const stream = new Stream(observer => {
observer.next(1);
observer.next(2);
})
stream.subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 2
基础广播类,所有订阅者共用同一个数据流,且只会拿到订阅后广播的数据。
import { Subject } from '@tanbo/stream';
const subject = new Subject();
subject.next(1);
subject.subscribe(value => {
console.log(value);
})
subject.next(2);
// 输出:
// 2
有默认值的广播类,所有订阅者共用同一个数据流,且所有订阅者在订阅时会同步拿到数据流中的最后一次数据,如果还没有广播,则同步拿到默认数据。
import { BehaviorSubject } from '@tanbo/stream';
const behaviorSubject = new BehaviorSubject(1);
behaviorSubject.subscribe(value => {
console.log(value);
})
// 输出:
// 1
behaviorSubject.next(2);
// 输出:
// 2
Stream
、Subject
、BehaviorSubject
类都可以通过同样的方法取消订阅。以 Stream 为例:
const stream = new Stream(observer => {
setTimeout(() => {
observer.next(1);
}, 1000)
})
const subscription = stream.subscribe(value => {
console.log(value);
})
// 取消订阅
subscription.unsubscribe();
// 前面的 console.log 不会执行,因为在还没有发送数据时,已取消了订阅
所有的数据流发射器都返回一个 Stream 实例。
把 DOM 事件转换成数据流。
fromEvent(document.getElementById('button'), 'click').subscribe(event => {
console.log(event);
})
const promise = new Promise(resolve => {
setTimeout(() => {
resolve(1)
}, 1000)
})
fromPromise(promise).subscribe(value => {
// 在 1 秒后,会收到由 Promise 发来的值
console.log(value)
})
按固定间隔时间发送值,默认间隔 1 秒,从 0 开始。
interval().subscribe(value => {
console.log(value);
})
// 输出:
// 0
// 1
// 2
// 3
// ...
同时订阅多个数据流,当任意一个数据流有新值时,立即将该值发送出去。
merge(interval(), interval()).subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 1
// 2
// 2
// 3
// 3
// 4
// ...
将既定的值按顺序同步发送。
of(1, 2, 3).subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 2
// 3
同时订阅多个数据流,当任意一个数据流有新值时,立即将该值发送出去,同时忽略其它的数据流,当所有的数据流均有接收到新值时,再发送下一个最新的数据,一直往复。
可以理解为,争先原则,即第一个通过,其它均不通过,直至上一个周期结束。
race(interval(1000), of('a')).subscribe(value => {
console.log(value)
})
// 输出:
// 'a'
延迟一段时间发送值。默认延迟一秒。
timeout().subscribe(() => {
console.log('1 秒后打印此消息');
})
同时订阅多个数据流,当所有数据流都有值时,将接收到最新的值按参数顺序组成数据向后发送。可以理解为 Promise.all
。
zip(of(1), of(2), timeout(1000, 'timeout')).subscribe(value => {
console.log(value);
})
// 输出:
// [1, 2, 'timeout']
操作符是对既有数据流作进一步有流程控制、数据转换或添加副作用。
操作符均通过 pipe
方法添加。pipe
方法既可以传入多个操作符,也可以链式调用。以下两种方式是等价的:
// 链式调用
interval()
.pipe(take(4))
.pipe(delay(2000))
.subscribe(value => {
console.log(value)
})
// 多参数调用
interval().pipe(
take(4),
delay(2000)
).subscribe(value => {
console.log(value)
})
忽略源值,并延迟一段时间,发送最新的值。
interval(1000).pipe(auditTime(2000)).subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 3
// 5
// 7
// ...
按顺序依次发出数据流本身和传入源的值,需要注意的事,只有前一个数据流完成时,才会监听并发送后一个数据流的值。
timeout(1000, 1).pipe(
concat(
of('a', 'b'),
of('A', 'B')
)
).subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 'a'
// 'b'
// 'A'
// 'B'
在一段时间内,没有新值时,才发送最新的值。
interval(1000).pipe(debounceTime(2000)).subscribe(value => {
// 永远也不会输出值,因为每一次新值的间隔都小于 2 秒
console.log(value);
})
将数据流延迟一段时间发送。
of('delay').pipe(delay(1000)).subscribe(value => {
console.log(value)
})
// 1 秒后输出:'dekay'
过滤连续重复的值。
of(1, 3, 3, 3, 5, 6, 6).pipe(distinctUntilChanged()).subscribe(value => {
console.log(value)
})
// 输出:
// 1
// 3
// 5
// 6
过滤源数据流,只发送返回为 true 时的数据。
of(1, 3, 3, 3, 5, 6, 6).pipe(filter(value => {
return value > 3;
})).subscribe(value => {
console.log(value)
})
// 输出:
// 5
// 6
// 6
将源数据转换成另外一种数据。
of('张三').pipe(map(value => {
return {
name: value
}
})).subscribe(value => {
console.log(value);
})
// 输出: {name: '张三'}
跳过指定次数的数据,然后发送后面的值。
of('A', 'B', 'C', 'D').pipe(skip(2)).subscribe(value => {
console.log(value);
})
// 输出:
// 'C'
// 'D'
返回一个新的数据流,并以新数据流的订阅结果,发送出去。
of(1).pipe(switchMap(value => {
return new Stream(observer => {
observer.next(value + 1)
})
})).subscribe(value => {
console.log(value)
})
// 输出:2
指定源数据流最多发送几次。
of('a', 'b', 'c', 'd').pipe(take(2)).subscribe(value => {
console.log(value);
})
// 输出:
// 'c'
// 'd'
在数据流中添加副作用。
of(1, 2).pipe(tap(() => {
console.log('副作用');
})).subscribe(value => {
console.log(value);
})
// 输出:
// '副作用'
// '副作用'
// 1
// 2
发出最先到达的值,并忽略一段时间内的新值,然后再发送时间到达之后最新到达的值。
interval(1000).pipe(throttleTime(2000)).subscribe(value => {
console.log(value);
})
// 输出:
// 0
// 2
// 4
// 6
// ...
FAQs
A data stream lib
The npm package @tanbo/stream receives a total of 94 weekly downloads. As such, @tanbo/stream popularity was classified as not popular.
We found that @tanbo/stream demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 0 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
PyPI now allows maintainers to archive projects, improving security and helping users make informed decisions about their dependencies.
Research
Security News
Malicious npm package postcss-optimizer delivers BeaverTail malware, targeting developer systems; similarities to past campaigns suggest a North Korean connection.
Security News
CISA's KEV data is now on GitHub, offering easier access, API integration, commit history tracking, and automated updates for security teams and researchers.