Socket
Socket
Sign inDemoInstall

@nftx/queue

Package Overview
Dependencies
4
Maintainers
2
Versions
36
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.11.1 to 1.11.2

dist/ts/disconnect.d.ts

11

CHANGELOG.md

@@ -6,2 +6,13 @@ # Change Log

## [1.11.2](https://github.com/NFTX-project/nftxjs/compare/v1.11.1...v1.11.2) (2024-01-24)
### Bug Fixes
* allow signal and message queues to be closed ([e8b12af](https://github.com/NFTX-project/nftxjs/commit/e8b12af7d2ea3048fcde1891654a4302637ea42c))
## [1.11.1](https://github.com/NFTX-project/nftxjs/compare/v1.11.0...v1.11.1) (2024-01-19)

@@ -8,0 +19,0 @@

158

dist/cjs/queue.js

@@ -5,4 +5,4 @@ 'use strict';

var IORedis = require('ioredis');
var constants = require('@nftx/constants');
var IORedis = require('ioredis');
var bullmq = require('bullmq');

@@ -26,55 +26,94 @@

const getConnection = () => {
return new IORedis__default["default"](BULLMQ_REDIS_URI);
class Connection extends IORedis__default["default"] {
constructor(name, connectionUri) {
super(connectionUri);
this.name = name;
Connection.connections[name] = this;
}
close() {
delete Connection.connections[this.name];
super.disconnect();
}
static connections = {};
static closeAll() {
Object.values(Connection.connections).forEach(connection => {
connection.close();
});
}
}
const getConnection = name => {
return Connection.connections[name] || new Connection(name, BULLMQ_REDIS_URI);
};
var getConnection$1 = getConnection;
let queue;
/** Returns a queue object that allows you to emit or subscribe to messages */
const getMessageQueue = () => {
if (!queue) {
const connection = getConnection$1();
const channel = MESSAGE_QUEUE_NAME;
const add = (type, payload) => {
console.debug(`Sending message: ${type}`);
const message = JSON.stringify({
class MessageQueue {
channel = MESSAGE_QUEUE_NAME;
constructor(connection) {
this.connection = connection;
MessageQueue.queue = this;
}
add(type, payload) {
console.debug(`Sending message: ${type}`);
const message = JSON.stringify({
type,
payload
});
this.connection.publish(this.channel, message);
}
subscribe(cb) {
this.connection.subscribe(MESSAGE_QUEUE_NAME, err => {
if (err) {
console.error(`Failed to subscribe: ${err.message}`);
}
});
this.connection.on('message', (channel, message) => {
const {
type,
payload
});
connection.publish(channel, message);
};
const subscribe = callback => {
connection.subscribe(MESSAGE_QUEUE_NAME, err => {
} = JSON.parse(message);
cb(type, payload);
});
return () => {
this.connection.unsubscribe(MESSAGE_QUEUE_NAME, err => {
if (err) {
console.error(`Failed to subscribe: ${err.message}`);
console.error(`Failed to unsubscribe: ${err.message}`);
}
});
connection.on('message', (channel, message) => {
const {
type,
payload
} = JSON.parse(message);
callback(type, payload);
});
};
queue = {
add,
subscribe
};
}
return queue;
close() {
MessageQueue.queue = undefined;
this.connection.close();
}
static close() {
MessageQueue.queue?.close();
}
}
/** Returns a queue object that allows you to emit or subscribe to messages */
const getMessageQueue = () => {
return MessageQueue.queue || new MessageQueue(getConnection$1(MESSAGE_QUEUE_NAME));
};
var getMessageQueue$1 = getMessageQueue;
const queues = {};
const getQueue = queueName => {
let queue = queues[queueName];
if (!queue) {
const connection = getConnection$1();
queue = queues[queueName] = new bullmq.Queue(queueName, {
connection
class NftxQueue extends bullmq.Queue {
constructor(name, opts) {
super(name, opts);
NftxQueue.queues[name] = this;
}
close() {
delete NftxQueue.queues[this.name];
return super.close();
}
static queues = {};
static closeAll() {
Object.values(NftxQueue.queues).forEach(queue => {
queue.close();
});
}
return queue;
}
const getQueue = queueName => {
return NftxQueue.queues[queueName] || new NftxQueue(queueName, {
connection: getConnection$1(queueName)
});
};

@@ -136,9 +175,25 @@ var getQueue$1 = getQueue;

class NftxWorker extends bullmq.Worker {
constructor(queueName, onJob) {
super(queueName, onJob, {
concurrency: 8,
connection: getConnection$1(queueName)
});
NftxWorker.workers[queueName] = this;
}
close() {
delete NftxWorker.workers[this.name];
return super.close();
}
static workers = {};
static closeAll() {
Object.values(NftxWorker.workers).forEach(worker => {
worker.close();
});
}
}
// Create a bullmq worker for the given queue name (internal use)
const createWorker = (queueName, onJob) => {
const connection = getConnection$1();
const worker = new bullmq.Worker(queueName, onJob, {
concurrency: 8,
connection
});
const worker = new NftxWorker(queueName, onJob);
return worker;

@@ -184,3 +239,18 @@ };

// Tear down all connections and queues
const disconnect = () => {
// Kill the message queue
MessageQueue.close();
// Kill all signal queues
NftxQueue.closeAll();
// Kill all remaining connections
Connection.closeAll();
};
var disconnect$1 = disconnect;
exports.disconnect = disconnect$1;
exports.getConnection = getConnection$1;
exports.getMessageQueue = getMessageQueue$1;
exports.getNetworkQueue = getNetworkQueue$1;
exports.getSignalQueue = getSignalQueue$1;
exports.onMessage = onMessage$1;

@@ -187,0 +257,0 @@ exports.onNetworkSignal = onNetworkSignal$1;

@@ -0,3 +1,3 @@

import IORedis from 'ioredis';
import { Network } from '@nftx/constants';
import IORedis from 'ioredis';
import { Queue, Worker } from 'bullmq';

@@ -17,55 +17,94 @@

const getConnection = () => {
return new IORedis(BULLMQ_REDIS_URI);
class Connection extends IORedis {
constructor(name, connectionUri) {
super(connectionUri);
this.name = name;
Connection.connections[name] = this;
}
close() {
delete Connection.connections[this.name];
super.disconnect();
}
static connections = {};
static closeAll() {
Object.values(Connection.connections).forEach(connection => {
connection.close();
});
}
}
const getConnection = name => {
return Connection.connections[name] || new Connection(name, BULLMQ_REDIS_URI);
};
var getConnection$1 = getConnection;
let queue;
/** Returns a queue object that allows you to emit or subscribe to messages */
const getMessageQueue = () => {
if (!queue) {
const connection = getConnection$1();
const channel = MESSAGE_QUEUE_NAME;
const add = (type, payload) => {
console.debug(`Sending message: ${type}`);
const message = JSON.stringify({
class MessageQueue {
channel = MESSAGE_QUEUE_NAME;
constructor(connection) {
this.connection = connection;
MessageQueue.queue = this;
}
add(type, payload) {
console.debug(`Sending message: ${type}`);
const message = JSON.stringify({
type,
payload
});
this.connection.publish(this.channel, message);
}
subscribe(cb) {
this.connection.subscribe(MESSAGE_QUEUE_NAME, err => {
if (err) {
console.error(`Failed to subscribe: ${err.message}`);
}
});
this.connection.on('message', (channel, message) => {
const {
type,
payload
});
connection.publish(channel, message);
};
const subscribe = callback => {
connection.subscribe(MESSAGE_QUEUE_NAME, err => {
} = JSON.parse(message);
cb(type, payload);
});
return () => {
this.connection.unsubscribe(MESSAGE_QUEUE_NAME, err => {
if (err) {
console.error(`Failed to subscribe: ${err.message}`);
console.error(`Failed to unsubscribe: ${err.message}`);
}
});
connection.on('message', (channel, message) => {
const {
type,
payload
} = JSON.parse(message);
callback(type, payload);
});
};
queue = {
add,
subscribe
};
}
return queue;
close() {
MessageQueue.queue = undefined;
this.connection.close();
}
static close() {
MessageQueue.queue?.close();
}
}
/** Returns a queue object that allows you to emit or subscribe to messages */
const getMessageQueue = () => {
return MessageQueue.queue || new MessageQueue(getConnection$1(MESSAGE_QUEUE_NAME));
};
var getMessageQueue$1 = getMessageQueue;
const queues = {};
const getQueue = queueName => {
let queue = queues[queueName];
if (!queue) {
const connection = getConnection$1();
queue = queues[queueName] = new Queue(queueName, {
connection
class NftxQueue extends Queue {
constructor(name, opts) {
super(name, opts);
NftxQueue.queues[name] = this;
}
close() {
delete NftxQueue.queues[this.name];
return super.close();
}
static queues = {};
static closeAll() {
Object.values(NftxQueue.queues).forEach(queue => {
queue.close();
});
}
return queue;
}
const getQueue = queueName => {
return NftxQueue.queues[queueName] || new NftxQueue(queueName, {
connection: getConnection$1(queueName)
});
};

@@ -127,9 +166,25 @@ var getQueue$1 = getQueue;

class NftxWorker extends Worker {
constructor(queueName, onJob) {
super(queueName, onJob, {
concurrency: 8,
connection: getConnection$1(queueName)
});
NftxWorker.workers[queueName] = this;
}
close() {
delete NftxWorker.workers[this.name];
return super.close();
}
static workers = {};
static closeAll() {
Object.values(NftxWorker.workers).forEach(worker => {
worker.close();
});
}
}
// Create a bullmq worker for the given queue name (internal use)
const createWorker = (queueName, onJob) => {
const connection = getConnection$1();
const worker = new Worker(queueName, onJob, {
concurrency: 8,
connection
});
const worker = new NftxWorker(queueName, onJob);
return worker;

@@ -175,2 +230,13 @@ };

export { getConnection$1 as getConnection, onMessage$1 as onMessage, onNetworkSignal$1 as onNetworkSignal, onSignal$1 as onSignal, sendMessage$1 as sendMessage, sendNetworkSignal$1 as sendNetworkSignal, sendSignal$1 as sendSignal };
// Tear down all connections and queues
const disconnect = () => {
// Kill the message queue
MessageQueue.close();
// Kill all signal queues
NftxQueue.closeAll();
// Kill all remaining connections
Connection.closeAll();
};
var disconnect$1 = disconnect;
export { disconnect$1 as disconnect, getConnection$1 as getConnection, getMessageQueue$1 as getMessageQueue, getNetworkQueue$1 as getNetworkQueue, getSignalQueue$1 as getSignalQueue, onMessage$1 as onMessage, onNetworkSignal$1 as onNetworkSignal, onSignal$1 as onSignal, sendMessage$1 as sendMessage, sendNetworkSignal$1 as sendNetworkSignal, sendSignal$1 as sendSignal };
import IORedis from 'ioredis';
declare const getConnection: () => IORedis;
export declare class Connection extends IORedis {
name: string;
constructor(name: string, connectionUri: string);
close(): void;
static connections: Record<string, Connection>;
static closeAll(): void;
}
declare const getConnection: (name: string) => Connection;
export default getConnection;
export * from './actions';
export * from './connection';
export * from './listeners';
export * from './queues';
export * from './types';
export { default as disconnect } from './disconnect';
import { Job, Worker } from 'bullmq';
declare const createWorker: (queueName: string, onJob: (job: Job) => any) => Worker<any, any, string>;
export declare class NftxWorker extends Worker {
constructor(queueName: string, onJob: (job: Job) => any);
close(): Promise<void>;
static workers: Record<string, NftxWorker>;
static closeAll(): void;
}
declare const createWorker: (queueName: string, onJob: (job: Job) => any) => NftxWorker;
export default createWorker;

@@ -6,3 +6,3 @@ import { SignalCallback } from '../types';

*/
declare const onNetworkSignal: (network: number, callback: SignalCallback) => import("bullmq").Worker<any, any, string>;
declare const onNetworkSignal: (network: number, callback: SignalCallback) => import("./createWorker").NftxWorker;
export default onNetworkSignal;

@@ -5,3 +5,3 @@ import { SignalCallback } from '../types';

*/
declare const onSignal: (callback: SignalCallback) => import("bullmq").Worker<any, any, string>;
declare const onSignal: (callback: SignalCallback) => import("./createWorker").NftxWorker;
export default onSignal;

@@ -0,6 +1,19 @@

import { Connection } from '../connection/getConnection';
interface IMessageQueue {
add: (type: string, payload: Record<string, any>) => void;
subscribe: (cb: (type: string, payload: any) => void) => () => void;
close: () => void;
}
export declare class MessageQueue implements IMessageQueue {
connection: Connection;
channel: string;
constructor(connection: Connection);
add(type: string, payload: Record<string, any>): void;
subscribe(cb: (type: string, payload: any) => void): () => void;
close(): void;
static queue: IMessageQueue | undefined;
static close(): void;
}
/** Returns a queue object that allows you to emit or subscribe to messages */
declare const getMessageQueue: () => {
add: (type: string, payload: Record<string, any>) => void;
subscribe: (cb: (type: string, payload: any) => void) => void;
};
declare const getMessageQueue: () => IMessageQueue;
export default getMessageQueue;

@@ -1,3 +0,9 @@

import { Queue } from 'bullmq';
import { Queue, QueueOptions } from 'bullmq';
export declare class NftxQueue extends Queue {
constructor(name: string, opts: QueueOptions);
close(): Promise<void>;
static queues: Record<string, Queue>;
static closeAll(): void;
}
declare const getQueue: (queueName: string) => Queue<any, any, string>;
export default getQueue;
{
"name": "@nftx/queue",
"version": "1.11.1",
"version": "1.11.2",
"description": "",

@@ -5,0 +5,0 @@ "homepage": "https://github.com/NFTX-project/nftxjs",

import IORedis from 'ioredis';
import { BULLMQ_REDIS_URI } from '../constants';
const getConnection = () => {
return new IORedis(BULLMQ_REDIS_URI);
export class Connection extends IORedis {
constructor(public name: string, connectionUri: string) {
super(connectionUri);
Connection.connections[name] = this;
}
close() {
delete Connection.connections[this.name];
super.disconnect();
}
static connections: Record<string, Connection> = {};
static closeAll() {
Object.values(Connection.connections).forEach((connection) => {
connection.close();
});
}
}
const getConnection = (name: string) => {
return Connection.connections[name] || new Connection(name, BULLMQ_REDIS_URI);
};
export default getConnection;
export * from './actions';
export * from './connection';
export * from './listeners';
// export * from './queues';
export * from './queues';
export * from './types';
export { default as disconnect } from './disconnect';
import { Job, Worker } from 'bullmq';
import { getConnection } from '../connection';
export class NftxWorker extends Worker {
constructor(queueName: string, onJob: (job: Job) => any) {
super(queueName, onJob, {
concurrency: 8,
connection: getConnection(queueName),
});
NftxWorker.workers[queueName] = this;
}
close() {
delete NftxWorker.workers[this.name];
return super.close();
}
static workers: Record<string, NftxWorker> = {};
static closeAll() {
Object.values(NftxWorker.workers).forEach((worker) => {
worker.close();
});
}
}
// Create a bullmq worker for the given queue name (internal use)
const createWorker = (queueName: string, onJob: (job: Job) => any) => {
const connection = getConnection();
const worker = new Worker(queueName, onJob, {
concurrency: 8,
connection,
});
const worker = new NftxWorker(queueName, onJob);
return worker;

@@ -12,0 +31,0 @@ };

import { getConnection } from '../connection';
import { Connection } from '../connection/getConnection';
import { MESSAGE_QUEUE_NAME } from '../constants';
let queue: {
interface IMessageQueue {
add: (type: string, payload: Record<string, any>) => void;
subscribe: (cb: (type: string, payload: any) => void) => void;
};
subscribe: (cb: (type: string, payload: any) => void) => () => void;
close: () => void;
}
/** Returns a queue object that allows you to emit or subscribe to messages */
const getMessageQueue = () => {
if (!queue) {
const connection = getConnection();
const channel = MESSAGE_QUEUE_NAME;
export class MessageQueue implements IMessageQueue {
channel = MESSAGE_QUEUE_NAME;
const add = (type: string, payload: Record<string, any>) => {
console.debug(`Sending message: ${type}`);
const message = JSON.stringify({ type, payload });
connection.publish(channel, message);
};
const subscribe = (
callback: (type: string, payload: Record<string, any>) => any
) => {
connection.subscribe(MESSAGE_QUEUE_NAME, (err) => {
constructor(public connection: Connection) {
MessageQueue.queue = this;
}
add(type: string, payload: Record<string, any>) {
console.debug(`Sending message: ${type}`);
const message = JSON.stringify({ type, payload });
this.connection.publish(this.channel, message);
}
subscribe(cb: (type: string, payload: any) => void) {
this.connection.subscribe(MESSAGE_QUEUE_NAME, (err) => {
if (err) {
console.error(`Failed to subscribe: ${err.message}`);
}
});
this.connection.on('message', (channel, message) => {
const { type, payload } = JSON.parse(message);
cb(type, payload);
});
return () => {
this.connection.unsubscribe(MESSAGE_QUEUE_NAME, (err) => {
if (err) {
console.error(`Failed to subscribe: ${err.message}`);
console.error(`Failed to unsubscribe: ${err.message}`);
}
});
connection.on('message', (channel, message) => {
const { type, payload } = JSON.parse(message);
callback(type, payload);
});
};
}
close() {
MessageQueue.queue = undefined;
this.connection.close();
}
queue = { add, subscribe };
static queue: IMessageQueue | undefined;
static close() {
MessageQueue.queue?.close();
}
return queue;
}
/** Returns a queue object that allows you to emit or subscribe to messages */
const getMessageQueue = () => {
return (
MessageQueue.queue || new MessageQueue(getConnection(MESSAGE_QUEUE_NAME))
);
};
export default getMessageQueue;

@@ -1,15 +0,30 @@

import { Queue } from 'bullmq';
import { Queue, QueueOptions } from 'bullmq';
import { getConnection } from '../connection';
const queues: Record<string, Queue> = {};
export class NftxQueue extends Queue {
constructor(name: string, opts: QueueOptions) {
super(name, opts);
NftxQueue.queues[name] = this;
}
close() {
delete NftxQueue.queues[this.name];
return super.close();
}
static queues: Record<string, Queue> = {};
static closeAll() {
Object.values(NftxQueue.queues).forEach((queue) => {
queue.close();
});
}
}
const getQueue = (queueName: string) => {
let queue = queues[queueName];
if (!queue) {
const connection = getConnection();
queue = queues[queueName] = new Queue(queueName, { connection });
}
return queue;
return (
NftxQueue.queues[queueName] ||
new NftxQueue(queueName, { connection: getConnection(queueName) })
);
};
export default getQueue;
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc