
Company News
/Security News
Socket Selected for OpenAI's Cybersecurity Grant Program
Socket is an initial recipient of OpenAI's Cybersecurity Grant Program, which commits $10M in API credits to defenders securing open source software.
@bdlite/observable
Advanced tools
Observable-RX是一个基于观察者模式的JavaScript库,用于处理异步数据流。它提供了一种优雅的方式来处理异步数据流,使得代码更易于理解和维护。
有别于RxJS的使用,其中一点是,当发出了error事件,不需要retry(吐槽下retry也没用),next跟error可以同时不限制地使用,这也是自研的原因之一。
使用npm安装:
npm install observable-rx
要创建一个Observable,只需调用Observable构造函数即可。
import { Observable } from 'observable-rx';
const observable = new Observable();
要订阅Observable,调用subscribe方法并传入一个观察者对象。观察者对象应该具有next、error方法,用于处理Observable发出的值和错误信号。
import { Observable } from 'observable-rx';
const observable = new Observable();
// 订阅可观察对象
const subscription = observable.subscribe({
next: (value) => console.log(value),
error: (value) => console.error(value),
});
// 取消订阅
subscription.unsubscribe();
使用Observable的next方法,需要先创建一个Observable对象并调用subscribe方法进行订阅。
在观察者对象中,可以定义next方法来处理Observable发出的值。例如:
import { Observable } from 'observable-rx';
const observable = new Observable();
// 订阅可观察对象
observable.subscribe({
next: (value) => console.log(value),
error: (value) => console.error(value),
});
// 在需要的地方,调用Observable的next方法
setTimeout(() => {
observable.next('hello world');
}, 3000);
在上面的代码中,我们调用了Observable的next方法并传入了一个字符串作为参数。
当Observable发出这个值时,观察者对象的next方法将被调用并传入该值作为参数。
需要注意的是,如果Observable还没有被订阅,调用next方法将不会有任何效果。
如果你需要获取到上一次的值,在创建observable的时候,加上配置项relay:
import { Observable } from 'observable-rx';
// 传入options,配置relay为1
const observable = new Observable({ options: { relay: 1 } });
// 调用Observable的next方法
observable.next('hello world');
// 订阅可观察对象
observable.subscribe({
next: (value) => console.log(value), // 会立即被调用一次
error: (value) => console.error(value),
});
由于relay大于0,即使订阅在 observable.next('hello world') 之后执行,subscribe中的next回调也会被执行。
如果开启共享模式,即使订阅在值被发出之后执行,所有观察者对象的next方法都将被立即调用。
relay选项指定了Observable发出值的数量,而不是订阅的数量。
目前没有真正实现relay的次数,仅判断是否大于0,因此当relay大于0时,所有next回调都只会被立即执行1次。
使用Observable的error方法,在观察者对象中,可以定义error方法来处理Observable发出的错误。例如:
import { Observable } from 'observable-rx';
const observable = new Observable({ options: { relay: 1 } });
// 订阅可观察对象
observable.subscribe({
next: (value) => console.log(value),
error: (value) => console.error(value),
});
// 出现异常时,调用Observable的error方法
setTimeout(() => {
observable.error('something wrong');
}, 5000);
在上面的代码中,我们调用了Observable的error方法并传入了一个字符串作为错误信息。当Observable发出这个值时,观察者对象的error方法将被调用并传入该值作为参数。
注意,错误不会因为relay配置而缓存下来,订阅时机在
observable.error('timeout')执行之前不会获取历史错误信息。
Observable提供了一个share操作符,用于共享Observable的订阅,真正实现发布订阅模式,而不是一对一响应。
import { Observable, share } from 'observable-rx';
const observable = new Observable({ options: { relay: 1 } });
// 该操作会重载部分observable的方法
share()(observable);
// 调用Observable的next方法
observable.next('hello world');
// 调用Observable的error方法
setTimeout(() => {
observable.error('timeout');
}, 5000);
// 只能监听到next事件
observable.subscribe((value) => console.log('subscribe-1: ', value));
// 监听多种事件
observable.subscribe({
next: (value) => console.log('subscribe-2-next: ', value),
error: (value) => console.error('subscribe-2-error: ', value),
});
// 再次调用Observable的next方法
observable.next('hello again');
未开启共享模式时,重复调用
observable.subscribe只会覆盖之前的回调。
new Observable(props?: Props): Observable
创建一个新的Observable。
参数:
返回值:
import { Observable } from 'observable-rx';
const observable = new Observable({ initialData: {}, options: { relay: 1 } });
subscribe(observer: Observer | Callback): Subscription
订阅Observable。
参数:
或者:
返回值:
const subscription = observable.subscribe({
next: (value) => console.log(value),
error: (value) => console.error(value),
});
subscription.unsubscribe();
或者
const subscription = observable.subscribe((value) => console.log(value));
subscription.unsubscribe();
next(data: any): void
向 Observer 发送数据。
参数:
返回值:
observable.next('hello world');
error(data: any): void
向 Observer 发送错误信息。
参数:
返回值:
observable.error('something went wrong');
share
共享Observable的订阅。
import { share } from 'observable-rx';
share()(observable);
observable.subscribe((value) => console.log('subscribe shared observable 1: ', value));
observable.subscribe({
next: (value) => console.log('subscribe shared observer 2-next: ', value),
error: (value) => console.error('subscribe shared observer 2-error: ', value),
});
如您有意使用,欢迎联系本人yexinxie@163.com,这个小项目会根据您的意见修改并添加测试用例。
If you are interested in using it, please contact me at yexinxie@163.com. This small project will be modified and test cases will be added according to your feedback.
FAQs
A publish-subscribe utility library designed with BroadcastChannel API
The npm package @bdlite/observable receives a total of 19 weekly downloads. As such, @bdlite/observable popularity was classified as not popular.
We found that @bdlite/observable demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 4 open source maintainers collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Company News
/Security News
Socket is an initial recipient of OpenAI's Cybersecurity Grant Program, which commits $10M in API credits to defenders securing open source software.

Security News
Socket CEO Feross Aboukhadijeh joins 10 Minutes or Less, a podcast by Ali Rohde, to discuss the recent surge in open source supply chain attacks.

Research
/Security News
Campaign of 108 extensions harvests identities, steals sessions, and adds backdoors to browsers, all tied to the same C2 infrastructure.