New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@google-cloud/pubsub

Package Overview
Dependencies
Maintainers
1
Versions
172
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@google-cloud/pubsub - npm Package Compare versions

Comparing version

to
2.5.0

build/src/opentelemetry-tracing.d.ts

23

build/src/publisher/index.d.ts

@@ -18,2 +18,3 @@ /*!

import { CallOptions } from 'google-gax';
import { Span } from '@opentelemetry/api';
import { BatchPublishOptions } from './message-batch';

@@ -24,2 +25,3 @@ import { Queue, OrderedQueue } from './message-queues';

import { google } from '../../protos/protos';
import { OpenTelemetryTracer } from '../opentelemetry-tracing';
export declare type PubsubMessage = google.pubsub.v1.IPubsubMessage;

@@ -34,3 +36,15 @@ export interface Attributes {

messageOrdering?: boolean;
enableOpenTelemetryTracing?: boolean;
}
/**
* @typedef PublishOptions
* @property {BatchPublishOptions} [batching] The maximum number of bytes to
* buffer before sending a payload.
* @property {object} [gaxOpts] Request configuration options, outlined
* {@link https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html|here.}
* @property {boolean} [messageOrdering] If true, messages published with the
* same order key in Message will be delivered to the subscribers in the order in which they
* are received by the Pub/Sub system. Otherwise, they may be delivered in
* any order.
*/
export declare const BATCH_LIMITS: BatchPublishOptions;

@@ -53,2 +67,3 @@ /**

orderedQueues: Map<string, OrderedQueue>;
tracing: OpenTelemetryTracer | undefined;
constructor(topic: Topic, options?: PublishOptions);

@@ -89,2 +104,10 @@ flush(): Promise<void>;

setOptions(options?: PublishOptions): void;
/**
* Constructs an OpenTelemetry span
*
* @private
*
* @param {PubsubMessage} message The message to create a span for
*/
constructSpan(message: PubsubMessage): Span | undefined;
}

58

build/src/publisher/index.js

@@ -23,2 +23,14 @@ "use strict";

const default_options_1 = require("../default-options");
const opentelemetry_tracing_1 = require("../opentelemetry-tracing");
/**
* @typedef PublishOptions
* @property {BatchPublishOptions} [batching] The maximum number of bytes to
* buffer before sending a payload.
* @property {object} [gaxOpts] Request configuration options, outlined
* {@link https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html|here.}
* @property {boolean} [messageOrdering] If true, messages published with the
* same order key in Message will be delivered to the subscribers in the order in which they
* are received by the Pub/Sub system. Otherwise, they may be delivered in
* any order.
*/
exports.BATCH_LIMITS = {

@@ -45,2 +57,6 @@ maxBytes: Math.pow(1024, 2) * 9,

this.orderedQueues = new Map();
this.tracing =
this.settings && this.settings.enableOpenTelemetryTracing
? new opentelemetry_tracing_1.OpenTelemetryTracer()
: undefined;
}

@@ -110,4 +126,8 @@ /**

}
const span = this.constructSpan(message);
if (!message.orderingKey) {
this.queue.add(message, callback);
if (span) {
span.end();
}
return;

@@ -123,2 +143,5 @@ }

queue.add(message, callback);
if (span) {
span.end();
}
}

@@ -157,4 +180,8 @@ /**

},
enableOpenTelemetryTracing: false,
};
const { batching, gaxOpts, messageOrdering } = extend(true, defaults, options);
const { batching, gaxOpts, messageOrdering, enableOpenTelemetryTracing, } = extend(true, defaults, options);
this.tracing = enableOpenTelemetryTracing
? new opentelemetry_tracing_1.OpenTelemetryTracer()
: undefined;
this.settings = {

@@ -168,4 +195,31 @@ batching: {

messageOrdering,
enableOpenTelemetryTracing,
};
}
/**
* Constructs an OpenTelemetry span
*
* @private
*
* @param {PubsubMessage} message The message to create a span for
*/
constructSpan(message) {
const spanAttributes = {
data: message.data,
};
const span = this.tracing
? this.tracing.createSpan(`${this.topic.name} publisher`, spanAttributes)
: undefined;
if (span) {
if (message.attributes &&
message.attributes['googclient_OpenTelemetrySpanContext']) {
console.warn('googclient_OpenTelemetrySpanContext key set as message attribute, but will be overridden.');
}
if (!message.attributes) {
message.attributes = {};
}
message.attributes['googclient_OpenTelemetrySpanContext'] = JSON.stringify(span.context());
}
return span;
}
}

