RxJS wrapper for messaging systems
This library makes it easy to send messages in a distributed network transparent
way via various brokers but initially via RabbitMQ.
Roadmap
At a later point we should have plugins to make it work with various messaging paradigms:
Principles:
- Declarative over imperative.
- Functions over classes.
- Simplicity over complexity.
- Immutable over mutable.
- Flexible and composable over fixed heirarchy.
- Pure over impure.
- Minmalistic sensible defaults over boilerplate.
- Idiomatic API's over reinventing the wheel.
Environments
- Basic framework should work in all V8 environments. eg.
- Middleware might be environment specific. Eg.
blockbid-messages/amqp requires node. blockbid-messages/socketio-browser may require browser objects. (YTBI)
TODO
Installation
You can install by referencing a version tag directly off the github repo.
yarn add blockbid/blockbid-message
Framework Usage
import { createConsumer, createProducer } from 'blockbid-message';
import { filter } from 'rxjs/opeerators';
const producer = createProducer(
transformMessageSomehow,
broadCastsMessagesSomewhere
);
const consumer = createConsumer(
receivesMessagesFromSomewhere,
logOrTransformMessage,
doSomeMoreTransformation
);
producer.next({
content: 'Hello World!',
route: 'hello'
});
const sub = consumer
.pipe(filter(msg => msg.content.toLowerCase().includes('world')))
.subscribe(msg => {
console.log(`Received: ${msg.content}`);
});
Typescript types
Messages
Generic message objects look like this:
export interface IMessage {
content: any;
route?: any;
}
You might use a message by sending it to the next() method of a producer.
producer.next({
content: 'Hi there!',
route: 'some-queue'
});
Middleware
Middleware are effectively functions designed to decorate RxJS streams and looks like this:
export type Middleware<T extends IMessage> = (
a: Observable<T>
) => Observable<T>;
You might use a middleware by passing it as one of the arguments to the createProducer() or createConsumer() functions
import {tap} from 'rxjs/operators';
function logger(stream: Observable<IMessage>) {
return stream
.pipe(tap(
(msg:IMessage) => console.log(`Stream logged: ${msg.content}`
));
}
const consumer = createConsumer(someReceiver, logger);
AMQP Middleware
AMQP Middleware is designed to work in Node environments only due to limitations with the amqplib package it is based on.
Basic Usecase with amqp middleware
import { createConsumer, createProducer } from 'blockbid-message';
import { createAmqpConnector } from 'blockbid-message/amqp';
const { sender, receiver } = createAmqpConnector({
declarations: {
queues: [
{
durable: false,
name: 'hello'
}
]
},
uri: 'amqp://user:password@somerabbitserver.io/user'
});
const producer = createProducer(sender());
producer.next({
content: 'Hello World!',
route: 'hello'
});
const consumer = createConsumer(
receiver({
noAck: true,
queue: 'hello'
})
);
const sub = consumer.subscribe(msg => {
console.log(`Received: ${msg.content}`);
});
sub.unsubscribe();
Example Usage AMQP Middleware
For usage and examples please look at the basic tests thrown together here
RxJS References
Docs
Videos
NOTE: Using version 6
blockbid-message uses RxJS v6.0 so you need to pipe all your operators:
import { filter } from 'rxjs/operators';
consumer.pipe(filter(forUserEvents(userId))).subscribe(
msg => {
dealWithMessage(msg.content);
},
() => {}
);
Other References