QStream
Topic-Based Messaging Queue on top of Redis Streams
Example
Producer
const streams = require('@openmessage/qstream')();
streams.publish('your-topic', data);
See more in the examples.
Consumer
const streams = require('@openmessage/qstream')();

// await needs an async context in CommonJS, so wrap the calls
(async () => {
  const group = await streams.group('your-topic', 'group/queue name');
  group.consume(async (data) => {
    console.log({ data });
    return true;
  });
})();
See more in the examples.
Usage
Connection
const QStream = require('@openmessage/qstream');
const qstream = QStream(redisUrl);
redisUrl: a valid Redis URL, for example redis://localhost:6379
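To illustrate what a Redis URL carries, here is a minimal sketch that splits one into its parts with Node's built-in URL class. The helper name describeRedisUrl is hypothetical and not part of qstream, and the rediss: scheme for TLS plus the default port 6379 are common client conventions assumed here, not something this README confirms for the library.

```javascript
// Hypothetical helper (not part of qstream): break a Redis URL into its parts.
function describeRedisUrl(redisUrl) {
  const url = new URL(redisUrl); // throws on malformed input
  if (url.protocol !== 'redis:' && url.protocol !== 'rediss:') {
    throw new Error(`Unsupported scheme: ${url.protocol}`);
  }
  return {
    tls: url.protocol === 'rediss:',          // assumed convention for TLS
    host: url.hostname,
    port: Number(url.port || 6379),           // assumed default Redis port
    db: Number(url.pathname.slice(1) || 0),   // "/2" selects database 2
    password: url.password ? decodeURIComponent(url.password) : undefined,
  };
}

console.log(describeRedisUrl('redis://:secret@localhost:6380/2'));
```

Checking the URL this way before passing it to QStream(redisUrl) can surface typos early, but whether the library accepts every field shown here is an assumption.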
Publish/Produce/Emit
qstream.publish('your-topic', data);
data: any valid JavaScript object; primitive values are not supported
An optional third argument, maxLen, caps the stream at the specified length:
qstream.publish('your-topic', data, 10);
Or use an approximate maxLen:
qstream.publish('your-topic', data, '~10');
By default, streams are capped at approximately 10000 entries (MAXLEN ~ 10000). If you don't want the stream to be capped, explicitly set the last argument of publish to null.
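The three forms of maxLen (exact, approximate, and null) correspond to the MAXLEN options of the Redis XADD command. The sketch below shows one way such a value could be translated into XADD arguments. The function maxLenArgs is hypothetical and only illustrates the mapping; it is not taken from the library's source.

```javascript
// Hypothetical helper (not qstream's code): map a maxLen value to XADD options.
// undefined -> default approximate cap, null -> no cap at all.
function maxLenArgs(maxLen = '~10000') {
  if (maxLen === null) return []; // uncapped stream
  const text = String(maxLen).trim();
  const approximate = text.startsWith('~');
  const digits = approximate ? text.slice(1) : text;
  const count = Number(digits);
  if (digits === '' || !Number.isInteger(count) || count < 0) {
    throw new TypeError(`Invalid maxLen: ${maxLen}`);
  }
  return approximate
    ? ['MAXLEN', '~', String(count)] // XADD key MAXLEN ~ 10 * ...
    : ['MAXLEN', String(count)];     // XADD key MAXLEN 10 * ...
}

console.log(maxLenArgs(10));    // exact cap
console.log(maxLenArgs('~10')); // approximate cap
console.log(maxLenArgs(null));  // no cap
```

The approximate form lets Redis trim in whole internal nodes, which is usually cheaper than trimming to an exact length.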
Consumer Group
const group = await qstream.group('your-topic', 'consumer-group/queue-name');
Consumers in the same consumer group load balance jobs among themselves.
Subscribe/Consume/Listen
group.consume(async (data) => {
  console.log({ data });
  return true;
});
The function passed to the consume method can be async (it may return a promise).
group.consume(console.log, 10);
The second parameter of consume is the number of concurrent jobs; it defaults to 1.
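To make the meaning of "concurrent jobs" concrete, here is a self-contained sketch of a handler being run with a cap on in-flight calls. It does not use qstream or Redis; consumeWithLimit is a hypothetical stand-in that only demonstrates the idea, and the library's own scheduling may differ.

```javascript
// Hypothetical stand-in (not qstream's code): run `handler` over `jobs`
// with at most `limit` handler calls in flight at any moment.
async function consumeWithLimit(jobs, handler, limit = 1) {
  const queue = [...jobs];
  const results = [];
  // Each worker pulls the next job as soon as its previous one settles.
  async function worker() {
    while (queue.length > 0) {
      const job = queue.shift();
      results.push(await handler(job));
    }
  }
  const workers = Array.from({ length: Math.min(limit, jobs.length) }, worker);
  await Promise.all(workers);
  return results;
}

// Usage: five jobs, two at a time.
consumeWithLimit(['a', 'b', 'c', 'd', 'e'], async (data) => {
  console.log({ data });
  return true;
}, 2);
```

With a limit of 1 the jobs are handled strictly one after another; a higher limit trades ordering for throughput.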
Debug
This library uses debug to log processing details. Enable the output with the DEBUG environment variable:
DEBUG=qstream:* npm start
Roadmap