Socket
Socket
Sign inDemoInstall

pg-logical-replication

Package Overview
Dependencies
36
Maintainers
2
Versions
13
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.1.0 to 2.0.0

dist/index.d.ts

2

LICENSE.md
MIT License
Copyright (c) 2017 Kibae Shin (nonunnet@gmail.com)
Copyright (c) 2017 Kibae Shin (kibae.shin@gmail.com), [Contributors](https://github.com/kibae/pg-logical-replication/graphs/contributors)

@@ -5,0 +5,0 @@ Permission is hereby granted, free of charge, to any person obtaining a copy

{
"name": "pg-logical-replication",
"version": "1.1.0",
"version": "2.0.0",
"description": "PostgreSQL Location Replication client - logical WAL replication streaming",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"test": "jest --maxWorkers=1",
"test:watch": "jest --watch",
"build": "rm -rf dist/* ; tsc -p tsconfig.build.json",
"publish": "npm run build && npm publish --access public"
},
"keywords": [

@@ -10,3 +18,2 @@ "postgres",

"libpq",
"postgre",
"database",

@@ -16,3 +23,10 @@ "rdbms",

"logical",
"logical-replication"
"logical-decoding",
"logical-replication",
"cdc",
"wal2json",
"pgoutput",
"decoderbufs",
"typescript",
"nodejs"
],

@@ -42,13 +56,61 @@ "homepage": "http://github.com/kibae/pg-logical-replication",

"name": "Caleb",
"email": "caleb.bertsch@gmail.com",
"url": "https://github.com/c4l3b"
},
{
"name": "Matt R. Wilson",
"email": "github.public@mattw.co",
"url": "https://github.com/mastermatt"
}
],
"main": "./index",
"files": [
"dist/**/*",
"*.md"
],
"devDependencies": {
"@types/jest": "^28.1.6",
"@types/pg": "^8.6.5",
"jest": "^28.1.3",
"jest-junit": "^14.0.0",
"prettier": "^2.7.1",
"protobufjs": "^7.0.0",
"ts-jest": "^28.0.7",
"ts-node": "^10.9.1",
"typescript": "^4.7.4"
},
"dependencies": {
"pg": "^6.2.2"
"eventemitter2": ">=6.4.0",
"pg": ">=6.2.2"
},
"peerDependencies": {
"eventemitter2": ">=6.4.0",
"pg": ">=6.2.2"
},
"license": "MIT",
"engines": {
"node": ">= 0.8.0"
"node": ">= 14.0.0"
},
"jest": {
"moduleFileExtensions": [
"js",
"json",
"ts"
],
"rootDir": ".",
"testRegex": ".*\\.spec\\.ts$",
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
},
"collectCoverageFrom": [
"**/*.(t|j)s"
],
"testEnvironment": "node",
"roots": [
"<rootDir>/src/"
],
"reporters": [
"default",
"jest-junit"
]
}
}
# pg-logical-replication
- PostgreSQL Logical Replication client for node.js
- [PostgreSQL Logical Replication](https://www.postgresql.org/docs/current/logical-replication.html) client for node.js
- Supported plugins
- [wal2json](https://github.com/eulerto/wal2json) (Recommended)
- [pgoutput](https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html) (Native to PostgreSQL)
- [decoderbufs](https://github.com/debezium/postgres-decoderbufs)
- [test_decoding](https://www.postgresql.org/docs/current/test-decoding.html) (Not recommended)
- [Document for old version(1.x)](https://github.com/kibae/pg-logical-replication/blob/master/README-1.x.md)
[![Node.js CI](https://github.com/kibae/pg-logical-replication/actions/workflows/node.js.yml/badge.svg)](https://github.com/kibae/pg-logical-replication/actions/workflows/node.js.yml)
[![NPM Version](https://badge.fury.io/js/pg-logical-replication.svg)](https://www.npmjs.com/package/pg-logical-replication)
[![License](https://img.shields.io/github/license/kibae/pg-logical-replication)](https://github.com/kibae/pg-logical-replication/blob/main/LICENSE)
## 1. Install
- **pg-logical-replication** depends on [pq (node-postgres)](https://github.com/brianc/node-postgres) >= 6.2.2
- **pg-logical-replication** depends on [pq(node-postgres) >= 6.2.2](https://github.com/brianc/node-postgres)
and [eventemitter2](https://www.npmjs.com/package/eventemitter2)
```sh

@@ -12,119 +25,153 @@ $ npm install pg-logical-replication

## 2. LogicalReplication
```javascript
new LogicalReplication( object config ) : Stream
```
- Creates a new, unconnected instance of a logical replication stream configured via supplied configuration object.
- https://github.com/brianc/node-postgres/wiki/Client#parameters
## 2. Usage
- This is an example using `wal2json`. A replication slot(`test_slot_wal2json`) must be created on the PostgreSQL server.
- `SELECT * FROM pg_create_logical_replication_slot('test_slot_wal2json', 'wal2json')`
```javascript
var LogicalReplication = require('pg-logical-replication');
var stream = new LogicalReplication({/*config*/});
```
```typescript
const slotName = 'test_slot_wal2json';
## 3. Stream
### 3-1. Method - Stream.getChanges
- Start WAL streaming of changes.
```javascript
stream.getChanges( /*string*/ slotName, /*string*/ uptoLsn, /*object*/ option, /*function(err)*/ initialErrorCallback );
```
- ```uptoLsn``` can be null, the minimum value is "0/00000000".
- ```option``` can contain any of the following optional properties
- ```standbyMessageTimeout``` : maximum seconds between keepalive messages (default: 10)
- ```includeXids``` : bool (default: false)
- ```includeTimestamp``` : bool (default: false)
- ```queryOptions``` : object containing decoder specific options (optional)
- ```'include-types': false```
- ```'filter-tables': 'foo.bar'```
const service = new LogicalReplicationService(
/**
* node-postgres Client options for connection
* https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
*/
{
database: 'playground',
// ...
},
/**
* Logical replication service config
* https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
*/
{
acknowledge: {
auto: true,
timeoutSeconds: 10
}
}
)
### 3-2. Method - Stream.stop
- Stop WAL streaming.
```javascript
stream.stop();
```
// `TestDecodingPlugin` for test_decoding and `ProtocolBuffersDecodingPlugin` for decoderbufs are also available.
const plugin = new Wal2JsonDecodingPlugin({
/**
* Plugin options for wal2json
* https://github.com/kibae/pg-logical-replication/blob/main/src/output-plugins/wal2json/wal2json-plugin-options.type.ts
*/
//...
});
### 3-3. Event - Stream.on('data')
- Raised when new data streamed from PostgreSQL server.
```javascript
stream.on('data', (/*object*/ msg)=>{/*...*/});
```
- ```msg``` contains ```lsn (string)```, ```log (buffer)```
/**
* Wal2Json.Output
* https://github.com/kibae/pg-logical-replication/blob/ts-main/src/output-plugins/wal2json/wal2json-plugin-output.type.ts
*/
service.on('data', (lsn: string, log: Wal2Json.Output) => {
// Do something what you want.
// log.change.filter((change) => change.kind === 'insert').length;
});
### 3-4. Event - Stream.on('error')
- Raised when error or disconnected.
```javascript
stream.on('error', (/*object*/ err)=>{/*...*/});
// Start subscribing to data change events.
(function proc() {
service.subscribe(plugin, slotName)
.catch((e) => {
console.error(e);
})
.then(() => {
setTimeout(proc, 100);
});
})();
```
## 4. Plugin
### 4-1. test_decoding output
- If you are using ```test_decoding```, this plugin will be useful.
```javascript
var PluginTestDecoding = LogicalReplication.LoadPlugin('output/test_decoding');
PluginTestDecoding.parse(msg.log.toString('utf8'));
----
## 3. LogicalReplicationService
### 3-1. `Constructor(clientConfig: ClientConfig, config?: Partial<LogicalReplicationConfig>)`
```typescript
const service = new LogicalReplicationService(
/**
* node-postgres Client options for connection
* https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pg/index.d.ts#L16
*/
clientConfig: {
user?: string | undefined;
database?: string | undefined;
password?: string | (() => string | Promise<string>) | undefined;
port?: number | undefined;
host?: string | undefined;
connectionString?: string | undefined;
keepAlive?: boolean | undefined;
stream?: stream.Duplex | undefined;
statement_timeout?: false | number | undefined;
parseInputDatesAsUTC?: boolean | undefined;
ssl?: boolean | ConnectionOptions | undefined;
query_timeout?: number | undefined;
keepAliveInitialDelayMillis?: number | undefined;
idle_in_transaction_session_timeout?: number | undefined;
application_name?: string | undefined;
connectionTimeoutMillis?: number | undefined;
types?: CustomTypesConfig | undefined;
options?: string | undefined;
},
/**
* Logical replication service config
* https://github.com/kibae/pg-logical-replication/blob/main/src/logical-replication-service.ts#L9
*/
config?: Partial<{
acknowledge?: {
/**
* If the value is false, acknowledge must be done manually.
* Default: true
*/
auto: boolean;
/**
* Acknowledge is performed every set time (sec). If 0, do not do it.
* Default: 10
*/
timeoutSeconds: 0 | 10 | number;
};
}>
)
```
### 3-2. `subscribe(plugin: AbstractPlugin, slotName: string, uptoLsn?: string): Promise<this>`
- **Receive changes from the server.**
- `plugin` [output plugins](#4-output-plugins).
- `slotName` Logical replication slot name. You can create slot via [pg_create_logical_replication_slot](https://www.postgresql.org/docs/current/logicaldecoding-walsender.html) function.
- `uptoLsn` (optional) The starting point of the data to be streamed.
----
### 3-3. `acknowledge(lsn: string): Promise<boolean>`
- After processing the data, it signals the PostgreSQL server that it is OK to clear the WAL log.
- Usually this is done **automatically**.
- Manually use only when `new LogicalReplicationService({}, {acknowledge: {auto: false}})`.
## Example
```javascript
/*
* Test progress
* 1. Create logical replication slot with test_decoding output plugin : SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
* 2. Launch nodejs with this file : node test.js
* 3. Modify data of database
*/
var LogicalReplication = require('pg-logical-replication');
var PluginTestDecoding = LogicalReplication.LoadPlugin('output/test_decoding');
### 3-4. Event
- `on(event: 'start', listener: () => Promise<void> | void)`
- Emitted when start replication.
- `on(event: 'data', listener: (lsn: string, log: any) => Promise<void> | void)`
- Emitted when PostgreSQL data changes. The log value type varies depending on the plugin.
- `on(event: 'error', listener: (err: Error) => void)`
- `on(event: 'acknowledge', listener: (lsn: string) => Promise<void> | void)`
- Emitted when acknowledging automatically.
- `on(event: 'heartbeat', listener: (lsn: string, timestamp: number, shouldRespond: boolean) => Promise<void> | void)`
- A heartbeat check signal has been received from the server. You may need to run `service.acknowledge()`.
//Connection parameter : https://github.com/brianc/node-postgres/wiki/Client#parameters
var connInfo = {};
### 3-5. Misc. method
- `stop(): Promise<this>`
- Terminate the server's connection and stop replication.
- `isStop(): boolean`
- Returns false when replication starts from the server.
- `lastLsn(): string`
- Returns the last [LSN(Log Sequence Number)](https://www.postgresql.org/docs/current/datatype-pg-lsn.html) received from the server.
//Initialize with last LSN value
var lastLsn = null;
----
var stream = (new LogicalReplication(connInfo))
.on('data', function(msg) {
lastLsn = msg.lsn || lastLsn;
## 4. Output Plugins
### 4-1. `Wal2JsonDecodingPlugin` for [wal2json](https://github.com/eulerto/wal2json) (Recommended)
### 4-2. `ProtocolBuffersDecodingPlugin` for [decoderbufs](https://github.com/debezium/postgres-decoderbufs)
### 4-3. `TestDecodingPlugin` for [test_decoding](https://www.postgresql.org/docs/current/test-decoding.html) (Not recommended)
var log = (msg.log || '').toString('utf8');
try {
console.log(PluginTestDecoding.parse(log));
//TODO: DO SOMETHING. eg) replicate to other dbms(pgsql, mysql, ...)
} catch (e) {
console.trace(log, e);
}
}).on('error', function(err) {
console.trace('Error #2', err);
setTimeout(proc, 1000);
});
(function proc() {
stream.getChanges('test_slot', lastLsn, {
includeXids: false, //default: false
includeTimestamp: false, //default: false
}, function(err) {
if (err) {
console.trace('Logical replication initialize error', err);
setTimeout(proc, 1000);
}
});
})();
```
## PostgreSQL side
- postgresql.conf
```
wal_level = logical
max_wal_senders = [bigger than 1]
max_replication_slots = [bigger than 1]
```
- Create logical replication slot
```sql
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
```
- Delete logical replication slot
```sql
SELECT pg_drop_replication_slot('test_slot');
```
## Contributors
<a href="https://github.com/kibae/pg-logical-replication/graphs/contributors">
<img src="https://contrib.rocks/image?repo=kibae/pg-logical-replication" />
</a>
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc