@eduardorothdev/rxjs-mqtt
RxJS wrapper for mqtt with TypeScript support. Based on async-mqtt.
Install
npm i @eduardorothdev/rxjs-mqtt
Using it
First call and await the connect function, which returns an MqttClient
object with updated methods.
This MqttClient object has the same methods as a client created with mqtt,
except that the following methods are modified:
- Promises
.publish()
.subscribe()
.unsubscribe()
.end()
- Observables
.on()
- This method returns an observable that emits every time the listened event fires.
.onJsonMessage<T>() (new method)
- This helper method extends on(): it listens only to message events sent from the MQTT server and parses them as JSON. <T> is the interface or type that the received message is parsed into.
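Because the four methods in the Promises group return promises, they can be sequenced with async/await. The following is a minimal sketch, not the library's own typings: the `PromiseClient` interface, `publishOnce`, and `makeRecordingClient` are hypothetical names introduced here, and the simplified argument lists are assumptions modeled on the usual mqtt topic/message parameters.

```typescript
// Hypothetical, simplified view of the promise-returning methods listed above.
// The real MqttClient has more methods and richer signatures.
interface PromiseClient {
  subscribe(topic: string): Promise<unknown>;
  publish(topic: string, message: string): Promise<unknown>;
  unsubscribe(topic: string): Promise<unknown>;
  end(): Promise<unknown>;
}

// Subscribe, publish one message, then clean up, awaiting each step in order.
async function publishOnce(
  client: PromiseClient,
  topic: string,
  message: string,
): Promise<void> {
  try {
    await client.subscribe(topic);
    await client.publish(topic, message);
    await client.unsubscribe(topic);
  } finally {
    // Close the connection even if one of the steps above rejected.
    await client.end();
  }
}

// In-memory stand-in used only to show the call order without a broker.
function makeRecordingClient(calls: string[]): PromiseClient {
  return {
    subscribe: async (topic) => { calls.push(`subscribe ${topic}`); },
    publish: async (topic, message) => { calls.push(`publish ${topic} ${message}`); },
    unsubscribe: async (topic) => { calls.push(`unsubscribe ${topic}`); },
    end: async () => { calls.push("end"); },
  };
}
```

With a real client returned by connect, the same `publishOnce` pattern should apply as long as the methods accept these arguments; check the package typings before relying on it.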
In addition to those, I added two operator function helpers that you can pass to the pipe
method of the returned Observable to parse the received events more easily.
- Helpers
parsePayload<T>(parser: (buffer: Buffer) => T | string = (buffer: Buffer) => buffer.toString())
- This operator function helper parses the byte payload received from the MQTT event with a custom function that converts the bytes to any type. By default, the bytes are converted to a string.
parsePayloadToJSON()
- This operator function helper builds on the previous one to convert the received bytes to JSON.
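The parser given to `parsePayload` is an ordinary function from `Buffer` to a value of your choice. As a hedged illustration only, the functions below show the kind of conversion involved. `toText`, `toJson`, and `readTemperature` are hypothetical names that are not part of the package, and the temperature encoding (a 32-bit big-endian float) is an assumption made up for this example.

```typescript
import { Buffer } from "node:buffer";

// Mirrors the default parser shown in the signature: bytes to a UTF-8 string.
const toText = (buffer: Buffer): string => buffer.toString();

// Roughly what a JSON-parsing helper has to do: decode, then JSON.parse.
// JSON.parse throws on malformed input, so callers should handle errors.
function toJson<T>(buffer: Buffer): T {
  return JSON.parse(buffer.toString()) as T;
}

// Custom binary parser: assumes the payload is a 32-bit big-endian float.
const readTemperature = (buffer: Buffer): number => buffer.readFloatBE(0);

// Sample payloads built locally instead of being received from a broker.
const jsonPayload = Buffer.from('{"some":"value","mapping":true}');
const parsed = toJson<{ some: string; mapping: boolean }>(jsonPayload);

const binaryPayload = Buffer.alloc(4);
binaryPayload.writeFloatBE(21.5, 0);
const temperature = readTemperature(binaryPayload);
```

A function such as `readTemperature` has the shape `(buffer: Buffer) => T`, so it matches the `parser` parameter in the `parsePayload` signature above.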
Example code
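import { of } from "rxjs";
import { catchError } from "rxjs/operators";
import { connect } from "@eduardorothdev/rxjs-mqtt";

try {
  // Connect to the broker; connection options go in the second argument.
  const client = await connect("mqtt://test.mosquitto.org", {
  });

  // Subscribe to every topic under some/topic/.
  await client.subscribe("some/topic/#");

  const sub = client
    .onJsonMessage<{
      some: string;
      property: string;
      mapping: boolean;
    }>()
    .pipe(
      // On a stream error, emit null instead of propagating the error.
      catchError((err) => {
        return of(null);
      }),
    )
    .subscribe((jsonMessage) => {
      console.log(jsonMessage);
    });
} catch (err) {
  // Connection or subscription failures end up here.
  console.log(err);
}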
License
Licensed under MIT.