PubSub
This is a generic PubSub Factory exposing a listen and an emit method.
NOTE: Today, only Google Cloud PubSub has been added.
Installation
npm install --save @algoan/pubsub
Usage
Google Cloud PubSub
Run tests
To run the tests or to try the PubSubFactory class, you need a Google account and the gcloud SDK installed.
Then, to install the Google PubSub emulator, run:
gcloud components install pubsub-emulator
gcloud components install beta
gcloud components update
Then run the tests:
npm test
This launches a Google PubSub emulator using the google-pubsub-emulator library.
Example
To create a PubSub instance using Google Cloud:
import { EmittedMessage, GCPubSub, PubSubFactory, Transport } from '@algoan/pubsub';
const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'test',
  },
});
const topicName: string = 'some_topic';
// Subscribe to the topic; transport-specific settings go under `options`
await pubsub.listen(topicName, {
  options: {
    autoAck: true,
  },
  onMessage: (data: EmittedMessage<{ foo: string }>) => {
    console.log(data.parsedData);
  },
  onError: (error: Error) => {
    // Handle or log the error
    console.error(error);
  },
});
// Publish a message on the same topic
await pubsub.emit(topicName, { foo: 'bar' });
Contribution
Thank you for your future contribution 😁 Please follow these instructions before opening a pull request!
API
PubSubFactory.create({ transport, options })
The only static method of the PubSubFactory class. It creates a new PubSub instance for the given transport. By default, it connects to Google Cloud PubSub.
transport: PubSub technology to use. Only GOOGLE_PUBSUB is available for now.
options: Options related to the transport.
- If transport === Transport.GOOGLE_PUBSUB, then have a look at the Google Cloud PubSub config client.
debug: Display logs if it is set to true. It uses a pino logger, with pino-pretty if NODE_ENV is not equal to production.
pinoOptions: If debug is set to true, set the pino logger options. Defaults to level: debug and prettyPrint: true if NODE_ENV is not equal to production.
topicsPrefix: Add a prefix to all created topics. Example: with topicsPrefix: 'my-topic', all topics will begin with my-topic+{your topic name}.
topicsSeparator: Customize the separator between topicsPrefix and the topic name. Example: with topicsSeparator: '-', all topics will be named {topic prefix}-{your topic name} (defaults to '+').
subscriptionsPrefix: Add a prefix to all created subscriptions. Example: with subscriptionsPrefix: 'my-sub', all subscriptions will begin with my-sub%{your topic name}.
subscriptionsSeparator: Customize the separator between subscriptionsPrefix and the topic name. Example: with subscriptionsSeparator: '-', all subscriptions will be named {subscription prefix}-{your topic name} (defaults to '%').
namespace: Add a namespace property to Message attributes when publishing on a topic.
environment: Add an environment property to Message attributes when publishing on a topic.
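The prefix and separator rules above can be sketched as follows. This is only an illustration of the documented naming scheme, not the library's code: topicNameFor, subscriptionNameFor and withPrefix are hypothetical helpers, and the sketch assumes that a name is left unchanged when no prefix is configured.

```typescript
// Hypothetical helpers illustrating the documented naming rules.
// They are not part of @algoan/pubsub.
interface NamingOptions {
  topicsPrefix?: string;
  topicsSeparator?: string;
  subscriptionsPrefix?: string;
  subscriptionsSeparator?: string;
}

// Assumption: without a prefix, the name is used as-is.
function withPrefix(name: string, prefix: string | undefined, separator: string): string {
  return prefix ? `${prefix}${separator}${name}` : name;
}

// Topics use '+' as the default separator.
function topicNameFor(event: string, opts: NamingOptions = {}): string {
  return withPrefix(event, opts.topicsPrefix, opts.topicsSeparator ?? '+');
}

// Subscriptions use '%' as the default separator.
function subscriptionNameFor(event: string, opts: NamingOptions = {}): string {
  return withPrefix(event, opts.subscriptionsPrefix, opts.subscriptionsSeparator ?? '%');
}

console.log(topicNameFor('some_topic', { topicsPrefix: 'my-topic' }));
console.log(subscriptionNameFor('some_topic', { subscriptionsPrefix: 'my-sub', subscriptionsSeparator: '-' }));
```

With these inputs, the first call yields my-topic+some_topic and the second yields my-sub-some_topic.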
pubsub.listen(event, opts)
Listen to a specific event.
NOTE: It only uses the Google Cloud subscription pull delivery for now.
event: Name of the event.
opts: Options related to the Listener method
onMessage: Method called when receiving a message
onError: Method called when an error occurs
options: Options related to the chosen transport
If the chosen transport is Google Cloud PubSub, the options are:
autoAck: Automatically ACK an event as soon as it is received (defaults to true)
subscriptionOptions: Options related to the created Subscription:
name: Custom name for the subscription. Default: event (also equal to the topic name)
get: Options applied to the getSubscription method (have a look at Subscription options)
sub: Options applied to the subscription instance (see also setOptions method)
create: Options applied to the createSubscription method (have a look at Create Subscription options)
deadLetterTopicName: Per-subscription override for the dead-letter topic name (see Dead Letter Topics)
topicOptions: Options applied to the created topic (have a look at Topic options)
topicName: Set the topic name. By default, it uses the default name with a prefix.
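To make the autoAck default concrete, here is a minimal sketch of a dispatcher that acknowledges a message before handing it to onMessage. Everything in it is hypothetical (SketchMessage, SketchListenOptions and dispatch are stand-ins, not library types), and it models only the documented behavior that autoAck defaults to true.

```typescript
// Minimal stand-in for a received message; not the library's EmittedMessage type.
interface SketchMessage {
  payload: string;
  acked: boolean;
}

// Simplified mirror of the documented listen options (assumption: only autoAck is modelled).
interface SketchListenOptions {
  onMessage: (message: SketchMessage) => void;
  options?: { autoAck?: boolean };
}

// Hypothetical dispatcher: acknowledges first when autoAck is enabled.
function dispatch(message: SketchMessage, opts: SketchListenOptions): void {
  const autoAck: boolean = opts.options?.autoAck ?? true; // documented default
  if (autoAck) {
    message.acked = true;
  }
  opts.onMessage(message);
}

const received: string[] = [];
const first: SketchMessage = { payload: 'a', acked: false };
const second: SketchMessage = { payload: 'b', acked: false };

dispatch(first, { onMessage: (m: SketchMessage) => received.push(m.payload) });
dispatch(second, { onMessage: (m: SketchMessage) => received.push(m.payload), options: { autoAck: false } });
```

With autoAck set to false, the second message stays unacknowledged, so acknowledging it is left to the handler.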
pubsub.emit(event, payload, opts)
Emit a specific event with a payload. It adds attributes to the message if you set a namespace or an environment when creating the instance with the PubSubFactory class. It also adds an _eventName and a time property to the emitted payload.
event: Name of the event to emit.
payload: Payload to send. It will be buffered by Google, and then parsed by the listen method.
opts: Options related to the Emit method
metadata: Custom metadata added to the message
options: Options related to the chosen transport
If the chosen transport is Google Cloud PubSub, the options are:
topicOptions: Options applied to the created topic (have a look at Topic options)
publishOptions: Publish options set to the topic after its creation. Refer to Publish Options
messageOptions: Additional message options added to the message. Refer to Message Options
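The enrichment described for emit can be pictured with the sketch below. It is an illustration under stated assumptions rather than the library's implementation: buildMessage and its types are hypothetical, time is assumed to be a millisecond timestamp, and metadata is assumed to be merged into the message attributes.

```typescript
// Hypothetical types; not exported by @algoan/pubsub.
interface FactoryOptions {
  namespace?: string;
  environment?: string;
}

interface SketchedMessage<T> {
  data: T & { _eventName: string; time: number };
  attributes: Record<string, string>;
}

// Builds the payload and attributes the way the text above describes them.
function buildMessage<T extends object>(
  event: string,
  payload: T,
  factoryOptions: FactoryOptions = {},
  metadata: Record<string, string> = {},
): SketchedMessage<T> {
  const attributes: Record<string, string> = {
    // Assumption: custom metadata ends up in the message attributes.
    ...metadata,
    ...(factoryOptions.namespace !== undefined ? { namespace: factoryOptions.namespace } : {}),
    ...(factoryOptions.environment !== undefined ? { environment: factoryOptions.environment } : {}),
  };

  return {
    // Assumption: `time` is the emission time in milliseconds since the epoch.
    data: { ...payload, _eventName: event, time: Date.now() },
    attributes,
  };
}

const message = buildMessage('some_topic', { foo: 'bar' }, { namespace: 'my-namespace', environment: 'test' });
console.log(message.data._eventName, message.attributes);
```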
pubsub.unsubscribe(event)
Stop the server connection for a given subscription.
event: Name of the event to stop listening for.
Dead Letter Topics
When deadLetterOptions is set in the constructor options, the library automatically:
- Creates the dead-letter topic (if it does not exist)
- Creates a drain subscription on the dead-letter topic (to prevent GCP from discarding undelivered messages)
- Grants roles/pubsub.publisher on the dead-letter topic to the GCP Pub/Sub service account
- Grants roles/pubsub.subscriber on the source subscription to the GCP Pub/Sub service account
- Applies the deadLetterPolicy to the created subscription
This setup happens only once per new subscription. Reconnecting to an already-existing subscription repeats none of these steps.
Mode 1: Per-subscription isolated dead-letter topics (recommended for multi-consumer systems)
Omit deadLetterTopicName. Each subscription automatically gets its own <subscriptionName>-deadletter topic, so failed messages from different consumers are isolated and can be replayed independently.
const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'my-project',
    deadLetterOptions: {
      maxDeliveryAttempts: 10,
    },
  },
});
await pubsub.listen('my-orders-sub');
await pubsub.listen('my-payments-sub');
Mode 2: Shared dead-letter topic (single DLT for all subscriptions)
Provide deadLetterTopicName to route all failed messages to one shared topic. The topic must already exist in GCP.
const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'my-project',
    deadLetterOptions: {
      deadLetterTopicName: 'projects/my-project/topics/my-dead-letter',
      maxDeliveryAttempts: 5,
    },
  },
});
await pubsub.listen('my-orders-sub');
await pubsub.listen('my-payments-sub');
Mode 3: Per-subscription override
Override the dead-letter topic for a specific subscription via subscriptionOptions.deadLetterTopicName. If it is not set, the library falls back to the instance-level deadLetterTopicName, and auto-derives the name if neither is set.
await pubsub.listen('my-special-sub', {
  options: {
    subscriptionOptions: {
      deadLetterTopicName: 'projects/my-project/topics/special-dead-letter',
    },
  },
});
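The fallback order across the three modes can be summarized with a small sketch. resolveDeadLetterTopicName is a hypothetical helper written for illustration, not a function exposed by the library. It assumes the auto-derived name follows the <subscriptionName>-deadletter pattern described in Mode 1.

```typescript
// Hypothetical helper showing the documented precedence:
// per-subscription override, then instance-level name, then auto-derived name.
function resolveDeadLetterTopicName(
  subscriptionName: string,
  instanceLevelName?: string,
  perSubscriptionName?: string,
): string {
  return perSubscriptionName ?? instanceLevelName ?? `${subscriptionName}-deadletter`;
}

// Mode 1: nothing configured, the name is derived from the subscription.
console.log(resolveDeadLetterTopicName('my-orders-sub'));

// Mode 2: the instance-level topic is shared by all subscriptions.
console.log(resolveDeadLetterTopicName('my-orders-sub', 'projects/my-project/topics/my-dead-letter'));

// Mode 3: the per-subscription override wins over the instance-level topic.
console.log(
  resolveDeadLetterTopicName(
    'my-special-sub',
    'projects/my-project/topics/my-dead-letter',
    'projects/my-project/topics/special-dead-letter',
  ),
);
```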
No dead-letter topic
If deadLetterOptions is not set, no dead-letter resources are created and no IAM calls are made, so existing setups keep working unchanged.
const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'my-project',
  },
});