Programma
A lightweight, simple FIFO queue backed by PostgreSQL for delayed tasks.
Motivation
- Scheduling jobs far in the future is hard with some of the most reliable distributed queues. SQS, for example, has an upper limit of 12 hours on its visibility timeout, so a long-scheduled message has to be picked up and delayed again, possibly several times, depending on how far in the future the job is scheduled
- The Redis pub/sub model with a delay time suits job scheduling really well, but it is not backed by disk. It works well for immediate jobs that need high throughput, but keeping jobs that run far in the future consumes a lot of memory and is costly
- While Redis Cluster and cloud-provided distributed queues elegantly solve scaling challenges, sometimes the use case is simply to track these jobs in one place, such as a SQL/NoSQL store, with a common schema model and without too much effort. Programma is our attempt to track job states in Postgres
- Programma's goal is not to implement solid job worker logic. You can use it as your end-to-end job processor for simpler low-latency tasks, but we recommend using Redis, RabbitMQ, or a queue like SQS to do the work, with Programma fanning out or distributing the workload to them
- The goal of Programma is to expose a very flexible and simple API, where the client can nudge a job through its processing lifecycle by calling utility methods, without us dictating a specific lifecycle for a job
Quickstart
npm install programma
import { Programma } from 'programma'
const programma = new Programma({
connectionString: '<database-connection-string>',
})
await programma.start()
await programma.addJob('sendEmail', {
data: { email: 'xyz@gmail.com'},
runAfterDate: '2020-08-27T20:02:18.255Z',
})
programma.processJobs({ topicName: 'sendEmail', heartBeat: 10 }, async (job: IReceiveJob) => {
await programma.moveJobToProcessing(job.id)
})
Features
- Lightweight: ~300 lines of code
- Handles schema creation for you
- The Programma API is highly customizable. It's up to you how you handle the job lifecycle or retry logic
- Guarantees delivery to exactly one processor within the retry timeout period, without heavy locking or affecting performance, thanks to the Postgres SKIP LOCKED feature
- Programma keeps redelivering a job every retryAfterSeconds until a processor claims it by changing the job status. This parameter is customizable, and you can change it between attempts to implement exponential backoff
- Received jobs that are not moved to the PROCESSING, COMPLETED, or FAILED state reappear after the retryAfterSeconds timeout. The default is 30 seconds
- Promise-based API, written in TypeScript
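To illustrate the SKIP LOCKED point above, here is a minimal sketch of how a claim statement over the jobs table could be built. It is not Programma's actual query: the helper name `buildClaimQuery`, the exact `where` conditions, and the 30-second fallback for a null `retry_after_seconds` are assumptions made for illustration, using the column names from the jobs table shown later in this README.

```typescript
// Hypothetical helper, not part of the Programma API.
interface ClaimQuery {
  text: string
  values: [string, number]
}

function buildClaimQuery(schemaName: string, topicName: string, maxJobs: number): ClaimQuery {
  // Identifiers cannot be bound as query parameters, so validate before interpolating.
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(schemaName)) {
    throw new Error(`invalid schema name: ${schemaName}`)
  }
  // Rows locked by another poller are skipped instead of waited on,
  // so two processors polling at the same time never claim the same job.
  const text = `
    update ${schemaName}.jobs
    set state = 'active', started_at = now()
    where id in (
      select id
      from ${schemaName}.jobs
      where topicName = $1
        and start_after <= now()
        and (
          state = 'created'
          or (
            state = 'active'
            and started_at + coalesce(retry_after_seconds, 30) * interval '1 second' <= now()
          )
        )
      order by start_after
      limit $2
      for update skip locked
    )
    returning id, data, attributes`
  return { text, values: [topicName, maxJobs] }
}
```

The returned `text` and `values` have the shape that a node-postgres `pool.query(text, values)` call accepts.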
Use with Queuing
For us, Programma makes the most sense for scheduling work to be done and tracking its progress and state, or for fanning out multiple tasks to a queue system. You can parallelize Programma by running it in multiple Node processes, each with a maxJobs limit per heartbeat/interval, to achieve horizontal scalability. Programma also handles schema/table creation, so it can be used with multiple Postgres databases to run the jobs. For parallelizing and doing the actual work, you can use a Redis-based queue system like bull or rsmq, where multiple Node.js processes send and receive messages through a shared Redis server
The following is an example of using it with the awesome bull queue
import Queue from 'bull'
const bullRedisQueue = new Queue('sendEmail', 'redis://127.0.0.1:6379')
await programma.addJob('sendEmail', {
  data: { email: 'xyz@gmail.com' },
  runAfterDate: '2020-08-27T20:02:18.255Z',
  retryAfterSeconds: 60,
})
// Hand each due job over to bull, then mark it as processing in Programma
programma.processJobs({ topicName: 'sendEmail', heartBeat: 10 }, async (job: IReceiveJob) => {
  await bullRedisQueue.add(
    { id: job.id, data: job.data, attributes: job.attributes },
    { attempts: 3, backoff: 20000, timeout: 15000 }
  )
  await programma.moveJobToProcessing(job.id)
})
// job.data.id is the Programma job id stored in the bull payload above
bullRedisQueue.process(async (job) => {
  await programma.moveJobToDone(job.data.id)
})
// bull passes the failed job and the error to local 'failed' listeners
bullRedisQueue.on('failed', async (job, error) => {
  await programma.moveJobToFailed(job.data.id)
  await programma.setAttributes(job.data.id, { error: error.message })
})
Use without Queuing
For tasks that do not involve heavy computation (i.e., that neither block the event loop nor create a lot of back-pressure), Programma can be used as an end-to-end solution. For high-latency tasks, it can instead be used with smaller batch sizes per interval, with the work parallelized across multiple nodes that poll the same topic
await programma.addJob('sendPushNotification', {
  data: { token: '123213123' },
  attributes: { maxAttempts: 5 },
  retryAfterSeconds: 5,
})
programma.processJobs({ topicName: 'sendPushNotification', heartBeat: 10, maxJobs: 100 }, async (job: IReceiveJob) => {
  const attempts = job.attributes.attempts || 0
  // Give up once the attempt budget is exhausted
  if (attempts > job.attributes.maxAttempts) {
    await programma.moveJobToFailed(job.id)
    return
  }
  try {
    await pushNotifyApi(job.data)
    await programma.moveJobToDone(job.id)
  } catch (e) {
    // Record the failed attempt (keeping maxAttempts) and wait longer before the next delivery
    await programma.setAttributes(job.id, { ...job.attributes, attempts: attempts + 1 })
    await programma.setRetryAfterSeconds(job.id, 60)
  }
})
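The retry handling above uses a fixed delay. As a sketch of the exponential backoff mentioned under Features, a small helper could compute the delay from the attempt count. The name `backoffSeconds` and its default base and cap values are assumptions for illustration and are not part of Programma.

```typescript
// Hypothetical helper: doubles the delay on every attempt, up to a cap.
function backoffSeconds(attempt: number, baseSeconds = 5, maxSeconds = 3600): number {
  // Attempt 1 waits baseSeconds, attempt 2 waits twice that, and so on.
  const exponent = Math.max(0, attempt - 1)
  return Math.min(maxSeconds, baseSeconds * 2 ** exponent)
}

console.log([1, 2, 3, 4].map((attempt) => backoffSeconds(attempt))) // [ 5, 10, 20, 40 ]
```

Inside a handler's catch block, the result could then be passed as the second argument to `programma.setRetryAfterSeconds(job.id, ...)`.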
API
Instance Creation
For the config, Programma uses the node-postgres pool config to create an internal connection pool. You can refer to the PoolConfig interface
export interface IProgrammaConstructor {
new (config: PoolConfig, schemaName: string): IProgramma
}
const program = new Programma({
connectionString: `postgres://<user-name>:<password>@localhost:5432/postgres`,
max: 50,
}, 'mySchemaName')
await program.start()
Programma Methods
export interface IProgramma {
addJob(topicName: string, job: IJobConfig): Promise<string | null>
receiveJobs(config: IReceiveMessageConfig, handler: IHandlerCallback): void
deleteJob(id: string): Promise<boolean>
moveJobToProcessing(id: string): Promise<boolean>
moveJobToDone(id: string, deleteOnComplete: boolean): Promise<boolean>
moveJobToFailed(id: string, deleteOnFail: boolean): Promise<boolean>
getJob(id: string): Promise<IJobDetail | null>
setAttributes(id: string, attributes: object): Promise<boolean>
setRetryAfterSeconds(id: string, seconds: number): Promise<boolean>
setJobStartDate(id: string, startDate: string | Date): Promise<boolean>
start(): Promise<void>
shutdown(): void
}
addJob(topicName: string, job: IJobConfig): Promise<string | null>
export interface IJobConfig {
data: {}
attributes?: {}
runAfterSeconds?: number
runAfterDate?: string | Date
retryAfterSeconds?: number | null
}
If neither runAfterDate nor runAfterSeconds is provided, the current time is used by default, so the job can run in the next polling interval
The job UUID is returned on success
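The start-time rule described above can be sketched as a small pure function. This is an illustration, not Programma's implementation: the name `resolveStartAfter` is hypothetical, and giving runAfterDate precedence when both fields are set is an assumption the README does not confirm.

```typescript
// Only the two scheduling fields of IJobConfig matter for this sketch.
interface StartConfig {
  runAfterSeconds?: number
  runAfterDate?: string | Date
}

// Hypothetical helper: returns the moment a job becomes eligible to run.
function resolveStartAfter(config: StartConfig, now: Date = new Date()): Date {
  // Assumption: an explicit date wins over a relative delay.
  if (config.runAfterDate !== undefined) {
    const date = new Date(config.runAfterDate)
    if (Number.isNaN(date.getTime())) {
      throw new Error('runAfterDate is not a valid date')
    }
    return date
  }
  if (config.runAfterSeconds !== undefined) {
    return new Date(now.getTime() + config.runAfterSeconds * 1000)
  }
  // Neither field given: eligible immediately, picked up on the next poll.
  return new Date(now.getTime())
}
```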
receiveJobs(config: IReceiveMessageConfig, handler: IHandlerCallback): void
export interface IReceiveMessageConfig {
maxJobs?: number
heartBeat?: number
topicName: string
}
export interface IHandlerCallback {
(job: IReceiveJob): void
}
export interface IReceiveJob {
id: string
data: Object
attributes: Object
}
getJob(id: string): Promise<IJobDetail | null>
export interface IJobDetail {
id: string
topicName: string
data: Object
attributes: Object
state: JobStates
start_after: string
started_at: string | null
created_at: string
retry_after_seconds: number
}
move to status methods
All the status-change methods follow a self-descriptive interface. For moveJobToDone and moveJobToFailed, passing true as the second boolean parameter deletes the job from the table. The job states are as follows
export enum JobStates {
CREATED = 'created',
ACTIVE = 'active',
PROCESSING = 'processing',
FAILED = 'failed',
COMPLETED = 'completed',
}
A job is initially in the CREATED state. It is moved to the ACTIVE state when it is due to run. It then remains in the ACTIVE state and is retried after retry_after_seconds unless it is moved to a different state.
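As a sketch of the retry rule above, the check for whether an ACTIVE job is due for redelivery could look like the following. The helper name `isRetryDue` is hypothetical, and it assumes that started_at records the time the job was last delivered, which this README does not state explicitly.

```typescript
enum JobStates {
  CREATED = 'created',
  ACTIVE = 'active',
  PROCESSING = 'processing',
  FAILED = 'failed',
  COMPLETED = 'completed',
}

// Subset of IJobDetail needed for the check.
interface JobTiming {
  state: JobStates
  started_at: string | null
  retry_after_seconds: number
}

// Hypothetical helper: true when an ACTIVE job has waited out its retry window.
function isRetryDue(job: JobTiming, now: Date = new Date()): boolean {
  // Jobs moved to PROCESSING, COMPLETED or FAILED are never redelivered.
  if (job.state !== JobStates.ACTIVE || job.started_at === null) {
    return false
  }
  const deadline = new Date(job.started_at).getTime() + job.retry_after_seconds * 1000
  return now.getTime() >= deadline
}
```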
SQL Jobs Table
create table if not exists ${this.schemaName}.jobs (
id uuid primary key not null default gen_random_uuid(),
topicName text not null,
data jsonb,
attributes jsonb,
state varchar(255) not null default('created'),
start_after timestamp with time zone not null default now(),
started_at timestamp with time zone,
created_at timestamp with time zone default now(),
retry_after_seconds int
);
Next Steps
- Create API for queue/job metrics
- TTL (time to live), configured per topic, for cleaning up failed/completed jobs
Meta
Author: hadi@vincere.health
Created at Vincere since we had a very similar use-case. Our initial design inspiration came from this blog
Also we were inspired by Pinterest Pin Later although it's fundamentally different in implementation
We are really open to suggestions and would love to hear your feedback and job scheduling use-cases so that we can improve