Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pg-logical-replication

Package Overview
Dependencies
Maintainers
2
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pg-logical-replication - npm Package Compare versions

Comparing version 2.0.1 to 2.0.2

dist/output-plugins/decoderbufs/pg_logicaldec.proto.d.ts

224

dist/logical-replication-service.js
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
var __importDefault = (this && this.__importDefault) || function (mod) {

@@ -29,3 +20,7 @@ return (mod && mod.__esModule) ? mod : { "default": mod };

this.config = {
acknowledge: Object.assign({ auto: true, timeoutSeconds: 10 }, ((config === null || config === void 0 ? void 0 : config.acknowledge) || {})),
acknowledge: {
auto: true,
timeoutSeconds: 10,
...(config?.acknowledge || {}),
},
};

@@ -36,12 +31,13 @@ }

}
client() {
return __awaiter(this, void 0, void 0, function* () {
yield this.stop();
this._client = new pg_1.Client(Object.assign(Object.assign({}, this.clientConfig), { replication: 'database' }));
yield this._client.connect();
// @ts-ignore
this._connection = this._client.connection;
this._client.on('error', (e) => this.emit('error', e));
return [this._client, this._connection];
async client() {
await this.stop();
this._client = new pg_1.Client({
...this.clientConfig,
replication: 'database',
});
await this._client.connect();
// @ts-ignore
this._connection = this._client.connection;
this._client.on('error', (e) => this.emit('error', e));
return [this._client, this._connection];
}

@@ -51,14 +47,11 @@ isStop() {

}
stop() {
var _a, _b, _c;
return __awaiter(this, void 0, void 0, function* () {
this._stop = true;
(_a = this._connection) === null || _a === void 0 ? void 0 : _a.removeAllListeners();
this._connection = null;
(_b = this._client) === null || _b === void 0 ? void 0 : _b.removeAllListeners();
yield ((_c = this._client) === null || _c === void 0 ? void 0 : _c.end());
this._client = null;
this.checkStandbyStatus(false);
return this;
});
async stop() {
this._stop = true;
this._connection?.removeAllListeners();
this._connection = null;
this._client?.removeAllListeners();
await this._client?.end();
this._client = null;
this.checkStandbyStatus(false);
return this;
}

@@ -71,48 +64,44 @@ /**

*/
subscribe(plugin, slotName, uptoLsn) {
return __awaiter(this, void 0, void 0, function* () {
try {
const [client, connection] = yield this.client();
this._lastLsn = uptoLsn || this._lastLsn;
// check replicationStart
connection.once('replicationStart', () => {
this._stop = false;
this.emit('start');
this.checkStandbyStatus(true);
});
connection.on('copyData', ({ chunk: buffer }) => {
if (buffer[0] != 0x77 && buffer[0] != 0x6b) {
console.warn('Unknown message', buffer[0]);
return;
}
const lsn = buffer.readUInt32BE(1).toString(16).toUpperCase() + '/' + buffer.readUInt32BE(5).toString(16).toUpperCase();
if (buffer[0] == 0x77) {
// XLogData
this.emit('data', lsn, plugin.parse(buffer.subarray(25)));
this._acknowledge(lsn);
}
else if (buffer[0] == 0x6b) {
// Primary keepalive message
const timestamp = Math.floor(buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + 946080000000);
const shouldRespond = buffer.readInt8(17);
this.emit('heartbeat', lsn, timestamp, shouldRespond);
}
this._lastLsn = lsn;
});
return plugin.start(client, slotName, this._lastLsn || '0/00000000');
}
catch (e) {
yield this.stop();
this.emit('error', e);
throw e;
}
});
async subscribe(plugin, slotName, uptoLsn) {
try {
const [client, connection] = await this.client();
this._lastLsn = uptoLsn || this._lastLsn;
// check replicationStart
connection.once('replicationStart', () => {
this._stop = false;
this.emit('start');
this.checkStandbyStatus(true);
});
connection.on('copyData', ({ chunk: buffer }) => {
if (buffer[0] != 0x77 && buffer[0] != 0x6b) {
console.warn('Unknown message', buffer[0]);
return;
}
const lsn = buffer.readUInt32BE(1).toString(16).toUpperCase() + '/' + buffer.readUInt32BE(5).toString(16).toUpperCase();
if (buffer[0] == 0x77) {
// XLogData
this.emit('data', lsn, plugin.parse(buffer.subarray(25)));
this._acknowledge(lsn);
}
else if (buffer[0] == 0x6b) {
// Primary keepalive message
const timestamp = Math.floor(buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + 946080000000);
const shouldRespond = buffer.readInt8(17);
this.emit('heartbeat', lsn, timestamp, shouldRespond);
}
this._lastLsn = lsn;
});
return plugin.start(client, slotName, this._lastLsn || '0/00000000');
}
catch (e) {
await this.stop();
this.emit('error', e);
throw e;
}
}
_acknowledge(lsn) {
return __awaiter(this, void 0, void 0, function* () {
if (!this.config.acknowledge.auto)
return;
this.emit('acknowledge', lsn);
yield this.acknowledge(lsn);
});
async _acknowledge(lsn) {
if (!this.config.acknowledge.auto)
return;
this.emit('acknowledge', lsn);
await this.acknowledge(lsn);
}

@@ -125,3 +114,3 @@ checkStandbyStatus(enable) {

if (this.config.acknowledge.timeoutSeconds > 0 && enable)
this.checkStandbyStatusTimer = setInterval(() => __awaiter(this, void 0, void 0, function* () {
this.checkStandbyStatusTimer = setInterval(async () => {
if (this._stop)

@@ -131,4 +120,4 @@ return;

Date.now() - this.lastStandbyStatusUpdatedTime > this.config.acknowledge.timeoutSeconds * 1000)
yield this.acknowledge(this._lastLsn);
}), 1000);
await this.acknowledge(this._lastLsn);
}, 1000);
}

@@ -138,44 +127,41 @@ /**

*/
acknowledge(lsn) {
var _a;
return __awaiter(this, void 0, void 0, function* () {
if (this._stop)
return false;
this.lastStandbyStatusUpdatedTime = Date.now();
const slice = lsn.split('/');
let [upperWAL, lowerWAL] = [parseInt(slice[0], 16), parseInt(slice[1], 16)];
// Timestamp as microseconds since midnight 2000-01-01
const now = Date.now() - 946080000000;
const upperTimestamp = Math.floor(now / 4294967.296);
const lowerTimestamp = Math.floor(now - upperTimestamp * 4294967.296);
if (lowerWAL === 4294967295) {
// [0xff, 0xff, 0xff, 0xff]
upperWAL = upperWAL + 1;
lowerWAL = 0;
}
else {
lowerWAL = lowerWAL + 1;
}
const response = Buffer.alloc(34);
response.fill(0x72); // 'r'
// Last WAL Byte + 1 received and written to disk locally
response.writeUInt32BE(upperWAL, 1);
response.writeUInt32BE(lowerWAL, 5);
// Last WAL Byte + 1 flushed to disk in the standby
response.writeUInt32BE(upperWAL, 9);
response.writeUInt32BE(lowerWAL, 13);
// Last WAL Byte + 1 applied in the standby
response.writeUInt32BE(upperWAL, 17);
response.writeUInt32BE(lowerWAL, 21);
// Timestamp as microseconds since midnight 2000-01-01
response.writeUInt32BE(upperTimestamp, 25);
response.writeUInt32BE(lowerTimestamp, 29);
// If 1, requests server to respond immediately - can be used to verify connectivity
response.writeInt8(0, 33);
// @ts-ignore
(_a = this._connection) === null || _a === void 0 ? void 0 : _a.sendCopyFromChunk(response);
return true;
});
async acknowledge(lsn) {
if (this._stop)
return false;
this.lastStandbyStatusUpdatedTime = Date.now();
const slice = lsn.split('/');
let [upperWAL, lowerWAL] = [parseInt(slice[0], 16), parseInt(slice[1], 16)];
// Timestamp as microseconds since midnight 2000-01-01
const now = Date.now() - 946080000000;
const upperTimestamp = Math.floor(now / 4294967.296);
const lowerTimestamp = Math.floor(now - upperTimestamp * 4294967.296);
if (lowerWAL === 4294967295) {
// [0xff, 0xff, 0xff, 0xff]
upperWAL = upperWAL + 1;
lowerWAL = 0;
}
else {
lowerWAL = lowerWAL + 1;
}
const response = Buffer.alloc(34);
response.fill(0x72); // 'r'
// Last WAL Byte + 1 received and written to disk locally
response.writeUInt32BE(upperWAL, 1);
response.writeUInt32BE(lowerWAL, 5);
// Last WAL Byte + 1 flushed to disk in the standby
response.writeUInt32BE(upperWAL, 9);
response.writeUInt32BE(lowerWAL, 13);
// Last WAL Byte + 1 applied in the standby
response.writeUInt32BE(upperWAL, 17);
response.writeUInt32BE(lowerWAL, 21);
// Timestamp as microseconds since midnight 2000-01-01
response.writeUInt32BE(upperTimestamp, 25);
response.writeUInt32BE(lowerTimestamp, 29);
// If 1, requests server to respond immediately - can be used to verify connectivity
response.writeInt8(0, 33);
// @ts-ignore
this._connection?.sendCopyFromChunk(response);
return true;
}
}
exports.LogicalReplicationService = LogicalReplicationService;
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};

@@ -14,4 +8,3 @@ Object.defineProperty(exports, "__esModule", { value: true });

const abstract_plugin_1 = require("../abstract.plugin");
// https://github.com/debezium/postgres-decoderbufs/blob/main/proto/pg_logicaldec.proto
const decoderbufsProto = require('./pg_logicaldec.proto.json');
const pg_logicaldec_proto_1 = __importDefault(require("./pg_logicaldec.proto"));
class ProtocolBuffersPlugin extends abstract_plugin_1.AbstractPlugin {

@@ -22,3 +15,3 @@ constructor(options) {

const protobufjs = require('protobufjs');
this.proto = protobufjs.Root.fromJSON(decoderbufsProto);
this.proto = protobufjs.Root.fromJSON(pg_logicaldec_proto_1.default);
this.rowMessage = this.proto.lookupType('RowMessage');

@@ -35,11 +28,9 @@ }

}
start(client, slotName, lastLsn) {
return __awaiter(this, void 0, void 0, function* () {
const options = [];
let sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn}`;
if (options.length > 0)
sql += ` (${options.join(' , ')})`;
// console.log(sql);
return client.query(sql);
});
async start(client, slotName, lastLsn) {
const options = [];
let sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn}`;
if (options.length > 0)
sql += ` (${options.join(' , ')})`;
// console.log(sql);
return client.query(sql);
}

@@ -46,0 +37,0 @@ parse(buffer) {

@@ -109,6 +109,12 @@ "use strict";

const typeMod = reader.readInt32();
return Object.assign(Object.assign({ flags,
return {
flags,
name,
typeOid,
typeMod, typeSchema: null, typeName: null }, this._typeCache.get(typeOid)), { parser: pg_1.types.getTypeParser(typeOid) });
typeMod,
typeSchema: null,
typeName: null,
...this._typeCache.get(typeOid),
parser: pg_1.types.getTypeParser(typeOid),
};
}

@@ -210,3 +216,3 @@ msgInsert(reader) {

case 0x75: // 'u' unchanged toast datum
tuple[name] = unchangedToastFallback === null || unchangedToastFallback === void 0 ? void 0 : unchangedToastFallback[name];
tuple[name] = unchangedToastFallback?.[name];
break;

@@ -213,0 +219,0 @@ default:

"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
Object.defineProperty(exports, "__esModule", { value: true });

@@ -22,19 +13,17 @@ exports.TestDecodingPlugin = void 0;

}
start(client, slotName, lastLsn) {
return __awaiter(this, void 0, void 0, function* () {
const options = [
`"include-xids" '${this.options.includeXids === true ? 'on' : 'off'}'`,
`"include-timestamp" '${this.options.includeTimestamp === true ? 'on' : 'off'}'`,
];
if (this.options.skipEmptyXacts)
options.push(`"skip-empty-xacts" 'on'`);
if (this.options.includeRewrites)
options.push(`"include-rewrites" 'on'`);
if (this.options.includeSequences)
options.push(`"include-sequences" 'on'`);
if (this.options.streamChanges)
options.push(`"stream-changes" 'on'`);
const sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn} (${options.join(' , ')})`;
return client.query(sql);
});
async start(client, slotName, lastLsn) {
const options = [
`"include-xids" '${this.options.includeXids === true ? 'on' : 'off'}'`,
`"include-timestamp" '${this.options.includeTimestamp === true ? 'on' : 'off'}'`,
];
if (this.options.skipEmptyXacts)
options.push(`"skip-empty-xacts" 'on'`);
if (this.options.includeRewrites)
options.push(`"include-rewrites" 'on'`);
if (this.options.includeSequences)
options.push(`"include-sequences" 'on'`);
if (this.options.streamChanges)
options.push(`"stream-changes" 'on'`);
const sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn} (${options.join(' , ')})`;
return client.query(sql);
}

@@ -41,0 +30,0 @@ parse(buffer) {

"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
Object.defineProperty(exports, "__esModule", { value: true });

@@ -26,17 +17,15 @@ exports.Wal2JsonPlugin = void 0;

}
start(client, slotName, lastLsn) {
return __awaiter(this, void 0, void 0, function* () {
const options = [];
Object.entries(this.options).map(([key, value]) => {
if (wal2json_plugin_options_type_1.StringOptionKeys.includes(key))
options.push(`"${dashCase(key)}" '${value}'`);
else
options.push(`"${dashCase(key)}" '${value ? 'on' : 'off'}'`);
});
let sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn}`;
if (options.length > 0)
sql += ` (${options.join(' , ')})`;
// console.log(sql);
return client.query(sql);
async start(client, slotName, lastLsn) {
const options = [];
Object.entries(this.options).map(([key, value]) => {
if (wal2json_plugin_options_type_1.StringOptionKeys.includes(key))
options.push(`"${dashCase(key)}" '${value}'`);
else
options.push(`"${dashCase(key)}" '${value ? 'on' : 'off'}'`);
});
let sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn}`;
if (options.length > 0)
sql += ` (${options.join(' , ')})`;
// console.log(sql);
return client.query(sql);
}

@@ -43,0 +32,0 @@ parse(buffer) {

{
"name": "pg-logical-replication",
"version": "2.0.1",
"version": "2.0.2",
"description": "PostgreSQL Location Replication client - logical WAL replication streaming",

@@ -69,2 +69,3 @@ "main": "dist/index.js",

"@types/jest": "^28.1.6",
"@types/node": "^14.14.31",
"@types/pg": "^8.6.5",

@@ -83,6 +84,2 @@ "jest": "^28.1.3",

},
"peerDependencies": {
"eventemitter2": ">=6.4.0",
"pg": ">=6.2.2"
},
"license": "MIT",

@@ -89,0 +86,0 @@ "engines": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc