Socket
Socket
Sign inDemoInstall

nats-jobs

Package Overview
Dependencies
7
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

    nats-jobs

Background job processor using NATS


Version published
Weekly downloads
53
increased by60.61%
Maintainers
1
Created
Weekly downloads
 

Readme

Source

NATS Jobs

Background job processing using NATS JetStream for distributing work.

See examples directory for more examples.

Companion libraries

Usage

This library uses debug. To enable:

DEBUG=nats-jobs node myfile.js

msgpackr is the recommended way to encode complex data structures since it's fast, efficient, and can handle serializing and unserializing dates.

Processing jobs

import { JsMsg } from 'nats'
import { setTimeout } from 'node:timers/promises'
import { expBackoff, jobProcessor } from 'nats-jobs'

const def = {
  stream: 'ORDERS',
  backoff: expBackoff(1000),
  async perform(msg: JsMsg) {
    console.log(`Started ${msg.info.streamSequence}`)
    console.log(msg.data.toString())
    // Simulate work
    await setTimeout(5000)
    console.log(`Completed ${msg.info.streamSequence}`)
  },
}

const processor = await jobProcessor()
const myJob = processor.start(jobDef)
// Gracefully handle shutdown
const shutDown = async () => {
  await myJob.stop()
  process.exit(0)
}
process.on('SIGTERM', shutDown)
process.on('SIGINT', shutDown)

To gracefully shutdown mutliple jobs and close the NATS connection call stop from the object returned by jobProcessor.

const processor = await jobProcessor()
const jobs = [
  processor.start(jobDef1),
  processor.start(jobDef2),
  processor.start(jobDef3),
]
const shutDown = async () => {
  // Shuts down jobs 1, 2, and 3 and closes the NATS connection
  await processor.stop()
  process.exit(0)
}

process.on('SIGTERM', shutDown)
process.on('SIGINT', shutDown)

Publish via NATS

nats pub ORDERS someText

Strategies

Short-lived jobs For short-lived jobs like sending an email you shouldn't need to enable autoExtendTimeout. Processing of a message should happen in less than the 10 second ack_wait default.

Long-lived jobs Long-lived jobs should use a short ack_wait with autoExtendAckTimeout set to true. You may also want to use the timeout option as a sanity check. A timeout call abort on the abort signal just like calling stop. However, you can differntiate by checking signal.reason which will be set to timeout for a timeout and stop when stop is called. This allows the job control of how it wants to handle this situation with two likely scenarios:

(1) Finish processing current record and exit. (2) Register an onabort callback that logs an error and calls process.exit().

Testing

Run the following in the /docker directory to start up NATS.

docker compose up

Keywords

FAQs

Last updated on 13 Dec 2022

Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc