
A framework to simplify the implementation of an event-bus oriented microservices architecture
$ npm install msb --save
var msb = require('msb');
See examples of the patterns below.
Every message-broker pattern in this module is based on one of these publish/subscribe patterns. Producers publish messages to a topic on the microservicebus, and those messages are delivered to subscribed consumers.
The broadcaster should ensure its messages are formatted according to the envelope schema. A time-to-live (ttl) can optionally be provided to ensure messages are not delivered after this many milliseconds. (This value is sensitive to clock synchronisation between producers and consumers.) The payload should be provided as a JSON-serializable object.
var message = messageFactory.createBroadcastMessage({
  namespace: 'test:pubsub',
  ttl: 30000 // Optional
})
message.payload = { /* ... */ }
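To make the ttl behaviour concrete, here is a minimal sketch of an expiry check that a consumer could apply. It does not use msb: the `createdAt` and `ttl` field names and the `isExpired` helper are assumptions for illustration, not the library's envelope schema.

```javascript
// Hypothetical envelope metadata: createdAt (Date) and ttl (milliseconds).
function isExpired(meta, now) {
  if (meta.ttl == null) return false // no ttl: the message never expires
  return now.getTime() - meta.createdAt.getTime() > meta.ttl
}

var meta = { createdAt: new Date('2020-01-01T00:00:00Z'), ttl: 30000 }

// Checked 10 s and 31 s after creation, against a 30 s ttl.
var fresh = isExpired(meta, new Date('2020-01-01T00:00:10Z'))
var stale = isExpired(meta, new Date('2020-01-01T00:00:31Z'))
```

In this sketch `createdAt` comes from the producer's clock and `now` from the consumer's, which is why clock drift between the two shifts the effective ttl.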
The implementer should decide how they want to handle messages that cannot be delivered to the message broker, i.e. where an error is passed back.
msb
.channelManager
.findOrCreateProducer('test:pubsub')
.publish(message, function(err) {
  if (err) return debug(err)
  //...
})
All listeners will receive all messages published to this topic, as long as they are online at the time the message is published.
msb
.channelManager
.findOrCreateConsumer('test:pubsub', { groupId: false })
.on('message', function(message) {
  //...
})
.on('error', debug)
Only one listener will receive each message published to the specified topic.
Listeners will only receive messages published while they are online.
msb
.channelManager
.findOrCreateConsumer('test:pubsub', {
  groupId: 'example-string'
})
.on('message', function(message) {
  //...
})
.on('error', debug)
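The difference between listeners with `groupId: false` and listeners sharing a `groupId` can be sketched with an in-memory stand-in for a topic. This is not msb and not a real broker: the `Topic` type is hypothetical, and round-robin is only an assumed way of sharing messages within a group.

```javascript
// In-memory stand-in for a topic; not msb and not a real broker.
function Topic() {
  this.groups = {}    // groupId -> { listeners: [], next: 0 }
  this.broadcast = [] // listeners registered with groupId: false
}

Topic.prototype.subscribe = function (groupId, listener) {
  if (groupId === false) return this.broadcast.push(listener)
  var group = this.groups[groupId] || (this.groups[groupId] = { listeners: [], next: 0 })
  group.listeners.push(listener)
}

Topic.prototype.publish = function (message) {
  // Every broadcast listener gets its own copy.
  this.broadcast.forEach(function (listener) { listener(message) })
  // Each group gets one copy, handed to its members in round-robin order.
  Object.keys(this.groups).forEach(function (groupId) {
    var group = this.groups[groupId]
    var listener = group.listeners[group.next % group.listeners.length]
    group.next++
    listener(message)
  }, this)
}

var topic = new Topic()
var received = { a: [], b: [], w1: [], w2: [] }
topic.subscribe(false, function (m) { received.a.push(m) })
topic.subscribe(false, function (m) { received.b.push(m) })
topic.subscribe('example-string', function (m) { received.w1.push(m) })
topic.subscribe('example-string', function (m) { received.w2.push(m) })

topic.publish('first')
topic.publish('second')
```

After the two publishes, both broadcast listeners hold both messages, while the two members of the `'example-string'` group hold one message each.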
Listeners will also receive messages published while they were offline, queued up in the message broker. Messages that have a time-to-live (ttl) specified will not be delivered after this time has been exceeded.
Note that messages will only be queued from the first time this listener has been instantiated.
msb
.channelManager
.findOrCreateConsumer('test:pubsub', {
  groupId: 'example-string',
  durable: true
})
.on('message', function(message) {
  //...
})
.on('error', debug)
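The durable behaviour can be illustrated with a small in-memory queue. This is a sketch under stated assumptions rather than the broker's implementation: `DurableQueue`, its methods, and the explicit `now` arguments are hypothetical, and creating the queue stands in for the first time the listener was instantiated.

```javascript
// In-memory stand-in for a durable group queue; not msb and not a real broker.
function DurableQueue() {
  this.pending = []    // messages waiting for a listener
  this.listener = null // currently attached listener, if any
}

DurableQueue.prototype.publish = function (message, now) {
  if (this.listener) return this.listener(message)
  this.pending.push({ message: message, queuedAt: now })
}

DurableQueue.prototype.attach = function (listener, now) {
  this.listener = listener
  // Flush the backlog, skipping messages whose ttl has been exceeded.
  this.pending.splice(0).forEach(function (entry) {
    var ttl = entry.message.ttl
    if (ttl != null && now - entry.queuedAt > ttl) return
    listener(entry.message)
  })
}

var queue = new DurableQueue() // queue exists, listener currently offline
var delivered = []

queue.publish({ body: 'kept' }, 1000)
queue.publish({ body: 'expired', ttl: 500 }, 1000)

// The listener comes back online 4 s later: only the message without a ttl is delivered.
queue.attach(function (m) { delivered.push(m.body) }, 5000)
```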
The simplest way to do a 1-1 request is to provide only a topic and a JSON-serializable payload. Should multiple responders attempt to respond, only the first response to be received will be provided to the callback.
msb.request('example:topic', payload, function(err, payload, _fullMessage) {
  if (err) return debug(err)
  //...
});
Additional settings can be provided:
msb.request({
  namespace: 'example:topic',
  waitForResponsesMs: 1000
}, payload, function(err, payload, _fullMessage) {
  if (err) return debug(err)
  //...
});
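The "first response wins" rule for 1-1 requests can be sketched with a plain Node.js EventEmitter. The `firstResponse` helper and the emitter standing in for incoming responses are hypothetical; this illustrates the rule, not how `msb.request` is implemented.

```javascript
var EventEmitter = require('events')

// Hypothetical helper: invoke the callback for the first response only.
function firstResponse(responses, callback) {
  var done = false
  function finish(err, payload) {
    if (done) return // later responses and errors are ignored
    done = true
    callback(err, payload)
  }
  responses.on('payload', function (payload) { finish(null, payload) })
  responses.on('error', finish)
}

var responses = new EventEmitter()
var calls = []
firstResponse(responses, function (err, payload) { calls.push(payload) })

// Two responders answer; only the first answer reaches the callback.
responses.emit('payload', { from: 'responder-1' })
responses.emit('payload', { from: 'responder-2' })
```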
A single payload is published with a return topic derived from the namespace that will ensure responses are received by this requester.
The requester will listen for multiple responses for the specified amount of time.
var requester = msb.Requester({
  namespace: 'example:topic',
  waitForResponsesMs: 10000 // a.k.a. responseTimeout
})
requester
.on('payload', function(payload, _fullMessage) {
  //...
})
.on('error', function(err) {
  debug(err)
})
.on('end', function() {
  //... Note: won't fire if the requester encountered an error
})
.publish(payload)
The requester will 'end' once this number of responses have been received.
var requester = msb.Requester({
  namespace: 'example:topic',
  waitForResponses: 1
})
//...
Responders have the ability to change the expected number of responses or how long the requester should wait for responses from that responder. If you want to guarantee that the requester will wait for such messages (acks) to be received, you should specify a minimum time for the requester to wait.
var requester = msb.Requester({
  namespace: 'example:topic',
  waitForAcksMs: 1000, // a.k.a. ackTimeout
  waitForResponses: 1
})
//...
In the above case, the requester will only ever end after the specified waitForAcksMs.
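How these settings interact can be sketched as a small state model with an explicit clock. This is a deliberately simplified illustration, not the library's logic: the function names are hypothetical, times are milliseconds since the request was published, and a single shared counter is used where the text above describes expectations per responder.

```javascript
// Hypothetical collector state; times are milliseconds since the request was published.
function createCollector(options) {
  return {
    waitForAcksMs: options.waitForAcksMs || 0,
    waitForResponsesMs: options.waitForResponsesMs || 0,
    responsesRemaining: options.waitForResponses || 0
  }
}

// An ack may raise the expected response count and push the deadline out.
function onAck(collector, extraResponses, timeoutMs, now) {
  collector.responsesRemaining += extraResponses
  collector.waitForResponsesMs = Math.max(collector.waitForResponsesMs, now + timeoutMs)
}

function onResponse(collector) {
  collector.responsesRemaining--
}

function shouldEnd(collector, now) {
  if (now < collector.waitForAcksMs) return false    // always leave time for acks
  if (collector.responsesRemaining <= 0) return true // everything expected has arrived
  return now >= collector.waitForResponsesMs         // otherwise wait for the timeout
}

// Scenario 1: the only expected response arrives early, but the ack window is respected.
var quick = createCollector({ waitForAcksMs: 1000, waitForResponses: 1 })
onResponse(quick)
var endsEarly = shouldEnd(quick, 200)
var endsAfterAckWindow = shouldEnd(quick, 1000)

// Scenario 2: an ack at 100 ms announces one more response within 3000 ms.
var extended = createCollector({ waitForAcksMs: 1000, waitForResponses: 1 })
onAck(extended, 1, 3000, 100)
onResponse(extended)
var endsWhileWaiting = shouldEnd(extended, 1000)
var endsAtDeadline = shouldEnd(extended, 3100)
```

In the first scenario the collector holds the only expected response at 200 ms yet does not end until the ack window closes at 1000 ms. In the second, the ack keeps it open until the extended deadline.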
A single response (per responder) for each incoming request.
msb.Responder.createServer({
  namespace: 'example:topic'
})
.use(function(request, response, next) {
  var body = {}
  response.writeHead(200) // HTTP-compatible
  response.end(body) // To be provided in response `payload.body`
})
.listen()
An ack is sent to ensure the requester will continue to wait for this response.
msb.Responder.createServer({
  namespace: 'example:topic'
})
.use(function(request, response, next) {
  var expectedResponses = 1
  var expectedTimeForResponse = 3000
  response.responder.sendAck(expectedResponses, expectedTimeForResponse, next)
})
.use(function(request, response, next) {
  var body = {}
  response.writeHead(200) // HTTP-compatible
  response.end(body) // To be provided in response `payload.body`
})
.listen()
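The way chained `.use()` handlers hand control to each other through `next` can be sketched with a tiny middleware runner. This is not the msb ResponderServer: `createChain` is hypothetical, and treating four-argument functions as error handlers is an assumption borrowed from common Node.js middleware conventions.

```javascript
// Hypothetical mini middleware runner; not the msb ResponderServer.
function createChain() {
  var stack = []
  return {
    use: function (fn) { stack.push(fn); return this },
    handle: function (request, response, done) {
      var index = 0
      function next(err) {
        var fn = stack[index++]
        if (!fn) return done(err)
        var isErrorHandler = fn.length === 4
        if (err && isErrorHandler) return fn(err, request, response, next)
        if (!err && !isErrorHandler) return fn(request, response, next)
        next(err) // skip handlers that do not match the current state
      }
      next()
    }
  }
}

var log = []
var chain = createChain()
  .use(function (request, response, next) { log.push('ack'); next() })
  .use(function (request, response, next) { next(new Error('boom')) })
  .use(function (request, response, next) { log.push('skipped'); next() })
  .use(function (err, request, response, next) { log.push('error: ' + err.message); next() })

chain.handle({}, {}, function (err) { log.push(err ? 'failed' : 'done') })
```

Once a handler passes an error to `next`, ordinary handlers are skipped until an error handler takes over.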
An ack is sent to ensure the requester will wait for the multiple responses being prepared.
msb.Responder.createEmitter({
  namespace: 'example:topic'
})
.on('responder', function(responder) {
  responder.sendAck(3, 5000)
  var i = 0;
  while (i++ < 3) {
    var payload = {
      body: {
        //...
      }
    }
    responder.send(payload, function(err) {
      if (err) return debug(err)
    })
  }
})
Loads the provided config object over the configuration for the channelManager. E.g.
msb.configure(config); // Default channelManager, or
msb.createChannelManager().configure(config); // Additional channelManager
Note: It is recommended that you do not re-configure after publisher/subscriber channels have been created.
name in the package.json of the main module.)
version in the package.json of the main module.)
config.amqp overriding defaults.
channelManager.configure(config) programmatically.
By default, MSB does not automatically recover a failed connection with the broker. When a connection failure occurs, an error is raised and the application process terminates. This behaviour can be overridden by setting MSB_BROKER_RECONNECT=true. The implementation has a drawback, though: reconnection is done silently, without emitting any events or logging errors, so it can be hard to tell whether a microservice is connected to the broker or in the middle of a connection retry.
Listens to a topic on the bus and prints JSON to stdout. By default it will also listen for response topics detected on messages, and JSON is pretty-printed. For newline-delimited JSON compatibility, specify -p false.
$ node_modules/msb/bin/msb -t topic:to:listen:to
Or if globally installed, i.e. npm install msb -g:
$ msb -t topic:to:listen:to
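The difference between the two output styles can be seen with plain `JSON.stringify`. The message object below is made up for illustration, and the snippet shows only the formatting, not how the CLI produces it.

```javascript
// Hypothetical message; only the formatting is of interest here.
var message = { topics: { to: 'topic:to:listen:to' }, payload: { body: { ok: true } } }

// Pretty-printed: readable, but one message spans several lines.
var pretty = JSON.stringify(message, null, 2)

// Newline-delimited: exactly one line per message, so tools can split on '\n'.
var ndjsonLine = JSON.stringify(message) + '\n'
```

Line-oriented tools can consume the second form directly, because no message ever contains a raw line break.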
Options:
RabbitMQ is the default message broker used. The AMQP adapter is tested with RabbitMQ and it implements a limited topology for simplification. One exchange is created per topic and a queue is created for every group of similar services, configured using a groupId. This means that you can have different types of services listening on the same topic, and multiple processes of the same type of service would receive a fair distribution of messages.
A responder lets you send formatted acks and responses in response to a request message received on a topic/namespace.
The request message this responder is responding to.
msb.channelManager)
See ResponderServer for options.
(Use msb.Responder.createServer() to create instances.)
function handler(request, response, next)
function errorHandler(err, request, response, next)
next() call.
Call this to start listening for requests.
msb.channelManager)
Passed to ResponderServer middleware-like functions. The interface is kept similar to core HttpServerResponse for convenience.
See http.
The Responder object used to send acks and responses.
A requester is a collector component that can also publish new messages on the bus.
null to prevent inheritance from the current messageFactory context.
function(payload, _message) { }
function(ack, _message) { }
Emitted either on timeout or when the expected number of responses has been received.
A collector is a component that listens for multiple response messages, with timeouts and number of responses determining its lifetime.
(For events and instantiation, see Requester.)
A simpler API for 1-1 request/responses.
A function that throws a validation error if the message does not validate.
Returns a middleware-style function, e.g. function(request, response, next) { }, to be used in a ResponderServer middleware chain, that will pass a validation error to next() for invalid incoming requests.
request.
E.g. responderServer.use(msb.validateWithSchema.middleware(payloadSchema));
Returns an event handler function, e.g. function(payload) { ... }.
function(payload) { }
An event handler that will only be called if the incoming payload validates.
function(err, payload) { }
A function that will be called with the validation error and original payload if the incoming message fails validation.
Note: Without an errorEventHandlerFn, errors will be emitted on the original event emitter.
E.g.
requester
.on('payload', msb.validateWithSchema.onEvent(messageSchema, function(payload) {
  //...
}))
.on('error', function(err, payload) {
  console.error(err);
  requester.end();
});
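The wrapping behaviour described for the event handler can be sketched without a schema library. In this illustration `validate` is a hypothetical stand-in for a schema check, and this `onEvent` is a local function written for the example, not `msb.validateWithSchema.onEvent` itself.

```javascript
var EventEmitter = require('events')

// Hypothetical stand-in for a schema check: throws if the payload is invalid.
function validate(payload) {
  if (!payload || typeof payload.body !== 'object') {
    throw new Error('payload.body must be an object')
  }
}

// Wrap a handler so it only runs for payloads that validate.
function onEvent(validateFn, handler, errorHandler) {
  return function (payload) {
    try {
      validateFn(payload)
    } catch (err) {
      if (errorHandler) return errorHandler(err, payload)
      return this.emit('error', err, payload) // `this` is the emitter
    }
    handler.call(this, payload)
  }
}

var emitter = new EventEmitter()
var seen = []
var errors = []
emitter.on('payload', onEvent(validate, function (payload) { seen.push(payload) }))
emitter.on('error', function (err) { errors.push(err.message) })

emitter.emit('payload', { body: {} })
emitter.emit('payload', { body: 'not an object' })
```

Because no error handler was passed to the wrapper, the second payload surfaces as an `'error'` event on the same emitter.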
The channel manager enables re-use of channels listening/publishing per topic. It is an EventEmitter instance used as a singleton with the app-wide configuration.
var channelManager = msb.channelManager;
Returns a producer for this topic. Either existing or new. Corresponding channelManager events will be emitted for this producer.
Returns a consumer listening on this topic. Either existing or new. Corresponding channelManager events will be emitted for this consumer. If config.cleanupConsumers is set, these consumers will be removed as soon as there are no more listeners for them. If an app-wide schema exists, it will be checked for every incoming message.
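The find-or-create and cleanup behaviour can be sketched with a small per-topic registry. This is not the msb channelManager: `createRegistry` and `has` are hypothetical, and a plain EventEmitter stands in for a consumer.

```javascript
var EventEmitter = require('events')

// Hypothetical registry; not the msb channelManager.
function createRegistry(options) {
  var consumers = {} // topic -> consumer
  return {
    findOrCreateConsumer: function (topic) {
      if (consumers[topic]) return consumers[topic]
      var consumer = consumers[topic] = new EventEmitter()
      if (options.cleanupConsumers) {
        // Drop the consumer once its last 'message' listener is removed.
        consumer.on('removeListener', function (eventName) {
          if (eventName === 'message' && consumer.listenerCount('message') === 0) {
            delete consumers[topic]
          }
        })
      }
      return consumer
    },
    has: function (topic) { return topic in consumers }
  }
}

var registry = createRegistry({ cleanupConsumers: true })
var first = registry.findOrCreateConsumer('test:pubsub')
var second = registry.findOrCreateConsumer('test:pubsub')
var sameInstance = first === second

function onMessage() {}
first.on('message', onMessage)
first.removeListener('message', onMessage)
var stillRegistered = registry.has('test:pubsub')
```

Both lookups return the same object, and removing the last `'message'` listener drops the consumer from the registry.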
false for broadcast-style message queue.
false to require explicit confirmation of processed messages. (Default: true)
The adapter has established connection with message broker
The adapter has lost connection with message broker, but is going to recover it (if connection recovery is enabled)
An error occurred on adapter level, e.g. unrecoverable connection loss (if connection recovery is disabled)
(Created using the channelManager.findOrCreateProducer.)
(Created using the channelManager.findOrCreateConsumer.)
Stops listening for messages on this topic. If config.cleanupConsumers is set, and this consumer was created using channelManager.findOrCreateConsumer, it would be removed from the channelManager.
Confirms with the broker (where supported) that processing of this message has completed. Only works where the broker-adapter is AMQP, and where config.autoConfirm is set to false.
Confirms with the broker (where supported) that this message should not be processed, e.g. in cases such as invalid message or TTL reached. Only works where the broker-adapter is AMQP, and where config.autoConfirm is set to false.
config.schema.