New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

a1pubsub

Package Overview
Dependencies
Maintainers
1
Versions
16
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

a1pubsub - npm Package Compare versions

Comparing version 0.0.12 to 0.0.13

61

dist/index.d.ts

@@ -33,3 +33,3 @@ import { JSON } from './json';

InvalidEventData = "invalid_event_data",
MissingHandlerForTopic = "missing_handler_for_subscription",
MissingHandlerForSubscription = "missing_handler_for_subscription",
HandlerFailedToProcessMessage = "handler_failed_to_process_message"

@@ -46,11 +46,27 @@ }

created_at: Date;
status: EventStatus;
attempt_statuses: EventStatus[];
last_failure_reason?: string;
subscription: string;
base64_event_data: Base64String;
}
/**
* This interface represents any state manager
*
* If you want to handle state in a more robust way than
* in-memory, then you would implement this interface:
*
* ```typescript
* class PostgresStateManager implements StateManager { ... }
* ```
*/
export interface StateManager {
getPubSubEvent(messageId: MessageId): Promise<PubSubEvent | undefined>;
recordMessageReceived(rawMessage: UnprocessedPubSubMessage, subscription: string, cachedEvent?: PubSubEvent): Promise<PubSubEvent>;
recordMessageProcessingOutcome(cachedEvent: PubSubEvent, outcome: EventStatus): Promise<void>;
recordMessageProcessingOutcome(cachedEvent: PubSubEvent, outcome: EventStatus, failureReason?: FailureReason): Promise<void>;
}
/**
* Exported for testing purposes, you do not need to ever import this class
* as it is being used automatically if you instantiate PubSub without your own
* state manager (the third argument in the PubSub constructor)
*/
export declare class InMemoryStateManager implements StateManager {

@@ -61,4 +77,7 @@ private cache;

recordMessageReceived(rawMessage: UnprocessedPubSubMessage, subscription: string, cachedEvent?: PubSubEvent): Promise<PubSubEvent>;
recordMessageProcessingOutcome(cachedEvent: PubSubEvent, outcome: EventStatus): Promise<void>;
recordMessageProcessingOutcome(cachedEvent: PubSubEvent, outcome: EventStatus, failureReason?: FailureReason): Promise<void>;
}
/**
* Represents the outcome of your `handler` inside of a `SubscriptionHandler`
*/
export declare enum HandlerResult {

@@ -68,4 +87,18 @@ FailedToProcess = "failed_to_process",

}
declare type FailureReason = string;
/**
* Takes deserialized and untyped JSON and either returns data that conforms
* to a schema or nothing (to represent failed validation)
*
* Under the hood, the `a1pubsub` module will catch any errors within this function,
* so you do not need to implement error handling.
*/
declare type Validator<T> = (json: JSON) => T | undefined;
export declare type MessageHandler<D extends {}> = (subscription: string, data: PubSubMessage<D>) => Promise<HandlerResult>;
/**
* The function that actually processes your validated pubsub event
*
* If this function returns `HandlerResult.FailedToProcess` then the retry
* mechanism as described in the GCP PubSub documentation will kick into place
*/
export declare type MessageHandler<D extends {}> = (event: PubSubEvent, subscription: string, data: PubSubMessage<D>) => Promise<[HandlerResult] | [HandlerResult, FailureReason]>;
export interface SubscriptionHandler<D extends {} = {}> {

@@ -83,5 +116,21 @@ validator: Validator<D>;

constructor(projectId: string, decodingTable: SubscriptionMap, customStateManager?: StateManager);
/**
* Publish an event to a topic
*
* This method assumes that your data is serializeable to JSON
*
* @param topic the event stream in GCP PubSub
* @param data the data associated with this event stream
*/
publish<J extends {}>(topic: string, data: J): Promise<void>;
handlePubSubMessage(rawMsg: UnprocessedPubSubMessage): Promise<SubscriptionError | undefined>;
/**
* This is the entrypoint for all pubsub subscription events
*
* @param rawMsg An unprocessed pubsub message that must first be validated
*/
handlePubSubMessage(rawMsg: UnprocessedPubSubMessage): Promise<{
error: SubscriptionError;
reason?: string;
} | undefined>;
}
export {};

76

dist/index.js

@@ -18,3 +18,3 @@ "use strict";

SubscriptionError["InvalidEventData"] = "invalid_event_data";
SubscriptionError["MissingHandlerForTopic"] = "missing_handler_for_subscription";
SubscriptionError["MissingHandlerForSubscription"] = "missing_handler_for_subscription";
SubscriptionError["HandlerFailedToProcessMessage"] = "handler_failed_to_process_message";

