@google-cloud/pubsub
Advanced tools
Comparing version 0.16.5 to 0.17.0
{ | ||
"name": "@google-cloud/pubsub", | ||
"description": "Cloud Pub/Sub Client Library for Node.js", | ||
"version": "0.16.5", | ||
"version": "0.17.0", | ||
"license": "Apache-2.0", | ||
@@ -35,2 +35,5 @@ "author": "Google Inc.", | ||
"Ali Ijaz Sheikh <ofrobots@google.com>", | ||
"Antono Vasiljev <antono.vasiljev@gmail.com>", | ||
"Chet Husk <baronfel@users.noreply.github.com>", | ||
"Christoph Tavan <dev@tavan.de>", | ||
"Dave Gramlich <callmehiphop@gmail.com>", | ||
@@ -67,5 +70,7 @@ "David Fox <djfox14@gmail.com>", | ||
"async-each": "^1.0.1", | ||
"delay": "^2.0.0", | ||
"duplexify": "^3.5.4", | ||
"extend": "^3.0.1", | ||
"google-auto-auth": "^0.9.0", | ||
"google-gax": "^0.15.0", | ||
"google-gax": "^0.16.0", | ||
"google-proto-files": "^0.15.0", | ||
@@ -77,6 +82,7 @@ "is": "^3.0.1", | ||
"protobufjs": "^6.8.1", | ||
"through2": "^2.0.3", | ||
"uuid": "^3.1.0" | ||
}, | ||
"devDependencies": { | ||
"@google-cloud/nodejs-repo-tools": "^2.1.3", | ||
"@google-cloud/nodejs-repo-tools": "^2.2.3", | ||
"async": "^2.6.0", | ||
@@ -83,0 +89,0 @@ "codecov": "^3.0.0", |
@@ -130,2 +130,2 @@ <img src="https://avatars2.githubusercontent.com/u/2810941?v=3&s=96" alt="Google Cloud Platform logo" title="Google Cloud Platform" align="right" height="96" width="96"/> | ||
[product-docs]: https://cloud.google.com/pubsub/docs | ||
[shell_img]: http://gstatic.com/cloudssh/images/open-btn.png | ||
[shell_img]: //gstatic.com/cloudssh/images/open-btn.png |
@@ -19,7 +19,8 @@ /*! | ||
var arrify = require('arrify'); | ||
var common = require('@google-cloud/common'); | ||
var duplexify = require('duplexify'); | ||
var each = require('async-each'); | ||
var events = require('events'); | ||
var is = require('is'); | ||
var through = require('through2'); | ||
var util = require('util'); | ||
@@ -146,2 +147,3 @@ var uuid = require('uuid'); | ||
ConnectionPool.prototype.close = function(callback) { | ||
var self = this; | ||
var connections = Array.from(this.connections.values()); | ||
@@ -151,6 +153,2 @@ | ||
if (this.client) { | ||
this.client.close(); | ||
} | ||
this.connections.clear(); | ||
@@ -173,5 +171,15 @@ this.queue.forEach(clearTimeout); | ||
function(connection, onEndCallback) { | ||
connection.end(onEndCallback); | ||
connection.end(function(err) { | ||
connection.cancel(); | ||
onEndCallback(err); | ||
}); | ||
}, | ||
callback | ||
function(err) { | ||
if (self.client) { | ||
self.client.close(); | ||
self.client = null; | ||
} | ||
callback(err); | ||
} | ||
); | ||
@@ -195,6 +203,19 @@ }; | ||
var requestStream = client.streamingPull(); | ||
var readStream = requestStream.pipe( | ||
through.obj(function(chunk, enc, next) { | ||
chunk.receivedMessages.forEach(function(message) { | ||
readStream.push(message); | ||
}); | ||
next(); | ||
}) | ||
); | ||
var connection = duplexify(requestStream, readStream, {objectMode: true}); | ||
var id = uuid.v4(); | ||
var connection = client.streamingPull(); | ||
var errorImmediateHandle; | ||
var lastStatus; | ||
connection.cancel = requestStream.cancel.bind(requestStream); | ||
if (self.isPaused) { | ||
@@ -208,6 +229,7 @@ connection.pause(); | ||
requestStream.on('status', onConnectionStatus); | ||
connection | ||
.on('error', onConnectionError) | ||
.on('data', onConnectionData) | ||
.on('status', onConnectionStatus) | ||
.write({ | ||
@@ -226,3 +248,3 @@ subscription: common.util.replaceProjectIdToken( | ||
connection.cancel(); | ||
requestStream.cancel(); | ||
} | ||
@@ -248,13 +270,13 @@ | ||
function onConnectionError(err) { | ||
errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err); | ||
if (!lastStatus || err.code !== lastStatus.code) { | ||
self.emit('error', err); | ||
} | ||
} | ||
function onConnectionData(data) { | ||
arrify(data.receivedMessages).forEach(function(message) { | ||
self.emit('message', self.createMessage(id, message)); | ||
}); | ||
function onConnectionData(message) { | ||
self.emit('message', self.createMessage(id, message)); | ||
} | ||
function onConnectionStatus(status) { | ||
clearImmediate(errorImmediateHandle); | ||
lastStatus = status; | ||
@@ -261,0 +283,0 @@ connection.end(); |
@@ -19,4 +19,3 @@ /*! | ||
var MIN_VALUE = 10000; | ||
var MAX_VALUE = 600000; | ||
var extend = require('extend'); | ||
@@ -31,3 +30,11 @@ /*! | ||
*/ | ||
function Histogram() { | ||
function Histogram(options) { | ||
this.options = extend( | ||
{ | ||
min: 10000, | ||
max: 600000, | ||
}, | ||
options | ||
); | ||
this.data = new Map(); | ||
@@ -44,4 +51,4 @@ this.length = 0; | ||
Histogram.prototype.add = function(value) { | ||
value = Math.max(value, MIN_VALUE); | ||
value = Math.min(value, MAX_VALUE); | ||
value = Math.max(value, this.options.min); | ||
value = Math.min(value, this.options.max); | ||
value = Math.ceil(value / 1000) * 1000; | ||
@@ -81,5 +88,5 @@ | ||
return MIN_VALUE; | ||
return this.options.min; | ||
}; | ||
module.exports = Histogram; |
@@ -40,3 +40,3 @@ /*! | ||
* @param {number} [options.batching.maxMilliseconds] The maximum duration to | ||
* wait before sending a payload. | ||
* wait before sending a payload. Defaults to 100 milliseconds. | ||
* | ||
@@ -61,3 +61,3 @@ * @example | ||
maxMessages: 1000, | ||
maxMilliseconds: 1000, | ||
maxMilliseconds: 100, | ||
}, | ||
@@ -64,0 +64,0 @@ }, |
@@ -22,2 +22,3 @@ /*! | ||
var common = require('@google-cloud/common'); | ||
var delay = require('delay'); | ||
var events = require('events'); | ||
@@ -156,2 +157,3 @@ var extend = require('extend'); | ||
this.histogram = new Histogram(); | ||
this.latency_ = new Histogram({min: 0}); | ||
@@ -286,7 +288,8 @@ this.name = Subscription.formatName_(pubsub.projectId, name); | ||
Subscription.prototype.ack_ = function(message) { | ||
this.breakLease_(message); | ||
var breakLease = this.breakLease_.bind(this, message); | ||
this.histogram.add(Date.now() - message.received); | ||
if (this.isConnected_()) { | ||
this.acknowledge_(message.ackId, message.connectionId); | ||
this.acknowledge_(message.ackId, message.connectionId).then(breakLease); | ||
return; | ||
@@ -296,3 +299,3 @@ } | ||
this.inventory_.ack.push(message.ackId); | ||
this.setFlushTimeout_(); | ||
this.setFlushTimeout_().then(breakLease); | ||
}; | ||
@@ -317,27 +320,13 @@ | ||
) { | ||
if (!self.isConnected_()) { | ||
return common.util.promisify(self.request).call(self, { | ||
client: 'SubscriberClient', | ||
method: 'acknowledge', | ||
reqOpts: { | ||
subscription: self.name, | ||
ackIds: ackIdChunk, | ||
}, | ||
}); | ||
if (self.isConnected_()) { | ||
return self.writeTo_(connId, {ackIds: ackIdChunk}); | ||
} | ||
return new Promise(function(resolve, reject) { | ||
self.connectionPool.acquire(connId, function(err, connection) { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
connection.write( | ||
{ | ||
ackIds: ackIdChunk, | ||
}, | ||
resolve | ||
); | ||
}); | ||
return common.util.promisify(self.request).call(self, { | ||
client: 'SubscriberClient', | ||
method: 'acknowledge', | ||
reqOpts: { | ||
subscription: self.name, | ||
ackIds: ackIdChunk, | ||
}, | ||
}); | ||
@@ -642,4 +631,6 @@ }); | ||
clearTimeout(this.flushTimeoutHandle_); | ||
this.flushTimeoutHandle_ = null; | ||
if (this.flushTimeoutHandle_) { | ||
this.flushTimeoutHandle_.cancel(); | ||
this.flushTimeoutHandle_ = null; | ||
} | ||
@@ -907,29 +898,17 @@ var acks = this.inventory_.ack; | ||
) { | ||
if (!self.isConnected_()) { | ||
return common.util.promisify(self.request).call(self, { | ||
client: 'SubscriberClient', | ||
method: 'modifyAckDeadline', | ||
reqOpts: { | ||
subscription: self.name, | ||
ackDeadlineSeconds: deadline, | ||
ackIds: ackIdChunk, | ||
}, | ||
if (self.isConnected_()) { | ||
return self.writeTo_(connId, { | ||
modifyDeadlineAckIds: ackIdChunk, | ||
modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline), | ||
}); | ||
} | ||
return new Promise(function(resolve, reject) { | ||
self.connectionPool.acquire(connId, function(err, connection) { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
connection.write( | ||
{ | ||
modifyDeadlineAckIds: ackIdChunk, | ||
modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline), | ||
}, | ||
resolve | ||
); | ||
}); | ||
return common.util.promisify(self.request).call(self, { | ||
client: 'SubscriberClient', | ||
method: 'modifyAckDeadline', | ||
reqOpts: { | ||
subscription: self.name, | ||
ackDeadlineSeconds: deadline, | ||
ackIds: ackIdChunk, | ||
}, | ||
}); | ||
@@ -1023,6 +1002,8 @@ }); | ||
Subscription.prototype.nack_ = function(message) { | ||
this.breakLease_(message); | ||
var breakLease = this.breakLease_.bind(this, message); | ||
if (this.isConnected_()) { | ||
this.modifyAckDeadline_(message.ackId, 0, message.connectionId); | ||
this.modifyAckDeadline_(message.ackId, 0, message.connectionId).then( | ||
breakLease | ||
); | ||
return; | ||
@@ -1032,3 +1013,3 @@ } | ||
this.inventory_.nack.push(message.ackId); | ||
this.setFlushTimeout_(); | ||
this.setFlushTimeout_().then(breakLease); | ||
}; | ||
@@ -1052,12 +1033,7 @@ | ||
pool.on('message', function(message) { | ||
if (!self.hasMaxMessages_()) { | ||
self.emit('message', self.leaseMessage_(message)); | ||
return; | ||
} | ||
self.emit('message', self.leaseMessage_(message)); | ||
if (!pool.isPaused) { | ||
if (!pool.isPaused && self.hasMaxMessages_()) { | ||
pool.pause(); | ||
} | ||
message.nack(); | ||
}); | ||
@@ -1168,7 +1144,13 @@ | ||
Subscription.prototype.setFlushTimeout_ = function() { | ||
if (this.flushTimeoutHandle_) { | ||
return; | ||
if (!this.flushTimeoutHandle_) { | ||
var timeout = delay(1000); | ||
var promise = timeout | ||
.then(this.flushQueues_.bind(this)) | ||
.catch(common.util.noop); | ||
promise.cancel = timeout.cancel.bind(timeout); | ||
this.flushTimeoutHandle_ = promise; | ||
} | ||
this.flushTimeoutHandle_ = setTimeout(this.flushQueues_.bind(this), 1000); | ||
return this.flushTimeoutHandle_; | ||
}; | ||
@@ -1187,3 +1169,5 @@ | ||
var timeout = Math.random() * this.ackDeadline * 0.9; | ||
var latency = this.latency_.percentile(99); | ||
var timeout = Math.random() * this.ackDeadline * 0.9 - latency; | ||
this.leaseTimeoutHandle_ = setTimeout(this.renewLeases_.bind(this), timeout); | ||
@@ -1273,2 +1257,36 @@ }; | ||
/** | ||
* Writes to specified duplex stream. This is useful for capturing write | ||
* latencies that can later be used to adjust the auto lease timeout. | ||
* | ||
* @private | ||
* | ||
* @param {string} connId The ID of the connection to write to. | ||
* @param {object} data The data to be written to the stream. | ||
* @returns {Promise} | ||
*/ | ||
Subscription.prototype.writeTo_ = function(connId, data) { | ||
var self = this; | ||
var startTime = Date.now(); | ||
return new Promise(function(resolve, reject) { | ||
self.connectionPool.acquire(connId, function(err, connection) { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
connection.write(data, function(err) { | ||
if (err) { | ||
reject(err); | ||
return; | ||
} | ||
self.latency_.add(Date.now() - startTime); | ||
resolve(); | ||
}); | ||
}); | ||
}); | ||
}; | ||
/*! Developer Documentation | ||
@@ -1275,0 +1293,0 @@ * |
@@ -89,3 +89,3 @@ { | ||
"StreamingPull": { | ||
"timeout_millis": 60000, | ||
"timeout_millis": 900000, | ||
"retry_codes_name": "pull", | ||
@@ -92,0 +92,0 @@ "retry_params_name": "streaming_messaging" |
Sorry, the diff of this file is not supported yet
556848
6819
16
+ Addeddelay@^2.0.0
+ Addedduplexify@^3.5.4
+ Addedthrough2@^2.0.3
+ Added@mapbox/node-pre-gyp@1.0.11(transitive)
+ Added@types/bytebuffer@5.0.49(transitive)
+ Added@types/long@3.0.32(transitive)
+ Added@types/node@22.10.7(transitive)
+ Addedabbrev@1.1.1(transitive)
+ Addedagent-base@6.0.2(transitive)
+ Addedansi-regex@5.0.1(transitive)
+ Addedaproba@2.0.0(transitive)
+ Addedare-we-there-yet@2.0.0(transitive)
+ Addedchownr@2.0.0(transitive)
+ Addedcolor-support@1.1.3(transitive)
+ Addedconsole-control-strings@1.1.0(transitive)
+ Addeddebug@4.4.0(transitive)
+ Addeddelay@2.0.0(transitive)
+ Addeddelegates@1.0.0(transitive)
+ Addeddetect-libc@2.0.3(transitive)
+ Addedemoji-regex@8.0.0(transitive)
+ Addedes-object-atoms@1.1.1(transitive)
+ Addedfs-minipass@2.1.0(transitive)
+ Addedgauge@3.0.2(transitive)
+ Addedgoogle-auto-auth@0.10.1(transitive)
+ Addedgoogle-gax@0.16.1(transitive)
+ Addedgrpc@1.24.11(transitive)
+ Addedhas-unicode@2.0.1(transitive)
+ Addedhttps-proxy-agent@5.0.1(transitive)
+ Addedis-fullwidth-code-point@3.0.0(transitive)
+ Addedlodash.camelcase@4.3.0(transitive)
+ Addedlodash.clone@4.5.0(transitive)
+ Addedmake-dir@3.1.0(transitive)
+ Addedminipass@3.3.65.0.0(transitive)
+ Addedminizlib@2.1.2(transitive)
+ Addedmkdirp@1.0.4(transitive)
+ Addedms@2.1.3(transitive)
+ Addednopt@5.0.0(transitive)
+ Addednpmlog@5.0.1(transitive)
+ Addedobject-assign@4.1.1(transitive)
+ Addedp-defer@1.0.0(transitive)
+ Addedreadable-stream@3.6.2(transitive)
+ Addedrimraf@3.0.2(transitive)
+ Addedsemver@6.3.17.6.3(transitive)
+ Addedset-blocking@2.0.0(transitive)
+ Addedsignal-exit@3.0.7(transitive)
+ Addedstring-width@4.2.3(transitive)
+ Addedstring_decoder@1.3.0(transitive)
+ Addedstrip-ansi@6.0.1(transitive)
+ Addedtar@6.2.1(transitive)
+ Addedtraverse@0.6.11(transitive)
+ Addedwide-align@1.1.5(transitive)
+ Addedyallist@4.0.0(transitive)
- Removed@types/node@22.10.5(transitive)
- Removedes-object-atoms@1.0.0(transitive)
- Removedgoogle-gax@0.15.0(transitive)
- Removedgrpc@1.9.1(transitive)
- Removedtraverse@0.6.10(transitive)
Updatedgoogle-gax@^0.16.0