A lightweight, simple FIFO queue backed by PostgreSQL for delayed tasks.
npm install programma
import { Programma } from 'programma'
const programma = new Programma({
connectionString: '<database-connection-string>',
})
// connect to the node-postgres (pg) pool and start processing
await programma.start()
await programma.addJob('sendEmail', {
data: { email: 'xyz@gmail.com'},
runAfterDate: '2020-08-27T20:02:18.255Z', // optional UTC ISO-8601 format, run after a specific date takes precedence
})
programma.processJobs({ topicName: 'sendEmail', heartBeat: 10 }, async (job: IReceiveJob) => {
await programma.moveJobToProcessing(job.id)
})
For us, Programma makes the most sense for scheduling work, tracking its progress and state, and fanning tasks out to a queue system. You can parallelize Programma by running it in several Node.js processes, each with a maxJobs limit per heartbeat interval, which gives you horizontal scalability. Programma also handles schema and table creation, so it can run jobs against multiple PostgreSQL databases. To parallelize the work itself, you can use a Redis-based queue such as bull or rsmq; with a shared Redis server, multiple Node.js processes can send and receive messages. The following example uses Programma together with a bull queue:
import Queue from 'bull'
const bullRedisQueue = new Queue('sendEmail', 'redis://127.0.0.1:6379');
await programma.addJob('sendEmail', {
data: { email: 'xyz@gmail.com'},
runAfterDate: '2020-08-27T20:02:18.255Z', // optional UTC ISO-8601 format, run after a specific date takes precedence
retryAfterSeconds: 60,
})
programma.processJobs({ topicName: 'sendEmail', heartBeat: 10 }, async (job: IReceiveJob) => {
// use all the good things provided by bull to retry the work to be done and everything
await bullRedisQueue.add(
{ id: job.id, data: job.data, attributes: job.attributes },
// bull's option for the number of tries is `attempts`
{ attempts: 3, backoff: 20000, timeout: 15000 }
)
// move the job to processing after submitting it. the status changes to processing and the Redis queue takes over
// if a job is not moved to a different state, it will be retried after the retryAfterSeconds period
await programma.moveJobToProcessing(job.id)
})
// no `done` parameter: bull treats a handler that declares it as callback-style
bullRedisQueue.process(async (job) => {
// heavy processing
await programma.moveJobToDone(job.data.id)
})
// job failed after all the back-off retries
// we can track that in SQL
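// bull passes the failed job and the error to local 'failed' listeners
bullRedisQueue.on('failed', async (job, error) => {
// job.data.id is the Programma job id that was submitted to bull
await programma.moveJobToFailed(job.data.id)
// also can track error in SQL (store the message, since an Error object serializes to {})
await programma.setAttributes(job.data.id, { error: error.message })
})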
For tasks that do not involve heavy computation (i.e. tasks that neither block the event loop nor create much back-pressure), Programma can be used as an end-to-end solution. For high-latency tasks, it can be used with smaller batch sizes per interval, with the work parallelized across multiple nodes that poll the same topic.
await programma.addJob('sendPushNotification', {
data: { token: '123213123' },
attributes: { maxAttempts: 5 },
retryAfterSeconds: 5,
})
// you can control the re-try logic and could also use it for exponential back-off
// it's all up-to you how to handle it
programma.processJobs({ topicName: 'sendPushNotification', heartBeat: 10, maxJobs: 100 }, async (job: IReceiveJob) => {
// check if retry attempts are exhausted
if (job.attributes.attempts > job.attributes.maxAttempts) {
await programma.moveJobToFailed(job.id)
// stop here so an exhausted job is not attempted again
return
}
try {
await pushNotifyApi(job.data)
await programma.moveJobToDone(job.id)
} catch (e) {
// set the retry counter
const attempts = job.attributes.attempts
await programma.setAttributes(job.id, { attempts: attempts ? attempts + 1 : 1 })
// use this for constant retry delay, exponential back-off or a combination thereof
await programma.setRetryAfterSeconds(job.id, 60)
}
})
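The retry comment above mentions exponential back-off. As a minimal sketch of how such a delay could be computed, the helper below doubles a base delay per failed attempt and caps it. The name `backoffSeconds` and its default values are hypothetical and not part of programma.

```typescript
// Hypothetical helper, not part of programma: capped exponential back-off.
// `attempts` is the number of failed attempts so far (1 after the first failure).
function backoffSeconds(attempts: number, baseSeconds: number = 5, maxSeconds: number = 300): number {
  // The first failure waits baseSeconds, each further failure doubles the wait.
  const exponent = Math.max(0, attempts - 1)
  const delay = baseSeconds * Math.pow(2, exponent)
  // Never wait longer than maxSeconds.
  return Math.min(maxSeconds, delay)
}
```

Inside the catch block, the result could be passed as the seconds argument of setRetryAfterSeconds (id, seconds in the IProgramma interface) instead of a constant.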
For configuration, Programma takes a node-postgres pool config and uses it to create an internal connection pool. See the node-postgres PoolConfig interface for the available options.
export interface IProgrammaConstructor {
new (config: PoolConfig, schemaName: string): IProgramma
}
// schemaName defaults to programma. you can also give your custom schema name
const program = new Programma({
connectionString: `postgres://<user-name>:<password>@localhost:5432/postgres`,
max: 50, // max connection pool size. default to 10
}, 'mySchemaName')
await program.start() // establishes the pool connection, ensures the migration has run and starts polling. if started with false, it won't ensure the migration
export interface IProgramma {
addJob(topicName: string, job: IJobConfig): Promise<string | null>
receiveJobs(config: IReceiveMessageConfig, handler: IHandlerCallback): void
deleteJob(id: string): Promise<boolean>
moveJobToProcessing(id: string): Promise<boolean>
moveJobToDone(id: string, deleteOnComplete: boolean): Promise<boolean>
moveJobToFailed(id: string, deleteOnFail: boolean): Promise<boolean>
getJob(id: string): Promise<IJobDetail | null>
setAttributes(id: string, attributes: object): Promise<boolean>
setRetryAfterSeconds(id: string, seconds: number): Promise<boolean>
setJobStartDate(id: string, startDate: string | Date): Promise<boolean>
start(): Promise<void>
shutdown(): void
}
export interface IJobConfig {
// job data is persisted in JSONB type
// it could contain any number of key value pairs
data: {}
// attributes is metadata related to job, persisted as JSONB type
// attributes could be like retry count etc. metadata for downstream system
// job calls will expose a method to run these attributes for downstream system
attributes?: {}
// run after seconds
runAfterSeconds?: number
// ISO8601 format String or Date. Use UTC ISO8601 format to avoid inconsistency
runAfterDate?: string | Date
// retry the job after this many seconds. default is 30 seconds
retryAfterSeconds?: number | null
}
If neither runAfterDate nor runAfterSeconds is provided, the current time is used by default, so the job can run in the next polling interval. The job's UUID is returned on success.
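The scheduling rules described so far (runAfterDate takes precedence, runAfterSeconds is relative to the current time, and the current time is the default) can be sketched as a small pure function. This is an illustration under those stated rules, not programma's internal code; `ScheduleOptions` and `resolveStartAfter` are hypothetical names.

```typescript
// Local mirror of the two scheduling fields of IJobConfig.
interface ScheduleOptions {
  runAfterSeconds?: number
  runAfterDate?: string | Date
}

// Hypothetical sketch: programma's actual implementation may differ.
function resolveStartAfter(options: ScheduleOptions, now: Date = new Date()): Date {
  // A specific date takes precedence over a relative delay.
  if (options.runAfterDate !== undefined) {
    const date = new Date(options.runAfterDate)
    if (isNaN(date.getTime())) {
      throw new Error('runAfterDate is not a valid date')
    }
    return date
  }
  // A relative delay is added to the current time.
  if (options.runAfterSeconds !== undefined) {
    return new Date(now.getTime() + options.runAfterSeconds * 1000)
  }
  // Neither given: the job is due immediately.
  return now
}
```

Passing `now` explicitly keeps the function deterministic, which makes the precedence rule easy to check in a unit test.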
export interface IReceiveMessageConfig {
// maximum number of messages polled per scheduled interval
// even though the handler will be executed independently per job
maxJobs?: number
// poll interval in seconds. has to be greater than or equal to 1
// default poll interval is 5 seconds
heartBeat?: number
// name of the queue topic. this is required field
topicName: string
}
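The comments in IReceiveMessageConfig imply two checks: topicName is required, and heartBeat must be at least 1 with a default of 5 seconds. A minimal sketch of such a validation step follows. `ReceiveConfig` and `normalizeReceiveConfig` are hypothetical names, and whether programma validates in this way is an assumption; no default is applied to maxJobs because the source does not state one.

```typescript
// Local mirror of IReceiveMessageConfig.
interface ReceiveConfig {
  maxJobs?: number
  heartBeat?: number
  topicName: string
}

// Hypothetical sketch: applies the documented default and constraints.
function normalizeReceiveConfig(config: ReceiveConfig): ReceiveConfig & { heartBeat: number } {
  if (!config.topicName) {
    throw new Error('topicName is required')
  }
  // Documented default poll interval: 5 seconds.
  const heartBeat = config.heartBeat === undefined ? 5 : config.heartBeat
  if (heartBeat < 1) {
    throw new Error('heartBeat must be greater than or equal to 1')
  }
  return { maxJobs: config.maxJobs, topicName: config.topicName, heartBeat }
}
```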
export interface IHandlerCallback {
(job: IReceiveJob): void
}
export interface IReceiveJob {
id: string
data: Object
attributes: Object
}
export interface IJobDetail {
id: string
topicName: string
data: Object
attributes: Object
state: JobStates
start_after: string
started_at: string | null
created_at: string
retry_after_seconds: number
}
All the status-changing methods follow a self-descriptive interface. For moveJobToDone and moveJobToFailed, passing true as the second boolean parameter deletes the job from the table. The job states are:
export enum JobStates {
CREATED = 'created',
ACTIVE = 'active',
PROCESSING = 'processing',
FAILED = 'failed',
COMPLETED = 'completed',
}
A message starts in the CREATED state. It is moved to the ACTIVE state when it is due to run. From then on it stays ACTIVE and is retried after retry_after_seconds for as long as it is not moved to a different state.
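The lifecycle above can be illustrated with a small predicate that decides whether a poll at a given time would pick a job up. This is a sketch only: `JobSnapshot` and `isDue` are hypothetical names, and measuring the retry window from started_at is an assumption, since the source does not say which timestamp programma uses.

```typescript
type JobState = 'created' | 'active' | 'processing' | 'failed' | 'completed'

// Subset of the IJobDetail fields that matter for scheduling.
interface JobSnapshot {
  state: JobState
  start_after: string
  started_at: string | null
  retry_after_seconds: number
}

// Hypothetical sketch: would a poll at `now` hand this job to a handler?
function isDue(job: JobSnapshot, now: Date): boolean {
  if (job.state === 'created') {
    // A created job becomes active once its start time has passed.
    return new Date(job.start_after).getTime() <= now.getTime()
  }
  if (job.state === 'active' && job.started_at !== null) {
    // Assumption: the retry window is counted from started_at.
    const retryAt = new Date(job.started_at).getTime() + job.retry_after_seconds * 1000
    return retryAt <= now.getTime()
  }
  // processing, failed and completed jobs are not picked up again.
  return false
}
```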
create table if not exists ${this.schemaName}.jobs (
id uuid primary key not null default gen_random_uuid(),
topicName text not null,
data jsonb,
attributes jsonb,
state varchar(255) not null default('created'),
start_after timestamp with time zone not null default now(),
started_at timestamp with time zone,
created_at timestamp with time zone default now(),
retry_after_seconds int
);
Author: hadi@vincere.health
Created at Vincere since we had a very similar use-case. Our initial design inspiration came from this blog.
We were also inspired by Pinterest's PinLater, although it is fundamentally different in implementation.
We are open to suggestions and would love to hear your feedback and job scheduling use-cases so that we can improve.