pg-logical-replication
Advanced tools
Comparing version 1.0.2 to 1.0.3
71
index.js
@@ -12,2 +12,40 @@ /** | ||
function standbyStatusUpdate(client, upperWAL, lowerWAL, msg = 'nothing') { | ||
// Timestamp as microseconds since midnight 2000-01-01 | ||
var now = (Date.now() - 946080000000) | ||
var upperTimestamp = Math.floor(now / 4294967.296) | ||
var lowerTimestamp = Math.floor((now - upperTimestamp * 4294967.296)) | ||
if (lowerWAL === 4294967295) { // [0xff, 0xff, 0xff, 0xff] | ||
upperWAL = upperWAL + 1 | ||
lowerWAL = 0 | ||
} else { | ||
lowerWAL = lowerWAL + 1 | ||
} | ||
var response = Buffer.alloc(34) | ||
response.fill(0x72) // 'r' | ||
// Last WAL Byte + 1 received and written to disk locally | ||
response.writeUInt32BE(upperWAL, 1) | ||
response.writeUInt32BE(lowerWAL, 5) | ||
// Last WAL Byte + 1 flushed to disk in the standby | ||
response.writeUInt32BE(upperWAL, 9) | ||
response.writeUInt32BE(lowerWAL, 13) | ||
// Last WAL Byte + 1 applied in the standby | ||
response.writeUInt32BE(upperWAL, 17) | ||
response.writeUInt32BE(lowerWAL, 21) | ||
// Timestamp as microseconds since midnight 2000-01-01 | ||
response.writeUInt32BE(upperTimestamp, 25) | ||
response.writeUInt32BE(lowerTimestamp, 29) | ||
// If 1, requests server to respond immediately - can be used to verify connectivity | ||
response.writeInt8(0, 33) | ||
client.connection.sendCopyFromChunk(response) | ||
} | ||
var LogicalReplication = function(config) { | ||
@@ -33,3 +71,2 @@ EventEmitter.call(this); | ||
* includeTimestamp : include timestamp on COMMIT, default false | ||
* skipEmptyXacts : skip empty transaction like DDL, default true | ||
*/ | ||
@@ -54,3 +91,2 @@ stoped = false; | ||
'"include-timestamp" \'' + (option.includeTimestamp === true ? 'on' : 'off') + '\'', | ||
'"skip-empty-xacts" \'' + (option.skipEmptyXacts !== false ? 'on' : 'off') + '\'', | ||
]; | ||
@@ -73,12 +109,29 @@ sql += ' (' + (opts.join(' , ')) + ')'; | ||
client.connection.on('copyData', function(msg) { | ||
if (msg.chunk[0] != 0x77) { | ||
return; | ||
if (msg.chunk[0] == 0x77) { // XLogData | ||
var lsn = (msg.chunk.readUInt32BE(1).toString(16).toUpperCase()) + '/' + (msg.chunk.readUInt32BE(5).toString(16).toUpperCase()); | ||
self.emit('data', { | ||
lsn, | ||
log: msg.chunk.slice(25), | ||
}); | ||
} else if (msg.chunk[0] == 0x6b) { // Primary keepalive message | ||
var lsn = (msg.chunk.readUInt32BE(1).toString(16).toUpperCase()) + '/' + (msg.chunk.readUInt32BE(5).toString(16).toUpperCase()); | ||
var timestamp = Math.floor(msg.chunk.readUInt32BE(9) * 4294967.296 + msg.chunk.readUInt32BE(13) / 1000 + 946080000000) | ||
var shouldRespond = msg.chunk.readInt8(17) | ||
self.emit('heartbeat', { | ||
lsn, | ||
timestamp, | ||
shouldRespond | ||
}); | ||
} else { | ||
console.log('Unknown message', msg.chunk[0]) | ||
} | ||
var lsn = (msg.chunk.readUInt32BE(1).toString(16).toUpperCase()) + '/' + (msg.chunk.readUInt32BE(5).toString(16).toUpperCase()); | ||
self.emit('data', { | ||
lsn: lsn, | ||
log: msg.chunk.slice(25), | ||
}); | ||
}); | ||
self.on('acknowledge', function(msg) { | ||
var lsn = msg.lsn.split('/') | ||
upperWALCheckpoint = lsn[0] | ||
lowerWALCheckpoint = lsn[1] | ||
standbyStatusUpdate(client, parseInt(lsn[0], 16), parseInt(lsn[1], 16), 'acknowledge') | ||
}) | ||
}); | ||
@@ -85,0 +138,0 @@ }); |
{ | ||
"name": "pg-logical-replication", | ||
"version": "1.0.2", | ||
"version": "1.0.3", | ||
"description": "PostgreSQL Location Replication client - logical WAL replication streaming", | ||
@@ -22,3 +22,6 @@ "keywords": [ | ||
}, | ||
"author": "Kibae Shin <nonunnet@gmail.com>", | ||
"author": {"name":"Kibae Shin", "email":"kibae.shin@gmail.com", "url":"https://github.com/kibae"}, | ||
"contributors": [ | ||
{"name":"Jón Tómas Grétarsson", "email":"jon.gretarsson@gmail.com", "url":"https://github.com/jontg"} | ||
], | ||
"main": "./index", | ||
@@ -25,0 +28,0 @@ "dependencies": { |
@@ -34,3 +34,2 @@ # pg-logical-replication | ||
- ```includeTimestamp``` : bool (default: false) | ||
- ```skipEmptyXacts``` : bool (default: true) | ||
@@ -104,3 +103,2 @@ ### 3-2. Method - Stream.stop | ||
includeTimestamp: false, //default: false | ||
skipEmptyXacts: true, //default: true | ||
}, function(err) { | ||
@@ -116,2 +114,8 @@ if (err) { | ||
## PostgreSQL side | ||
- postgresql.conf | ||
``` | ||
wal_level = logical | ||
max_wal_senders = [bigger than 1] | ||
max_replication_slots = [bigger than 1] | ||
``` | ||
- Create logical replication slot | ||
@@ -118,0 +122,0 @@ ```sql |
@@ -10,3 +10,3 @@ /* | ||
//Connection parameter : https://github.com/brianc/node-postgres/wiki/Client#parameters | ||
//Connection parameter : https://node-postgres.com/features/connecting | ||
var connInfo = {}; | ||
@@ -33,7 +33,6 @@ | ||
(function proc() { | ||
function proc() { | ||
stream.getChanges('test_slot', lastLsn, { | ||
includeXids: false, //default: false | ||
includeTimestamp: false, //default: false | ||
skipEmptyXacts: true, //default: true | ||
}, function(err) { | ||
@@ -45,2 +44,3 @@ if (err) { | ||
}); | ||
})(); | ||
}; | ||
proc(); |
53430
1495
126