Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@sebspark/pubsub

Package Overview
Dependencies
Maintainers
0
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@sebspark/pubsub - npm Package Compare versions

Comparing version 0.6.3 to 1.0.0

64

dist/index.d.ts

@@ -1,47 +0,35 @@

import * as express_serve_static_core from 'express-serve-static-core';
import { ClientConfig, Message, Subscription } from '@google-cloud/pubsub';
import { schema } from 'avsc';
import { ZodTypeAny } from 'zod';
type ClientConfig = {
projectId: string;
credentials?: {
client_email: string;
private_key: string;
type CloudSchema = {
schemaId: string;
avroDefinition: string;
};
type PublisherClient<T extends Record<string, unknown>> = {
topic<K extends keyof T>(name: K, cloudSchema?: CloudSchema): {
publish<M extends T[K]>(message: M): Promise<void>;
initiate(): Promise<void>;
};
};
declare const createPublisher: <T extends Record<string, unknown>>(clientOptions?: ClientConfig | undefined) => PublisherClient<T>;
type AuthenticatedUser = {
token?: string;
type TypedMessage<T> = Omit<Message, 'data'> & {
data: T;
};
type PubSubHeaders = {
authenticatedUser?: AuthenticatedUser;
correlationId?: string;
traceparent?: string;
type SubscriptionClient<T extends Record<string, unknown>> = {
topic<K extends keyof T>(name: K): {
subscribe<M extends T[K]>(name: string, callback: (message: TypedMessage<M>) => void, options?: PubSubOptions): Promise<Subscription>;
};
};
type SubscriberArgs<T> = {
subscriberName: string;
} & SubscriberHandler<T>;
type SubscriberHandler<T> = {
onError?: (err: Error) => void | Promise<void>;
onSuccess: (msg: T, headers: PubSubHeaders) => void | Promise<void>;
type PubSubOptions = {
expirationPolicy: number;
messageRetentionDuration: number;
};
type Unsubscriber = () => void;
type Subscriber<T> = (args: SubscriberArgs<T>) => Promise<Unsubscriber>;
declare const createSubscriber: <T extends Record<string, unknown>>(clientOptions?: ClientConfig | undefined) => SubscriptionClient<T>;
interface TypeMap {
[event: string]: unknown;
}
interface PubSubTopic<Msg, Topics extends TypeMap> {
publish: (message: Msg, headers?: Record<string, unknown>, raw?: boolean) => Promise<string>;
subscribe: Subscriber<Msg>;
name: keyof Topics;
}
declare const createPubsub: <Topics extends TypeMap, SubscriberName extends string>() => {
topic: <T extends keyof Topics>(name: T, config?: ClientConfig) => PubSubTopic<Topics[T], Topics>;
subscribeToMultipleAs: (name: SubscriberName, config?: ClientConfig) => {
wait: () => Promise<Unsubscriber[]>;
subscribe: <T_1 extends keyof Topics>(topicName: T_1, { onSuccess, onError }: SubscriberHandler<Topics[keyof Topics]>) => any;
};
};
declare const zodToAvro: (name: string, zodType: ZodTypeAny, options?: {
namespace: string;
}, cache?: Map<ZodTypeAny, string>) => schema.AvroSchema;
declare const pushRouter: () => express_serve_static_core.Router;
export { type ClientConfig, type PubSubHeaders, type PubSubTopic, type Unsubscriber, createPubsub, pushRouter };
export { type PubSubOptions, type PublisherClient, type SubscriptionClient, type TypedMessage, createPublisher, createSubscriber, zodToAvro };

@@ -23,249 +23,247 @@ "use strict";

__export(src_exports, {
createPubsub: () => createPubsub,
pushRouter: () => pushRouter
createPublisher: () => createPublisher,
createSubscriber: () => createSubscriber,
zodToAvro: () => zodToAvro
});
module.exports = __toCommonJS(src_exports);
// src/lib/client.ts
// src/lib/publisher.ts
var import_pubsub = require("@google-cloud/pubsub");
var localProjectId = "local";
var clients = {};
var init = ({ projectId, credentials } = { projectId: localProjectId }) => {
if (!clients[projectId]) {
if (projectId === localProjectId) {
clients[projectId] = new import_pubsub.PubSub();
} else {
clients[projectId] = new import_pubsub.PubSub({
projectId,
credentials
});
}
var import_avsc = require("avsc");
var schemaIdPattern = /^(?!goog)[a-zA-Z][a-zA-Z0-9-._~%+]{2,254}$/;
var syncTopicSchema = async (client, cloudSchema) => {
if (!schemaIdPattern.test(cloudSchema.schemaId)) {
throw Error(
"schemaId is no in a valid format. Check google cloud platform for more information"
);
}
const schema = await client.schema(cloudSchema.schemaId);
const exits = await schemaExists(client, cloudSchema.schemaId);
if (exits) {
const data2 = await schema.get();
return data2;
}
await client.createSchema(
cloudSchema.schemaId,
import_pubsub.SchemaTypes.Avro,
cloudSchema.avroDefinition
);
const data = await schema.get();
return data;
};
var getOrCreateTopic = async (topicName, config, tries = 0) => {
init(config);
try {
const [t] = await clients[(config == null ? void 0 : config.projectId) || localProjectId].topic(topicName).get({ autoCreate: true });
return t;
} catch (err) {
if (err.code && (err == null ? void 0 : err.code) === 6 && tries < 3) {
return getOrCreateTopic(topicName, config, tries + 1);
var createOrGetTopic = async (client, name, schemaData) => {
const [topic] = await client.topic(name).get({ autoCreate: true });
if (!schemaData) {
return topic;
}
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
await topic.setMetadata({
...topicMetadata,
schemaSettings: {
encoding: "JSON",
firstRevisionId: (topicSchemaMetadata == null ? void 0 : topicSchemaMetadata.firstRevisionId) ?? schemaData.revisionId,
lastRevisionId: schemaData.revisionId,
schema: schemaData.name
}
throw err;
}
});
return topic;
};
// src/lib/publisher.ts
var publisher = (topicName, config) => async (message, headers, raw) => {
const topic = await getOrCreateTopic(topicName.toString(), config);
const msg = {
message,
headers
var createPublisher = (clientOptions) => {
const client = clientOptions ? new import_pubsub.PubSub(clientOptions) : new import_pubsub.PubSub();
let _topic;
let _type;
const ensureInitiated = async (name, schema) => {
if (!_topic) {
if (schema) {
const schemaData = await syncTopicSchema(client, schema);
_topic = await createOrGetTopic(client, name, schemaData);
}
_topic = await createOrGetTopic(client, name);
}
if (schema && !_type) {
const schemaType = import_avsc.Type.forSchema(JSON.parse(schema.avroDefinition));
_type = schemaType;
}
};
let data;
if (raw) {
data = Buffer.from(JSON.stringify(message));
} else {
data = Buffer.from(JSON.stringify(msg));
const typedClient = {
topic: (name, schema) => {
return {
initiate: async () => {
return ensureInitiated(name, schema);
},
publish: async (json) => {
await ensureInitiated(name, schema);
if (_type) {
const data = _type.toBuffer(json);
await _topic.publishMessage({ data });
} else {
await _topic.publishMessage({ json });
}
}
};
}
};
return typedClient;
};
var schemaExists = async (client, schemaId) => {
for await (const s of client.listSchemas()) {
if (s.name === schemaId) {
return true;
}
}
return topic.publishMessage({ data });
return false;
};
// src/lib/subscriber.ts
var ALREADY_EXISTS_ERROR = "6 ALREADY_EXISTS";
var subscriptions = {};
var subscriptionDefaultConfig = async () => {
const deadLetterTopicName = process.env.PUBSUB_DEAD_LETTER_TOPIC || "dead.letter.topic";
const maxDeliveryAttempts = Number.parseInt(
process.env.PUBSUB_MAX_DELIVERY_ATTEMPTS || "20",
10
);
const deadLetterTopic = (await getOrCreateTopic(deadLetterTopicName)).name;
const subscriptionOptions = {
deadLetterPolicy: {
deadLetterTopic,
maxDeliveryAttempts
var import_pubsub2 = require("@google-cloud/pubsub");
var createOrGetSubscription = async (topic, name, options) => {
const createSubscriptionOptions = {
messageRetentionDuration: {
seconds: (options == null ? void 0 : options.messageRetentionDuration) || 3600 * 24
// Default to 1 day.
},
retryPolicy: {
minimumBackoff: { seconds: 10 },
maximumBackoff: { seconds: 600 }
expirationPolicy: {
ttl: {
seconds: (options == null ? void 0 : options.expirationPolicy) || 3600 * 24 * 7
// Default to 7 days.
}
}
};
return subscriptionOptions;
};
var subscriptionPullConfig = () => {
return subscriptionDefaultConfig();
};
var subscriptionPushConfig = async () => {
if (!process.env.PUBSUB_PUSH_HOST) {
throw new Error(
"Environment variable PUBSUB_PUSH_HOST is missing and cannot set a push endpoint"
);
const [exists] = await topic.subscription(name).exists();
if (exists) {
return topic.subscription(name);
}
if (!process.env.PUBSUB_SERVICE_ACCOUNT_EMAIL) {
console.warn(
"Environment variable PUBSUB_SERVICE_ACCOUNT_EMAIL should be set if running in GCP"
);
}
const subscriptionOptions = await subscriptionDefaultConfig();
const pushEndpoint = `${process.env.PUBSUB_PUSH_HOST}/pubsub/push`;
const serviceAccountEmail = process.env.PUBSUB_SERVICE_ACCOUNT_EMAIL;
subscriptionOptions.pushConfig = {
pushEndpoint
};
if (serviceAccountEmail) {
subscriptionOptions.pushConfig.oidcToken = {
serviceAccountEmail
};
}
return subscriptionOptions;
};
var getCreateSubscriptionOptions = async () => {
switch (process.env.PUBSUB_DELIVERY_MODE) {
case "push":
return subscriptionPushConfig();
case "pull":
return subscriptionPullConfig();
default:
throw new Error(
"Environment variable PUBSUB_DELIVERY_MODE must be set to either push or pull"
);
}
};
var createOrGetSubscription = async (subscriptionName, topic) => {
let subscription;
try {
;
[subscription] = await topic.createSubscription(
subscriptionName,
await getCreateSubscriptionOptions()
);
} catch (ex) {
if (ex instanceof Error && !ex.message.startsWith(ALREADY_EXISTS_ERROR)) {
throw ex;
}
;
[subscription] = await topic.subscription(subscriptionName).get();
}
subscriptions[subscription.name] = subscription;
const [subscription] = await topic.createSubscription(
name,
createSubscriptionOptions
);
return subscription;
};
var subscriber = (topicName, config) => async ({ subscriberName, onSuccess, onError }) => {
const topic = await getOrCreateTopic(topicName.toString(), config);
const subscriptionName = `${topicName.toString()}.${subscriberName}`;
const subscription = await createOrGetSubscription(subscriptionName, topic);
const messageHandler = async (message) => {
const data = JSON.parse(message.data.toString());
const typed = {
...message,
body: data.message,
headers: {
authenticatedUser: {
token: data.identity
var createSubscriber = (clientOptions) => {
const client = clientOptions ? new import_pubsub2.PubSub(clientOptions) : new import_pubsub2.PubSub();
const typedClient = {
topic: (name) => {
let _topic;
return {
subscribe: async (subscriptionName, callback, options) => {
if (!_topic) {
_topic = client.topic(name);
}
const fullName = `${name}_${subscriptionName}`;
const subscription = await createOrGetSubscription(
_topic,
fullName,
options
);
subscription.on("message", (msg) => {
const data = JSON.parse(msg.data.toString("utf8"));
callback(Object.assign(msg, { data }));
});
return subscription;
}
}
};
if (!typed.body) {
typed.body = data;
};
}
try {
await onSuccess(typed.body, typed.headers);
message.ack();
} catch (err) {
message.nack();
console.error(err);
}
};
const errorHandler = async (err) => {
if (onError) await onError(err);
};
let messageHandlerEvent;
switch (process.env.PUBSUB_DELIVERY_MODE) {
case "push":
messageHandlerEvent = "push-message";
break;
case "pull":
messageHandlerEvent = "message";
break;
default:
throw new Error(
"Environment variable PUBSUB_DELIVERY_MODE must be set to either push or pull"
);
}
subscription.on(messageHandlerEvent, messageHandler);
subscription.on("error", errorHandler);
const unsubscriber = () => {
subscription.off(messageHandlerEvent, messageHandler);
subscription.off("error", errorHandler);
};
return unsubscriber;
return typedClient;
};
// src/lib/pubsub.ts
var createPubsub = () => {
const topic = (name, config) => {
// src/lib/zod-to-avro.ts
var import_ts_pattern = require("ts-pattern");
var import_zod = require("zod");
var zodToAvro = (name, zodType, options, cache = /* @__PURE__ */ new Map()) => {
const fqn = `${options == null ? void 0 : options.namespace}.${name}`;
if (cache.has(zodType)) {
return cache.get(zodType);
}
const retval = (0, import_ts_pattern.match)({
value: zodType
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodOptional) }, (zodObject) => {
return Array.from(
new Set(
[
"null",
zodToAvro(name, zodObject.value.unwrap(), options, cache)
].flat()
)
);
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodNullable) }, (zodObject) => {
return Array.from(
new Set(
[
"null",
zodToAvro(name, zodObject.value.unwrap(), options, cache)
].flat()
)
);
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodObject) }, (zodObject) => {
cache.set(zodObject.value, fqn);
return parseZodObjectToAvscRecord(name, zodObject.value, cache, options);
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodString) }, () => {
return "string";
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodUnion) }, (zodUnion) => {
return Array.from(
new Set(
zodUnion.value.options.flatMap(
(zodType2) => zodToAvro(name, zodType2, options, cache)
)
)
);
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodEnum) }, (zodEnum) => {
cache.set(zodEnum.value, fqn);
return {
publish: publisher(name, config),
subscribe: subscriber(name, config),
name
name,
type: "enum",
symbols: zodEnum.value.options,
doc: zodEnum.value.description,
namespace: options == null ? void 0 : options.namespace
};
};
const subscribeToMultipleAs = (name, config) => {
const promises = [];
const obj = {
wait: async () => await Promise.all(promises),
subscribe: (topicName, { onSuccess, onError }) => {
promises.push(
topic(topicName.toString(), config).subscribe({
subscriberName: name,
onSuccess,
onError
})
);
return obj;
}
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodNumber) }, () => {
return "double";
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodDate) }, () => {
return "long";
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodArray) }, (zodArray) => {
return {
type: "array",
items: zodToAvro(
`${name}-value`,
zodArray.value._def.type,
options,
cache
)
};
return obj;
};
return {
topic,
subscribeToMultipleAs
};
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodBigInt) }, () => {
return "long";
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodBoolean) }, () => {
return "boolean";
}).otherwise((v) => {
throw new Error(`Unsupported type ${v}`);
});
return retval;
};
// src/lib/router.ts
var import_express = require("express");
var import_html_escaper = require("html-escaper");
var pushRouter = () => {
const r = (0, import_express.Router)();
r.post("/pubsub/push", (req, res) => {
if (!req.body) {
const msg = "no Pub/Sub message received";
return res.status(400).send(`Bad Request: ${msg}`);
var parseZodObjectToAvscRecord = (name, zodObject, cache, options) => {
const shape = zodObject.shape;
const fields = Object.entries(shape).map((k) => {
const type = zodToAvro(k[0], k[1], options, cache);
const name2 = k[0];
const doc = k[1].description;
const fieldDef = { name: name2, type, doc };
if (type === "null" || Array.isArray(type) && type.includes("null")) {
fieldDef.default = null;
}
if (!req.body.message) {
const msg = "invalid Pub/Sub message format";
return res.status(400).send(`Bad Request: ${msg}`);
}
const pubSubMessage = req.body.message;
const decodedMessage = Buffer.from(pubSubMessage.data, "base64").toString().trim();
const subscriptionName = (0, import_html_escaper.escape)(req.body.subscription);
if (subscriptions[subscriptionName]) {
const message = {
data: Buffer.from(decodedMessage),
id: pubSubMessage.messageId,
ack: () => res.status(204).send(),
nack: () => res.status(500).send()
};
subscriptions[subscriptionName].emit("push-message", message);
} else {
res.status(404).send(`No listener for: ${subscriptionName}`);
}
return fieldDef;
});
return r;
return {
name,
type: "record",
fields,
namespace: options == null ? void 0 : options.namespace,
doc: zodObject.description
};
};
// Annotate the CommonJS export names for ESM import in node:
0 && (module.exports = {
createPubsub,
pushRouter
createPublisher,
createSubscriber,
zodToAvro
});
{
"name": "@sebspark/pubsub",
"version": "0.6.3",
"version": "1.0.0",
"license": "Apache-2.0",

@@ -25,4 +25,7 @@ "main": "dist/index.js",

"@types/html-escaper": "3.0.2",
"avsc": "^5.7.7",
"express": "4.21.0",
"html-escaper": "3.0.3"
"html-escaper": "3.0.3",
"ts-pattern": "^5.5.0",
"zod": "^3.23.8"
},

@@ -29,0 +32,0 @@ "keywords": [

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc