configurapi-handler-ws
Advanced tools
| export * from "./streamResponse"; | ||
| export * from "./postbackFunction"; | ||
| export * from "./postbackResponse"; | ||
| export * from "./webSocketClient"; |
| import { PostbackResponse } from "./postbackResponse"; | ||
| export function pushFunction(postBackFunction: Function, response: PostbackResponse): Promise<void>; |
| const { randomUUID } = require("node:crypto"); | ||
| const StreamResponse = require("./streamResponse"); | ||
| // tiny sleep for retry/backoff | ||
| function sleep(ms) { | ||
| return new Promise(r => setTimeout(r, ms)); | ||
| } | ||
| module.exports = async function pushFunction(postBackFunction, response) | ||
| { | ||
| // Helper: postToConnection with light retry on throttling | ||
| const postJSON = async (body, headers) => | ||
| { | ||
| try | ||
| { | ||
| if(typeof body === 'object') | ||
| { | ||
| headers['Content-Type'] = 'application/json'; | ||
| } | ||
| await postBackFunction.apply(undefined, [body, headers]) | ||
| } | ||
| catch(err) | ||
| { | ||
| for(let i=0; i<10; i++) | ||
| { | ||
| if (err && err.name === 'GoneException') throw err; // client disconnected | ||
| if (err && err.name === 'LimitExceededException') | ||
| { | ||
| await sleep(i*250 + Math.floor(Math.random() * 250)); | ||
| try | ||
| { | ||
| await postBackFunction.apply(undefined, [body, headers]) | ||
| break; | ||
| } | ||
| catch(e){ err = e}; | ||
| } | ||
| else throw err; | ||
| } | ||
| } | ||
| }; | ||
| await new Promise((resolve, reject)=> | ||
| { | ||
| try | ||
| { | ||
| if(response instanceof StreamResponse) | ||
| { | ||
| const baseHeaders = { | ||
| ...response.headers | ||
| }; | ||
| const stream = response.body; | ||
| const streamId = baseHeaders?.['message-id'] || randomUUID(); | ||
| // Guard: need a readable stream | ||
| if(!stream || typeof stream.on !== 'function') | ||
| { | ||
| return reject(new Error('StreamResponse.stream is not a readable stream.')); | ||
| } | ||
| // API Gateway WS frame hard limit is 32 KB; keep batches small. | ||
| const BATCH_BYTES = 1024; | ||
| let bucket = ''; | ||
| let bucketBytes = 0; | ||
| let flushTimer = undefined; | ||
| const frames = []; //{statusCode:number, body:any, headers: Record<string, string>}[] | ||
| let sendingPromise = undefined; //Promise<void>|undefined | ||
| async function drain()// Promise<void> | ||
| { | ||
| if (sendingPromise) return sendingPromise; // wait for whichever send is active | ||
| sendingPromise = (async () => | ||
| { | ||
| try | ||
| { | ||
| while (frames.length) | ||
| { | ||
| const resp = frames.shift(); | ||
| if(resp) | ||
| { | ||
| await postJSON(resp.body, resp.headers); | ||
| } | ||
| } | ||
| } | ||
| finally | ||
| { | ||
| sendingPromise = undefined; | ||
| } | ||
| })(); | ||
| } | ||
| async function enqueue(resp) | ||
| { | ||
| frames.push(resp); | ||
| // fire-and-forget; finish() will await completion | ||
| // eslint-disable-next-line @typescript-eslint/no-floating-promises | ||
| await drain(); | ||
| } | ||
| // snapshot & reset BEFORE any await to avoid duplicate sends | ||
| async function flushChunkIfAny() | ||
| { | ||
| if (bucketBytes === 0) return; | ||
| const payload = bucket; // snapshot current buffer | ||
| bucket = ''; | ||
| bucketBytes = 0; | ||
| const headers = { 'message-id': streamId, 'x-stream': 'chunk', 'x-stream-id': streamId, 'Content-Type': 'text/plain' }; | ||
| const resp = {body: payload, statusCode: 200, headers}; | ||
| await enqueue(resp); | ||
| } | ||
| function scheduleFlush(delayMs) | ||
| { | ||
| if (flushTimer) return; | ||
| flushTimer = setTimeout(() => { | ||
| flushTimer = undefined; | ||
| flushChunkIfAny() | ||
| .then(() => | ||
| { | ||
| // re-arm only if there is still pending data *and* we didn't reject | ||
| if (bucketBytes > 0) scheduleFlush(50); | ||
| }) | ||
| .catch(err => | ||
| { | ||
| try { cleanup(); } catch {} | ||
| // Propagate to the outer promise so Lambda sees the failure | ||
| reject(err); | ||
| }); | ||
| }, delayMs); | ||
| } | ||
| const onData = (chunk) => | ||
| { | ||
| const text = Buffer.isBuffer(chunk) ? chunk.toString('utf8') : String(chunk); | ||
| bucket += text; | ||
| bucketBytes += Buffer.byteLength(text); | ||
| if (bucketBytes >= BATCH_BYTES || text.includes('\n')) | ||
| { | ||
| scheduleFlush(0); // flush ASAP; no overlap due to queue | ||
| } | ||
| else | ||
| { | ||
| scheduleFlush(50); | ||
| } | ||
| }; | ||
| // finish by flushing tail, enqueuing 'done', and awaiting queue | ||
| async function finish() | ||
| { | ||
| if (flushTimer) { clearTimeout(flushTimer); flushTimer = undefined; } | ||
| await flushChunkIfAny(); | ||
| const headers = { 'message-id': streamId, 'x-stream': 'done', 'x-stream-id': streamId }; | ||
| await enqueue({body: '', statusCode: 204, headers}); | ||
| await drain(); // wait until all frames are sent | ||
| } | ||
| const cleanup = () => | ||
| { | ||
| try | ||
| { | ||
| if (flushTimer) { clearTimeout(flushTimer); flushTimer = undefined; } | ||
| stream.removeListener('data', onData); | ||
| stream.removeListener('end', onEnd); | ||
| stream.removeListener('error', onErr); | ||
| } catch(_) {} | ||
| }; | ||
| // use finish() so we never resolve before queue is empty | ||
| const onEnd = () => { finish().then(() => { cleanup(); resolve(undefined); }).catch(err => { cleanup(); reject(err); }); }; | ||
| const onErr = (e) => | ||
| { | ||
| cleanup(); | ||
| return reject(e); | ||
| }; | ||
| stream.on('data', onData); | ||
| stream.once('end', onEnd); | ||
| stream.once('error', onErr); | ||
| // Important: stop here; promise resolves on 'end' or rejects on 'error' | ||
| return; | ||
| } | ||
| else | ||
| { | ||
| // Non-stream response: send as-is | ||
| postJSON(response.body, response.headers) | ||
| .then(() => resolve(undefined)) | ||
| .catch((err) => reject(err)); | ||
| } | ||
| } | ||
| catch(e) | ||
| { | ||
| reject(e) | ||
| } | ||
| }); | ||
| } |
| import { Response } from "configurapi"; | ||
| export class PostbackResponse extends Response | ||
| { | ||
| body: any; | ||
| headers: Record<string, any>; | ||
| constructor(body: any, headers?: Record<string, any>); | ||
| } |
| const Response = require('configurapi').Response; | ||
| module.exports = class PostbackResponse extends Response | ||
| { | ||
| body; | ||
| headers; | ||
| constructor(body, headers = {}) | ||
| { | ||
| this.body = body; | ||
| this.headers = headers; | ||
| super(body, 200, headers); | ||
| } | ||
| }; |
| import { Response } from "configurapi"; | ||
| import type { Readable } from 'node:stream'; | ||
| declare class StreamResponse extends Response | ||
| { | ||
| constructor(stream: Readable, statusCode?: number, headers?: Record<string, string>); | ||
| } | ||
| export interface ClientOptions | ||
| { | ||
| /** Reject if no reply arrives within this many ms (default 15000) */ | ||
| replyTimeoutMs?: number; | ||
| /** Header name to carry the correlation id (default "message-id") */ | ||
| idHeaderName?: string; | ||
| /** If a response arrives without an id, deliver it here (or ignore if omitted) */ | ||
| onPush?: (data: { | ||
| body: any; | ||
| statusCode: number; | ||
| headers: Record<string, string>; | ||
| }) => Promise<void>; | ||
| /** | ||
| * A single string or an array of strings representing the sub-protocol(s) | ||
| * that the client would like to use, in order of preference. | ||
| * If it is omitted, an empty array is used by default, i.e., []. | ||
| */ | ||
| protocols?: () => string | string[]; | ||
| } | ||
| declare class WebSocketClient | ||
| { | ||
| constructor(endpoint: string, opts?: ClientOptions); | ||
| readonly isOpen: boolean; | ||
| connecting: boolean; | ||
| connect(): Promise<void>; | ||
| close(code?: number, reason?: string): Promise<void>; | ||
| /** | ||
| * Send IRequest and resolve when an IResponse arrives | ||
| * with the same messageId | ||
| */ | ||
| send<T>( | ||
| req: any, | ||
| opts?: { | ||
| timeoutMs?: number; | ||
| onChunk?: (chunk: string) => Promise<void>; | ||
| onDone?: () => Promise<void>; | ||
| onError?: (e: Error) => Promise<void>; | ||
| } | ||
| ): Promise<T>; | ||
| } |
| const MessageIdHeaderName = 'message-id'; | ||
| module.exports = class WebSocketClient | ||
| { | ||
| url; | ||
| ws; | ||
| connecting = false; | ||
| replyTimeoutMs; | ||
| protocols; | ||
| onPush; | ||
| // Correlate by messageId: id -> waiter | ||
| inflight = new Map(); | ||
| constructor(endpoint, opts = {}) | ||
| { | ||
| this.url = endpoint; | ||
| this.replyTimeoutMs = opts.replyTimeoutMs ?? 27000; | ||
| this.onPush = opts.onPush; | ||
| this.protocols = opts.protocols; | ||
| } | ||
| get isOpen() | ||
| { | ||
| return !!this.ws && this.ws.readyState === this.ws.OPEN; | ||
| } | ||
| async connect() | ||
| { | ||
| if (this.isOpen || this.connecting) return; | ||
| this.connecting = true; | ||
| await new Promise((resolve, reject) => | ||
| { | ||
| const protocols = this.protocols?.(); | ||
| this.ws = new WebSocket(this.url, protocols); | ||
| this.ws.addEventListener("open", () => | ||
| { | ||
| this.connecting = false; | ||
| resolve(); | ||
| }); | ||
| this.ws.addEventListener("message", async (ev) => | ||
| { | ||
| try | ||
| { | ||
| await this.handleResponse(ev.data); | ||
| } | ||
| catch(error) | ||
| { | ||
| reject(error) | ||
| } | ||
| }); | ||
| this.ws.addEventListener("close", () => | ||
| { | ||
| this.connecting = false; | ||
| // Fail all pending calls | ||
| for (const [id, waiter] of this.inflight) { | ||
| clearTimeout(waiter.timer); | ||
| waiter.reject(new Error(`WebSocket closed before response (id=${id})`)); | ||
| } | ||
| this.inflight.clear(); | ||
| }); | ||
| this.ws.addEventListener("error", (err) => | ||
| { | ||
| this.connecting = false; | ||
| if (!this.isOpen) reject(err); | ||
| }); | ||
| }); | ||
| } | ||
| async close(code, reason) | ||
| { | ||
| if (!this.ws) return; | ||
| await new Promise((resolve) => | ||
| { | ||
| this.ws?.addEventListener("close", () => resolve(), { once: true }); | ||
| try { this.ws.close(code, reason); } catch { resolve(); } | ||
| }); | ||
| } | ||
| /** Send IRequest and resolve when an IResponse arrives with the same messageId */ | ||
| async send(req, opts) | ||
| { | ||
| if (!this.isOpen) await this.connect(); | ||
| const id = this.ensureMessageId(req); | ||
| // Set up the waiter BEFORE sending (avoid fast-response race) | ||
| const timeoutMs = opts?.timeoutMs ?? this.replyTimeoutMs; | ||
| const promise = new Promise((resolve, reject) => | ||
| { | ||
| let timer = null; | ||
| const waiter = { resolve, reject, timer: null, onChunk: opts?.onChunk, onDone: opts?.onDone, onError: opts?.onError, timedOut: false, muted: false }; | ||
| if (timeoutMs > 0) | ||
| { | ||
| timer = setTimeout(() => { | ||
| waiter.timedOut = true; | ||
| waiter.onChunk?.("\n\nI've reached my response time limit and couldn't complete my full answer. Could you let me know which part you'd like me to focus on?"); | ||
| // Reject the promise so callers can show a warning, but keep waiter so 'done' can clean up | ||
| reject(new Error(`Timed out waiting for response (messageId=${id})`)); | ||
| }, timeoutMs); | ||
| } | ||
| waiter.timer = timer; | ||
| this.inflight.set(id, waiter); | ||
| }); | ||
| try | ||
| { | ||
| this.ws.send(JSON.stringify(req)); | ||
| } | ||
| catch (e) | ||
| { | ||
| const waiter = this.inflight.get(id); | ||
| if (waiter) { clearTimeout(waiter.timer); this.inflight.delete(id); } | ||
| throw e; | ||
| } | ||
| return promise; | ||
| } | ||
| async handleResponse(text) | ||
| { | ||
| let msg; | ||
| try { msg = JSON.parse(text); } catch { msg = { headers: {}, payload: text }; } | ||
| const rawFlag = this.getHeaderCaseInsensitive(msg.headers, 'x-stream'); | ||
| const streamFlag = rawFlag ? String(rawFlag).toLowerCase() : ''; | ||
| if (streamFlag) | ||
| { | ||
| // correlate by message-id; fallback is applied only for CHUNK below | ||
| const id = this.getHeaderCaseInsensitive(msg.headers, MessageIdHeaderName); | ||
| const waiter = id ? this.inflight.get(id) : undefined; | ||
| if(msg?.statusCode > 299) | ||
| { | ||
| const error = new Error(`${msg.message} - (${msg.statusCode})`) | ||
| if(waiter) | ||
| { | ||
| await waiter.onError?.(error); | ||
| } | ||
| } | ||
| if (streamFlag === 'chunk') | ||
| { | ||
| if (waiter?.timedOut) return; | ||
| if (waiter) | ||
| { | ||
| await waiter.onChunk?.(msg.body ?? msg.payload ?? msg ?? ''); | ||
| } | ||
| else if (this.onPush) | ||
| { | ||
| this.onPush(msg); | ||
| } | ||
| } | ||
| else if (streamFlag === 'done' || streamFlag === 'error') | ||
| { | ||
| if (!waiter) | ||
| { | ||
| if (this.onPush) this.onPush(msg); | ||
| return; | ||
| } | ||
| await waiter.onChunk?.(msg.body ?? msg.payload ?? ''); | ||
| await waiter.onDone?.(); | ||
| clearTimeout(waiter.timer); | ||
| if(id) | ||
| { | ||
| this.inflight.delete(id); | ||
| } | ||
| // We alaready rejected if timed out. | ||
| if(waiter.timedOut) return | ||
| if (streamFlag === 'error') | ||
| { | ||
| waiter.reject(msg); | ||
| } | ||
| else | ||
| { | ||
| waiter.resolve(msg); | ||
| } | ||
| return; | ||
| } | ||
| // Unknown x-stream value: treat as push to avoid nuking inflight state | ||
| if (this.onPush) this.onPush(msg); | ||
| return; | ||
| } | ||
| const id = this.getHeaderCaseInsensitive(msg.headers, MessageIdHeaderName); | ||
| if (msg?.statusCode && msg.statusCode !== 202 && id && this.inflight.has(id)) | ||
| { | ||
| const waiter = this.inflight.get(id); | ||
| if(msg.statusCode > 299) | ||
| { | ||
| const error = new Error(`${msg.message} - (${msg.statusCode})`) | ||
| if(waiter) | ||
| { | ||
| await waiter.onError?.(error); | ||
| } | ||
| } | ||
| clearTimeout(waiter.timer); | ||
| this.inflight.delete(id); | ||
| if(waiter.timedOut) return // Already rejected | ||
| waiter.resolve(msg); | ||
| } | ||
| else | ||
| { | ||
| if(msg?.statusCode > 299) | ||
| { | ||
| const error = new Error(`${msg.message} - (${msg.statusCode})`) | ||
| throw error; | ||
| } | ||
| // No id or unknown id. Treat as push/unsolicited. | ||
| if (this.onPush) this.onPush(msg); | ||
| } | ||
| } | ||
| ensureMessageId(request) | ||
| { | ||
| if(request === undefined) return ''; | ||
| if(request.headers === undefined) | ||
| { | ||
| request.headers = {} | ||
| } | ||
| let id = this.getHeaderCaseInsensitive(request.headers, MessageIdHeaderName); | ||
| if(!id) | ||
| { | ||
| id = crypto.randomUUID() | ||
| request.headers[MessageIdHeaderName] = id; | ||
| } | ||
| return id; | ||
| } | ||
| getHeaderCaseInsensitive(headers, name) | ||
| { | ||
| if (!headers || typeof headers !== "object") return undefined; | ||
| const target = name.toLowerCase(); | ||
| for (const k of Object.keys(headers)) | ||
| { | ||
| if (k.toLowerCase() === target) return String(headers[k]); | ||
| } | ||
| return undefined; | ||
| } | ||
| } |
+7
-4
| { | ||
| "name": "configurapi-handler-ws", | ||
| "version": "1.0.1", | ||
| "version": "1.1.0", | ||
| "main": "src/index", | ||
| "files": [ | ||
| "src/*", | ||
| "types.d.ts" | ||
| "src/*.d.ts" | ||
| ], | ||
| "types": "types.d.ts", | ||
| "types": "src/index.d.ts", | ||
| "repository": { | ||
@@ -23,3 +23,6 @@ "type": "git", | ||
| "homepage": "https://gitlab.com/mappies/configurapi-handler-ws#readme", | ||
| "description": "" | ||
| "description": "", | ||
| "devDependencies": { | ||
| "@types/node": "^25.2.3" | ||
| } | ||
| } |
+39
-0
| # configurapi-handler-ws | ||
| Configurapi request handlers for websocket | ||
| ## Usage | ||
| ### Server side: | ||
| * If you want to stream data back to the client, return a `StreamResponse` with a readable stream. | ||
| * Use pushbackFunction(postbackFunction: Function, response: PostbackResponse) to send a response to the postback endpoint. The response will be wrapped in the specified postbackFunction. | ||
| ``` | ||
| const { pushbackFunction, PostbackResponse } = require("configurapi-handler-ws"); | ||
| let callback = async (body:any, headers:Record<string,string>) => | ||
| { | ||
| const isStream = body && typeof body === 'object' && typeof body.pipe === 'function'; | ||
| const isBinary = typeof body === 'string' || Buffer.isBuffer(body) || body instanceof Uint8Array; | ||
| let resp = await fetch(`http://localhost:9100/@connections/${connectionId}`, { | ||
| method: 'POST', | ||
| headers, | ||
| body: isStream || isBinary ? body : JSON.stringify(body), | ||
| ...(isStream ? { duplex: 'half' } : {}) | ||
| }); | ||
| console.log(resp.status + ' ' + resp.statusText) | ||
| console.log(await resp.json()) | ||
| } | ||
| await pushbackFunction(callback, new PostbackResponse('body', {'content-type': 'application/json'})) | ||
| ``` | ||
| ### Client side: | ||
| * Use WebSocketClient to connect to a WebSocket runner, send data over the connection, and disconnect from the runner when finished. | ||
| * To pass data (such as a protocol or authentication token) when establishing the WebSocket connection, provide it as the second parameter to the constructor. | ||
| ``` | ||
| const { WebSocketClient } = require("configurapi-handler-ws"); | ||
| let client = new WebSocketClient(WEBSOCKET_BASE_URL, {onPush: (data:IResponse)=>console.log(data)}); | ||
| ``` |
+3
-1
@@ -1,4 +0,6 @@ | ||
| module.exports = { | ||
| StreamResponse: require('./streamResponse'), | ||
| pushbackFunction: require('./postbackFunction'), | ||
| PostbackResponse: require('./postbackResponse'), | ||
| WebSocketClient: require('./webSocketClient') | ||
| }; |
| import { Response } from "configurapi"; | ||
| import type { Readable } from 'node:stream'; | ||
| declare class StreamResponse extends Response | ||
| { | ||
| constructor(stream: Readable, statusCode?: number, headers?: Record<string, string>); | ||
| } | ||
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
20950
1744.19%12
140%484
2925%42
950%1
Infinity%1
Infinity%1
Infinity%