socket.io-amqp0
Advanced tools
Comparing version 4.0.5 to 5.0.1
@@ -13,2 +13,4 @@ import { BroadcastOptions, Room, SocketId, Adapter } from 'socket.io-adapter'; | ||
instanceName?: string; | ||
exchangeName?: string; | ||
queuePrefix?: string; | ||
shutdownCallbackCallback?: (callback: () => Promise<void>) => void; | ||
@@ -24,2 +26,4 @@ readyCallback?: () => void; | ||
readonly instanceName: string; | ||
readonly exchangeName: string; | ||
readonly queuePrefix: string; | ||
private roomListeners; | ||
@@ -42,2 +46,3 @@ private closed; | ||
delAll(id: string): void; | ||
private publishToRooms; | ||
broadcast(packet: any, opts: BroadcastOptions): Promise<void>; | ||
@@ -44,0 +49,0 @@ sockets(rooms: Set<Room>, callback?: (sockets: Set<SocketId>) => void): Promise<Set<SocketId>>; |
@@ -22,3 +22,4 @@ "use strict"; | ||
Object.freeze(emptySet); | ||
const defaultRoomName = 'default'; | ||
const defaultRoomName = 'broadcast'; | ||
const defaultExchangeName = 'socket.io'; | ||
const createAdapter = function (opts) { | ||
@@ -36,3 +37,3 @@ const shim = class AmqpAdapterWrapper extends AmqpAdapter { | ||
constructor(nsp, options) { | ||
var _a, _b; | ||
var _a, _b, _c, _d; | ||
super(nsp); | ||
@@ -47,3 +48,5 @@ this.nsp = nsp; | ||
this.instanceName = (_a = options.instanceName) !== null && _a !== void 0 ? _a : os_1.hostname(); | ||
(_b = options.shutdownCallbackCallback) === null || _b === void 0 ? void 0 : _b.call(options, async () => { | ||
this.exchangeName = (_b = options.exchangeName) !== null && _b !== void 0 ? _b : defaultExchangeName; | ||
this.queuePrefix = (_c = options.queuePrefix) !== null && _c !== void 0 ? _c : defaultExchangeName; | ||
(_d = options.shutdownCallbackCallback) === null || _d === void 0 ? void 0 : _d.call(options, async () => { | ||
await Promise.all(util_1.mapIter(this.roomListeners.values(), (unsub) => unsub())); | ||
@@ -94,3 +97,3 @@ }); | ||
async createQueueForRoom(room) { | ||
const queueName = `${this.instanceName}${room ? `#${room}` : ''}`; | ||
const queueName = `${this.queuePrefix}#${this.instanceName}${room ? `#${room}` : ''}`; | ||
await this.consumeChannel.assertQueue(queueName, { | ||
@@ -106,5 +109,4 @@ autoDelete: true, | ||
async createRoomExchangeAndQueue(room) { | ||
const exchangeName = room !== null && room !== void 0 ? room : defaultRoomName; | ||
const [, queueName] = await Promise.all([ | ||
this.publishChannel.assertExchange(exchangeName, 'fanout', { | ||
this.publishChannel.assertExchange(this.exchangeName, 'direct', { | ||
autoDelete: true, | ||
@@ -115,3 +117,3 @@ durable: false, | ||
]); | ||
await this.consumeChannel.bindQueue(queueName, exchangeName, '*'); | ||
await this.consumeChannel.bindQueue(queueName, this.exchangeName, room !== null && room !== void 0 ? room : defaultRoomName); | ||
return queueName; | ||
@@ -205,2 +207,8 @@ } | ||
} | ||
async publishToRooms(rooms, envelope) { | ||
debug('Publishing message for rooms', rooms, envelope); | ||
const routeKeys = rooms.map((room) => room !== null && room !== void 0 ? room : defaultRoomName); | ||
const buffer = Buffer.from(JSON.stringify(envelope)); | ||
await util_2.promisify(this.publishChannel.publish).bind(this.publishChannel)(this.exchangeName, routeKeys[0], buffer, { ...(routeKeys.length > 1 ? { CC: routeKeys.slice(1) } : {}) }); | ||
} | ||
async broadcast(packet, opts) { | ||
@@ -217,18 +225,12 @@ var _a; | ||
const rooms = opts.rooms && opts.rooms.size ? opts.rooms : nullSet; | ||
const nonlocalRooms = [...util_1.filterIter(rooms, (room) => !this.localRouting.has(room))]; | ||
await Promise.all([ | ||
...util_1.mapIter(rooms, async (room) => { | ||
if (this.localRouting.has(room)) { | ||
await this.broadcast(packet, { | ||
...opts, | ||
rooms: new Set([room]), | ||
flags: { ...opts.flags, local: true }, | ||
}); | ||
} | ||
else { | ||
const exchangeName = room !== null && room !== void 0 ? room : defaultRoomName; | ||
debug('Publishing message for room', room, envelope); | ||
const buffer = Buffer.from(JSON.stringify(envelope)); | ||
await util_2.promisify(this.publishChannel.publish).bind(this.publishChannel)(exchangeName, '*', buffer, {}); | ||
} | ||
...util_1.mapIter(util_1.filterIter(rooms, (room) => this.localRouting.has(room)), async (room) => { | ||
await this.broadcast(packet, { | ||
...opts, | ||
rooms: new Set([room]), | ||
flags: { ...opts.flags, local: true }, | ||
}); | ||
}), | ||
this.publishToRooms(nonlocalRooms, envelope), | ||
]); | ||
@@ -235,0 +237,0 @@ } |
{ | ||
"name": "socket.io-amqp0", | ||
"version": "4.0.5", | ||
"version": "5.0.1", | ||
"description": "socket.io adapter for amqp 0.9.1+ (e.g. RabbitMQ)", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
Sorry, the diff of this file is not supported yet
43471
456