mole-rpc-transport-mqtt
Advanced tools
Comparing version 1.0.1 to 1.3.0
{ | ||
"name": "mole-rpc-transport-mqtt", | ||
"version": "1.0.1", | ||
"version": "1.3.0", | ||
"description": "MQTT transport for Mole-RPC (JSON RPC library)", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "node ./tests/autotests.js || exit 1" | ||
"test": "(node ./tests/autotests.js && node ./tests/topicRegExpTests.js) || exit 1" | ||
}, | ||
@@ -15,6 +15,6 @@ "repository": { | ||
"mole-rpc": "1.*", | ||
"async-mqtt": "^2.5.0" | ||
"async-mqtt": "^2.6.3" | ||
}, | ||
"devDependencies": { | ||
"async-mqtt": "^2.5.0", | ||
"async-mqtt": "^2.6.3", | ||
"mole-rpc": "1.*", | ||
@@ -21,0 +21,0 @@ "mole-rpc-autotester": "1.*" |
@@ -1,2 +0,2 @@ | ||
MQTT Transport for Mole RPC (JSON RPC Library) | ||
MQTT Transport for Mole RPC (JSON RPC Library) | ||
--------------------------------------------- | ||
@@ -11,3 +11,3 @@ | ||
This is MQTT based tranport but there are [many more transports](https://www.npmjs.com/search?q=keywords:mole-transport). | ||
This is MQTT based tranport but there are [many more transports](https://www.npmjs.com/search?q=keywords:mole-transport). | ||
@@ -19,3 +19,3 @@ | ||
You can organize many to many RPC-connunication between microservices via MQTT. | ||
You can organize many to many RPC-connunication between microservices via MQTT. | ||
@@ -66,11 +66,11 @@ *For example, in our case, we use this module to connect microservices deployed to an IoT Hub* | ||
}); | ||
server.expose({ | ||
getGreeting(name) { | ||
return `Hi, ${name}`; | ||
} | ||
} | ||
}); | ||
await server.run(); | ||
} | ||
} | ||
@@ -84,7 +84,7 @@ | ||
mqttClient, | ||
inTopic: 'toClient/1', | ||
inTopic: 'toClient/1', | ||
outTopic: 'fromClient/1' | ||
}), | ||
}); | ||
const client2 = new MoleClient({ | ||
@@ -97,3 +97,3 @@ transport: new MQTTTransportClient({ | ||
}); | ||
console.log( | ||
@@ -115,8 +115,10 @@ 'CLIENT 1', | ||
MQTT has topics. Different services can subscribe to different topic. You can use wildcard characters in names like "+" or "#". | ||
MQTT has topics. \ | ||
Different services can subscribe to different topic. \ | ||
You can use wildcard characters in names like `+` or `#`. | ||
**MQTTTransportClient** has two topic related parameters: | ||
* outTopic - sends request to this topic. | ||
* inTopic - gets response in this topic. | ||
* `outTopic` - sends request to this topic. | ||
* `inTopic` - gets response in this topic. | ||
@@ -126,16 +128,29 @@ | ||
* inTopic - listend to requests from client in this topic. If you have many clients it makes sense use wildcard topic. | ||
* outTopic - sends response to this topic. You can pass callback which will convert in topic to outtopic. | ||
* `inTopic` - listend to requests from client in this topic. \ | ||
If you have many clients it makes sense use wildcard topic. | ||
* `outTopic` - sends response to this topic. \ | ||
You can pass callback which will convert in topic to outtopic. | ||
Also, both **MQTTTransportClient** and **MQTTTransportServer** support `inQos` and `outQos` parameters \ | ||
to control the [quality of service](https://en.wikipedia.org/wiki/MQTT#Quality_of_service). | ||
## Universal approach for to how to name topics in scalable way | ||
## How to use topics in the scalable way | ||
### If you use MQTT version 5 or higher | ||
MQTT version 5 supports the `responseTopic` property in-built to the protocol. | ||
The **MQTTTransportClient** will automatically pass it to server, therefore \ | ||
you can omit specifing `outTopic` for **MQTTTransportServer** in this case. | ||
### If you use MQTT version 4 or lower | ||
One of the pattern can be the following: | ||
* Send data to: `/rpc/${FROM}/${TO}` | ||
* Get data from: `/rpc/${TO}/${FROM}` | ||
* Send data to: `rpc/${FROM}/${TO}` | ||
* Get data from: `rpc/${TO}/${FROM}` | ||
*See "Usage Example" for a simpler approach* | ||
### Many RPC clients and one RPC Server. | ||
#### Many RPC clients and one RPC Server. | ||
@@ -147,3 +162,3 @@ Let's assume that is an authentication server | ||
```js | ||
const inTopic = '/rpc/+/auth-server'; | ||
const inTopic = 'rpc/+/auth-server'; | ||
const outTopic = inTopicToOutTopic; | ||
@@ -162,22 +177,26 @@ | ||
const clientId = 'client123'; // you can use UUID for automatic id generation. | ||
const inTopic = `/rpc/auth-server/${clientId}`; | ||
const outTopic = `/rpc/${clientId}/auth-server` | ||
const inTopic = `rpc/auth-server/${clientId}`; | ||
const outTopic = `rpc/${clientId}/auth-server` | ||
``` | ||
So, for each clients connection to server you will have a pair of topics. | ||
It looks a little bit more complecated but allows easely switch to many-to-many connection apprach. | ||
So, for each clients connection to server you will have a pair of topics. \ | ||
It looks a little bit more complicated but allows easely switch to many-to-many connection apprach. | ||
### Many-to-many connections | ||
#### Many-to-many connections | ||
For this case, you can use the same approach as for "Many RPC clients and one RPC Server" case. | ||
For this case, you can use the same approach as for "Many RPC clients and one RPC Server" case. | ||
1. You run every service as an Mole RPC server. | ||
2. You use instantiate Mole RPC clients with corresponding topics. | ||
2. You use instantiate Mole RPC clients with corresponding topics. | ||
You can notive that if SERVICE_A talks to SEVICE_B we need 2 topics. But when SERVICE_B talks to SERVICE_A we will use the same topic names and that is ok. This transport handles this situation, so you can use understandable topics which always follow this pattern: | ||
You can notive that if SERVICE_A talks to SEVICE_B we need 2 topics. \ | ||
But when SERVICE_B talks to SERVICE_A we will use the same topic names and that is ok. \ | ||
This transport handles this situation, so you can use understandable topics which always follow this pattern: | ||
* Always send data to: `/rpc/${FROM}/${TO}` | ||
* Always get data from: `/rpc/${TO}/${FROM}` | ||
* Always send data to: `rpc/${FROM}/${TO}` | ||
* Always get data from: `rpc/${TO}/${FROM}` | ||
This is the approach we use for many-to-many communication approach but you can use other approaces. | ||
For example, if you want to allow SERVICE_A to get request only from SERVICE_B, you can use inTopic name without wildcard - `/rpc/SERVICE_B/SERVICE_A` (outTopic can be set to static value `/rpc/SERVICE_A/SERVICE_B` as well ). | ||
This is the approach we use for many-to-many communication approach but you can use other approaces. \ | ||
For example, if you want to allow SERVICE_A to get request only from SERVICE_B, \ | ||
you can use inTopic name without wildcard - `rpc/SERVICE_B/SERVICE_A` \ | ||
(outTopic can be set to static value `rpc/SERVICE_A/SERVICE_B` as well ). |
@@ -14,5 +14,10 @@ const MQTT = require('async-mqtt'); | ||
async function main() { | ||
const server = await prepareServer(); | ||
const clients = await prepareClients(); | ||
await runAutoTests({ protocolVersion: 4 }); | ||
await runAutoTests({ protocolVersion: 5 }); | ||
} | ||
async function runAutoTests(settings) { | ||
const server = await prepareServer(settings); | ||
const clients = await prepareClients(settings); | ||
const autoTester = new AutoTester({ | ||
@@ -28,4 +33,4 @@ X, | ||
async function prepareServer() { | ||
const mqttClient = MQTT.connect(MQTT_ENDPOINT); | ||
async function prepareServer({ protocolVersion }) { | ||
const mqttClient = MQTT.connect(MQTT_ENDPOINT, { protocolVersion }); | ||
@@ -43,4 +48,4 @@ return new MoleServer({ | ||
async function prepareClients() { | ||
const mqttClient = MQTT.connect(MQTT_ENDPOINT); | ||
async function prepareClients({ protocolVersion }) { | ||
const mqttClient = MQTT.connect(MQTT_ENDPOINT, { protocolVersion }); | ||
@@ -47,0 +52,0 @@ const simpleClient = new MoleClient({ |
@@ -0,3 +1,12 @@ | ||
const { QOS_LEVELS } = require('./constants'); | ||
const buildTopicRegExp = require('./buildTopicRegExp'); | ||
class MQTTTransportClient { | ||
constructor({ mqttClient, inTopic, outTopic }) { | ||
constructor({ | ||
mqttClient, | ||
inTopic, | ||
outTopic, | ||
inQos = QOS_LEVELS.AT_MOST_ONCE, | ||
outQos = QOS_LEVELS.AT_MOST_ONCE | ||
}) { | ||
if (!mqttClient) throw new Error('"mqttClient" required'); | ||
@@ -10,18 +19,41 @@ if (!inTopic) throw new Error('"inTopic" required'); | ||
this.outTopic = outTopic; | ||
this.inQos = inQos; | ||
this.outQos = outQos; | ||
this.messageHandler = () => {}; | ||
} | ||
async onData(callback) { | ||
await this.mqttClient.subscribe(this.inTopic); | ||
const inTopicRegExp = buildTopicRegExp(this.inTopic); | ||
this.mqttClient.on('message', (topic, data) => { | ||
if (topic !== this.inTopic) return; | ||
callback(data.toString()); | ||
}); | ||
this.messageHandler = (topic, messageBuffer) => { | ||
if (!inTopicRegExp.test(topic)) { | ||
return; | ||
} | ||
callback(messageBuffer.toString()); | ||
}; | ||
this.mqttClient.on('message', this.messageHandler); | ||
await this.mqttClient.subscribe(this.inTopic, { qos: this.inQos }); | ||
} | ||
async sendData(data) { | ||
await this.mqttClient.publish(this.outTopic, data); | ||
await this.mqttClient.publish(this.outTopic, data, { | ||
qos: this.outQos, | ||
properties: { | ||
responseTopic: this.inTopic | ||
} | ||
}); | ||
} | ||
async shutdown() { | ||
await this.mqttClient.unsubscribe(this.inTopic); | ||
this.mqttClient.off('message', this.messageHandler); | ||
this.messageHandler = () => {}; | ||
} | ||
} | ||
module.exports = MQTTTransportClient; |
@@ -0,6 +1,14 @@ | ||
const { QOS_LEVELS } = require('./constants'); | ||
const buildTopicRegExp = require('./buildTopicRegExp'); | ||
class MQTTTransportServer { | ||
constructor({ mqttClient, inTopic, outTopic }) { | ||
constructor({ | ||
mqttClient, | ||
inTopic, | ||
outTopic, | ||
inQos = QOS_LEVELS.AT_MOST_ONCE, | ||
outQos = QOS_LEVELS.AT_MOST_ONCE | ||
}) { | ||
if (!mqttClient) throw new Error('"mqttClient" required'); | ||
if (!inTopic) throw new Error('"inTopic" required'); | ||
if (!outTopic) throw new Error('"outTopic" required'); | ||
@@ -10,19 +18,53 @@ this.mqttClient = mqttClient; | ||
this.outTopic = outTopic; | ||
this.inQos = inQos; | ||
this.outQos = outQos; | ||
this.messageHandler = () => {}; | ||
} | ||
async onData(callback) { | ||
await this.mqttClient.subscribe(this.inTopic); | ||
const inTopicRegExp = buildTopicRegExp(this.inTopic); | ||
this.mqttClient.on('message', async (inTopic, requestData) => { | ||
const responseData = await callback(requestData.toString()); | ||
if (!responseData) return; | ||
this.messageHandler = async (topic, messageBuffer, packet) => { | ||
if (!inTopicRegExp.test(topic)) { | ||
return; | ||
} | ||
const outTopic = | ||
typeof this.outTopic === 'function' ? this.outTopic({ inTopic }) : this.outTopic; | ||
const responseData = await callback(messageBuffer.toString()); | ||
this.mqttClient.publish(outTopic, responseData); | ||
}); | ||
if (!responseData) { | ||
return; | ||
} | ||
let outTopic; | ||
if (packet.properties && packet.properties.responseTopic) { | ||
// Supported only by MQTT 5.0 | ||
outTopic = packet.properties.responseTopic; | ||
} else if (typeof this.outTopic === 'function') { | ||
outTopic = this.outTopic({ inTopic: topic }); | ||
} else { | ||
outTopic = this.outTopic; | ||
} | ||
if (!outTopic) { | ||
throw new Error('"outTopic" is not specified'); | ||
} | ||
await this.mqttClient.publish(outTopic, responseData, { qos: this.outQos }); | ||
}; | ||
this.mqttClient.on('message', this.messageHandler); | ||
await this.mqttClient.subscribe(this.inTopic, { qos: this.inQos }); | ||
} | ||
async shutdown() { | ||
await this.mqttClient.unsubscribe(this.inTopic); | ||
this.mqttClient.off('message', this.messageHandler); | ||
this.messageHandler = () => {}; | ||
} | ||
} | ||
module.exports = MQTTTransportServer; |
Sorry, the diff of this file is not supported yet
20130
15
292
193