@nftx/queue
Advanced tools
Comparing version 1.11.1 to 1.11.2
@@ -6,2 +6,13 @@ # Change Log | ||
## [1.11.2](https://github.com/NFTX-project/nftxjs/compare/v1.11.1...v1.11.2) (2024-01-24) | ||
### Bug Fixes | ||
* allow signal and message queues to be closed ([e8b12af](https://github.com/NFTX-project/nftxjs/commit/e8b12af7d2ea3048fcde1891654a4302637ea42c)) | ||
## [1.11.1](https://github.com/NFTX-project/nftxjs/compare/v1.11.0...v1.11.1) (2024-01-19) | ||
@@ -8,0 +19,0 @@ |
@@ -5,4 +5,4 @@ 'use strict'; | ||
var IORedis = require('ioredis'); | ||
var constants = require('@nftx/constants'); | ||
var IORedis = require('ioredis'); | ||
var bullmq = require('bullmq'); | ||
@@ -26,55 +26,94 @@ | ||
const getConnection = () => { | ||
return new IORedis__default["default"](BULLMQ_REDIS_URI); | ||
class Connection extends IORedis__default["default"] { | ||
constructor(name, connectionUri) { | ||
super(connectionUri); | ||
this.name = name; | ||
Connection.connections[name] = this; | ||
} | ||
close() { | ||
delete Connection.connections[this.name]; | ||
super.disconnect(); | ||
} | ||
static connections = {}; | ||
static closeAll() { | ||
Object.values(Connection.connections).forEach(connection => { | ||
connection.close(); | ||
}); | ||
} | ||
} | ||
const getConnection = name => { | ||
return Connection.connections[name] || new Connection(name, BULLMQ_REDIS_URI); | ||
}; | ||
var getConnection$1 = getConnection; | ||
let queue; | ||
/** Returns a queue object that allows you to emit or subscribe to messages */ | ||
const getMessageQueue = () => { | ||
if (!queue) { | ||
const connection = getConnection$1(); | ||
const channel = MESSAGE_QUEUE_NAME; | ||
const add = (type, payload) => { | ||
console.debug(`Sending message: ${type}`); | ||
const message = JSON.stringify({ | ||
class MessageQueue { | ||
channel = MESSAGE_QUEUE_NAME; | ||
constructor(connection) { | ||
this.connection = connection; | ||
MessageQueue.queue = this; | ||
} | ||
add(type, payload) { | ||
console.debug(`Sending message: ${type}`); | ||
const message = JSON.stringify({ | ||
type, | ||
payload | ||
}); | ||
this.connection.publish(this.channel, message); | ||
} | ||
subscribe(cb) { | ||
this.connection.subscribe(MESSAGE_QUEUE_NAME, err => { | ||
if (err) { | ||
console.error(`Failed to subscribe: ${err.message}`); | ||
} | ||
}); | ||
this.connection.on('message', (channel, message) => { | ||
const { | ||
type, | ||
payload | ||
}); | ||
connection.publish(channel, message); | ||
}; | ||
const subscribe = callback => { | ||
connection.subscribe(MESSAGE_QUEUE_NAME, err => { | ||
} = JSON.parse(message); | ||
cb(type, payload); | ||
}); | ||
return () => { | ||
this.connection.unsubscribe(MESSAGE_QUEUE_NAME, err => { | ||
if (err) { | ||
console.error(`Failed to subscribe: ${err.message}`); | ||
console.error(`Failed to unsubscribe: ${err.message}`); | ||
} | ||
}); | ||
connection.on('message', (channel, message) => { | ||
const { | ||
type, | ||
payload | ||
} = JSON.parse(message); | ||
callback(type, payload); | ||
}); | ||
}; | ||
queue = { | ||
add, | ||
subscribe | ||
}; | ||
} | ||
return queue; | ||
close() { | ||
MessageQueue.queue = undefined; | ||
this.connection.close(); | ||
} | ||
static close() { | ||
MessageQueue.queue?.close(); | ||
} | ||
} | ||
/** Returns a queue object that allows you to emit or subscribe to messages */ | ||
const getMessageQueue = () => { | ||
return MessageQueue.queue || new MessageQueue(getConnection$1(MESSAGE_QUEUE_NAME)); | ||
}; | ||
var getMessageQueue$1 = getMessageQueue; | ||
const queues = {}; | ||
const getQueue = queueName => { | ||
let queue = queues[queueName]; | ||
if (!queue) { | ||
const connection = getConnection$1(); | ||
queue = queues[queueName] = new bullmq.Queue(queueName, { | ||
connection | ||
class NftxQueue extends bullmq.Queue { | ||
constructor(name, opts) { | ||
super(name, opts); | ||
NftxQueue.queues[name] = this; | ||
} | ||
close() { | ||
delete NftxQueue.queues[this.name]; | ||
return super.close(); | ||
} | ||
static queues = {}; | ||
static closeAll() { | ||
Object.values(NftxQueue.queues).forEach(queue => { | ||
queue.close(); | ||
}); | ||
} | ||
return queue; | ||
} | ||
const getQueue = queueName => { | ||
return NftxQueue.queues[queueName] || new NftxQueue(queueName, { | ||
connection: getConnection$1(queueName) | ||
}); | ||
}; | ||
@@ -136,9 +175,25 @@ var getQueue$1 = getQueue; | ||
class NftxWorker extends bullmq.Worker { | ||
constructor(queueName, onJob) { | ||
super(queueName, onJob, { | ||
concurrency: 8, | ||
connection: getConnection$1(queueName) | ||
}); | ||
NftxWorker.workers[queueName] = this; | ||
} | ||
close() { | ||
delete NftxWorker.workers[this.name]; | ||
return super.close(); | ||
} | ||
static workers = {}; | ||
static closeAll() { | ||
Object.values(NftxWorker.workers).forEach(worker => { | ||
worker.close(); | ||
}); | ||
} | ||
} | ||
// Create a bullmq worker for the given queue name (internal use) | ||
const createWorker = (queueName, onJob) => { | ||
const connection = getConnection$1(); | ||
const worker = new bullmq.Worker(queueName, onJob, { | ||
concurrency: 8, | ||
connection | ||
}); | ||
const worker = new NftxWorker(queueName, onJob); | ||
return worker; | ||
@@ -184,3 +239,18 @@ }; | ||
// Tear down all connections and queues | ||
const disconnect = () => { | ||
// Kill the message queue | ||
MessageQueue.close(); | ||
// Kill all signal queues | ||
NftxQueue.closeAll(); | ||
// Kill all remaining connections | ||
Connection.closeAll(); | ||
}; | ||
var disconnect$1 = disconnect; | ||
exports.disconnect = disconnect$1; | ||
exports.getConnection = getConnection$1; | ||
exports.getMessageQueue = getMessageQueue$1; | ||
exports.getNetworkQueue = getNetworkQueue$1; | ||
exports.getSignalQueue = getSignalQueue$1; | ||
exports.onMessage = onMessage$1; | ||
@@ -187,0 +257,0 @@ exports.onNetworkSignal = onNetworkSignal$1; |
@@ -0,3 +1,3 @@ | ||
import IORedis from 'ioredis'; | ||
import { Network } from '@nftx/constants'; | ||
import IORedis from 'ioredis'; | ||
import { Queue, Worker } from 'bullmq'; | ||
@@ -17,55 +17,94 @@ | ||
const getConnection = () => { | ||
return new IORedis(BULLMQ_REDIS_URI); | ||
class Connection extends IORedis { | ||
constructor(name, connectionUri) { | ||
super(connectionUri); | ||
this.name = name; | ||
Connection.connections[name] = this; | ||
} | ||
close() { | ||
delete Connection.connections[this.name]; | ||
super.disconnect(); | ||
} | ||
static connections = {}; | ||
static closeAll() { | ||
Object.values(Connection.connections).forEach(connection => { | ||
connection.close(); | ||
}); | ||
} | ||
} | ||
const getConnection = name => { | ||
return Connection.connections[name] || new Connection(name, BULLMQ_REDIS_URI); | ||
}; | ||
var getConnection$1 = getConnection; | ||
let queue; | ||
/** Returns a queue object that allows you to emit or subscribe to messages */ | ||
const getMessageQueue = () => { | ||
if (!queue) { | ||
const connection = getConnection$1(); | ||
const channel = MESSAGE_QUEUE_NAME; | ||
const add = (type, payload) => { | ||
console.debug(`Sending message: ${type}`); | ||
const message = JSON.stringify({ | ||
class MessageQueue { | ||
channel = MESSAGE_QUEUE_NAME; | ||
constructor(connection) { | ||
this.connection = connection; | ||
MessageQueue.queue = this; | ||
} | ||
add(type, payload) { | ||
console.debug(`Sending message: ${type}`); | ||
const message = JSON.stringify({ | ||
type, | ||
payload | ||
}); | ||
this.connection.publish(this.channel, message); | ||
} | ||
subscribe(cb) { | ||
this.connection.subscribe(MESSAGE_QUEUE_NAME, err => { | ||
if (err) { | ||
console.error(`Failed to subscribe: ${err.message}`); | ||
} | ||
}); | ||
this.connection.on('message', (channel, message) => { | ||
const { | ||
type, | ||
payload | ||
}); | ||
connection.publish(channel, message); | ||
}; | ||
const subscribe = callback => { | ||
connection.subscribe(MESSAGE_QUEUE_NAME, err => { | ||
} = JSON.parse(message); | ||
cb(type, payload); | ||
}); | ||
return () => { | ||
this.connection.unsubscribe(MESSAGE_QUEUE_NAME, err => { | ||
if (err) { | ||
console.error(`Failed to subscribe: ${err.message}`); | ||
console.error(`Failed to unsubscribe: ${err.message}`); | ||
} | ||
}); | ||
connection.on('message', (channel, message) => { | ||
const { | ||
type, | ||
payload | ||
} = JSON.parse(message); | ||
callback(type, payload); | ||
}); | ||
}; | ||
queue = { | ||
add, | ||
subscribe | ||
}; | ||
} | ||
return queue; | ||
close() { | ||
MessageQueue.queue = undefined; | ||
this.connection.close(); | ||
} | ||
static close() { | ||
MessageQueue.queue?.close(); | ||
} | ||
} | ||
/** Returns a queue object that allows you to emit or subscribe to messages */ | ||
const getMessageQueue = () => { | ||
return MessageQueue.queue || new MessageQueue(getConnection$1(MESSAGE_QUEUE_NAME)); | ||
}; | ||
var getMessageQueue$1 = getMessageQueue; | ||
const queues = {}; | ||
const getQueue = queueName => { | ||
let queue = queues[queueName]; | ||
if (!queue) { | ||
const connection = getConnection$1(); | ||
queue = queues[queueName] = new Queue(queueName, { | ||
connection | ||
class NftxQueue extends Queue { | ||
constructor(name, opts) { | ||
super(name, opts); | ||
NftxQueue.queues[name] = this; | ||
} | ||
close() { | ||
delete NftxQueue.queues[this.name]; | ||
return super.close(); | ||
} | ||
static queues = {}; | ||
static closeAll() { | ||
Object.values(NftxQueue.queues).forEach(queue => { | ||
queue.close(); | ||
}); | ||
} | ||
return queue; | ||
} | ||
const getQueue = queueName => { | ||
return NftxQueue.queues[queueName] || new NftxQueue(queueName, { | ||
connection: getConnection$1(queueName) | ||
}); | ||
}; | ||
@@ -127,9 +166,25 @@ var getQueue$1 = getQueue; | ||
class NftxWorker extends Worker { | ||
constructor(queueName, onJob) { | ||
super(queueName, onJob, { | ||
concurrency: 8, | ||
connection: getConnection$1(queueName) | ||
}); | ||
NftxWorker.workers[queueName] = this; | ||
} | ||
close() { | ||
delete NftxWorker.workers[this.name]; | ||
return super.close(); | ||
} | ||
static workers = {}; | ||
static closeAll() { | ||
Object.values(NftxWorker.workers).forEach(worker => { | ||
worker.close(); | ||
}); | ||
} | ||
} | ||
// Create a bullmq worker for the given queue name (internal use) | ||
const createWorker = (queueName, onJob) => { | ||
const connection = getConnection$1(); | ||
const worker = new Worker(queueName, onJob, { | ||
concurrency: 8, | ||
connection | ||
}); | ||
const worker = new NftxWorker(queueName, onJob); | ||
return worker; | ||
@@ -175,2 +230,13 @@ }; | ||
export { getConnection$1 as getConnection, onMessage$1 as onMessage, onNetworkSignal$1 as onNetworkSignal, onSignal$1 as onSignal, sendMessage$1 as sendMessage, sendNetworkSignal$1 as sendNetworkSignal, sendSignal$1 as sendSignal }; | ||
// Tear down all connections and queues | ||
const disconnect = () => { | ||
// Kill the message queue | ||
MessageQueue.close(); | ||
// Kill all signal queues | ||
NftxQueue.closeAll(); | ||
// Kill all remaining connections | ||
Connection.closeAll(); | ||
}; | ||
var disconnect$1 = disconnect; | ||
export { disconnect$1 as disconnect, getConnection$1 as getConnection, getMessageQueue$1 as getMessageQueue, getNetworkQueue$1 as getNetworkQueue, getSignalQueue$1 as getSignalQueue, onMessage$1 as onMessage, onNetworkSignal$1 as onNetworkSignal, onSignal$1 as onSignal, sendMessage$1 as sendMessage, sendNetworkSignal$1 as sendNetworkSignal, sendSignal$1 as sendSignal }; |
import IORedis from 'ioredis'; | ||
declare const getConnection: () => IORedis; | ||
export declare class Connection extends IORedis { | ||
name: string; | ||
constructor(name: string, connectionUri: string); | ||
close(): void; | ||
static connections: Record<string, Connection>; | ||
static closeAll(): void; | ||
} | ||
declare const getConnection: (name: string) => Connection; | ||
export default getConnection; |
export * from './actions'; | ||
export * from './connection'; | ||
export * from './listeners'; | ||
export * from './queues'; | ||
export * from './types'; | ||
export { default as disconnect } from './disconnect'; |
import { Job, Worker } from 'bullmq'; | ||
declare const createWorker: (queueName: string, onJob: (job: Job) => any) => Worker<any, any, string>; | ||
export declare class NftxWorker extends Worker { | ||
constructor(queueName: string, onJob: (job: Job) => any); | ||
close(): Promise<void>; | ||
static workers: Record<string, NftxWorker>; | ||
static closeAll(): void; | ||
} | ||
declare const createWorker: (queueName: string, onJob: (job: Job) => any) => NftxWorker; | ||
export default createWorker; |
@@ -6,3 +6,3 @@ import { SignalCallback } from '../types'; | ||
*/ | ||
declare const onNetworkSignal: (network: number, callback: SignalCallback) => import("bullmq").Worker<any, any, string>; | ||
declare const onNetworkSignal: (network: number, callback: SignalCallback) => import("./createWorker").NftxWorker; | ||
export default onNetworkSignal; |
@@ -5,3 +5,3 @@ import { SignalCallback } from '../types'; | ||
*/ | ||
declare const onSignal: (callback: SignalCallback) => import("bullmq").Worker<any, any, string>; | ||
declare const onSignal: (callback: SignalCallback) => import("./createWorker").NftxWorker; | ||
export default onSignal; |
@@ -0,6 +1,19 @@ | ||
import { Connection } from '../connection/getConnection'; | ||
interface IMessageQueue { | ||
add: (type: string, payload: Record<string, any>) => void; | ||
subscribe: (cb: (type: string, payload: any) => void) => () => void; | ||
close: () => void; | ||
} | ||
export declare class MessageQueue implements IMessageQueue { | ||
connection: Connection; | ||
channel: string; | ||
constructor(connection: Connection); | ||
add(type: string, payload: Record<string, any>): void; | ||
subscribe(cb: (type: string, payload: any) => void): () => void; | ||
close(): void; | ||
static queue: IMessageQueue | undefined; | ||
static close(): void; | ||
} | ||
/** Returns a queue object that allows you to emit or subscribe to messages */ | ||
declare const getMessageQueue: () => { | ||
add: (type: string, payload: Record<string, any>) => void; | ||
subscribe: (cb: (type: string, payload: any) => void) => void; | ||
}; | ||
declare const getMessageQueue: () => IMessageQueue; | ||
export default getMessageQueue; |
@@ -1,3 +0,9 @@ | ||
import { Queue } from 'bullmq'; | ||
import { Queue, QueueOptions } from 'bullmq'; | ||
export declare class NftxQueue extends Queue { | ||
constructor(name: string, opts: QueueOptions); | ||
close(): Promise<void>; | ||
static queues: Record<string, Queue>; | ||
static closeAll(): void; | ||
} | ||
declare const getQueue: (queueName: string) => Queue<any, any, string>; | ||
export default getQueue; |
{ | ||
"name": "@nftx/queue", | ||
"version": "1.11.1", | ||
"version": "1.11.2", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/NFTX-project/nftxjs", |
import IORedis from 'ioredis'; | ||
import { BULLMQ_REDIS_URI } from '../constants'; | ||
const getConnection = () => { | ||
return new IORedis(BULLMQ_REDIS_URI); | ||
export class Connection extends IORedis { | ||
constructor(public name: string, connectionUri: string) { | ||
super(connectionUri); | ||
Connection.connections[name] = this; | ||
} | ||
close() { | ||
delete Connection.connections[this.name]; | ||
super.disconnect(); | ||
} | ||
static connections: Record<string, Connection> = {}; | ||
static closeAll() { | ||
Object.values(Connection.connections).forEach((connection) => { | ||
connection.close(); | ||
}); | ||
} | ||
} | ||
const getConnection = (name: string) => { | ||
return Connection.connections[name] || new Connection(name, BULLMQ_REDIS_URI); | ||
}; | ||
export default getConnection; |
export * from './actions'; | ||
export * from './connection'; | ||
export * from './listeners'; | ||
// export * from './queues'; | ||
export * from './queues'; | ||
export * from './types'; | ||
export { default as disconnect } from './disconnect'; |
import { Job, Worker } from 'bullmq'; | ||
import { getConnection } from '../connection'; | ||
export class NftxWorker extends Worker { | ||
constructor(queueName: string, onJob: (job: Job) => any) { | ||
super(queueName, onJob, { | ||
concurrency: 8, | ||
connection: getConnection(queueName), | ||
}); | ||
NftxWorker.workers[queueName] = this; | ||
} | ||
close() { | ||
delete NftxWorker.workers[this.name]; | ||
return super.close(); | ||
} | ||
static workers: Record<string, NftxWorker> = {}; | ||
static closeAll() { | ||
Object.values(NftxWorker.workers).forEach((worker) => { | ||
worker.close(); | ||
}); | ||
} | ||
} | ||
// Create a bullmq worker for the given queue name (internal use) | ||
const createWorker = (queueName: string, onJob: (job: Job) => any) => { | ||
const connection = getConnection(); | ||
const worker = new Worker(queueName, onJob, { | ||
concurrency: 8, | ||
connection, | ||
}); | ||
const worker = new NftxWorker(queueName, onJob); | ||
return worker; | ||
@@ -12,0 +31,0 @@ }; |
import { getConnection } from '../connection'; | ||
import { Connection } from '../connection/getConnection'; | ||
import { MESSAGE_QUEUE_NAME } from '../constants'; | ||
let queue: { | ||
interface IMessageQueue { | ||
add: (type: string, payload: Record<string, any>) => void; | ||
subscribe: (cb: (type: string, payload: any) => void) => void; | ||
}; | ||
subscribe: (cb: (type: string, payload: any) => void) => () => void; | ||
close: () => void; | ||
} | ||
/** Returns a queue object that allows you to emit or subscribe to messages */ | ||
const getMessageQueue = () => { | ||
if (!queue) { | ||
const connection = getConnection(); | ||
const channel = MESSAGE_QUEUE_NAME; | ||
export class MessageQueue implements IMessageQueue { | ||
channel = MESSAGE_QUEUE_NAME; | ||
const add = (type: string, payload: Record<string, any>) => { | ||
console.debug(`Sending message: ${type}`); | ||
const message = JSON.stringify({ type, payload }); | ||
connection.publish(channel, message); | ||
}; | ||
const subscribe = ( | ||
callback: (type: string, payload: Record<string, any>) => any | ||
) => { | ||
connection.subscribe(MESSAGE_QUEUE_NAME, (err) => { | ||
constructor(public connection: Connection) { | ||
MessageQueue.queue = this; | ||
} | ||
add(type: string, payload: Record<string, any>) { | ||
console.debug(`Sending message: ${type}`); | ||
const message = JSON.stringify({ type, payload }); | ||
this.connection.publish(this.channel, message); | ||
} | ||
subscribe(cb: (type: string, payload: any) => void) { | ||
this.connection.subscribe(MESSAGE_QUEUE_NAME, (err) => { | ||
if (err) { | ||
console.error(`Failed to subscribe: ${err.message}`); | ||
} | ||
}); | ||
this.connection.on('message', (channel, message) => { | ||
const { type, payload } = JSON.parse(message); | ||
cb(type, payload); | ||
}); | ||
return () => { | ||
this.connection.unsubscribe(MESSAGE_QUEUE_NAME, (err) => { | ||
if (err) { | ||
console.error(`Failed to subscribe: ${err.message}`); | ||
console.error(`Failed to unsubscribe: ${err.message}`); | ||
} | ||
}); | ||
connection.on('message', (channel, message) => { | ||
const { type, payload } = JSON.parse(message); | ||
callback(type, payload); | ||
}); | ||
}; | ||
} | ||
close() { | ||
MessageQueue.queue = undefined; | ||
this.connection.close(); | ||
} | ||
queue = { add, subscribe }; | ||
static queue: IMessageQueue | undefined; | ||
static close() { | ||
MessageQueue.queue?.close(); | ||
} | ||
return queue; | ||
} | ||
/** Returns a queue object that allows you to emit or subscribe to messages */ | ||
const getMessageQueue = () => { | ||
return ( | ||
MessageQueue.queue || new MessageQueue(getConnection(MESSAGE_QUEUE_NAME)) | ||
); | ||
}; | ||
export default getMessageQueue; |
@@ -1,15 +0,30 @@ | ||
import { Queue } from 'bullmq'; | ||
import { Queue, QueueOptions } from 'bullmq'; | ||
import { getConnection } from '../connection'; | ||
const queues: Record<string, Queue> = {}; | ||
export class NftxQueue extends Queue { | ||
constructor(name: string, opts: QueueOptions) { | ||
super(name, opts); | ||
NftxQueue.queues[name] = this; | ||
} | ||
close() { | ||
delete NftxQueue.queues[this.name]; | ||
return super.close(); | ||
} | ||
static queues: Record<string, Queue> = {}; | ||
static closeAll() { | ||
Object.values(NftxQueue.queues).forEach((queue) => { | ||
queue.close(); | ||
}); | ||
} | ||
} | ||
const getQueue = (queueName: string) => { | ||
let queue = queues[queueName]; | ||
if (!queue) { | ||
const connection = getConnection(); | ||
queue = queues[queueName] = new Queue(queueName, { connection }); | ||
} | ||
return queue; | ||
return ( | ||
NftxQueue.queues[queueName] || | ||
new NftxQueue(queueName, { connection: getConnection(queueName) }) | ||
); | ||
}; | ||
export default getQueue; |
33971
49
917