Security News
GitHub Removes Malicious Pull Requests Targeting Open Source Repositories
GitHub removed 27 malicious pull requests attempting to inject harmful code across multiple open source repositories, in another round of low-effort attacks.
Tasks/CRON scheduler and manager for horizontally scaled multi-server applications
"JoSk" is a Node.js task manager for horizontally scaled apps and apps that would need to scale horizontally quickly at some point of growth.
"JoSk" mimics the native API of setTimeout
and setInterval
and supports CRON expressions. All queued tasks are synced between all running application instances via Redis, MongoDB, or a custom adapter.
The "JoSk" package is made for a variety of horizontally scaled apps, such as clusters, multi-servers, and multi-threaded Node.js instances, that are running either on the same or different machines or even different data centers. "JoSk ensures that the only single execution of each task occurs across all running instances of the application.
"JoSk" is not just for multi-instance apps. It seamlessly integrates with single-instance applications as well, showcasing its versatility and adaptability.
Note: JoSk is the server-only package.
redis-server@>=5.0.0
— Redis Server Version (if used with RedisAdapter)mongod@>=4.0.0
— MongoDB Server Version (if used with MongoAdapter)node@>=14.20.0
— Node.js versionmongod@<4.0.0
— use josk@=1.1.0
node@<14.20.0
— use josk@=3.0.2
node@<8.9.0
— use josk@=1.1.0
npm install josk --save
// ES Module Style
import { JoSk, RedisAdapter, MongoAdapter } from 'josk';
// CommonJS
const { JoSk, RedisAdapter, MongoAdapter } = require('josk');
Constructor options for JoSK, MongoAdapter, and RedisAdapter
new JoSk(opts)
opts.adapter
{RedisAdapter|MongoAdapter} - [Required] Instance of RedisAdapter
or MongoAdapter
or custom adapteropts.debug
{Boolean} - [Optional] Enable debugging messages, useful during developmentopts.autoClear
{Boolean} - [Optional] Remove (Clear) obsolete tasks (any tasks which are not found in the instance memory (runtime), but exists in the database). Obsolete tasks may appear in cases when it wasn't cleared from the database on process shutdown, and/or was removed/renamed in the app. Obsolete tasks may appear if multiple app instances running different codebase within the same database, and the task may not exist on one of the instances. Default: false
opts.zombieTime
{Number} - [Optional] time in milliseconds, after this time - task will be interpreted as "zombie". This parameter allows to rescue task from "zombie mode" in case when: ready()
wasn't called, exception during runtime was thrown, or caused by bad logic. While resetOnInit
option helps to make sure tasks are done
on startup, zombieTime
option helps to solve same issue, but during runtime. Default value is 900000
(15 minutes). It's not recommended to set this value to below 60000
(one minute)opts.minRevolvingDelay
{Number} - [Optional] Minimum revolving delay — the minimum delay between tasks executions in milliseconds. Default: 128
opts.maxRevolvingDelay
{Number} - [Optional] Maximum revolving delay — the maximum delay between tasks executions in milliseconds. Default: 768
opts.onError
{Function} - [Optional] Informational hook, called instead of throwing exceptions. Default: false
. Called with two arguments:
title
{String}details
{Object}details.description
{String}details.error
{Mix}details.uid
{String} - Internal uid
, suitable for .clearInterval()
and .clearTimeout()
opts.onExecuted
{Function} - [Optional] Informational hook, called when task is finished. Default: false
. Called with two arguments:
uid
{String} - uid
passed into .setImmediate()
, .setTimeout()
, or setInterval()
methodsdetails
{Object}details.uid
{String} - Internal uid
, suitable for .clearInterval()
and .clearTimeout()
details.date
{Date} - Execution timestamp as JS {Date}details.delay
{Number} - Execution delay
(e.g. interval
for .setInterval()
)details.timestamp
{Number} - Execution timestamp as unix {Number}new RedisAdapter(opts)
Since v5.0.0
opts.client
{RedisClient} - [Required] RedisClient
instance, like one returned from await redis.createClient().connect()
methodopts.prefix
{String} - [Optional] use to create multiple named instancesopts.resetOnInit
{Boolean} - [Optional] (use with caution) make sure all old tasks are completed during initialization. Useful for single-instance apps to clean up unfinished that occurred due to intermediate shutdown, reboot, or exception. Default: false
new MongoAdapter(opts)
Since v5.0.0
opts.db
{Db} - [Required] Mongo's Db
instance, like one returned from MongoClient#db()
methodopts.prefix
{String} - [Optional] use to create multiple named instancesopts.lockCollectionName
{String} - [Optional] By default all JoSk instances use the same __JobTasks__.lock
collection for lockingopts.resetOnInit
{Boolean} - [Optional] (use with caution) make sure all old tasks are completed during initialization. Useful for single-instance apps to clean up unfinished that occurred due to intermediate shutdown, reboot, or exception. Default: false
JoSk is storage-agnostic (since v4.0.0
). It's shipped with Redis and MongoDB "adapters" out of the box, with option to extend its capabilities by creating and passing a custom adapter
JoSk has no dependencies, hence make sure redis
NPM package is installed in order to support Redis Storage Adapter. RedisAdapter
utilize basic set of commands SET
, GET
, DEL
, EXISTS
, HSET
, HGETALL
, and SCAN
. RedisAdapter
is compatible with all Redis-alike databases, and was well-tested with Redis and KeyDB
import { JoSk, RedisAdapter } from 'josk';
import { createClient } from 'redis';
const redisClient = await createClient({
url: 'redis://127.0.0.1:6379'
}).connect();
const jobs = new JoSk({
adapter: new RedisAdapter({
client: redisClient,
prefix: 'app-scheduler',
}),
onError(reason, details) {
// Use onError hook to catch runtime exceptions
// thrown inside scheduled tasks
console.log(reason, details.error);
}
});
JoSk has no dependencies, hence make sure mongodb
NPM package is installed in order to support MongoDB Storage Adapter. Note: this package will add two new MongoDB collections per each new JoSk()
. One collection for tasks and second for "Read Locking" with .lock
suffix
import { JoSk, MongoAdapter } from 'josk';
import { MongoClient } from 'mongodb';
const client = new MongoClient('mongodb://127.0.0.1:27017');
// To avoid "DB locks" — it's a good idea to use separate DB from the "main" DB
const mongoDb = client.db('joskdb');
const jobs = new JoSk({
adapter: new MongoAdapter({
db: mongoDb,
prefix: 'cluster-scheduler',
}),
onError(reason, details) {
// Use onError hook to catch runtime exceptions
// thrown inside scheduled tasks
console.log(reason, details.error);
}
});
After JoSk initialized simply call JoSk#setInterval
to create recurring task
const jobs = new JoSk({ /*...*/ });
jobs.setInterval((ready) => {
/* ...code here... */
ready();
}, 60 * 60000, 'task1h'); // every hour
jobs.setInterval((ready) => {
/* ...code here... */
asyncCall(() => {
/* ...more code here...*/
ready();
});
}, 15 * 60000, 'asyncTask15m'); // every 15 mins
/**
* no need to call ready() inside async function
*/
jobs.setInterval(async () => {
try {
await asyncMethod();
} catch (err) {
console.log(err)
}
}, 30 * 60000, 'asyncAwaitTask30m'); // every 30 mins
/**
* no need to call ready() when call returns Promise
*/
jobs.setInterval(() => {
return asyncMethod(); // <-- returns Promise
}, 2 * 60 * 60000, 'asyncAwaitTask2h'); // every two hours
Note: This library relies on job ID. Always use different uid
, even for the same task:
const task = function (ready) {
//... code here
ready();
};
jobs.setInterval(task, 60000, 'task-1m'); // every minute
jobs.setInterval(task, 2 * 60000, 'task-2m'); // every two minutes
setInterval(func, delay, uid)
func
{Function} - Function to call on scheduledelay
{Number} - Delay for the first run and interval between further executions in millisecondsuid
{String} - Unique app-wide task idSet task into interval execution loop. ready()
callback is passed as the first argument into a task function.
In the example below, the next task will not be scheduled until the current is ready:
jobs.setInterval(function (ready) {
/* ...run sync code... */
ready();
}, 60 * 60000, 'syncTask1h'); // will execute every hour + time to execute the task
jobs.setInterval(async function () {
try {
await asyncMethod();
} catch (err) {
console.log(err)
}
}, 60 * 60000, 'asyncAwaitTask1h'); // will execute every hour + time to execute the task
In the example below, the next task will not wait for the current task to finish:
jobs.setInterval(function (ready) {
ready();
/* ...run sync code... */
}, 60 * 60000, 'syncTask1h'); // will execute every hour
jobs.setInterval(async function () {
/* ...task re-scheduled instantly here... */
process.nextTick(async () => {
await asyncMethod();
});
}, 60 * 60000, 'asyncAwaitTask1h'); // will execute every hour
In the next example, a long running task is executed in a loop without delay after the full execution:
jobs.setInterval(function (ready) {
asyncCall((error, result) => {
if (error) {
ready(); // <-- Always run `ready()`, even if call was unsuccessful
} else {
anotherCall(result.data, ['param'], (error, response) => {
if (error) {
ready(); // <-- Always run `ready()`, even if call was unsuccessful
return;
}
waitForSomethingElse(response, () => {
ready(); // <-- End of the full execution
});
});
}
});
}, 0, 'longRunningAsyncTask'); // run in a loop as soon as previous run is finished
Same task combining await
/async
and callbacks
jobs.setInterval(function (ready) {
process.nextTick(async () => {
try {
const result = await asyncCall();
const response = await anotherCall(result.data, ['param']);
waitForSomethingElse(response, () => {
ready(); // <-- End of the full execution
});
} catch (err) {
console.log(err)
ready(); // <-- Always run `ready()`, even if call was unsuccessful
}
});
}, 0, 'longRunningAsyncTask'); // run in a loop as soon as previous run is finished
setTimeout(func, delay, uid)
func
{Function} - Function to call after delay
delay
{Number} - Delay in millisecondsuid
{String} - Unique app-wide task idRun a task after delay in ms. setTimeout
is useful for cluster - when you need to make sure task executed only once. ready()
callback is passed as the first argument into a task function.
jobs.setTimeout(function (ready) {
/* ...run sync code... */
ready();
}, 60000, 'syncTaskIn1m'); // will run only once across the cluster in a minute
jobs.setTimeout(function (ready) {
asyncCall(function () {
/* ...run async code... */
ready();
});
}, 60000, 'asyncTaskIn1m'); // will run only once across the cluster in a minute
jobs.setTimeout(async function () {
try {
/* ...code here... */
await asyncMethod();
/* ...more code here...*/
} catch (err) {
console.log(err)
}
}, 60000, 'asyncAwaitTaskIn1m'); // will run only once across the cluster in a minute
setImmediate(func, uid)
func
{Function} - Function to executeuid
{String} - Unique app-wide task idImmediate execute the function, and only once. setImmediate
is useful for cluster - when you need to execute function immediately and only once across all servers. ready()
is passed as the first argument into the task function.
jobs.setImmediate(function (ready) {
//...run sync code
ready();
}, 'syncTask'); // will run immediately and only once across the cluster
jobs.setImmediate(function (ready) {
asyncCall(function () {
//...run more async code
ready();
});
}, 'asyncTask'); // will run immediately and only once across the cluster
jobs.setImmediate(async function () {
try {
/* ...code here... */
await asyncMethod();
} catch (err) {
console.log(err)
}
}, 'asyncTask'); // will run immediately and only once across the cluster
clearInterval(timerId)
timerId
{String|Promise} — Timer id returned from JoSk#setInterval()
methodtrue
when task is successfully cleared, or false
when task was not foundCancel current interval timer.
const timer = await jobs.setInterval(func, 34789, 'unique-taskid');
await jobs.clearInterval(timer);
clearTimeout(timerId)
timerId
{String|Promise} — Timer id returned from JoSk#setTimeout()
methodtrue
when task is successfully cleared, or false
when task was not foundCancel current timeout timer.
const timer = await jobs.setTimeout(func, 34789, 'unique-taskid');
await jobs.clearTimeout(timer);
destroy()
true
if instance successfully destroyed, false
if instance already destroyedDestroy JoSk instance. This method shouldn't be called in normal circumstances. Stop internal interval timer. After JoSk is destroyed — calling public methods would end up logged to stdout
or if onError
hook was passed to JoSk it would receive an error. Only permitted methods are clearTimeout
and clearInterval
.
// EXAMPLE: DESTROY JoSk INSTANCE UPON SERVER PROCESS TERMINATION
const jobs = new JoSk({ /* ... */ });
const cleanUpBeforeTermination = function () {
/* ...CLEAN UP AND STOP OTHER THINGS HERE... */
jobs.destroy();
process.exit(1);
};
process.stdin.resume();
process.on('uncaughtException', cleanUpBeforeTermination);
process.on('exit', cleanUpBeforeTermination);
process.on('SIGHUP', cleanUpBeforeTermination);
ping()
Ping JoSk instance. Check scheduler readiness and its connection to the "storage adapter"
// EXAMPLE: DESTROY JoSk INSTANCE UPON SERVER PROCESS TERMINATION
const jobs = new JoSk({ /* ... */ });
const pingResult = await jobs.ping();
console.log(pingResult)
/**
In case of the successful response
{
status: 'OK',
code: 200,
statusCode: 200,
}
Failed response
{
status: 'Error reason',
code: 500,
statusCode: 500,
error: ErrorObject
}
*/
Use cases and usage examples
Use JoSk to invoke synchronized tasks by CRON schedule, and cron-parser
package to parse CRON expressions. To simplify CRON scheduling — grab and use setCron
function below:
import parser from 'cron-parser';
const jobsCron = new JoSk({
adapter: new RedisAdapter({
client: await createClient({ url: 'redis://127.0.0.1:6379' }).connect(),
prefix: 'cron-scheduler'
}),
minRevolvingDelay: 512, // Adjust revolving delays to higher values
maxRevolvingDelay: 1000, // as CRON schedule defined to seconds
});
// CRON HELPER FUNCTION
const setCron = async (uniqueName, cronTask, task) => {
const nextTimestamp = +parser.parseExpression(cronTask).next().toDate();
return await jobsCron.setInterval(function (done) {
done(parser.parseExpression(cronTask).next().toDate());
task();
}, nextTimestamp - Date.now(), uniqueName);
};
setCron('Run every two seconds cron', '*/2 * * * * *', function () {
console.log(new Date);
});
Passing arguments can be done via wrapper function
const jobs = new JoSk({ /* ... */ });
const myVar = { key: 'value' };
let myLet = 'Some top level or env.variable (can get changed during runtime)';
const task = function (arg1, arg2, ready) {
//... code here
ready();
};
jobs.setInterval((ready) => {
task(myVar, myLet, ready);
}, 60 * 60000, 'taskA');
jobs.setInterval((ready) => {
task({ otherKey: 'Another Value' }, 'Some other string', ready);
}, 60 * 60000, 'taskB');
For long-running async tasks, or with callback-apis it might be needed to call ready()
explicitly. Wrap task's body into process.nextTick
to enjoy await
/async
combined with classic callback-apis
jobs.setInterval((ready) => {
process.nextTick(async () => {
try {
const result = await asyncCall();
waitForSomethingElse(async (error, data) => {
if (error) {
ready(); // <-- Always run `ready()`, even if call was unsuccessful
return;
}
await saveCollectedData(result, [data]);
ready(); // <-- End of the full execution
});
} catch (err) {
console.log(err)
ready(); // <-- Always run `ready()`, even if call was unsuccessful
}
});
}, 60 * 60000, 'longRunningTask1h'); // once every hour
During development and tests you may want to clean up Adapter's Storage
To clean up old tasks via Redis CLI use the next query pattern:
redis-cli --no-auth-warning KEYS "josk:default:*" | xargs redis-cli --raw --no-auth-warning DEL
# If you're using multiple JoSk instances with prefix:
redis-cli --no-auth-warning KEYS "josk:prefix:*" | xargs redis-cli --raw --no-auth-warning DEL
To clean up old tasks via MongoDB use the next query pattern:
// Run directly in MongoDB console:
db.getCollection('__JobTasks__').remove({});
// If you're using multiple JoSk instances with prefix:
db.getCollection('__JobTasks__PrefixHere').remove({});
// Recommended MongoDB connection options
// When used with ReplicaSet
const options = {
writeConcern: {
j: true,
w: 'majority',
wtimeout: 30000
},
readConcern: {
level: 'majority'
},
readPreference: 'primary'
};
MongoClient.connect('mongodb://url', options, (error, client) => {
// To avoid "DB locks" — it's a good idea to use separate DB from "main" application DB
const db = client.db('dbName');
const jobs = new JoSk({
adapter: new MongoAdapter({
db: db,
})
});
});
task_delay ± (256 + Storage_Request_Delay)
. That means this package won't fit when you need to run a task with very precise delays. For other cases, if ±256 ms
delays are acceptable - this package is the great solution;opts.minRevolvingDelay
and opts.maxRevolvingDelay
to set the range for random delays between executions. Revolving range acts as a safety control to make sure different servers not picking the same task at the same time. Default values (128
and 768
) are the best for 3-server setup (the most common topology). Tune these options to match needs of your project. Higher opts.minRevolvingDelay
will reduce storage read/writes;.lock
collection for MongoDB.# Before running tests make sure NODE_ENV === development
# Install NPM dependencies
npm install --save-dev
# Before running tests you need
# to have access to MongoDB and Redis servers
REDIS_URL="redis://127.0.0.1:6379" MONGO_URL="mongodb://127.0.0.1:27017/npm-josk-test-001" npm test
# If previous run has errors — add "debug" to output extra logs
DEBUG=true REDIS_URL="redis://127.0.0.1:6379" MONGO_URL="mongodb://127.0.0.1:27017/npm-josk-test-001" npm test
# Be patient, tests are taking around 6 mins
Run Redis-related tests only
# Before running Redis tests you need to have Redis server installed and running
REDIS_URL="redis://127.0.0.1:6379" npm run test-redis
# Be patient, tests are taking around 3 mins
Run MongoDB-related tests only
# Before running Mongo tests you need to have MongoDB server installed and running
MONGO_URL="mongodb://127.0.0.1:27017/npm-josk-test-001" npm run test-mongo
# Be patient, tests are taking around 3 mins
JoSk
is Job-Task - Is randomly generated name by "uniq" project
FAQs
Tasks/CRON scheduler and manager for horizontally scaled multi-server applications
We found that josk demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
GitHub removed 27 malicious pull requests attempting to inject harmful code across multiple open source repositories, in another round of low-effort attacks.
Security News
RubyGems.org has added a new "maintainer" role that allows for publishing new versions of gems. This new permission type is aimed at improving security for gem owners and the service overall.
Security News
Node.js will be enforcing stricter semver-major PR policies a month before major releases to enhance stability and ensure reliable release candidates.