@sebspark/pubsub
Advanced tools
Comparing version 0.6.3 to 1.0.0
@@ -1,47 +0,35 @@ | ||
import * as express_serve_static_core from 'express-serve-static-core'; | ||
import { ClientConfig, Message, Subscription } from '@google-cloud/pubsub'; | ||
import { schema } from 'avsc'; | ||
import { ZodTypeAny } from 'zod'; | ||
type ClientConfig = { | ||
projectId: string; | ||
credentials?: { | ||
client_email: string; | ||
private_key: string; | ||
type CloudSchema = { | ||
schemaId: string; | ||
avroDefinition: string; | ||
}; | ||
type PublisherClient<T extends Record<string, unknown>> = { | ||
topic<K extends keyof T>(name: K, cloudSchema?: CloudSchema): { | ||
publish<M extends T[K]>(message: M): Promise<void>; | ||
initiate(): Promise<void>; | ||
}; | ||
}; | ||
declare const createPublisher: <T extends Record<string, unknown>>(clientOptions?: ClientConfig | undefined) => PublisherClient<T>; | ||
type AuthenticatedUser = { | ||
token?: string; | ||
type TypedMessage<T> = Omit<Message, 'data'> & { | ||
data: T; | ||
}; | ||
type PubSubHeaders = { | ||
authenticatedUser?: AuthenticatedUser; | ||
correlationId?: string; | ||
traceparent?: string; | ||
type SubscriptionClient<T extends Record<string, unknown>> = { | ||
topic<K extends keyof T>(name: K): { | ||
subscribe<M extends T[K]>(name: string, callback: (message: TypedMessage<M>) => void, options?: PubSubOptions): Promise<Subscription>; | ||
}; | ||
}; | ||
type SubscriberArgs<T> = { | ||
subscriberName: string; | ||
} & SubscriberHandler<T>; | ||
type SubscriberHandler<T> = { | ||
onError?: (err: Error) => void | Promise<void>; | ||
onSuccess: (msg: T, headers: PubSubHeaders) => void | Promise<void>; | ||
type PubSubOptions = { | ||
expirationPolicy: number; | ||
messageRetentionDuration: number; | ||
}; | ||
type Unsubscriber = () => void; | ||
type Subscriber<T> = (args: SubscriberArgs<T>) => Promise<Unsubscriber>; | ||
declare const createSubscriber: <T extends Record<string, unknown>>(clientOptions?: ClientConfig | undefined) => SubscriptionClient<T>; | ||
interface TypeMap { | ||
[event: string]: unknown; | ||
} | ||
interface PubSubTopic<Msg, Topics extends TypeMap> { | ||
publish: (message: Msg, headers?: Record<string, unknown>, raw?: boolean) => Promise<string>; | ||
subscribe: Subscriber<Msg>; | ||
name: keyof Topics; | ||
} | ||
declare const createPubsub: <Topics extends TypeMap, SubscriberName extends string>() => { | ||
topic: <T extends keyof Topics>(name: T, config?: ClientConfig) => PubSubTopic<Topics[T], Topics>; | ||
subscribeToMultipleAs: (name: SubscriberName, config?: ClientConfig) => { | ||
wait: () => Promise<Unsubscriber[]>; | ||
subscribe: <T_1 extends keyof Topics>(topicName: T_1, { onSuccess, onError }: SubscriberHandler<Topics[keyof Topics]>) => any; | ||
}; | ||
}; | ||
declare const zodToAvro: (name: string, zodType: ZodTypeAny, options?: { | ||
namespace: string; | ||
}, cache?: Map<ZodTypeAny, string>) => schema.AvroSchema; | ||
declare const pushRouter: () => express_serve_static_core.Router; | ||
export { type ClientConfig, type PubSubHeaders, type PubSubTopic, type Unsubscriber, createPubsub, pushRouter }; | ||
export { type PubSubOptions, type PublisherClient, type SubscriptionClient, type TypedMessage, createPublisher, createSubscriber, zodToAvro }; |
@@ -23,249 +23,247 @@ "use strict"; | ||
__export(src_exports, { | ||
createPubsub: () => createPubsub, | ||
pushRouter: () => pushRouter | ||
createPublisher: () => createPublisher, | ||
createSubscriber: () => createSubscriber, | ||
zodToAvro: () => zodToAvro | ||
}); | ||
module.exports = __toCommonJS(src_exports); | ||
// src/lib/client.ts | ||
// src/lib/publisher.ts | ||
var import_pubsub = require("@google-cloud/pubsub"); | ||
var localProjectId = "local"; | ||
var clients = {}; | ||
var init = ({ projectId, credentials } = { projectId: localProjectId }) => { | ||
if (!clients[projectId]) { | ||
if (projectId === localProjectId) { | ||
clients[projectId] = new import_pubsub.PubSub(); | ||
} else { | ||
clients[projectId] = new import_pubsub.PubSub({ | ||
projectId, | ||
credentials | ||
}); | ||
} | ||
var import_avsc = require("avsc"); | ||
var schemaIdPattern = /^(?!goog)[a-zA-Z][a-zA-Z0-9-._~%+]{2,254}$/; | ||
var syncTopicSchema = async (client, cloudSchema) => { | ||
if (!schemaIdPattern.test(cloudSchema.schemaId)) { | ||
throw Error( | ||
"schemaId is no in a valid format. Check google cloud platform for more information" | ||
); | ||
} | ||
const schema = await client.schema(cloudSchema.schemaId); | ||
const exits = await schemaExists(client, cloudSchema.schemaId); | ||
if (exits) { | ||
const data2 = await schema.get(); | ||
return data2; | ||
} | ||
await client.createSchema( | ||
cloudSchema.schemaId, | ||
import_pubsub.SchemaTypes.Avro, | ||
cloudSchema.avroDefinition | ||
); | ||
const data = await schema.get(); | ||
return data; | ||
}; | ||
var getOrCreateTopic = async (topicName, config, tries = 0) => { | ||
init(config); | ||
try { | ||
const [t] = await clients[(config == null ? void 0 : config.projectId) || localProjectId].topic(topicName).get({ autoCreate: true }); | ||
return t; | ||
} catch (err) { | ||
if (err.code && (err == null ? void 0 : err.code) === 6 && tries < 3) { | ||
return getOrCreateTopic(topicName, config, tries + 1); | ||
var createOrGetTopic = async (client, name, schemaData) => { | ||
const [topic] = await client.topic(name).get({ autoCreate: true }); | ||
if (!schemaData) { | ||
return topic; | ||
} | ||
const [topicMetadata] = await topic.getMetadata(); | ||
const topicSchemaMetadata = topicMetadata.schemaSettings; | ||
await topic.setMetadata({ | ||
...topicMetadata, | ||
schemaSettings: { | ||
encoding: "JSON", | ||
firstRevisionId: (topicSchemaMetadata == null ? void 0 : topicSchemaMetadata.firstRevisionId) ?? schemaData.revisionId, | ||
lastRevisionId: schemaData.revisionId, | ||
schema: schemaData.name | ||
} | ||
throw err; | ||
} | ||
}); | ||
return topic; | ||
}; | ||
// src/lib/publisher.ts | ||
var publisher = (topicName, config) => async (message, headers, raw) => { | ||
const topic = await getOrCreateTopic(topicName.toString(), config); | ||
const msg = { | ||
message, | ||
headers | ||
var createPublisher = (clientOptions) => { | ||
const client = clientOptions ? new import_pubsub.PubSub(clientOptions) : new import_pubsub.PubSub(); | ||
let _topic; | ||
let _type; | ||
const ensureInitiated = async (name, schema) => { | ||
if (!_topic) { | ||
if (schema) { | ||
const schemaData = await syncTopicSchema(client, schema); | ||
_topic = await createOrGetTopic(client, name, schemaData); | ||
} | ||
_topic = await createOrGetTopic(client, name); | ||
} | ||
if (schema && !_type) { | ||
const schemaType = import_avsc.Type.forSchema(JSON.parse(schema.avroDefinition)); | ||
_type = schemaType; | ||
} | ||
}; | ||
let data; | ||
if (raw) { | ||
data = Buffer.from(JSON.stringify(message)); | ||
} else { | ||
data = Buffer.from(JSON.stringify(msg)); | ||
const typedClient = { | ||
topic: (name, schema) => { | ||
return { | ||
initiate: async () => { | ||
return ensureInitiated(name, schema); | ||
}, | ||
publish: async (json) => { | ||
await ensureInitiated(name, schema); | ||
if (_type) { | ||
const data = _type.toBuffer(json); | ||
await _topic.publishMessage({ data }); | ||
} else { | ||
await _topic.publishMessage({ json }); | ||
} | ||
} | ||
}; | ||
} | ||
}; | ||
return typedClient; | ||
}; | ||
var schemaExists = async (client, schemaId) => { | ||
for await (const s of client.listSchemas()) { | ||
if (s.name === schemaId) { | ||
return true; | ||
} | ||
} | ||
return topic.publishMessage({ data }); | ||
return false; | ||
}; | ||
// src/lib/subscriber.ts | ||
var ALREADY_EXISTS_ERROR = "6 ALREADY_EXISTS"; | ||
var subscriptions = {}; | ||
var subscriptionDefaultConfig = async () => { | ||
const deadLetterTopicName = process.env.PUBSUB_DEAD_LETTER_TOPIC || "dead.letter.topic"; | ||
const maxDeliveryAttempts = Number.parseInt( | ||
process.env.PUBSUB_MAX_DELIVERY_ATTEMPTS || "20", | ||
10 | ||
); | ||
const deadLetterTopic = (await getOrCreateTopic(deadLetterTopicName)).name; | ||
const subscriptionOptions = { | ||
deadLetterPolicy: { | ||
deadLetterTopic, | ||
maxDeliveryAttempts | ||
var import_pubsub2 = require("@google-cloud/pubsub"); | ||
var createOrGetSubscription = async (topic, name, options) => { | ||
const createSubscriptionOptions = { | ||
messageRetentionDuration: { | ||
seconds: (options == null ? void 0 : options.messageRetentionDuration) || 3600 * 24 | ||
// Default to 1 day. | ||
}, | ||
retryPolicy: { | ||
minimumBackoff: { seconds: 10 }, | ||
maximumBackoff: { seconds: 600 } | ||
expirationPolicy: { | ||
ttl: { | ||
seconds: (options == null ? void 0 : options.expirationPolicy) || 3600 * 24 * 7 | ||
// Default to 7 days. | ||
} | ||
} | ||
}; | ||
return subscriptionOptions; | ||
}; | ||
var subscriptionPullConfig = () => { | ||
return subscriptionDefaultConfig(); | ||
}; | ||
var subscriptionPushConfig = async () => { | ||
if (!process.env.PUBSUB_PUSH_HOST) { | ||
throw new Error( | ||
"Environment variable PUBSUB_PUSH_HOST is missing and cannot set a push endpoint" | ||
); | ||
const [exists] = await topic.subscription(name).exists(); | ||
if (exists) { | ||
return topic.subscription(name); | ||
} | ||
if (!process.env.PUBSUB_SERVICE_ACCOUNT_EMAIL) { | ||
console.warn( | ||
"Environment variable PUBSUB_SERVICE_ACCOUNT_EMAIL should be set if running in GCP" | ||
); | ||
} | ||
const subscriptionOptions = await subscriptionDefaultConfig(); | ||
const pushEndpoint = `${process.env.PUBSUB_PUSH_HOST}/pubsub/push`; | ||
const serviceAccountEmail = process.env.PUBSUB_SERVICE_ACCOUNT_EMAIL; | ||
subscriptionOptions.pushConfig = { | ||
pushEndpoint | ||
}; | ||
if (serviceAccountEmail) { | ||
subscriptionOptions.pushConfig.oidcToken = { | ||
serviceAccountEmail | ||
}; | ||
} | ||
return subscriptionOptions; | ||
}; | ||
var getCreateSubscriptionOptions = async () => { | ||
switch (process.env.PUBSUB_DELIVERY_MODE) { | ||
case "push": | ||
return subscriptionPushConfig(); | ||
case "pull": | ||
return subscriptionPullConfig(); | ||
default: | ||
throw new Error( | ||
"Environment variable PUBSUB_DELIVERY_MODE must be set to either push or pull" | ||
); | ||
} | ||
}; | ||
var createOrGetSubscription = async (subscriptionName, topic) => { | ||
let subscription; | ||
try { | ||
; | ||
[subscription] = await topic.createSubscription( | ||
subscriptionName, | ||
await getCreateSubscriptionOptions() | ||
); | ||
} catch (ex) { | ||
if (ex instanceof Error && !ex.message.startsWith(ALREADY_EXISTS_ERROR)) { | ||
throw ex; | ||
} | ||
; | ||
[subscription] = await topic.subscription(subscriptionName).get(); | ||
} | ||
subscriptions[subscription.name] = subscription; | ||
const [subscription] = await topic.createSubscription( | ||
name, | ||
createSubscriptionOptions | ||
); | ||
return subscription; | ||
}; | ||
var subscriber = (topicName, config) => async ({ subscriberName, onSuccess, onError }) => { | ||
const topic = await getOrCreateTopic(topicName.toString(), config); | ||
const subscriptionName = `${topicName.toString()}.${subscriberName}`; | ||
const subscription = await createOrGetSubscription(subscriptionName, topic); | ||
const messageHandler = async (message) => { | ||
const data = JSON.parse(message.data.toString()); | ||
const typed = { | ||
...message, | ||
body: data.message, | ||
headers: { | ||
authenticatedUser: { | ||
token: data.identity | ||
var createSubscriber = (clientOptions) => { | ||
const client = clientOptions ? new import_pubsub2.PubSub(clientOptions) : new import_pubsub2.PubSub(); | ||
const typedClient = { | ||
topic: (name) => { | ||
let _topic; | ||
return { | ||
subscribe: async (subscriptionName, callback, options) => { | ||
if (!_topic) { | ||
_topic = client.topic(name); | ||
} | ||
const fullName = `${name}_${subscriptionName}`; | ||
const subscription = await createOrGetSubscription( | ||
_topic, | ||
fullName, | ||
options | ||
); | ||
subscription.on("message", (msg) => { | ||
const data = JSON.parse(msg.data.toString("utf8")); | ||
callback(Object.assign(msg, { data })); | ||
}); | ||
return subscription; | ||
} | ||
} | ||
}; | ||
if (!typed.body) { | ||
typed.body = data; | ||
}; | ||
} | ||
try { | ||
await onSuccess(typed.body, typed.headers); | ||
message.ack(); | ||
} catch (err) { | ||
message.nack(); | ||
console.error(err); | ||
} | ||
}; | ||
const errorHandler = async (err) => { | ||
if (onError) await onError(err); | ||
}; | ||
let messageHandlerEvent; | ||
switch (process.env.PUBSUB_DELIVERY_MODE) { | ||
case "push": | ||
messageHandlerEvent = "push-message"; | ||
break; | ||
case "pull": | ||
messageHandlerEvent = "message"; | ||
break; | ||
default: | ||
throw new Error( | ||
"Environment variable PUBSUB_DELIVERY_MODE must be set to either push or pull" | ||
); | ||
} | ||
subscription.on(messageHandlerEvent, messageHandler); | ||
subscription.on("error", errorHandler); | ||
const unsubscriber = () => { | ||
subscription.off(messageHandlerEvent, messageHandler); | ||
subscription.off("error", errorHandler); | ||
}; | ||
return unsubscriber; | ||
return typedClient; | ||
}; | ||
// src/lib/pubsub.ts | ||
var createPubsub = () => { | ||
const topic = (name, config) => { | ||
// src/lib/zod-to-avro.ts | ||
var import_ts_pattern = require("ts-pattern"); | ||
var import_zod = require("zod"); | ||
var zodToAvro = (name, zodType, options, cache = /* @__PURE__ */ new Map()) => { | ||
const fqn = `${options == null ? void 0 : options.namespace}.${name}`; | ||
if (cache.has(zodType)) { | ||
return cache.get(zodType); | ||
} | ||
const retval = (0, import_ts_pattern.match)({ | ||
value: zodType | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodOptional) }, (zodObject) => { | ||
return Array.from( | ||
new Set( | ||
[ | ||
"null", | ||
zodToAvro(name, zodObject.value.unwrap(), options, cache) | ||
].flat() | ||
) | ||
); | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodNullable) }, (zodObject) => { | ||
return Array.from( | ||
new Set( | ||
[ | ||
"null", | ||
zodToAvro(name, zodObject.value.unwrap(), options, cache) | ||
].flat() | ||
) | ||
); | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodObject) }, (zodObject) => { | ||
cache.set(zodObject.value, fqn); | ||
return parseZodObjectToAvscRecord(name, zodObject.value, cache, options); | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodString) }, () => { | ||
return "string"; | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodUnion) }, (zodUnion) => { | ||
return Array.from( | ||
new Set( | ||
zodUnion.value.options.flatMap( | ||
(zodType2) => zodToAvro(name, zodType2, options, cache) | ||
) | ||
) | ||
); | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodEnum) }, (zodEnum) => { | ||
cache.set(zodEnum.value, fqn); | ||
return { | ||
publish: publisher(name, config), | ||
subscribe: subscriber(name, config), | ||
name | ||
name, | ||
type: "enum", | ||
symbols: zodEnum.value.options, | ||
doc: zodEnum.value.description, | ||
namespace: options == null ? void 0 : options.namespace | ||
}; | ||
}; | ||
const subscribeToMultipleAs = (name, config) => { | ||
const promises = []; | ||
const obj = { | ||
wait: async () => await Promise.all(promises), | ||
subscribe: (topicName, { onSuccess, onError }) => { | ||
promises.push( | ||
topic(topicName.toString(), config).subscribe({ | ||
subscriberName: name, | ||
onSuccess, | ||
onError | ||
}) | ||
); | ||
return obj; | ||
} | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodNumber) }, () => { | ||
return "double"; | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodDate) }, () => { | ||
return "long"; | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodArray) }, (zodArray) => { | ||
return { | ||
type: "array", | ||
items: zodToAvro( | ||
`${name}-value`, | ||
zodArray.value._def.type, | ||
options, | ||
cache | ||
) | ||
}; | ||
return obj; | ||
}; | ||
return { | ||
topic, | ||
subscribeToMultipleAs | ||
}; | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodBigInt) }, () => { | ||
return "long"; | ||
}).with({ value: import_ts_pattern.P.instanceOf(import_zod.ZodBoolean) }, () => { | ||
return "boolean"; | ||
}).otherwise((v) => { | ||
throw new Error(`Unsupported type ${v}`); | ||
}); | ||
return retval; | ||
}; | ||
// src/lib/router.ts | ||
var import_express = require("express"); | ||
var import_html_escaper = require("html-escaper"); | ||
var pushRouter = () => { | ||
const r = (0, import_express.Router)(); | ||
r.post("/pubsub/push", (req, res) => { | ||
if (!req.body) { | ||
const msg = "no Pub/Sub message received"; | ||
return res.status(400).send(`Bad Request: ${msg}`); | ||
var parseZodObjectToAvscRecord = (name, zodObject, cache, options) => { | ||
const shape = zodObject.shape; | ||
const fields = Object.entries(shape).map((k) => { | ||
const type = zodToAvro(k[0], k[1], options, cache); | ||
const name2 = k[0]; | ||
const doc = k[1].description; | ||
const fieldDef = { name: name2, type, doc }; | ||
if (type === "null" || Array.isArray(type) && type.includes("null")) { | ||
fieldDef.default = null; | ||
} | ||
if (!req.body.message) { | ||
const msg = "invalid Pub/Sub message format"; | ||
return res.status(400).send(`Bad Request: ${msg}`); | ||
} | ||
const pubSubMessage = req.body.message; | ||
const decodedMessage = Buffer.from(pubSubMessage.data, "base64").toString().trim(); | ||
const subscriptionName = (0, import_html_escaper.escape)(req.body.subscription); | ||
if (subscriptions[subscriptionName]) { | ||
const message = { | ||
data: Buffer.from(decodedMessage), | ||
id: pubSubMessage.messageId, | ||
ack: () => res.status(204).send(), | ||
nack: () => res.status(500).send() | ||
}; | ||
subscriptions[subscriptionName].emit("push-message", message); | ||
} else { | ||
res.status(404).send(`No listener for: ${subscriptionName}`); | ||
} | ||
return fieldDef; | ||
}); | ||
return r; | ||
return { | ||
name, | ||
type: "record", | ||
fields, | ||
namespace: options == null ? void 0 : options.namespace, | ||
doc: zodObject.description | ||
}; | ||
}; | ||
// Annotate the CommonJS export names for ESM import in node: | ||
0 && (module.exports = { | ||
createPubsub, | ||
pushRouter | ||
createPublisher, | ||
createSubscriber, | ||
zodToAvro | ||
}); |
{ | ||
"name": "@sebspark/pubsub", | ||
"version": "0.6.3", | ||
"version": "1.0.0", | ||
"license": "Apache-2.0", | ||
@@ -25,4 +25,7 @@ "main": "dist/index.js", | ||
"@types/html-escaper": "3.0.2", | ||
"avsc": "^5.7.7", | ||
"express": "4.21.0", | ||
"html-escaper": "3.0.3" | ||
"html-escaper": "3.0.3", | ||
"ts-pattern": "^5.5.0", | ||
"zod": "^3.23.8" | ||
}, | ||
@@ -29,0 +32,0 @@ "keywords": [ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 5 instances in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
30751
549
1
1
8
+ Addedavsc@^5.7.7
+ Addedts-pattern@^5.5.0
+ Addedzod@^3.23.8
+ Addedavsc@5.7.7(transitive)
+ Addedts-pattern@5.5.0(transitive)
+ Addedzod@3.23.8(transitive)