Keep your pubsub saucy
This package is a wrapper for guaranteeing exactly-once handling of messages from Google Cloud PubSub.
> npm install a1pubsub
This package requires running in an environment with GCP Application Default Credentials. If you authenticate on your machine using the gcloud CLI, then you're good to go!
When developing on your machine, you will want to publish events and also receive them (once you've configured a subscription).
Trigger the "flow" that contains the .publish API call. For example, if you want to trigger the quote approval pubsub event, then approve a quote.
Whatever environment (local, staging) this event was invoked in will transmit the event over to GCP PubSub.
One of three things will subsequently occur:
Let's suppose that you're working with push subscriptions. The question is then, how do you get GCP pubsub to stream events to your machine?
Steps:
$> ~/ngrok http <PORT_NUMBER>
You'll now have a url such as https://c251a9ae.ngrok.io that you can copy/paste into the GCP pubsub UI.
If your HTTP server has a request handler set up at /webhooks/pubsub then you'll want to paste https://c251a9ae.ngrok.io/webhooks/pubsub into the GCP PubSub UI for the PubSub URL.
Setter needs one-to-many relationships between events that are published and the corresponding event handlers. That is, we would like to emit an event such as JOB_APPROVED, and have various services be able to do different things with those events independently.
The way GCP PubSub works is that it delivers messages to every subscriber at least once (docs). Thus, if you have a system that must only process events exactly once, you have to implement idempotence yourself.
This package is intended to abstract away the need to manage idempotence yourself.
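To make the idea concrete, here is a minimal sketch of idempotent handling on top of at-least-once delivery. It is not a1pubsub's internal code: `InMemoryDeduplicator`, `handleOnce`, and `demoRedelivery` are hypothetical names used only for illustration, and the sketch assumes each message carries a stable ID.

```typescript
// Hypothetical names throughout; this is an illustration, not a1pubsub's implementation.
type MessageId = string

class InMemoryDeduplicator {
  // IDs of messages whose handler has already completed successfully
  private readonly processed = new Set<MessageId>()

  // Runs `handler` unless this message ID was already processed.
  // Resolves to true if the handler ran, false if the message was a duplicate.
  async handleOnce(id: MessageId, handler: () => Promise<void>): Promise<boolean> {
    if (this.processed.has(id)) {
      return false
    }
    // If the handler throws, the ID is not recorded, so a redelivery can try again.
    await handler()
    this.processed.add(id)
    return true
  }
}

// Simulates GCP PubSub delivering the same message twice.
async function demoRedelivery(): Promise<number> {
  const dedup = new InMemoryDeduplicator()
  let sideEffects = 0
  const handler = async (): Promise<void> => {
    sideEffects += 1
  }
  await dedup.handleOnce('msg-1', handler)
  await dedup.handleOnce('msg-1', handler) // duplicate delivery is skipped
  return sideEffects
}
```

Note that this sketch keeps its state in process memory and does not guard against two concurrent deliveries of the same message, so it only illustrates the bookkeeping involved.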
What follows is a tldr of the actual GCP documentation located at: https://cloud.google.com/pubsub/docs
Topic:
quote_approved events, and then a separate stream for job_scheduled events, etc etc
Subscription:
Let's do a code-along...
First, import the PubSub class from a1pubsub
import { PubSub } from 'a1pubsub'
Let's instantiate the PubSub class:
const myGcpProjectId = 'setter-develop-82828'
const subscriptionHandlers = {} // more to come here soon
const ps = new PubSub(myGcpProjectId, subscriptionHandlers)
When you instantiate PubSub, the module will try to authenticate to GCP using Application Default Credentials.
Please take 5 minutes to familiarize yourself with ADC, as this is the only way to authenticate to GCP PubSub for the moment. Regardless, here's what you need to know when developing on your machine:
You need to have the gcloud CLI installed on your machine
You need to authenticate the gcloud CLI to the corresponding project that you're trying to work with
In the example above, I am instantiating PubSub with the 'setter-develop-82828' project - thus I must be authenticated via gcloud auth to that project as well
Now that you've instantiated PubSub, you can now publish events!
await ps.publish(
'quote_approved', // topic name
{ id: 1232, title: 'Window Cleaning', client_first_name: 'Jerry' } // data - must be serializable to JSON
)
Note that any topic that you publish to must already exist! See the links above for creating topics.
Note for Setter engineers: Create topics sparingly and with good reason. Try to adhere to creating topics that represent events that have occurred. Don't create topics that represent actions to be done. For more context, refer to this talk.
Ok back to the code along.
So in the above code snippet, you saw that PubSub was instantiated with a subscriptionHandlers object. The type of subscriptionHandlers must be SubscriptionMap.
A SubscriptionMap is a plain JS object whose keys are strings (that represent subscription identifiers), and whose values are objects of type SubscriptionHandler. A SubscriptionHandler contains:
validator: As I mentioned already, GCP pubsub data is schemaless. All you know is that the data is serializable to JSON.
The JSON type is defined in src/json.ts and it's just a type-level definition of JSON.parse.
handler: The actual subscription handler. It takes your validated data and returns a promise with a HandlerResult value.
HandlerResult.Success: any subsequent messages that GCP pubsub might deliver will get ignored
HandlerResult.FailedToProcess: the event will be tracked, but our system will expect that same event to be delivered again on a retry
So if your pubsub module needs to handle 5 subscriptions, then your SubscriptionMap will have 5 keys, and 5 corresponding SubscriptionHandlers.
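Before the full example, a stripped-down sketch of a single entry may help. The `HandlerResult` enum and `SubscriptionHandler` interface below are local stand-ins written for this sketch so that it runs on its own; the real types are exported by a1pubsub and may differ in detail. The `JobCancelled` shape and the convention that a validator throws on invalid input are likewise assumptions.

```typescript
// Local stand-ins for illustration only; import the real types from 'a1pubsub' in practice.
enum HandlerResult {
  Success,
  FailedToProcess,
}

interface SubscriptionHandler<T> {
  validator: (json: unknown) => T
  handler: (data: T) => Promise<HandlerResult>
}

// Hypothetical payload shape for a job-cancelled event.
interface JobCancelled {
  job_id: number
}

const jobCancelledProSms: SubscriptionHandler<JobCancelled> = {
  // Turns schemaless JSON into a typed value, or throws if the shape is wrong (assumed convention).
  validator: (json) => {
    if (
      typeof json === 'object' &&
      json !== null &&
      typeof (json as { job_id?: unknown }).job_id === 'number'
    ) {
      return { job_id: (json as { job_id: number }).job_id }
    }
    throw new Error('invalid job_cancelled payload')
  },
  // Receives only validated data and reports whether processing succeeded.
  handler: async (data) => {
    console.log(`would send an SMS about job ${data.job_id}`)
    return HandlerResult.Success
  },
}
```

Keeping validation separate from handling means the handler never has to defend against malformed payloads.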
import express from 'express'
import * as Joi from '@hapi/joi'
import { PubSub, SubscriptionMap, HandlerResult } from 'a1pubsub'
import { notifyClientViaTicketComment } from './quote-approved/zendesk-notification'
import { quoteApprovalValidator, ApprovedQuoteData } from './quote-approved'

// defining the shape of our specific SubscriptionMap
declare module 'a1pubsub' {
  interface SubscriptionMap {
    quote_approved__client_ticket_comment: SubscriptionHandler<ApprovedQuoteData>
    job_cancelled__pro_sms: SubscriptionHandler<{ example: string }>
  }
}

/* eslint-disable @typescript-eslint/camelcase */
export const eventHandlers: SubscriptionMap = {
  quote_approved__client_ticket_comment: {
    validator: quoteApprovalValidator.check,
    handler: notifyClientViaTicketComment,
  },
  job_cancelled__pro_sms: {
    validator: data => data as { example: 'testing' },
    handler: data => {
      console.log(data)
      return Promise.resolve(HandlerResult.Success)
    },
  },
}

const app = express()
// parse JSON request bodies so that req.body is populated in the route below
app.use(express.json())

const port = 3000
const myGcpProjectId = 'setter-develop-82828'

const quoteApprovedSchema = {
  quote_id: Joi.number().required(),
  title: Joi.string().required(),
  client_first_name: Joi.string().required(),
}

/*
 * pretend a whole bunch of other schemas were defined here
 * such as:
 * - jobCancelledSchema
 * - homeConsultationCompleted
 * - purchaseOrderCreated
 * - jobCompleted
 * - etc etc
 */

const ps = new PubSub(myGcpProjectId, eventHandlers)

app.post('/pubsub', async (req, res) => {
  const pubsubMessage = req.body

  // if error is undefined, then all went well
  // the PubSub module will guarantee that duplicate messages
  // are not processed again
  const error = await ps.handlePubSubMessage(pubsubMessage)

  if (error) {
    // GCP PubSub will re-enqueue the message and retry at a later point in time
    res.sendStatus(422)
  } else {
    // GCP PubSub will **PROBABLY** not send the same message again
    res.sendStatus(204)
  }
})

app.listen(port, () => console.log(`Example app listening on port ${port}!`))
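For orientation, the request body that a GCP PubSub push subscription sends wraps your published data in an envelope, with the payload base64-encoded under message.data. The sketch below only illustrates that envelope; it is not a description of what handlePubSubMessage does internally. The `PushEnvelope` type and `decodePushPayload` function are hypothetical names, and the message ID and subscription path are made-up sample values.

```typescript
// Hypothetical type covering the commonly used fields of a push request body.
interface PushEnvelope {
  message: {
    data: string // base64-encoded payload
    messageId: string
    attributes?: Record<string, string>
  }
  subscription: string
}

// Decodes the base64 payload back into the JSON value that was published.
function decodePushPayload(envelope: PushEnvelope): unknown {
  const json = Buffer.from(envelope.message.data, 'base64').toString('utf8')
  return JSON.parse(json)
}

// Sample envelope built by hand; the IDs and subscription path are illustrative.
const sampleEnvelope: PushEnvelope = {
  message: {
    data: Buffer.from(
      JSON.stringify({ id: 1232, title: 'Window Cleaning' })
    ).toString('base64'),
    messageId: '136969346945',
  },
  subscription:
    'projects/setter-develop-82828/subscriptions/quote_approved__client_ticket_comment',
}

const decodedPayload = decodePushPayload(sampleEnvelope)
console.log(decodedPayload)
```

The decoded value is still untyped JSON at this point, which is why each SubscriptionHandler carries its own validator.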
Note that by default, your pubsub events are not authenticated. Please ensure that you authenticate events. More info: https://cloud.google.com/pubsub/docs/authentication
Out of the box, idempotency is implemented / managed via an in-memory hash map (using the JS Map). But you can provide your own persistence mechanism so long as it implements the StateManager interface (link).
Example:
import { PubSub } from 'a1pubsub'
new PubSub('my-project-id', eventHandlers, psqlStateManager)