
@tsjing/nsqjs
The official NodeJS client for the nsq client protocol. This implementation attempts to be fully compliant and maintain feature parity with the official Go (go-nsq) and Python (pynsq) clients.
A Reader is created with new Reader(topic, channel, options). The topic and channel arguments are strings and must be specified. The options argument is optional. Below are the parameters that can be specified in the options object, with their defaults.
maxInFlight: 1
heartbeatInterval: 30
maxBackoffDuration: 128
maxAttempts: 0
    If maxAttempts > 0, then the message will be finished automatically when the number of attempts has been exhausted.
requeueDelay: 90
nsqdTCPAddresses
    For example: ['localhost:4150']
lookupdHTTPAddresses
    For example: ['localhost:4161'], ['http://localhost/lookup'], ['http://localhost/path/lookup?extra_param=true']
lookupdPollInterval: 60
lookupdPollJitter: 0.3
tls: false
tlsVerification: true
deflate: false
deflateLevel: 6
snappy: false
authSecret: null
outputBufferSize: null
    outputBufferSize >= 64
outputBufferTimeout: null
    outputBufferTimeout >= 1. A value of -1 disables timeouts.
messageTimeout: null
sampleRate: null
    1 <= sampleRate <= 99
clientId: null

Reader events are:
Reader.MESSAGE or message
Reader.DISCARD or discard
Reader.ERROR or error
Reader.NSQD_CONNECTED or nsqd_connected
Reader.NSQD_CLOSED or nsqd_closed

Reader.MESSAGE and Reader.DISCARD both produce Message objects. Reader.NSQD_CONNECTED and Reader.NSQD_CLOSED events both provide the host and port of the nsqd to which the event pertains.
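The Reader option defaults and ranges listed above can be collected into a small helper that fills in defaults and rejects out-of-range values before a Reader is constructed. This is only a sketch: READER_DEFAULTS and buildReaderOptions are hypothetical names that are not part of nsqjs, and the checks mirror the documented ranges rather than the library's own validation.

```javascript
'use strict';

// Defaults copied from the option list above. This helper is hypothetical
// and is not part of nsqjs.
const READER_DEFAULTS = {
  maxInFlight: 1,
  heartbeatInterval: 30,
  maxBackoffDuration: 128,
  maxAttempts: 0,
  requeueDelay: 90,
  lookupdPollInterval: 60,
  lookupdPollJitter: 0.3,
  tls: false,
  tlsVerification: true,
  deflate: false,
  deflateLevel: 6,
  snappy: false,
  authSecret: null,
  outputBufferSize: null,
  outputBufferTimeout: null,
  messageTimeout: null,
  sampleRate: null,
  clientId: null
};

// Merge caller options over the defaults and check the documented ranges.
function buildReaderOptions(overrides) {
  const options = Object.assign({}, READER_DEFAULTS, overrides);

  if (options.outputBufferSize !== null && options.outputBufferSize < 64) {
    throw new RangeError('outputBufferSize must be >= 64');
  }
  // A value of -1 disables timeouts; any other value must be >= 1.
  if (options.outputBufferTimeout !== null &&
      options.outputBufferTimeout !== -1 &&
      options.outputBufferTimeout < 1) {
    throw new RangeError('outputBufferTimeout must be >= 1, or -1 to disable');
  }
  if (options.sampleRate !== null &&
      (options.sampleRate < 1 || options.sampleRate > 99)) {
    throw new RangeError('sampleRate must be between 1 and 99');
  }
  return options;
}

const options = buildReaderOptions({
  lookupdHTTPAddresses: ['127.0.0.1:4161'],
  maxInFlight: 5,
  sampleRate: 50
});
console.log(options.maxInFlight, options.requeueDelay, options.sampleRate);
```

The resulting object could then be passed as the third argument to new Reader(topic, channel, options).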
These methods are available on a Reader object:
connect()
close()
pause()
unpause()
isPaused()
    Returns true if paused, false otherwise.

The following properties and methods are available on Message objects produced by a Reader instance.
timestamp
attempts
id
hasResponded
body
json()
timeUntilTimeout(hard=false)
finish()
requeue(delay=null, backoff=true)
    The delay is in milliseconds. This is how long nsqd will hold on to the message before attempting it again. The backoff parameter indicates that we should treat this as an error within this process and that we need to back off to recover.
touch()

A Writer allows messages to be sent to an nsqd.
Available Writer options:
tls: false
tlsVerification: true
deflate: false
deflateLevel: 6
snappy: false
clientId: null

Writer events are:
Writer.READY or ready
Writer.CLOSED or closed
Writer.ERROR or error

These methods are available on a Writer object:
connect()
close()
publish(topic, msgs, [callback])
    topic is a string. msgs is either a string, a Buffer, a JSON serializable object, or a list of strings / Buffers / JSON serializable objects. callback takes a single error argument.

Start nsqd and nsqlookupd
# nsqlookupd listens on 4161 for HTTP requests and 4160 for TCP requests
$ nsqlookupd &
$ nsqd --lookupd-tcp-address=127.0.0.1:4160 &

JavaScript:
var nsq = require('nsqjs');

var reader = new nsq.Reader('sample_topic', 'test_channel', {
  lookupdHTTPAddresses: '127.0.0.1:4161'
});

reader.connect();

reader.on('message', function (msg) {
  console.log('Received message [%s]: %s', msg.id, msg.body.toString());
  msg.finish();
});

CoffeeScript:
nsq = require 'nsqjs'

topic = 'sample_topic'
channel = 'test_channel'
options =
  lookupdHTTPAddresses: '127.0.0.1:4161'

reader = new nsq.Reader topic, channel, options
reader.connect()

reader.on nsq.Reader.MESSAGE, (msg) ->
  console.log "Received message [#{msg.id}]: #{msg.body.toString()}"
  msg.finish()

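The handlers above always call finish(). A handler can also use requeue(delay, backoff), described earlier, when processing fails. The following is a minimal sketch under stated assumptions: makeHandler, processBody, retryDelayMs and fakeMessage are hypothetical names, and fakeMessage is only a stand-in that records calls so the sketch runs without an nsqd.

```javascript
'use strict';

// Wrap a processing function so that success finishes the message and
// failure requeues it. makeHandler, processBody and retryDelayMs are
// hypothetical names, not part of nsqjs.
function makeHandler(processBody, retryDelayMs) {
  return function onMessage(msg) {
    try {
      processBody(msg.body.toString());
      msg.finish();
    } catch (err) {
      // The delay is in milliseconds; backoff=true treats the failure as an
      // error within this process.
      msg.requeue(retryDelayMs, true);
    }
  };
}

// Stand-in for a Message that records which methods were called.
function fakeMessage(text) {
  const calls = [];
  return {
    body: Buffer.from(text),
    calls: calls,
    finish: function () { calls.push(['finish']); },
    requeue: function (delay, backoff) { calls.push(['requeue', delay, backoff]); }
  };
}

const handler = makeHandler(function (text) {
  if (text === 'bad') { throw new Error('cannot process'); }
}, 5000);

const ok = fakeMessage('good');
const failed = fakeMessage('bad');
handler(ok);
handler(failed);
console.log(ok.calls, failed.calls);
```

With a real reader, the same function would be registered with reader.on('message', handler).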
Publish a message to nsqd to be consumed by the sample client:
$ curl -d "it really tied the room together" 'http://localhost:4151/pub?topic=sample_topic'
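The same HTTP publish can be sketched from Node using only the built-in http module. This assumes nsqd's HTTP interface is reachable at localhost:4151, as in the curl command above; buildPubRequest and publishOverHttp are hypothetical helper names, not part of nsqjs.

```javascript
'use strict';
const http = require('http');

// Build http.request options for nsqd's /pub endpoint. Host and port default
// to the values used by the curl command above. Hypothetical helper.
function buildPubRequest(topic, host, port) {
  return {
    method: 'POST',
    host: host || 'localhost',
    port: port || 4151,
    path: '/pub?topic=' + encodeURIComponent(topic)
  };
}

// POST a single message body. The callback receives an Error or null.
function publishOverHttp(topic, body, callback) {
  const req = http.request(buildPubRequest(topic), function (res) {
    res.resume(); // drain the response so the socket is released
    res.on('end', function () {
      callback(res.statusCode === 200
        ? null
        : new Error('nsqd returned HTTP ' + res.statusCode));
    });
  });
  req.on('error', callback);
  req.end(body);
}

console.log(buildPubRequest('sample_topic').path);
```

Calling publishOverHttp('sample_topic', 'it really tied the room together', callback) with nsqd running would send the same request as the curl command.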
This script simulates a message that takes a long time to process, or at least longer than the default message timeout. To ensure that the message doesn't time out while being processed, touch events are sent to keep it alive.

JavaScript:
var nsq = require('nsqjs');

var reader = new nsq.Reader('sample_topic', 'test_channel', {
  lookupdHTTPAddresses: '127.0.0.1:4161'
});

reader.connect();

reader.on('message', function (msg) {
  console.log('Received message [%s]', msg.id);

  function touch() {
    if (!msg.hasResponded) {
      console.log('Touch [%s]', msg.id);
      msg.touch();

      // Touch the message again a second before the next timeout.
      setTimeout(touch, msg.timeUntilTimeout() - 1000);
    }
  }

  function finish() {
    console.log('Finished message [%s]: %s', msg.id, msg.body.toString());
    msg.finish();
  }

  console.log('Message timeout is %f secs.', msg.timeUntilTimeout() / 1000);
  setTimeout(touch, msg.timeUntilTimeout() - 1000);

  // Finish the message after 2 timeout periods and 1 second.
  setTimeout(finish, msg.timeUntilTimeout() * 2 + 1000);
});

CoffeeScript:
{Reader} = require 'nsqjs'

topic = 'sample_topic'
channel = 'test_channel'
options =
  lookupdHTTPAddresses: '127.0.0.1:4161'

reader = new Reader topic, channel, options
reader.connect()

reader.on Reader.MESSAGE, (msg) ->
  console.log "Received message [#{msg.id}]"

  touch = ->
    unless msg.hasResponded
      console.log "Touch [#{msg.id}]"
      msg.touch()

      # Touch the message again a second before the next timeout.
      setTimeout touch, msg.timeUntilTimeout() - 1000

  finish = ->
    console.log "Finished message [#{msg.id}]: #{msg.body.toString()}"
    msg.finish()

  console.log "Message timeout is #{msg.timeUntilTimeout() / 1000} secs."
  setTimeout touch, msg.timeUntilTimeout() - 1000

  # Finish the message after 2 timeout periods and 1 second.
  setTimeout finish, msg.timeUntilTimeout() * 2 + 1000

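The scripts above schedule each touch at timeUntilTimeout() - 1000, which becomes negative if less than a second remains. One way to guard against that is sketched below. nextTouchDelay and keepAlive are hypothetical helpers, and falling back to half of the remaining time is an assumption of this sketch, not nsqjs behavior.

```javascript
'use strict';

// Compute how long to wait before the next touch, given the value of
// msg.timeUntilTimeout() in milliseconds. marginMs is a safety margin (the
// scripts above use 1000). When the margin is larger than the remaining
// time, wait half of the remaining time instead of a negative delay.
function nextTouchDelay(timeUntilTimeoutMs, marginMs) {
  return Math.max(timeUntilTimeoutMs / 2, timeUntilTimeoutMs - marginMs);
}

// Keep touching msg until it has been finished or requeued.
function keepAlive(msg, marginMs) {
  function touch() {
    if (msg.hasResponded) { return; }
    msg.touch();
    setTimeout(touch, nextTouchDelay(msg.timeUntilTimeout(), marginMs));
  }
  setTimeout(touch, nextTouchDelay(msg.timeUntilTimeout(), marginMs));
}

console.log(nextTouchDelay(60000, 1000));
console.log(nextTouchDelay(500, 1000));
```

Inside a message handler, keepAlive(msg, 1000) would replace the hand-written touch function.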
nsqjs uses debug to log debug output.
To see all nsqjs events:
$ DEBUG=nsqjs:* node my_nsqjs_script.js
To see all reader events:
$ DEBUG=nsqjs:reader:* node my_nsqjs_script.js
To see a specific reader's events:
$ DEBUG=nsqjs:reader:<topic>/<channel>:* node my_nsqjs_script.js
Replace <topic> and <channel> with the reader's topic and channel names.
To see all writer events:
$ DEBUG=nsqjs:writer:* node my_nsqjs_script.js
The writer sends a single message and then a list of messages.

JavaScript:
var nsq = require('nsqjs');

var w = new nsq.Writer('127.0.0.1', 4150);

w.connect();

w.on('ready', function () {
  w.publish('sample_topic', 'it really tied the room together');
  w.publish('sample_topic', [
    'Uh, excuse me. Mark it zero. Next frame.',
    'Smokey, this is not \'Nam. This is bowling. There are rules.'
  ]);
  w.publish('sample_topic', 'Wu?', function (err) {
    if (err) { return console.error(err.message); }
    console.log('Message sent successfully');
    w.close();
  });
});

w.on('closed', function () {
  console.log('Writer closed');
});

CoffeeScript:
{Writer} = require 'nsqjs'

w = new Writer '127.0.0.1', 4150
w.connect()

w.on Writer.READY, ->
  w.publish 'sample_topic', 'it really tied the room together'
  w.publish 'sample_topic', ['Uh, excuse me. Mark it zero. Next frame.',
    'Smokey, this is not \'Nam. This is bowling. There are rules.']
  w.publish 'sample_topic', 'Wu?', (err) ->
    console.log 'Message sent successfully' unless err?
    w.close()

w.on Writer.CLOSED, ->
  console.log 'Writer closed'

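Because the publish callback takes a single error argument, it can be wrapped in a Promise for use with then() or async/await. This is a minimal sketch: publishAsync is a hypothetical helper, and fakeWriter is a stand-in that mimics only the publish(topic, msgs, callback) signature so the sketch runs without an nsqd.

```javascript
'use strict';

// Wrap writer.publish(topic, msgs, callback) in a Promise. The callback
// receives a single error argument. Hypothetical helper, not part of nsqjs.
function publishAsync(writer, topic, msgs) {
  return new Promise(function (resolve, reject) {
    writer.publish(topic, msgs, function (err) {
      if (err) { reject(err); } else { resolve(); }
    });
  });
}

// Stand-in for a connected Writer that records what was published.
const fakeWriter = {
  published: [],
  publish: function (topic, msgs, callback) {
    this.published.push([topic, msgs]);
    callback(null);
  }
};

const pending = publishAsync(fakeWriter, 'sample_topic', ['first', 'second']);
pending.then(function () {
  console.log('Message sent successfully');
});
```

With a real Writer, publishAsync would be called from inside the ready handler, once the connection is established.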
message.finish is called after backoff event.
maxAttempts is now by default 0. [ Breaking Change! ]
MESSAGE listener if there's no DISCARD listener.
debug [ Breaking Change! ]
close, pause, and unpause to Reader
finish, requeue, etc after nsqd disconnect
ReaderRdy, ConnectionRdy implementation
Reader implementation
NSQDConnection
Reader
connect() now happens on next tick so that it can be called before event handlers are registered.
Message
TOUCH events
NSQDConnection implementation
wire implementation
Message implementation