@@ -28,2 +28,7 @@ })(SubscriptionError = exports.SubscriptionError || (exports.SubscriptionError = {}));

})(EventStatus = exports.EventStatus || (exports.EventStatus = {}));
/**
* Exported for testing purposes, you do not need to ever import this class
* as it is being used automatically if you instantiate PubSub without your own
* state manager (the third argument in the PubSub constructor)
*/
class InMemoryStateManager {

@@ -58,3 +63,3 @@ constructor() {

else {
/* eslint-disable */
/* eslint-disable @typescript-eslint/camelcase */
const event = {

@@ -64,3 +69,3 @@ idempotency_key: rawMessage.message.messageId,

created_at: today,
status: EventStatus.InProgress,
attempt_statuses: [EventStatus.InProgress],
subscription: subscription,

@@ -75,5 +80,7 @@ base64_event_data: rawMessage.message.data,

}
recordMessageProcessingOutcome(cachedEvent, outcome) {
recordMessageProcessingOutcome(cachedEvent, outcome, failureReason) {
return __awaiter(this, void 0, void 0, function* () {
this.cache.set(cachedEvent.idempotency_key, Object.assign(Object.assign({}, cachedEvent), { status: outcome }));
/* eslint-disable @typescript-eslint/camelcase */
this.cache.set(cachedEvent.idempotency_key, Object.assign(Object.assign({}, cachedEvent), { attempt_statuses: cachedEvent.attempt_statuses.concat(outcome), last_failure_reason: failureReason || cachedEvent.last_failure_reason }));
/* eslint-enable */
});

@@ -83,2 +90,5 @@ }

exports.InMemoryStateManager = InMemoryStateManager;
/**
* Represents the outcome of your `handler` inside of a `SubscriptionHandler`
*/
var HandlerResult;

@@ -122,2 +132,10 @@ (function (HandlerResult) {

}
/**
* Publish an event to a topic
*
* This method assumes that your data is serializeable to JSON
*
* @param topic the event stream in GCP PubSub
* @param data the data associated with this event stream
*/
publish(topic, data) {

@@ -134,2 +152,7 @@ return __awaiter(this, void 0, void 0, function* () {

}
/**
* This is the entrypoint for all pubsub subscription events
*
* @param rawMsg An unprocessed pubsub message that must first be validated
*/
handlePubSubMessage(rawMsg) {

@@ -139,8 +162,15 @@ return __awaiter(this, void 0, void 0, function* () {

if (!subscription) {
return SubscriptionError.InvalidSubscription;
return {
error: SubscriptionError.InvalidSubscription,
reason: `Subscription "${subscription}" doesn't follow the "projects/<GCP_PROJECT_NAME>/subscriptions/<SUBSCRIPTION_NAME>" pattern`,
};
}
const cachedMessage = yield this.stateManager.getPubSubEvent(rawMsg.message.messageId);
if (cachedMessage && cachedMessage.status === EventStatus.Completed) {
// idempotency :)
return;
if (cachedMessage && cachedMessage.attempt_statuses.length > 0) {
const lastIndex = cachedMessage.attempt_statuses.length - 1;
const mostRecentStatus = cachedMessage.attempt_statuses[lastIndex];
if (mostRecentStatus === EventStatus.Completed) {
// idempotency :)
return;
}
}

@@ -150,8 +180,15 @@ const updatedCachedMessage = yield this.stateManager.recordMessageReceived(rawMsg, subscription, cachedMessage);

if (!subscriptionHandler) {
return SubscriptionError.MissingHandlerForTopic;
return {
error: SubscriptionError.MissingHandlerForSubscription,
reason: `Subscription "${subscription}" doesn't have a corresponding handler`,
};
}
const { validator, handler } = subscriptionHandler;
const decodedMessage = handleValidator(validator, json_1.base64ToParsedJSON(rawMsg.message.data));
const untypedJsonData = json_1.base64ToParsedJSON(rawMsg.message.data);
const decodedMessage = handleValidator(validator, untypedJsonData);
if (decodedMessage.type === 'error') {
return decodedMessage.error;
return {
error: decodedMessage.error,
reason: `Data for subscription "${subscription}" did not pass validation. Data: ${untypedJsonData}`,
};
}

@@ -162,5 +199,9 @@ const pubSubMessage = {

};
const handlerResult = yield handler(subscription, pubSubMessage).catch(() => {
const [handlerResult, failureReason] = yield handler(updatedCachedMessage, subscription, pubSubMessage).catch((err) => {
// catching in case handler didn't catch its own errors
return HandlerResult.FailedToProcess;
const errorMessage = [
'uncaught promise rejection within handler',
`Error: ${err}`,
].join(' - ');
return [HandlerResult.FailedToProcess, errorMessage];
});

@@ -172,4 +213,7 @@ if (handlerResult === HandlerResult.Success) {

else {
this.stateManager.recordMessageProcessingOutcome(updatedCachedMessage, EventStatus.Failed);
return SubscriptionError.HandlerFailedToProcessMessage;
this.stateManager.recordMessageProcessingOutcome(updatedCachedMessage, EventStatus.Failed, failureReason);
return {
error: SubscriptionError.HandlerFailedToProcessMessage,
reason: `Handler for subscription "${subscription}" failed to process successfully. `.concat(failureReason ? `Failure reason: ${failureReason}` : ''),
};
}

@@ -176,0 +220,0 @@ });

{
"name": "a1pubsub",
"version": "0.0.12",
"version": "0.0.13",
"description": "GCP PubSub wrapper to add idempotency",

@@ -13,3 +13,4 @@ "main": "dist/index.js",

"clean": "rm -rf ./dist",
"build": "npm run clean && tsc"
"build": "npm run clean && tsc",
"docgen": "typedoc --excludeNotExported --excludePrivate --excludeProtected"
},

@@ -23,4 +24,5 @@ "author": "Setter Inc.",

"@types/jest": "25.1.4",
"@typescript-eslint/eslint-plugin": "2.22.0",
"@typescript-eslint/parser": "2.22.0",
"@types/lodash": "^4.14.149",
"@typescript-eslint/eslint-plugin": "2.24.0",
"@typescript-eslint/parser": "2.24.0",
"eslint": "6.8.0",

@@ -30,6 +32,8 @@ "eslint-config-prettier": "6.10.0",

"jest": "25.1.0",
"lodash": "^4.17.15",
"prettier": "1.19.1",
"ts-jest": "25.2.1",
"typedoc": "^0.17.1",
"typescript": "3.8.3"
}
}

@@ -13,13 +13,14 @@ # A1 PubSub

* [Installation](#installation)
* [API Documentation](#api-documentation)
* [Overview (Why & How)](#overview-why--how)
+ [Quick Summary of Google Cloud PubSub Terminology:](#quick-summary-of-google-cloud-pubsub-terminology)
+ [How Does It Work?](#how-does-it-work)
- [Decoding Table](#decoding-table)
- [Subscription Handlers](#subscription-handlers)
* [Full Example of Subscription Handling With ExpressJS](#full-example-of-subscription-handling-with-expressjs)
* [Security & Authentication](#security--authentication)
* [Providing Alternative State Managers](#providing-alternative-state-managers)
* [API Documentation](#api-documentation)
## Installation

@@ -33,2 +34,7 @@

## API Documentation
[link](https://setter-a1pubsub.netlify.com/classes/_index_.pubsub.html)
## Overview (Why & How)

@@ -56,3 +62,3 @@

- [use the CLI](https://cloud.google.com/pubsub/docs/quickstart-cli#use_the_gcloud_command-line_tool)
- `gcloud pubsub topics create myTopic`
- `gcloud pubsub topics create my_topic`
- [use the gcp UI](https://cloud.google.com/pubsub/docs/quickstart-console#create_a_topic)

@@ -71,4 +77,7 @@ - Topic names must be unique

- [use the CLI](https://cloud.google.com/pubsub/docs/admin#creating_subscriptions)
- `gcloud pubsub subscriptions create --topic myTopic mySubscriptionName`
- `gcloud pubsub subscriptions create --topic <topic_name> <topic_name>__<subscription_name>`
- [use the gcp UI](https://cloud.google.com/pubsub/docs/quickstart-console#add_a_subscription)
- Ensure you enable authentication for your subscription
- Set the endpoint appropriately
- For local development, I recomment you use [ngrok](https://ngrok.com/)
- Subscription names must be unique

@@ -92,5 +101,5 @@

const decodingTable = justHoldYourHorsesForASecond
const subscriptionHandlers = {} // more to come here soon
const ps = new PubSub(myGcpProjecId, decodingTable)
const ps = new PubSub(myGcpProjecId, subscriptionHandlers)
```

@@ -124,32 +133,18 @@

#### Decoding Table
#### Subscription Handlers
So in the above code snippet, you saw that `PubSub` was instantiated with a `decodingTable`.
So in the above code snippet, you saw that `PubSub` was instantiated with a `subscriptionHandlers` object. The type of `subscriptionHandlers` must be `SubscriptionMap`.
Before I get to explaining in plain english, here is the type definition:
A `SubscriptionHandler` is a plain js object whose keys are strings (that represent subscription identifiers), and whose values are a object of type `SubscriptionHandler`, a `SubscriptionHandler` contains:
```typescript
type SubscriptionId = string
interface SubscriptionHandler<T extends {}> {
validator: (json: JSON) => T
handler: (data: T) => Promise<boolean>
}
export type DecodingTable<T extends {}> = Map<SubscriptionId, SubscriptionHandler<T>>
```
In plain english: A decoding table is a `Map` whose keys are strings (that represent subscription identifiers), and whose values are `SubscriptionHandler`s.
A `SubscriptionHandler` is an object with two keys:
- `validator`: As I mentioned alredy, GCP pubsub data is schemaless. All you know is that the data is serializeable to json.
- the `JSON` type is defined in `src/json.ts` and it's just a type-level definition of `JSON.parse`.
- `handler`: The actual subscription handler, it takes your validated data and returns a promise with a boolean.
- Feel free to do whatever you want here, the only requirement is that you must return a promise with a boolean value.
- `true`: success, any subsequent messages that GCP pubsub might deliver will get ignored
- Feel free to do whatever you want here, the only requirement is that you must return a promise with a `HandlerResult` value.
- `HandlerResult.Success`: any subsequent messages that GCP pubsub might deliver will get ignored
- **you, the developer** must send a 2XX HTTP response to google cloud pubsub so that it knows that the event has been processed
- `false`: failure, the event will be tracked, but our system will be expecting that same event from being delivered again on a retry
- `HandlerResult.FailedToProcess`: the event will be tracked, but our system will be expecting that same event from being delivered again on a retry
- **you, the developer** must send a non 2XX HTTP response to google cloud pubsub so that it knows to retry later
So if your pubsub module needs to handle 5 subscriptions, then your `DecodingTable` will have 5 keys, and 5 corresponding `SubscriptionHandler`s.
So if your pubsub module needs to handle 5 subscriptions, then your `SubscriptionMap` will have 5 keys, and 5 corresponding `SubscriptionHandler`s.

@@ -164,2 +159,34 @@

import { SubscriptionMap, HandlerResult } from 'a1pubsub'
import { notifyClientViaTicketComment } from './quote-approved/zendesk-notification'
import { quoteApprovalValidator, ApprovedQuoteData } from './quote-approved'
// defining the shape of our specific SubscriptionMap
declare module 'a1pubsub' {
interface SubscriptionMap {
quote_approved__client_ticket_comment: SubscriptionHandler<
ApprovedQuoteData
>
job_cancelled__pro_sms: SubscriptionHandler<{ example: string }>
}
}
/* eslint-disable @typescript-eslint/camelcase */
export const eventHandlers: SubscriptionMap = {
quote_approved__client_ticket_comment: {
validator: quoteApprovalValidator.check,
handler: notifyClientViaTicketComment,
},
job_cancelled__pro_sms: {
validator: data => data as { example: 'testing' },
handler: data => {
console.log(data)
return Promise.resolve(HandlerResult.Success)
},
},
}
const app = express()

@@ -171,4 +198,2 @@ const port = 3000

const decodingTable = new Map()
const quoteApprovedSchema = {

@@ -190,42 +215,7 @@ quote_id: Joi.number().required()

decodingTable.set('quote_approved', {
validator: (data) => {
// using joi here ... but you can use anything you want
// runtypes, yum, validatorjs etc etc etc
const { approvedQuoteData, error } = Joi.object(quoteApprovedSchema)
.options({ stripUnknown: true })
.validate(data)
if (approvedQuoteData) {
return approvedQuoteData
} else {
return
}
},
handler: sendQuoteApprovalEmailToClient,
})
decodingTable.set('job_cancelled', {
validator: (data) => {
const { cancelledJobData, error } = Joi.object(jobCancelledSchema)
.options({ stripUnknown: true })
.validate(data)
const ps = new PubSub(myGcpProjecId, eventHandlers)
if (cancelledJobData) {
return cancelledJobData
}
},
handler: refundClient
})
/*
* pretend a whole bunch of SubscriptionId + SubscriptionHandler pairs
* have been added to the decodingTable Map
* to handle all the various subscriptions
*/
const ps = new PubSub(myGcpProjecId, decodingTable)
app.post('/pubsub', async (req, res) => {

@@ -265,7 +255,3 @@ const pubsubMessage = req.body

new PubSub('my-project-id', decodingTable, psqlStateManager)
new PubSub('my-project-id', eventHandlers, psqlStateManager)
```
## API Documentation
[link](https://priceless-meitner-38fa2b.netlify.com/classes/_index_.pubsub.html)
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc