Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

a1pubsub

Package Overview
Dependencies
Maintainers
5
Versions
16
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

a1pubsub

GCP PubSub wrapper to add idempotency

  • 0.0.16
  • latest
  • npm
  • Socket score

Version published
Weekly downloads
18
decreased by-56.1%
Maintainers
5
Weekly downloads
 
Created
Source

A1 PubSub

Build Status

Keep your pubsub saucy

This package is a wrapper for guaranteeing exactly-once handling of messages from Google Cloud PubSub.

Table Of Contents:

Installation

> npm install a1pubsub

This package requries running in a environment with GCP Application Default Credentials. If you authenticate on your machine using the gcloud cli, then you're good to go!

API Documentation

link

Local Development

When developing on your machine, you will want to publish events and also receive them (once you've configured a subscription).

Publishing PubSub Events

Trigger the "flow" that contains the .publish API call. Example, if you want to trigger the quote approval pubsub event, then approve a quote.

Whatever environment (local, staging) this event was invoked in will transmit the event over to GCP PubSub.

One of three things will subsequently occur:

  • Error: The topic you tried publishing to does not exist
  • Success and then nothing: The topic you tried publishing to does exist, but there are no subscriptions for this topic, so it's essentially a no-op
  • Success and then delivery to subscriber(s): The topic you tried publishing to does exist, and there are subscribers that are set up as either pull or push.
    • if they're push subscribers, then they'll recive the message in near-realtime.
Receiving PubSub Events to your machine

Let's suppose that you're working with push subscriptions. The question is then, how do you get GCP pubsub to stream events to your machine?

Steps:

  • Make sure your http server is running and has a port exposed
  • Make sure you have ngrok installed on your machine
  • Run ngrok as follows:
$> ~/ngrok http <PORT_NUMBER> 

You'll now have a url such as https://c251a9ae.ngrok.io that you can copy/paste into the GCP pubsub UI.

If your HTTP server has a request handler set up at /webhooks/pubsub then you'll want to paste https://c251a9ae.ngrok.io/webhooks/pubsub into the GCP PubSub UI for the PubSub URL.

Overview (Why & How)

Setter needs one-to-many relationships between events that are published and the corresponding event handlers. i.e. We would like to emit an event such as JOB_APPROVED, and have various services be able to do different things with those events independently.

The way GCP PubSub works is that it delivers messages to every subscriber at least once (docs). Thus, if you have a system that must only process events exactly once, you have to implement idempotence yourself.

This package is intended to abstract away the need to manage idempotence yourself.

Quick Summary of Google Cloud PubSub Terminology:

What follows is a tldr of the actual GCP documentation located at: https://cloud.google.com/pubsub/docs

Topic:

  • A topic is an event stream
    • Example: you could have an event stream of quote_approved events, and then a separate stream for job_scheduled events, etc etc
  • You publish events to topics
    • Publishing has two components:
      • The topic / event stream name
      • The associated data (must be serializeable to JSON)
  • GCP PubSub has a limt of 10,000 topics per project (source)
  • To create a topic, use the gcp UI
  • Topic names must be unique

Subscription:

  • Subscriptions are topic listeners
  • You can have many subscriptions per topic
    • i.e. multiple teams can do different things for the same business event
  • You can have push and pull subscriptions
  • The data received from a subscription is schemaless
    • you have no guarantee that the data you're receiving adheres to a implied schema in your code. You must validate your data.
  • To create a subscription, use the gcp UI
    • Ensure you enable authentication for your subscription
    • Set the endpoint appropriately
      • For local development, I recomment you use ngrok
  • Subscription names must be unique

How Does It Work?

Let's do a code-a-long...

First, import the PubSub class from a1pubsub

import { PubSub } from 'a1pubsub'

Let's instantiate the PubSub class:

const myGcpProjecId = 'setter-develop-82828'

const subscriptionHandlers = {} // more to come here soon

const ps = new PubSub(myGcpProjecId, subscriptionHandlers)

When you instantiate PubSub, the module will try to authenticate to gcp using Application Default Credentials.

Please take 5 minutes to familiarize yourself with ADC, as this is the only way to authenticate to GCP PubSub for the moment. Regardless, here's what you need to know when developing on your machine:

  • You must have the gcloud CLI installed on your machine
  • You must be authenticated via the gcloud CLI to the corresponding project that you're trying to work with
    • For example, in the above code snippet, I am instantiating PubSub with the 'setter-develop-82828' project - thus I must be authenticated via gcloud auth to that project as well

Now that you've instantiated PubSub, you can now publish events!

await ps.publish(
  'quote_approved', // topic name
  { id: 1232, title: 'Window Cleaning', client_first_name: 'Jerry' } // data - must be serializeable to JSON
)

Note that any topic that you publish to must already exist! See the links above for creating topics.

Note For Setter engineers: Create topics sparringly and with good reason. Try to adhere to creating topics that represent events that have occurred. Don't create a topics that represent actions to be done. For more context, refer to this talk.

Ok back to the code along.

Subscription Handlers

So in the above code snippet, you saw that PubSub was instantiated with a subscriptionHandlers object. The type of subscriptionHandlers must be SubscriptionMap.

A SubscriptionHandler is a plain js object whose keys are strings (that represent subscription identifiers), and whose values are a object of type SubscriptionHandler, a SubscriptionHandler contains:

  • validator: As I mentioned alredy, GCP pubsub data is schemaless. All you know is that the data is serializeable to json.
    • the JSON type is defined in src/json.ts and it's just a type-level definition of JSON.parse.
  • handler: The actual subscription handler, it takes your validated data and returns a promise with a boolean.
    • Feel free to do whatever you want here, the only requirement is that you must return a promise with a HandlerResult value.
    • HandlerResult.Success: any subsequent messages that GCP pubsub might deliver will get ignored
      • you, the developer must send a 2XX HTTP response to google cloud pubsub so that it knows that the event has been processed
    • HandlerResult.FailedToProcess: the event will be tracked, but our system will be expecting that same event from being delivered again on a retry
      • you, the developer must send a non 2XX HTTP response to google cloud pubsub so that it knows to retry later

So if your pubsub module needs to handle 5 subscriptions, then your SubscriptionMap will have 5 keys, and 5 corresponding SubscriptionHandlers.

Full Example of Subscription Handling With ExpressJS

import express from 'express'
import { PubSub } from 'a1pubsub'
import * as Joi from '@hapi/joi'

import { SubscriptionMap, HandlerResult } from 'a1pubsub'

import { notifyClientViaTicketComment } from './quote-approved/zendesk-notification'
import { quoteApprovalValidator, ApprovedQuoteData } from './quote-approved'

// defining the shape of our specific SubscriptionMap
declare module 'a1pubsub' {
  interface SubscriptionMap {
    quote_approved__client_ticket_comment: SubscriptionHandler<
      ApprovedQuoteData
    >

    job_cancelled__pro_sms: SubscriptionHandler<{ example: string }>
  }
}

/* eslint-disable @typescript-eslint/camelcase */
export const eventHandlers: SubscriptionMap = {
  quote_approved__client_ticket_comment: {
    validator: quoteApprovalValidator.check,
    handler: notifyClientViaTicketComment,
  },
  job_cancelled__pro_sms: {
    validator: data => data as { example: 'testing' },
    handler: data => {
      console.log(data)
      return Promise.resolve(HandlerResult.Success)
    },
  },
}


const app = express()
const port = 3000


const myGcpProjecId = 'setter-develop-82828'

const quoteApprovedSchema = {
  quote_id: Joi.number().required()
  title: Joi.string().required(),
  client_first_name: Joi.string().required(),
}

/*
 * pretend a whole bunch of other schemas were defined here
 * such as:
 *   - jobCancelledSchema
 *   - homeConsultationCompleted
 *   - purchaseOrderCreated
 *   - jobCompleted
 *   - etc etc
 */




const ps = new PubSub(myGcpProjecId, eventHandlers)

app.post('/pubsub', async (req, res) => {
  const pubsubMessage = req.body

  // if error is undefined, then all went well
  // the PubSub module will guarantee that duplicate messages
  // are not processed again
  const error = await ps.handlePubSubMessage(pubsubMessage)

  if (error) {
    // GCP PubSub will re-enqueue the message and retry at a later point in time
    res.sendStatus(422)
  } else {
    // GCP PubSub will **PROBABLY** not send the same message again
    res.sendStatus(204)
  }
})

app.listen(port, () => console.log(`Example app listening on port ${port}!`))

Security & Authentication

Note that by default, your pubsub events are not authenticated. Please ensure that you authenticate events. More info: https://cloud.google.com/pubsub/docs/authentication

Providing Alternative State Managers

Out of the box, idempotency is implemented / managed via an in-memory hash map (using the js Map). But you can provide your own persistence mechanism so long as it implements the StateManager interface (link).

Example:

import { PubSub } from 'a1pubsub'

new PubSub('my-project-id', eventHandlers, psqlStateManager)

FAQs

Package last updated on 26 Aug 2020

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc