Socket
Socket
Sign inDemoInstall

mole-rpc-transport-mqtt

Package Overview
Dependencies
49
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.1 to 1.3.0

buildTopicRegExp.js

8

package.json
{
"name": "mole-rpc-transport-mqtt",
"version": "1.0.1",
"version": "1.3.0",
"description": "MQTT transport for Mole-RPC (JSON RPC library)",
"main": "index.js",
"scripts": {
"test": "node ./tests/autotests.js || exit 1"
"test": "(node ./tests/autotests.js && node ./tests/topicRegExpTests.js) || exit 1"
},

@@ -15,6 +15,6 @@ "repository": {

"mole-rpc": "1.*",
"async-mqtt": "^2.5.0"
"async-mqtt": "^2.6.3"
},
"devDependencies": {
"async-mqtt": "^2.5.0",
"async-mqtt": "^2.6.3",
"mole-rpc": "1.*",

@@ -21,0 +21,0 @@ "mole-rpc-autotester": "1.*"

@@ -1,2 +0,2 @@

MQTT Transport for Mole RPC (JSON RPC Library)
MQTT Transport for Mole RPC (JSON RPC Library)
---------------------------------------------

@@ -11,3 +11,3 @@

This is MQTT based tranport but there are [many more transports](https://www.npmjs.com/search?q=keywords:mole-transport).
This is MQTT based tranport but there are [many more transports](https://www.npmjs.com/search?q=keywords:mole-transport).

@@ -19,3 +19,3 @@

You can organize many to many RPC-connunication between microservices via MQTT.
You can organize many to many RPC-connunication between microservices via MQTT.

@@ -66,11 +66,11 @@ *For example, in our case, we use this module to connect microservices deployed to an IoT Hub*

});
server.expose({
getGreeting(name) {
return `Hi, ${name}`;
}
}
});
await server.run();
}
}

@@ -84,7 +84,7 @@

