Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@akiroz/thalamus

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@akiroz/thalamus - npm Package Compare versions

Comparing version 0.2.4 to 0.3.0

.github/workflows/test.yaml

13

dist/index.d.ts

@@ -5,7 +5,4 @@ /// <reference types="node" />

import * as RPC from "./rpc";
declare type SubHandler = (payload: Uint8Array, topic: string) => Promise<void>;
declare type MQHandler = (message: {
topic: string;
message: Uint8Array;
}, done: () => void) => void;
import Dispatcher from "./dispatcher";
declare type SubHandler = (payload: Uint8Array, topic: string) => any;
export default class Thalamus extends EventEmitter {

@@ -15,11 +12,8 @@ subDebounceWindow: number;

topics: Set<string>;
event: EventEmitter;
promise: Promise<void>;
timeout: NodeJS.Timeout;
doSubscribeLock: boolean;
};
emitter: import("@akiroz/mqemitter").MQEmitter;
servers: MQTT.AsyncMqttClient[];
persistantTopics: Set<string>;
handlers: WeakMap<SubHandler, MQHandler>;
dispatcher: Dispatcher;
constructor(serverOptList?: MQTT.IClientOptions[]);

@@ -34,3 +28,4 @@ publish(topic: string, payload: Uint8Array): Promise<void>;

call<P extends RPC.RPCParamResult, R extends RPC.RPCParamResult>(topic: string, params?: P, opt?: Partial<typeof RPC.defaultCallOptions>): Promise<R>;
close(force?: boolean): void;
}
export {};

@@ -117,5 +117,5 @@ "use strict";

