fastify-rxjs-mqtt
A Fastify plugin that wraps rxjs-mqtt, with TypeScript support.
Install
npm i @eduardorothdev/fastify-rxjs-mqtt
Using it
Add it to your Fastify project with the `register` method, passing the URL of your MQTT broker:
import fastify from "fastify";
import { fastifyRxjsMqtt } from "@eduardorothdev/fastify-rxjs-mqtt";
// Address the HTTP server binds to; both values can be overridden via the environment.
const { HOST, PORT } = process.env;
const host = HOST ?? "localhost";
const port = PORT ? Number(PORT) : 3000;

// Create the server and register the MQTT plugin, pointing it at the broker.
const server = fastify();
server.register(fastifyRxjsMqtt, { url: "mqtt://localhost:1883" });

// Start accepting connections; rethrow any startup error instead of ignoring it.
server.listen({ port, host }, (err) => {
  if (err) {
    throw err;
  }
});
Once registered, the client is available as `mqttClient` on the Fastify instance, so you can use it in your route handlers:
import { FastifyInstance } from "fastify";
/**
 * Route plugin that registers `GET /mqtt/ping`.
 *
 * Each request publishes a fixed text message through the MQTT client and
 * answers with a small JSON confirmation once the publish has resolved.
 */
export default async function (fastify: FastifyInstance) {
  // A regular `function` (not an arrow function) is required so that `this`
  // is the Fastify instance carrying `mqttClient`.
  fastify.get("/mqtt/ping", async function (_req, reply) {
    // Publish to a concrete topic name: the `#` and `+` wildcards are only
    // valid in subscription filters, never in the topic of a published message.
    await this.mqttClient.publish("your/topic", "Async Hi Mosquitto!");

    // Return the reply so the async handler resolves with the response that
    // was already sent rather than with `undefined`.
    return reply.send({ mqtt: "message sent!" });
  });
}
Listening for events
import { FastifyInstance } from "fastify";
import { of } from "rxjs";
import { catchError } from "rxjs/operators";
/**
 * Plugin that subscribes to an MQTT topic filter and logs every value
 * emitted by the client's `onJsonMessage` stream.
 *
 * Failures while subscribing are caught and logged rather than propagated,
 * so a broker problem does not abort plugin registration.
 */
export default async function (fastify: FastifyInstance) {
  try {
    // Wildcards are fine here: this is a subscription filter, not a publish topic.
    await fastify.mqttClient.subscribe("some/topic/#");

    const sub = fastify.mqttClient
      .onJsonMessage<{
        some: string;
        property: string;
        mapping: boolean;
      }>()
      .pipe(
        catchError((err) => {
          // Report the failure instead of silently discarding it. The stream
          // is replaced by a single `null` and then completes, so no further
          // messages are delivered after an error.
          console.log(err);
          return of(null);
        }),
      )
      .subscribe((jsonMessage) => {
        // `null` is the placeholder emitted by catchError above, not a message.
        if (jsonMessage === null) return;
        console.log(jsonMessage);
      });

    // Release the stream subscription when the server shuts down so the
    // listener does not outlive the Fastify instance.
    fastify.addHook("onClose", async () => {
      sub.unsubscribe();
    });
  } catch (err) {
    console.log(err);
  }
}
License
Licensed under MIT.