Comparing version 0.0.12 to 0.0.13
@@ -33,3 +33,3 @@ import { JSON } from './json'; | ||
InvalidEventData = "invalid_event_data", | ||
MissingHandlerForTopic = "missing_handler_for_subscription", | ||
MissingHandlerForSubscription = "missing_handler_for_subscription", | ||
HandlerFailedToProcessMessage = "handler_failed_to_process_message" | ||
@@ -46,11 +46,27 @@ } | ||
created_at: Date; | ||
status: EventStatus; | ||
attempt_statuses: EventStatus[]; | ||
last_failure_reason?: string; | ||
subscription: string; | ||
base64_event_data: Base64String; | ||
} | ||
/** | ||
* This interface represents any state manager | ||
* | ||
* If you want to handle state in a more robust way than | ||
* in-memory, then you would implement this interface: | ||
* | ||
* ```typescript | ||
* class PostgresStateManager implements StateManager { ... } | ||
* ``` | ||
*/ | ||
export interface StateManager { | ||
getPubSubEvent(messageId: MessageId): Promise<PubSubEvent | undefined>; | ||
recordMessageReceived(rawMessage: UnprocessedPubSubMessage, subscription: string, cachedEvent?: PubSubEvent): Promise<PubSubEvent>; | ||
recordMessageProcessingOutcome(cachedEvent: PubSubEvent, outcome: EventStatus): Promise<void>; | ||
recordMessageProcessingOutcome(cachedEvent: PubSubEvent, outcome: EventStatus, failureReason?: FailureReason): Promise<void>; | ||
} | ||
/** | ||
* Exported for testing purposes, you do not need to ever import this class | ||
* as it is being used automatically if you instantiate PubSub without your own | ||
* state manager (the third argument in the PubSub constructor) | ||
*/ | ||
export declare class InMemoryStateManager implements StateManager { | ||
@@ -61,4 +77,7 @@ private cache; | ||
recordMessageReceived(rawMessage: UnprocessedPubSubMessage, subscription: string, cachedEvent?: PubSubEvent): Promise<PubSubEvent>; | ||
recordMessageProcessingOutcome(cachedEvent: PubSubEvent, outcome: EventStatus): Promise<void>; | ||
recordMessageProcessingOutcome(cachedEvent: PubSubEvent, outcome: EventStatus, failureReason?: FailureReason): Promise<void>; | ||
} | ||
/** | ||
* Represents the outcome of your `handler` inside of a `SubscriptionHandler` | ||
*/ | ||
export declare enum HandlerResult { | ||
@@ -68,4 +87,18 @@ FailedToProcess = "failed_to_process", | ||
} | ||
declare type FailureReason = string; | ||
/** | ||
* Takes deserialized and untyped JSON and either returns data that conforms | ||
* to a schema or nothing (to represent failed validation) | ||
* | ||
* Under the hood, the `a1pubsub` module will catch any errors within this function, | ||
* so you do not need to implement error handling. | ||
*/ | ||
declare type Validator<T> = (json: JSON) => T | undefined; | ||
export declare type MessageHandler<D extends {}> = (subscription: string, data: PubSubMessage<D>) => Promise<HandlerResult>; | ||
/** | ||
* The function that actually processes your validated pubsub event | ||
* | ||
* If this function returns `HandlerResult.FailedToProcess` then the retry | ||
* mechanism as described in the GCP PubSub documentation will kick into place | ||
*/ | ||
export declare type MessageHandler<D extends {}> = (event: PubSubEvent, subscription: string, data: PubSubMessage<D>) => Promise<[HandlerResult] | [HandlerResult, FailureReason]>; | ||
export interface SubscriptionHandler<D extends {} = {}> { | ||
@@ -83,5 +116,21 @@ validator: Validator<D>; | ||
constructor(projectId: string, decodingTable: SubscriptionMap, customStateManager?: StateManager); | ||
/** | ||
* Publish an event to a topic | ||
* | ||
* This method assumes that your data is serializeable to JSON | ||
* | ||
* @param topic the event stream in GCP PubSub | ||
* @param data the data associated with this event stream | ||
*/ | ||
publish<J extends {}>(topic: string, data: J): Promise<void>; | ||
handlePubSubMessage(rawMsg: UnprocessedPubSubMessage): Promise<SubscriptionError | undefined>; | ||
/** | ||
* This is the entrypoint for all pubsub subscription events | ||
* | ||
* @param rawMsg An unprocessed pubsub message that must first be validated | ||
*/ | ||
handlePubSubMessage(rawMsg: UnprocessedPubSubMessage): Promise<{ | ||
error: SubscriptionError; | ||
reason?: string; | ||
} | undefined>; | ||
} | ||
export {}; |
@@ -18,3 +18,3 @@ "use strict"; | ||
SubscriptionError["InvalidEventData"] = "invalid_event_data"; | ||
SubscriptionError["MissingHandlerForTopic"] = "missing_handler_for_subscription"; | ||
SubscriptionError["MissingHandlerForSubscription"] = "missing_handler_for_subscription"; | ||
SubscriptionError["HandlerFailedToProcessMessage"] = "handler_failed_to_process_message"; | ||
@@ -28,2 +28,7 @@ })(SubscriptionError = exports.SubscriptionError || (exports.SubscriptionError = {})); | ||
})(EventStatus = exports.EventStatus || (exports.EventStatus = {})); | ||
/** | ||
* Exported for testing purposes, you do not need to ever import this class | ||
* as it is being used automatically if you instantiate PubSub without your own | ||
* state manager (the third argument in the PubSub constructor) | ||
*/ | ||
class InMemoryStateManager { | ||
@@ -58,3 +63,3 @@ constructor() { | ||
else { | ||
/* eslint-disable */ | ||
/* eslint-disable @typescript-eslint/camelcase */ | ||
const event = { | ||
@@ -64,3 +69,3 @@ idempotency_key: rawMessage.message.messageId, | ||
created_at: today, | ||
status: EventStatus.InProgress, | ||
attempt_statuses: [EventStatus.InProgress], | ||
subscription: subscription, | ||
@@ -75,5 +80,7 @@ base64_event_data: rawMessage.message.data, | ||
} | ||
recordMessageProcessingOutcome(cachedEvent, outcome) { | ||
recordMessageProcessingOutcome(cachedEvent, outcome, failureReason) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
this.cache.set(cachedEvent.idempotency_key, Object.assign(Object.assign({}, cachedEvent), { status: outcome })); | ||
/* eslint-disable @typescript-eslint/camelcase */ | ||
this.cache.set(cachedEvent.idempotency_key, Object.assign(Object.assign({}, cachedEvent), { attempt_statuses: cachedEvent.attempt_statuses.concat(outcome), last_failure_reason: failureReason || cachedEvent.last_failure_reason })); | ||
/* eslint-enable */ | ||
}); | ||
@@ -83,2 +90,5 @@ } | ||
exports.InMemoryStateManager = InMemoryStateManager; | ||
/** | ||
* Represents the outcome of your `handler` inside of a `SubscriptionHandler` | ||
*/ | ||
var HandlerResult; | ||
@@ -122,2 +132,10 @@ (function (HandlerResult) { | ||
} | ||
/** | ||
* Publish an event to a topic | ||
* | ||
* This method assumes that your data is serializeable to JSON | ||
* | ||
* @param topic the event stream in GCP PubSub | ||
* @param data the data associated with this event stream | ||
*/ | ||
publish(topic, data) { | ||
@@ -134,2 +152,7 @@ return __awaiter(this, void 0, void 0, function* () { | ||
} | ||
/** | ||
* This is the entrypoint for all pubsub subscription events | ||
* | ||
* @param rawMsg An unprocessed pubsub message that must first be validated | ||
*/ | ||
handlePubSubMessage(rawMsg) { | ||
@@ -139,8 +162,15 @@ return __awaiter(this, void 0, void 0, function* () { | ||
if (!subscription) { | ||
return SubscriptionError.InvalidSubscription; | ||
return { | ||
error: SubscriptionError.InvalidSubscription, | ||
reason: `Subscription "${subscription}" doesn't follow the "projects/<GCP_PROJECT_NAME>/subscriptions/<SUBSCRIPTION_NAME>" pattern`, | ||
}; | ||
} | ||
const cachedMessage = yield this.stateManager.getPubSubEvent(rawMsg.message.messageId); | ||
if (cachedMessage && cachedMessage.status === EventStatus.Completed) { | ||
// idempotency :) | ||
return; | ||
if (cachedMessage && cachedMessage.attempt_statuses.length > 0) { | ||
const lastIndex = cachedMessage.attempt_statuses.length - 1; | ||
const mostRecentStatus = cachedMessage.attempt_statuses[lastIndex]; | ||
if (mostRecentStatus === EventStatus.Completed) { | ||
// idempotency :) | ||
return; | ||
} | ||
} | ||
@@ -150,8 +180,15 @@ const updatedCachedMessage = yield this.stateManager.recordMessageReceived(rawMsg, subscription, cachedMessage); | ||
if (!subscriptionHandler) { | ||
return SubscriptionError.MissingHandlerForTopic; | ||
return { | ||
error: SubscriptionError.MissingHandlerForSubscription, | ||
reason: `Subscription "${subscription}" doesn't have a corresponding handler`, | ||
}; | ||
} | ||
const { validator, handler } = subscriptionHandler; | ||
const decodedMessage = handleValidator(validator, json_1.base64ToParsedJSON(rawMsg.message.data)); | ||
const untypedJsonData = json_1.base64ToParsedJSON(rawMsg.message.data); | ||
const decodedMessage = handleValidator(validator, untypedJsonData); | ||
if (decodedMessage.type === 'error') { | ||
return decodedMessage.error; | ||
return { | ||
error: decodedMessage.error, | ||
reason: `Data for subscription "${subscription}" did not pass validation. Data: ${untypedJsonData}`, | ||
}; | ||
} | ||
@@ -162,5 +199,9 @@ const pubSubMessage = { | ||
}; | ||
const handlerResult = yield handler(subscription, pubSubMessage).catch(() => { | ||
const [handlerResult, failureReason] = yield handler(updatedCachedMessage, subscription, pubSubMessage).catch((err) => { | ||
// catching in case handler didn't catch its own errors | ||
return HandlerResult.FailedToProcess; | ||
const errorMessage = [ | ||
'uncaught promise rejection within handler', | ||
`Error: ${err}`, | ||
].join(' - '); | ||
return [HandlerResult.FailedToProcess, errorMessage]; | ||
}); | ||
@@ -172,4 +213,7 @@ if (handlerResult === HandlerResult.Success) { | ||
else { | ||
this.stateManager.recordMessageProcessingOutcome(updatedCachedMessage, EventStatus.Failed); | ||
return SubscriptionError.HandlerFailedToProcessMessage; | ||
this.stateManager.recordMessageProcessingOutcome(updatedCachedMessage, EventStatus.Failed, failureReason); | ||
return { | ||
error: SubscriptionError.HandlerFailedToProcessMessage, | ||
reason: `Handler for subscription "${subscription}" failed to process successfully. `.concat(failureReason ? `Failure reason: ${failureReason}` : ''), | ||
}; | ||
} | ||
@@ -176,0 +220,0 @@ }); |
{ | ||
"name": "a1pubsub", | ||
"version": "0.0.12", | ||
"version": "0.0.13", | ||
"description": "GCP PubSub wrapper to add idempotency", | ||
@@ -13,3 +13,4 @@ "main": "dist/index.js", | ||
"clean": "rm -rf ./dist", | ||
"build": "npm run clean && tsc" | ||
"build": "npm run clean && tsc", | ||
"docgen": "typedoc --excludeNotExported --excludePrivate --excludeProtected" | ||
}, | ||
@@ -23,4 +24,5 @@ "author": "Setter Inc.", | ||
"@types/jest": "25.1.4", | ||
"@typescript-eslint/eslint-plugin": "2.22.0", | ||
"@typescript-eslint/parser": "2.22.0", | ||
"@types/lodash": "^4.14.149", | ||
"@typescript-eslint/eslint-plugin": "2.24.0", | ||
"@typescript-eslint/parser": "2.24.0", | ||
"eslint": "6.8.0", | ||
@@ -30,6 +32,8 @@ "eslint-config-prettier": "6.10.0", | ||
"jest": "25.1.0", | ||
"lodash": "^4.17.15", | ||
"prettier": "1.19.1", | ||
"ts-jest": "25.2.1", | ||
"typedoc": "^0.17.1", | ||
"typescript": "3.8.3" | ||
} | ||
} |
126
README.md
@@ -13,13 +13,14 @@ # A1 PubSub | ||
* [Installation](#installation) | ||
* [API Documentation](#api-documentation) | ||
* [Overview (Why & How)](#overview-why--how) | ||
+ [Quick Summary of Google Cloud PubSub Terminology:](#quick-summary-of-google-cloud-pubsub-terminology) | ||
+ [How Does It Work?](#how-does-it-work) | ||
- [Decoding Table](#decoding-table) | ||
- [Subscription Handlers](#subscription-handlers) | ||
* [Full Example of Subscription Handling With ExpressJS](#full-example-of-subscription-handling-with-expressjs) | ||
* [Security & Authentication](#security--authentication) | ||
* [Providing Alternative State Managers](#providing-alternative-state-managers) | ||
* [API Documentation](#api-documentation) | ||
## Installation | ||
@@ -33,2 +34,7 @@ | ||
## API Documentation | ||
[link](https://setter-a1pubsub.netlify.com/classes/_index_.pubsub.html) | ||
## Overview (Why & How) | ||
@@ -56,3 +62,3 @@ | ||
- [use the CLI](https://cloud.google.com/pubsub/docs/quickstart-cli#use_the_gcloud_command-line_tool) | ||
- `gcloud pubsub topics create myTopic` | ||
- `gcloud pubsub topics create my_topic` | ||
- [use the gcp UI](https://cloud.google.com/pubsub/docs/quickstart-console#create_a_topic) | ||
@@ -71,4 +77,7 @@ - Topic names must be unique | ||
- [use the CLI](https://cloud.google.com/pubsub/docs/admin#creating_subscriptions) | ||
- `gcloud pubsub subscriptions create --topic myTopic mySubscriptionName` | ||
- `gcloud pubsub subscriptions create --topic <topic_name> <topic_name>__<subscription_name>` | ||
- [use the gcp UI](https://cloud.google.com/pubsub/docs/quickstart-console#add_a_subscription) | ||
- Ensure you enable authentication for your subscription | ||
- Set the endpoint appropriately | ||
- For local development, I recomment you use [ngrok](https://ngrok.com/) | ||
- Subscription names must be unique | ||
@@ -92,5 +101,5 @@ | ||
const decodingTable = justHoldYourHorsesForASecond | ||
const subscriptionHandlers = {} // more to come here soon | ||
const ps = new PubSub(myGcpProjecId, decodingTable) | ||
const ps = new PubSub(myGcpProjecId, subscriptionHandlers) | ||
``` | ||
@@ -124,32 +133,18 @@ | ||
#### Decoding Table | ||
#### Subscription Handlers | ||
So in the above code snippet, you saw that `PubSub` was instantiated with a `decodingTable`. | ||
So in the above code snippet, you saw that `PubSub` was instantiated with a `subscriptionHandlers` object. The type of `subscriptionHandlers` must be `SubscriptionMap`. | ||
Before I get to explaining in plain english, here is the type definition: | ||
A `SubscriptionHandler` is a plain js object whose keys are strings (that represent subscription identifiers), and whose values are a object of type `SubscriptionHandler`, a `SubscriptionHandler` contains: | ||
```typescript | ||
type SubscriptionId = string | ||
interface SubscriptionHandler<T extends {}> { | ||
validator: (json: JSON) => T | ||
handler: (data: T) => Promise<boolean> | ||
} | ||
export type DecodingTable<T extends {}> = Map<SubscriptionId, SubscriptionHandler<T>> | ||
``` | ||
In plain english: A decoding table is a `Map` whose keys are strings (that represent subscription identifiers), and whose values are `SubscriptionHandler`s. | ||
A `SubscriptionHandler` is an object with two keys: | ||
- `validator`: As I mentioned alredy, GCP pubsub data is schemaless. All you know is that the data is serializeable to json. | ||
- the `JSON` type is defined in `src/json.ts` and it's just a type-level definition of `JSON.parse`. | ||
- `handler`: The actual subscription handler, it takes your validated data and returns a promise with a boolean. | ||
- Feel free to do whatever you want here, the only requirement is that you must return a promise with a boolean value. | ||
- `true`: success, any subsequent messages that GCP pubsub might deliver will get ignored | ||
- Feel free to do whatever you want here, the only requirement is that you must return a promise with a `HandlerResult` value. | ||
- `HandlerResult.Success`: any subsequent messages that GCP pubsub might deliver will get ignored | ||
- **you, the developer** must send a 2XX HTTP response to google cloud pubsub so that it knows that the event has been processed | ||
- `false`: failure, the event will be tracked, but our system will be expecting that same event from being delivered again on a retry | ||
- `HandlerResult.FailedToProcess`: the event will be tracked, but our system will be expecting that same event from being delivered again on a retry | ||
- **you, the developer** must send a non 2XX HTTP response to google cloud pubsub so that it knows to retry later | ||
So if your pubsub module needs to handle 5 subscriptions, then your `DecodingTable` will have 5 keys, and 5 corresponding `SubscriptionHandler`s. | ||
So if your pubsub module needs to handle 5 subscriptions, then your `SubscriptionMap` will have 5 keys, and 5 corresponding `SubscriptionHandler`s. | ||
@@ -164,2 +159,34 @@ | ||
import { SubscriptionMap, HandlerResult } from 'a1pubsub' | ||
import { notifyClientViaTicketComment } from './quote-approved/zendesk-notification' | ||
import { quoteApprovalValidator, ApprovedQuoteData } from './quote-approved' | ||
// defining the shape of our specific SubscriptionMap | ||
declare module 'a1pubsub' { | ||
interface SubscriptionMap { | ||
quote_approved__client_ticket_comment: SubscriptionHandler< | ||
ApprovedQuoteData | ||
> | ||
job_cancelled__pro_sms: SubscriptionHandler<{ example: string }> | ||
} | ||
} | ||
/* eslint-disable @typescript-eslint/camelcase */ | ||
export const eventHandlers: SubscriptionMap = { | ||
quote_approved__client_ticket_comment: { | ||
validator: quoteApprovalValidator.check, | ||
handler: notifyClientViaTicketComment, | ||
}, | ||
job_cancelled__pro_sms: { | ||
validator: data => data as { example: 'testing' }, | ||
handler: data => { | ||
console.log(data) | ||
return Promise.resolve(HandlerResult.Success) | ||
}, | ||
}, | ||
} | ||
const app = express() | ||
@@ -171,4 +198,2 @@ const port = 3000 | ||
const decodingTable = new Map() | ||
const quoteApprovedSchema = { | ||
@@ -190,42 +215,7 @@ quote_id: Joi.number().required() | ||
decodingTable.set('quote_approved', { | ||
validator: (data) => { | ||
// using joi here ... but you can use anything you want | ||
// runtypes, yum, validatorjs etc etc etc | ||
const { approvedQuoteData, error } = Joi.object(quoteApprovedSchema) | ||
.options({ stripUnknown: true }) | ||
.validate(data) | ||
if (approvedQuoteData) { | ||
return approvedQuoteData | ||
} else { | ||
return | ||
} | ||
}, | ||
handler: sendQuoteApprovalEmailToClient, | ||
}) | ||
decodingTable.set('job_cancelled', { | ||
validator: (data) => { | ||
const { cancelledJobData, error } = Joi.object(jobCancelledSchema) | ||
.options({ stripUnknown: true }) | ||
.validate(data) | ||
const ps = new PubSub(myGcpProjecId, eventHandlers) | ||
if (cancelledJobData) { | ||
return cancelledJobData | ||
} | ||
}, | ||
handler: refundClient | ||
}) | ||
/* | ||
* pretend a whole bunch of SubscriptionId + SubscriptionHandler pairs | ||
* have been added to the decodingTable Map | ||
* to handle all the various subscriptions | ||
*/ | ||
const ps = new PubSub(myGcpProjecId, decodingTable) | ||
app.post('/pubsub', async (req, res) => { | ||
@@ -265,7 +255,3 @@ const pubsubMessage = req.body | ||
new PubSub('my-project-id', decodingTable, psqlStateManager) | ||
new PubSub('my-project-id', eventHandlers, psqlStateManager) | ||
``` | ||
## API Documentation | ||
[link](https://priceless-meitner-38fa2b.netlify.com/classes/_index_.pubsub.html) |
27343
355
13
248