@@ -175,4 +229,4 @@ exports.Publisher = Publisher;

singular: true,
exclude: ['publish', 'setOptions'],
exclude: ['publish', 'setOptions', 'constructSpan'],
});
//# sourceMappingURL=index.js.map

6

build/src/publisher/message-batch.d.ts

@@ -24,7 +24,7 @@ /*!

* @typedef BatchPublishOptions
* @property {number} [maxBytes=1024^2 * 5] The maximum number of bytes to
* @property {number} [maxBytes=1 * 1024 * 1024] The maximum number of bytes to
* buffer before sending a payload.
* @property {number} [maxMessages=1000] The maximum number of messages to
* @property {number} [maxMessages=100] The maximum number of messages to
* buffer before sending a payload.
* @property {number} [maxMilliseconds=100] The maximum duration to wait before
* @property {number} [maxMilliseconds=10] The maximum duration to wait before
* sending a payload.

@@ -31,0 +31,0 @@ */

@@ -22,7 +22,7 @@ "use strict";

* @typedef BatchPublishOptions
* @property {number} [maxBytes=1024^2 * 5] The maximum number of bytes to
* @property {number} [maxBytes=1 * 1024 * 1024] The maximum number of bytes to
* buffer before sending a payload.
* @property {number} [maxMessages=1000] The maximum number of messages to
* @property {number} [maxMessages=100] The maximum number of messages to
* buffer before sending a payload.
* @property {number} [maxMilliseconds=100] The maximum duration to wait before
* @property {number} [maxMilliseconds=10] The maximum duration to wait before
* sending a payload.

@@ -29,0 +29,0 @@ */

@@ -100,5 +100,7 @@ /*!

* @property {string} [apiEndpoint] The `apiEndpoint` from options will set the
* host. If not set, the `PUBSUB_EMULATOR_HOST` environment variable from
* the gcloud SDK is honored, otherwise the actual API endpoint will be
* used.
* host. If not set, the `PUBSUB_EMULATOR_HOST` environment variable from the
* gcloud SDK is honored. We also check the `CLOUD_API_ENDPOINT_OVERRIDES_PUBSUB`
* environment variable used by `gcloud alpha pubsub`. Otherwise the actual API
* endpoint will be used. Note that if the URL doesn't end in '.googleapis.com',
* we will assume that it's an emulator and disable strict SSL checks.
* @property {string} [email] Account email address. Required when using a .pem

@@ -171,6 +173,11 @@ * or .p12 keyFilename.

* Determine the appropriate endpoint to use for API requests, first trying
* the local `apiEndpoint` parameter. If the `apiEndpoint` parameter is null
* we try Pub/Sub emulator environment variable (PUBSUB_EMULATOR_HOST),
* otherwise the default JSON API.
* the `apiEndpoint` parameter. If that isn't set, we try the Pub/Sub emulator
* environment variable (PUBSUB_EMULATOR_HOST). If that is also null, we try
* the standard `gcloud alpha pubsub` environment variable
* (CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB). Otherwise the default production
* API is used.
*
* Note that if the URL doesn't end in '.googleapis.com', we will assume that
* it's an emulator and disable strict SSL checks.
*
* @private

@@ -291,2 +298,3 @@ */

* @param {string} name The name of the topic.
* @param {PublishOptions} [options] Publisher configuration object.
* @returns {Topic} A {@link Topic} instance.

@@ -293,0 +301,0 @@ *

@@ -52,5 +52,7 @@ "use strict";

* @property {string} [apiEndpoint] The `apiEndpoint` from options will set the
* host. If not set, the `PUBSUB_EMULATOR_HOST` environment variable from
* the gcloud SDK is honored, otherwise the actual API endpoint will be
* used.
* host. If not set, the `PUBSUB_EMULATOR_HOST` environment variable from the
* gcloud SDK is honored. We also check the `CLOUD_API_ENDPOINT_OVERRIDES_PUBSUB`
* environment variable used by `gcloud alpha pubsub`. Otherwise the actual API
* endpoint will be used. Note that if the URL doesn't end in '.googleapis.com',
* we will assume that it's an emulator and disable strict SSL checks.
* @property {string} [email] Account email address. Required when using a .pem

@@ -383,18 +385,29 @@ * or .p12 keyFilename.

* Determine the appropriate endpoint to use for API requests, first trying
* the local `apiEndpoint` parameter. If the `apiEndpoint` parameter is null
* we try Pub/Sub emulator environment variable (PUBSUB_EMULATOR_HOST),
* otherwise the default JSON API.
* the `apiEndpoint` parameter. If that isn't set, we try the Pub/Sub emulator
* environment variable (PUBSUB_EMULATOR_HOST). If that is also null, we try
* the standard `gcloud alpha pubsub` environment variable
* (CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB). Otherwise the default production
* API is used.
*
* Note that if the URL doesn't end in '.googleapis.com', we will assume that
* it's an emulator and disable strict SSL checks.
*
* @private
*/
determineBaseUrl_() {
const apiEndpoint = this.options.apiEndpoint;
if (!apiEndpoint && !process.env.PUBSUB_EMULATOR_HOST) {
// We allow an override from the client object options, or from
// one of these variables. The CLOUDSDK variable is provided for
// compatibility with the `gcloud alpha` utility.
const gcloudVarName = 'CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB';
const emulatorVarName = 'PUBSUB_EMULATOR_HOST';
const apiEndpoint = this.options.apiEndpoint ||
process.env[emulatorVarName] ||
process.env[gcloudVarName];
if (!apiEndpoint) {
return;
}
const grpcInstance = this.options.grpc || gax.grpc;
const baseUrl = apiEndpoint || process.env.PUBSUB_EMULATOR_HOST;
const leadingProtocol = new RegExp('^https*://');
// Parse the URL into a hostname and port, if possible.
const leadingProtocol = new RegExp('^https?://');
const trailingSlashes = new RegExp('/*$');
const baseUrlParts = baseUrl
const baseUrlParts = apiEndpoint
.replace(leadingProtocol, '')

@@ -404,5 +417,26 @@ .replace(trailingSlashes, '')

this.options.servicePath = baseUrlParts[0];
this.options.port = baseUrlParts[1];
this.options.sslCreds = grpcInstance.credentials.createInsecure();
this.isEmulator = true;
if (!baseUrlParts[1]) {
// No port was given -- figure it out from the protocol.
if (apiEndpoint.startsWith('https')) {
this.options.port = 443;
}
else if (apiEndpoint.startsWith('http')) {
this.options.port = 80;
}
else {
this.options.port = undefined;
}
}
else {
this.options.port = parseInt(baseUrlParts[1], 10);
}
// If this looks like a GCP URL of some kind, don't go into emulator
// mode. Otherwise, supply a fake SSL provider so a real cert isn't
// required for running the emulator.
const officialUrlMatch = this.options.servicePath.endsWith('.googleapis.com');
if (!officialUrlMatch) {
const grpcInstance = this.options.grpc || gax.grpc;
this.options.sslCreds = grpcInstance.credentials.createInsecure();
this.isEmulator = true;
}
if (!this.options.projectId && process.env.PUBSUB_PROJECT_ID) {

@@ -828,2 +862,3 @@ this.options.projectId = process.env.PUBSUB_PROJECT_ID;

* @param {string} name The name of the topic.
* @param {PublishOptions} [options] Publisher configuration object.
* @returns {Topic} A {@link Topic} instance.

@@ -830,0 +865,0 @@ *

@@ -109,2 +109,3 @@ /*!

streamingOptions?: MessageStreamOptions;
enableOpenTelemetryTracing?: boolean;
}

@@ -144,2 +145,3 @@ /**

private _subscription;
private _tracing;
constructor(subscription: Subscription, options?: {});

@@ -214,2 +216,9 @@ /**

/**
* Constructs an OpenTelemetry span from the incoming message.
*
* @param {Message} message One of the received messages
* @private
*/
private _constructSpan;
/**
* Callback to be invoked when a new message is available.

@@ -216,0 +225,0 @@ *

@@ -28,2 +28,3 @@ "use strict";

const default_options_1 = require("./default-options");
const opentelemetry_tracing_1 = require("./opentelemetry-tracing");
/**

@@ -361,4 +362,35 @@ * Date object with nanosecond precision. Supports all standard Date arguments

}
this._tracing = options.enableOpenTelemetryTracing
? new opentelemetry_tracing_1.OpenTelemetryTracer()
: undefined;
}
/**
* Constructs an OpenTelemetry span from the incoming message.
*
* @param {Message} message One of the received messages
* @private
*/
_constructSpan(message) {
// Handle cases where OpenTelemetry is disabled or no span context was sent through message
if (!this._tracing ||
!message.attributes ||
!message.attributes['googclient_OpenTelemetrySpanContext']) {
return undefined;
}
const spanValue = message.attributes['googclient_OpenTelemetrySpanContext'];
const parentSpanContext = spanValue
? JSON.parse(spanValue)
: undefined;
const spanAttributes = {
ackId: message.ackId,
deliveryAttempt: message.deliveryAttempt,
};
// Subscriber spans should always have a publisher span as a parent.
// Return undefined if no parent is provided
const span = parentSpanContext
? this._tracing.createSpan(this._name, spanAttributes, parentSpanContext)
: undefined;
return span;
}
/**
* Callback to be invoked when a new message is available.

@@ -383,2 +415,3 @@ *

const message = new Message(this, data);
const span = this._constructSpan(message);
if (this.isOpen) {

@@ -391,2 +424,5 @@ message.modAck(this.ackDeadline);

}
if (span) {
span.end();
}
}

@@ -393,0 +429,0 @@ }

@@ -150,3 +150,3 @@ "use strict";

this.descriptors.batching = {
Publish: new this._gaxModule.BundleDescriptor('messages', ['topic'], 'message_ids', gax.createByteLengthFunction(
publish: new this._gaxModule.BundleDescriptor('messages', ['topic'], 'message_ids', gax.createByteLengthFunction(
// eslint-disable-next-line @typescript-eslint/no-explicit-any

@@ -174,2 +174,3 @@ protoFilesRoot.lookupType('google.pubsub.v1.PubsubMessage'))),

initialize() {
var _a;
// If the client stub promise is already initialized, return immediately.

@@ -208,5 +209,5 @@ if (this.publisherStub) {

});
const apiCall = this._gaxModule.createApiCall(callPromise, this._defaults[methodName], this.descriptors.page[methodName] ||
this.descriptors.stream[methodName] ||
this.descriptors.longrunning[methodName]);
const descriptor = this.descriptors.page[methodName] || ((_a = this.descriptors.batching) === null || _a === void 0 ? void 0 : _a[methodName]) ||
undefined;
const apiCall = this._gaxModule.createApiCall(callPromise, this._defaults[methodName], descriptor);
this.innerApiCalls[methodName] = apiCall;

@@ -258,5 +259,4 @@ }

/**
* Creates the given topic with the given name. See the
* <a href="https://cloud.google.com/pubsub/docs/admin#resource_names">
* resource name rules</a>.
* Creates the given topic with the given name. See the [resource name rules](
* https://cloud.google.com/pubsub/docs/admin#resource_names).
*

@@ -273,4 +273,4 @@ * @param {Object} request

* @param {number[]} request.labels
* See <a href="https://cloud.google.com/pubsub/docs/labels"> Creating and
* managing labels</a>.
* See [Creating and managing labels]
* (https://cloud.google.com/pubsub/docs/labels).
* @param {google.pubsub.v1.MessageStoragePolicy} request.messageStoragePolicy

@@ -745,7 +745,6 @@ * Policy constraining the set of Google Cloud Platform regions where messages

* Lists the names of the snapshots on this topic. Snapshots are used in
* <a href="https://cloud.google.com/pubsub/docs/replay-overview">Seek</a>
* operations, which allow
* you to manage message acknowledgments in bulk. That is, you can set the
* acknowledgment state of messages in an existing subscription to the state
* captured by a snapshot.
* [Seek](https://cloud.google.com/pubsub/docs/replay-overview) operations,
* which allow you to manage message acknowledgments in bulk. That is, you can
* set the acknowledgment state of messages in an existing subscription to the
* state captured by a snapshot.
*

@@ -752,0 +751,0 @@ * @param {Object} request

@@ -7,2 +7,17 @@ # Changelog

## [2.5.0](https://www.github.com/googleapis/nodejs-pubsub/compare/v2.4.0...v2.5.0) (2020-08-17)
### Features
* Opentelemetry integration ([#1078](https://www.github.com/googleapis/nodejs-pubsub/issues/1078)) ([76db007](https://www.github.com/googleapis/nodejs-pubsub/commit/76db007f270a646e8570768fa827ea2a97b62cbc)), closes [#1066](https://www.github.com/googleapis/nodejs-pubsub/issues/1066) [#1070](https://www.github.com/googleapis/nodejs-pubsub/issues/1070)
### Bug Fixes
* make request batching work again ([#1087](https://www.github.com/googleapis/nodejs-pubsub/issues/1087)) ([80e0ee3](https://www.github.com/googleapis/nodejs-pubsub/commit/80e0ee3a4ef0da325e61ce2b869f4c0f9829b136))
* properly handle non-emulator alternate endpoints for pub/sub ([#1060](https://www.github.com/googleapis/nodejs-pubsub/issues/1060)) ([195ebf6](https://www.github.com/googleapis/nodejs-pubsub/commit/195ebf648e00ba35f567cef06a06c31f3f9c57d9))
* **deps:** update opentelemetry monorepo to ^0.10.0 ([#1090](https://www.github.com/googleapis/nodejs-pubsub/issues/1090)) ([78a45ff](https://www.github.com/googleapis/nodejs-pubsub/commit/78a45ff1cb8fb921e5ca05e435554d684a777185))
* update minimum gax version to 2.7.0 to fix recent protobuf errors ([#1085](https://www.github.com/googleapis/nodejs-pubsub/issues/1085)) ([904348c](https://www.github.com/googleapis/nodejs-pubsub/commit/904348cd6471f267a54635fcd65fe4191896308e))
## [2.4.0](https://www.github.com/googleapis/nodejs-pubsub/compare/v2.3.0...v2.4.0) (2020-08-08)

@@ -9,0 +24,0 @@

{
"name": "@google-cloud/pubsub",
"description": "Cloud Pub/Sub Client Library for Node.js",
"version": "2.4.0",
"version": "2.5.0",
"license": "Apache-2.0",

@@ -33,3 +33,3 @@ "author": "Google Inc.",

"samples-test": "cd samples/ && npm link ../ && npm install && npm test && cd ../",
"test": "c8 mocha build/test",
"test": "c8 mocha build/test --recursive",
"lint": "gts check",

@@ -48,5 +48,5 @@ "predocs": "npm run compile",

"prelint": "cd samples; npm link ../; npm install",
"precompile": "gts clean",
"api-extractor": "api-extractor run --local",
"api-documenter": "api-documenter yaml --input-folder=temp"
"api-documenter": "api-documenter yaml --input-folder=temp",
"precompile": "gts clean"
},

@@ -58,2 +58,4 @@ "dependencies": {

"@google-cloud/promisify": "^2.0.0",
"@opentelemetry/api": "^0.10.0",
"@opentelemetry/tracing": "^0.10.0",
"@types/duplexify": "^3.6.0",

@@ -64,7 +66,6 @@ "@types/long": "^4.0.0",

"google-auth-library": "^6.0.0",
"google-gax": "^2.1.0",
"google-gax": "^2.7.0",
"is-stream-ended": "^0.1.4",
"lodash.snakecase": "^4.1.1",
"p-defer": "^3.0.0",
"protobufjs": "^6.8.1"
"p-defer": "^3.0.0"
},

@@ -96,2 +97,3 @@ "devDependencies": {

"null-loader": "^4.0.0",
"protobufjs": "^6.10.1",
"proxyquire": "^2.0.0",

@@ -98,0 +100,0 @@ "sinon": "^9.0.0",

@@ -25,2 +25,5 @@ [//]: # "This README.md file is auto-generated, all changes to this file will be lost."

A comprehensive list of changes in each version may be found in
[the CHANGELOG](https://github.com/googleapis/nodejs-pubsub/blob/master/CHANGELOG.md).
* [Google Cloud Pub/Sub Node.js Client API Reference][client-docs]

@@ -27,0 +30,0 @@ * [Google Cloud Pub/Sub Documentation][product-docs]

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet