Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pg-notify

Package Overview
Dependencies
Maintainers
1
Versions
26
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-notify - npm Package Compare versions

Comparing version 0.0.12 to 0.0.14

28

index.d.ts
import { ClientConfig } from 'pg'
export interface Options {
export interface Options extends ClientConfig {
reconnectMaxRetries?: number;
reconnectDelay?: number;
maxPayloadSize?: number;
maxEmitRetries?: number;
emitThrottleDelay?: number;
continuousEmitFailureThreshold?: number;
queueSize?: number;
emulateMqEmitterApi?: boolean
db: ClientConfig;
}
export interface Message {
topic: string
payload: string | object
}
declare class PGPubSub {
constructor (opts: Options);
emit (message: Message): Promise<void>;
emit (message: Message, callback: () => void): void;
on (topic: string, listener: (params: { payload: string | object }, callback?: () => void) => void, callback: () => void): void;
on (topic: string, listener: (params: { payload: string | object }, callback?: () => void) => void): Promise<void>;
removeListener (topic: string, listener: (params: { payload: string | object }, callback?: () => void) => void, callback: () => void): void;
removeListener (topic: string, listener: (params: { payload: string | object }, callback?: () => void) => void): Promise<void>;
emit (channel: string, payload: any): Promise<void>;
on (topic: string, listener: (payload: any) => void): Promise<void>;
removeListener (topic: string, listener: (payload: any) => void): Promise<void>;
connect (): Promise<void>;
close (): Promise<void>;

@@ -35,0 +15,0 @@ }

2

index.js

@@ -0,3 +1,5 @@

'use strict'
const PGPubSub = require('./lib/pg-notify')
module.exports = PGPubSub
'use strict'
const EventEmitter = require('events')
const util = require('util')
const pg = require('pg')
const format = require('pg-format')
const util = require('util')
const sjson = require('secure-json-parse')

@@ -24,10 +24,5 @@ const debug = require('debug')('pg-notify')

this.ee.setMaxListeners(0)
this.reconnectMaxRetries = opts.reconnectMaxRetries || 10
this.reconnectDelay = opts.reconnectDelay || 1000
this.maxPayloadSize = opts.maxPayloadSize || 7999
this.maxEmitRetries = opts.maxEmitRetries || 10
this.queueSize = opts.queueSize || 10000
this.emitThrottleDelay = opts.emitThrottleDelay || 100
this.continuousEmitFailureThreshold = opts.continuousEmitFailureThreshold || 100
this.emulateMqEmitterApi = opts.emulateMqEmitterApi || false
this.maxPayloadSize = opts.maxPayloadSize || 7999 // default on a standard pg installation

@@ -37,20 +32,9 @@ this.state = states.init

this.channels = {}
this.queue = []
this.flushingQueue = false
this.continuousEmitFails = 0
}
emit (channel, payload, callback) {
if (this.state === states.closing) {
return Promise.resolve()
async emit (channel, payload) {
if (this.state !== states.connected) {
throw new Error('[PGPubSub]: not connected')
}
let _retries = 0
if (typeof channel === 'object') {
callback = payload
payload = channel.payload
_retries = channel._retries
channel = channel.topic
}
if (typeof payload === 'object') {

@@ -63,3 +47,3 @@ payload = JSON.stringify(payload)

if (Buffer.byteLength(parsedPayload, 'utf-8') > this.maxPayloadSize) {
throw new Error(`Payload exceeds maximum size: ${this.maxPayloadSize}`)
throw new Error(`[PGPubSub]: payload exceeds maximum size: ${this.maxPayloadSize}`)
}

@@ -71,118 +55,34 @@

if (this.state !== states.connected) {
this._insertMessageInQueue({ topic: channel, payload })
if (callback) {
callback()
} else {
return Promise.resolve()
}
} else {
return this.client.query(`NOTIFY ${format.ident(channel)}, ${parsedPayload}`)
.then(() => {
this.continuousEmitFails = 0
debug('[emit] emitted')
// ensure queue is empty
this._flushQueue()
if (callback) {
callback()
}
})
.catch(err => {
this.continuousEmitFails++
debug('[emit] failed to emit')
debug('[emit] state:', this.state)
// failed to notify, add it to queue to process it later to avoid data loss
this._insertMessageInQueue({ topic: channel, payload, _retries })
if (this.state === states.connected) {
console.error('[PGPubSub]: emit failed', err.message)
}
})
}
return this.client.query(`NOTIFY ${format.ident(channel)}, ${parsedPayload}`)
}
on (topic, listener, callback) {
debug('[subscribe]', topic)
if (this.state === states.closing) {
return Promise.resolve()
async on (channel, listener) {
debug('[subscribe]', channel)
if (this.state !== states.connected) {
throw new Error('[PGPubSub]: not connected')
}
let handler = listener
if (this.emulateMqEmitterApi) {
// needed to support this as drop-in replacement for mqemitter
handler = (payload) => {
listener({ payload }, () => {})
}
if (this.channels[channel]) {
this.ee.on(channel, listener)
this.channels[channel].listeners++
return
}
if (this.channels[topic]) {
this.ee.on(topic, handler)
this.channels[topic].listeners++
if (callback) {
callback()
return
} else {
return Promise.resolve()
}
}
this.ee.on(channel, listener)
this.channels[channel] = { listeners: 1 }
this.ee.on(topic, handler)
this.channels[topic] = { listeners: 1 }
if (this.state !== states.connected) {
return
}
return this.client.query(`LISTEN ${format.ident(topic)}`)
.then(() => {
if (callback) {
callback()
}
})
.catch((err) => {
if (this.state === states.connected) {
console.error('[PGPubSub]: subscribe failed', err.message)
}
})
return this.client.query(`LISTEN ${format.ident(channel)}`)
}
removeListener (topic, handler, callback) {
if (!this.channels[topic]) {
callback()
async removeListener (channel, listener) {
if (!this.channels[channel]) {
return
}
this.ee.removeListener(topic, handler)
this.channels[topic].listeners--
this.ee.removeListener(channel, listener)
this.channels[channel].listeners--
if (this.channels[topic].listeners === 0) {
delete this.channels[topic]
if (this.state !== states.connected) {
if (callback) {
callback()
return
} else {
return Promise.resolve()
}
}
return this.client.query(`UNLISTEN ${format.ident(topic)}`)
.then(() => {
if (callback) {
callback()
}
})
.catch(err => {
if (this.state === states.connected) {
console.error('[PGPubSub]: removeListener failed', err.message)
}
})
} else {
if (callback) {
callback()
} else {
return Promise.resolve()
}
if (this.channels[channel].listeners === 0) {
delete this.channels[channel]
return this.client.query(`UNLISTEN ${format.ident(channel)}`)
}

@@ -198,3 +98,2 @@ }

} catch (e) {
console.error('[PGPubSub]: error setting up client, reconnecting', e)
await this._reconnect()

@@ -210,3 +109,2 @@ }

this.channels = {}
this.queue = []
this.ee.removeAllListeners()

@@ -228,6 +126,2 @@ if (this.client) {

if (this.reconnectRetries > 5) {
await sleep(this.reconnectDelay)
}
try {

@@ -239,17 +133,13 @@ this.client.end()

this.close()
throw new Error('[PGPubSub]: Max reconnect attempts reached, aborting', err)
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', err)
}
if (![states.closing, states.connected].includes((this.state))) {
await sleep(10)
await this._reconnect(true)
}
}
if (this.state === states.connected) {
debug('[_reconnect] flushing queue')
await this._flushQueue()
}
}
async _setupClient () {
this.client = new pg.Client(this.opts.db)
this.client = new pg.Client(this.opts)
await this.client.connect()

@@ -272,3 +162,3 @@

this.close()
throw new Error('[PGPubSub]: Max reconnect attempts reached, aborting', err)
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', err)
}

@@ -289,55 +179,5 @@

debug('[_setupClient] init listeners done')
await this._flushQueue()
}
_insertMessageInQueue (message) {
debug('[_insertMessageInQueue] queue.length', this.queue.length)
if (typeof message._retries !== 'undefined') {
message._retries++
} else {
message._retries = 0
}
debug('[_insertMessageInQueue] message.retries', message._retries)
if (this.state !== states.closing && this.queue.length < this.queueSize) {
this.queue.push(message)
}
}
async _flushQueue () {
if (this.flushingQueue) {
return
}
this.flushingQueue = true
debug('[_flushQueue] flushing queue')
while (this.queue.length) {
if (this.state !== states.connected) {
break
}
const message = this.queue.shift()
if (message._retries && message._retries > this.maxEmitRetries) {
// skip messages that continuously failed
console.error('[PGPubSub]: emit failed after retries', message)
} else {
// start throttling retries when emits start continuously failing
if (this.continuousEmitFails > this.continuousEmitFailureThreshold) {
await sleep(this.emitThrottleDelay)
}
try {
await this.emit(message)
} catch (e) {
this._insertMessageInQueue(message)
}
}
}
this.flushingQueue = false
}
}
module.exports = PGPubSub
{
"name": "pg-notify",
"version": "0.0.12",
"version": "0.0.14",
"description": "Postgres pubsub client",

@@ -32,4 +32,4 @@ "main": "index.js",

"dependencies": {
"debug": "^4.2.0",
"pg": "^8.4.1",
"debug": "^4.3.1",
"pg": "^8.5.1",
"pg-format": "^1.0.4",

@@ -39,9 +39,10 @@ "secure-json-parse": "^2.1.0"

"devDependencies": {
"@types/pg": "^7.14.5",
"ava": "^3.13.0",
"@types/pg": "^7.14.7",
"ava": "^3.14.0",
"benchmark": "^2.1.4",
"coveralls": "^3.1.0",
"dotenv": "^8.2.0",
"nyc": "^15.1.0",
"standard": "^16.0.0",
"tsd": "^0.13.1"
"standard": "^16.0.3",
"tsd": "^0.14.0"
},

@@ -48,0 +49,0 @@ "tsd": {

@@ -22,6 +22,9 @@ <h1 align="center">pg-notify</h1>

This is a pre-release version, which does not follow semver. There can be breaking changes in patch/minor versions.
The first stable release will be released with v1.0.0.
Use this at your own risk.
## Features
- Auto reconnect
- Payload size validation
- Channel and payload sanitization
## Install

@@ -38,2 +41,4 @@

> PGPubSub accepts the same config as [pg](https://github.com/brianc/node-postgres).
```js

@@ -44,4 +49,4 @@ const PGPubSub = require('pg-notify')

;(async () => {
const pubsub = new PGPubSub({
db: { connectionString: 'postgres://postgres:postgres@localhost:5432/db' }
const pubsub = new PGPubSub({
connectionString: 'postgres://postgres:postgres@localhost:5432/db'
})

@@ -60,2 +65,23 @@

## API
### new PubSub(options)
- `options` (`object`) Configuration options for pg-notify pubsub instance. Accepts same options as [pg](https://github.com/brianc/node-postgres) with few custom ones described below.
- reconnectMaxRetries (`number`) Maximum number of reconnect attempts after losing connection. Default: `10`.
- maxPayloadSize (`number`) Maximum payload size, exceeding given size will throw an error. Default: `7999` ([In the default configuration it must be shorter than 8000 bytes.](https://www.postgresql.org/docs/current/sql-notify.html)).
### emit(channel, payload)
- `channel` (`string`)
- `payload` (`string` or `object`)
### on(channel, listener)
- `channel` (`string`)
- `listener` (`function`) accepting single argument `payload`
### removeListener(listener)
- `listener` (`function`) accepting single argument `payload`
### close()
### connect()
## Contributing

@@ -62,0 +88,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc