@cortexkit/anthropic-auth-core
Advanced tools
+14
-0
@@ -97,2 +97,10 @@ import { type Cache1hMode } from './constants.ts'; | ||
| }; | ||
| /** | ||
| * Zero out Anthropic OAuth model costs in the provider hook. Default: enabled | ||
| * (OAuth usage is quota-based, not per-token billed, so costs show as $0). | ||
| * Set `enabled: false` to opt out and display the provider's real model costs. | ||
| */ | ||
| costZeroing?: { | ||
| enabled?: boolean; | ||
| }; | ||
| cacheKeep?: { | ||
@@ -113,2 +121,8 @@ enabled?: boolean; | ||
| }; | ||
| /** | ||
| * Whether Anthropic OAuth model costs should be zeroed in the provider hook. | ||
| * Defaults to enabled; only an explicit `costZeroing.enabled === false` opts out | ||
| * (to display the provider's real model costs). | ||
| */ | ||
| export declare function isCostZeroingEnabled(storage: Pick<AccountStorage, 'costZeroing'>): boolean; | ||
| export type AccountRuntimeEntry = Partial<Pick<OAuthAccount, 'access' | 'refresh' | 'expires' | 'lastUsed' | 'lastRefreshedAt' | 'lastRefreshError' | 'lastQuotaRefreshError' | 'quota'> & Pick<ApiKeyAccount, 'apiKey' | 'lastUsed'>>; | ||
@@ -115,0 +129,0 @@ export type AccountRuntimeState = { |
+10
-0
@@ -33,2 +33,10 @@ import { createHash, randomUUID } from 'node:crypto'; | ||
| } | ||
| /** | ||
| * Whether Anthropic OAuth model costs should be zeroed in the provider hook. | ||
| * Defaults to enabled; only an explicit `costZeroing.enabled === false` opts out | ||
| * (to display the provider's real model costs). | ||
| */ | ||
| export function isCostZeroingEnabled(storage) { | ||
| return storage.costZeroing?.enabled !== false; | ||
| } | ||
| const DEFAULT_FALLBACK_ON = [401, 403, 429]; | ||
@@ -180,2 +188,3 @@ const MIN_REFRESH_BEFORE_EXPIRY_MINUTES = 240; | ||
| claudeFast: isRecord(value.claudeFast) ? value.claudeFast : undefined, | ||
| costZeroing: isRecord(value.costZeroing) ? value.costZeroing : undefined, | ||
| cacheKeep: isRecord(value.cacheKeep) ? value.cacheKeep : undefined, | ||
@@ -313,2 +322,3 @@ relay: isRecord(value.relay) ? value.relay : undefined, | ||
| claudeFast: storage.claudeFast, | ||
| costZeroing: storage.costZeroing, | ||
| cacheKeep: storage.cacheKeep, | ||
@@ -315,0 +325,0 @@ relay: storage.relay, |
+1
-1
@@ -28,2 +28,2 @@ import type { AccountStorage } from './accounts.ts'; | ||
| }): Promise<Response>; | ||
| export declare const WORKER_SCRIPT = "\nasync function hashBody(body) {\n const bytes = new TextEncoder().encode(body)\n const digest = await crypto.subtle.digest('SHA-256', bytes)\n const hex = [...new Uint8Array(digest)].map((b) => b.toString(16).padStart(2, '0')).join('')\n return 'sha256:' + hex\n}\n\nfunction applyPatch(base, patch) {\n return base.slice(0, patch.start) + patch.insert + base.slice(patch.start + patch.deleteCount)\n}\n\nfunction applyPatchSet(base, patch) {\n if (!Array.isArray(patch)) return applyPatch(base, patch)\n if (patch.length === 0) return base\n\n let result = ''\n let cursor = 0\n for (const item of patch) {\n if (item.start < cursor) return { error: 'overlapping patch', status: 400 }\n result += base.slice(cursor, item.start)\n result += item.insert\n cursor = item.start + item.deleteCount\n }\n return result + base.slice(cursor)\n}\n\nasync function readState(env, affinity) {\n const raw = await env.RELAY_STATE.get('session:' + affinity)\n return raw ? JSON.parse(raw) : null\n}\n\nasync function writeState(env, affinity, state) {\n await env.RELAY_STATE.put('session:' + affinity, JSON.stringify(state), { expirationTtl: 86400 })\n}\n\nasync function resolveBody(env, payload) {\n if (payload.mode === 'full_sync') return payload.body\n if (payload.mode === 'patch') {\n const state = await readState(env, payload.affinity)\n if (!state || state.hash !== payload.base_hash) {\n return { error: 'state mismatch', status: 409 }\n }\n return applyPatchSet(state.body, payload.patch)\n }\n return { error: 'unknown mode', status: 400 }\n}\n\nasync function prepareUpstream(env, payload) {\n if ((payload.protocol !== 1 && payload.protocol !== 2) || payload.type !== 'request' || !payload.affinity || !payload.upstream?.url || !payload.next_hash) {\n return { error: 'invalid payload', status: 400 }\n }\n\n const body = await resolveBody(env, payload)\n if (body && typeof body === 'object' && 'error' in body) return body\n\n if (typeof body !== 'string' || (await hashBody(body)) !== payload.next_hash) {\n return { error: 'hash mismatch', status: 409 }\n }\n\n const stateWrite = writeState(env, payload.affinity, { body, hash: payload.next_hash, revision: payload.revision }).catch(() => {})\n console.log(JSON.stringify({\n relay: 'opencode-anthropic-auth',\n transport: 'relay',\n mode: payload.mode,\n revision: payload.revision,\n affinity: String(payload.affinity).slice(0, 12),\n bodyBytes: body.length,\n }))\n\n return { body, stateWrite }\n}\n\nfunction resolveBodyFromState(state, payload) {\n if (payload.mode === 'full_sync') return payload.body\n if (payload.mode === 'patch') {\n if (!state || state.hash !== payload.base_hash) {\n return { error: 'state mismatch', status: 409 }\n }\n return applyPatchSet(state.body, payload.patch)\n }\n return { error: 'unknown mode', status: 400 }\n}\n\nfunction deferWorkerTask(task) {\n return new Promise((resolve) => setTimeout(resolve, 0)).then(task)\n}\n\nfunction checkpointWebSocketState(env, payload, body, nextState) {\n return deferWorkerTask(async () => {\n try {\n if ((await hashBody(body)) !== payload.next_hash) {\n throw new Error('hash mismatch')\n }\n await writeState(env, payload.affinity, nextState)\n } catch (error) {\n console.log(JSON.stringify({\n relay: 'opencode-anthropic-auth',\n transport: 'websocket',\n event: 'checkpoint_write_failed',\n affinity: String(payload.affinity).slice(0, 12),\n message: error instanceof Error ? error.message : String(error),\n }))\n }\n })\n}\n\nasync function prepareWebSocketUpstream(env, state, payload) {\n if (payload.protocol !== 2 || payload.type !== 'request' || !payload.id || !payload.affinity || !payload.upstream?.url || !payload.next_hash) {\n return { error: 'invalid payload', status: 400 }\n }\n\n const body = resolveBodyFromState(state, payload)\n if (body && typeof body === 'object' && 'error' in body) return body\n if (typeof body !== 'string') return { error: 'invalid body', status: 400 }\n if ((await hashBody(body)) !== payload.next_hash) {\n return { error: 'hash mismatch', status: 409 }\n }\n\n const nextState = { body, hash: payload.next_hash, revision: payload.revision }\n const checkpoint = checkpointWebSocketState(env, payload, body, nextState)\n const logAccepted = () => console.log(JSON.stringify({\n relay: 'opencode-anthropic-auth',\n transport: 'websocket',\n mode: payload.mode,\n revision: payload.revision,\n affinity: String(payload.affinity).slice(0, 12),\n bodyBytes: body.length,\n }))\n\n return { body, state: nextState, checkpoint, logAccepted }\n}\n\nfunction headersToObject(headers) {\n const result = {}\n for (const [key, value] of headers.entries()) result[key] = value\n return result\n}\n\nasync function handleRelayPayload(env, payload) {\n const prepared = await prepareUpstream(env, payload)\n if (prepared.error) return prepared\n const upstream = await fetch(payload.upstream.url, {\n method: payload.upstream.method || 'POST',\n headers: payload.upstream.headers,\n body: prepared.body,\n })\n return { upstream, stateWrite: prepared.stateWrite }\n}\n\nasync function handleWebSocket(socket, env, ctx, payload, getState, setState) {\n const heartbeat = setInterval(() => {\n try {\n socket.send(JSON.stringify({ protocol: 2, type: 'keepalive' }))\n } catch {\n clearInterval(heartbeat)\n }\n }, 15000)\n try {\n const result = await prepareWebSocketUpstream(env, getState(), payload)\n if (result.error) {\n socket.send(JSON.stringify({ protocol: 2, type: 'error', id: payload.id, status: result.status, message: result.error }))\n return\n }\n\n setState(result.state)\n socket.send(JSON.stringify({ protocol: 2, type: 'accepted', id: payload.id, hash: result.state.hash, revision: result.state.revision }))\n ctx?.waitUntil?.(deferWorkerTask(result.logAccepted))\n\n const upstreamPromise = fetch(payload.upstream.url, {\n method: payload.upstream.method || 'POST',\n headers: payload.upstream.headers,\n body: result.body,\n })\n ctx?.waitUntil?.(result.checkpoint)\n const upstream = await upstreamPromise\n socket.send(JSON.stringify({\n protocol: 2,\n type: 'response_start',\n id: payload.id,\n status: upstream.status,\n statusText: upstream.statusText,\n headers: headersToObject(upstream.headers),\n }))\n\n const reader = upstream.body?.getReader()\n if (reader) {\n while (true) {\n const { done, value } = await reader.read()\n if (done) break\n socket.send(value)\n }\n }\n socket.send(JSON.stringify({ protocol: 2, type: 'done', id: payload.id }))\n } catch (error) {\n socket.send(JSON.stringify({ protocol: 2, type: 'error', id: payload.id, status: 500, message: error instanceof Error ? error.message : String(error) }))\n } finally {\n clearInterval(heartbeat)\n }\n}\n\nexport default {\n async fetch(request, env, ctx) {\n if (request.headers.get('Upgrade') === 'websocket') {\n const url = new URL(request.url)\n const token = url.searchParams.get('token')\n const affinity = url.searchParams.get('affinity')\n if (token !== env.RELAY_TOKEN) return new Response('unauthorized', { status: 401 })\n if (!affinity) return new Response('missing affinity', { status: 400 })\n const pair = new WebSocketPair()\n const client = pair[0]\n const server = pair[1]\n server.binaryType = 'arraybuffer'\n let state = null\n let ready = false\n server.accept()\n const loadState = readState(env, affinity).then((loadedState) => {\n state = loadedState\n ready = true\n server.send(JSON.stringify({\n protocol: 2,\n type: 'ready',\n state: state ? { hash: state.hash, revision: state.revision } : null,\n }))\n }).catch((error) => {\n server.send(JSON.stringify({ protocol: 2, type: 'error', status: 500, message: error instanceof Error ? error.message : String(error) }))\n server.close(1011, 'state load failed')\n })\n ctx?.waitUntil?.(loadState)\n if (!ctx?.waitUntil) void loadState\n let busy = false\n server.addEventListener('message', (event) => {\n if (!ready) {\n server.send(JSON.stringify({ protocol: 2, type: 'error', status: 425, message: 'relay state is not ready' }))\n return\n }\n if (busy) {\n server.send(JSON.stringify({ protocol: 2, type: 'error', status: 429, message: 'request already in flight' }))\n return\n }\n let payload\n try {\n payload = JSON.parse(event.data)\n } catch {\n server.send(JSON.stringify({ protocol: 2, type: 'error', status: 400, message: 'invalid JSON payload' }))\n return\n }\n payload.affinity = affinity\n busy = true\n const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }).finally(() => { busy = false })\n ctx?.waitUntil?.(run)\n if (!ctx?.waitUntil) void run\n })\n return new Response(null, { status: 101, webSocket: client })\n }\n\n if (request.method === 'GET') {\n return Response.json({ status: 'ok', transports: ['http', 'websocket'] })\n }\n if (request.method !== 'POST') return new Response('method not allowed', { status: 405 })\n if (request.headers.get('x-relay-token') !== env.RELAY_TOKEN) {\n return new Response('unauthorized', { status: 401 })\n }\n\n try {\n const payload = await request.json()\n const result = await handleRelayPayload(env, payload)\n if (result.error) return Response.json({ error: result.error }, { status: result.status })\n\n if (result.stateWrite) ctx.waitUntil(result.stateWrite)\n\n const upstream = result.upstream\n return new Response(upstream.body, {\n status: upstream.status,\n statusText: upstream.statusText,\n headers: upstream.headers,\n })\n } catch (error) {\n return Response.json(\n { error: error instanceof Error ? error.message : 'internal relay error' },\n { status: 502 },\n )\n }\n },\n}\n"; | ||
| export declare const WORKER_SCRIPT = "\nasync function hashBody(body) {\n const bytes = new TextEncoder().encode(body)\n const digest = await crypto.subtle.digest('SHA-256', bytes)\n const hex = [...new Uint8Array(digest)].map((b) => b.toString(16).padStart(2, '0')).join('')\n return 'sha256:' + hex\n}\n\nfunction applyPatch(base, patch) {\n return base.slice(0, patch.start) + patch.insert + base.slice(patch.start + patch.deleteCount)\n}\n\nfunction applyPatchSet(base, patch) {\n if (!Array.isArray(patch)) return applyPatch(base, patch)\n if (patch.length === 0) return base\n\n let result = ''\n let cursor = 0\n for (const item of patch) {\n if (item.start < cursor) return { error: 'overlapping patch', status: 400 }\n result += base.slice(cursor, item.start)\n result += item.insert\n cursor = item.start + item.deleteCount\n }\n return result + base.slice(cursor)\n}\n\nasync function readState(env, affinity) {\n const raw = await env.RELAY_STATE.get('session:' + affinity)\n return raw ? JSON.parse(raw) : null\n}\n\nasync function writeState(env, affinity, state) {\n await env.RELAY_STATE.put('session:' + affinity, JSON.stringify(state), { expirationTtl: 86400 })\n}\n\nasync function resolveBody(env, payload) {\n if (payload.mode === 'full_sync') return payload.body\n if (payload.mode === 'patch') {\n const state = await readState(env, payload.affinity)\n if (!state || state.hash !== payload.base_hash) {\n return { error: 'state mismatch', status: 409 }\n }\n return applyPatchSet(state.body, payload.patch)\n }\n return { error: 'unknown mode', status: 400 }\n}\n\nasync function prepareUpstream(env, payload) {\n if ((payload.protocol !== 1 && payload.protocol !== 2) || payload.type !== 'request' || !payload.affinity || !payload.upstream?.url || !payload.next_hash) {\n return { error: 'invalid payload', status: 400 }\n }\n\n const body = await resolveBody(env, payload)\n if (body && typeof body === 'object' && 'error' in body) return body\n\n if (typeof body !== 'string' || (await hashBody(body)) !== payload.next_hash) {\n return { error: 'hash mismatch', status: 409 }\n }\n\n const stateWrite = writeState(env, payload.affinity, { body, hash: payload.next_hash, revision: payload.revision }).catch(() => {})\n console.log(JSON.stringify({\n relay: 'opencode-anthropic-auth',\n transport: 'relay',\n mode: payload.mode,\n revision: payload.revision,\n affinity: String(payload.affinity).slice(0, 12),\n bodyBytes: body.length,\n }))\n\n return { body, stateWrite }\n}\n\nfunction resolveBodyFromState(state, payload) {\n if (payload.mode === 'full_sync') return payload.body\n if (payload.mode === 'patch') {\n if (!state || state.hash !== payload.base_hash) {\n return { error: 'state mismatch', status: 409 }\n }\n return applyPatchSet(state.body, payload.patch)\n }\n return { error: 'unknown mode', status: 400 }\n}\n\nfunction deferWorkerTask(task) {\n return new Promise((resolve) => setTimeout(resolve, 0)).then(task)\n}\n\nconst WEBSOCKET_STREAM_FLUSH_BYTES = 1024\nconst WEBSOCKET_STREAM_FLUSH_MS = 20\n\nfunction createWebSocketStreamCoalescer(socket, options = {}) {\n const flushBytes = options.flushBytes || WEBSOCKET_STREAM_FLUSH_BYTES\n const flushMs = options.flushMs ?? WEBSOCKET_STREAM_FLUSH_MS\n let chunks = []\n let byteLength = 0\n let flushTimer = null\n const stats = {\n upstreamChunks: 0,\n upstreamBytes: 0,\n frames: 0,\n frameBytes: 0,\n }\n\n const clearFlushTimer = () => {\n if (flushTimer === null) return\n clearTimeout(flushTimer)\n flushTimer = null\n }\n\n const mergeChunks = () => {\n if (chunks.length === 1) return chunks[0]\n const frame = new Uint8Array(byteLength)\n let offset = 0\n for (const chunk of chunks) {\n frame.set(chunk, offset)\n offset += chunk.byteLength\n }\n return frame\n }\n\n const flush = () => {\n clearFlushTimer()\n if (byteLength === 0) return\n const frame = mergeChunks()\n chunks = []\n byteLength = 0\n socket.send(frame)\n stats.frames += 1\n stats.frameBytes += frame.byteLength\n }\n\n const scheduleFlush = () => {\n if (flushTimer !== null) return\n flushTimer = setTimeout(flush, flushMs)\n }\n\n return {\n push(value) {\n const chunk = value instanceof Uint8Array ? value : new Uint8Array(value)\n if (chunk.byteLength === 0) return\n stats.upstreamChunks += 1\n stats.upstreamBytes += chunk.byteLength\n chunks.push(chunk)\n byteLength += chunk.byteLength\n if (byteLength >= flushBytes) flush()\n else scheduleFlush()\n },\n flush,\n stats() {\n return { ...stats, pendingBytes: byteLength }\n },\n }\n}\n\nfunction checkpointWebSocketState(env, payload, body, nextState) {\n return deferWorkerTask(async () => {\n try {\n if ((await hashBody(body)) !== payload.next_hash) {\n throw new Error('hash mismatch')\n }\n await writeState(env, payload.affinity, nextState)\n } catch (error) {\n console.log(JSON.stringify({\n relay: 'opencode-anthropic-auth',\n transport: 'websocket',\n event: 'checkpoint_write_failed',\n affinity: String(payload.affinity).slice(0, 12),\n message: error instanceof Error ? error.message : String(error),\n }))\n }\n })\n}\n\nasync function prepareWebSocketUpstream(env, state, payload) {\n if (payload.protocol !== 2 || payload.type !== 'request' || !payload.id || !payload.affinity || !payload.upstream?.url || !payload.next_hash) {\n return { error: 'invalid payload', status: 400 }\n }\n\n const body = resolveBodyFromState(state, payload)\n if (body && typeof body === 'object' && 'error' in body) return body\n if (typeof body !== 'string') return { error: 'invalid body', status: 400 }\n if ((await hashBody(body)) !== payload.next_hash) {\n return { error: 'hash mismatch', status: 409 }\n }\n\n const nextState = { body, hash: payload.next_hash, revision: payload.revision }\n const checkpoint = checkpointWebSocketState(env, payload, body, nextState)\n const logAccepted = () => console.log(JSON.stringify({\n relay: 'opencode-anthropic-auth',\n transport: 'websocket',\n mode: payload.mode,\n revision: payload.revision,\n affinity: String(payload.affinity).slice(0, 12),\n bodyBytes: body.length,\n }))\n\n return { body, state: nextState, checkpoint, logAccepted }\n}\n\nfunction headersToObject(headers) {\n const result = {}\n for (const [key, value] of headers.entries()) result[key] = value\n return result\n}\n\nasync function handleRelayPayload(env, payload) {\n const prepared = await prepareUpstream(env, payload)\n if (prepared.error) return prepared\n const upstream = await fetch(payload.upstream.url, {\n method: payload.upstream.method || 'POST',\n headers: payload.upstream.headers,\n body: prepared.body,\n })\n return { upstream, stateWrite: prepared.stateWrite }\n}\n\nasync function handleWebSocket(socket, env, ctx, payload, getState, setState) {\n const heartbeat = setInterval(() => {\n try {\n socket.send(JSON.stringify({ protocol: 2, type: 'keepalive' }))\n } catch {\n clearInterval(heartbeat)\n }\n }, 15000)\n let streamCoalescer = null\n try {\n const result = await prepareWebSocketUpstream(env, getState(), payload)\n if (result.error) {\n socket.send(JSON.stringify({ protocol: 2, type: 'error', id: payload.id, status: result.status, message: result.error }))\n return\n }\n\n setState(result.state)\n socket.send(JSON.stringify({ protocol: 2, type: 'accepted', id: payload.id, hash: result.state.hash, revision: result.state.revision }))\n ctx?.waitUntil?.(deferWorkerTask(result.logAccepted))\n\n const upstreamPromise = fetch(payload.upstream.url, {\n method: payload.upstream.method || 'POST',\n headers: payload.upstream.headers,\n body: result.body,\n })\n ctx?.waitUntil?.(result.checkpoint)\n const upstream = await upstreamPromise\n socket.send(JSON.stringify({\n protocol: 2,\n type: 'response_start',\n id: payload.id,\n status: upstream.status,\n statusText: upstream.statusText,\n headers: headersToObject(upstream.headers),\n }))\n\n const reader = upstream.body?.getReader()\n if (reader) {\n streamCoalescer = createWebSocketStreamCoalescer(socket)\n while (true) {\n const { done, value } = await reader.read()\n if (done) break\n streamCoalescer.push(value)\n }\n streamCoalescer.flush()\n }\n const stats = streamCoalescer?.stats?.() || {}\n socket.send(JSON.stringify({ protocol: 2, type: 'done', id: payload.id, ...stats }))\n } catch (error) {\n try {\n streamCoalescer?.flush?.()\n } catch {}\n socket.send(JSON.stringify({ protocol: 2, type: 'error', id: payload.id, status: 500, message: error instanceof Error ? error.message : String(error) }))\n } finally {\n clearInterval(heartbeat)\n }\n}\n\nexport default {\n async fetch(request, env, ctx) {\n if (request.headers.get('Upgrade') === 'websocket') {\n const url = new URL(request.url)\n const token = url.searchParams.get('token')\n const affinity = url.searchParams.get('affinity')\n if (token !== env.RELAY_TOKEN) return new Response('unauthorized', { status: 401 })\n if (!affinity) return new Response('missing affinity', { status: 400 })\n const pair = new WebSocketPair()\n const client = pair[0]\n const server = pair[1]\n server.binaryType = 'arraybuffer'\n let state = null\n let ready = false\n server.accept()\n const loadState = readState(env, affinity).then((loadedState) => {\n state = loadedState\n ready = true\n server.send(JSON.stringify({\n protocol: 2,\n type: 'ready',\n state: state ? { hash: state.hash, revision: state.revision } : null,\n }))\n }).catch((error) => {\n server.send(JSON.stringify({ protocol: 2, type: 'error', status: 500, message: error instanceof Error ? error.message : String(error) }))\n server.close(1011, 'state load failed')\n })\n ctx?.waitUntil?.(loadState)\n if (!ctx?.waitUntil) void loadState\n let busy = false\n server.addEventListener('message', (event) => {\n if (!ready) {\n server.send(JSON.stringify({ protocol: 2, type: 'error', status: 425, message: 'relay state is not ready' }))\n return\n }\n if (busy) {\n server.send(JSON.stringify({ protocol: 2, type: 'error', status: 429, message: 'request already in flight' }))\n return\n }\n let payload\n try {\n payload = JSON.parse(event.data)\n } catch {\n server.send(JSON.stringify({ protocol: 2, type: 'error', status: 400, message: 'invalid JSON payload' }))\n return\n }\n payload.affinity = affinity\n busy = true\n const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }).finally(() => { busy = false })\n ctx?.waitUntil?.(run)\n if (!ctx?.waitUntil) void run\n })\n return new Response(null, { status: 101, webSocket: client })\n }\n\n if (request.method === 'GET') {\n return Response.json({ status: 'ok', transports: ['http', 'websocket'] })\n }\n if (request.method !== 'POST') return new Response('method not allowed', { status: 405 })\n if (request.headers.get('x-relay-token') !== env.RELAY_TOKEN) {\n return new Response('unauthorized', { status: 401 })\n }\n\n try {\n const payload = await request.json()\n const result = await handleRelayPayload(env, payload)\n if (result.error) return Response.json({ error: result.error }, { status: result.status })\n\n if (result.stateWrite) ctx.waitUntil(result.stateWrite)\n\n const upstream = result.upstream\n return new Response(upstream.body, {\n status: upstream.status,\n statusText: upstream.statusText,\n headers: upstream.headers,\n })\n } catch (error) {\n return Response.json(\n { error: error instanceof Error ? error.message : 'internal relay error' },\n { status: 502 },\n )\n }\n },\n}\n"; |
+82
-5
@@ -628,3 +628,3 @@ import { Buffer } from 'node:buffer'; | ||
| if (message.type === 'done') { | ||
| this.finishPending(); | ||
| this.finishPending(message); | ||
| return; | ||
@@ -654,3 +654,3 @@ } | ||
| } | ||
| finishPending() { | ||
| finishPending(done) { | ||
| const pending = this.pending; | ||
@@ -660,3 +660,6 @@ if (!pending) | ||
| const finishedAt = perfNowMs(); | ||
| relayLog(`perf websocket done session=${shortAffinity(this.affinity)} request=${pending.payload.id} sentMs=${formatMs(finishedAt - pending.sentAt)} streamMs=${pending.responseStartedAt == null ? 'unknown' : formatMs(finishedAt - pending.responseStartedAt)} chunks=${pending.streamChunkCount} bytes=${pending.streamByteCount}`); | ||
| const workerStats = done && 'frames' in done | ||
| ? ` upstreamChunks=${done.upstreamChunks ?? 'unknown'} upstreamBytes=${done.upstreamBytes ?? 'unknown'} frames=${done.frames} frameBytes=${done.frameBytes ?? 'unknown'}` | ||
| : ''; | ||
| relayLog(`perf websocket done session=${shortAffinity(this.affinity)} request=${pending.payload.id} sentMs=${formatMs(finishedAt - pending.sentAt)} streamMs=${pending.responseStartedAt == null ? 'unknown' : formatMs(finishedAt - pending.responseStartedAt)} chunks=${pending.streamChunkCount} bytes=${pending.streamByteCount}${workerStats}`); | ||
| clearTimeout(pending.timeout); | ||
@@ -932,2 +935,69 @@ if (!pending.streamDone) { | ||
| const WEBSOCKET_STREAM_FLUSH_BYTES = 1024 | ||
| const WEBSOCKET_STREAM_FLUSH_MS = 20 | ||
| function createWebSocketStreamCoalescer(socket, options = {}) { | ||
| const flushBytes = options.flushBytes || WEBSOCKET_STREAM_FLUSH_BYTES | ||
| const flushMs = options.flushMs ?? WEBSOCKET_STREAM_FLUSH_MS | ||
| let chunks = [] | ||
| let byteLength = 0 | ||
| let flushTimer = null | ||
| const stats = { | ||
| upstreamChunks: 0, | ||
| upstreamBytes: 0, | ||
| frames: 0, | ||
| frameBytes: 0, | ||
| } | ||
| const clearFlushTimer = () => { | ||
| if (flushTimer === null) return | ||
| clearTimeout(flushTimer) | ||
| flushTimer = null | ||
| } | ||
| const mergeChunks = () => { | ||
| if (chunks.length === 1) return chunks[0] | ||
| const frame = new Uint8Array(byteLength) | ||
| let offset = 0 | ||
| for (const chunk of chunks) { | ||
| frame.set(chunk, offset) | ||
| offset += chunk.byteLength | ||
| } | ||
| return frame | ||
| } | ||
| const flush = () => { | ||
| clearFlushTimer() | ||
| if (byteLength === 0) return | ||
| const frame = mergeChunks() | ||
| chunks = [] | ||
| byteLength = 0 | ||
| socket.send(frame) | ||
| stats.frames += 1 | ||
| stats.frameBytes += frame.byteLength | ||
| } | ||
| const scheduleFlush = () => { | ||
| if (flushTimer !== null) return | ||
| flushTimer = setTimeout(flush, flushMs) | ||
| } | ||
| return { | ||
| push(value) { | ||
| const chunk = value instanceof Uint8Array ? value : new Uint8Array(value) | ||
| if (chunk.byteLength === 0) return | ||
| stats.upstreamChunks += 1 | ||
| stats.upstreamBytes += chunk.byteLength | ||
| chunks.push(chunk) | ||
| byteLength += chunk.byteLength | ||
| if (byteLength >= flushBytes) flush() | ||
| else scheduleFlush() | ||
| }, | ||
| flush, | ||
| stats() { | ||
| return { ...stats, pendingBytes: byteLength } | ||
| }, | ||
| } | ||
| } | ||
| function checkpointWebSocketState(env, payload, body, nextState) { | ||
@@ -1003,2 +1073,3 @@ return deferWorkerTask(async () => { | ||
| }, 15000) | ||
| let streamCoalescer = null | ||
| try { | ||
@@ -1033,10 +1104,16 @@ const result = await prepareWebSocketUpstream(env, getState(), payload) | ||
| if (reader) { | ||
| streamCoalescer = createWebSocketStreamCoalescer(socket) | ||
| while (true) { | ||
| const { done, value } = await reader.read() | ||
| if (done) break | ||
| socket.send(value) | ||
| streamCoalescer.push(value) | ||
| } | ||
| streamCoalescer.flush() | ||
| } | ||
| socket.send(JSON.stringify({ protocol: 2, type: 'done', id: payload.id })) | ||
| const stats = streamCoalescer?.stats?.() || {} | ||
| socket.send(JSON.stringify({ protocol: 2, type: 'done', id: payload.id, ...stats })) | ||
| } catch (error) { | ||
| try { | ||
| streamCoalescer?.flush?.() | ||
| } catch {} | ||
| socket.send(JSON.stringify({ protocol: 2, type: 'error', id: payload.id, status: 500, message: error instanceof Error ? error.message : String(error) })) | ||
@@ -1043,0 +1120,0 @@ } finally { |
+1
-1
| { | ||
| "name": "@cortexkit/anthropic-auth-core", | ||
| "version": "1.8.0", | ||
| "version": "1.9.0", | ||
| "type": "module", | ||
@@ -5,0 +5,0 @@ "repository": { |
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
URL strings
Supply chain riskPackage contains fragments of external URLs or IP addresses, which the package may be accessing at runtime.
Found 1 instance in 1 package
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
URL strings
Supply chain riskPackage contains fragments of external URLs or IP addresses, which the package may be accessing at runtime.
Found 1 instance in 1 package
236782
2.26%5756
1.77%