mqttClient,
inTopic: 'toClient/1',
inTopic: 'toClient/1',
outTopic: 'fromClient/1'
}),
});
const client2 = new MoleClient({

@@ -97,3 +97,3 @@ transport: new MQTTTransportClient({

});
console.log(

@@ -115,8 +115,10 @@ 'CLIENT 1',

MQTT has topics. Different services can subscribe to different topic. You can use wildcard characters in names like "+" or "#".
MQTT has topics. \
Different services can subscribe to different topic. \
You can use wildcard characters in names like `+` or `#`.
**MQTTTransportClient** has two topic related parameters:
* outTopic - sends request to this topic.
* inTopic - gets response in this topic.
* `outTopic` - sends request to this topic.
* `inTopic` - gets response in this topic.

@@ -126,16 +128,29 @@

* inTopic - listend to requests from client in this topic. If you have many clients it makes sense use wildcard topic.
* outTopic - sends response to this topic. You can pass callback which will convert in topic to outtopic.
* `inTopic` - listend to requests from client in this topic. \
If you have many clients it makes sense use wildcard topic.
* `outTopic` - sends response to this topic. \
You can pass callback which will convert in topic to outtopic.
Also, both **MQTTTransportClient** and **MQTTTransportServer** support `inQos` and `outQos` parameters \
to control the [quality of service](https://en.wikipedia.org/wiki/MQTT#Quality_of_service).
## Universal approach for to how to name topics in scalable way
## How to use topics in the scalable way
### If you use MQTT version 5 or higher
MQTT version 5 supports the `responseTopic` property in-built to the protocol.
The **MQTTTransportClient** will automatically pass it to server, therefore \
you can omit specifing `outTopic` for **MQTTTransportServer** in this case.
### If you use MQTT version 4 or lower
One of the pattern can be the following:
* Send data to: `/rpc/${FROM}/${TO}`
* Get data from: `/rpc/${TO}/${FROM}`
* Send data to: `rpc/${FROM}/${TO}`
* Get data from: `rpc/${TO}/${FROM}`
*See "Usage Example" for a simpler approach*
### Many RPC clients and one RPC Server.
#### Many RPC clients and one RPC Server.

@@ -147,3 +162,3 @@ Let's assume that is an authentication server

```js
const inTopic = '/rpc/+/auth-server';
const inTopic = 'rpc/+/auth-server';
const outTopic = inTopicToOutTopic;

@@ -162,22 +177,26 @@

const clientId = 'client123'; // you can use UUID for automatic id generation.
const inTopic = `/rpc/auth-server/${clientId}`;
const outTopic = `/rpc/${clientId}/auth-server`
const inTopic = `rpc/auth-server/${clientId}`;
const outTopic = `rpc/${clientId}/auth-server`
```
So, for each clients connection to server you will have a pair of topics.
It looks a little bit more complecated but allows easely switch to many-to-many connection apprach.
So, for each clients connection to server you will have a pair of topics. \
It looks a little bit more complicated but allows easely switch to many-to-many connection apprach.
### Many-to-many connections
#### Many-to-many connections
For this case, you can use the same approach as for "Many RPC clients and one RPC Server" case.
For this case, you can use the same approach as for "Many RPC clients and one RPC Server" case.
1. You run every service as an Mole RPC server.
2. You use instantiate Mole RPC clients with corresponding topics.
2. You use instantiate Mole RPC clients with corresponding topics.
You can notive that if SERVICE_A talks to SEVICE_B we need 2 topics. But when SERVICE_B talks to SERVICE_A we will use the same topic names and that is ok. This transport handles this situation, so you can use understandable topics which always follow this pattern:
You can notive that if SERVICE_A talks to SEVICE_B we need 2 topics. \
But when SERVICE_B talks to SERVICE_A we will use the same topic names and that is ok. \
This transport handles this situation, so you can use understandable topics which always follow this pattern:
* Always send data to: `/rpc/${FROM}/${TO}`
* Always get data from: `/rpc/${TO}/${FROM}`
* Always send data to: `rpc/${FROM}/${TO}`
* Always get data from: `rpc/${TO}/${FROM}`
This is the approach we use for many-to-many communication approach but you can use other approaces.
For example, if you want to allow SERVICE_A to get request only from SERVICE_B, you can use inTopic name without wildcard - `/rpc/SERVICE_B/SERVICE_A` (outTopic can be set to static value `/rpc/SERVICE_A/SERVICE_B` as well ).
This is the approach we use for many-to-many communication approach but you can use other approaces. \
For example, if you want to allow SERVICE_A to get request only from SERVICE_B, \
you can use inTopic name without wildcard - `rpc/SERVICE_B/SERVICE_A` \
(outTopic can be set to static value `rpc/SERVICE_A/SERVICE_B` as well ).

@@ -14,5 +14,10 @@ const MQTT = require('async-mqtt');

async function main() {
const server = await prepareServer();
const clients = await prepareClients();
await runAutoTests({ protocolVersion: 4 });
await runAutoTests({ protocolVersion: 5 });
}
async function runAutoTests(settings) {
const server = await prepareServer(settings);
const clients = await prepareClients(settings);
const autoTester = new AutoTester({

@@ -28,4 +33,4 @@ X,

async function prepareServer() {
const mqttClient = MQTT.connect(MQTT_ENDPOINT);
async function prepareServer({ protocolVersion }) {
const mqttClient = MQTT.connect(MQTT_ENDPOINT, { protocolVersion });

@@ -43,4 +48,4 @@ return new MoleServer({

async function prepareClients() {
const mqttClient = MQTT.connect(MQTT_ENDPOINT);
async function prepareClients({ protocolVersion }) {
const mqttClient = MQTT.connect(MQTT_ENDPOINT, { protocolVersion });

@@ -47,0 +52,0 @@ const simpleClient = new MoleClient({

@@ -0,3 +1,12 @@

const { QOS_LEVELS } = require('./constants');
const buildTopicRegExp = require('./buildTopicRegExp');
class MQTTTransportClient {
constructor({ mqttClient, inTopic, outTopic }) {
constructor({
mqttClient,
inTopic,
outTopic,
inQos = QOS_LEVELS.AT_MOST_ONCE,
outQos = QOS_LEVELS.AT_MOST_ONCE
}) {
if (!mqttClient) throw new Error('"mqttClient" required');

@@ -10,18 +19,41 @@ if (!inTopic) throw new Error('"inTopic" required');

this.outTopic = outTopic;
this.inQos = inQos;
this.outQos = outQos;
this.messageHandler = () => {};
}
async onData(callback) {
await this.mqttClient.subscribe(this.inTopic);
const inTopicRegExp = buildTopicRegExp(this.inTopic);
this.mqttClient.on('message', (topic, data) => {
if (topic !== this.inTopic) return;
callback(data.toString());
});
this.messageHandler = (topic, messageBuffer) => {
if (!inTopicRegExp.test(topic)) {
return;
}
callback(messageBuffer.toString());
};
this.mqttClient.on('message', this.messageHandler);
await this.mqttClient.subscribe(this.inTopic, { qos: this.inQos });
}
async sendData(data) {
await this.mqttClient.publish(this.outTopic, data);
await this.mqttClient.publish(this.outTopic, data, {
qos: this.outQos,
properties: {
responseTopic: this.inTopic
}
});
}
async shutdown() {
await this.mqttClient.unsubscribe(this.inTopic);
this.mqttClient.off('message', this.messageHandler);
this.messageHandler = () => {};
}
}
module.exports = MQTTTransportClient;

@@ -0,6 +1,14 @@

const { QOS_LEVELS } = require('./constants');
const buildTopicRegExp = require('./buildTopicRegExp');
class MQTTTransportServer {
constructor({ mqttClient, inTopic, outTopic }) {
constructor({
mqttClient,
inTopic,
outTopic,
inQos = QOS_LEVELS.AT_MOST_ONCE,
outQos = QOS_LEVELS.AT_MOST_ONCE
}) {
if (!mqttClient) throw new Error('"mqttClient" required');
if (!inTopic) throw new Error('"inTopic" required');
if (!outTopic) throw new Error('"outTopic" required');

@@ -10,19 +18,53 @@ this.mqttClient = mqttClient;

this.outTopic = outTopic;
this.inQos = inQos;
this.outQos = outQos;
this.messageHandler = () => {};
}
async onData(callback) {
await this.mqttClient.subscribe(this.inTopic);
const inTopicRegExp = buildTopicRegExp(this.inTopic);
this.mqttClient.on('message', async (inTopic, requestData) => {
const responseData = await callback(requestData.toString());
if (!responseData) return;
this.messageHandler = async (topic, messageBuffer, packet) => {
if (!inTopicRegExp.test(topic)) {
return;
}
const outTopic =
typeof this.outTopic === 'function' ? this.outTopic({ inTopic }) : this.outTopic;
const responseData = await callback(messageBuffer.toString());
this.mqttClient.publish(outTopic, responseData);
});
if (!responseData) {
return;
}
let outTopic;
if (packet.properties && packet.properties.responseTopic) {
// Supported only by MQTT 5.0
outTopic = packet.properties.responseTopic;
} else if (typeof this.outTopic === 'function') {
outTopic = this.outTopic({ inTopic: topic });
} else {
outTopic = this.outTopic;
}
if (!outTopic) {
throw new Error('"outTopic" is not specified');
}
await this.mqttClient.publish(outTopic, responseData, { qos: this.outQos });
};
this.mqttClient.on('message', this.messageHandler);
await this.mqttClient.subscribe(this.inTopic, { qos: this.inQos });
}
async shutdown() {
await this.mqttClient.unsubscribe(this.inTopic);
this.mqttClient.off('message', this.messageHandler);
this.messageHandler = () => {};
}
}
module.exports = MQTTTransportServer;

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc