
@thinkmill/node-worker
A simple, promise-based, worker framework.
yarn add @thinkmill/node-worker
Argument | Type | Description |
---|---|---|
label | String | A label that describes the worker (used in logs, errors, etc) |
payload | Function | A promise-returning function that is executed on schedule (receives an object that includes the run ordinal) |
options | Object | Options controlling how the worker should behave |
payload
The payload function represents the main body of the worker, where the real work is done.
It should return a promise that resolves or rejects within the timeoutMs provided to it.
The payload is invoked according to the following schedule:
- The worker.start() function is called.
- The worker waits for the sleepMs period before invoking the payload.
payload
When invoked, the payload will be provided with a single argument: an object containing the following:
Property | Type | Description |
---|---|---|
label | String | The worker label that was provided on construction |
ordinal | Number | An integer indicating how many times the payload has been executed |
timeoutMs | Number | The number of milliseconds the worker will wait for this invocation to return |
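Because the payload is an ordinary function that takes a single object, it can be exercised by hand without constructing a worker. The sketch below is illustrative only: describeRun is a hypothetical payload, and the values passed in are made up rather than produced by the library.

```javascript
// A hypothetical payload that simply reports the argument it was given.
const describeRun = async ({ label, ordinal, timeoutMs }) => {
  return `${label} run #${ordinal} has ${timeoutMs} ms`;
};

// Invoke it directly with the documented argument shape; no worker is involved.
describeRun({ label: 'test-worker', ordinal: 1, timeoutMs: 5000 })
  .then((message) => console.log(message)); // prints "test-worker run #1 has 5000 ms"
```

Calling a payload this way can be handy in unit tests, where the run details are supplied by the test rather than by the schedule.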
options
The options can contain:
Property | Type | Description |
---|---|---|
sleepMs | Number | How long do we pause between runs? (in milliseconds) |
timeoutMs | Number | How long do we wait (in milliseconds) for the run promise to resolve/reject? See important notes below! |
Note that the timeoutMs provided forces the end of a cycle (and allows the next run to be scheduled) but does not, and cannot, terminate the still-running promise.
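To see why a timeout cannot terminate a promise, consider the generic pattern of racing a promise against a timer. This is a sketch of that general technique, not the library's actual implementation; withTimeout, sleep and demo are hypothetical names used only for illustration.

```javascript
// Illustration only: NOT taken from the library's source.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Resolves with 'timeout' if `promise` has not settled within `ms`.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((resolve) => {
    timer = setTimeout(() => resolve('timeout'), ms);
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

async function demo() {
  const events = [];
  const slowPayload = async () => {
    await sleep(60);
    events.push('payload finished');
    return 'done';
  };
  // The "cycle" gives up after 20 ms, but the payload keeps running.
  const outcome = await withTimeout(slowPayload(), 20);
  events.push(`cycle ended: ${outcome}`);
  await sleep(80); // give the abandoned payload time to finish
  return events;
}

demo().then((events) => console.log(events));
// prints [ 'cycle ended: timeout', 'payload finished' ]
```

The race decides which result the caller sees first; it has no way to stop the work behind the losing promise.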
If the promise returned by the payload function has errored internally (without resolving or rejecting) then that's OK; the schedule timeout will prevent the worker from stalling forever.
But if the promise returned is still doing work, there's the possibility we'll end up with multiple instances of the payload executing concurrently.
This is almost certainly a Bad Thing, but that's up to you.
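If overlapping runs would be harmful, one option is to guard the payload yourself. The following is a minimal sketch, assuming it is acceptable to skip a run while the previous one is still in flight; skipIfRunning is a hypothetical helper, not part of the library, and resolving with undefined on a skipped run is an arbitrary choice.

```javascript
// Hypothetical helper: wraps a payload so that overlapping invocations are skipped.
function skipIfRunning(payload) {
  let running = false;
  return async (args) => {
    if (running) return undefined; // previous run still in flight; skip this one
    running = true;
    try {
      return await payload(args);
    } finally {
      running = false; // release the guard whether the payload resolved or rejected
    }
  };
}

// Usage: two back-to-back invocations; only the first does any work.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const guarded = skipIfRunning(async () => {
  await sleep(30);
  return 'ran';
});

Promise.all([guarded({}), guarded({})])
  .then((results) => console.log(results)); // prints [ 'ran', undefined ]
```

The guard only helps if the payload's promise eventually settles; a payload that never resolves or rejects would block every later run.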
The take away: make sure your payload resolves or rejects within the timeoutMs value.
Method | Description |
---|---|
start() | Start the worker after a short delay. |
stop() | Stop the worker. The currently running job will be allowed to complete but the worker will not restart afterwards. |
scheduleRunInMs(delayMs, onceOff) | Schedule a run in delayMs milliseconds. If onceOff is true this will trigger an extra run rather than rescheduling the next run. If the worker has been stopped this will have no effect. |
A simple example:
const Worker = require('@thinkmill/node-worker');
const myWorker = new Worker(
  'test-worker',
  ({ label, ordinal, timeoutMs }) => {
    return new Promise((resolve, reject) => {
      const takeMs = 4000 + (Math.random() * 1500);
      console.log(`Run #${ordinal} .. will take ${takeMs} ms`);
      setTimeout(() => {
        console.log(`Run #${ordinal} .. resolving`);
        resolve(true);
      }, takeMs);
    });
  }, {
    sleepMs: 5000,
    timeoutMs: 5 * 1000,
  }
);
myWorker.start();
A more realistic/interesting example, processing items in a queue:
const Worker = require('@thinkmill/node-worker');
const debug = require('debug')('workers:dequeue-things');
const Model = require('../models/queuedThings');
// Assumption: Model is an Objection.js model bound to a knex instance, so Model.knex() returns it
const knex = Model.knex();

// Manage the dequeuing of things
const payload = async ({ label, ordinal, timeoutMs }) => {
  // Leave a one second margin so the run finishes before the worker's timeout
  const runForMs = timeoutMs - 1000;
  const runUntil = new Date(Date.now() + runForMs);
  debug(`Running for ${runForMs} ms (until ${runUntil.toISOString()})`);

  let processedCount = 0;
  let queueEmptied = false;
  let nextThing;

  do {
    await knex.transaction(async (trx) => {
      // Get the next thing from the queue
      nextThing = await Model.query(trx).findOne('isReady', true).whereNull('processedAt').orderBy('queuedAt');

      // The queue is empty; exit early
      if (!nextThing) {
        queueEmptied = true;
        return;
      }

      // Do whatever it is that things do
      // ..

      // Record that we've processed this thing
      await Model.query(trx).update({ processedAt: new Date() }).where({ id: nextThing.id });

      // Inc. our count
      processedCount++;
    });
  }
  // Stop once the queue is empty or the time budget is used up
  while (!queueEmptied && new Date() < runUntil);

  // Output some debug info
  const summaryMsg = `DONE: ${processedCount} things processed, leaving the queue ${queueEmptied ? 'EMPTY' : 'NOT EMPTY'}`;
  debug(summaryMsg);

  // Resolve with a boolean indicating whether the payload should be re-invoked soon or after the normal sleep
  return queueEmptied;
};

// Create the worker instance and start it
const worker = new Worker('dequeue-things', payload, { sleepMs: 60 * 1000 });
worker.start();
You can also request a once-off worker run from some other action. For example, if you have a worker sending emails which processes a queue every 10 minutes, you might want to trigger that worker after a successful account creation.
class AccountCreator {
  onCreateSuccess () {
    // Tell the email worker to run so the user receives email promptly.
    const onceOff = true;
    require('../email-worker').scheduleRunInMs(200, onceOff);
  }
}
We use the debug package internally.
Entries are scoped to workers:${label} (where label is that supplied on construction).
It's probably helpful to follow this pattern in your own payload functions.
Output can be enabled by supplying a scope (or list of scopes) in the DEBUG env var.
This can include wildcards. For example:
# All worker debug
DEBUG=workers:* yarn start
# Debug for a specific worker
DEBUG=workers:send-notifications yarn start
# Debug for several specific workers
DEBUG=workers:send-notifications,workers:send-emails yarn start
BSD Licensed. Copyright (c) Thinkmill 2018.