You're Invited: Meet the Socket team at BSidesSF and RSAC - April 27 - May 1.RSVP
Socket
Sign inDemoInstall
Socket

@imqueue/job

Package Overview
Dependencies
Maintainers
2
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@imqueue/job - npm Package Compare versions

Comparing version

to
1.1.0

164

index.d.ts

@@ -18,5 +18,23 @@ /*!

*/
import { ILogger } from '@imqueue/core';
import { ILogger, IMessageQueue } from '@imqueue/core';
/**
* Job queues options
*/
export interface JobQueueOptions {
/**
* Name of the job queue. In worker/publisher mode worker and
* publisher must share the same job queue name.
* Mandatory.
*
* @type {string}
*/
name: string;
/**
* Connection params of the queue engine cluster (typically -
* host and port). By default the broker is redis.
* Optional.
* By default is [{ host: "localhost", port: 6379 }].
*
* @type {Array<{host: string, port: number}>}
*/
cluster?: {

@@ -26,5 +44,34 @@ host: string;

}[];
/**
* Logger to be used for producing log and error messages.
* Optional.
* By default is console.
*
* @type {ILogger}
*/
logger?: ILogger;
/**
* Safe message delivery or not? When safe delivery is enabled (by default)
* queue is processing jobs with guarantied job data delivery. If process
* fails or dies - job data is re-queued for future processing by another
* worker.
* Optional.
* Default is true.
*
* @type {boolean}
*/
safe?: boolean;
/**
* TTL in milliseconds of the job in worker queue during safe delivery.
* If worker does not finish processing after this TTL - job is re-queued
* for other workers to be processed.
* Optional.
* By default is 10000.
*/
safeLockTtl?: number;
/**
* Job queue prefix in queue broker.
* Optional.
* By default is "imq-job".
*/
prefix?: string;

@@ -84,22 +131,25 @@ }

}
export interface AnyJobQueue<T> {
name: string;
readonly logger: ILogger;
start(): Promise<T>;
stop(): Promise<T>;
destroy(): Promise<void>;
}
export interface AnyJobQueueWorker<T, U> {
onPop(handler: JobQueuePopHandler<U>): T;
}
export interface AnyJobQueuePublisher<T, U> {
push(job: U, options?: PushOptions): T;
}
/**
* Implements simple scheduled job queue. Job scheduling is optional. It may
* process jobs immediately or after specified delay for particular job.
* It also allows to define max lifetime of the job in a queue, after which
* the job is removed from a queue.
* Supports graceful shutdown, if TERM or SIGINT is sent to the process.
* Abstract job queue, handles base implementations of AnyJobQueue interface.
*/
export default class JobQueue<T> {
private imq;
private handler;
private options;
private readonly logger;
export declare abstract class BaseJobQueue<T, U> implements AnyJobQueue<T> {
protected options: JobQueueOptions;
protected imq: IMessageQueue;
protected handler: JobQueuePopHandler<U>;
readonly logger: ILogger;
protected constructor(options: JobQueueOptions);
/**
* Constructor. Instantiates new JobQueue instance.
*
* @constructor
* @param {JobQueueOptions} options
*/
constructor(options: JobQueueOptions);
/**
* Full name of this queue

@@ -113,11 +163,11 @@ *

*
* @return {Promise<JobQueue<T>>} - this queue
* @return {Promise<T>} - this queue
*/
start(): Promise<JobQueue<T>>;
start(): Promise<T>;
/**
* Stops processing job queue
*
* @return {Promise<JobQueue<T>>} - this queue
* @return {Promise<T>} - this queue
*/
stop(): Promise<JobQueue<T>>;
stop(): Promise<T>;
/**

@@ -129,3 +179,16 @@ * Destroys job queue

destroy(): Promise<void>;
}
/**
* Implements simple scheduled job queue publisher. Job queue publisher is only
* responsible for pushing queue messages.
*/
export declare class JobQueuePublisher<T> extends BaseJobQueue<JobQueuePublisher<T>, T> implements AnyJobQueuePublisher<JobQueuePublisher<T>, T> {
/**
* Constructor. Instantiates new JobQueue instance.
*
* @constructor
* @param {JobQueueOptions} options
*/
constructor(options: JobQueueOptions);
/**
* Pushes new job to this queue

@@ -135,6 +198,19 @@ *

* @param {PushOptions} options - push options, like delay and ttl for job
* @return {Promise<JobQueue<T>>} - this queue
* @return {JobQueue<T>} - this queue
*/
push(job: T, options?: PushOptions): this;
push(job: T, options?: PushOptions): JobQueuePublisher<T>;
}
/**
* Implements simple scheduled job queue worker. Job queue worker is only
* responsible for processing queue messages.
*/
export declare class JobQueueWorker<T> extends BaseJobQueue<JobQueueWorker<T>, T> implements AnyJobQueueWorker<JobQueueWorker<T>, T> {
/**
* Constructor. Instantiates new JobQueue instance.
*
* @constructor
* @param {JobQueueOptions} options
*/
constructor(options: JobQueueOptions);
/**
* Sets up job handler, which is called when the job is popped from this

@@ -144,5 +220,45 @@ * queue.

* @param {JobQueuePopHandler<T>} handler - job pop handler
* @return {JobQueueWorker<T>} - this queue
*/
onPop(handler: JobQueuePopHandler<T>): JobQueueWorker<T>;
}
/**
* Implements simple scheduled job queue. Job scheduling is optional. It may
* process jobs immediately or after specified delay for particular job.
* It also allows to define max lifetime of the job in a queue, after which
* the job is removed from a queue.
* Supports graceful shutdown, if TERM or SIGINT is sent to the process.
*/
export default class JobQueue<T> extends BaseJobQueue<JobQueue<T>, T> implements AnyJobQueueWorker<JobQueue<T>, T>, AnyJobQueuePublisher<JobQueue<T>, T> {
/**
* Constructor. Instantiates new JobQueue instance.
*
* @constructor
* @param {JobQueueOptions} options
*/
constructor(options: JobQueueOptions);
/**
* Starts processing job queue. Throws if handler is not set before start.
*
* @throws {TypeError}
* @return {Promise<T>} - this queue
*/
start(): Promise<JobQueue<T>>;
/**
* Pushes new job to this queue. Throws if handler is not set.
*
* @throws {TypeError}
* @param {T} job - job data itself of user defined type
* @param {PushOptions} options - push options, like delay and ttl for job
* @return {JobQueue<T>} - this queue
*/
push(job: T, options?: PushOptions): JobQueue<T>;
/**
* Sets up job handler, which is called when the job is popped from this
* queue.
*
* @param {JobQueuePopHandler<T>} handler - job pop handler
* @return {JobQueue<T>} - this queue
*/
onPop(handler: JobQueuePopHandler<T>): JobQueue<T>;
}

159

index.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.JobQueueWorker = exports.JobQueuePublisher = exports.BaseJobQueue = void 0;
/*!

@@ -21,28 +22,9 @@ * Job Queue for @imqueue framework

const core_1 = require("@imqueue/core");
// noinspection JSUnusedGlobalSymbols
/**
* Implements simple scheduled job queue. Job scheduling is optional. It may
* process jobs immediately or after specified delay for particular job.
* It also allows to define max lifetime of the job in a queue, after which
* the job is removed from a queue.
* Supports graceful shutdown, if TERM or SIGINT is sent to the process.
* Abstract job queue, handles base implementations of AnyJobQueue interface.
*/
class JobQueue {
/**
* Constructor. Instantiates new JobQueue instance.
*
* @constructor
* @param {JobQueueOptions} options
*/
class BaseJobQueue {
constructor(options) {
this.options = options;
this.logger = options.logger || console;
this.options = options;
this.imq = core_1.default.create(options.name, {
cluster: options.cluster,
cleanup: false,
safeDelivery: options.safe,
safeDeliveryTtl: options.safeLockTtl,
logger: this.logger,
prefix: options.prefix || 'job-queue',
});
}

@@ -60,8 +42,5 @@ /**

*
* @return {Promise<JobQueue<T>>} - this queue
* @return {Promise<T>} - this queue
*/
async start() {
if (!this.handler) {
throw new TypeError('Message handler is not set, can not start job queue!');
}
await this.imq.start();

@@ -73,3 +52,3 @@ return this;

*
* @return {Promise<JobQueue<T>>} - this queue
* @return {Promise<T>} - this queue
*/

@@ -88,3 +67,41 @@ async stop() {

}
}
exports.BaseJobQueue = BaseJobQueue;
/**
* Creates and returns IMQOptions derived from a given JobQueueOptions
*
* @param {JobQueueOptions} options
* @param {ILogger} logger
* @return {Partial<IMQOptions>}
* @private
*/
function toIMQOptions(options, logger) {
return {
cluster: options.cluster,
cleanup: false,
safeDelivery: typeof options.safe === 'undefined'
? true : options.safe,
safeDeliveryTtl: typeof options.safeLockTtl === 'undefined'
? 10000 : options.safeLockTtl,
prefix: options.prefix || 'imq-job',
logger,
};
}
// noinspection JSUnusedGlobalSymbols
/**
* Implements simple scheduled job queue publisher. Job queue publisher is only
* responsible for pushing queue messages.
*/
class JobQueuePublisher extends BaseJobQueue {
/**
* Constructor. Instantiates new JobQueue instance.
*
* @constructor
* @param {JobQueueOptions} options
*/
constructor(options) {
super(options);
this.imq = core_1.default.create(options.name, toIMQOptions(options, this.logger), core_1.IMQMode.PUBLISHER);
}
/**
* Pushes new job to this queue

@@ -94,13 +111,28 @@ *

* @param {PushOptions} options - push options, like delay and ttl for job
* @return {Promise<JobQueue<T>>} - this queue
* @return {JobQueue<T>} - this queue
*/
push(job, options) {
options = options || {};
if (!this.handler) {
throw new TypeError('Message handler is not set, can not enqueue data!');
}
this.imq.send(this.name, Object.assign(Object.assign({ job: job }, (options.ttl ? { expire: Date.now() + options.ttl } : {})), (options.delay ? { delay: options.delay } : {})), options.delay).catch(err => this.logger.log('JobQueue push error:', err));
return this;
}
}
exports.JobQueuePublisher = JobQueuePublisher;
// noinspection JSUnusedGlobalSymbols
/**
* Implements simple scheduled job queue worker. Job queue worker is only
* responsible for processing queue messages.
*/
class JobQueueWorker extends BaseJobQueue {
/**
* Constructor. Instantiates new JobQueue instance.
*
* @constructor
* @param {JobQueueOptions} options
*/
constructor(options) {
super(options);
this.imq = core_1.default.create(options.name, toIMQOptions(options, this.logger), core_1.IMQMode.WORKER);
}
/**
* Sets up job handler, which is called when the job is popped from this

@@ -110,3 +142,3 @@ * queue.

* @param {JobQueuePopHandler<T>} handler - job pop handler
* @return {JobQueue<T>} - this queue
* @return {JobQueueWorker<T>} - this queue
*/

@@ -119,5 +151,2 @@ onPop(handler) {

let rescheduleDelay;
if (typeof expire === 'number' && expire <= Date.now()) {
return; // remove job from queue
}
try {

@@ -136,2 +165,5 @@ rescheduleDelay = this.handler(job);

}
if (typeof expire === 'number' && expire <= Date.now()) {
return; // remove job from queue
}
if (typeof rescheduleDelay === 'number' && rescheduleDelay >= 0) {

@@ -144,3 +176,60 @@ await this.imq.send(this.name, message, rescheduleDelay);

}
exports.JobQueueWorker = JobQueueWorker;
// noinspection JSUnusedGlobalSymbols
/**
* Implements simple scheduled job queue. Job scheduling is optional. It may
* process jobs immediately or after specified delay for particular job.
* It also allows to define max lifetime of the job in a queue, after which
* the job is removed from a queue.
* Supports graceful shutdown, if TERM or SIGINT is sent to the process.
*/
class JobQueue extends BaseJobQueue {
/**
* Constructor. Instantiates new JobQueue instance.
*
* @constructor
* @param {JobQueueOptions} options
*/
constructor(options) {
super(options);
this.imq = core_1.default.create(options.name, toIMQOptions(options, this.logger));
}
/**
* Starts processing job queue. Throws if handler is not set before start.
*
* @throws {TypeError}
* @return {Promise<T>} - this queue
*/
async start() {
if (!this.handler) {
throw new TypeError('Message handler is not set, can not start job queue!');
}
return await super.start();
}
/**
* Pushes new job to this queue. Throws if handler is not set.
*
* @throws {TypeError}
* @param {T} job - job data itself of user defined type
* @param {PushOptions} options - push options, like delay and ttl for job
* @return {JobQueue<T>} - this queue
*/
push(job, options) {
if (!this.handler) {
throw new TypeError('Message handler is not set, can not enqueue data!');
}
return JobQueuePublisher.prototype.push.call(this, job, options);
}
/**
* Sets up job handler, which is called when the job is popped from this
* queue.
*
* @param {JobQueuePopHandler<T>} handler - job pop handler
* @return {JobQueue<T>} - this queue
*/
onPop(handler) {
return JobQueueWorker.prototype.onPop.call(this, handler);
}
}
exports.default = JobQueue;
//# sourceMappingURL=index.js.map
{
"name": "@imqueue/job",
"version": "1.0.2",
"version": "1.1.0",
"description": "Simple job queue",

@@ -39,3 +39,3 @@ "keywords": [

"dependencies": {
"@imqueue/core": "^1.8.1"
"@imqueue/core": "^1.11.0"
},

@@ -42,0 +42,0 @@ "devDependencies": {

@@ -7,3 +7,4 @@ # Simple Job Queue (@imqueue/job)

Simple JSON messaging based Job Queue for managing backand background jobs.
Simple job queue using JSON messaging for managing backand background jobs.
Backed up by Redis.

@@ -13,8 +14,13 @@ # Features

Based on @imqueue/core it provides Job Queue functionality including:
- **Safe message handling** - no data loss!
- **Supports gzip compression for messages** (decrease traffic usage, but
- **Safe job processing** - no data loss!
- **Fast processing** - by events, not timers, low resource usage.
- **Supports gzip compression** for job data (decrease traffic usage, but
slower).
- **Concurrent workers model supported**, the same queue can have multiple
consumers.
- **Delayed jobs** - when the job should be scheduled.
consumers with no data loss and natural load-balancing.
- **Scheduleable jobs** - jobs can be delayed by specified time,
granularity - milliseconds.
- **Job expiration supported** - job can live forever or specified time,
granularity - milliseconds.
- **Publisher/Worker/Both** models of work with queues supported.
- **TypeScript included!**

@@ -35,4 +41,5 @@

~~~typescript
import JobQueue from '@imqueue/job';
import JobQueue, { JobQueuePublisher, JobQueueWorker } from '@imqueue/job';
// Standard job queue (both - worker and publisher) example
new JobQueue<string>({ name: 'TestJob' })

@@ -46,3 +53,19 @@ .onPop(job => console.log(job))

.push('Hello, world after 10 sec!', { delay: 10000 }),
);
);
// Job queue publisher-only example
new JobQueuePublisher<string>({ name: 'CustomTestJob' })
.start().then(queue => queue
.push('Hello, job world!')
.push('Hello, job world after 1 sec!', { delay: 1000 })
.push('Hello, job world after 2 sec!', { delay: 2000 })
.push('Hello, job world after 5 sec!', { delay: 5000 })
.push('Hello, job world after 10 sec!', { delay: 10000 }),
);
// Job queue worker only example
new JobQueueWorker<string>({ name: 'CustomTestJob' })
.onPop(job => console.log(job))
.start()
.catch(err => console.error(err));
~~~

@@ -49,0 +72,0 @@