Research
Security News
Malicious npm Packages Inject SSH Backdoors via Typosquatted Libraries
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.
redis-streams-broker
Advanced tools
This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.
This package is based on redis stream data type which provides you with following features
npm -i redis-streams-broker
const brokerType = require('redis-streams-broker').StreamChannelBroker;
docker run --name streamz -p 6379:6379 -itd --rm redis:latest
const broker = new brokerType(redisClient, name);
const Redis = require("ioredis");
const redisConnectionString = "redis://127.0.0.1:6379/";
const qName = "Queue";
const redisClient = new Redis(redisConnectionString);
const brokerType = require('redis-streams-broker').StreamChannelBroker;
const broker = new brokerType(redisClient, qName);
//Used to publish a paylod on stream.
const payloadId = await broker.publish({ a: "Hello", b: "World" });
//Creates a consumer group to receive payload
const consumerGroup = await broker.joinConsumerGroup("MyGroup");
//Registers a new consumer with Name and Callback for message handlling.
const subscriptionHandle = await consumerGroup.subscribe("Consumer1", newMessageHandler);
// Handler for arriving Payload
async function newMessageHandler(payload) {
for (let index = 0; index < payload.length; index++) {
try {
const element = payload[index];
console.log("Payload Id:", element.id); //Payload Id
console.log("Payload Received from :", element.channel); //Stream name
console.log("Actual Payload:", element.payload); //Actual Payload
await element.markAsRead(); //Payload is marked as delivered or Acked also optionaly the message can be dropped.
}
catch (exception) {
console.error(exception);
}
}
}
//Provides summary of payloads which have delivered but not acked yet.
const summary = await consumerGroup.pendingSummary();
//Unsubscribes the consumer from the group.
const sucess = consumerGroup.unsubscribe(subscriptionHandle);
//Amount of memory consumed by this stream in bytes.
const consumedMem = await broker.memoryFootprint();
0.0.9[Beta]
This project is contrubution to public domain and completely free for use, view LICENSE.md file for details.
Class StreamChannelBroker
constructor(redisClient: any, channelName: string)
Creates a broker instance.
redisClient: Injectable redis client which will be used to send commands to redis server.
channelName: Name of the stream key, if this doesnot exists it will be created on first push or group subscription.
publish(payload: any, maximumApproximateMessages?: number): Promise<string>;
Publishes provided message into the stream and returns id generated by server.
payload: A JS object containing properties which are passed as key values pairs.
maximumApproximateMessages: Appropiate length of the stream it is equal to ~ MAXLENGTH
option in redis. Defaulted to 100.
joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>
Creates a consumer group on the given redis stream with information provided, if the group exists does nothing returning a ConsumerGroup
object.
groupName: Name of the group to be created ot joined.
readFrom: Id of the mesage to start reading from. defaulted to $
to only read new messages recevied on redis, check redis docs for more info.
memoryFootprint(): Promise<number>
Returns number of bytes consumed by the current stream.
destroy(): Promise<boolean>;
Starts to unsubscribe all the handles that were subscribed to this instance.
Class ConsumerGroup
subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>
Subscribes to stream to start receiving events when new payload arrives, this internally creates a polling system to check for new messages in stream. returns subscription name.
consumerName: Name of the consumer who is subscribing via the consumer group object.
handler: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature (payload: Payload[]) => Promise<boolean>
should be async return from this function is ignored for now, look at Payload
class below for more details.
pollSpan: Number of millisecond to wait after completion of handler to check for next available message in stream. Defaulted to 1000 milliseconds.
payloadsToFetch: Maximum number of messages to fetch in one poll to server this is simillar to COUNT
command in redis, this is optional and defaulted to 2.
subscriptionHandle: Name for subscription handler this is what will be returned from the function, this is defaulted to unique shortid.
readPending: If set to true
will read all messages from start of the stream ie: Id = 0 which are in pending list of this consumer and group, once all pending are read it will automatically switch to latest messages from the stream. If set to false
it will always look for new message from the stream, this is defaulted to false
.
unsubscribe(subscriptionHandle: string): Promise<boolean>
Unsubscribes from the stream for the given subscriptionhandle, returns true for sucess and false for failure.
subscriptionHandle: Name of the subscription handle which was returned by subscribe api.
pendingSummary(): Promise<GroupSummary>
Returns details of the pending items for the given group by exposing GroupSummary
object.
Class Payload
channel: string
: Name of the stream key in redis.
id: string
: Id of the message being received.
payload: any
: Actual payload to processs.
markAsRead(deleteMessage?: boolean): Promise<boolean>
This function helps to ack the payload as read or processed, returns status of the operation via boolean return type true
indicating success.
deleteMessage: if set to true
it will ack & delete the message from the stream if set to false
will only ack the message defaulted to false.
Class GroupSummary
total: number
: This is the total number of messages in pending list.firstId: string
: Id of the first message which is pending.lastId: string
: Id of the last message which is pending.consumerStats: any
: Extra information provided by XPENDING
command.FAQs
This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.
We found that redis-streams-broker demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
Socket’s threat research team has detected six malicious npm packages typosquatting popular libraries to insert SSH backdoors.
Security News
MITRE's 2024 CWE Top 25 highlights critical software vulnerabilities like XSS, SQL Injection, and CSRF, reflecting shifts due to a refined ranking methodology.
Security News
In this segment of the Risky Business podcast, Feross Aboukhadijeh and Patrick Gray discuss the challenges of tracking malware discovered in open source softare.