New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@google-cloud/pubsub

Package Overview
Dependencies
Maintainers
16
Versions
171
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@google-cloud/pubsub - npm Package Compare versions

Comparing version 0.16.5 to 0.17.0

12

package.json
{
"name": "@google-cloud/pubsub",
"description": "Cloud Pub/Sub Client Library for Node.js",
"version": "0.16.5",
"version": "0.17.0",
"license": "Apache-2.0",

@@ -35,2 +35,5 @@ "author": "Google Inc.",

"Ali Ijaz Sheikh <ofrobots@google.com>",
"Antono Vasiljev <antono.vasiljev@gmail.com>",
"Chet Husk <baronfel@users.noreply.github.com>",
"Christoph Tavan <dev@tavan.de>",
"Dave Gramlich <callmehiphop@gmail.com>",

@@ -67,5 +70,7 @@ "David Fox <djfox14@gmail.com>",

"async-each": "^1.0.1",
"delay": "^2.0.0",
"duplexify": "^3.5.4",
"extend": "^3.0.1",
"google-auto-auth": "^0.9.0",
"google-gax": "^0.15.0",
"google-gax": "^0.16.0",
"google-proto-files": "^0.15.0",

@@ -77,6 +82,7 @@ "is": "^3.0.1",

"protobufjs": "^6.8.1",
"through2": "^2.0.3",
"uuid": "^3.1.0"
},
"devDependencies": {
"@google-cloud/nodejs-repo-tools": "^2.1.3",
"@google-cloud/nodejs-repo-tools": "^2.2.3",
"async": "^2.6.0",

@@ -83,0 +89,0 @@ "codecov": "^3.0.0",

@@ -130,2 +130,2 @@ <img src="https://avatars2.githubusercontent.com/u/2810941?v=3&s=96" alt="Google Cloud Platform logo" title="Google Cloud Platform" align="right" height="96" width="96"/>

[product-docs]: https://cloud.google.com/pubsub/docs
[shell_img]: http://gstatic.com/cloudssh/images/open-btn.png
[shell_img]: //gstatic.com/cloudssh/images/open-btn.png

@@ -19,7 +19,8 @@ /*!

var arrify = require('arrify');
var common = require('@google-cloud/common');
var duplexify = require('duplexify');
var each = require('async-each');
var events = require('events');
var is = require('is');
var through = require('through2');
var util = require('util');

@@ -146,2 +147,3 @@ var uuid = require('uuid');

ConnectionPool.prototype.close = function(callback) {
var self = this;
var connections = Array.from(this.connections.values());

@@ -151,6 +153,2 @@

if (this.client) {
this.client.close();
}
this.connections.clear();

@@ -173,5 +171,15 @@ this.queue.forEach(clearTimeout);

function(connection, onEndCallback) {
connection.end(onEndCallback);
connection.end(function(err) {
connection.cancel();
onEndCallback(err);
});
},
callback
function(err) {
if (self.client) {
self.client.close();
self.client = null;
}
callback(err);
}
);

@@ -195,6 +203,19 @@ };

var requestStream = client.streamingPull();
var readStream = requestStream.pipe(
through.obj(function(chunk, enc, next) {
chunk.receivedMessages.forEach(function(message) {
readStream.push(message);
});
next();
})
);
var connection = duplexify(requestStream, readStream, {objectMode: true});
var id = uuid.v4();
var connection = client.streamingPull();
var errorImmediateHandle;
var lastStatus;
connection.cancel = requestStream.cancel.bind(requestStream);
if (self.isPaused) {

@@ -208,6 +229,7 @@ connection.pause();

requestStream.on('status', onConnectionStatus);
connection
.on('error', onConnectionError)
.on('data', onConnectionData)
.on('status', onConnectionStatus)
.write({

@@ -226,3 +248,3 @@ subscription: common.util.replaceProjectIdToken(

connection.cancel();
requestStream.cancel();
}

@@ -248,13 +270,13 @@

function onConnectionError(err) {
errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err);
if (!lastStatus || err.code !== lastStatus.code) {
self.emit('error', err);
}
}
function onConnectionData(data) {
arrify(data.receivedMessages).forEach(function(message) {
self.emit('message', self.createMessage(id, message));
});
function onConnectionData(message) {
self.emit('message', self.createMessage(id, message));
}
function onConnectionStatus(status) {
clearImmediate(errorImmediateHandle);
lastStatus = status;

@@ -261,0 +283,0 @@ connection.end();

@@ -19,4 +19,3 @@ /*!

var MIN_VALUE = 10000;
var MAX_VALUE = 600000;
var extend = require('extend');

@@ -31,3 +30,11 @@ /*!

*/
function Histogram() {
function Histogram(options) {
this.options = extend(
{
min: 10000,
max: 600000,
},
options
);
this.data = new Map();

@@ -44,4 +51,4 @@ this.length = 0;

Histogram.prototype.add = function(value) {
value = Math.max(value, MIN_VALUE);
value = Math.min(value, MAX_VALUE);
value = Math.max(value, this.options.min);
value = Math.min(value, this.options.max);
value = Math.ceil(value / 1000) * 1000;

@@ -81,5 +88,5 @@

return MIN_VALUE;
return this.options.min;
};
module.exports = Histogram;

@@ -40,3 +40,3 @@ /*!

* @param {number} [options.batching.maxMilliseconds] The maximum duration to
* wait before sending a payload.
* wait before sending a payload. Defaults to 100 milliseconds.
*

@@ -61,3 +61,3 @@ * @example

maxMessages: 1000,
maxMilliseconds: 1000,
maxMilliseconds: 100,
},

@@ -64,0 +64,0 @@ },

@@ -22,2 +22,3 @@ /*!

var common = require('@google-cloud/common');
var delay = require('delay');
var events = require('events');

@@ -156,2 +157,3 @@ var extend = require('extend');

this.histogram = new Histogram();
this.latency_ = new Histogram({min: 0});

@@ -286,7 +288,8 @@ this.name = Subscription.formatName_(pubsub.projectId, name);

Subscription.prototype.ack_ = function(message) {
this.breakLease_(message);
var breakLease = this.breakLease_.bind(this, message);
this.histogram.add(Date.now() - message.received);
if (this.isConnected_()) {
this.acknowledge_(message.ackId, message.connectionId);
this.acknowledge_(message.ackId, message.connectionId).then(breakLease);
return;

@@ -296,3 +299,3 @@ }

this.inventory_.ack.push(message.ackId);
this.setFlushTimeout_();
this.setFlushTimeout_().then(breakLease);
};

@@ -317,27 +320,13 @@

) {
if (!self.isConnected_()) {
return common.util.promisify(self.request).call(self, {
client: 'SubscriberClient',
method: 'acknowledge',
reqOpts: {
subscription: self.name,
ackIds: ackIdChunk,
},
});
if (self.isConnected_()) {
return self.writeTo_(connId, {ackIds: ackIdChunk});
}
return new Promise(function(resolve, reject) {
self.connectionPool.acquire(connId, function(err, connection) {
if (err) {
reject(err);
return;
}
connection.write(
{
ackIds: ackIdChunk,
},
resolve
);
});
return common.util.promisify(self.request).call(self, {
client: 'SubscriberClient',
method: 'acknowledge',
reqOpts: {
subscription: self.name,
ackIds: ackIdChunk,
},
});

@@ -642,4 +631,6 @@ });

clearTimeout(this.flushTimeoutHandle_);
this.flushTimeoutHandle_ = null;
if (this.flushTimeoutHandle_) {
this.flushTimeoutHandle_.cancel();
this.flushTimeoutHandle_ = null;
}

@@ -907,29 +898,17 @@ var acks = this.inventory_.ack;

) {
if (!self.isConnected_()) {
return common.util.promisify(self.request).call(self, {
client: 'SubscriberClient',
method: 'modifyAckDeadline',
reqOpts: {
subscription: self.name,
ackDeadlineSeconds: deadline,
ackIds: ackIdChunk,
},
if (self.isConnected_()) {
return self.writeTo_(connId, {
modifyDeadlineAckIds: ackIdChunk,
modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline),
});
}
return new Promise(function(resolve, reject) {
self.connectionPool.acquire(connId, function(err, connection) {
if (err) {
reject(err);
return;
}
connection.write(
{
modifyDeadlineAckIds: ackIdChunk,
modifyDeadlineSeconds: Array(ackIdChunk.length).fill(deadline),
},
resolve
);
});
return common.util.promisify(self.request).call(self, {
client: 'SubscriberClient',
method: 'modifyAckDeadline',
reqOpts: {
subscription: self.name,
ackDeadlineSeconds: deadline,
ackIds: ackIdChunk,
},
});

@@ -1023,6 +1002,8 @@ });

Subscription.prototype.nack_ = function(message) {
this.breakLease_(message);
var breakLease = this.breakLease_.bind(this, message);
if (this.isConnected_()) {
this.modifyAckDeadline_(message.ackId, 0, message.connectionId);
this.modifyAckDeadline_(message.ackId, 0, message.connectionId).then(
breakLease
);
return;

@@ -1032,3 +1013,3 @@ }

this.inventory_.nack.push(message.ackId);
this.setFlushTimeout_();
this.setFlushTimeout_().then(breakLease);
};

@@ -1052,12 +1033,7 @@

pool.on('message', function(message) {
if (!self.hasMaxMessages_()) {
self.emit('message', self.leaseMessage_(message));
return;
}
self.emit('message', self.leaseMessage_(message));
if (!pool.isPaused) {
if (!pool.isPaused && self.hasMaxMessages_()) {
pool.pause();
}
message.nack();
});

@@ -1168,7 +1144,13 @@

Subscription.prototype.setFlushTimeout_ = function() {
if (this.flushTimeoutHandle_) {
return;
if (!this.flushTimeoutHandle_) {
var timeout = delay(1000);
var promise = timeout
.then(this.flushQueues_.bind(this))
.catch(common.util.noop);
promise.cancel = timeout.cancel.bind(timeout);
this.flushTimeoutHandle_ = promise;
}
this.flushTimeoutHandle_ = setTimeout(this.flushQueues_.bind(this), 1000);
return this.flushTimeoutHandle_;
};

@@ -1187,3 +1169,5 @@

var timeout = Math.random() * this.ackDeadline * 0.9;
var latency = this.latency_.percentile(99);
var timeout = Math.random() * this.ackDeadline * 0.9 - latency;
this.leaseTimeoutHandle_ = setTimeout(this.renewLeases_.bind(this), timeout);

@@ -1273,2 +1257,36 @@ };

/**
* Writes to specified duplex stream. This is useful for capturing write
* latencies that can later be used to adjust the auto lease timeout.
*
* @private
*
* @param {string} connId The ID of the connection to write to.
* @param {object} data The data to be written to the stream.
* @returns {Promise}
*/
Subscription.prototype.writeTo_ = function(connId, data) {
var self = this;
var startTime = Date.now();
return new Promise(function(resolve, reject) {
self.connectionPool.acquire(connId, function(err, connection) {
if (err) {
reject(err);
return;
}
connection.write(data, function(err) {
if (err) {
reject(err);
return;
}
self.latency_.add(Date.now() - startTime);
resolve();
});
});
});
};
/*! Developer Documentation

@@ -1275,0 +1293,0 @@ *

@@ -89,3 +89,3 @@ {

"StreamingPull": {
"timeout_millis": 60000,
"timeout_millis": 900000,
"retry_codes_name": "pull",

@@ -92,0 +92,0 @@ "retry_params_name": "streaming_messaging"

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc