Launch Week Day 5: Introducing Reachability for PHP.Learn More
Socket
Book a DemoSign in
Socket

socket.io-amqp0

Package Overview
Dependencies
Maintainers
1
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

socket.io-amqp0 - npm Package Compare versions

Comparing version
4.0.5
to
5.0.1
+5
-0
dist/index.d.ts

@@ -13,2 +13,4 @@ import { BroadcastOptions, Room, SocketId, Adapter } from 'socket.io-adapter';

instanceName?: string;
exchangeName?: string;
queuePrefix?: string;
shutdownCallbackCallback?: (callback: () => Promise<void>) => void;

@@ -24,2 +26,4 @@ readyCallback?: () => void;

readonly instanceName: string;
readonly exchangeName: string;
readonly queuePrefix: string;
private roomListeners;

@@ -42,2 +46,3 @@ private closed;

delAll(id: string): void;
private publishToRooms;
broadcast(packet: any, opts: BroadcastOptions): Promise<void>;

@@ -44,0 +49,0 @@ sockets(rooms: Set<Room>, callback?: (sockets: Set<SocketId>) => void): Promise<Set<SocketId>>;

+23
-21

@@ -22,3 +22,4 @@ "use strict";

Object.freeze(emptySet);
const defaultRoomName = 'default';
const defaultRoomName = 'broadcast';
const defaultExchangeName = 'socket.io';
const createAdapter = function (opts) {

@@ -36,3 +37,3 @@ const shim = class AmqpAdapterWrapper extends AmqpAdapter {

constructor(nsp, options) {
var _a, _b;
var _a, _b, _c, _d;
super(nsp);

@@ -47,3 +48,5 @@ this.nsp = nsp;

this.instanceName = (_a = options.instanceName) !== null && _a !== void 0 ? _a : os_1.hostname();
(_b = options.shutdownCallbackCallback) === null || _b === void 0 ? void 0 : _b.call(options, async () => {
this.exchangeName = (_b = options.exchangeName) !== null && _b !== void 0 ? _b : defaultExchangeName;
this.queuePrefix = (_c = options.queuePrefix) !== null && _c !== void 0 ? _c : defaultExchangeName;
(_d = options.shutdownCallbackCallback) === null || _d === void 0 ? void 0 : _d.call(options, async () => {
await Promise.all(util_1.mapIter(this.roomListeners.values(), (unsub) => unsub()));

@@ -94,3 +97,3 @@ });

async createQueueForRoom(room) {
const queueName = `${this.instanceName}${room ? `#${room}` : ''}`;
const queueName = `${this.queuePrefix}#${this.instanceName}${room ? `#${room}` : ''}`;
await this.consumeChannel.assertQueue(queueName, {

@@ -106,5 +109,4 @@ autoDelete: true,

async createRoomExchangeAndQueue(room) {
const exchangeName = room !== null && room !== void 0 ? room : defaultRoomName;
const [, queueName] = await Promise.all([
this.publishChannel.assertExchange(exchangeName, 'fanout', {
this.publishChannel.assertExchange(this.exchangeName, 'direct', {
autoDelete: true,

@@ -115,3 +117,3 @@ durable: false,

]);
await this.consumeChannel.bindQueue(queueName, exchangeName, '*');
await this.consumeChannel.bindQueue(queueName, this.exchangeName, room !== null && room !== void 0 ? room : defaultRoomName);
return queueName;

@@ -205,2 +207,8 @@ }

}
async publishToRooms(rooms, envelope) {
debug('Publishing message for rooms', rooms, envelope);
const routeKeys = rooms.map((room) => room !== null && room !== void 0 ? room : defaultRoomName);
const buffer = Buffer.from(JSON.stringify(envelope));
await util_2.promisify(this.publishChannel.publish).bind(this.publishChannel)(this.exchangeName, routeKeys[0], buffer, { ...(routeKeys.length > 1 ? { CC: routeKeys.slice(1) } : {}) });
}
async broadcast(packet, opts) {

@@ -217,18 +225,12 @@ var _a;

const rooms = opts.rooms && opts.rooms.size ? opts.rooms : nullSet;
const nonlocalRooms = [...util_1.filterIter(rooms, (room) => !this.localRouting.has(room))];
await Promise.all([
...util_1.mapIter(rooms, async (room) => {
if (this.localRouting.has(room)) {
await this.broadcast(packet, {
...opts,
rooms: new Set([room]),
flags: { ...opts.flags, local: true },
});
}
else {
const exchangeName = room !== null && room !== void 0 ? room : defaultRoomName;
debug('Publishing message for room', room, envelope);
const buffer = Buffer.from(JSON.stringify(envelope));
await util_2.promisify(this.publishChannel.publish).bind(this.publishChannel)(exchangeName, '*', buffer, {});
}
...util_1.mapIter(util_1.filterIter(rooms, (room) => this.localRouting.has(room)), async (room) => {
await this.broadcast(packet, {
...opts,
rooms: new Set([room]),
flags: { ...opts.flags, local: true },
});
}),
this.publishToRooms(nonlocalRooms, envelope),
]);

@@ -235,0 +237,0 @@ }

@@ -1,1 +0,1 @@

{"version":3,"file":"index.js","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":";;;;AAAA,+BAA+B;AAC/B,yDAA8E;AAG9E,0DAAiC;AAEjC,2BAAiD;AACjD,iCAA+C;AAC/C,+BAAiC;AAEjC,MAAM,KAAK,GAAG,eAAY,CAAC,gBAAgB,CAAC,CAAC;AAE7C,IAAY,cAIX;AAJD,WAAY,cAAc;IACtB,mCAAiB,CAAA;IACjB,iCAAe,CAAA;IACf,mCAAiB,CAAA;AACrB,CAAC,EAJW,cAAc,GAAd,sBAAc,KAAd,sBAAc,QAIzB;AAgBD,MAAM,OAAO,GAAG,IAAI,GAAG,CAAO,CAAC,IAAI,CAAC,CAAC,CAAC;AACtC,MAAM,CAAC,MAAM,CAAC,OAAO,CAAC,CAAC;AAEvB,MAAM,QAAQ,GAAG,IAAI,GAAG,CAAM,EAAE,CAAC,CAAC;AAClC,MAAM,CAAC,MAAM,CAAC,QAAQ,CAAC,CAAC;AAExB,MAAM,eAAe,GAAG,SAAS,CAAC;AAE3B,MAAM,aAAa,GAAG,UAAU,IAAwB;IAC3D,MAAM,IAAI,GAAG,MAAM,kBAAmB,SAAQ,WAAW;QACrD,YAAY,GAAc;YACtB,KAAK,CAAC,GAAG,EAAE,IAAI,CAAC,CAAC;QACrB,CAAC;KACJ,CAAC;IACF,mCAAmC;IACnC,OAAO,IAAI,CAAC;AAChB,CAAC,CAAC;AARW,QAAA,aAAa,iBAQxB;AAEF,MAAa,WAAY,SAAQ,2BAAO;IAUpC,YAA4B,GAAc,EAAU,OAA2B;;QAC3E,KAAK,CAAC,GAAG,CAAC,CAAC;QADa,QAAG,GAAH,GAAG,CAAW;QAAU,YAAO,GAAP,OAAO,CAAoB;QATtE,UAAK,GAA6B,IAAI,GAAG,EAAE,CAAC;QAC5C,SAAI,GAA6B,IAAI,GAAG,EAAE,CAAC;QAE5C,kBAAa,GAA0C,IAAI,GAAG,EAAE,CAAC;QACjE,WAAM,GAAG,KAAK,CAAC;QAgEf,iBAAY,GAAgB,IAAI,GAAG,EAAE,CAAC;QAzD1C,IAAI,CAAC,YAAY,GAAG,MAAA,OAAO,CAAC,YAAY,mCAAI,aAAQ,EAAE,CAAC;QAEvD,MAAA,OAAO,CAAC,wBAAwB,+CAAhC,OAAO,EAA4B,KAAK,IAAI,EAAE;YAC1C,MAAM,OAAO,CAAC,GAAG,CAAC,cAAO,CAAC,IAAI,CAAC,aAAa,CAAC,MAAM,EAAE,EAAE,CAAC,KAAK,EAAE,EAAE,CAAC,KAAK,EAAE,CAAC,CAAC,CAAC;QAChF,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,IAAI,EAAE,CAAC,CAAC,4CAA4C;IAC7D,CAAC;IAED,KAAK,CAAC,gBAAgB,CAAC,IAAgB;QACnC,IAAI,CAAC,EAAE,CAAC,OAAO,EAAE,KAAK,IAAI,EAAE;YACxB,IAAI,IAAI,CAAC,MAAM;gBAAE,OAAO;YACxB,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,cAAc,EAAE,CAAC;YACjD,IAAI,CAAC,gBAAgB,CAAC,IAAI,CAAC,CAAC;QAChC,CAAC,CAAC,CAAC;QAEH,IAAI,CAAC,EAAE,CAAC,OAAO,EAAE,CAAC,GAAG,EAAE,EAAE;YACrB,KAAK,CAAC,sBAAsB,EAAE,GAAG,CAAC,CAAC;QACvC,CAAC,CAAC,CAAC;QAEH,MAAM,CAAC,cAAc,EAAE,cAAc,CAAC,GAAG,MAAM,OAAO,CAAC,GAAG,CAAC,CAAC,IAAI,CAAC,aAAa,EAAE,EAAE,IAAI,CAAC,oBAAoB,EAAE,CAAC,CAAC,CAAC;QAEhH,IAAI,CAAC,cAAc,GAAG,cAAc,CAAC;QACrC,IAAI,CAAC,cAAc,GAAG,cAAc,CAAC;QAErC,MAAM,QAAQ,GAAmB,EAAE,CAAC;QACpC,KAAK,MAAM,CAAC,IAAI,EAAE,QAAQ,CAAC,IAAI,IAAI,CAAC,aAAa,EAAE;YAC/C,QAAQ,CAAC,IAAI,CAAC,QAAQ,EAAE,CAAC,CAAC;YAC1B,QAAQ,CAAC,IAAI,CAAC,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,CAAC,CAAC;SACvC;IACL,CAAC;IAED,KAAK,CAAC,IAAI;;QACN,KAAK,CAAC,YAAY,CAAC,CAAC;QAEpB,MAAM,UAAU,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,cAAc,EAAE,CAAC;QACvD,MAAM,IAAI,CAAC,gBAAgB,CAAC,UAAU,CAAC,CAAC;QAExC,+BAA+B;QAC/B,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,0BAA0B,CAAC,IAAI,CAAC,CAAC;QAC9D,MAAM,KAAK,GAAG,IAAI,CAAC,kBAAkB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;QACvD,IAAI,CAAC,aAAa,CAAC,GAAG,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;QAEpC,KAAK,CAAC,UAAU,CAAC,CAAC;QAClB,MAAA,MAAA,IAAI,CAAC,OAAO,EAAC,aAAa,kDAAI,CAAC;IACnC,CAAC;IAED,KAAK,CAAC,KAAK;QACP,IAAI,CAAC,MAAM,GAAG,IAAI,CAAC;QACnB,MAAM,OAAO,CAAC,GAAG,CAAC,cAAO,CAAC,IAAI,CAAC,aAAa,CAAC,MAAM,EAAE,EAAE,CAAC,KAAK,EAAE,EAAE,CAAC,KAAK,EAAE,CAAC,CAAC,CAAC;IAChF,CAAC;IAEO,KAAK,CAAC,SAAS,CAAC,IAAmB;QACvC,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,0BAA0B,CAAC,IAAI,CAAC,CAAC;QAC9D,MAAM,KAAK,GAAG,IAAI,CAAC,kBAAkB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;QACvD,IAAI,CAAC,aAAa,CAAC,GAAG,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;IACxC,CAAC;IAIO,KAAK,CAAC,kBAAkB,CAAC,IAAmB;QAChD,MAAM,SAAS,GAAG,GAAG,IAAI,CAAC,YAAY,GAAG,IAAI,CAAC,CAAC,CAAC,IAAI,IAAI,EAAE,CAAC,CAAC,CAAC,EAAE,EAAE,CAAC;QAClE,MAAM,IAAI,CAAC,cAAc,CAAC,WAAW,CAAC,SAAS,EAAE;YAC7C,UAAU,EAAE,IAAI;YAChB,OAAO,EAAE,KAAK;YACd,SAAS,EAAE;gBACP,WAAW,EAAE,IAAI,GAAG,EAAE;aACzB;SACJ,CAAC,CAAC;QACH,OAAO,SAAS,CAAC;IACrB,CAAC;IAEO,KAAK,CAAC,0BAA0B,CAAC,IAAmB;QACxD,MAAM,YAAY,GAAG,IAAI,aAAJ,IAAI,cAAJ,IAAI,GAAI,eAAe,CAAC;QAC7C,MAAM,CAAC,EAAE,SAAS,CAAC,GAAG,MAAM,OAAO,CAAC,GAAG,CAAC;YACpC,IAAI,CAAC,cAAc,CAAC,cAAc,CAAC,YAAY,EAAE,QAAQ,EAAE;gBACvD,UAAU,EAAE,IAAI;gBAChB,OAAO,EAAE,KAAK;aACjB,CAAC;YACF,IAAI,CAAC,kBAAkB,CAAC,IAAI,CAAC;SAChC,CAAC,CAAC;QAEH,MAAM,IAAI,CAAC,cAAc,CAAC,SAAS,CAAC,SAAS,EAAE,YAAY,EAAE,GAAG,CAAC,CAAC;QAClE,OAAO,SAAS,CAAC;IACrB,CAAC;IAEO,KAAK,CAAC,aAAa,CAAC,QAAkB,EAAE,IAAmB;QAC/D,MAAM,IAAI,GAAG,IAAI,CAAC,CAAC,CAAC,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,CAAC,CAAC,CAAC,IAAI,CAAC,GAAG,CAAC,OAAO,CAAC,IAAI,EAAE,CAAC;QACpE,MAAM,OAAO,GAAG,IAAI,GAAG,CAAC,QAAQ,CAAC,MAAM,CAAC,CAAC;QACzC,MAAM,MAAM,GAAG,QAAQ,CAAC,MAAM,CAAC;QAC/B,KAAK,MAAM,GAAG,IAAI,IAAI,EAAE;YACpB,IAAI,OAAO,CAAC,GAAG,CAAC,GAAG,CAAC;gBAAE,SAAS;YAE/B,KAAK,CAAC,SAAS,CAAC,MAAM,EAAE,EAAE,KAAK,EAAE,IAAI,CAAC,CAAC,CAAC,IAAI,GAAG,CAAC,CAAC,IAAI,CAAC,CAAC,CAAC,CAAC,CAAC,QAAQ,EAAE,CAAC,CAAC;SACzE;IACL,CAAC;IAEO,kBAAkB,CAAC,IAAmB,EAAE,SAAiB;QAC7D,KAAK,CAAC,4BAA4B,EAAE,IAAI,CAAC,CAAC;QAC1C,IAAI,WAAW,GAAG,mBAAY,EAAE,CAAC;QAEjC,IAAI,CAAC,cAAc;aACd,OAAO,CACJ,SAAS,EACT,KAAK,EAAE,GAAG,EAAE,EAAE;YACV,IAAI,CAAC,GAAG;gBAAE,OAAO;YACjB,MAAM,OAAO,GAAG,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,OAAO,CAAC,QAAQ,CAAC,MAAM,CAAC,CAAC,CAAC;YACzD,MAAM,IAAI,CAAC,aAAa,CAAC,OAAO,EAAE,IAAI,CAAC,CAAC;YACxC,IAAI,CAAC,cAAc,CAAC,GAAG,CAAC,GAAG,EAAE,KAAK,CAAC,CAAC;QACxC,CAAC,EACD;YACI,KAAK,EAAE,KAAK;YACZ,WAAW;SACd,CACJ;aACA,IAAI,CAAC,CAAC,CAAC,EAAE,EAAE,CAAC,CAAC,WAAW,GAAG,CAAC,CAAC,WAAW,CAAC,CAAC,CAAC;QAEhD,OAAO,KAAK,IAAI,EAAE;YACd,KAAK,CAAC,6BAA6B,EAAE,IAAI,CAAC,CAAC;YAC3C,MAAM,IAAI,CAAC,cAAc,CAAC,MAAM,CAAC,WAAW,CAAC,CAAC;QAClD,CAAC,CAAC;IACN,CAAC;IAED,KAAK,CAAC,MAAM,CAAC,EAAU,EAAE,KAAkB;QACvC,8CAA8C;QAC9C,KAAK,CAAC,QAAQ,EAAE,GAAG,SAAS,CAAC,CAAC;QAE9B,MAAM,QAAQ,GAAG,IAAI,GAAG,EAAU,CAAC;QACnC,KAAK,MAAM,IAAI,IAAI,KAAK,EAAE;YACtB,IAAI,IAAI,KAAK,EAAE,EAAE;gBACb,IAAI,IAAI,CAAC,OAAO,CAAC,cAAc,KAAK,cAAc,CAAC,MAAM;oBAAE,SAAS;gBACpE,IAAI,IAAI,CAAC,OAAO,CAAC,cAAc,KAAK,cAAc,CAAC,KAAK,EAAE;oBACtD,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC;oBAC5B,SAAS;iBACZ;aACJ;YACD,IAAI,CAAC,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;gBACpB,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,EAAE,IAAI,GAAG,EAAE,CAAC,CAAC;aAChC;YACD,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAE,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC;YAE7B,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAC,EAAE;gBACvB,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,EAAE,IAAI,GAAG,EAAE,CAAC,CAAC;gBAChC,QAAQ,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC;aACtB;YACD,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,CAAC,GAAG,CAAC,EAAE,CAAC,CAAC;SACjC;QAED,MAAM,OAAO,CAAC,GAAG,CAAC;YACd,GAAG,cAAO,CAAC,QAAQ,EAAE,KAAK,EAAE,IAAI,EAAE,EAAE;gBAChC,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,0BAA0B,CAAC,IAAI,CAAC,CAAC;gBAC9D,MAAM,KAAK,GAAG,IAAI,CAAC,kBAAkB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;gBACvD,IAAI,CAAC,aAAa,CAAC,GAAG,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;YACxC,CAAC,CAAC;SACL,CAAC,CAAC;IACP,CAAC;IAED,GAAG,CAAC,EAAU,EAAE,IAAY;;QACxB,IAAI,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;YACnB,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAE,CAAC,MAAM,CAAC,IAAI,CAAC,CAAC;SACnC;QAED,IAAI,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAC,EAAE;YACtB,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,CAAC,MAAM,CAAC,EAAE,CAAC,CAAC;YACjC,IAAI,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,CAAC,IAAI,KAAK,CAAC,EAAE;gBAClC,IAAI,CAAC,KAAK,CAAC,MAAM,CAAC,IAAI,CAAC,CAAC;gBAExB,8BAA8B;gBAC9B,MAAA,IAAI,CAAC,aAAa,CAAC,GAAG,CAAC,IAAI,CAAC,2CAAI,CAAC;gBACjC,IAAI,CAAC,aAAa,CAAC,MAAM,CAAC,IAAI,CAAC,CAAC;aACnC;SACJ;IACL,CAAC;IACD,MAAM,CAAC,EAAU;QACb,IAAI,CAAC,YAAY,CAAC,MAAM,CAAC,EAAE,CAAC,CAAC;QAE7B,IAAI,CAAC,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;YACpB,OAAO;SACV;QAED,KAAK,MAAM,IAAI,IAAI,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAE,EAAE;YACnC,IAAI,CAAC,GAAG,CAAC,EAAE,EAAE,IAAI,CAAC,CAAC,CAAC,wCAAwC;SAC/D;QAED,IAAI,CAAC,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,CAAC;IACzB,CAAC;IAED,KAAK,CAAC,SAAS,CAAC,MAAW,EAAE,IAAsB;;QAC/C,KAAK,CAAC,WAAW,EAAE,MAAM,EAAE,IAAI,CAAC,CAAC;QACjC,IAAI,MAAA,IAAI,CAAC,KAAK,0CAAE,KAAK,EAAE;YACnB,OAAO,KAAK,CAAC,SAAS,CAAC,MAAM,EAAE,IAAI,CAAC,CAAC;SACxC;QACD,MAAM,QAAQ,GAAa;YACvB,MAAM;YACN,MAAM,EAAE,IAAI,CAAC,MAAM,IAAI,CAAC,GAAG,IAAI,CAAC,MAAM,CAAC;SAC1C,CAAC;QACF,MAAM,KAAK,GAAG,IAAI,CAAC,KAAK,IAAI,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,CAAC,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC,CAAC,OAAO,CAAC;QACnE,MAAM,OAAO,CAAC,GAAG,CAAC;YACd,GAAG,cAAO,CAAC,KAAK,EAAE,KAAK,EAAE,IAAI,EAAE,EAAE;gBAC7B,IAAI,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAK,CAAC,EAAE;oBAC9B,MAAM,IAAI,CAAC,SAAS,CAAC,MAAM,EAAE;wBACzB,GAAG,IAAI;wBACP,KAAK,EAAE,IAAI,GAAG,CAAC,CAAC,IAAK,CAAC,CAAC;wBACvB,KAAK,EAAE,EAAE,GAAG,IAAI,CAAC,KAAK,EAAE,KAAK,EAAE,IAAI,EAAE;qBACxC,CAAC,CAAC;iBACN;qBAAM;oBACH,MAAM,YAAY,GAAG,IAAI,aAAJ,IAAI,cAAJ,IAAI,GAAI,eAAe,CAAC;oBAC7C,KAAK,CAAC,6BAA6B,EAAE,IAAI,EAAE,QAAQ,CAAC,CAAC;oBAErD,MAAM,MAAM,GAAG,MAAM,CAAC,IAAI,CAAC,IAAI,CAAC,SAAS,CAAC,QAAQ,CAAC,CAAC,CAAC;oBACrD,MAAM,gBAAS,CAAC,IAAI,CAAC,cAAc,CAAC,OAAO,CAAC,CAAC,IAAI,CAAC,IAAI,CAAC,cAAc,CAAC,CAClE,YAAY,EACZ,GAAG,EACH,MAAM,EACN,EAAE,CACL,CAAC;iBACL;YACL,CAAC,CAAC;SACL,CAAC,CAAC;IACP,CAAC;IACD,OAAO,CAAC,KAAgB,EAAE,QAA2C;QACjE,MAAM,IAAI,GAAG,IAAI,GAAG,EAAY,CAAC;QAEjC,IAAI,KAAK,CAAC,IAAI,EAAE;YACZ,KAAK,MAAM,IAAI,IAAI,KAAK,EAAE;gBACtB,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAC;oBAAE,SAAS;gBAEpC,KAAK,MAAM,EAAE,IAAI,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,EAAE;oBACpC,IAAI,EAAE,IAAI,IAAI,CAAC,GAAG,CAAC,OAAO,EAAE;wBACxB,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAC;qBAChB;iBACJ;aACJ;SACJ;aAAM;YACH,KAAK,MAAM,CAAC,EAAE,CAAC,IAAI,IAAI,CAAC,IAAI,EAAE;gBAC1B,IAAI,EAAE,IAAI,IAAI,CAAC,GAAG,CAAC,OAAO;oBAAE,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAC;aAC5C;SACJ;QAED,QAAQ,aAAR,QAAQ,uBAAR,QAAQ,CAAG,IAAI,CAAC,CAAC;QACjB,OAAO,OAAO,CAAC,OAAO,CAAC,IAAI,CAAC,CAAC;IACjC,CAAC;IAED,WAAW,CAAC,EAAU;QAClB,OAAO,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAC;IAC7B,CAAC;CACJ;AAjQD,kCAiQC","sourcesContent":["/* eslint-disable no-console */\nimport { BroadcastOptions, Room, SocketId, Adapter } from 'socket.io-adapter';\nimport { Namespace, Socket } from 'socket.io';\nimport { EventEmitter } from 'events';\nimport debugFactory from 'debug';\nimport { Channel, ConfirmChannel, Connection } from 'amqplib';\nimport { hostname, networkInterfaces } from 'os';\nimport { randomString, mapIter } from './util';\nimport { promisify } from 'util';\n\nconst debug = debugFactory('socket.io-amqp');\n\nexport enum SidRoomRouting {\n normal = 'normal',\n local = 'local',\n banned = 'banned',\n}\n\nexport interface AmqpAdapterOptions {\n amqpConnection: () => Promise<Connection> | Connection;\n sidRoomRouting?: SidRoomRouting;\n instanceName?: string;\n\n shutdownCallbackCallback?: (callback: () => Promise<void>) => void;\n readyCallback?: () => void;\n}\n\ninterface Envelope {\n packet: any;\n except?: SocketId[];\n}\n\nconst nullSet = new Set<null>([null]);\nObject.freeze(nullSet);\n\nconst emptySet = new Set<any>([]);\nObject.freeze(emptySet);\n\nconst defaultRoomName = 'default';\n\nexport const createAdapter = function (opts: AmqpAdapterOptions): typeof AmqpAdapter {\n const shim = class AmqpAdapterWrapper extends AmqpAdapter {\n constructor(nsp: Namespace) {\n super(nsp, opts);\n }\n };\n // shim.name = AmqpAdapter.name;\n return shim;\n};\n\nexport class AmqpAdapter extends Adapter {\n readonly rooms: Map<Room, Set<SocketId>> = new Map();\n readonly sids: Map<SocketId, Set<Room>> = new Map();\n readonly instanceName: string;\n private roomListeners: Map<Room | null, () => Promise<void>> = new Map();\n private closed = false;\n\n private consumeChannel!: Channel;\n private publishChannel!: ConfirmChannel;\n\n constructor(public readonly nsp: Namespace, private options: AmqpAdapterOptions) {\n super(nsp);\n this.instanceName = options.instanceName ?? hostname();\n\n options.shutdownCallbackCallback?.(async () => {\n await Promise.all(mapIter(this.roomListeners.values(), (unsub) => unsub()));\n });\n this.init(); // hack until issue in socket.io is resolved\n }\n\n async handleConnection(conn: Connection) {\n conn.on('close', async () => {\n if (this.closed) return;\n const conn = await this.options.amqpConnection();\n this.handleConnection(conn);\n });\n\n conn.on('error', (err) => {\n debug('Got connection error', err);\n });\n\n const [consumeChannel, publishChannel] = await Promise.all([conn.createChannel(), conn.createConfirmChannel()]);\n\n this.consumeChannel = consumeChannel;\n this.publishChannel = publishChannel;\n\n const promises: Promise<any>[] = [];\n for (const [room, shutdown] of this.roomListeners) {\n promises.push(shutdown());\n promises.push(this.setupRoom(room));\n }\n }\n\n async init(): Promise<void> {\n debug('start init');\n\n const connection = await this.options.amqpConnection();\n await this.handleConnection(connection);\n\n // set up the default broadcast\n const queueName = await this.createRoomExchangeAndQueue(null);\n const unsub = this.createRoomListener(null, queueName);\n this.roomListeners.set(null, unsub);\n\n debug('end init');\n this.options.readyCallback?.();\n }\n\n async close(): Promise<void> {\n this.closed = true;\n await Promise.all(mapIter(this.roomListeners.values(), (unsub) => unsub()));\n }\n\n private async setupRoom(room: string | null): Promise<void> {\n const queueName = await this.createRoomExchangeAndQueue(room);\n const unsub = this.createRoomListener(room, queueName);\n this.roomListeners.set(room, unsub);\n }\n\n private localRouting: Set<string> = new Set();\n\n private async createQueueForRoom(room: string | null): Promise<string> {\n const queueName = `${this.instanceName}${room ? `#${room}` : ''}`;\n await this.consumeChannel.assertQueue(queueName, {\n autoDelete: true,\n durable: false,\n arguments: {\n 'x-expires': 1000 * 60,\n },\n });\n return queueName;\n }\n\n private async createRoomExchangeAndQueue(room: string | null): Promise<string> {\n const exchangeName = room ?? defaultRoomName;\n const [, queueName] = await Promise.all([\n this.publishChannel.assertExchange(exchangeName, 'fanout', {\n autoDelete: true,\n durable: false,\n }),\n this.createQueueForRoom(room),\n ]);\n\n await this.consumeChannel.bindQueue(queueName, exchangeName, '*');\n return queueName;\n }\n\n private async handleMessage(envelope: Envelope, room: string | null) {\n const sids = room ? this.rooms.get(room)! : this.nsp.sockets.keys();\n const excepts = new Set(envelope.except);\n const packet = envelope.packet;\n for (const sid of sids) {\n if (excepts.has(sid)) continue;\n\n super.broadcast(packet, { rooms: room ? new Set([room]) : emptySet });\n }\n }\n\n private createRoomListener(room: string | null, queueName: string): () => Promise<void> {\n debug('Starting room listener for', room);\n let consumerTag = randomString();\n\n this.consumeChannel\n .consume(\n queueName,\n async (msg) => {\n if (!msg) return;\n const payload = JSON.parse(msg.content.toString('utf8'));\n await this.handleMessage(payload, room);\n this.consumeChannel.ack(msg, false);\n },\n {\n noAck: false, // require manual ack\n consumerTag,\n },\n )\n .then((x) => (consumerTag = x.consumerTag));\n\n return async () => {\n debug('Canceling room listener for', room);\n await this.consumeChannel.cancel(consumerTag);\n };\n }\n\n async addAll(id: string, rooms: Set<string>): Promise<void> {\n // eslint-disable-next-line prefer-rest-params\n debug('addAll', ...arguments);\n\n const newRooms = new Set<string>();\n for (const room of rooms) {\n if (room === id) {\n if (this.options.sidRoomRouting === SidRoomRouting.banned) continue;\n if (this.options.sidRoomRouting === SidRoomRouting.local) {\n this.localRouting.add(room);\n continue;\n }\n }\n if (!this.sids.has(id)) {\n this.sids.set(id, new Set());\n }\n this.sids.get(id)!.add(room);\n\n if (!this.rooms.has(room)) {\n this.rooms.set(room, new Set());\n newRooms.add(room);\n }\n this.rooms.get(room)!.add(id);\n }\n\n await Promise.all([\n ...mapIter(newRooms, async (room) => {\n const queueName = await this.createRoomExchangeAndQueue(room);\n const unsub = this.createRoomListener(room, queueName);\n this.roomListeners.set(room, unsub);\n }),\n ]);\n }\n\n del(id: string, room: string): void {\n if (this.sids.has(id)) {\n this.sids.get(id)!.delete(room);\n }\n\n if (this.rooms.has(room)) {\n this.rooms.get(room)!.delete(id);\n if (this.rooms.get(room)!.size === 0) {\n this.rooms.delete(room);\n\n // tear down the room listener\n this.roomListeners.get(room)?.();\n this.roomListeners.delete(room);\n }\n }\n }\n delAll(id: string): void {\n this.localRouting.delete(id);\n\n if (!this.sids.has(id)) {\n return;\n }\n\n for (const room of this.sids.get(id)!) {\n this.del(id, room); // todo: probably wrap this via promises\n }\n\n this.sids.delete(id);\n }\n\n async broadcast(packet: any, opts: BroadcastOptions): Promise<void> {\n debug('broadcast', packet, opts);\n if (opts.flags?.local) {\n return super.broadcast(packet, opts);\n }\n const envelope: Envelope = {\n packet,\n except: opts.except && [...opts.except],\n };\n const rooms = opts.rooms && opts.rooms.size ? opts.rooms : nullSet;\n await Promise.all([\n ...mapIter(rooms, async (room) => {\n if (this.localRouting.has(room!)) {\n await this.broadcast(packet, {\n ...opts,\n rooms: new Set([room!]),\n flags: { ...opts.flags, local: true },\n });\n } else {\n const exchangeName = room ?? defaultRoomName;\n debug('Publishing message for room', room, envelope);\n\n const buffer = Buffer.from(JSON.stringify(envelope));\n await promisify(this.publishChannel.publish).bind(this.publishChannel)(\n exchangeName,\n '*',\n buffer,\n {},\n );\n }\n }),\n ]);\n }\n sockets(rooms: Set<Room>, callback?: (sockets: Set<SocketId>) => void): Promise<Set<SocketId>> {\n const sids = new Set<SocketId>();\n\n if (rooms.size) {\n for (const room of rooms) {\n if (!this.rooms.has(room)) continue;\n\n for (const id of this.rooms.get(room)!) {\n if (id in this.nsp.sockets) {\n sids.add(id);\n }\n }\n }\n } else {\n for (const [id] of this.sids) {\n if (id in this.nsp.sockets) sids.add(id);\n }\n }\n\n callback?.(sids);\n return Promise.resolve(sids);\n }\n\n socketRooms(id: string): Set<Room> | undefined {\n return this.sids.get(id);\n }\n}\n"]}
{"version":3,"file":"index.js","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":";;;;AAAA,+BAA+B;AAC/B,yDAA8E;AAG9E,0DAAiC;AAEjC,2BAAiD;AACjD,iCAA2D;AAC3D,+BAAiC;AAEjC,MAAM,KAAK,GAAG,eAAY,CAAC,gBAAgB,CAAC,CAAC;AAE7C,IAAY,cAIX;AAJD,WAAY,cAAc;IACtB,mCAAiB,CAAA;IACjB,iCAAe,CAAA;IACf,mCAAiB,CAAA;AACrB,CAAC,EAJW,cAAc,GAAd,sBAAc,KAAd,sBAAc,QAIzB;AAkBD,MAAM,OAAO,GAAG,IAAI,GAAG,CAAO,CAAC,IAAI,CAAC,CAAC,CAAC;AACtC,MAAM,CAAC,MAAM,CAAC,OAAO,CAAC,CAAC;AAEvB,MAAM,QAAQ,GAAG,IAAI,GAAG,CAAM,EAAE,CAAC,CAAC;AAClC,MAAM,CAAC,MAAM,CAAC,QAAQ,CAAC,CAAC;AAExB,MAAM,eAAe,GAAG,WAAW,CAAC;AACpC,MAAM,mBAAmB,GAAG,WAAW,CAAC;AAEjC,MAAM,aAAa,GAAG,UAAU,IAAwB;IAC3D,MAAM,IAAI,GAAG,MAAM,kBAAmB,SAAQ,WAAW;QACrD,YAAY,GAAc;YACtB,KAAK,CAAC,GAAG,EAAE,IAAI,CAAC,CAAC;QACrB,CAAC;KACJ,CAAC;IACF,mCAAmC;IACnC,OAAO,IAAI,CAAC;AAChB,CAAC,CAAC;AARW,QAAA,aAAa,iBAQxB;AAEF,MAAa,WAAY,SAAQ,2BAAO;IAYpC,YAA4B,GAAc,EAAU,OAA2B;;QAC3E,KAAK,CAAC,GAAG,CAAC,CAAC;QADa,QAAG,GAAH,GAAG,CAAW;QAAU,YAAO,GAAP,OAAO,CAAoB;QAXtE,UAAK,GAA6B,IAAI,GAAG,EAAE,CAAC;QAC5C,SAAI,GAA6B,IAAI,GAAG,EAAE,CAAC;QAI5C,kBAAa,GAA0C,IAAI,GAAG,EAAE,CAAC;QACjE,WAAM,GAAG,KAAK,CAAC;QAkEf,iBAAY,GAAgB,IAAI,GAAG,EAAE,CAAC;QA3D1C,IAAI,CAAC,YAAY,GAAG,MAAA,OAAO,CAAC,YAAY,mCAAI,aAAQ,EAAE,CAAC;QACvD,IAAI,CAAC,YAAY,GAAG,MAAA,OAAO,CAAC,YAAY,mCAAI,mBAAmB,CAAC;QAChE,IAAI,CAAC,WAAW,GAAG,MAAA,OAAO,CAAC,WAAW,mCAAI,mBAAmB,CAAC;QAE9D,MAAA,OAAO,CAAC,wBAAwB,+CAAhC,OAAO,EAA4B,KAAK,IAAI,EAAE;YAC1C,MAAM,OAAO,CAAC,GAAG,CAAC,cAAO,CAAC,IAAI,CAAC,aAAa,CAAC,MAAM,EAAE,EAAE,CAAC,KAAK,EAAE,EAAE,CAAC,KAAK,EAAE,CAAC,CAAC,CAAC;QAChF,CAAC,CAAC,CAAC;QACH,IAAI,CAAC,IAAI,EAAE,CAAC,CAAC,4CAA4C;IAC7D,CAAC;IAED,KAAK,CAAC,gBAAgB,CAAC,IAAgB;QACnC,IAAI,CAAC,EAAE,CAAC,OAAO,EAAE,KAAK,IAAI,EAAE;YACxB,IAAI,IAAI,CAAC,MAAM;gBAAE,OAAO;YACxB,MAAM,IAAI,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,cAAc,EAAE,CAAC;YACjD,IAAI,CAAC,gBAAgB,CAAC,IAAI,CAAC,CAAC;QAChC,CAAC,CAAC,CAAC;QAEH,IAAI,CAAC,EAAE,CAAC,OAAO,EAAE,CAAC,GAAG,EAAE,EAAE;YACrB,KAAK,CAAC,sBAAsB,EAAE,GAAG,CAAC,CAAC;QACvC,CAAC,CAAC,CAAC;QAEH,MAAM,CAAC,cAAc,EAAE,cAAc,CAAC,GAAG,MAAM,OAAO,CAAC,GAAG,CAAC,CAAC,IAAI,CAAC,aAAa,EAAE,EAAE,IAAI,CAAC,oBAAoB,EAAE,CAAC,CAAC,CAAC;QAEhH,IAAI,CAAC,cAAc,GAAG,cAAc,CAAC;QACrC,IAAI,CAAC,cAAc,GAAG,cAAc,CAAC;QAErC,MAAM,QAAQ,GAAmB,EAAE,CAAC;QACpC,KAAK,MAAM,CAAC,IAAI,EAAE,QAAQ,CAAC,IAAI,IAAI,CAAC,aAAa,EAAE;YAC/C,QAAQ,CAAC,IAAI,CAAC,QAAQ,EAAE,CAAC,CAAC;YAC1B,QAAQ,CAAC,IAAI,CAAC,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,CAAC,CAAC;SACvC;IACL,CAAC;IAED,KAAK,CAAC,IAAI;;QACN,KAAK,CAAC,YAAY,CAAC,CAAC;QAEpB,MAAM,UAAU,GAAG,MAAM,IAAI,CAAC,OAAO,CAAC,cAAc,EAAE,CAAC;QACvD,MAAM,IAAI,CAAC,gBAAgB,CAAC,UAAU,CAAC,CAAC;QAExC,+BAA+B;QAC/B,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,0BAA0B,CAAC,IAAI,CAAC,CAAC;QAC9D,MAAM,KAAK,GAAG,IAAI,CAAC,kBAAkB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;QACvD,IAAI,CAAC,aAAa,CAAC,GAAG,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;QAEpC,KAAK,CAAC,UAAU,CAAC,CAAC;QAClB,MAAA,MAAA,IAAI,CAAC,OAAO,EAAC,aAAa,kDAAI,CAAC;IACnC,CAAC;IAED,KAAK,CAAC,KAAK;QACP,IAAI,CAAC,MAAM,GAAG,IAAI,CAAC;QACnB,MAAM,OAAO,CAAC,GAAG,CAAC,cAAO,CAAC,IAAI,CAAC,aAAa,CAAC,MAAM,EAAE,EAAE,CAAC,KAAK,EAAE,EAAE,CAAC,KAAK,EAAE,CAAC,CAAC,CAAC;IAChF,CAAC;IAEO,KAAK,CAAC,SAAS,CAAC,IAAmB;QACvC,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,0BAA0B,CAAC,IAAI,CAAC,CAAC;QAC9D,MAAM,KAAK,GAAG,IAAI,CAAC,kBAAkB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;QACvD,IAAI,CAAC,aAAa,CAAC,GAAG,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;IACxC,CAAC;IAIO,KAAK,CAAC,kBAAkB,CAAC,IAAmB;QAChD,MAAM,SAAS,GAAG,GAAG,IAAI,CAAC,WAAW,IAAI,IAAI,CAAC,YAAY,GAAG,IAAI,CAAC,CAAC,CAAC,IAAI,IAAI,EAAE,CAAC,CAAC,CAAC,EAAE,EAAE,CAAC;QACtF,MAAM,IAAI,CAAC,cAAc,CAAC,WAAW,CAAC,SAAS,EAAE;YAC7C,UAAU,EAAE,IAAI;YAChB,OAAO,EAAE,KAAK;YACd,SAAS,EAAE;gBACP,WAAW,EAAE,IAAI,GAAG,EAAE;aACzB;SACJ,CAAC,CAAC;QACH,OAAO,SAAS,CAAC;IACrB,CAAC;IAEO,KAAK,CAAC,0BAA0B,CAAC,IAAmB;QACxD,MAAM,CAAC,EAAE,SAAS,CAAC,GAAG,MAAM,OAAO,CAAC,GAAG,CAAC;YACpC,IAAI,CAAC,cAAc,CAAC,cAAc,CAAC,IAAI,CAAC,YAAY,EAAE,QAAQ,EAAE;gBAC5D,UAAU,EAAE,IAAI;gBAChB,OAAO,EAAE,KAAK;aACjB,CAAC;YACF,IAAI,CAAC,kBAAkB,CAAC,IAAI,CAAC;SAChC,CAAC,CAAC;QAEH,MAAM,IAAI,CAAC,cAAc,CAAC,SAAS,CAAC,SAAS,EAAE,IAAI,CAAC,YAAY,EAAE,IAAI,aAAJ,IAAI,cAAJ,IAAI,GAAI,eAAe,CAAC,CAAC;QAC3F,OAAO,SAAS,CAAC;IACrB,CAAC;IAEO,KAAK,CAAC,aAAa,CAAC,QAAkB,EAAE,IAAmB;QAC/D,MAAM,IAAI,GAAG,IAAI,CAAC,CAAC,CAAC,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,CAAC,CAAC,CAAC,IAAI,CAAC,GAAG,CAAC,OAAO,CAAC,IAAI,EAAE,CAAC;QACpE,MAAM,OAAO,GAAG,IAAI,GAAG,CAAC,QAAQ,CAAC,MAAM,CAAC,CAAC;QACzC,MAAM,MAAM,GAAG,QAAQ,CAAC,MAAM,CAAC;QAC/B,KAAK,MAAM,GAAG,IAAI,IAAI,EAAE;YACpB,IAAI,OAAO,CAAC,GAAG,CAAC,GAAG,CAAC;gBAAE,SAAS;YAE/B,KAAK,CAAC,SAAS,CAAC,MAAM,EAAE,EAAE,KAAK,EAAE,IAAI,CAAC,CAAC,CAAC,IAAI,GAAG,CAAC,CAAC,IAAI,CAAC,CAAC,CAAC,CAAC,CAAC,QAAQ,EAAE,CAAC,CAAC;SACzE;IACL,CAAC;IAEO,kBAAkB,CAAC,IAAmB,EAAE,SAAiB;QAC7D,KAAK,CAAC,4BAA4B,EAAE,IAAI,CAAC,CAAC;QAC1C,IAAI,WAAW,GAAG,mBAAY,EAAE,CAAC;QAEjC,IAAI,CAAC,cAAc;aACd,OAAO,CACJ,SAAS,EACT,KAAK,EAAE,GAAG,EAAE,EAAE;YACV,IAAI,CAAC,GAAG;gBAAE,OAAO;YACjB,MAAM,OAAO,GAAG,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,OAAO,CAAC,QAAQ,CAAC,MAAM,CAAC,CAAC,CAAC;YACzD,MAAM,IAAI,CAAC,aAAa,CAAC,OAAO,EAAE,IAAI,CAAC,CAAC;YACxC,IAAI,CAAC,cAAc,CAAC,GAAG,CAAC,GAAG,EAAE,KAAK,CAAC,CAAC;QACxC,CAAC,EACD;YACI,KAAK,EAAE,KAAK;YACZ,WAAW;SACd,CACJ;aACA,IAAI,CAAC,CAAC,CAAC,EAAE,EAAE,CAAC,CAAC,WAAW,GAAG,CAAC,CAAC,WAAW,CAAC,CAAC,CAAC;QAEhD,OAAO,KAAK,IAAI,EAAE;YACd,KAAK,CAAC,6BAA6B,EAAE,IAAI,CAAC,CAAC;YAC3C,MAAM,IAAI,CAAC,cAAc,CAAC,MAAM,CAAC,WAAW,CAAC,CAAC;QAClD,CAAC,CAAC;IACN,CAAC;IAED,KAAK,CAAC,MAAM,CAAC,EAAU,EAAE,KAAkB;QACvC,8CAA8C;QAC9C,KAAK,CAAC,QAAQ,EAAE,GAAG,SAAS,CAAC,CAAC;QAE9B,MAAM,QAAQ,GAAG,IAAI,GAAG,EAAU,CAAC;QACnC,KAAK,MAAM,IAAI,IAAI,KAAK,EAAE;YACtB,IAAI,IAAI,KAAK,EAAE,EAAE;gBACb,IAAI,IAAI,CAAC,OAAO,CAAC,cAAc,KAAK,cAAc,CAAC,MAAM;oBAAE,SAAS;gBACpE,IAAI,IAAI,CAAC,OAAO,CAAC,cAAc,KAAK,cAAc,CAAC,KAAK,EAAE;oBACtD,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC;oBAC5B,SAAS;iBACZ;aACJ;YACD,IAAI,CAAC,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;gBACpB,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,EAAE,IAAI,GAAG,EAAE,CAAC,CAAC;aAChC;YACD,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAE,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC;YAE7B,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAC,EAAE;gBACvB,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,EAAE,IAAI,GAAG,EAAE,CAAC,CAAC;gBAChC,QAAQ,CAAC,GAAG,CAAC,IAAI,CAAC,CAAC;aACtB;YACD,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,CAAC,GAAG,CAAC,EAAE,CAAC,CAAC;SACjC;QAED,MAAM,OAAO,CAAC,GAAG,CAAC;YACd,GAAG,cAAO,CAAC,QAAQ,EAAE,KAAK,EAAE,IAAI,EAAE,EAAE;gBAChC,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,0BAA0B,CAAC,IAAI,CAAC,CAAC;gBAC9D,MAAM,KAAK,GAAG,IAAI,CAAC,kBAAkB,CAAC,IAAI,EAAE,SAAS,CAAC,CAAC;gBACvD,IAAI,CAAC,aAAa,CAAC,GAAG,CAAC,IAAI,EAAE,KAAK,CAAC,CAAC;YACxC,CAAC,CAAC;SACL,CAAC,CAAC;IACP,CAAC;IAED,GAAG,CAAC,EAAU,EAAE,IAAY;;QACxB,IAAI,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;YACnB,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAE,CAAC,MAAM,CAAC,IAAI,CAAC,CAAC;SACnC;QAED,IAAI,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAC,EAAE;YACtB,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,CAAC,MAAM,CAAC,EAAE,CAAC,CAAC;YACjC,IAAI,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,CAAC,IAAI,KAAK,CAAC,EAAE;gBAClC,IAAI,CAAC,KAAK,CAAC,MAAM,CAAC,IAAI,CAAC,CAAC;gBAExB,8BAA8B;gBAC9B,MAAA,IAAI,CAAC,aAAa,CAAC,GAAG,CAAC,IAAI,CAAC,2CAAI,CAAC;gBACjC,IAAI,CAAC,aAAa,CAAC,MAAM,CAAC,IAAI,CAAC,CAAC;aACnC;SACJ;IACL,CAAC;IACD,MAAM,CAAC,EAAU;QACb,IAAI,CAAC,YAAY,CAAC,MAAM,CAAC,EAAE,CAAC,CAAC;QAE7B,IAAI,CAAC,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,EAAE;YACpB,OAAO;SACV;QAED,KAAK,MAAM,IAAI,IAAI,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAE,EAAE;YACnC,IAAI,CAAC,GAAG,CAAC,EAAE,EAAE,IAAI,CAAC,CAAC,CAAC,wCAAwC;SAC/D;QAED,IAAI,CAAC,IAAI,CAAC,MAAM,CAAC,EAAE,CAAC,CAAC;IACzB,CAAC;IAEO,KAAK,CAAC,cAAc,CAAC,KAAwB,EAAE,QAAkB;QACrE,KAAK,CAAC,8BAA8B,EAAE,KAAK,EAAE,QAAQ,CAAC,CAAC;QAEvD,MAAM,SAAS,GAAG,KAAK,CAAC,GAAG,CAAC,CAAC,IAAI,EAAE,EAAE,CAAC,IAAI,aAAJ,IAAI,cAAJ,IAAI,GAAI,eAAe,CAAC,CAAC;QAE/D,MAAM,MAAM,GAAG,MAAM,CAAC,IAAI,CAAC,IAAI,CAAC,SAAS,CAAC,QAAQ,CAAC,CAAC,CAAC;QACrD,MAAM,gBAAS,CAAC,IAAI,CAAC,cAAc,CAAC,OAAO,CAAC,CAAC,IAAI,CAAC,IAAI,CAAC,cAAc,CAAC,CAClE,IAAI,CAAC,YAAY,EACjB,SAAS,CAAC,CAAC,CAAC,EACZ,MAAM,EACN,EAAE,GAAG,CAAC,SAAS,CAAC,MAAM,GAAG,CAAC,CAAC,CAAC,CAAC,EAAE,EAAE,EAAE,SAAS,CAAC,KAAK,CAAC,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC,EAAE,CAAC,EAAE,CAClE,CAAC;IACN,CAAC;IAED,KAAK,CAAC,SAAS,CAAC,MAAW,EAAE,IAAsB;;QAC/C,KAAK,CAAC,WAAW,EAAE,MAAM,EAAE,IAAI,CAAC,CAAC;QACjC,IAAI,MAAA,IAAI,CAAC,KAAK,0CAAE,KAAK,EAAE;YACnB,OAAO,KAAK,CAAC,SAAS,CAAC,MAAM,EAAE,IAAI,CAAC,CAAC;SACxC;QACD,MAAM,QAAQ,GAAa;YACvB,MAAM;YACN,MAAM,EAAE,IAAI,CAAC,MAAM,IAAI,CAAC,GAAG,IAAI,CAAC,MAAM,CAAC;SAC1C,CAAC;QACF,MAAM,KAAK,GAAG,IAAI,CAAC,KAAK,IAAI,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,CAAC,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC,CAAC,OAAO,CAAC;QACnE,MAAM,aAAa,GAAG,CAAC,GAAG,iBAAU,CAAC,KAAK,EAAE,CAAC,IAAI,EAAE,EAAE,CAAC,CAAC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAK,CAAC,CAAC,CAAC,CAAC;QACtF,MAAM,OAAO,CAAC,GAAG,CAAC;YACd,GAAG,cAAO,CACN,iBAAU,CAAC,KAAK,EAAE,CAAC,IAAI,EAAE,EAAE,CAAC,IAAI,CAAC,YAAY,CAAC,GAAG,CAAC,IAAK,CAAC,CAAC,EACzD,KAAK,EAAE,IAAI,EAAE,EAAE;gBACX,MAAM,IAAI,CAAC,SAAS,CAAC,MAAM,EAAE;oBACzB,GAAG,IAAI;oBACP,KAAK,EAAE,IAAI,GAAG,CAAC,CAAC,IAAK,CAAC,CAAC;oBACvB,KAAK,EAAE,EAAE,GAAG,IAAI,CAAC,KAAK,EAAE,KAAK,EAAE,IAAI,EAAE;iBACxC,CAAC,CAAC;YACP,CAAC,CACJ;YACD,IAAI,CAAC,cAAc,CAAC,aAAa,EAAE,QAAQ,CAAC;SAC/C,CAAC,CAAC;IACP,CAAC;IACD,OAAO,CAAC,KAAgB,EAAE,QAA2C;QACjE,MAAM,IAAI,GAAG,IAAI,GAAG,EAAY,CAAC;QAEjC,IAAI,KAAK,CAAC,IAAI,EAAE;YACZ,KAAK,MAAM,IAAI,IAAI,KAAK,EAAE;gBACtB,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAC;oBAAE,SAAS;gBAEpC,KAAK,MAAM,EAAE,IAAI,IAAI,CAAC,KAAK,CAAC,GAAG,CAAC,IAAI,CAAE,EAAE;oBACpC,IAAI,EAAE,IAAI,IAAI,CAAC,GAAG,CAAC,OAAO,EAAE;wBACxB,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAC;qBAChB;iBACJ;aACJ;SACJ;aAAM;YACH,KAAK,MAAM,CAAC,EAAE,CAAC,IAAI,IAAI,CAAC,IAAI,EAAE;gBAC1B,IAAI,EAAE,IAAI,IAAI,CAAC,GAAG,CAAC,OAAO;oBAAE,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAC;aAC5C;SACJ;QAED,QAAQ,aAAR,QAAQ,uBAAR,QAAQ,CAAG,IAAI,CAAC,CAAC;QACjB,OAAO,OAAO,CAAC,OAAO,CAAC,IAAI,CAAC,CAAC;IACjC,CAAC;IAED,WAAW,CAAC,EAAU;QAClB,OAAO,IAAI,CAAC,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,CAAC;IAC7B,CAAC;CACJ;AA1QD,kCA0QC","sourcesContent":["/* eslint-disable no-console */\nimport { BroadcastOptions, Room, SocketId, Adapter } from 'socket.io-adapter';\nimport { Namespace, Socket } from 'socket.io';\nimport { EventEmitter } from 'events';\nimport debugFactory from 'debug';\nimport { Channel, ConfirmChannel, Connection } from 'amqplib';\nimport { hostname, networkInterfaces } from 'os';\nimport { randomString, mapIter, filterIter } from './util';\nimport { promisify } from 'util';\n\nconst debug = debugFactory('socket.io-amqp');\n\nexport enum SidRoomRouting {\n normal = 'normal',\n local = 'local',\n banned = 'banned',\n}\n\nexport interface AmqpAdapterOptions {\n amqpConnection: () => Promise<Connection> | Connection;\n sidRoomRouting?: SidRoomRouting;\n instanceName?: string;\n exchangeName?: string;\n queuePrefix?: string;\n\n shutdownCallbackCallback?: (callback: () => Promise<void>) => void;\n readyCallback?: () => void;\n}\n\ninterface Envelope {\n packet: any;\n except?: SocketId[];\n}\n\nconst nullSet = new Set<null>([null]);\nObject.freeze(nullSet);\n\nconst emptySet = new Set<any>([]);\nObject.freeze(emptySet);\n\nconst defaultRoomName = 'broadcast';\nconst defaultExchangeName = 'socket.io';\n\nexport const createAdapter = function (opts: AmqpAdapterOptions): typeof AmqpAdapter {\n const shim = class AmqpAdapterWrapper extends AmqpAdapter {\n constructor(nsp: Namespace) {\n super(nsp, opts);\n }\n };\n // shim.name = AmqpAdapter.name;\n return shim;\n};\n\nexport class AmqpAdapter extends Adapter {\n readonly rooms: Map<Room, Set<SocketId>> = new Map();\n readonly sids: Map<SocketId, Set<Room>> = new Map();\n readonly instanceName: string;\n readonly exchangeName: string;\n readonly queuePrefix: string;\n private roomListeners: Map<Room | null, () => Promise<void>> = new Map();\n private closed = false;\n\n private consumeChannel!: Channel;\n private publishChannel!: ConfirmChannel;\n\n constructor(public readonly nsp: Namespace, private options: AmqpAdapterOptions) {\n super(nsp);\n this.instanceName = options.instanceName ?? hostname();\n this.exchangeName = options.exchangeName ?? defaultExchangeName;\n this.queuePrefix = options.queuePrefix ?? defaultExchangeName;\n\n options.shutdownCallbackCallback?.(async () => {\n await Promise.all(mapIter(this.roomListeners.values(), (unsub) => unsub()));\n });\n this.init(); // hack until issue in socket.io is resolved\n }\n\n async handleConnection(conn: Connection) {\n conn.on('close', async () => {\n if (this.closed) return;\n const conn = await this.options.amqpConnection();\n this.handleConnection(conn);\n });\n\n conn.on('error', (err) => {\n debug('Got connection error', err);\n });\n\n const [consumeChannel, publishChannel] = await Promise.all([conn.createChannel(), conn.createConfirmChannel()]);\n\n this.consumeChannel = consumeChannel;\n this.publishChannel = publishChannel;\n\n const promises: Promise<any>[] = [];\n for (const [room, shutdown] of this.roomListeners) {\n promises.push(shutdown());\n promises.push(this.setupRoom(room));\n }\n }\n\n async init(): Promise<void> {\n debug('start init');\n\n const connection = await this.options.amqpConnection();\n await this.handleConnection(connection);\n\n // set up the default broadcast\n const queueName = await this.createRoomExchangeAndQueue(null);\n const unsub = this.createRoomListener(null, queueName);\n this.roomListeners.set(null, unsub);\n\n debug('end init');\n this.options.readyCallback?.();\n }\n\n async close(): Promise<void> {\n this.closed = true;\n await Promise.all(mapIter(this.roomListeners.values(), (unsub) => unsub()));\n }\n\n private async setupRoom(room: string | null): Promise<void> {\n const queueName = await this.createRoomExchangeAndQueue(room);\n const unsub = this.createRoomListener(room, queueName);\n this.roomListeners.set(room, unsub);\n }\n\n private localRouting: Set<string> = new Set();\n\n private async createQueueForRoom(room: string | null): Promise<string> {\n const queueName = `${this.queuePrefix}#${this.instanceName}${room ? `#${room}` : ''}`;\n await this.consumeChannel.assertQueue(queueName, {\n autoDelete: true,\n durable: false,\n arguments: {\n 'x-expires': 1000 * 60,\n },\n });\n return queueName;\n }\n\n private async createRoomExchangeAndQueue(room: string | null): Promise<string> {\n const [, queueName] = await Promise.all([\n this.publishChannel.assertExchange(this.exchangeName, 'direct', {\n autoDelete: true,\n durable: false,\n }),\n this.createQueueForRoom(room),\n ]);\n\n await this.consumeChannel.bindQueue(queueName, this.exchangeName, room ?? defaultRoomName);\n return queueName;\n }\n\n private async handleMessage(envelope: Envelope, room: string | null) {\n const sids = room ? this.rooms.get(room)! : this.nsp.sockets.keys();\n const excepts = new Set(envelope.except);\n const packet = envelope.packet;\n for (const sid of sids) {\n if (excepts.has(sid)) continue;\n\n super.broadcast(packet, { rooms: room ? new Set([room]) : emptySet });\n }\n }\n\n private createRoomListener(room: string | null, queueName: string): () => Promise<void> {\n debug('Starting room listener for', room);\n let consumerTag = randomString();\n\n this.consumeChannel\n .consume(\n queueName,\n async (msg) => {\n if (!msg) return;\n const payload = JSON.parse(msg.content.toString('utf8'));\n await this.handleMessage(payload, room);\n this.consumeChannel.ack(msg, false);\n },\n {\n noAck: false, // require manual ack\n consumerTag,\n },\n )\n .then((x) => (consumerTag = x.consumerTag));\n\n return async () => {\n debug('Canceling room listener for', room);\n await this.consumeChannel.cancel(consumerTag);\n };\n }\n\n async addAll(id: string, rooms: Set<string>): Promise<void> {\n // eslint-disable-next-line prefer-rest-params\n debug('addAll', ...arguments);\n\n const newRooms = new Set<string>();\n for (const room of rooms) {\n if (room === id) {\n if (this.options.sidRoomRouting === SidRoomRouting.banned) continue;\n if (this.options.sidRoomRouting === SidRoomRouting.local) {\n this.localRouting.add(room);\n continue;\n }\n }\n if (!this.sids.has(id)) {\n this.sids.set(id, new Set());\n }\n this.sids.get(id)!.add(room);\n\n if (!this.rooms.has(room)) {\n this.rooms.set(room, new Set());\n newRooms.add(room);\n }\n this.rooms.get(room)!.add(id);\n }\n\n await Promise.all([\n ...mapIter(newRooms, async (room) => {\n const queueName = await this.createRoomExchangeAndQueue(room);\n const unsub = this.createRoomListener(room, queueName);\n this.roomListeners.set(room, unsub);\n }),\n ]);\n }\n\n del(id: string, room: string): void {\n if (this.sids.has(id)) {\n this.sids.get(id)!.delete(room);\n }\n\n if (this.rooms.has(room)) {\n this.rooms.get(room)!.delete(id);\n if (this.rooms.get(room)!.size === 0) {\n this.rooms.delete(room);\n\n // tear down the room listener\n this.roomListeners.get(room)?.();\n this.roomListeners.delete(room);\n }\n }\n }\n delAll(id: string): void {\n this.localRouting.delete(id);\n\n if (!this.sids.has(id)) {\n return;\n }\n\n for (const room of this.sids.get(id)!) {\n this.del(id, room); // todo: probably wrap this via promises\n }\n\n this.sids.delete(id);\n }\n\n private async publishToRooms(rooms: (string | null)[], envelope: Envelope) {\n debug('Publishing message for rooms', rooms, envelope);\n\n const routeKeys = rooms.map((room) => room ?? defaultRoomName);\n\n const buffer = Buffer.from(JSON.stringify(envelope));\n await promisify(this.publishChannel.publish).bind(this.publishChannel)(\n this.exchangeName,\n routeKeys[0],\n buffer,\n { ...(routeKeys.length > 1 ? { CC: routeKeys.slice(1) } : {}) },\n );\n }\n\n async broadcast(packet: any, opts: BroadcastOptions): Promise<void> {\n debug('broadcast', packet, opts);\n if (opts.flags?.local) {\n return super.broadcast(packet, opts);\n }\n const envelope: Envelope = {\n packet,\n except: opts.except && [...opts.except],\n };\n const rooms = opts.rooms && opts.rooms.size ? opts.rooms : nullSet;\n const nonlocalRooms = [...filterIter(rooms, (room) => !this.localRouting.has(room!))];\n await Promise.all([\n ...mapIter(\n filterIter(rooms, (room) => this.localRouting.has(room!)),\n async (room) => {\n await this.broadcast(packet, {\n ...opts,\n rooms: new Set([room!]),\n flags: { ...opts.flags, local: true },\n });\n },\n ),\n this.publishToRooms(nonlocalRooms, envelope),\n ]);\n }\n sockets(rooms: Set<Room>, callback?: (sockets: Set<SocketId>) => void): Promise<Set<SocketId>> {\n const sids = new Set<SocketId>();\n\n if (rooms.size) {\n for (const room of rooms) {\n if (!this.rooms.has(room)) continue;\n\n for (const id of this.rooms.get(room)!) {\n if (id in this.nsp.sockets) {\n sids.add(id);\n }\n }\n }\n } else {\n for (const [id] of this.sids) {\n if (id in this.nsp.sockets) sids.add(id);\n }\n }\n\n callback?.(sids);\n return Promise.resolve(sids);\n }\n\n socketRooms(id: string): Set<Room> | undefined {\n return this.sids.get(id);\n }\n}\n"]}
{
"name": "socket.io-amqp0",
"version": "4.0.5",
"version": "5.0.1",
"description": "socket.io adapter for amqp 0.9.1+ (e.g. RabbitMQ)",

@@ -5,0 +5,0 @@ "main": "dist/index.js",