socket.io-amqp0
Advanced tools
Comparing version 4.0.1-dev.0 to 4.0.1
@@ -23,11 +23,14 @@ import { BroadcastOptions, Room, SocketId, Adapter } from 'socket.io-adapter'; | ||
readonly instanceName: string; | ||
private roomListeners; | ||
private closed; | ||
private consumeChannel; | ||
private publishChannel; | ||
constructor(nsp: Namespace, options: AmqpAdapterOptions); | ||
handleConnection(conn: Connection): Promise<void>; | ||
init(): Promise<void>; | ||
close(): Promise<void>; | ||
private roomListeners; | ||
private setupRoom; | ||
private localRouting; | ||
private createQueueForRoom; | ||
private createRoomSnsAndSqs; | ||
private createRoomExchangeAndQueue; | ||
private handleMessage; | ||
@@ -34,0 +37,0 @@ private createRoomListener; |
@@ -42,2 +42,3 @@ "use strict"; | ||
this.roomListeners = new Map(); | ||
this.closed = false; | ||
this.localRouting = new Set(); | ||
@@ -50,2 +51,21 @@ this.instanceName = (_a = options.instanceName) !== null && _a !== void 0 ? _a : os_1.hostname(); | ||
} | ||
async handleConnection(conn) { | ||
conn.on('close', async () => { | ||
if (this.closed) | ||
return; | ||
const conn = await this.options.amqpConnection(); | ||
this.handleConnection(conn); | ||
}); | ||
conn.on('error', (err) => { | ||
debug('Got connection error', err); | ||
}); | ||
const [consumeChannel, publishChannel] = await Promise.all([conn.createChannel(), conn.createConfirmChannel()]); | ||
this.consumeChannel = consumeChannel; | ||
this.publishChannel = publishChannel; | ||
const promises = []; | ||
for (const [room, shutdown] of this.roomListeners) { | ||
promises.push(shutdown()); | ||
promises.push(this.setupRoom(room)); | ||
} | ||
} | ||
async init() { | ||
@@ -55,10 +75,5 @@ var _a, _b; | ||
const connection = await this.options.amqpConnection(); | ||
const [consumeChannel, publishChannel] = await Promise.all([ | ||
connection.createChannel(), | ||
connection.createConfirmChannel(), | ||
]); | ||
this.consumeChannel = consumeChannel; | ||
this.publishChannel = publishChannel; | ||
await this.handleConnection(connection); | ||
// set up the default broadcast | ||
const queueName = await this.createRoomSnsAndSqs(null); | ||
const queueName = await this.createRoomExchangeAndQueue(null); | ||
const unsub = this.createRoomListener(null, queueName); | ||
@@ -70,4 +85,10 @@ this.roomListeners.set(null, unsub); | ||
async close() { | ||
this.closed = true; | ||
await Promise.all(util_1.mapIter(this.roomListeners.values(), (unsub) => unsub())); | ||
} | ||
async setupRoom(room) { | ||
const queueName = await this.createRoomExchangeAndQueue(room); | ||
const unsub = this.createRoomListener(room, queueName); | ||
this.roomListeners.set(room, unsub); | ||
} | ||
async createQueueForRoom(room) { | ||
@@ -78,6 +99,9 @@ const queueName = `${this.instanceName}${room ? `#${room}` : ''}`; | ||
durable: false, | ||
arguments: { | ||
'x-expires': 1000 * 60, | ||
}, | ||
}); | ||
return queueName; | ||
} | ||
async createRoomSnsAndSqs(room) { | ||
async createRoomExchangeAndQueue(room) { | ||
const exchangeName = room !== null && room !== void 0 ? room : defaultRoomName; | ||
@@ -146,3 +170,3 @@ const [, queueName] = await Promise.all([ | ||
...util_1.mapIter(newRooms, async (room) => { | ||
const queueName = await this.createRoomSnsAndSqs(room); | ||
const queueName = await this.createRoomExchangeAndQueue(room); | ||
const unsub = this.createRoomListener(room, queueName); | ||
@@ -149,0 +173,0 @@ this.roomListeners.set(room, unsub); |
{ | ||
"name": "socket.io-amqp0", | ||
"version": "4.0.1-dev.0", | ||
"version": "4.0.1", | ||
"description": "socket.io adapter for amqp 0.9.1+ (e.g. RabbitMQ)", | ||
@@ -5,0 +5,0 @@ "main": "lib/main.js", |
Sorry, the diff of this file is not supported yet
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
38537
446
1