pg-logical-replication
Advanced tools
Comparing version 1.0.4 to 1.1.0
57
index.js
@@ -59,2 +59,6 @@ /** | ||
var stoped = false; | ||
var lastLsn; | ||
var lastStatus = 0; | ||
var feedbackCheckInterval; | ||
var standbyMessageTimeout; | ||
@@ -68,2 +72,5 @@ this.getChanges = function(slotName, uptoLsn, option, cb /*(start_err)*/) { | ||
option = option || {}; | ||
standbyMessageTimeout = (typeof option.standbyMessageTimeout === 'undefined') ? 10 : option.standbyMessageTimeout; | ||
/* | ||
@@ -92,2 +99,15 @@ * includeXids : include xid on BEGIN and COMMIT, default false | ||
]; | ||
if (option.queryOptions) { | ||
Object.keys(option.queryOptions).forEach(key => { | ||
var value = option.queryOptions[key]; | ||
if (typeof value === 'boolean') { | ||
value = value === true ? 'on' : 'off'; | ||
} | ||
opts.push( | ||
`"${key}" '${value}'` | ||
) | ||
}) | ||
} | ||
sql += ' (' + (opts.join(' , ')) + ')'; | ||
@@ -117,2 +137,3 @@ | ||
self.emit('acknowledge', { lsn }); | ||
lastLsn = lsn; | ||
} else if (msg.chunk[0] == 0x6b) { // Primary keepalive message | ||
@@ -127,2 +148,3 @@ var lsn = (msg.chunk.readUInt32BE(1).toString(16).toUpperCase()) + '/' + (msg.chunk.readUInt32BE(5).toString(16).toUpperCase()); | ||
}); | ||
lastLsn = lsn; | ||
} else { | ||
@@ -134,2 +156,3 @@ console.log('Unknown message', msg.chunk[0]); | ||
self.on('acknowledge', onAcknowledge); | ||
startStandbyTimeoutCheck(); | ||
}); | ||
@@ -143,6 +166,40 @@ }); | ||
standbyStatusUpdate(client, parseInt(lsn[0], 16), parseInt(lsn[1], 16), 'acknowledge'); | ||
updateLastStatus(); | ||
} | ||
function startStandbyTimeoutCheck() { | ||
if (standbyMessageTimeout <= 0) { | ||
return; | ||
} | ||
feedbackCheckInterval = setInterval(function () { | ||
if ((Date.now() - lastStatus) > standbyMessageTimeout * 1000) { | ||
sendFeedback(); | ||
} | ||
}, 1000); | ||
} | ||
function stopStandbyTimeoutCheck() { | ||
clearInterval(feedbackCheckInterval); | ||
} | ||
function updateLastStatus() { | ||
lastStatus = Date.now(); | ||
} | ||
function sendFeedback() { | ||
if (!lastLsn) { | ||
return; | ||
} | ||
var lsn = lastLsn.split('/'); | ||
if (!stoped && client) { | ||
standbyStatusUpdate(client, parseInt(lsn[0], 16), parseInt(lsn[1], 16), 'feedback'); | ||
updateLastStatus(); | ||
} | ||
} | ||
this.stop = function() { | ||
stoped = true; | ||
stopStandbyTimeoutCheck(); | ||
if (client) { | ||
@@ -149,0 +206,0 @@ client.removeAllListeners(); |
{ | ||
"name": "pg-logical-replication", | ||
"version": "1.0.4", | ||
"version": "1.1.0", | ||
"description": "PostgreSQL Location Replication client - logical WAL replication streaming", | ||
@@ -37,2 +37,6 @@ "keywords": [ | ||
"url": "https://github.com/jaryl" | ||
}, | ||
{ | ||
"name": "Caleb", | ||
"url": "https://github.com/c4l3b" | ||
} | ||
@@ -39,0 +43,0 @@ ], |
@@ -32,4 +32,8 @@ # pg-logical-replication | ||
- ```option``` can contain any of the following optional properties | ||
- ```standbyMessageTimeout``` : maximum seconds between keepalive messages (default: 10) | ||
- ```includeXids``` : bool (default: false) | ||
- ```includeTimestamp``` : bool (default: false) | ||
- ```queryOptions``` : object containing decoder specific options (optional) | ||
- ```'include-types': false``` | ||
- ```'filter-tables': 'foo.bar'``` | ||
@@ -36,0 +40,0 @@ ### 3-2. Method - Stream.stop |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
55182
1543
130