@imqueue/job
Advanced tools
Comparing version
164
index.d.ts
@@ -18,5 +18,23 @@ /*! | ||
*/ | ||
import { ILogger } from '@imqueue/core'; | ||
import { ILogger, IMessageQueue } from '@imqueue/core'; | ||
/** | ||
* Job queues options | ||
*/ | ||
export interface JobQueueOptions { | ||
/** | ||
* Name of the job queue. In worker/publisher mode worker and | ||
* publisher must share the same job queue name. | ||
* Mandatory. | ||
* | ||
* @type {string} | ||
*/ | ||
name: string; | ||
/** | ||
* Connection params of the queue engine cluster (typically - | ||
* host and port). By default the broker is redis. | ||
* Optional. | ||
* By default is [{ host: "localhost", port: 6379 }]. | ||
* | ||
* @type {Array<{host: string, port: number}>} | ||
*/ | ||
cluster?: { | ||
@@ -26,5 +44,34 @@ host: string; | ||
}[]; | ||
/** | ||
* Logger to be used for producing log and error messages. | ||
* Optional. | ||
* By default is console. | ||
* | ||
* @type {ILogger} | ||
*/ | ||
logger?: ILogger; | ||
/** | ||
* Safe message delivery or not? When safe delivery is enabled (by default) | ||
* queue is processing jobs with guarantied job data delivery. If process | ||
* fails or dies - job data is re-queued for future processing by another | ||
* worker. | ||
* Optional. | ||
* Default is true. | ||
* | ||
* @type {boolean} | ||
*/ | ||
safe?: boolean; | ||
/** | ||
* TTL in milliseconds of the job in worker queue during safe delivery. | ||
* If worker does not finish processing after this TTL - job is re-queued | ||
* for other workers to be processed. | ||
* Optional. | ||
* By default is 10000. | ||
*/ | ||
safeLockTtl?: number; | ||
/** | ||
* Job queue prefix in queue broker. | ||
* Optional. | ||
* By default is "imq-job". | ||
*/ | ||
prefix?: string; | ||
@@ -84,22 +131,25 @@ } | ||
} | ||
export interface AnyJobQueue<T> { | ||
name: string; | ||
readonly logger: ILogger; | ||
start(): Promise<T>; | ||
stop(): Promise<T>; | ||
destroy(): Promise<void>; | ||
} | ||
export interface AnyJobQueueWorker<T, U> { | ||
onPop(handler: JobQueuePopHandler<U>): T; | ||
} | ||
export interface AnyJobQueuePublisher<T, U> { | ||
push(job: U, options?: PushOptions): T; | ||
} | ||
/** | ||
* Implements simple scheduled job queue. Job scheduling is optional. It may | ||
* process jobs immediately or after specified delay for particular job. | ||
* It also allows to define max lifetime of the job in a queue, after which | ||
* the job is removed from a queue. | ||
* Supports graceful shutdown, if TERM or SIGINT is sent to the process. | ||
* Abstract job queue, handles base implementations of AnyJobQueue interface. | ||
*/ | ||
export default class JobQueue<T> { | ||
private imq; | ||
private handler; | ||
private options; | ||
private readonly logger; | ||
export declare abstract class BaseJobQueue<T, U> implements AnyJobQueue<T> { | ||
protected options: JobQueueOptions; | ||
protected imq: IMessageQueue; | ||
protected handler: JobQueuePopHandler<U>; | ||
readonly logger: ILogger; | ||
protected constructor(options: JobQueueOptions); | ||
/** | ||
* Constructor. Instantiates new JobQueue instance. | ||
* | ||
* @constructor | ||
* @param {JobQueueOptions} options | ||
*/ | ||
constructor(options: JobQueueOptions); | ||
/** | ||
* Full name of this queue | ||
@@ -113,11 +163,11 @@ * | ||
* | ||
* @return {Promise<JobQueue<T>>} - this queue | ||
* @return {Promise<T>} - this queue | ||
*/ | ||
start(): Promise<JobQueue<T>>; | ||
start(): Promise<T>; | ||
/** | ||
* Stops processing job queue | ||
* | ||
* @return {Promise<JobQueue<T>>} - this queue | ||
* @return {Promise<T>} - this queue | ||
*/ | ||
stop(): Promise<JobQueue<T>>; | ||
stop(): Promise<T>; | ||
/** | ||
@@ -129,3 +179,16 @@ * Destroys job queue | ||
destroy(): Promise<void>; | ||
} | ||
/** | ||
* Implements simple scheduled job queue publisher. Job queue publisher is only | ||
* responsible for pushing queue messages. | ||
*/ | ||
export declare class JobQueuePublisher<T> extends BaseJobQueue<JobQueuePublisher<T>, T> implements AnyJobQueuePublisher<JobQueuePublisher<T>, T> { | ||
/** | ||
* Constructor. Instantiates new JobQueue instance. | ||
* | ||
* @constructor | ||
* @param {JobQueueOptions} options | ||
*/ | ||
constructor(options: JobQueueOptions); | ||
/** | ||
* Pushes new job to this queue | ||
@@ -135,6 +198,19 @@ * | ||
* @param {PushOptions} options - push options, like delay and ttl for job | ||
* @return {Promise<JobQueue<T>>} - this queue | ||
* @return {JobQueue<T>} - this queue | ||
*/ | ||
push(job: T, options?: PushOptions): this; | ||
push(job: T, options?: PushOptions): JobQueuePublisher<T>; | ||
} | ||
/** | ||
* Implements simple scheduled job queue worker. Job queue worker is only | ||
* responsible for processing queue messages. | ||
*/ | ||
export declare class JobQueueWorker<T> extends BaseJobQueue<JobQueueWorker<T>, T> implements AnyJobQueueWorker<JobQueueWorker<T>, T> { | ||
/** | ||
* Constructor. Instantiates new JobQueue instance. | ||
* | ||
* @constructor | ||
* @param {JobQueueOptions} options | ||
*/ | ||
constructor(options: JobQueueOptions); | ||
/** | ||
* Sets up job handler, which is called when the job is popped from this | ||
@@ -144,5 +220,45 @@ * queue. | ||
* @param {JobQueuePopHandler<T>} handler - job pop handler | ||
* @return {JobQueueWorker<T>} - this queue | ||
*/ | ||
onPop(handler: JobQueuePopHandler<T>): JobQueueWorker<T>; | ||
} | ||
/** | ||
* Implements simple scheduled job queue. Job scheduling is optional. It may | ||
* process jobs immediately or after specified delay for particular job. | ||
* It also allows to define max lifetime of the job in a queue, after which | ||
* the job is removed from a queue. | ||
* Supports graceful shutdown, if TERM or SIGINT is sent to the process. | ||
*/ | ||
export default class JobQueue<T> extends BaseJobQueue<JobQueue<T>, T> implements AnyJobQueueWorker<JobQueue<T>, T>, AnyJobQueuePublisher<JobQueue<T>, T> { | ||
/** | ||
* Constructor. Instantiates new JobQueue instance. | ||
* | ||
* @constructor | ||
* @param {JobQueueOptions} options | ||
*/ | ||
constructor(options: JobQueueOptions); | ||
/** | ||
* Starts processing job queue. Throws if handler is not set before start. | ||
* | ||
* @throws {TypeError} | ||
* @return {Promise<T>} - this queue | ||
*/ | ||
start(): Promise<JobQueue<T>>; | ||
/** | ||
* Pushes new job to this queue. Throws if handler is not set. | ||
* | ||
* @throws {TypeError} | ||
* @param {T} job - job data itself of user defined type | ||
* @param {PushOptions} options - push options, like delay and ttl for job | ||
* @return {JobQueue<T>} - this queue | ||
*/ | ||
push(job: T, options?: PushOptions): JobQueue<T>; | ||
/** | ||
* Sets up job handler, which is called when the job is popped from this | ||
* queue. | ||
* | ||
* @param {JobQueuePopHandler<T>} handler - job pop handler | ||
* @return {JobQueue<T>} - this queue | ||
*/ | ||
onPop(handler: JobQueuePopHandler<T>): JobQueue<T>; | ||
} |
159
index.js
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.JobQueueWorker = exports.JobQueuePublisher = exports.BaseJobQueue = void 0; | ||
/*! | ||
@@ -21,28 +22,9 @@ * Job Queue for @imqueue framework | ||
const core_1 = require("@imqueue/core"); | ||
// noinspection JSUnusedGlobalSymbols | ||
/** | ||
* Implements simple scheduled job queue. Job scheduling is optional. It may | ||
* process jobs immediately or after specified delay for particular job. | ||
* It also allows to define max lifetime of the job in a queue, after which | ||
* the job is removed from a queue. | ||
* Supports graceful shutdown, if TERM or SIGINT is sent to the process. | ||
* Abstract job queue, handles base implementations of AnyJobQueue interface. | ||
*/ | ||
class JobQueue { | ||
/** | ||
* Constructor. Instantiates new JobQueue instance. | ||
* | ||
* @constructor | ||
* @param {JobQueueOptions} options | ||
*/ | ||
class BaseJobQueue { | ||
constructor(options) { | ||
this.options = options; | ||
this.logger = options.logger || console; | ||
this.options = options; | ||
this.imq = core_1.default.create(options.name, { | ||
cluster: options.cluster, | ||
cleanup: false, | ||
safeDelivery: options.safe, | ||
safeDeliveryTtl: options.safeLockTtl, | ||
logger: this.logger, | ||
prefix: options.prefix || 'job-queue', | ||
}); | ||
} | ||
@@ -60,8 +42,5 @@ /** | ||
* | ||
* @return {Promise<JobQueue<T>>} - this queue | ||
* @return {Promise<T>} - this queue | ||
*/ | ||
async start() { | ||
if (!this.handler) { | ||
throw new TypeError('Message handler is not set, can not start job queue!'); | ||
} | ||
await this.imq.start(); | ||
@@ -73,3 +52,3 @@ return this; | ||
* | ||
* @return {Promise<JobQueue<T>>} - this queue | ||
* @return {Promise<T>} - this queue | ||
*/ | ||
@@ -88,3 +67,41 @@ async stop() { | ||
} | ||
} | ||
exports.BaseJobQueue = BaseJobQueue; | ||
/** | ||
* Creates and returns IMQOptions derived from a given JobQueueOptions | ||
* | ||
* @param {JobQueueOptions} options | ||
* @param {ILogger} logger | ||
* @return {Partial<IMQOptions>} | ||
* @private | ||
*/ | ||
function toIMQOptions(options, logger) { | ||
return { | ||
cluster: options.cluster, | ||
cleanup: false, | ||
safeDelivery: typeof options.safe === 'undefined' | ||
? true : options.safe, | ||
safeDeliveryTtl: typeof options.safeLockTtl === 'undefined' | ||
? 10000 : options.safeLockTtl, | ||
prefix: options.prefix || 'imq-job', | ||
logger, | ||
}; | ||
} | ||
// noinspection JSUnusedGlobalSymbols | ||
/** | ||
* Implements simple scheduled job queue publisher. Job queue publisher is only | ||
* responsible for pushing queue messages. | ||
*/ | ||
class JobQueuePublisher extends BaseJobQueue { | ||
/** | ||
* Constructor. Instantiates new JobQueue instance. | ||
* | ||
* @constructor | ||
* @param {JobQueueOptions} options | ||
*/ | ||
constructor(options) { | ||
super(options); | ||
this.imq = core_1.default.create(options.name, toIMQOptions(options, this.logger), core_1.IMQMode.PUBLISHER); | ||
} | ||
/** | ||
* Pushes new job to this queue | ||
@@ -94,13 +111,28 @@ * | ||
* @param {PushOptions} options - push options, like delay and ttl for job | ||
* @return {Promise<JobQueue<T>>} - this queue | ||
* @return {JobQueue<T>} - this queue | ||
*/ | ||
push(job, options) { | ||
options = options || {}; | ||
if (!this.handler) { | ||
throw new TypeError('Message handler is not set, can not enqueue data!'); | ||
} | ||
this.imq.send(this.name, Object.assign(Object.assign({ job: job }, (options.ttl ? { expire: Date.now() + options.ttl } : {})), (options.delay ? { delay: options.delay } : {})), options.delay).catch(err => this.logger.log('JobQueue push error:', err)); | ||
return this; | ||
} | ||
} | ||
exports.JobQueuePublisher = JobQueuePublisher; | ||
// noinspection JSUnusedGlobalSymbols | ||
/** | ||
* Implements simple scheduled job queue worker. Job queue worker is only | ||
* responsible for processing queue messages. | ||
*/ | ||
class JobQueueWorker extends BaseJobQueue { | ||
/** | ||
* Constructor. Instantiates new JobQueue instance. | ||
* | ||
* @constructor | ||
* @param {JobQueueOptions} options | ||
*/ | ||
constructor(options) { | ||
super(options); | ||
this.imq = core_1.default.create(options.name, toIMQOptions(options, this.logger), core_1.IMQMode.WORKER); | ||
} | ||
/** | ||
* Sets up job handler, which is called when the job is popped from this | ||
@@ -110,3 +142,3 @@ * queue. | ||
* @param {JobQueuePopHandler<T>} handler - job pop handler | ||
* @return {JobQueue<T>} - this queue | ||
* @return {JobQueueWorker<T>} - this queue | ||
*/ | ||
@@ -119,5 +151,2 @@ onPop(handler) { | ||
let rescheduleDelay; | ||
if (typeof expire === 'number' && expire <= Date.now()) { | ||
return; // remove job from queue | ||
} | ||
try { | ||
@@ -136,2 +165,5 @@ rescheduleDelay = this.handler(job); | ||
} | ||
if (typeof expire === 'number' && expire <= Date.now()) { | ||
return; // remove job from queue | ||
} | ||
if (typeof rescheduleDelay === 'number' && rescheduleDelay >= 0) { | ||
@@ -144,3 +176,60 @@ await this.imq.send(this.name, message, rescheduleDelay); | ||
} | ||
exports.JobQueueWorker = JobQueueWorker; | ||
// noinspection JSUnusedGlobalSymbols | ||
/** | ||
* Implements simple scheduled job queue. Job scheduling is optional. It may | ||
* process jobs immediately or after specified delay for particular job. | ||
* It also allows to define max lifetime of the job in a queue, after which | ||
* the job is removed from a queue. | ||
* Supports graceful shutdown, if TERM or SIGINT is sent to the process. | ||
*/ | ||
class JobQueue extends BaseJobQueue { | ||
/** | ||
* Constructor. Instantiates new JobQueue instance. | ||
* | ||
* @constructor | ||
* @param {JobQueueOptions} options | ||
*/ | ||
constructor(options) { | ||
super(options); | ||
this.imq = core_1.default.create(options.name, toIMQOptions(options, this.logger)); | ||
} | ||
/** | ||
* Starts processing job queue. Throws if handler is not set before start. | ||
* | ||
* @throws {TypeError} | ||
* @return {Promise<T>} - this queue | ||
*/ | ||
async start() { | ||
if (!this.handler) { | ||
throw new TypeError('Message handler is not set, can not start job queue!'); | ||
} | ||
return await super.start(); | ||
} | ||
/** | ||
* Pushes new job to this queue. Throws if handler is not set. | ||
* | ||
* @throws {TypeError} | ||
* @param {T} job - job data itself of user defined type | ||
* @param {PushOptions} options - push options, like delay and ttl for job | ||
* @return {JobQueue<T>} - this queue | ||
*/ | ||
push(job, options) { | ||
if (!this.handler) { | ||
throw new TypeError('Message handler is not set, can not enqueue data!'); | ||
} | ||
return JobQueuePublisher.prototype.push.call(this, job, options); | ||
} | ||
/** | ||
* Sets up job handler, which is called when the job is popped from this | ||
* queue. | ||
* | ||
* @param {JobQueuePopHandler<T>} handler - job pop handler | ||
* @return {JobQueue<T>} - this queue | ||
*/ | ||
onPop(handler) { | ||
return JobQueueWorker.prototype.onPop.call(this, handler); | ||
} | ||
} | ||
exports.default = JobQueue; | ||
//# sourceMappingURL=index.js.map |
{ | ||
"name": "@imqueue/job", | ||
"version": "1.0.2", | ||
"version": "1.1.0", | ||
"description": "Simple job queue", | ||
@@ -39,3 +39,3 @@ "keywords": [ | ||
"dependencies": { | ||
"@imqueue/core": "^1.8.1" | ||
"@imqueue/core": "^1.11.0" | ||
}, | ||
@@ -42,0 +42,0 @@ "devDependencies": { |
@@ -7,3 +7,4 @@ # Simple Job Queue (@imqueue/job) | ||
Simple JSON messaging based Job Queue for managing backand background jobs. | ||
Simple job queue using JSON messaging for managing backand background jobs. | ||
Backed up by Redis. | ||
@@ -13,8 +14,13 @@ # Features | ||
Based on @imqueue/core it provides Job Queue functionality including: | ||
- **Safe message handling** - no data loss! | ||
- **Supports gzip compression for messages** (decrease traffic usage, but | ||
- **Safe job processing** - no data loss! | ||
- **Fast processing** - by events, not timers, low resource usage. | ||
- **Supports gzip compression** for job data (decrease traffic usage, but | ||
slower). | ||
- **Concurrent workers model supported**, the same queue can have multiple | ||
consumers. | ||
- **Delayed jobs** - when the job should be scheduled. | ||
consumers with no data loss and natural load-balancing. | ||
- **Scheduleable jobs** - jobs can be delayed by specified time, | ||
granularity - milliseconds. | ||
- **Job expiration supported** - job can live forever or specified time, | ||
granularity - milliseconds. | ||
- **Publisher/Worker/Both** models of work with queues supported. | ||
- **TypeScript included!** | ||
@@ -35,4 +41,5 @@ | ||
~~~typescript | ||
import JobQueue from '@imqueue/job'; | ||
import JobQueue, { JobQueuePublisher, JobQueueWorker } from '@imqueue/job'; | ||
// Standard job queue (both - worker and publisher) example | ||
new JobQueue<string>({ name: 'TestJob' }) | ||
@@ -46,3 +53,19 @@ .onPop(job => console.log(job)) | ||
.push('Hello, world after 10 sec!', { delay: 10000 }), | ||
); | ||
); | ||
// Job queue publisher-only example | ||
new JobQueuePublisher<string>({ name: 'CustomTestJob' }) | ||
.start().then(queue => queue | ||
.push('Hello, job world!') | ||
.push('Hello, job world after 1 sec!', { delay: 1000 }) | ||
.push('Hello, job world after 2 sec!', { delay: 2000 }) | ||
.push('Hello, job world after 5 sec!', { delay: 5000 }) | ||
.push('Hello, job world after 10 sec!', { delay: 10000 }), | ||
); | ||
// Job queue worker only example | ||
new JobQueueWorker<string>({ name: 'CustomTestJob' }) | ||
.onPop(job => console.log(job)) | ||
.start() | ||
.catch(err => console.error(err)); | ||
~~~ | ||
@@ -49,0 +72,0 @@ |
26407
40.49%618
49.64%72
46.94%Updated