Socket
Socket
Sign inDemoInstall

pg-logical-replication

Package Overview
Dependencies
17
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.2 to 1.0.3

71

index.js

@@ -12,2 +12,40 @@ /**

function standbyStatusUpdate(client, upperWAL, lowerWAL, msg = 'nothing') {
// Timestamp as microseconds since midnight 2000-01-01
var now = (Date.now() - 946080000000)
var upperTimestamp = Math.floor(now / 4294967.296)
var lowerTimestamp = Math.floor((now - upperTimestamp * 4294967.296))
if (lowerWAL === 4294967295) { // [0xff, 0xff, 0xff, 0xff]
upperWAL = upperWAL + 1
lowerWAL = 0
} else {
lowerWAL = lowerWAL + 1
}
var response = Buffer.alloc(34)
response.fill(0x72) // 'r'
// Last WAL Byte + 1 received and written to disk locally
response.writeUInt32BE(upperWAL, 1)
response.writeUInt32BE(lowerWAL, 5)
// Last WAL Byte + 1 flushed to disk in the standby
response.writeUInt32BE(upperWAL, 9)
response.writeUInt32BE(lowerWAL, 13)
// Last WAL Byte + 1 applied in the standby
response.writeUInt32BE(upperWAL, 17)
response.writeUInt32BE(lowerWAL, 21)
// Timestamp as microseconds since midnight 2000-01-01
response.writeUInt32BE(upperTimestamp, 25)
response.writeUInt32BE(lowerTimestamp, 29)
// If 1, requests server to respond immediately - can be used to verify connectivity
response.writeInt8(0, 33)
client.connection.sendCopyFromChunk(response)
}
var LogicalReplication = function(config) {

@@ -33,3 +71,2 @@ EventEmitter.call(this);

* includeTimestamp : include timestamp on COMMIT, default false
* skipEmptyXacts : skip empty transaction like DDL, default true
*/

@@ -54,3 +91,2 @@ stoped = false;

'"include-timestamp" \'' + (option.includeTimestamp === true ? 'on' : 'off') + '\'',
'"skip-empty-xacts" \'' + (option.skipEmptyXacts !== false ? 'on' : 'off') + '\'',
];

@@ -73,12 +109,29 @@ sql += ' (' + (opts.join(' , ')) + ')';

client.connection.on('copyData', function(msg) {
if (msg.chunk[0] != 0x77) {
return;
if (msg.chunk[0] == 0x77) { // XLogData
var lsn = (msg.chunk.readUInt32BE(1).toString(16).toUpperCase()) + '/' + (msg.chunk.readUInt32BE(5).toString(16).toUpperCase());
self.emit('data', {
lsn,
log: msg.chunk.slice(25),
});
} else if (msg.chunk[0] == 0x6b) { // Primary keepalive message
var lsn = (msg.chunk.readUInt32BE(1).toString(16).toUpperCase()) + '/' + (msg.chunk.readUInt32BE(5).toString(16).toUpperCase());
var timestamp = Math.floor(msg.chunk.readUInt32BE(9) * 4294967.296 + msg.chunk.readUInt32BE(13) / 1000 + 946080000000)
var shouldRespond = msg.chunk.readInt8(17)
self.emit('heartbeat', {
lsn,
timestamp,
shouldRespond
});
} else {
console.log('Unknown message', msg.chunk[0])
}
var lsn = (msg.chunk.readUInt32BE(1).toString(16).toUpperCase()) + '/' + (msg.chunk.readUInt32BE(5).toString(16).toUpperCase());
self.emit('data', {
lsn: lsn,
log: msg.chunk.slice(25),
});
});
self.on('acknowledge', function(msg) {
var lsn = msg.lsn.split('/')
upperWALCheckpoint = lsn[0]
lowerWALCheckpoint = lsn[1]
standbyStatusUpdate(client, parseInt(lsn[0], 16), parseInt(lsn[1], 16), 'acknowledge')
})
});

@@ -85,0 +138,0 @@ });

7

package.json
{
"name": "pg-logical-replication",
"version": "1.0.2",
"version": "1.0.3",
"description": "PostgreSQL Location Replication client - logical WAL replication streaming",

@@ -22,3 +22,6 @@ "keywords": [

},
"author": "Kibae Shin <nonunnet@gmail.com>",
"author": {"name":"Kibae Shin", "email":"kibae.shin@gmail.com", "url":"https://github.com/kibae"},
"contributors": [
{"name":"Jón Tómas Grétarsson", "email":"jon.gretarsson@gmail.com", "url":"https://github.com/jontg"}
],
"main": "./index",

@@ -25,0 +28,0 @@ "dependencies": {

@@ -34,3 +34,2 @@ # pg-logical-replication

- ```includeTimestamp``` : bool (default: false)
- ```skipEmptyXacts``` : bool (default: true)

@@ -104,3 +103,2 @@ ### 3-2. Method - Stream.stop

includeTimestamp: false, //default: false
skipEmptyXacts: true, //default: true
}, function(err) {

@@ -116,2 +114,8 @@ if (err) {

## PostgreSQL side
- postgresql.conf
```
wal_level = logical
max_wal_senders = [bigger than 1]
max_replication_slots = [bigger than 1]
```
- Create logical replication slot

@@ -118,0 +122,0 @@ ```sql

@@ -10,3 +10,3 @@ /*

//Connection parameter : https://github.com/brianc/node-postgres/wiki/Client#parameters
//Connection parameter : https://node-postgres.com/features/connecting
var connInfo = {};

@@ -33,7 +33,6 @@

(function proc() {
function proc() {
stream.getChanges('test_slot', lastLsn, {
includeXids: false, //default: false
includeTimestamp: false, //default: false
skipEmptyXacts: true, //default: true
}, function(err) {

@@ -45,2 +44,3 @@ if (err) {

});
})();
};
proc();
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc