socket.io-amqp0
Advanced tools
Comparing version
@@ -13,2 +13,4 @@ import { BroadcastOptions, Room, SocketId, Adapter } from 'socket.io-adapter'; | ||
instanceName?: string; | ||
exchangeName?: string; | ||
queuePrefix?: string; | ||
shutdownCallbackCallback?: (callback: () => Promise<void>) => void; | ||
@@ -24,2 +26,4 @@ readyCallback?: () => void; | ||
readonly instanceName: string; | ||
readonly exchangeName: string; | ||
readonly queuePrefix: string; | ||
private roomListeners; | ||
@@ -42,2 +46,3 @@ private closed; | ||
delAll(id: string): void; | ||
private publishToRooms; | ||
broadcast(packet: any, opts: BroadcastOptions): Promise<void>; | ||
@@ -44,0 +49,0 @@ sockets(rooms: Set<Room>, callback?: (sockets: Set<SocketId>) => void): Promise<Set<SocketId>>; |
@@ -22,3 +22,4 @@ "use strict"; | ||
Object.freeze(emptySet); | ||
const defaultRoomName = 'default'; | ||
const defaultRoomName = 'broadcast'; | ||
const defaultExchangeName = 'socket.io'; | ||
const createAdapter = function (opts) { | ||
@@ -36,3 +37,3 @@ const shim = class AmqpAdapterWrapper extends AmqpAdapter { | ||
constructor(nsp, options) { | ||
var _a, _b; | ||
var _a, _b, _c, _d; | ||
super(nsp); | ||
@@ -47,3 +48,5 @@ this.nsp = nsp; | ||
this.instanceName = (_a = options.instanceName) !== null && _a !== void 0 ? _a : os_1.hostname(); | ||
(_b = options.shutdownCallbackCallback) === null || _b === void 0 ? void 0 : _b.call(options, async () => { | ||
this.exchangeName = (_b = options.exchangeName) !== null && _b !== void 0 ? _b : defaultExchangeName; | ||
this.queuePrefix = (_c = options.queuePrefix) !== null && _c !== void 0 ? _c : defaultExchangeName; | ||
(_d = options.shutdownCallbackCallback) === null || _d === void 0 ? void 0 : _d.call(options, async () => { | ||
await Promise.all(util_1.mapIter(this.roomListeners.values(), (unsub) => unsub())); | ||
@@ -94,3 +97,3 @@ }); | ||
async createQueueForRoom(room) { | ||
const queueName = `${this.instanceName}${room ? `#${room}` : ''}`; | ||
const queueName = `${this.queuePrefix}#${this.instanceName}${room ? `#${room}` : ''}`; | ||
await this.consumeChannel.assertQueue(queueName, { | ||
@@ -106,5 +109,4 @@ autoDelete: true, | ||
async createRoomExchangeAndQueue(room) { | ||
const exchangeName = room !== null && room !== void 0 ? room : defaultRoomName; | ||
const [, queueName] = await Promise.all([ | ||
this.publishChannel.assertExchange(exchangeName, 'fanout', { | ||
this.publishChannel.assertExchange(this.exchangeName, 'direct', { | ||
autoDelete: true, | ||
@@ -115,3 +117,3 @@ durable: false, | ||
]); | ||
await this.consumeChannel.bindQueue(queueName, exchangeName, '*'); | ||
await this.consumeChannel.bindQueue(queueName, this.exchangeName, room !== null && room !== void 0 ? room : defaultRoomName); | ||
return queueName; | ||
@@ -205,2 +207,8 @@ } | ||
} | ||
async publishToRooms(rooms, envelope) { | ||
debug('Publishing message for rooms', rooms, envelope); | ||
const routeKeys = rooms.map((room) => room !== null && room !== void 0 ? room : defaultRoomName); | ||
const buffer = Buffer.from(JSON.stringify(envelope)); | ||
await util_2.promisify(this.publishChannel.publish).bind(this.publishChannel)(this.exchangeName, routeKeys[0], buffer, { ...(routeKeys.length > 1 ? { CC: routeKeys.slice(1) } : {}) }); | ||
} | ||
async broadcast(packet, opts) { | ||
@@ -217,18 +225,12 @@ var _a; | ||
const rooms = opts.rooms && opts.rooms.size ? opts.rooms : nullSet; | ||
const nonlocalRooms = [...util_1.filterIter(rooms, (room) => !this.localRouting.has(room))]; | ||
await Promise.all([ | ||
...util_1.mapIter(rooms, async (room) => { | ||
if (this.localRouting.has(room)) { | ||
await this.broadcast(packet, { | ||
...opts, | ||
rooms: new Set([room]), | ||
flags: { ...opts.flags, local: true }, | ||
}); | ||
} | ||
else { | ||
const exchangeName = room !== null && room !== void 0 ? room : defaultRoomName; | ||
debug('Publishing message for room', room, envelope); | ||
const buffer = Buffer.from(JSON.stringify(envelope)); | ||
await util_2.promisify(this.publishChannel.publish).bind(this.publishChannel)(exchangeName, '*', buffer, {}); | ||
} | ||
...util_1.mapIter(util_1.filterIter(rooms, (room) => this.localRouting.has(room)), async (room) => { | ||
await this.broadcast(packet, { | ||
...opts, | ||
rooms: new Set([room]), | ||
flags: { ...opts.flags, local: true }, | ||
}); | ||
}), | ||
this.publishToRooms(nonlocalRooms, envelope), | ||
]); | ||
@@ -235,0 +237,0 @@ } |
{ | ||
"name": "socket.io-amqp0", | ||
"version": "4.0.5", | ||
"version": "5.0.1", | ||
"description": "socket.io adapter for amqp 0.9.1+ (e.g. RabbitMQ)", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
Sorry, the diff of this file is not supported yet
URL strings
Supply chain riskPackage contains fragments of external URLs or IP addresses, which the package may be accessing at runtime.
Found 1 instance in 1 package
43471
4.28%456
1.56%1
Infinity%