
Research
/Security News
Critical Vulnerability in NestJS Devtools: Localhost RCE via Sandbox Escape
A flawed sandbox in @nestjs/devtools-integration lets attackers run code on your machine via CSRF, leading to full Remote Code Execution (RCE).
A light weight, simple FIFO Queue backed by PostgresSQL for delayed tasks.
npm install programma
import { Programma } from 'programma'
const programma = new Programma({
connectionString: '<database-connection-string>',
})
// connect to node-pq pool and start processing
await programma.start()
await programma.addJob('sendEmail', {
data: { email: 'xyz@gmail.com'},
runAfterDate: '2020-08-27T20:02:18.255Z', // optional UTC ISO-8601 format, run after a specific date takes precedence
})
programma.processJobs({ topicName: 'sendEmail', heartBeat: 10 }, async (job: IReceiveJob) => {
await programma.moveJobToProcessing(job.id)
})
Programma for us makes more sense to schedule task/work to be done, and to track it's progress and state or to fan-out multiple tasks to a queue system. You can parallelize Programma by running on multiple different Node processes giving maxJobs limit per heartbeat/interval where you can achieve horizontal scalability. Programma also handles schema/table creation so it could be used with a multiple PostgresDB to run the jobs. For parallelizing and doing the work you could use Redis based queue system like bull or rsmq using a shared Redis server multiple Node.js processes can send / receive messages Following is an example to use it with awesome bull queue
const bullRedisQueue = new Queue('sendEmail', 'redis://127.0.0.1:6379');
await programma.addJob('sendEmail', {
data: { email: 'xyz@gmail.com'},
runAfterDate: '2020-08-27T20:02:18.255Z', // optional UTC ISO-8601 format, run after a specific date takes precedence
retryAfterSeconds: 60,
})
programma.processJobs({ topicName: 'sendEmail', heartBeat: 10 }, async (job: IReceiveJob) => {
// use all the good things provided by bull to retry the work to be done and everything
await bullRedisQueue.add(
{ id: job.id, data: job.data, attributes: job.data },
{ retries: 3, backoff: 20000,timeout: 15000 }
)
// move job to processing, after submitting it. the status will be changed to processing and Redis queue will handle it
// if a job is not moved to different state it will be retired after retryAfterSecond period
await programma.moveJobToProcessing(job.id)
})
bullRedisQueue.process(async (job, done) => {
// heavy processing
await programma.moveJobToDone(jod.data.id)
return Promise.resolve()
})
// job failed after all the back-off retries
// we can track that in SQL
bullRedisQueue.on('failed', (id, error) => {
const job = await bullRedisQueue.getJob(id)
await programma.moveJobToFailed(jod.data.id)
// also can track error in SQL
await programma.setAttributes(job.id, { error: error })
})
For tasks that does not involve heavy computation (i.e, does not block event-loop or not to put a lot of back-pressure) programma could be used as a end-to-end solution or alternatively for high latency tasks it could be used with smaller batch sizes per interval. where work could be parallelized between multiple nodes that can poll the same topic
await programma.addJob('sendPushNotification', {
data: { token: '123213123' },
retryAfterSeconds: 60,
attributes: { maxAttempts: 5 },
retryAfterSeconds: 5,
})
// you can control the re-try logic and could also use it for exponential back-off
// it's all up-to you how to handle it
programma.processJobs({ topicName: 'sendPushNotification', heartBeat: 10, maxJobs: 100 }, async (job: IReceiveJob) => {
// check if retry attempts are exhausted
if (job.attributes.attempts > job.attributes.maxAttempts) {
await programma.moveJobToFailed(job.id)
}
try {
await pushNotifyApi(job.data)
await programma.moveJobToDone(job.id)
} catch (e) {
// set the retry counter
await programma.setAttributes(job.id, { attempts: attempts ? attempts + 1 : 1 })
// use this for constant retry delay, exponential back-off or a combination thereof
await programma.setRetryAfterSeconds(60)
}
})
For the config programma uses node-postgres pool config to create an internal pool of connection. You can refer to PoolConfig interface
export interface IProgrammaConstructor {
new (config: PoolConfig, schemaName: string): IProgramma
}
// schemaName defaults to programma. you can also give your custom schema name
const program = new Programma({
connectionString: `postgres://<user-name>:<password>@localhost:5432/postgres`,
max: 50, // max connection pool size. default to 10
}, 'mySchemaName')
await program.start() // this established the pool connection, ensures migration and start pooling. if start with false it won't ensure migration
export interface IProgramma {
addJob(topicName: string, job: IJobConfig): Promise<string | null>
receiveJobs(config: IReceiveMessageConfig, handler: IHandlerCallback): void
deleteJob(id: string): Promise<boolean>
moveJobToProcessing(id: string): Promise<boolean>
moveJobToDone(id: string, deleteOnComplete: boolean): Promise<boolean>
moveJobToFailed(id: string, deleteOnFail: boolean): Promise<boolean>
getJob(id: string): Promise<IJobDetail | null>
setAttributes(id: string, attributes: object): Promise<boolean>
setRetryAfterSeconds(id: string, seconds: number): Promise<boolean>
setJobStartDate(id: string, startDate: string | Date): Promise<boolean>
start(): Promise<void>
shutdown(): void
}
export interface IJobConfig {
// job data is persisted in JSONB type
// it could contain any number of key value pairs
data: {}
// attributes is metadata related to job, persisted as JSONB type
// attributes could be like retry count etc. metadata for downstream system
// job calls will expose a method to run these attributes for downstream system
attributes?: {}
// run after seconds
runAfterSeconds?: number
// ISO8601 format String or Date. Use UTC ISO8601 format to avoid inconsistency
runAfterDate?: string | Date
// re-run the job after seconds. default is 30seconds
retryAfterSeconds?: number | null
}
if runAfterDate or runAfterSeconds is not provided currentTime will be picked by default so job could be run in next pooling interval Job uuid will be returning on success
export interface IReceiveMessageConfig {
// maximum number of messages polled per scheduled interval
// even though the handler will be executed independently per job
maxJobs?: number
// poll interval in seconds. has to be greater of equal to 1
// default poll interval is 5seconds
heartBeat?: number
// name of the queue topic. this is required field
topicName: string
}
export interface IHandlerCallback {
(job: IReceiveJob): void
}
export interface IReceiveJob {
id: string
data: Object
attributes: Object
}
export interface IJobDetail {
id: string
topicName: string
data: Object
attributes: Object
state: JobStates
start_after: string
started_at: string | null
created_at: string
retry_after_seconds: number
}
All the change status methods follows very self descriptive interface. For moveJobToDone, moveJobToFailed if a seconds boolean parameter true is passed it will delete the job from table. Following are the job states
export enum JobStates {
CREATED = 'created',
ACTIVE = 'active',
PROCESSING = 'processing',
FAILED = 'failed',
COMPLETED = 'completed',
}
Message will initially be in created state. It will be moved to ACTIVE state at the right time for message to be run. After that message will remain in ACTIVE state and will be retried after the retry_after_seconds if the message is not moved to a different state from ACTIVE.
create table if not exists ${this.schemaName}.jobs (
id uuid primary key not null default gen_random_uuid(),
topicName text not null,
data jsonb,
attributes jsonb,
state varchar(255) not null default('created'),
start_after timestamp with time zone not null default now(),
started_at timestamp with time zone,
created_at timestamp with time zone default now(),
retry_after_seconds int
);
Author: hadi@vincere.health
Crated at Vincere since we had a very similar use-case. Our initial design inspiration came from this blog
Also we were inspired by Pinterest Pin Later although it's fundamentally different in implementation
We are really open to suggestions and would love to hear your feedback and job scheduling use-cases so that we can improve
FAQs
A light weight, customizable, simple FIFO Queue backed by PostgresSQL
The npm package programma receives a total of 18 weekly downloads. As such, programma popularity was classified as not popular.
We found that programma demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
/Security News
A flawed sandbox in @nestjs/devtools-integration lets attackers run code on your machine via CSRF, leading to full Remote Code Execution (RCE).
Product
Customize license detection with Socket’s new license overlays: gain control, reduce noise, and handle edge cases with precision.
Product
Socket now supports Rust and Cargo, offering package search for all users and experimental SBOM generation for enterprise projects.