var events_1 = require("events");
var mqemitter_1 = __importDefault(require("@akiroz/mqemitter"));
var MQTT = __importStar(require("./asyncWrapper"));
var RPC = __importStar(require("./rpc"));
var dispatcher_1 = __importDefault(require("./dispatcher"));
var Thalamus = /** @class */ (function (_super) {

@@ -128,5 +128,4 @@ __extends(Thalamus, _super);

_this.subDebounceState = null;
_this.emitter = (0, mqemitter_1.default)();
_this.persistantTopics = new Set();
_this.handlers = new WeakMap();
_this.dispatcher = new dispatcher_1.default();
if (serverOptList.length < 1)

@@ -164,3 +163,3 @@ throw Error("No MQTT servers");

this_1.servers[i].on("error", function (err) { return _this.emit("error", err, i); });
this_1.servers[i].on("message", function (topic, message) { return _this.emitter.emit({ topic: topic, message: message }); });
this_1.servers[i].on("message", function (topic, msg) { return _this.dispatcher.emit(topic, msg); });
};

@@ -224,3 +223,3 @@ var this_1 = this;

return __awaiter(this, void 0, void 0, function () {
var conn, err_2;
var conn;
var _this = this;

@@ -231,8 +230,5 @@ return __generator(this, function (_a) {

this.subDebounceState.doSubscribeLock = true;
_a.label = 1;
case 1:
_a.trys.push([1, 3, , 4]);
conn = this.servers.filter(function (s) { return s.connected; });
return [4 /*yield*/, Promise.all(conn.map(function (srv) { return __awaiter(_this, void 0, void 0, function () {
var err_3;
var err_2;
return __generator(this, function (_a) {

@@ -247,6 +243,6 @@ switch (_a.label) {

case 2:
err_3 = _a.sent();
console.warn("[Thalamus] doSubscribe failed, reconnect...", err_3);
err_2 = _a.sent();
console.warn("[Thalamus] doSubscribe failed, reconnect...", err_2);
srv.reconnect();
throw err_3;
throw err_2;
case 3: return [2 /*return*/];

@@ -256,11 +252,4 @@ }

}); }))];
case 2:
case 1:
_a.sent();
this.subDebounceState.event.emit("sub");
return [3 /*break*/, 4];
case 3:
err_2 = _a.sent();
this.subDebounceState.event.emit("err", err_2);
return [3 /*break*/, 4];
case 4:
this.subDebounceState = null;

@@ -275,3 +264,3 @@ return [2 /*return*/];

return __awaiter(this, void 0, void 0, function () {
var h, ee_1, conn;
var conn;
var _this = this;

@@ -281,10 +270,5 @@ return __generator(this, function (_a) {

case 0:
h = function (_a, done) {
var topic = _a.topic, message = _a.message;
return (done(), handler(message, topic));
};
if (!opts.persistent) return [3 /*break*/, 2];
this.persistantTopics.add(topic);
this.handlers.set(handler, h);
this.emitter.on(topic, h);
this.dispatcher.on(topic, handler);
if (this.subDebounceState) {

@@ -294,16 +278,9 @@ if (this.subDebounceState.doSubscribeLock)

this.subDebounceState.topics.add(topic);
clearTimeout(this.subDebounceState.timeout);
this.subDebounceState.timeout = setTimeout(function () { return _this.doSubscribe(); }, this.subDebounceWindow);
}
else {
ee_1 = new events_1.EventEmitter();
this.subDebounceState = {
doSubscribeLock: false,
topics: new Set([topic]),
event: ee_1,
promise: new Promise(function (rsov, rjct) {
ee_1.once("sub", rsov);
ee_1.once("err", rjct);
}),
timeout: setTimeout(function () { return _this.doSubscribe(); }, this.subDebounceWindow),
doSubscribeLock: false,
promise: new Promise(function (r) { return setTimeout(r, _this.subDebounceWindow); })
.then(function () { return _this.doSubscribe(); }),
};

@@ -317,4 +294,3 @@ }

conn = this.servers.filter(function (s) { return s.connected; });
this.handlers.set(handler, h);
this.emitter.on(topic, h);
this.dispatcher.on(topic, handler);
return [4 /*yield*/, Promise.all(conn.map(function (s) { return s.subscribeAsync(topic, { qos: 0 }); }))];

@@ -335,3 +311,3 @@ case 3:

case 0:
this.emitter.removeListener(topic, handler && this.handlers.get(handler));
this.dispatcher.removeListener(topic, handler);
conn = this.servers.filter(function (s) { return s.connected; });

@@ -370,4 +346,7 @@ return [4 /*yield*/, Promise.all(conn.map(function (s) { return s.unsubscribeAsync(topic); }))];

};
Thalamus.prototype.close = function (force) {
this.servers.forEach(function (s) { return s.end(force); });
};
return Thalamus;
}(events_1.EventEmitter));
exports.default = Thalamus;

@@ -61,39 +61,6 @@ "use strict";

};
var __read = (this && this.__read) || function (o, n) {
var m = typeof Symbol === "function" && o[Symbol.iterator];
if (!m) return o;
var i = m.call(o), r, ar = [], e;
try {
while ((n === void 0 || n-- > 0) && !(r = i.next()).done) ar.push(r.value);
}
catch (error) { e = { error: error }; }
finally {
try {
if (r && !r.done && (m = i["return"])) m.call(i);
}
finally { if (e) throw e.error; }
}
return ar;
};
var __spreadArray = (this && this.__spreadArray) || function (to, from, pack) {
if (pack || arguments.length === 2) for (var i = 0, l = from.length, ar; i < l; i++) {
if (ar || !(i in from)) {
if (!ar) ar = Array.prototype.slice.call(from, 0, i);
ar[i] = from[i];
}
}
return to.concat(ar || Array.prototype.slice.call(from));
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.register = exports.call = exports.defaultCallOptions = void 0;
var MsgPack = __importStar(require("@msgpack/msgpack"));
function encodeBase64URL(data) {
var base64 = Buffer
? Buffer.from(data).toString("base64")
: btoa(String.fromCharCode.apply(String, __spreadArray([], __read(data), false)));
return base64
.replace(/=/g, "")
.replace(/\+/g, "-")
.replace(/\//g, "_");
}
var js_base64_1 = require("js-base64");
function generateCallId(size) {

@@ -150,3 +117,3 @@ if (typeof window !== 'undefined') {

id = generateCallId(opt.idSize);
strId = encodeBase64URL(id);
strId = js_base64_1.Base64.fromUint8Array(id, true);
responseTopic = "".concat(topic, "/").concat(strId);

@@ -170,3 +137,3 @@ return [4 /*yield*/, new Promise(function (rsov, rjct) {

client.unsubscribe(responseTopic);
rjct({ message: "pub/sub error", data: err });
rjct({ message: "pub/sub error: ".concat(err), data: err });
});

@@ -203,3 +170,3 @@ })];

throw Error("Missing id in RPC call");
strId = encodeBase64URL(id);
strId = js_base64_1.Base64.fromUint8Array(id, true);
if (idDedup.has(strId))

@@ -206,0 +173,0 @@ throw Error("Duplicate call request");

{
"name": "@akiroz/thalamus",
"version": "0.2.4",
"version": "0.3.0",
"repository": "github:akiroz/Thalamus",

@@ -9,10 +9,19 @@ "main": "dist/index.js",

"prepublish": "tsc",
"test": "ts-node test/index.ts"
"test": "ts-mocha test/*.ts"
},
"dependencies": {
"@msgpack/msgpack": "^2.7.2",
"events": "^3.3.0",
"js-base64": "^3.7.2",
"mqtt": "^4.3.7"
},
"devDependencies": {
"@types/mocha": "^9.1.1",
"@types/node": "^16.9.6",
"@types/web": "^0.0.64",
"husky": "^7.0.2",
"mocha": "^10.0.0",
"prettier": "^2.4.1",
"pretty-quick": "^3.1.1",
"ts-mocha": "^10.0.0",
"ts-node": "^10.7.0",

@@ -30,9 +39,3 @@ "typescript": "^4.4.3"

}
},
"dependencies": {
"@akiroz/mqemitter": "^4.5.1",
"@msgpack/msgpack": "^2.7.2",
"events": "^3.3.0",
"mqtt": "^4.3.7"
}
}
# Thalamus
[![](https://img.shields.io/npm/v/@akiroz/thalamus)](https://www.npmjs.com/package/@akiroz/thalamus)
![test](https://github.com/akiroz/Thalamus/workflows/test/badge.svg)

@@ -5,0 +6,0 @@ An opinionated application messaging framework based on MQTT.

@@ -0,4 +1,58 @@

import "ts-mocha";
import Thalamus from "../src";
import Dispatcher, { Node } from "../src/dispatcher";
import assert from "assert";
(async () => {
describe("Dispatcher", () => {
const h = () => {};
const h2 = () => {};
const h3 = () => {};
let node = new Node();
let disp = new Dispatcher();
beforeEach(() => {
node = new Node();
disp = new Dispatcher();
});
it("add handler", () => {
node.add(["a"], h);
assert.deepStrictEqual([h], node.get(["a"]));
});
it("add multi-level handler", () => {
node.add(["a", "b", "c"], h);
assert.deepStrictEqual([h], node.get(["a", "b", "c"]));
});
it("add wildcard handler", () => {
node.add(["+"], h);
assert.deepStrictEqual([h], node.get(["a"]));
});
it("add multiple handler", () => {
node.add(["a", "b", "c"], h);
node.add(["a", "b", "c"], h2);
assert.deepStrictEqual([h, h2], node.get(["a", "b", "c"]));
});
it("add multi-level wildcard handler", () => {
node.add(["a", "b", "c"], h);
node.add(["a", "+", "c"], h2);
node.add(["a", "+", "+"], h3);
assert.deepStrictEqual([h, h2, h3], node.get(["a", "b", "c"]));
});
it("remove handler", () => {
node.add(["a", "b", "c"], h);
node.add(["a", "+", "c"], h2);
node.rm(["a", "b", "c"]);
node.rm(["a", "+", "c"]);
assert.deepStrictEqual([], node.get(["a", "b", "c"]));
});
});
describe("Thalamus", () => {
const rpcTopic = "068b883978/a0f0d9ec9293f/351aae431";
const thalamus = new Thalamus([{

@@ -10,13 +64,27 @@ host: "broker.hivemq.com",

const rpc = "068b883978a0f0d9ec9293f351aae431";
thalamus.register(rpc, async (params) => {
console.log("Handler", params);
return params;
before(async () => {
return new Promise(r => thalamus.once("connect", r));
});
thalamus.once("connect", async () => {
console.log("Connect");
console.log("Call", await thalamus.call(rpc, { test: 123 }));
it("register RPC", async () => {
await thalamus.register(rpcTopic, async (params) => {
return params;
});
});
})().catch(console.error);
it("call RPC", async () => {
const testData = {
bool: true,
str: "foo",
int: 1,
float: Math.PI,
buf: Buffer.from([1, 2, 3, 4, 5]),
map: { foo: "bar" },
arr: [true, 1, "a"],
};
const result = await thalamus.call(rpcTopic, testData);
assert.deepStrictEqual(testData, result);
});
after(() => thalamus.close(true));
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc