Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

configurapi-handler-ws

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

configurapi-handler-ws - npm Package Compare versions

Comparing version
1.0.1
to
1.1.0
+4
src/index.d.ts
export * from "./streamResponse";
export * from "./postbackFunction";
export * from "./postbackResponse";
export * from "./webSocketClient";
import { PostbackResponse } from "./postbackResponse";
export function pushFunction(postBackFunction: Function, response: PostbackResponse): Promise<void>;
const { randomUUID } = require("node:crypto");
const StreamResponse = require("./streamResponse");
// tiny sleep for retry/backoff
function sleep(ms) {
return new Promise(r => setTimeout(r, ms));
}
module.exports = async function pushFunction(postBackFunction, response)
{
// Helper: postToConnection with light retry on throttling
const postJSON = async (body, headers) =>
{
try
{
if(typeof body === 'object')
{
headers['Content-Type'] = 'application/json';
}
await postBackFunction.apply(undefined, [body, headers])
}
catch(err)
{
for(let i=0; i<10; i++)
{
if (err && err.name === 'GoneException') throw err; // client disconnected
if (err && err.name === 'LimitExceededException')
{
await sleep(i*250 + Math.floor(Math.random() * 250));
try
{
await postBackFunction.apply(undefined, [body, headers])
break;
}
catch(e){ err = e};
}
else throw err;
}
}
};
await new Promise((resolve, reject)=>
{
try
{
if(response instanceof StreamResponse)
{
const baseHeaders = {
...response.headers
};
const stream = response.body;
const streamId = baseHeaders?.['message-id'] || randomUUID();
// Guard: need a readable stream
if(!stream || typeof stream.on !== 'function')
{
return reject(new Error('StreamResponse.stream is not a readable stream.'));
}
// API Gateway WS frame hard limit is 32 KB; keep batches small.
const BATCH_BYTES = 1024;
let bucket = '';
let bucketBytes = 0;
let flushTimer = undefined;
const frames = []; //{statusCode:number, body:any, headers: Record<string, string>}[]
let sendingPromise = undefined; //Promise<void>|undefined
async function drain()// Promise<void>
{
if (sendingPromise) return sendingPromise; // wait for whichever send is active
sendingPromise = (async () =>
{
try
{
while (frames.length)
{
const resp = frames.shift();
if(resp)
{
await postJSON(resp.body, resp.headers);
}
}
}
finally
{
sendingPromise = undefined;
}
})();
}
async function enqueue(resp)
{
frames.push(resp);
// fire-and-forget; finish() will await completion
// eslint-disable-next-line @typescript-eslint/no-floating-promises
await drain();
}
// snapshot & reset BEFORE any await to avoid duplicate sends
async function flushChunkIfAny()
{
if (bucketBytes === 0) return;
const payload = bucket; // snapshot current buffer
bucket = '';
bucketBytes = 0;
const headers = { 'message-id': streamId, 'x-stream': 'chunk', 'x-stream-id': streamId, 'Content-Type': 'text/plain' };
const resp = {body: payload, statusCode: 200, headers};
await enqueue(resp);
}
function scheduleFlush(delayMs)
{
if (flushTimer) return;
flushTimer = setTimeout(() => {
flushTimer = undefined;
flushChunkIfAny()
.then(() =>
{
// re-arm only if there is still pending data *and* we didn't reject
if (bucketBytes > 0) scheduleFlush(50);
})
.catch(err =>
{
try { cleanup(); } catch {}
// Propagate to the outer promise so Lambda sees the failure
reject(err);
});
}, delayMs);
}
const onData = (chunk) =>
{
const text = Buffer.isBuffer(chunk) ? chunk.toString('utf8') : String(chunk);
bucket += text;
bucketBytes += Buffer.byteLength(text);
if (bucketBytes >= BATCH_BYTES || text.includes('\n'))
{
scheduleFlush(0); // flush ASAP; no overlap due to queue
}
else
{
scheduleFlush(50);
}
};
// finish by flushing tail, enqueuing 'done', and awaiting queue
async function finish()
{
if (flushTimer) { clearTimeout(flushTimer); flushTimer = undefined; }
await flushChunkIfAny();
const headers = { 'message-id': streamId, 'x-stream': 'done', 'x-stream-id': streamId };
await enqueue({body: '', statusCode: 204, headers});
await drain(); // wait until all frames are sent
}
const cleanup = () =>
{
try
{
if (flushTimer) { clearTimeout(flushTimer); flushTimer = undefined; }
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onErr);
} catch(_) {}
};
// use finish() so we never resolve before queue is empty
const onEnd = () => { finish().then(() => { cleanup(); resolve(undefined); }).catch(err => { cleanup(); reject(err); }); };
const onErr = (e) =>
{
cleanup();
return reject(e);
};
stream.on('data', onData);
stream.once('end', onEnd);
stream.once('error', onErr);
// Important: stop here; promise resolves on 'end' or rejects on 'error'
return;
}
else
{
// Non-stream response: send as-is
postJSON(response.body, response.headers)
.then(() => resolve(undefined))
.catch((err) => reject(err));
}
}
catch(e)
{
reject(e)
}
});
}
import { Response } from "configurapi";
export class PostbackResponse extends Response
{
body: any;
headers: Record<string, any>;
constructor(body: any, headers?: Record<string, any>);
}
const Response = require('configurapi').Response;
module.exports = class PostbackResponse extends Response
{
body;
headers;
constructor(body, headers = {})
{
this.body = body;
this.headers = headers;
super(body, 200, headers);
}
};
import { Response } from "configurapi";
import type { Readable } from 'node:stream';
declare class StreamResponse extends Response
{
constructor(stream: Readable, statusCode?: number, headers?: Record<string, string>);
}
export interface ClientOptions
{
/** Reject if no reply arrives within this many ms (default 15000) */
replyTimeoutMs?: number;
/** Header name to carry the correlation id (default "message-id") */
idHeaderName?: string;
/** If a response arrives without an id, deliver it here (or ignore if omitted) */
onPush?: (data: {
body: any;
statusCode: number;
headers: Record<string, string>;
}) => Promise<void>;
/**
* A single string or an array of strings representing the sub-protocol(s)
* that the client would like to use, in order of preference.
* If it is omitted, an empty array is used by default, i.e., [].
*/
protocols?: () => string | string[];
}
declare class WebSocketClient
{
constructor(endpoint: string, opts?: ClientOptions);
readonly isOpen: boolean;
connecting: boolean;
connect(): Promise<void>;
close(code?: number, reason?: string): Promise<void>;
/**
* Send IRequest and resolve when an IResponse arrives
* with the same messageId
*/
send<T>(
req: any,
opts?: {
timeoutMs?: number;
onChunk?: (chunk: string) => Promise<void>;
onDone?: () => Promise<void>;
onError?: (e: Error) => Promise<void>;
}
): Promise<T>;
}
const MessageIdHeaderName = 'message-id';
module.exports = class WebSocketClient
{
url;
ws;
connecting = false;
replyTimeoutMs;
protocols;
onPush;
// Correlate by messageId: id -> waiter
inflight = new Map();
constructor(endpoint, opts = {})
{
this.url = endpoint;
this.replyTimeoutMs = opts.replyTimeoutMs ?? 27000;
this.onPush = opts.onPush;
this.protocols = opts.protocols;
}
get isOpen()
{
return !!this.ws && this.ws.readyState === this.ws.OPEN;
}
async connect()
{
if (this.isOpen || this.connecting) return;
this.connecting = true;
await new Promise((resolve, reject) =>
{
const protocols = this.protocols?.();
this.ws = new WebSocket(this.url, protocols);
this.ws.addEventListener("open", () =>
{
this.connecting = false;
resolve();
});
this.ws.addEventListener("message", async (ev) =>
{
try
{
await this.handleResponse(ev.data);
}
catch(error)
{
reject(error)
}
});
this.ws.addEventListener("close", () =>
{
this.connecting = false;
// Fail all pending calls
for (const [id, waiter] of this.inflight) {
clearTimeout(waiter.timer);
waiter.reject(new Error(`WebSocket closed before response (id=${id})`));
}
this.inflight.clear();
});
this.ws.addEventListener("error", (err) =>
{
this.connecting = false;
if (!this.isOpen) reject(err);
});
});
}
async close(code, reason)
{
if (!this.ws) return;
await new Promise((resolve) =>
{
this.ws?.addEventListener("close", () => resolve(), { once: true });
try { this.ws.close(code, reason); } catch { resolve(); }
});
}
/** Send IRequest and resolve when an IResponse arrives with the same messageId */
async send(req, opts)
{
if (!this.isOpen) await this.connect();
const id = this.ensureMessageId(req);
// Set up the waiter BEFORE sending (avoid fast-response race)
const timeoutMs = opts?.timeoutMs ?? this.replyTimeoutMs;
const promise = new Promise((resolve, reject) =>
{
let timer = null;
const waiter = { resolve, reject, timer: null, onChunk: opts?.onChunk, onDone: opts?.onDone, onError: opts?.onError, timedOut: false, muted: false };
if (timeoutMs > 0)
{
timer = setTimeout(() => {
waiter.timedOut = true;
waiter.onChunk?.("\n\nI've reached my response time limit and couldn't complete my full answer. Could you let me know which part you'd like me to focus on?");
// Reject the promise so callers can show a warning, but keep waiter so 'done' can clean up
reject(new Error(`Timed out waiting for response (messageId=${id})`));
}, timeoutMs);
}
waiter.timer = timer;
this.inflight.set(id, waiter);
});
try
{
this.ws.send(JSON.stringify(req));
}
catch (e)
{
const waiter = this.inflight.get(id);
if (waiter) { clearTimeout(waiter.timer); this.inflight.delete(id); }
throw e;
}
return promise;
}
async handleResponse(text)
{
let msg;
try { msg = JSON.parse(text); } catch { msg = { headers: {}, payload: text }; }
const rawFlag = this.getHeaderCaseInsensitive(msg.headers, 'x-stream');
const streamFlag = rawFlag ? String(rawFlag).toLowerCase() : '';
if (streamFlag)
{
// correlate by message-id; fallback is applied only for CHUNK below
const id = this.getHeaderCaseInsensitive(msg.headers, MessageIdHeaderName);
const waiter = id ? this.inflight.get(id) : undefined;
if(msg?.statusCode > 299)
{
const error = new Error(`${msg.message} - (${msg.statusCode})`)
if(waiter)
{
await waiter.onError?.(error);
}
}
if (streamFlag === 'chunk')
{
if (waiter?.timedOut) return;
if (waiter)
{
await waiter.onChunk?.(msg.body ?? msg.payload ?? msg ?? '');
}
else if (this.onPush)
{
this.onPush(msg);
}
}
else if (streamFlag === 'done' || streamFlag === 'error')
{
if (!waiter)
{
if (this.onPush) this.onPush(msg);
return;
}
await waiter.onChunk?.(msg.body ?? msg.payload ?? '');
await waiter.onDone?.();
clearTimeout(waiter.timer);
if(id)
{
this.inflight.delete(id);
}
// We alaready rejected if timed out.
if(waiter.timedOut) return
if (streamFlag === 'error')
{
waiter.reject(msg);
}
else
{
waiter.resolve(msg);
}
return;
}
// Unknown x-stream value: treat as push to avoid nuking inflight state
if (this.onPush) this.onPush(msg);
return;
}
const id = this.getHeaderCaseInsensitive(msg.headers, MessageIdHeaderName);
if (msg?.statusCode && msg.statusCode !== 202 && id && this.inflight.has(id))
{
const waiter = this.inflight.get(id);
if(msg.statusCode > 299)
{
const error = new Error(`${msg.message} - (${msg.statusCode})`)
if(waiter)
{
await waiter.onError?.(error);
}
}
clearTimeout(waiter.timer);
this.inflight.delete(id);
if(waiter.timedOut) return // Already rejected
waiter.resolve(msg);
}
else
{
if(msg?.statusCode > 299)
{
const error = new Error(`${msg.message} - (${msg.statusCode})`)
throw error;
}
// No id or unknown id. Treat as push/unsolicited.
if (this.onPush) this.onPush(msg);
}
}
ensureMessageId(request)
{
if(request === undefined) return '';
if(request.headers === undefined)
{
request.headers = {}
}
let id = this.getHeaderCaseInsensitive(request.headers, MessageIdHeaderName);
if(!id)
{
id = crypto.randomUUID()
request.headers[MessageIdHeaderName] = id;
}
return id;
}
getHeaderCaseInsensitive(headers, name)
{
if (!headers || typeof headers !== "object") return undefined;
const target = name.toLowerCase();
for (const k of Object.keys(headers))
{
if (k.toLowerCase() === target) return String(headers[k]);
}
return undefined;
}
}
+7
-4
{
"name": "configurapi-handler-ws",
"version": "1.0.1",
"version": "1.1.0",
"main": "src/index",
"files": [
"src/*",
"types.d.ts"
"src/*.d.ts"
],
"types": "types.d.ts",
"types": "src/index.d.ts",
"repository": {

@@ -23,3 +23,6 @@ "type": "git",

"homepage": "https://gitlab.com/mappies/configurapi-handler-ws#readme",
"description": ""
"description": "",
"devDependencies": {
"@types/node": "^25.2.3"
}
}
# configurapi-handler-ws
Configurapi request handlers for websocket
## Usage
### Server side:
* If you want to stream data back to the client, return a `StreamResponse` with a readable stream.
* Use pushbackFunction(postbackFunction: Function, response: PostbackResponse) to send a response to the postback endpoint. The response will be wrapped in the specified postbackFunction.
```
const { pushbackFunction, PostbackResponse } = require("configurapi-handler-ws");
let callback = async (body:any, headers:Record<string,string>) =>
{
const isStream = body && typeof body === 'object' && typeof body.pipe === 'function';
const isBinary = typeof body === 'string' || Buffer.isBuffer(body) || body instanceof Uint8Array;
let resp = await fetch(`http://localhost:9100/@connections/${connectionId}`, {
method: 'POST',
headers,
body: isStream || isBinary ? body : JSON.stringify(body),
...(isStream ? { duplex: 'half' } : {})
});
console.log(resp.status + ' ' + resp.statusText)
console.log(await resp.json())
}
await pushbackFunction(callback, new PostbackResponse('body', {'content-type': 'application/json'}))
```
### Client side:
* Use WebSocketClient to connect to a WebSocket runner, send data over the connection, and disconnect from the runner when finished.
* To pass data (such as a protocol or authentication token) when establishing the WebSocket connection, provide it as the second parameter to the constructor.
```
const { WebSocketClient } = require("configurapi-handler-ws");
let client = new WebSocketClient(WEBSOCKET_BASE_URL, {onPush: (data:IResponse)=>console.log(data)});
```

@@ -1,4 +0,6 @@

module.exports = {
StreamResponse: require('./streamResponse'),
pushbackFunction: require('./postbackFunction'),
PostbackResponse: require('./postbackResponse'),
WebSocketClient: require('./webSocketClient')
};
import { Response } from "configurapi";
import type { Readable } from 'node:stream';
declare class StreamResponse extends Response
{
constructor(stream: Readable, statusCode?: number, headers?: Record<string, string>);
}