pg-logical-replication
- PostgreSQL Logical Replication client for node.js
1. Install
$ npm install pg-logical-replication
2. LogicalReplication
new LogicalReplication( object config ) : Stream
var LogicalReplication = require('pg-logical-replication');
var stream = new LogicalReplication({});
3. Stream
3-1. Method - Stream.getChanges
- Start WAL streaming of changes.
stream.getChanges( slotName, uptoLsn, option, initialErrorCallback );
- uptoLsn: can be null; the minimum value is "0/00000000".
- option: can contain any of the following optional properties:
  - includeXids: bool (default: false)
  - includeTimestamp: bool (default: false)
3-2. Method - Stream.stop
stream.stop();
3-3. Event - Stream.on('data')
- Raised when new data is streamed from the PostgreSQL server.
stream.on('data', ( msg)=>{});
- msg contains lsn (string) and log (buffer).
3-4. Event - Stream.on('error')
- Raised when an error occurs or the connection is lost.
stream.on('error', ( err)=>{});
4. Plugin
4-1. test_decoding output
- If you are using test_decoding, this plugin will be useful.
var PluginTestDecoding = LogicalReplication.LoadPlugin('output/test_decoding');
PluginTestDecoding.parse(msg.log.toString('utf8'));
Example
var LogicalReplication = require('pg-logical-replication');
var PluginTestDecoding = LogicalReplication.LoadPlugin('output/test_decoding');

// Connection settings handed to the LogicalReplication constructor.
var connInfo = {};

// LSN of the most recent message received; passed to getChanges on every (re)start.
var lastLsn = null;

var stream = (new LogicalReplication(connInfo))
  .on('data', function(msg) {
    // Keep the previous LSN when a message arrives without one.
    lastLsn = msg.lsn || lastLsn;
    var log = (msg.log || '').toString('utf8');
    try {
      console.log(PluginTestDecoding.parse(log));
    } catch (e) {
      // A parse failure is logged together with the raw text; streaming continues.
      console.trace(log, e);
    }
  }).on('error', function(err) {
    console.trace('Error #2', err);
    // Retry after one second.
    setTimeout(proc, 1000);
  });

// Starts (or restarts) streaming from 'test_slot'.
// This must be a function declaration rather than a named function expression:
// the name of a named function expression is only visible inside its own body,
// so the 'error' handler above could not reference it and would throw a
// ReferenceError instead of scheduling the retry.
function proc() {
  stream.getChanges('test_slot', lastLsn, {
    includeXids: false,
    includeTimestamp: false,
  }, function(err) {
    if (err) {
      console.trace('Logical replication initialize error', err);
      // Initialization failed; try again after one second.
      setTimeout(proc, 1000);
    }
  });
}

proc();
PostgreSQL side
wal_level = logical
max_wal_senders = [bigger than 1]
max_replication_slots = [bigger than 1]
- Create logical replication slot
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
- Delete logical replication slot
SELECT pg_drop_replication_slot('test_slot');