Adonis Queue Pro
Adonis queue pro is a queue-worker library for AdonisJS, it is backed by Kue and Kue-Scheduler.
Features
- Ace commands for generating jobs and start workers
- (Close-to) Full kue/kue-scheduler API supported including future/repeat job scheduling
- Multi-worker mode supported with full suite of AdonisJS API
- Produce/Consumer model for structuring your job
- Easy and Elegant API for scheduling and processing your jobs
Notices
- Currently only support Adonis V3.2, 4.0 support is scheduled for a later release. ( Any pull requests will be welcome :)
Installation
npm install --save adonis-queue-pro
Configuration
In your config/app.js, edit the following:
- add
'adonis-queue-pro/providers/QueueProvider'
to your providers array. - add
'adonis-queue-pro/providers/CommandProvider'
to your aceProviders array. - add
Queue: 'Adonis/Addon/Queue'
to your aliases array - add below commands to your commands array
'Adonis/Commands/Queue:Init'
'Adonis/Commands/Queue:Generate'
'Adonis/Commands/Queue:Work'
Consumer/Producer model
Instead of defining Job as a single entity, this library separates the responsibility of job into consumer and producer, here are the definitions:
Producer: Defines the static properties of the job, in kue's context, supported properties include priority, attempts, backOff, delay, ttl and unique. Documentations of each property can be found in Kue and Kue-scheduler's Github.
Consumer: Defines the runtime actions of the job, in kue's context, supported actions include concurrency and the process handler.
Example of a basic producer/consumer pair can be found by generating a sample job using the ./ace queue:generate
command.
CLi API
Initialize (Must be done first!)
$ ./ace queue:init
This will create the queue configuration file and placed in the config directory, which is very similar to kue-scheduler's config file.
This will also create the queue server adaptor in the bootstrap and APP_PATH directory, which will be the entry point for the queue worker process.
Create job
$ ./ace queue:generate SendEmail --jobId='send-email'
The option jobId
is optional, if not provided, the kue type for the job will be a kebab-case of the argument. i.e. SendEmail -> send-email.
This command will create job producers and consumers in directory configurable in config/queue.js with consumerPath and producerPath. Default to app/Jobs/{Consumers | Producers}.
The job consumers and producers will both run in Adonis framework context, thus you can easily use any supported libraries within the job file.
Run worker
$ ./ace queue:work 4
The argument defines the number of workers to run simultaneously. Default to 1 if not provided.
Notice: This command only support a simple fork
based process manager easy for local testing. It does not handle worker failure nor graceful restart. For production usage, you can use tools such as Supervisor or PM2, and the command will be node queue_server.js
in your app directory.
Job API
The producer job file supports Kue job properties which are defined as an ES6 get
property in the class, see example by running ./ace queue:generate
.
Refer to supported job properties above in the Consumer/Producer Model section.
The consumer job file supports Kue job's concurrecy defined as an ES6 static get
property in the class, see example by running ./ace queue:generate
.
The processing function is defined as a generator function * handle()
which can access constructor-injected payload using this.data
.
The producer job class also supports job events, listed below:
onInit(Kue/Job job)
onEnqueue(String jobType)
onStart(String jobType)
onPromotion(String jobType)
onRemove(String jobType)
onProgress(Float progress)
onComplete(Object data)
onFailed(Error error)
onFailedAttempts(Error error)
This producer job class itself is an Event Listener, thus you can export the data received from the job event to the outside world.
A useful scenario is to remove job by id, which is retrievable from the onInit
method:
onInit(job) {
this.emit('init', job.id);
}
job.on('init', id => Queue.remove(id));
Queue API
Access the queue
const Queue = use('Queue');
Push job on to queue
const ExampleJob = use('App/Jobs/Producer/ExampleJob');
Queue.dispatch(new ExampleJob({'data': 'whatever'}));
Queue.dispatch()
has a second optional String
argument default to 'now', which reflects the Kue-Scheduler API:
Queue.dispatch(new ExampleJob, 'now');
Queue.dispatch(new ExampleJob, 'every 2 seconds');
Queue.dispatch(new ExampleJob, '2 seconds from now');
Remove jobs
Remove a single job by id
Queue.remove(id)
.then(response => {}, error => {});
Clear all jobs
Queue.clear()
.then(response => {}, error => {});
Development
Contributions are welcomed ! This is an early start project so please send a pull request when you squashed a bug!
Todos
- 4.0 integration
- Write tests
- Complete Kue API integration
- Squash bugs
License
MIT