@akiroz/thalamus
Advanced tools
Comparing version 0.2.4 to 0.3.0
@@ -5,7 +5,4 @@ /// <reference types="node" /> | ||
import * as RPC from "./rpc"; | ||
declare type SubHandler = (payload: Uint8Array, topic: string) => Promise<void>; | ||
declare type MQHandler = (message: { | ||
topic: string; | ||
message: Uint8Array; | ||
}, done: () => void) => void; | ||
import Dispatcher from "./dispatcher"; | ||
declare type SubHandler = (payload: Uint8Array, topic: string) => any; | ||
export default class Thalamus extends EventEmitter { | ||
@@ -15,11 +12,8 @@ subDebounceWindow: number; | ||
topics: Set<string>; | ||
event: EventEmitter; | ||
promise: Promise<void>; | ||
timeout: NodeJS.Timeout; | ||
doSubscribeLock: boolean; | ||
}; | ||
emitter: import("@akiroz/mqemitter").MQEmitter; | ||
servers: MQTT.AsyncMqttClient[]; | ||
persistantTopics: Set<string>; | ||
handlers: WeakMap<SubHandler, MQHandler>; | ||
dispatcher: Dispatcher; | ||
constructor(serverOptList?: MQTT.IClientOptions[]); | ||
@@ -34,3 +28,4 @@ publish(topic: string, payload: Uint8Array): Promise<void>; | ||
call<P extends RPC.RPCParamResult, R extends RPC.RPCParamResult>(topic: string, params?: P, opt?: Partial<typeof RPC.defaultCallOptions>): Promise<R>; | ||
close(force?: boolean): void; | ||
} | ||
export {}; |
@@ -117,5 +117,5 @@ "use strict"; | ||
var events_1 = require("events"); | ||
var mqemitter_1 = __importDefault(require("@akiroz/mqemitter")); | ||
var MQTT = __importStar(require("./asyncWrapper")); | ||
var RPC = __importStar(require("./rpc")); | ||
var dispatcher_1 = __importDefault(require("./dispatcher")); | ||
var Thalamus = /** @class */ (function (_super) { | ||
@@ -128,5 +128,4 @@ __extends(Thalamus, _super); | ||
_this.subDebounceState = null; | ||
_this.emitter = (0, mqemitter_1.default)(); | ||
_this.persistantTopics = new Set(); | ||
_this.handlers = new WeakMap(); | ||
_this.dispatcher = new dispatcher_1.default(); | ||
if (serverOptList.length < 1) | ||
@@ -164,3 +163,3 @@ throw Error("No MQTT servers"); | ||
this_1.servers[i].on("error", function (err) { return _this.emit("error", err, i); }); | ||
this_1.servers[i].on("message", function (topic, message) { return _this.emitter.emit({ topic: topic, message: message }); }); | ||
this_1.servers[i].on("message", function (topic, msg) { return _this.dispatcher.emit(topic, msg); }); | ||
}; | ||
@@ -224,3 +223,3 @@ var this_1 = this; | ||
return __awaiter(this, void 0, void 0, function () { | ||
var conn, err_2; | ||
var conn; | ||
var _this = this; | ||
@@ -231,8 +230,5 @@ return __generator(this, function (_a) { | ||
this.subDebounceState.doSubscribeLock = true; | ||
_a.label = 1; | ||
case 1: | ||
_a.trys.push([1, 3, , 4]); | ||
conn = this.servers.filter(function (s) { return s.connected; }); | ||
return [4 /*yield*/, Promise.all(conn.map(function (srv) { return __awaiter(_this, void 0, void 0, function () { | ||
var err_3; | ||
var err_2; | ||
return __generator(this, function (_a) { | ||
@@ -247,6 +243,6 @@ switch (_a.label) { | ||
case 2: | ||
err_3 = _a.sent(); | ||
console.warn("[Thalamus] doSubscribe failed, reconnect...", err_3); | ||
err_2 = _a.sent(); | ||
console.warn("[Thalamus] doSubscribe failed, reconnect...", err_2); | ||
srv.reconnect(); | ||
throw err_3; | ||
throw err_2; | ||
case 3: return [2 /*return*/]; | ||
@@ -256,11 +252,4 @@ } | ||
}); }))]; | ||
case 2: | ||
case 1: | ||
_a.sent(); | ||
this.subDebounceState.event.emit("sub"); | ||
return [3 /*break*/, 4]; | ||
case 3: | ||
err_2 = _a.sent(); | ||
this.subDebounceState.event.emit("err", err_2); | ||
return [3 /*break*/, 4]; | ||
case 4: | ||
this.subDebounceState = null; | ||
@@ -275,3 +264,3 @@ return [2 /*return*/]; | ||
return __awaiter(this, void 0, void 0, function () { | ||
var h, ee_1, conn; | ||
var conn; | ||
var _this = this; | ||
@@ -281,10 +270,5 @@ return __generator(this, function (_a) { | ||
case 0: | ||
h = function (_a, done) { | ||
var topic = _a.topic, message = _a.message; | ||
return (done(), handler(message, topic)); | ||
}; | ||
if (!opts.persistent) return [3 /*break*/, 2]; | ||
this.persistantTopics.add(topic); | ||
this.handlers.set(handler, h); | ||
this.emitter.on(topic, h); | ||
this.dispatcher.on(topic, handler); | ||
if (this.subDebounceState) { | ||
@@ -294,16 +278,9 @@ if (this.subDebounceState.doSubscribeLock) | ||
this.subDebounceState.topics.add(topic); | ||
clearTimeout(this.subDebounceState.timeout); | ||
this.subDebounceState.timeout = setTimeout(function () { return _this.doSubscribe(); }, this.subDebounceWindow); | ||
} | ||
else { | ||
ee_1 = new events_1.EventEmitter(); | ||
this.subDebounceState = { | ||
doSubscribeLock: false, | ||
topics: new Set([topic]), | ||
event: ee_1, | ||
promise: new Promise(function (rsov, rjct) { | ||
ee_1.once("sub", rsov); | ||
ee_1.once("err", rjct); | ||
}), | ||
timeout: setTimeout(function () { return _this.doSubscribe(); }, this.subDebounceWindow), | ||
doSubscribeLock: false, | ||
promise: new Promise(function (r) { return setTimeout(r, _this.subDebounceWindow); }) | ||
.then(function () { return _this.doSubscribe(); }), | ||
}; | ||
@@ -317,4 +294,3 @@ } | ||
conn = this.servers.filter(function (s) { return s.connected; }); | ||
this.handlers.set(handler, h); | ||
this.emitter.on(topic, h); | ||
this.dispatcher.on(topic, handler); | ||
return [4 /*yield*/, Promise.all(conn.map(function (s) { return s.subscribeAsync(topic, { qos: 0 }); }))]; | ||
@@ -335,3 +311,3 @@ case 3: | ||
case 0: | ||
this.emitter.removeListener(topic, handler && this.handlers.get(handler)); | ||
this.dispatcher.removeListener(topic, handler); | ||
conn = this.servers.filter(function (s) { return s.connected; }); | ||
@@ -370,4 +346,7 @@ return [4 /*yield*/, Promise.all(conn.map(function (s) { return s.unsubscribeAsync(topic); }))]; | ||
}; | ||
Thalamus.prototype.close = function (force) { | ||
this.servers.forEach(function (s) { return s.end(force); }); | ||
}; | ||
return Thalamus; | ||
}(events_1.EventEmitter)); | ||
exports.default = Thalamus; |
@@ -61,39 +61,6 @@ "use strict"; | ||
}; | ||
var __read = (this && this.__read) || function (o, n) { | ||
var m = typeof Symbol === "function" && o[Symbol.iterator]; | ||
if (!m) return o; | ||
var i = m.call(o), r, ar = [], e; | ||
try { | ||
while ((n === void 0 || n-- > 0) && !(r = i.next()).done) ar.push(r.value); | ||
} | ||
catch (error) { e = { error: error }; } | ||
finally { | ||
try { | ||
if (r && !r.done && (m = i["return"])) m.call(i); | ||
} | ||
finally { if (e) throw e.error; } | ||
} | ||
return ar; | ||
}; | ||
var __spreadArray = (this && this.__spreadArray) || function (to, from, pack) { | ||
if (pack || arguments.length === 2) for (var i = 0, l = from.length, ar; i < l; i++) { | ||
if (ar || !(i in from)) { | ||
if (!ar) ar = Array.prototype.slice.call(from, 0, i); | ||
ar[i] = from[i]; | ||
} | ||
} | ||
return to.concat(ar || Array.prototype.slice.call(from)); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.register = exports.call = exports.defaultCallOptions = void 0; | ||
var MsgPack = __importStar(require("@msgpack/msgpack")); | ||
function encodeBase64URL(data) { | ||
var base64 = Buffer | ||
? Buffer.from(data).toString("base64") | ||
: btoa(String.fromCharCode.apply(String, __spreadArray([], __read(data), false))); | ||
return base64 | ||
.replace(/=/g, "") | ||
.replace(/\+/g, "-") | ||
.replace(/\//g, "_"); | ||
} | ||
var js_base64_1 = require("js-base64"); | ||
function generateCallId(size) { | ||
@@ -150,3 +117,3 @@ if (typeof window !== 'undefined') { | ||
id = generateCallId(opt.idSize); | ||
strId = encodeBase64URL(id); | ||
strId = js_base64_1.Base64.fromUint8Array(id, true); | ||
responseTopic = "".concat(topic, "/").concat(strId); | ||
@@ -170,3 +137,3 @@ return [4 /*yield*/, new Promise(function (rsov, rjct) { | ||
client.unsubscribe(responseTopic); | ||
rjct({ message: "pub/sub error", data: err }); | ||
rjct({ message: "pub/sub error: ".concat(err), data: err }); | ||
}); | ||
@@ -203,3 +170,3 @@ })]; | ||
throw Error("Missing id in RPC call"); | ||
strId = encodeBase64URL(id); | ||
strId = js_base64_1.Base64.fromUint8Array(id, true); | ||
if (idDedup.has(strId)) | ||
@@ -206,0 +173,0 @@ throw Error("Duplicate call request"); |
{ | ||
"name": "@akiroz/thalamus", | ||
"version": "0.2.4", | ||
"version": "0.3.0", | ||
"repository": "github:akiroz/Thalamus", | ||
@@ -9,10 +9,19 @@ "main": "dist/index.js", | ||
"prepublish": "tsc", | ||
"test": "ts-node test/index.ts" | ||
"test": "ts-mocha test/*.ts" | ||
}, | ||
"dependencies": { | ||
"@msgpack/msgpack": "^2.7.2", | ||
"events": "^3.3.0", | ||
"js-base64": "^3.7.2", | ||
"mqtt": "^4.3.7" | ||
}, | ||
"devDependencies": { | ||
"@types/mocha": "^9.1.1", | ||
"@types/node": "^16.9.6", | ||
"@types/web": "^0.0.64", | ||
"husky": "^7.0.2", | ||
"mocha": "^10.0.0", | ||
"prettier": "^2.4.1", | ||
"pretty-quick": "^3.1.1", | ||
"ts-mocha": "^10.0.0", | ||
"ts-node": "^10.7.0", | ||
@@ -30,9 +39,3 @@ "typescript": "^4.4.3" | ||
} | ||
}, | ||
"dependencies": { | ||
"@akiroz/mqemitter": "^4.5.1", | ||
"@msgpack/msgpack": "^2.7.2", | ||
"events": "^3.3.0", | ||
"mqtt": "^4.3.7" | ||
} | ||
} |
# Thalamus | ||
[![](https://img.shields.io/npm/v/@akiroz/thalamus)](https://www.npmjs.com/package/@akiroz/thalamus) | ||
![test](https://github.com/akiroz/Thalamus/workflows/test/badge.svg) | ||
@@ -5,0 +6,0 @@ An opinionated application messaging framework based on MQTT. |
@@ -0,4 +1,58 @@ | ||
import "ts-mocha"; | ||
import Thalamus from "../src"; | ||
import Dispatcher, { Node } from "../src/dispatcher"; | ||
import assert from "assert"; | ||
(async () => { | ||
describe("Dispatcher", () => { | ||
const h = () => {}; | ||
const h2 = () => {}; | ||
const h3 = () => {}; | ||
let node = new Node(); | ||
let disp = new Dispatcher(); | ||
beforeEach(() => { | ||
node = new Node(); | ||
disp = new Dispatcher(); | ||
}); | ||
it("add handler", () => { | ||
node.add(["a"], h); | ||
assert.deepStrictEqual([h], node.get(["a"])); | ||
}); | ||
it("add multi-level handler", () => { | ||
node.add(["a", "b", "c"], h); | ||
assert.deepStrictEqual([h], node.get(["a", "b", "c"])); | ||
}); | ||
it("add wildcard handler", () => { | ||
node.add(["+"], h); | ||
assert.deepStrictEqual([h], node.get(["a"])); | ||
}); | ||
it("add multiple handler", () => { | ||
node.add(["a", "b", "c"], h); | ||
node.add(["a", "b", "c"], h2); | ||
assert.deepStrictEqual([h, h2], node.get(["a", "b", "c"])); | ||
}); | ||
it("add multi-level wildcard handler", () => { | ||
node.add(["a", "b", "c"], h); | ||
node.add(["a", "+", "c"], h2); | ||
node.add(["a", "+", "+"], h3); | ||
assert.deepStrictEqual([h, h2, h3], node.get(["a", "b", "c"])); | ||
}); | ||
it("remove handler", () => { | ||
node.add(["a", "b", "c"], h); | ||
node.add(["a", "+", "c"], h2); | ||
node.rm(["a", "b", "c"]); | ||
node.rm(["a", "+", "c"]); | ||
assert.deepStrictEqual([], node.get(["a", "b", "c"])); | ||
}); | ||
}); | ||
describe("Thalamus", () => { | ||
const rpcTopic = "068b883978/a0f0d9ec9293f/351aae431"; | ||
const thalamus = new Thalamus([{ | ||
@@ -10,13 +64,27 @@ host: "broker.hivemq.com", | ||
const rpc = "068b883978a0f0d9ec9293f351aae431"; | ||
thalamus.register(rpc, async (params) => { | ||
console.log("Handler", params); | ||
return params; | ||
before(async () => { | ||
return new Promise(r => thalamus.once("connect", r)); | ||
}); | ||
thalamus.once("connect", async () => { | ||
console.log("Connect"); | ||
console.log("Call", await thalamus.call(rpc, { test: 123 })); | ||
it("register RPC", async () => { | ||
await thalamus.register(rpcTopic, async (params) => { | ||
return params; | ||
}); | ||
}); | ||
})().catch(console.error); | ||
it("call RPC", async () => { | ||
const testData = { | ||
bool: true, | ||
str: "foo", | ||
int: 1, | ||
float: Math.PI, | ||
buf: Buffer.from([1, 2, 3, 4, 5]), | ||
map: { foo: "bar" }, | ||
arr: [true, 1, "a"], | ||
}; | ||
const result = await thalamus.call(rpcTopic, testData); | ||
assert.deepStrictEqual(testData, result); | ||
}); | ||
after(() => thalamus.close(true)); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
43140
13
898
37
10
+ Addedjs-base64@^3.7.2
+ Addedjs-base64@3.7.7(transitive)
- Removed@akiroz/mqemitter@^4.5.1
- Removed@akiroz/mqemitter@4.5.1(transitive)
- Removedfastparallel@2.4.1(transitive)
- Removedqlobber@5.0.3(transitive)
- Removedreusify@1.0.4(transitive)