
Security News
Package Maintainers Call for Improvements to GitHub’s New npm Security Plan
Maintainers back GitHub’s npm security overhaul but raise concerns about CI/CD workflows, enterprise support, and token management.
@imqueue/pg-pubsub
Advanced tools
Reliable PostgreSQL LISTEN/NOTIFY with inter-process lock support
This library provides a clean way to use PostgreSQL LISTEN and NOTIFY commands for its asynchronous mechanism implementation. It comes as a top-level wrapper over node-postgres and provides better, cleaner way to work with database notifications engine.
To make it clear - it solves several major problems you will fall into if you're going to use LISTEN/NOTIFY in your node app:
As easy as:
npm i --save @imqueue/pg-pubsub
import { PgPubSub } from '@imqueue/pg-pubsub';
const connectionString = 'postgres://user:pass@localhost:5432/dbname';
const pubSub = new PgPubSub({ connectionString, singleListener: false });
(async () => {
await pubSub.connect();
})();
With such instantiation options natural behavior of PgPubSub will be as follows:
After connection established you may decide to listen for any numbers of channels your application may need to utilize:
await pubSub.listen('UserChanged');
await pubSub.listen('OrderCreated');
await pubSub.listen('ArticleUpdated');
BTW, the most reliable way is to initiate listening on 'connect'
event:
pubSub.on('connect', async () => {
await Promise.all([
'UserChanged',
'OrderCreated',
'ArticleUpdated',
].map(channel => pubSub.listen(channel)));
});
Now, whenever you need to close/reopen connection, or reconnect occurred for any reason you'll be sure nothing is broken.
All payloads on messages are treated as JSON, so when the handler catch a message it is already parsed as JSON value, so you do not need to manage serialization/deserialization yourself.
There are 2 ways of handling channel messages - by using 'message'
event
handler on pubSub
object, or using pubSub.channels
event emitter and to
listen only particular channel for it's messages. On message event fires first,
channels events fires afterwards, so this could be a good way if you need to
inject and transform particular message in a synchronous manner before it
will come to a particular channel listeners.
Also 'message'
listener could be useful during implementation of handling of
database side events. It is easy imagine that db can send us messages into, so
called, structural channels, e. g. 'user:insert'
, 'company:update'
or
'user_company:delete'
, where such names are generated by some generic trigger
which handles corresponding database operations and send updates to subscribers
using NOTIFY calls. In such case we can treat channel on application side
as self-describable database operation change, which we can easily manage with
a single piece of code and keep following DRY.
// using 'message' handler:
pubSub.on('message', (channel: string, payload: AnyJson) => {
// ... do the job
switch (channel) {
case 'UserChanged': {
// ... do some staff with user change event payload
break;
}
default: {
// do something with payload by default
break;
}
}
});
// handling using channels
pubSub.channels.on('UserChanged', (payload: AnyJson) => {
// do something with user changed payload
});
pubSub.channels.on('OrderCreated', (payload: AnyJson) => {
// do something with order created payload
});
pubSub.channels.on('ArticleUpdated', (payload: AnyJson) => {
// do something with article updated payload
});
Of course, it is better to setup listeners before calling connect()
that it
starts handle payloads right up on connect time.
You can send messages in many different ways. For example, you may create database triggers which would notify all connected clients with some specific updates. Or you may use a database only as notifications engine and generate notifications on application level. Or you may combine both approaches - there are no limits!
Here is how you can send notification with PgPubSub
API (aka application
level of notifications):
pubSub.notify('UserChanged', {
old: { id: 777, name: 'John Doe', phone: '555-55-55' },
new: { id: 777, name: 'Sam Peters', phone: '777-77-77' },
});
Now all subscribers, who listening 'UserChanged'
channel will receive given
payload JSON object.
There are variety of many possible architectures to come up with when you're building scalable distributed system.
With services on scale in such systems it might be a need to make sure only single service of many similar running is listening to particular database notifications. Here why comes an idea of inter process (IP) locking mechanism, which would guarantee that only one process handles notifications and if it dies, next one which is live will immediately handle listening.
This library comes with this option turned on by default. To make it work in
such manner, you would need to skip passing singleListener
option to
PgPubSub
constructor or set it to true
:
const pubSub = new PgPubSub({ connectionString });
// or, equivalently
const pubSub = new PgPubSub({ connectionString, singleListener: true });
Locking mechanism utilazes the same connection and LISTEN/NOTIFY commands, so it won't consume any additional computing resources.
Also, if you already working with pg
library in your application and you
have a need to stay for some reason with that single connection usage, you
can bypass it directly as pgClient
option. But that is not always a good idea.
Normally, you have to understand what you are doing and why.
const pubSub = new PgPubSub({ pgClient: existingClient });
NOTE: With LISTEN connections it is really hard to utilize power of connection pool as long as it will require additional implementation of some connection switching mechanism using listen/unlisten and some specific watchers which may fall into need of re-implementing pools from scratch. So, that is why most of existing listen/notify solutions based on a single connection approach. And this library as well. It is just more simple and reliable.
You may read API docs on wiki pages , read the code of the library itself, use hints in your IDE or generate HTML docs with:
git clone git@github.com:imqueue/pg-pubsub.git
cd pg-pubsub
npm i
npm run doc
Try to run the following minimal example code of single listener scenario (do not forget to set proper database connection string):
import { PgPubSub } from '@imqueue/pg-pubsub';
import Timer = NodeJS.Timer;
let timer: Timer;
const NOTIFY_DELAY = 2000;
const CHANNEL = 'HelloChannel';
const pubSub = new PgPubSub({
connectionString: 'postgres://postgres@localhost:5432/postgres',
singleListener: true,
// filtered: true,
});
pubSub.on('listen', channel => console.info(`Listening to ${channel}...`));
pubSub.on('connect', async () => {
console.info('Database connected!');
await pubSub.listen(CHANNEL);
timer = setInterval(async () => {
await pubSub.notify(CHANNEL, { hello: { from: process.pid } });
}, NOTIFY_DELAY);
});
pubSub.on('notify', channel => console.log(`${channel} notified`));
pubSub.on('end', () => console.warn('Connection closed!'));
pubSub.channels.on(CHANNEL, console.log);
pubSub.connect().catch(err => console.error('Connection error:', err));
Or take a look at other minimal code examples
Play with them locally:
git clone -b examples git://github.com/imqueue/pg-pubsub.git examples
cd examples
npm i
Now you can start any of them, for example:
./node_modules/.bin/ts-node filtered.ts
Any contributions are greatly appreciated. Feel free to fork, propose PRs, open issues, do whatever you think may be helpful to this project. PRs which passes all tests and do not brake tslint rules are first-class candidates to be accepted!
Happy Coding!
FAQs
Reliable PostgreSQL LISTEN/NOTIFY with inter-process lock support
The npm package @imqueue/pg-pubsub receives a total of 302 weekly downloads. As such, @imqueue/pg-pubsub popularity was classified as not popular.
We found that @imqueue/pg-pubsub demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 2 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Maintainers back GitHub’s npm security overhaul but raise concerns about CI/CD workflows, enterprise support, and token management.
Product
Socket Firewall is a free tool that blocks malicious packages at install time, giving developers proactive protection against rising supply chain attacks.
Research
Socket uncovers malicious Rust crates impersonating fast_log to steal Solana and Ethereum wallet keys from source code.