Comparing version 0.0.12 to 0.0.14
import { ClientConfig } from 'pg' | ||
export interface Options { | ||
export interface Options extends ClientConfig { | ||
reconnectMaxRetries?: number; | ||
reconnectDelay?: number; | ||
maxPayloadSize?: number; | ||
maxEmitRetries?: number; | ||
emitThrottleDelay?: number; | ||
continuousEmitFailureThreshold?: number; | ||
queueSize?: number; | ||
emulateMqEmitterApi?: boolean | ||
db: ClientConfig; | ||
} | ||
export interface Message { | ||
topic: string | ||
payload: string | object | ||
} | ||
declare class PGPubSub { | ||
constructor (opts: Options); | ||
emit (message: Message): Promise<void>; | ||
emit (message: Message, callback: () => void): void; | ||
on (topic: string, listener: (params: { payload: string | object }, callback?: () => void) => void, callback: () => void): void; | ||
on (topic: string, listener: (params: { payload: string | object }, callback?: () => void) => void): Promise<void>; | ||
removeListener (topic: string, listener: (params: { payload: string | object }, callback?: () => void) => void, callback: () => void): void; | ||
removeListener (topic: string, listener: (params: { payload: string | object }, callback?: () => void) => void): Promise<void>; | ||
emit (channel: string, payload: any): Promise<void>; | ||
on (topic: string, listener: (payload: any) => void): Promise<void>; | ||
removeListener (topic: string, listener: (payload: any) => void): Promise<void>; | ||
connect (): Promise<void>; | ||
close (): Promise<void>; | ||
@@ -35,0 +15,0 @@ } |
@@ -0,3 +1,5 @@ | ||
'use strict' | ||
const PGPubSub = require('./lib/pg-notify') | ||
module.exports = PGPubSub |
'use strict' | ||
const EventEmitter = require('events') | ||
const util = require('util') | ||
const pg = require('pg') | ||
const format = require('pg-format') | ||
const util = require('util') | ||
const sjson = require('secure-json-parse') | ||
@@ -24,10 +24,5 @@ const debug = require('debug')('pg-notify') | ||
this.ee.setMaxListeners(0) | ||
this.reconnectMaxRetries = opts.reconnectMaxRetries || 10 | ||
this.reconnectDelay = opts.reconnectDelay || 1000 | ||
this.maxPayloadSize = opts.maxPayloadSize || 7999 | ||
this.maxEmitRetries = opts.maxEmitRetries || 10 | ||
this.queueSize = opts.queueSize || 10000 | ||
this.emitThrottleDelay = opts.emitThrottleDelay || 100 | ||
this.continuousEmitFailureThreshold = opts.continuousEmitFailureThreshold || 100 | ||
this.emulateMqEmitterApi = opts.emulateMqEmitterApi || false | ||
this.maxPayloadSize = opts.maxPayloadSize || 7999 // default on a standard pg installation | ||
@@ -37,20 +32,9 @@ this.state = states.init | ||
this.channels = {} | ||
this.queue = [] | ||
this.flushingQueue = false | ||
this.continuousEmitFails = 0 | ||
} | ||
emit (channel, payload, callback) { | ||
if (this.state === states.closing) { | ||
return Promise.resolve() | ||
async emit (channel, payload) { | ||
if (this.state !== states.connected) { | ||
throw new Error('[PGPubSub]: not connected') | ||
} | ||
let _retries = 0 | ||
if (typeof channel === 'object') { | ||
callback = payload | ||
payload = channel.payload | ||
_retries = channel._retries | ||
channel = channel.topic | ||
} | ||
if (typeof payload === 'object') { | ||
@@ -63,3 +47,3 @@ payload = JSON.stringify(payload) | ||
if (Buffer.byteLength(parsedPayload, 'utf-8') > this.maxPayloadSize) { | ||
throw new Error(`Payload exceeds maximum size: ${this.maxPayloadSize}`) | ||
throw new Error(`[PGPubSub]: payload exceeds maximum size: ${this.maxPayloadSize}`) | ||
} | ||
@@ -71,118 +55,34 @@ | ||
if (this.state !== states.connected) { | ||
this._insertMessageInQueue({ topic: channel, payload }) | ||
if (callback) { | ||
callback() | ||
} else { | ||
return Promise.resolve() | ||
} | ||
} else { | ||
return this.client.query(`NOTIFY ${format.ident(channel)}, ${parsedPayload}`) | ||
.then(() => { | ||
this.continuousEmitFails = 0 | ||
debug('[emit] emitted') | ||
// ensure queue is empty | ||
this._flushQueue() | ||
if (callback) { | ||
callback() | ||
} | ||
}) | ||
.catch(err => { | ||
this.continuousEmitFails++ | ||
debug('[emit] failed to emit') | ||
debug('[emit] state:', this.state) | ||
// failed to notify, add it to queue to process it later to avoid data loss | ||
this._insertMessageInQueue({ topic: channel, payload, _retries }) | ||
if (this.state === states.connected) { | ||
console.error('[PGPubSub]: emit failed', err.message) | ||
} | ||
}) | ||
} | ||
return this.client.query(`NOTIFY ${format.ident(channel)}, ${parsedPayload}`) | ||
} | ||
on (topic, listener, callback) { | ||
debug('[subscribe]', topic) | ||
if (this.state === states.closing) { | ||
return Promise.resolve() | ||
async on (channel, listener) { | ||
debug('[subscribe]', channel) | ||
if (this.state !== states.connected) { | ||
throw new Error('[PGPubSub]: not connected') | ||
} | ||
let handler = listener | ||
if (this.emulateMqEmitterApi) { | ||
// needed to support this as drop-in replacement for mqemitter | ||
handler = (payload) => { | ||
listener({ payload }, () => {}) | ||
} | ||
if (this.channels[channel]) { | ||
this.ee.on(channel, listener) | ||
this.channels[channel].listeners++ | ||
return | ||
} | ||
if (this.channels[topic]) { | ||
this.ee.on(topic, handler) | ||
this.channels[topic].listeners++ | ||
if (callback) { | ||
callback() | ||
return | ||
} else { | ||
return Promise.resolve() | ||
} | ||
} | ||
this.ee.on(channel, listener) | ||
this.channels[channel] = { listeners: 1 } | ||
this.ee.on(topic, handler) | ||
this.channels[topic] = { listeners: 1 } | ||
if (this.state !== states.connected) { | ||
return | ||
} | ||
return this.client.query(`LISTEN ${format.ident(topic)}`) | ||
.then(() => { | ||
if (callback) { | ||
callback() | ||
} | ||
}) | ||
.catch((err) => { | ||
if (this.state === states.connected) { | ||
console.error('[PGPubSub]: subscribe failed', err.message) | ||
} | ||
}) | ||
return this.client.query(`LISTEN ${format.ident(channel)}`) | ||
} | ||
removeListener (topic, handler, callback) { | ||
if (!this.channels[topic]) { | ||
callback() | ||
async removeListener (channel, listener) { | ||
if (!this.channels[channel]) { | ||
return | ||
} | ||
this.ee.removeListener(topic, handler) | ||
this.channels[topic].listeners-- | ||
this.ee.removeListener(channel, listener) | ||
this.channels[channel].listeners-- | ||
if (this.channels[topic].listeners === 0) { | ||
delete this.channels[topic] | ||
if (this.state !== states.connected) { | ||
if (callback) { | ||
callback() | ||
return | ||
} else { | ||
return Promise.resolve() | ||
} | ||
} | ||
return this.client.query(`UNLISTEN ${format.ident(topic)}`) | ||
.then(() => { | ||
if (callback) { | ||
callback() | ||
} | ||
}) | ||
.catch(err => { | ||
if (this.state === states.connected) { | ||
console.error('[PGPubSub]: removeListener failed', err.message) | ||
} | ||
}) | ||
} else { | ||
if (callback) { | ||
callback() | ||
} else { | ||
return Promise.resolve() | ||
} | ||
if (this.channels[channel].listeners === 0) { | ||
delete this.channels[channel] | ||
return this.client.query(`UNLISTEN ${format.ident(channel)}`) | ||
} | ||
@@ -198,3 +98,2 @@ } | ||
} catch (e) { | ||
console.error('[PGPubSub]: error setting up client, reconnecting', e) | ||
await this._reconnect() | ||
@@ -210,3 +109,2 @@ } | ||
this.channels = {} | ||
this.queue = [] | ||
this.ee.removeAllListeners() | ||
@@ -228,6 +126,2 @@ if (this.client) { | ||
if (this.reconnectRetries > 5) { | ||
await sleep(this.reconnectDelay) | ||
} | ||
try { | ||
@@ -239,17 +133,13 @@ this.client.end() | ||
this.close() | ||
throw new Error('[PGPubSub]: Max reconnect attempts reached, aborting', err) | ||
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', err) | ||
} | ||
if (![states.closing, states.connected].includes((this.state))) { | ||
await sleep(10) | ||
await this._reconnect(true) | ||
} | ||
} | ||
if (this.state === states.connected) { | ||
debug('[_reconnect] flushing queue') | ||
await this._flushQueue() | ||
} | ||
} | ||
async _setupClient () { | ||
this.client = new pg.Client(this.opts.db) | ||
this.client = new pg.Client(this.opts) | ||
await this.client.connect() | ||
@@ -272,3 +162,3 @@ | ||
this.close() | ||
throw new Error('[PGPubSub]: Max reconnect attempts reached, aborting', err) | ||
throw new Error('[PGPubSub]: max reconnect attempts reached, aborting', err) | ||
} | ||
@@ -289,55 +179,5 @@ | ||
debug('[_setupClient] init listeners done') | ||
await this._flushQueue() | ||
} | ||
_insertMessageInQueue (message) { | ||
debug('[_insertMessageInQueue] queue.length', this.queue.length) | ||
if (typeof message._retries !== 'undefined') { | ||
message._retries++ | ||
} else { | ||
message._retries = 0 | ||
} | ||
debug('[_insertMessageInQueue] message.retries', message._retries) | ||
if (this.state !== states.closing && this.queue.length < this.queueSize) { | ||
this.queue.push(message) | ||
} | ||
} | ||
async _flushQueue () { | ||
if (this.flushingQueue) { | ||
return | ||
} | ||
this.flushingQueue = true | ||
debug('[_flushQueue] flushing queue') | ||
while (this.queue.length) { | ||
if (this.state !== states.connected) { | ||
break | ||
} | ||
const message = this.queue.shift() | ||
if (message._retries && message._retries > this.maxEmitRetries) { | ||
// skip messages that continuously failed | ||
console.error('[PGPubSub]: emit failed after retries', message) | ||
} else { | ||
// start throttling retries when emits start continuously failing | ||
if (this.continuousEmitFails > this.continuousEmitFailureThreshold) { | ||
await sleep(this.emitThrottleDelay) | ||
} | ||
try { | ||
await this.emit(message) | ||
} catch (e) { | ||
this._insertMessageInQueue(message) | ||
} | ||
} | ||
} | ||
this.flushingQueue = false | ||
} | ||
} | ||
module.exports = PGPubSub |
{ | ||
"name": "pg-notify", | ||
"version": "0.0.12", | ||
"version": "0.0.14", | ||
"description": "Postgres pubsub client", | ||
@@ -32,4 +32,4 @@ "main": "index.js", | ||
"dependencies": { | ||
"debug": "^4.2.0", | ||
"pg": "^8.4.1", | ||
"debug": "^4.3.1", | ||
"pg": "^8.5.1", | ||
"pg-format": "^1.0.4", | ||
@@ -39,9 +39,10 @@ "secure-json-parse": "^2.1.0" | ||
"devDependencies": { | ||
"@types/pg": "^7.14.5", | ||
"ava": "^3.13.0", | ||
"@types/pg": "^7.14.7", | ||
"ava": "^3.14.0", | ||
"benchmark": "^2.1.4", | ||
"coveralls": "^3.1.0", | ||
"dotenv": "^8.2.0", | ||
"nyc": "^15.1.0", | ||
"standard": "^16.0.0", | ||
"tsd": "^0.13.1" | ||
"standard": "^16.0.3", | ||
"tsd": "^0.14.0" | ||
}, | ||
@@ -48,0 +49,0 @@ "tsd": { |
@@ -22,6 +22,9 @@ <h1 align="center">pg-notify</h1> | ||
This is a pre-release version, which does not follow semver. There can be breaking changes in patch/minor versions. | ||
The first stable release will be released with v1.0.0. | ||
Use this at your own risk. | ||
## Features | ||
- Auto reconnect | ||
- Payload size validation | ||
- Channel and payload sanitization | ||
## Install | ||
@@ -38,2 +41,4 @@ | ||
> PGPubSub accepts the same config as [pg](https://github.com/brianc/node-postgres). | ||
```js | ||
@@ -44,4 +49,4 @@ const PGPubSub = require('pg-notify') | ||
;(async () => { | ||
const pubsub = new PGPubSub({ | ||
db: { connectionString: 'postgres://postgres:postgres@localhost:5432/db' } | ||
const pubsub = new PGPubSub({ | ||
connectionString: 'postgres://postgres:postgres@localhost:5432/db' | ||
}) | ||
@@ -60,2 +65,23 @@ | ||
## API | ||
### new PubSub(options) | ||
- `options` (`object`) Configuration options for pg-notify pubsub instance. Accepts same options as [pg](https://github.com/brianc/node-postgres) with few custom ones described below. | ||
- reconnectMaxRetries (`number`) Maximum number of reconnect attempts after losing connection. Default: `10`. | ||
- maxPayloadSize (`number`) Maximum payload size, exceeding given size will throw an error. Default: `7999` ([In the default configuration it must be shorter than 8000 bytes.](https://www.postgresql.org/docs/current/sql-notify.html)). | ||
### emit(channel, payload) | ||
- `channel` (`string`) | ||
- `payload` (`string` or `object`) | ||
### on(channel, listener) | ||
- `channel` (`string`) | ||
- `listener` (`function`) accepting single argument `payload` | ||
### removeListener(listener) | ||
- `listener` (`function`) accepting single argument `payload` | ||
### close() | ||
### connect() | ||
## Contributing | ||
@@ -62,0 +88,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
92
9999
8
154
1
Updateddebug@^4.3.1
Updatedpg@^8.5.1