@atproto/common
Advanced tools
Comparing version 0.4.2 to 0.4.3
# @atproto/common | ||
## 0.4.3 | ||
### Patch Changes | ||
- [#2770](https://github.com/bluesky-social/atproto/pull/2770) [`a07b21151`](https://github.com/bluesky-social/atproto/commit/a07b21151f1850340c4b7797ebb11521b1a6cdf3) Thanks [@matthieusieben](https://github.com/matthieusieben)! - add streamToNodeBuffer utility to convert Uint8Array (async) iterables to Buffer | ||
- Updated dependencies [[`a07b21151`](https://github.com/bluesky-social/atproto/commit/a07b21151f1850340c4b7797ebb11521b1a6cdf3), [`a07b21151`](https://github.com/bluesky-social/atproto/commit/a07b21151f1850340c4b7797ebb11521b1a6cdf3), [`eb20ff64a`](https://github.com/bluesky-social/atproto/commit/eb20ff64a2d8e3061c652e1e247bf9b0fe3c41a6)]: | ||
- @atproto/common-web@0.3.1 | ||
## 0.4.2 | ||
@@ -4,0 +13,0 @@ |
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
import { Stream, Readable, Transform, TransformCallback } from 'stream'; | ||
import { Duplex, Readable, Stream, Transform, TransformCallback } from 'node:stream'; | ||
export declare const forwardStreamErrors: (...streams: Stream[]) => void; | ||
export declare const cloneStream: (stream: Readable) => Readable; | ||
export declare const streamSize: (stream: Readable) => Promise<number>; | ||
export declare const streamToBytes: (stream: Readable) => Promise<Uint8Array>; | ||
export declare const streamToBytes: (stream: AsyncIterable<Uint8Array>) => Promise<Uint8Array>; | ||
export declare const streamToNodeBuffer: (stream: Iterable<Uint8Array> | AsyncIterable<Uint8Array>) => Promise<Buffer>; | ||
export declare const byteIterableToStream: (iter: AsyncIterable<Uint8Array>) => Readable; | ||
@@ -17,2 +18,9 @@ export declare const bytesToStream: (bytes: Uint8Array) => Readable; | ||
} | ||
export declare function decodeStream(stream: Readable, contentEncoding?: string): Readable; | ||
export declare function decodeStream(stream: AsyncIterable<Uint8Array>, contentEncoding?: string): AsyncIterable<Uint8Array> | Readable; | ||
/** | ||
* Create a series of decoding streams based on the content-encoding header. The | ||
* resulting streams should be piped together to decode the content. | ||
*/ | ||
export declare function createDecoders(contentEncoding?: string): Duplex[]; | ||
//# sourceMappingURL=streams.d.ts.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.MaxSizeChecker = exports.bytesToStream = exports.byteIterableToStream = exports.streamToBytes = exports.streamSize = exports.cloneStream = exports.forwardStreamErrors = void 0; | ||
const stream_1 = require("stream"); | ||
exports.createDecoders = exports.decodeStream = exports.MaxSizeChecker = exports.bytesToStream = exports.byteIterableToStream = exports.streamToNodeBuffer = exports.streamToBytes = exports.streamSize = exports.cloneStream = exports.forwardStreamErrors = void 0; | ||
const node_stream_1 = require("node:stream"); | ||
const node_zlib_1 = require("node:zlib"); | ||
const forwardStreamErrors = (...streams) => { | ||
@@ -14,3 +15,3 @@ for (let i = 1; i < streams.length; ++i) { | ||
const cloneStream = (stream) => { | ||
const passthrough = new stream_1.PassThrough(); | ||
const passthrough = new node_stream_1.PassThrough(); | ||
(0, exports.forwardStreamErrors)(stream, passthrough); | ||
@@ -23,3 +24,3 @@ return stream.pipe(passthrough); | ||
for await (const chunk of stream) { | ||
size += chunk.length; | ||
size += Buffer.byteLength(chunk); | ||
} | ||
@@ -29,16 +30,30 @@ return size; | ||
exports.streamSize = streamSize; | ||
const streamToBytes = async (stream) => { | ||
const bufs = []; | ||
for await (const bytes of stream) { | ||
bufs.push(bytes); | ||
const streamToBytes = async (stream) => | ||
// @NOTE Though Buffer is a sub-class of Uint8Array, we have observed | ||
// inconsistencies when using a Buffer in place of Uint8Array. For this | ||
// reason, we convert the Buffer to a Uint8Array. | ||
new Uint8Array(await (0, exports.streamToNodeBuffer)(stream)); | ||
exports.streamToBytes = streamToBytes; | ||
// streamToBuffer identifier name already taken by @atproto/common-web | ||
const streamToNodeBuffer = async (stream) => { | ||
const chunks = []; | ||
let totalLength = 0; // keep track of total length for Buffer.concat | ||
for await (const chunk of stream) { | ||
if (chunk instanceof Uint8Array) { | ||
chunks.push(chunk); | ||
totalLength += Buffer.byteLength(chunk); | ||
} | ||
else { | ||
throw new TypeError('expected Uint8Array'); | ||
} | ||
} | ||
return new Uint8Array(Buffer.concat(bufs)); | ||
return Buffer.concat(chunks, totalLength); | ||
}; | ||
exports.streamToBytes = streamToBytes; | ||
exports.streamToNodeBuffer = streamToNodeBuffer; | ||
const byteIterableToStream = (iter) => { | ||
return stream_1.Readable.from(iter, { objectMode: false }); | ||
return node_stream_1.Readable.from(iter, { objectMode: false }); | ||
}; | ||
exports.byteIterableToStream = byteIterableToStream; | ||
const bytesToStream = (bytes) => { | ||
const stream = new stream_1.Readable(); | ||
const stream = new node_stream_1.Readable(); | ||
stream.push(bytes); | ||
@@ -49,3 +64,3 @@ stream.push(null); | ||
exports.bytesToStream = bytesToStream; | ||
class MaxSizeChecker extends stream_1.Transform { | ||
class MaxSizeChecker extends node_stream_1.Transform { | ||
constructor(maxSize, createError) { | ||
@@ -83,2 +98,48 @@ super(); | ||
exports.MaxSizeChecker = MaxSizeChecker; | ||
function decodeStream(stream, contentEncoding) { | ||
const decoders = createDecoders(contentEncoding); | ||
if (decoders.length === 0) | ||
return stream; | ||
return (0, node_stream_1.pipeline)([stream, ...decoders], () => { }); | ||
} | ||
exports.decodeStream = decodeStream; | ||
/** | ||
* Create a series of decoding streams based on the content-encoding header. The | ||
* resulting streams should be piped together to decode the content. | ||
*/ | ||
function createDecoders(contentEncoding) { | ||
const decoders = []; | ||
if (contentEncoding) { | ||
const encodings = contentEncoding.split(','); | ||
for (const encoding of encodings) { | ||
const normalizedEncoding = normalizeEncoding(encoding); | ||
if (normalizedEncoding === 'identity') | ||
continue; | ||
decoders.push(createDecoder(normalizedEncoding)); | ||
} | ||
} | ||
return decoders.reverse(); | ||
} | ||
exports.createDecoders = createDecoders; | ||
function normalizeEncoding(encoding) { | ||
// https://www.rfc-editor.org/rfc/rfc7231#section-3.1.2.1 | ||
// > All content-coding values are case-insensitive... | ||
return encoding.trim().toLowerCase(); | ||
} | ||
function createDecoder(normalizedEncoding) { | ||
switch (normalizedEncoding) { | ||
// https://www.rfc-editor.org/rfc/rfc9112.html#section-7.2 | ||
case 'gzip': | ||
case 'x-gzip': | ||
return (0, node_zlib_1.createGunzip)(); | ||
case 'deflate': | ||
return (0, node_zlib_1.createInflate)(); | ||
case 'br': | ||
return (0, node_zlib_1.createBrotliDecompress)(); | ||
case 'identity': | ||
return new node_stream_1.PassThrough(); | ||
default: | ||
throw new TypeError(`Unsupported content-encoding: "${normalizedEncoding}"`); | ||
} | ||
} | ||
//# sourceMappingURL=streams.js.map |
{ | ||
"name": "@atproto/common", | ||
"version": "0.4.2", | ||
"version": "0.4.3", | ||
"license": "MIT", | ||
@@ -23,3 +23,3 @@ "description": "Shared web-platform-friendly code for atproto libraries", | ||
"pino": "^8.21.0", | ||
"@atproto/common-web": "^0.3.0" | ||
"@atproto/common-web": "^0.3.1" | ||
}, | ||
@@ -26,0 +26,0 @@ "devDependencies": { |
import { | ||
Duplex, | ||
PassThrough, | ||
pipeline, | ||
Readable, | ||
Stream, | ||
Readable, | ||
PassThrough, | ||
Transform, | ||
TransformCallback, | ||
} from 'stream' | ||
} from 'node:stream' | ||
import { createBrotliDecompress, createGunzip, createInflate } from 'node:zlib' | ||
@@ -27,3 +30,3 @@ export const forwardStreamErrors = (...streams: Stream[]) => { | ||
for await (const chunk of stream) { | ||
size += chunk.length | ||
size += Buffer.byteLength(chunk) | ||
} | ||
@@ -33,8 +36,23 @@ return size | ||
export const streamToBytes = async (stream: Readable): Promise<Uint8Array> => { | ||
const bufs: Buffer[] = [] | ||
for await (const bytes of stream) { | ||
bufs.push(bytes) | ||
export const streamToBytes = async (stream: AsyncIterable<Uint8Array>) => | ||
// @NOTE Though Buffer is a sub-class of Uint8Array, we have observed | ||
// inconsistencies when using a Buffer in place of Uint8Array. For this | ||
// reason, we convert the Buffer to a Uint8Array. | ||
new Uint8Array(await streamToNodeBuffer(stream)) | ||
// streamToBuffer identifier name already taken by @atproto/common-web | ||
export const streamToNodeBuffer = async ( | ||
stream: Iterable<Uint8Array> | AsyncIterable<Uint8Array>, | ||
): Promise<Buffer> => { | ||
const chunks: Uint8Array[] = [] | ||
let totalLength = 0 // keep track of total length for Buffer.concat | ||
for await (const chunk of stream) { | ||
if (chunk instanceof Uint8Array) { | ||
chunks.push(chunk) | ||
totalLength += Buffer.byteLength(chunk) | ||
} else { | ||
throw new TypeError('expected Uint8Array') | ||
} | ||
} | ||
return new Uint8Array(Buffer.concat(bufs)) | ||
return Buffer.concat(chunks, totalLength) | ||
} | ||
@@ -72,1 +90,63 @@ | ||
} | ||
export function decodeStream( | ||
stream: Readable, | ||
contentEncoding?: string, | ||
): Readable | ||
export function decodeStream( | ||
stream: AsyncIterable<Uint8Array>, | ||
contentEncoding?: string, | ||
): AsyncIterable<Uint8Array> | Readable | ||
export function decodeStream( | ||
stream: Readable | AsyncIterable<Uint8Array>, | ||
contentEncoding?: string, | ||
): Readable | AsyncIterable<Uint8Array> { | ||
const decoders = createDecoders(contentEncoding) | ||
if (decoders.length === 0) return stream | ||
return pipeline([stream as Readable, ...decoders], () => {}) as Duplex | ||
} | ||
/** | ||
* Create a series of decoding streams based on the content-encoding header. The | ||
* resulting streams should be piped together to decode the content. | ||
*/ | ||
export function createDecoders(contentEncoding?: string): Duplex[] { | ||
const decoders: Duplex[] = [] | ||
if (contentEncoding) { | ||
const encodings = contentEncoding.split(',') | ||
for (const encoding of encodings) { | ||
const normalizedEncoding = normalizeEncoding(encoding) | ||
if (normalizedEncoding === 'identity') continue | ||
decoders.push(createDecoder(normalizedEncoding)) | ||
} | ||
} | ||
return decoders.reverse() | ||
} | ||
function normalizeEncoding(encoding: string) { | ||
// https://www.rfc-editor.org/rfc/rfc7231#section-3.1.2.1 | ||
// > All content-coding values are case-insensitive... | ||
return encoding.trim().toLowerCase() | ||
} | ||
function createDecoder(normalizedEncoding: string): Duplex { | ||
switch (normalizedEncoding) { | ||
// https://www.rfc-editor.org/rfc/rfc9112.html#section-7.2 | ||
case 'gzip': | ||
case 'x-gzip': | ||
return createGunzip() | ||
case 'deflate': | ||
return createInflate() | ||
case 'br': | ||
return createBrotliDecompress() | ||
case 'identity': | ||
return new PassThrough() | ||
default: | ||
throw new TypeError( | ||
`Unsupported content-encoding: "${normalizedEncoding}"`, | ||
) | ||
} | ||
} |
@@ -64,6 +64,6 @@ import * as streams from '../src/streams' | ||
describe('streamToBytes', () => { | ||
describe('streamToNodeBuffer', () => { | ||
it('converts stream to byte array', async () => { | ||
const stream = Readable.from(Buffer.from('foo')) | ||
const bytes = await streams.streamToBytes(stream) | ||
const bytes = await streams.streamToNodeBuffer(stream) | ||
@@ -73,3 +73,30 @@ expect(bytes[0]).toBe('f'.charCodeAt(0)) | ||
expect(bytes[2]).toBe('o'.charCodeAt(0)) | ||
expect(bytes.length).toBe(3) | ||
}) | ||
it('converts async iterable to byte array', async () => { | ||
const iterable = (async function* () { | ||
yield Buffer.from('b') | ||
yield Buffer.from('a') | ||
yield new Uint8Array(['r'.charCodeAt(0)]) | ||
})() | ||
const bytes = await streams.streamToNodeBuffer(iterable) | ||
expect(bytes[0]).toBe('b'.charCodeAt(0)) | ||
expect(bytes[1]).toBe('a'.charCodeAt(0)) | ||
expect(bytes[2]).toBe('r'.charCodeAt(0)) | ||
expect(bytes.length).toBe(3) | ||
}) | ||
it('throws error for non Uint8Array chunks', async () => { | ||
const iterable: AsyncIterable<any> = (async function* () { | ||
yield Buffer.from('b') | ||
yield Buffer.from('a') | ||
yield 'r' | ||
})() | ||
await expect(streams.streamToNodeBuffer(iterable)).rejects.toThrow( | ||
'expected Uint8Array', | ||
) | ||
}) | ||
}) | ||
@@ -76,0 +103,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
83844
1608
Updated@atproto/common-web@^0.3.1