pg-logical-replication
Advanced tools
Comparing version 2.0.1 to 2.0.2
"use strict"; | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
@@ -29,3 +20,7 @@ return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
this.config = { | ||
acknowledge: Object.assign({ auto: true, timeoutSeconds: 10 }, ((config === null || config === void 0 ? void 0 : config.acknowledge) || {})), | ||
acknowledge: { | ||
auto: true, | ||
timeoutSeconds: 10, | ||
...(config?.acknowledge || {}), | ||
}, | ||
}; | ||
@@ -36,12 +31,13 @@ } | ||
} | ||
client() { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
yield this.stop(); | ||
this._client = new pg_1.Client(Object.assign(Object.assign({}, this.clientConfig), { replication: 'database' })); | ||
yield this._client.connect(); | ||
// @ts-ignore | ||
this._connection = this._client.connection; | ||
this._client.on('error', (e) => this.emit('error', e)); | ||
return [this._client, this._connection]; | ||
async client() { | ||
await this.stop(); | ||
this._client = new pg_1.Client({ | ||
...this.clientConfig, | ||
replication: 'database', | ||
}); | ||
await this._client.connect(); | ||
// @ts-ignore | ||
this._connection = this._client.connection; | ||
this._client.on('error', (e) => this.emit('error', e)); | ||
return [this._client, this._connection]; | ||
} | ||
@@ -51,14 +47,11 @@ isStop() { | ||
} | ||
stop() { | ||
var _a, _b, _c; | ||
return __awaiter(this, void 0, void 0, function* () { | ||
this._stop = true; | ||
(_a = this._connection) === null || _a === void 0 ? void 0 : _a.removeAllListeners(); | ||
this._connection = null; | ||
(_b = this._client) === null || _b === void 0 ? void 0 : _b.removeAllListeners(); | ||
yield ((_c = this._client) === null || _c === void 0 ? void 0 : _c.end()); | ||
this._client = null; | ||
this.checkStandbyStatus(false); | ||
return this; | ||
}); | ||
async stop() { | ||
this._stop = true; | ||
this._connection?.removeAllListeners(); | ||
this._connection = null; | ||
this._client?.removeAllListeners(); | ||
await this._client?.end(); | ||
this._client = null; | ||
this.checkStandbyStatus(false); | ||
return this; | ||
} | ||
@@ -71,48 +64,44 @@ /** | ||
*/ | ||
subscribe(plugin, slotName, uptoLsn) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
try { | ||
const [client, connection] = yield this.client(); | ||
this._lastLsn = uptoLsn || this._lastLsn; | ||
// check replicationStart | ||
connection.once('replicationStart', () => { | ||
this._stop = false; | ||
this.emit('start'); | ||
this.checkStandbyStatus(true); | ||
}); | ||
connection.on('copyData', ({ chunk: buffer }) => { | ||
if (buffer[0] != 0x77 && buffer[0] != 0x6b) { | ||
console.warn('Unknown message', buffer[0]); | ||
return; | ||
} | ||
const lsn = buffer.readUInt32BE(1).toString(16).toUpperCase() + '/' + buffer.readUInt32BE(5).toString(16).toUpperCase(); | ||
if (buffer[0] == 0x77) { | ||
// XLogData | ||
this.emit('data', lsn, plugin.parse(buffer.subarray(25))); | ||
this._acknowledge(lsn); | ||
} | ||
else if (buffer[0] == 0x6b) { | ||
// Primary keepalive message | ||
const timestamp = Math.floor(buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + 946080000000); | ||
const shouldRespond = buffer.readInt8(17); | ||
this.emit('heartbeat', lsn, timestamp, shouldRespond); | ||
} | ||
this._lastLsn = lsn; | ||
}); | ||
return plugin.start(client, slotName, this._lastLsn || '0/00000000'); | ||
} | ||
catch (e) { | ||
yield this.stop(); | ||
this.emit('error', e); | ||
throw e; | ||
} | ||
}); | ||
async subscribe(plugin, slotName, uptoLsn) { | ||
try { | ||
const [client, connection] = await this.client(); | ||
this._lastLsn = uptoLsn || this._lastLsn; | ||
// check replicationStart | ||
connection.once('replicationStart', () => { | ||
this._stop = false; | ||
this.emit('start'); | ||
this.checkStandbyStatus(true); | ||
}); | ||
connection.on('copyData', ({ chunk: buffer }) => { | ||
if (buffer[0] != 0x77 && buffer[0] != 0x6b) { | ||
console.warn('Unknown message', buffer[0]); | ||
return; | ||
} | ||
const lsn = buffer.readUInt32BE(1).toString(16).toUpperCase() + '/' + buffer.readUInt32BE(5).toString(16).toUpperCase(); | ||
if (buffer[0] == 0x77) { | ||
// XLogData | ||
this.emit('data', lsn, plugin.parse(buffer.subarray(25))); | ||
this._acknowledge(lsn); | ||
} | ||
else if (buffer[0] == 0x6b) { | ||
// Primary keepalive message | ||
const timestamp = Math.floor(buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + 946080000000); | ||
const shouldRespond = buffer.readInt8(17); | ||
this.emit('heartbeat', lsn, timestamp, shouldRespond); | ||
} | ||
this._lastLsn = lsn; | ||
}); | ||
return plugin.start(client, slotName, this._lastLsn || '0/00000000'); | ||
} | ||
catch (e) { | ||
await this.stop(); | ||
this.emit('error', e); | ||
throw e; | ||
} | ||
} | ||
_acknowledge(lsn) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
if (!this.config.acknowledge.auto) | ||
return; | ||
this.emit('acknowledge', lsn); | ||
yield this.acknowledge(lsn); | ||
}); | ||
async _acknowledge(lsn) { | ||
if (!this.config.acknowledge.auto) | ||
return; | ||
this.emit('acknowledge', lsn); | ||
await this.acknowledge(lsn); | ||
} | ||
@@ -125,3 +114,3 @@ checkStandbyStatus(enable) { | ||
if (this.config.acknowledge.timeoutSeconds > 0 && enable) | ||
this.checkStandbyStatusTimer = setInterval(() => __awaiter(this, void 0, void 0, function* () { | ||
this.checkStandbyStatusTimer = setInterval(async () => { | ||
if (this._stop) | ||
@@ -131,4 +120,4 @@ return; | ||
Date.now() - this.lastStandbyStatusUpdatedTime > this.config.acknowledge.timeoutSeconds * 1000) | ||
yield this.acknowledge(this._lastLsn); | ||
}), 1000); | ||
await this.acknowledge(this._lastLsn); | ||
}, 1000); | ||
} | ||
@@ -138,44 +127,41 @@ /** | ||
*/ | ||
acknowledge(lsn) { | ||
var _a; | ||
return __awaiter(this, void 0, void 0, function* () { | ||
if (this._stop) | ||
return false; | ||
this.lastStandbyStatusUpdatedTime = Date.now(); | ||
const slice = lsn.split('/'); | ||
let [upperWAL, lowerWAL] = [parseInt(slice[0], 16), parseInt(slice[1], 16)]; | ||
// Timestamp as microseconds since midnight 2000-01-01 | ||
const now = Date.now() - 946080000000; | ||
const upperTimestamp = Math.floor(now / 4294967.296); | ||
const lowerTimestamp = Math.floor(now - upperTimestamp * 4294967.296); | ||
if (lowerWAL === 4294967295) { | ||
// [0xff, 0xff, 0xff, 0xff] | ||
upperWAL = upperWAL + 1; | ||
lowerWAL = 0; | ||
} | ||
else { | ||
lowerWAL = lowerWAL + 1; | ||
} | ||
const response = Buffer.alloc(34); | ||
response.fill(0x72); // 'r' | ||
// Last WAL Byte + 1 received and written to disk locally | ||
response.writeUInt32BE(upperWAL, 1); | ||
response.writeUInt32BE(lowerWAL, 5); | ||
// Last WAL Byte + 1 flushed to disk in the standby | ||
response.writeUInt32BE(upperWAL, 9); | ||
response.writeUInt32BE(lowerWAL, 13); | ||
// Last WAL Byte + 1 applied in the standby | ||
response.writeUInt32BE(upperWAL, 17); | ||
response.writeUInt32BE(lowerWAL, 21); | ||
// Timestamp as microseconds since midnight 2000-01-01 | ||
response.writeUInt32BE(upperTimestamp, 25); | ||
response.writeUInt32BE(lowerTimestamp, 29); | ||
// If 1, requests server to respond immediately - can be used to verify connectivity | ||
response.writeInt8(0, 33); | ||
// @ts-ignore | ||
(_a = this._connection) === null || _a === void 0 ? void 0 : _a.sendCopyFromChunk(response); | ||
return true; | ||
}); | ||
async acknowledge(lsn) { | ||
if (this._stop) | ||
return false; | ||
this.lastStandbyStatusUpdatedTime = Date.now(); | ||
const slice = lsn.split('/'); | ||
let [upperWAL, lowerWAL] = [parseInt(slice[0], 16), parseInt(slice[1], 16)]; | ||
// Timestamp as microseconds since midnight 2000-01-01 | ||
const now = Date.now() - 946080000000; | ||
const upperTimestamp = Math.floor(now / 4294967.296); | ||
const lowerTimestamp = Math.floor(now - upperTimestamp * 4294967.296); | ||
if (lowerWAL === 4294967295) { | ||
// [0xff, 0xff, 0xff, 0xff] | ||
upperWAL = upperWAL + 1; | ||
lowerWAL = 0; | ||
} | ||
else { | ||
lowerWAL = lowerWAL + 1; | ||
} | ||
const response = Buffer.alloc(34); | ||
response.fill(0x72); // 'r' | ||
// Last WAL Byte + 1 received and written to disk locally | ||
response.writeUInt32BE(upperWAL, 1); | ||
response.writeUInt32BE(lowerWAL, 5); | ||
// Last WAL Byte + 1 flushed to disk in the standby | ||
response.writeUInt32BE(upperWAL, 9); | ||
response.writeUInt32BE(lowerWAL, 13); | ||
// Last WAL Byte + 1 applied in the standby | ||
response.writeUInt32BE(upperWAL, 17); | ||
response.writeUInt32BE(lowerWAL, 21); | ||
// Timestamp as microseconds since midnight 2000-01-01 | ||
response.writeUInt32BE(upperTimestamp, 25); | ||
response.writeUInt32BE(lowerTimestamp, 29); | ||
// If 1, requests server to respond immediately - can be used to verify connectivity | ||
response.writeInt8(0, 33); | ||
// @ts-ignore | ||
this._connection?.sendCopyFromChunk(response); | ||
return true; | ||
} | ||
} | ||
exports.LogicalReplicationService = LogicalReplicationService; |
"use strict"; | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
var __importDefault = (this && this.__importDefault) || function (mod) { | ||
return (mod && mod.__esModule) ? mod : { "default": mod }; | ||
}; | ||
@@ -14,4 +8,3 @@ Object.defineProperty(exports, "__esModule", { value: true }); | ||
const abstract_plugin_1 = require("../abstract.plugin"); | ||
// https://github.com/debezium/postgres-decoderbufs/blob/main/proto/pg_logicaldec.proto | ||
const decoderbufsProto = require('./pg_logicaldec.proto.json'); | ||
const pg_logicaldec_proto_1 = __importDefault(require("./pg_logicaldec.proto")); | ||
class ProtocolBuffersPlugin extends abstract_plugin_1.AbstractPlugin { | ||
@@ -22,3 +15,3 @@ constructor(options) { | ||
const protobufjs = require('protobufjs'); | ||
this.proto = protobufjs.Root.fromJSON(decoderbufsProto); | ||
this.proto = protobufjs.Root.fromJSON(pg_logicaldec_proto_1.default); | ||
this.rowMessage = this.proto.lookupType('RowMessage'); | ||
@@ -35,11 +28,9 @@ } | ||
} | ||
start(client, slotName, lastLsn) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const options = []; | ||
let sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn}`; | ||
if (options.length > 0) | ||
sql += ` (${options.join(' , ')})`; | ||
// console.log(sql); | ||
return client.query(sql); | ||
}); | ||
async start(client, slotName, lastLsn) { | ||
const options = []; | ||
let sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn}`; | ||
if (options.length > 0) | ||
sql += ` (${options.join(' , ')})`; | ||
// console.log(sql); | ||
return client.query(sql); | ||
} | ||
@@ -46,0 +37,0 @@ parse(buffer) { |
@@ -109,6 +109,12 @@ "use strict"; | ||
const typeMod = reader.readInt32(); | ||
return Object.assign(Object.assign({ flags, | ||
return { | ||
flags, | ||
name, | ||
typeOid, | ||
typeMod, typeSchema: null, typeName: null }, this._typeCache.get(typeOid)), { parser: pg_1.types.getTypeParser(typeOid) }); | ||
typeMod, | ||
typeSchema: null, | ||
typeName: null, | ||
...this._typeCache.get(typeOid), | ||
parser: pg_1.types.getTypeParser(typeOid), | ||
}; | ||
} | ||
@@ -210,3 +216,3 @@ msgInsert(reader) { | ||
case 0x75: // 'u' unchanged toast datum | ||
tuple[name] = unchangedToastFallback === null || unchangedToastFallback === void 0 ? void 0 : unchangedToastFallback[name]; | ||
tuple[name] = unchangedToastFallback?.[name]; | ||
break; | ||
@@ -213,0 +219,0 @@ default: |
"use strict"; | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -22,19 +13,17 @@ exports.TestDecodingPlugin = void 0; | ||
} | ||
start(client, slotName, lastLsn) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const options = [ | ||
`"include-xids" '${this.options.includeXids === true ? 'on' : 'off'}'`, | ||
`"include-timestamp" '${this.options.includeTimestamp === true ? 'on' : 'off'}'`, | ||
]; | ||
if (this.options.skipEmptyXacts) | ||
options.push(`"skip-empty-xacts" 'on'`); | ||
if (this.options.includeRewrites) | ||
options.push(`"include-rewrites" 'on'`); | ||
if (this.options.includeSequences) | ||
options.push(`"include-sequences" 'on'`); | ||
if (this.options.streamChanges) | ||
options.push(`"stream-changes" 'on'`); | ||
const sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn} (${options.join(' , ')})`; | ||
return client.query(sql); | ||
}); | ||
async start(client, slotName, lastLsn) { | ||
const options = [ | ||
`"include-xids" '${this.options.includeXids === true ? 'on' : 'off'}'`, | ||
`"include-timestamp" '${this.options.includeTimestamp === true ? 'on' : 'off'}'`, | ||
]; | ||
if (this.options.skipEmptyXacts) | ||
options.push(`"skip-empty-xacts" 'on'`); | ||
if (this.options.includeRewrites) | ||
options.push(`"include-rewrites" 'on'`); | ||
if (this.options.includeSequences) | ||
options.push(`"include-sequences" 'on'`); | ||
if (this.options.streamChanges) | ||
options.push(`"stream-changes" 'on'`); | ||
const sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn} (${options.join(' , ')})`; | ||
return client.query(sql); | ||
} | ||
@@ -41,0 +30,0 @@ parse(buffer) { |
"use strict"; | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -26,17 +17,15 @@ exports.Wal2JsonPlugin = void 0; | ||
} | ||
start(client, slotName, lastLsn) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const options = []; | ||
Object.entries(this.options).map(([key, value]) => { | ||
if (wal2json_plugin_options_type_1.StringOptionKeys.includes(key)) | ||
options.push(`"${dashCase(key)}" '${value}'`); | ||
else | ||
options.push(`"${dashCase(key)}" '${value ? 'on' : 'off'}'`); | ||
}); | ||
let sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn}`; | ||
if (options.length > 0) | ||
sql += ` (${options.join(' , ')})`; | ||
// console.log(sql); | ||
return client.query(sql); | ||
async start(client, slotName, lastLsn) { | ||
const options = []; | ||
Object.entries(this.options).map(([key, value]) => { | ||
if (wal2json_plugin_options_type_1.StringOptionKeys.includes(key)) | ||
options.push(`"${dashCase(key)}" '${value}'`); | ||
else | ||
options.push(`"${dashCase(key)}" '${value ? 'on' : 'off'}'`); | ||
}); | ||
let sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn}`; | ||
if (options.length > 0) | ||
sql += ` (${options.join(' , ')})`; | ||
// console.log(sql); | ||
return client.query(sql); | ||
} | ||
@@ -43,0 +32,0 @@ parse(buffer) { |
{ | ||
"name": "pg-logical-replication", | ||
"version": "2.0.1", | ||
"version": "2.0.2", | ||
"description": "PostgreSQL Location Replication client - logical WAL replication streaming", | ||
@@ -69,2 +69,3 @@ "main": "dist/index.js", | ||
"@types/jest": "^28.1.6", | ||
"@types/node": "^14.14.31", | ||
"@types/pg": "^8.6.5", | ||
@@ -83,6 +84,2 @@ "jest": "^28.1.3", | ||
}, | ||
"peerDependencies": { | ||
"eventemitter2": ">=6.4.0", | ||
"pg": ">=6.2.2" | ||
}, | ||
"license": "MIT", | ||
@@ -89,0 +86,0 @@ "engines": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
123669
2
36
2982
10