rxjs-event-bus
Что это такое?
Это шина событий, написанная на rxjs
Зачем это?
Чтобы организовать связь между разными приложениями с помощью удобных в использовании потоков.
Также с этим подходом удобно поддерживать версионность и типы событий.
Что внутри?
Внутри потоки, реализованные через Subject и ReplaySubject, которые позволяют подписываться на события (Subject) и даже получать события из прошлого (ReplaySubject)!
Как это работает?
Всё просто, смотри:
Это просто подписчик, он не имеет к потокам никакого отношения. Тут должен быть твой код.
const log = (() => {
let counter = 0;
return event =>
console.log(`${++counter}: type: ${event.type}, payload: ${event.payload}`)
})()
Вот так выглядит обычный pub/sub
const bus = new Bus()
bus.emit({ type: 'first_event_name', payload: 1 })
bus
.select('first_event_name')
.subscribe(log)
bus
.select('second_event_name')
.subscribe(log)
bus.emit({ type: 'first_event_name', payload: 2 })
bus.emit({ type: 'second_event_name', payload: 1 })
bus.emit({ type: 'second_event_name', payload: 2 })
Как работает главный поток (mainStream)
const bus = new Bus()
bus.emit({ type: 'first_event_name', payload: 1 })
bus
.getMainStream()
.subscribe(log)
bus.emit({ type: 'first_event_name', payload: 2 })
bus.emit({ type: 'second_event_name', payload: 1 })
bus.emit({ type: 'third_event_name', payload: 1 })
А если я хочу получить события из прошлого? Сначала эмит, потом сабскрайб!
Для запоминания истории шину нужно сконфигурировать. Параметр конструктора - это Map, где ключи - имена событий, значения - глубина истории для этих событий.
const historySettings = new Map([
['first_event_name', 2],
['second_event_name', 1]
])
const bus = new Bus(historySettings)
bus.emit({ type: 'first_event_name', payload: 1 })
bus.emit({ type: 'first_event_name', payload: 2 })
bus.emit({ type: 'first_event_name', payload: 3 })
bus.emit({ type: 'second_event_name', payload: 1 })
bus.emit({ type: 'second_event_name', payload: 2 })
bus.emit({ type: 'second_event_name', payload: 3 })
bus
.select('first_event_name')
.subscribe(log)
bus
.select('second_event_name')
.subscribe(log)
bus.emit({ type: 'first_event_name', payload: 4 })
bus.emit({ type: 'second_event_name', payload: 4 })
Так же работает и mainStream
const historySettings = new Map([
['first_event_name', 2],
['second_event_name', 1]
])
const bus = new Bus(historySettings)
bus.emit({ type: 'first_event_name', payload: 1 })
bus.emit({ type: 'first_event_name', payload: 2 })
bus.emit({ type: 'first_event_name', payload: 3 })
bus.emit({ type: 'second_event_name', payload: 1 })
bus.emit({ type: 'second_event_name', payload: 2 })
bus.emit({ type: 'second_event_name', payload: 3 })
bus
.getMainStream()
.subscribe(log)
bus.emit({ type: 'first_event_name', payload: 4 })
bus.emit({ type: 'second_event_name', payload: 4 })
Погоди, а если мне не нужны все события из прошлого?
Просто передавай в select() вторым аргументом, сколько именно событий из прошлого тебе нужно
const historySettings = new Map([
['first_event_name', 2],
['second_event_name', 1],
])
const bus = new Bus(historySettings)
bus.emit({ type: 'first_event_name', payload: 1 })
bus.emit({ type: 'first_event_name', payload: 2 })
bus.emit({ type: 'first_event_name', payload: 3 })
bus.emit({ type: 'second_event_name', payload: 1 })
bus
.select('first_event_name', 1)
.subscribe(log)
bus
.select('second_event_name', 0)
.subscribe(log)
bus.emit({ type: 'first_event_name', payload: 4 })
bus.emit({ type: 'second_event_name', payload: 2 })
Окей, а что мне делать, если я хочу поиграться с этим своими руками?
Клонируй и запускай тесты
$ npm i
$ npm t