Comparing version
/// <reference types="node" /> | ||
export type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined; | ||
export type Bytes = ArrayBuffer | Uint8Array | Buffer; | ||
export interface ServerSentEvent { | ||
@@ -18,1 +18,5 @@ event: string | null; | ||
} | ||
export interface LinesResult { | ||
fieldLength: number; | ||
line: Uint8Array; | ||
} |
@@ -1,25 +0,19 @@ | ||
import { Bytes, ServerSentEvent } from './interface'; | ||
import { ServerSentEvent, LinesResult } from './interface'; | ||
export declare const NewLineChars: { | ||
NewLine: number; | ||
CarriageReturn: number; | ||
Space: number; | ||
Colon: number; | ||
}; | ||
export declare function parseServerSentEvent(stream: ReadableStream<Uint8Array>, onMessage: (event: ServerSentEvent) => void): Promise<void>; | ||
/** | ||
* Converts a ReadableStream into a callback pattern. | ||
* @param stream The input ReadableStream. | ||
* @param onChunk A function that will be called on each new byte chunk in the stream. | ||
* @returns A promise that will be resolved when the stream closes. | ||
* Parses any byte chunks into EventSource line buffers. | ||
*/ | ||
export declare function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void): Promise<void>; | ||
/** | ||
* from openai sdk. | ||
* A re-implementation of http[s]'s `LineDecoder` that handles incrementally | ||
* reading lines from text. | ||
* | ||
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258 | ||
*/ | ||
export declare class LineDecoder { | ||
buffer: string[]; | ||
trailingCR: boolean; | ||
textDecoder: any; | ||
private buffer; | ||
private position; | ||
private fieldLength; | ||
private trailingNewLine; | ||
constructor(); | ||
decode(chunk: Bytes): string[]; | ||
decodeText(bytes: Bytes): string; | ||
flush(): string[]; | ||
getLines(chunk: Uint8Array): LinesResult[]; | ||
} | ||
@@ -34,3 +28,4 @@ /** | ||
constructor(); | ||
decode(line: string): ServerSentEvent | null; | ||
decode(line: Uint8Array, filedLength: number): ServerSentEvent | undefined; | ||
private decodeText; | ||
} |
221
build/sse.js
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.SSEDecoder = exports.LineDecoder = exports.getBytes = exports.parseServerSentEvent = void 0; | ||
// prettier-ignore | ||
const NEWLINE_CHARS = new Set(['\n', '\r']); | ||
// const NEWLINE_CHARS = new Set(['\n', '\r', '\x0b', '\x0c', '\x1c', '\x1d', '\x1e', '\x85', '\u2028', '\u2029']); | ||
// eslint-disable-next-line no-control-regex | ||
const NEWLINE_REGEXP = /\r\n|[\n\r]/g; | ||
// const NEWLINE_REGEXP = /\r\n|[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]/g; | ||
exports.SSEDecoder = exports.LineDecoder = exports.parseServerSentEvent = exports.NewLineChars = void 0; | ||
exports.NewLineChars = { | ||
NewLine: 10, | ||
CarriageReturn: 13, | ||
Space: 12, | ||
Colon: 58 | ||
}; | ||
async function parseServerSentEvent(stream, onMessage) { | ||
@@ -15,14 +15,8 @@ const decoder = new SSEDecoder(); | ||
// get string lines, newline-separated should be \n,\r,\r\n | ||
const lines = lineDecoder.decode(chunk); | ||
for (const line of lines) { | ||
const sseData = decoder.decode(line); | ||
if (sseData) { | ||
onMessage(sseData); | ||
} | ||
const list = lineDecoder.getLines(chunk); | ||
for (const data of list) { | ||
const source = decoder.decode(data.line, data.fieldLength); | ||
if (source) | ||
onMessage(source); | ||
} | ||
for (const line of lineDecoder.flush()) { | ||
const sseData = decoder.decode(line); | ||
if (sseData) | ||
onMessage(sseData); | ||
} | ||
}); | ||
@@ -33,5 +27,2 @@ } | ||
* Converts a ReadableStream into a callback pattern. | ||
* @param stream The input ReadableStream. | ||
* @param onChunk A function that will be called on each new byte chunk in the stream. | ||
* @returns A promise that will be resolved when the stream closes. | ||
*/ | ||
@@ -47,78 +38,74 @@ async function getBytes(stream, onChunk) { | ||
} | ||
exports.getBytes = getBytes; | ||
/** | ||
* from openai sdk. | ||
* A re-implementation of http[s]'s `LineDecoder` that handles incrementally | ||
* reading lines from text. | ||
* | ||
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258 | ||
* Parses any byte chunks into EventSource line buffers. | ||
*/ | ||
class LineDecoder { | ||
constructor() { | ||
this.buffer = []; | ||
this.trailingCR = false; | ||
this.position = 0; | ||
this.fieldLength = -1; | ||
this.buffer = undefined; | ||
this.trailingNewLine = false; | ||
} | ||
decode(chunk) { | ||
let text = this.decodeText(chunk); | ||
// end with Carriage Return | ||
if (this.trailingCR) { | ||
text = '\r' + text; | ||
this.trailingCR = false; | ||
getLines(chunk) { | ||
if (this.buffer === undefined) { | ||
this.buffer = chunk; | ||
this.position = 0; | ||
this.fieldLength = -1; | ||
} | ||
if (text.endsWith('\r')) { | ||
this.trailingCR = true; | ||
text = text.slice(0, -1); | ||
else { | ||
const buffer = new Uint8Array(this.buffer.length + chunk.length); | ||
buffer.set(this.buffer); | ||
buffer.set(chunk, this.buffer.length); | ||
this.buffer = buffer; | ||
} | ||
if (!text) { | ||
return []; | ||
} | ||
const trailingNewline = NEWLINE_CHARS.has(text[text.length - 1] || ''); | ||
let lines = text.split(NEWLINE_REGEXP); | ||
if (lines.length === 1 && !trailingNewline) { | ||
this.buffer.push(lines[0]); | ||
return []; | ||
} | ||
if (this.buffer.length > 0) { | ||
lines = [this.buffer.join('') + lines[0], ...lines.slice(1)]; | ||
this.buffer = []; | ||
} | ||
if (!trailingNewline) { | ||
this.buffer = [lines.pop() || '']; | ||
} | ||
return lines; | ||
} | ||
decodeText(bytes) { | ||
var _a; | ||
if (bytes == null) | ||
return ''; | ||
if (typeof bytes === 'string') | ||
return bytes; | ||
// Node: | ||
if (typeof Buffer !== 'undefined') { | ||
if (bytes instanceof Buffer) { | ||
return bytes.toString('utf-8'); | ||
const { buffer } = this; | ||
const bufLength = this.buffer.length; | ||
let lineStart = 0; | ||
let resultBuf = new Uint8Array(); | ||
let resultFieldLength = -1; | ||
const list = []; | ||
while (this.position < bufLength) { | ||
// check new line char, if checked, skip to next char | ||
if (this.trailingNewLine) { | ||
if (buffer[this.position] === exports.NewLineChars.NewLine) { | ||
lineStart = ++this.position; | ||
} | ||
this.trailingNewLine = false; | ||
} | ||
if (bytes instanceof Uint8Array) { | ||
return Buffer.from(bytes).toString('utf-8'); | ||
let lineEnd = -1; | ||
for (; this.position < bufLength && lineEnd === -1; ++this.position) { | ||
switch (buffer[this.position]) { | ||
case exports.NewLineChars.Colon: | ||
if (this.fieldLength === -1) | ||
this.fieldLength = this.position - lineStart; | ||
break; | ||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | ||
// @ts-ignore - this case ('\r') should fallthrough to NewLine '\n' | ||
case exports.NewLineChars.CarriageReturn: | ||
this.trailingNewLine = true; | ||
// eslint-disable-next-line no-fallthrough | ||
case exports.NewLineChars.NewLine: | ||
lineEnd = this.position; | ||
break; | ||
} | ||
} | ||
throw new Error(`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`); | ||
} | ||
// Browser | ||
if (typeof TextDecoder !== 'undefined') { | ||
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) { | ||
(_a = this.textDecoder) !== null && _a !== void 0 ? _a : (this.textDecoder = new TextDecoder('utf8')); | ||
return this.textDecoder.decode(bytes); | ||
if (lineEnd === -1) { | ||
// the line has not ended, so we need to the next line and continue parsing. | ||
break; | ||
} | ||
throw new Error(`Unexpected: received non-Uint8Array/ArrayBuffer (${bytes.constructor.name}) in a web platform. Please report this error.`); | ||
// got the data | ||
resultBuf = this.buffer.subarray(lineStart, lineEnd); | ||
resultFieldLength = this.fieldLength; | ||
list.push({ fieldLength: resultFieldLength, line: resultBuf }); | ||
lineStart = this.position; | ||
this.fieldLength = -1; | ||
} | ||
throw new Error('Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.'); | ||
} | ||
flush() { | ||
if (!this.buffer.length && !this.trailingCR) { | ||
return []; | ||
if (lineStart === bufLength) { | ||
this.buffer = undefined; | ||
} | ||
const lines = [this.buffer.join('')]; | ||
this.buffer = []; | ||
this.trailingCR = false; | ||
return lines; | ||
else if (lineStart !== 0) { | ||
this.buffer = this.buffer.subarray(lineStart); | ||
this.position -= lineStart; | ||
} | ||
return list; | ||
} | ||
@@ -136,10 +123,5 @@ } | ||
} | ||
decode(line) { | ||
if (line.endsWith('\r')) { | ||
line = line.substring(0, line.length - 1); | ||
} | ||
if (!line) { | ||
// empty line and we didn't previously encounter any messages | ||
if (!this.event && !this.data.length) | ||
return null; | ||
decode(line, filedLength) { | ||
if (line.length === 0) { | ||
// empty line denotes end of message. return event data and start a new message: | ||
const sse = { | ||
@@ -150,2 +132,3 @@ event: this.event, | ||
}; | ||
// new message | ||
this.event = null; | ||
@@ -156,27 +139,39 @@ this.data = []; | ||
} | ||
this.chunks.push(line); | ||
if (line.startsWith(':')) { | ||
return null; | ||
else if (filedLength > 0) { | ||
// line is of format "<field>:<value>" or "<field>: <value>" | ||
const field = this.decodeText(line.subarray(0, filedLength)); | ||
const valueOffset = filedLength + (line[filedLength + 1] === exports.NewLineChars.Space ? 2 : 1); | ||
const value = this.decodeText(line.subarray(valueOffset)); | ||
switch (field) { | ||
case 'event': | ||
this.event = value; | ||
break; | ||
case 'data': | ||
this.data.push(value); | ||
break; | ||
} | ||
} | ||
const [fieldName, , value] = partition(line, ':'); | ||
let str = value; | ||
if (value.startsWith(' ')) { | ||
str = value.substring(1); | ||
} | ||
decodeText(bytes) { | ||
// Node: | ||
if (typeof Buffer !== 'undefined') { | ||
if (bytes instanceof Buffer) { | ||
return bytes.toString('utf-8'); | ||
} | ||
if (bytes instanceof Uint8Array) { | ||
return Buffer.from(bytes).toString('utf-8'); | ||
} | ||
throw new Error(`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`); | ||
} | ||
if (fieldName === 'event') { | ||
this.event = str; | ||
// Browser | ||
if (typeof TextDecoder !== 'undefined') { | ||
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) { | ||
const decoder = new TextDecoder('utf8'); | ||
return decoder.decode(bytes); | ||
} | ||
throw new Error(`Unexpected: received non-Uint8Array/ArrayBuffer (${bytes.constructor.name}) in a web platform. Please report this error.`); | ||
} | ||
else if (fieldName === 'data') { | ||
this.data.push(str); | ||
} | ||
return null; | ||
throw new Error('Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.'); | ||
} | ||
} | ||
exports.SSEDecoder = SSEDecoder; | ||
function partition(str, delimiter) { | ||
const index = str.indexOf(delimiter); | ||
if (index !== -1) { | ||
return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)]; | ||
} | ||
return [str, '', '']; | ||
} |
{ | ||
"name": "fetch-sse", | ||
"version": "1.0.21", | ||
"description": "An easy API for making Event Source requests, with all the features of fetch(), Supports browsers and node.", | ||
"version": "1.0.22", | ||
"description": "An easy API for making Event Source requests, with all the features of fetch(), Supports browsers and nodejs.", | ||
"main": "./build/index.js", | ||
@@ -6,0 +6,0 @@ "types": "./build/index.d.ts", |
@@ -66,3 +66,3 @@ # Fetch SSE (Server-Sent Events) | ||
# Event stream format | ||
# Event stream pattern | ||
The event stream is a simple stream of text data that encoded using UTF-8. You can find more information [here](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events). | ||
@@ -69,0 +69,0 @@ |
@@ -1,2 +0,2 @@ | ||
export type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined; | ||
export type Bytes = ArrayBuffer | Uint8Array | Buffer; | ||
@@ -19,1 +19,7 @@ export interface ServerSentEvent { | ||
} | ||
export interface LinesResult { | ||
fieldLength: number; | ||
line: Uint8Array; | ||
} |
256
src/sse.ts
@@ -1,29 +0,20 @@ | ||
import { Bytes, ServerSentEvent } from './interface'; | ||
import { Bytes, ServerSentEvent, LinesResult } from './interface'; | ||
// prettier-ignore | ||
const NEWLINE_CHARS = new Set(['\n', '\r']); | ||
// const NEWLINE_CHARS = new Set(['\n', '\r', '\x0b', '\x0c', '\x1c', '\x1d', '\x1e', '\x85', '\u2028', '\u2029']); | ||
// eslint-disable-next-line no-control-regex | ||
const NEWLINE_REGEXP = /\r\n|[\n\r]/g; | ||
// const NEWLINE_REGEXP = /\r\n|[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]/g; | ||
export const NewLineChars = { | ||
NewLine: 10, | ||
CarriageReturn: 13, | ||
Space: 12, | ||
Colon: 58 | ||
}; | ||
export async function parseServerSentEvent(stream: ReadableStream<Uint8Array>, onMessage: (event: ServerSentEvent) => void) { | ||
const decoder = new SSEDecoder(); | ||
await getBytes(stream, (chunk: Uint8Array) => { | ||
const lineDecoder = new LineDecoder(); | ||
// get string lines, newline-separated should be \n,\r,\r\n | ||
const lines = lineDecoder.decode(chunk); | ||
for (const line of lines) { | ||
const sseData = decoder.decode(line); | ||
if (sseData) { | ||
onMessage(sseData); | ||
} | ||
const list = lineDecoder.getLines(chunk); | ||
for (const data of list) { | ||
const source = decoder.decode(data.line, data.fieldLength); | ||
if (source) onMessage(source); | ||
} | ||
for (const line of lineDecoder.flush()) { | ||
const sseData = decoder.decode(line); | ||
if (sseData) onMessage(sseData); | ||
} | ||
}); | ||
@@ -34,7 +25,4 @@ } | ||
* Converts a ReadableStream into a callback pattern. | ||
* @param stream The input ReadableStream. | ||
* @param onChunk A function that will be called on each new byte chunk in the stream. | ||
* @returns A promise that will be resolved when the stream closes. | ||
*/ | ||
export async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void) { | ||
async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void) { | ||
const reader = stream.getReader(); | ||
@@ -49,102 +37,85 @@ while (true) { | ||
/** | ||
* from openai sdk. | ||
* A re-implementation of http[s]'s `LineDecoder` that handles incrementally | ||
* reading lines from text. | ||
* | ||
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258 | ||
* Parses any byte chunks into EventSource line buffers. | ||
*/ | ||
export class LineDecoder { | ||
buffer: string[]; | ||
trailingCR: boolean; | ||
// TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types. | ||
textDecoder: any; | ||
private buffer: Uint8Array | undefined; | ||
private position: number; | ||
private fieldLength: number; | ||
private trailingNewLine: boolean; | ||
constructor() { | ||
this.buffer = []; | ||
this.trailingCR = false; | ||
this.position = 0; | ||
this.fieldLength = -1; | ||
this.buffer = undefined; | ||
this.trailingNewLine = false; | ||
} | ||
decode(chunk: Bytes): string[] { | ||
let text = this.decodeText(chunk); | ||
// end with Carriage Return | ||
if (this.trailingCR) { | ||
text = '\r' + text; | ||
this.trailingCR = false; | ||
getLines(chunk: Uint8Array): LinesResult[] { | ||
if (this.buffer === undefined) { | ||
this.buffer = chunk; | ||
this.position = 0; | ||
this.fieldLength = -1; | ||
} else { | ||
const buffer = new Uint8Array(this.buffer.length + chunk.length); | ||
buffer.set(this.buffer); | ||
buffer.set(chunk, this.buffer.length); | ||
this.buffer = buffer; | ||
} | ||
if (text.endsWith('\r')) { | ||
this.trailingCR = true; | ||
text = text.slice(0, -1); | ||
} | ||
if (!text) { | ||
return []; | ||
} | ||
const { buffer } = this; | ||
const trailingNewline = NEWLINE_CHARS.has(text[text.length - 1] || ''); | ||
let lines = text.split(NEWLINE_REGEXP); | ||
const bufLength = this.buffer.length; | ||
let lineStart = 0; | ||
let resultBuf: Uint8Array = new Uint8Array(); | ||
let resultFieldLength = -1; | ||
const list: LinesResult[] = []; | ||
while (this.position < bufLength) { | ||
// check new line char, if checked, skip to next char | ||
if (this.trailingNewLine) { | ||
if (buffer[this.position] === NewLineChars.NewLine) { | ||
lineStart = ++this.position; | ||
} | ||
if (lines.length === 1 && !trailingNewline) { | ||
this.buffer.push(lines[0]!); | ||
return []; | ||
} | ||
this.trailingNewLine = false; | ||
} | ||
if (this.buffer.length > 0) { | ||
lines = [this.buffer.join('') + lines[0], ...lines.slice(1)]; | ||
this.buffer = []; | ||
} | ||
if (!trailingNewline) { | ||
this.buffer = [lines.pop() || '']; | ||
} | ||
return lines; | ||
} | ||
decodeText(bytes: Bytes): string { | ||
if (bytes == null) return ''; | ||
if (typeof bytes === 'string') return bytes; | ||
// Node: | ||
if (typeof Buffer !== 'undefined') { | ||
if (bytes instanceof Buffer) { | ||
return bytes.toString('utf-8'); | ||
let lineEnd = -1; | ||
for (; this.position < bufLength && lineEnd === -1; ++this.position) { | ||
switch (buffer[this.position]) { | ||
case NewLineChars.Colon: | ||
if (this.fieldLength === -1) this.fieldLength = this.position - lineStart; | ||
break; | ||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment | ||
// @ts-ignore - this case ('\r') should fallthrough to NewLine '\n' | ||
case NewLineChars.CarriageReturn: | ||
this.trailingNewLine = true; | ||
// eslint-disable-next-line no-fallthrough | ||
case NewLineChars.NewLine: | ||
lineEnd = this.position; | ||
break; | ||
} | ||
} | ||
if (bytes instanceof Uint8Array) { | ||
return Buffer.from(bytes).toString('utf-8'); | ||
} | ||
throw new Error( | ||
`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`, | ||
); | ||
} | ||
// Browser | ||
if (typeof TextDecoder !== 'undefined') { | ||
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) { | ||
this.textDecoder ??= new TextDecoder('utf8'); | ||
return this.textDecoder.decode(bytes); | ||
if (lineEnd === -1) { | ||
// the line has not ended, so we need to the next line and continue parsing. | ||
break; | ||
} | ||
throw new Error( | ||
`Unexpected: received non-Uint8Array/ArrayBuffer (${ | ||
(bytes as any).constructor.name | ||
}) in a web platform. Please report this error.`, | ||
); | ||
// got the data | ||
resultBuf = this.buffer.subarray(lineStart, lineEnd); | ||
resultFieldLength = this.fieldLength; | ||
list.push({ fieldLength: resultFieldLength, line: resultBuf }); | ||
lineStart = this.position; | ||
this.fieldLength = -1; | ||
} | ||
throw new Error( | ||
'Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.', | ||
); | ||
} | ||
flush(): string[] { | ||
if (!this.buffer.length && !this.trailingCR) { | ||
return []; | ||
if (lineStart === bufLength) { | ||
this.buffer = undefined; | ||
} else if (lineStart !== 0) { | ||
this.buffer = this.buffer.subarray(lineStart); | ||
this.position -= lineStart; | ||
} | ||
const lines = [this.buffer.join('')]; | ||
this.buffer = []; | ||
this.trailingCR = false; | ||
return lines; | ||
return list; | ||
} | ||
@@ -168,10 +139,5 @@ } | ||
public decode(line: string) { | ||
if (line.endsWith('\r')) { | ||
line = line.substring(0, line.length - 1); | ||
} | ||
if (!line) { | ||
// empty line and we didn't previously encounter any messages | ||
if (!this.event && !this.data.length) return null; | ||
public decode(line: Uint8Array, filedLength: number) { | ||
if (line.length === 0) { | ||
// empty line denotes end of message. return event data and start a new message: | ||
const sse: ServerSentEvent = { | ||
@@ -183,2 +149,3 @@ event: this.event, | ||
// new message | ||
this.event = null; | ||
@@ -189,31 +156,52 @@ this.data = []; | ||
return sse; | ||
} else if (filedLength > 0) { | ||
// line is of format "<field>:<value>" or "<field>: <value>" | ||
const field = this.decodeText(line.subarray(0, filedLength)); | ||
const valueOffset = filedLength + (line[filedLength + 1] === NewLineChars.Space ? 2 : 1); | ||
const value = this.decodeText(line.subarray(valueOffset)); | ||
switch (field) { | ||
case 'event': | ||
this.event = value; | ||
break; | ||
case 'data': | ||
this.data.push(value); | ||
break; | ||
} | ||
} | ||
} | ||
this.chunks.push(line); | ||
private decodeText(bytes: Bytes): string { | ||
// Node: | ||
if (typeof Buffer !== 'undefined') { | ||
if (bytes instanceof Buffer) { | ||
return bytes.toString('utf-8'); | ||
} | ||
if (bytes instanceof Uint8Array) { | ||
return Buffer.from(bytes).toString('utf-8'); | ||
} | ||
if (line.startsWith(':')) { | ||
return null; | ||
throw new Error( | ||
`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`, | ||
); | ||
} | ||
const [fieldName, , value] = partition(line, ':'); | ||
let str = value; | ||
if (value.startsWith(' ')) { | ||
str = value.substring(1); | ||
} | ||
if (fieldName === 'event') { | ||
this.event = str; | ||
} else if (fieldName === 'data') { | ||
this.data.push(str); | ||
// Browser | ||
if (typeof TextDecoder !== 'undefined') { | ||
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) { | ||
const decoder = new TextDecoder('utf8'); | ||
return decoder.decode(bytes); | ||
} | ||
throw new Error( | ||
`Unexpected: received non-Uint8Array/ArrayBuffer (${ | ||
(bytes as any).constructor.name | ||
}) in a web platform. Please report this error.`, | ||
); | ||
} | ||
return null; | ||
} | ||
} | ||
function partition(str: string, delimiter: string): [string, string, string] { | ||
const index = str.indexOf(delimiter); | ||
if (index !== -1) { | ||
return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)]; | ||
throw new Error( | ||
'Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.', | ||
); | ||
} | ||
return [str, '', '']; | ||
} |
import { LineDecoder } from '../src/sse'; | ||
import { checkOk } from '../src/utils'; | ||
function decodeChunks(chunks: string[]) { | ||
const decoder = new LineDecoder(); | ||
const lines: string[] = []; | ||
for (const chunk of chunks) { | ||
lines.push(...decoder.decode(chunk)); | ||
} | ||
for (const line of decoder.flush()) { | ||
lines.push(line); | ||
} | ||
return lines; | ||
} | ||
describe('SSEDecoder', () => { | ||
const encoder = new TextEncoder(); | ||
const decoder = new TextDecoder(); | ||
describe('SSEDecoder', () => { | ||
const parseString = (str: string) => { | ||
const parse = new LineDecoder(); | ||
const encode = encoder.encode(str); | ||
return parse.getLines(encode).map(item => { | ||
return { | ||
message: decoder.decode(item.line), | ||
fieldLength: item.fieldLength | ||
}; | ||
}); | ||
}; | ||
const parseMultiple = (arr: string[]) => { | ||
const parse = new LineDecoder(); | ||
const list = []; | ||
for (const str of arr) { | ||
const encode = encoder.encode(str); | ||
list.push(...parse.getLines(encode)); | ||
} | ||
return list.map(item => { | ||
return { | ||
message: decoder.decode(item.line), | ||
fieldLength: item.fieldLength | ||
}; | ||
}); | ||
}; | ||
test('basic \n', () => { | ||
expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar', 'baz']); | ||
expect(parseString('id: foo bar \n')).toEqual([{ message: 'id: foo bar ', fieldLength: 2 }]); | ||
}); | ||
test('basic with \r', () => { | ||
expect(parseString('data: foo bar\r')).toEqual([{ message: 'data: foo bar', fieldLength: 4 }]); | ||
}); | ||
test('basic with \r\n', () => { | ||
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar', 'baz']); | ||
expect(parseString('data: foo bar\r\n')).toEqual([{ message: 'data: foo bar', fieldLength: 4 }]); | ||
}); | ||
test('should escape new lines', () => { | ||
expect(decodeChunks(['foo \\nbaz'])).toEqual(['foo \\nbaz']); | ||
test('should escape "\\n"', () => { | ||
expect(parseString('id: foo \\n bar \n')).toEqual([{ message: 'id: foo \\n bar ', fieldLength: 2 }]); | ||
}); | ||
test('should escape "\\r"', () => { | ||
expect(parseString('id: foo \\r bar \n')).toEqual([{ message: 'id: foo \\r bar ', fieldLength: 2 }]); | ||
}); | ||
test('multiple lines', () => { | ||
const list = parseMultiple(['id:1\n', 'data: 1234\n']); | ||
expect(list[0]).toEqual({ message: 'id:1', fieldLength: 2 }); | ||
expect(list[1]).toEqual({ message: 'data: 1234', fieldLength: 4 }); | ||
}); | ||
test('should escape new lines with \\r', () => { | ||
expect(decodeChunks(['foo\\n \\rbaz'])).toEqual(['foo\\n \\rbaz']); | ||
test('single line split across multiple arrays', () => { | ||
const list = parseMultiple(['id: 1', '23', '456\n']); | ||
expect(list[0]).toEqual({ message: 'id: 123456', fieldLength: 2 }); | ||
}); | ||
test('should catch JSON.parse error of response', async () => { | ||
const response = new Response(null, { | ||
status: 422, | ||
statusText: 'Invalid parameters', | ||
headers: { | ||
'Content-Type': 'application/json' | ||
} | ||
}); | ||
try { | ||
await checkOk(response); | ||
} catch(error: any) { | ||
expect(error?.message).toEqual('Failed to parse error response as JSON'); | ||
} | ||
test('multiple lines split across multiple arrays', () => { | ||
const list = parseMultiple(['id: 1', '23\nda', 'ta: 456\n']); | ||
expect(list[0]).toEqual({ message: 'id: 123', fieldLength: 2 }); | ||
expect(list[1]).toEqual({ message: 'data: 456', fieldLength: 4 }); | ||
}); | ||
test('should catch text error of response', async () => { | ||
const response = new Response(null, { | ||
status: 422, | ||
statusText: 'Invalid parameters' | ||
}); | ||
try { | ||
await checkOk(response); | ||
} catch(error: any) { | ||
expect(error?.message).toEqual('Error 422: Invalid parameters'); | ||
} | ||
}); | ||
test('comment line', () => { | ||
expect(parseString(': 123\n')).toEqual([{ message: ': 123', fieldLength: 0 }]); | ||
}); | ||
test('line with multiple colons', () => { | ||
expect(parseString('id: 123: 456\n')).toEqual([{ message: 'id: 123: 456', fieldLength: 2 }]); | ||
}); | ||
test('single byte array with multiple lines separated by \\n', () => { | ||
const list = parseString('id: abc\ndata: def\n'); | ||
for (let i = 0; i < list.length; i++) { | ||
expect(list[i].message).toEqual(i === 0 ? 'id: abc' : 'data: def'); | ||
} | ||
}); | ||
test('single byte array with multiple lines separated by \\r', () => { | ||
const list = parseString('id: abc\rdata: def\r'); | ||
for (let i = 0; i < list.length; i++) { | ||
expect(list[i].message).toEqual(i === 0 ? 'id: abc' : 'data: def'); | ||
} | ||
}); | ||
test('single byte array with multiple lines separated by \\r\\n', () => { | ||
const list = parseString('id: abc\r\ndata: def\r\n'); | ||
for (let i = 0; i < list.length; i++) { | ||
expect(list[i].message).toEqual(i === 0 ? 'id: abc' : 'data: def'); | ||
} | ||
}); | ||
}); |
29654
9.75%25
4.17%735
7.61%