first-chunk-stream
Advanced tools
Comparing version 3.0.0 to 4.0.0
@@ -6,2 +6,4 @@ import { | ||
declare const stop: unique symbol; | ||
declare namespace FirstChunkStream { | ||
@@ -12,11 +14,10 @@ interface Options extends Readonly<DuplexStreamOption> { | ||
*/ | ||
readonly chunkLength: number; | ||
readonly chunkSize: number; | ||
} | ||
type TransformFunction = ( | ||
error: Error | null, | ||
chunk: Buffer, | ||
encoding: string, | ||
callback: (error?: Error | null, buffer?: string | Buffer | Uint8Array, encoding?: string) => void | ||
) => void; | ||
type StopSymbol = typeof stop; | ||
type BufferLike = string | Buffer | Uint8Array; | ||
type TransformFunction = (chunk: Buffer, encoding: string) => Promise<StopSymbol | BufferLike | {buffer: BufferLike, encoding?: string}>; | ||
} | ||
@@ -29,5 +30,5 @@ | ||
@param options - The options object is passed to the [`Duplex` stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex) constructor allowing you to customize your stream behavior. | ||
@param transform - The function that gets the required `options.chunkLength` bytes. | ||
@param transform - Async function that receives the required `options.chunkSize` bytes. | ||
Note that the buffer can have a smaller length than the required one. In that case, it will be due to the fact that the complete stream contents has a length less than the `options.chunkLength` value. You should check for this yourself if you strictly depend on the length. | ||
Note that the buffer can have a smaller length than the required one. In that case, it will be due to the fact that the complete stream contents has a length less than the `options.chunkSize` value. You should check for this yourself if you strictly depend on the length. | ||
@@ -42,9 +43,4 @@ @example | ||
const stream = fs.createReadStream('unicorn.txt') | ||
.pipe(new FirstChunkStream({chunkLength: 7}, (error, chunk, encoding, callback) => { | ||
if (error) { | ||
callback(error); | ||
return; | ||
} | ||
callback(null, chunk.toString(encoding).toUpperCase()); | ||
.pipe(new FirstChunkStream({chunkSize: 7}, async (chunk, encoding) => { | ||
return chunk.toString(encoding).toUpperCase(); | ||
})); | ||
@@ -68,4 +64,16 @@ | ||
); | ||
/** | ||
Symbol used to end the stream early. | ||
@example | ||
``` | ||
new FirstChunkStream({chunkSize: 7}, async (chunk, encoding) => { | ||
return FirstChunkStream.stop; | ||
}); | ||
``` | ||
*/ | ||
static readonly stop: FirstChunkStream.StopSymbol; | ||
} | ||
export = FirstChunkStream; |
71
index.js
'use strict'; | ||
const {Duplex: DuplexStream} = require('stream'); | ||
const stop = Symbol('FirstChunkStream.stop'); | ||
class FirstChunkStream extends DuplexStream { | ||
constructor(options = {}, callback) { | ||
constructor(options, callback) { | ||
const state = { | ||
@@ -12,2 +14,6 @@ sent: false, | ||
if (typeof options !== 'object' || options === null) { | ||
throw new TypeError('FirstChunkStream constructor requires `options` to be an object.'); | ||
} | ||
if (typeof callback !== 'function') { | ||
@@ -17,4 +23,4 @@ throw new TypeError('FirstChunkStream constructor requires a callback as its second argument.'); | ||
if (typeof options.chunkLength !== 'number') { | ||
throw new TypeError('FirstChunkStream constructor requires `options.chunkLength` to be a number.'); | ||
if (typeof options.chunkSize !== 'number') { | ||
throw new TypeError('FirstChunkStream constructor requires `options.chunkSize` to be a number.'); | ||
} | ||
@@ -31,37 +37,25 @@ | ||
// Errors management | ||
// We need to execute the callback or emit en error dependending on the fact | ||
// the firstChunk is sent or not | ||
state.errorHandler = error => { | ||
processCallback(error, Buffer.concat(state.chunks, state.size), state.encoding); | ||
}; | ||
this.on('error', state.errorHandler); | ||
// Callback management | ||
const processCallback = (error, buffer, encoding, done = () => {}) => { | ||
// When doing sync writes + emitting an errror it can happen that | ||
// Remove the error listener on the next tick if an error where fired | ||
// to avoid unwanted error throwing | ||
if (error) { | ||
setImmediate(() => this.removeListener('error', state.errorHandler)); | ||
} else { | ||
this.removeListener('error', state.errorHandler); | ||
} | ||
const processCallback = (buffer, encoding, done) => { | ||
state.sent = true; | ||
callback(error, buffer, encoding, (error, buffer, encoding) => { | ||
if (error) { | ||
setImmediate(() => this.emit('error', error)); | ||
(async () => { | ||
let result; | ||
try { | ||
result = await callback(buffer, encoding); | ||
} catch (error) { | ||
setImmediate(() => { | ||
this.emit('error', error); | ||
done(); | ||
}); | ||
return; | ||
} | ||
if (!buffer) { | ||
done(); | ||
return; | ||
if (result === stop) { | ||
state.manager.programPush(null, undefined, done); | ||
} else if (Buffer.isBuffer(result) || (result instanceof Uint8Array) || (typeof result === 'string')) { | ||
state.manager.programPush(result, undefined, done); | ||
} else { | ||
state.manager.programPush(result.buffer, result.encoding, done); | ||
} | ||
state.manager.programPush(buffer, encoding, done); | ||
}); | ||
})(); | ||
}; | ||
@@ -72,6 +66,5 @@ | ||
state.encoding = encoding; | ||
if (state.sent) { | ||
state.manager.programPush(chunk, state.encoding, done); | ||
} else if (chunk.length < options.chunkLength - state.size) { | ||
} else if (chunk.length < options.chunkSize - state.size) { | ||
state.chunks.push(chunk); | ||
@@ -81,7 +74,7 @@ state.size += chunk.length; | ||
} else { | ||
state.chunks.push(chunk.slice(0, options.chunkLength - state.size)); | ||
chunk = chunk.slice(options.chunkLength - state.size); | ||
state.chunks.push(chunk.slice(0, options.chunkSize - state.size)); | ||
chunk = chunk.slice(options.chunkSize - state.size); | ||
state.size += state.chunks[state.chunks.length - 1].length; | ||
processCallback(null, Buffer.concat(state.chunks, state.size), state.encoding, () => { | ||
processCallback(Buffer.concat(state.chunks, state.size), state.encoding, () => { | ||
if (chunk.length === 0) { | ||
@@ -99,3 +92,3 @@ done(); | ||
if (!state.sent) { | ||
return processCallback(null, Buffer.concat(state.chunks, state.size), state.encoding, () => { | ||
return processCallback(Buffer.concat(state.chunks, state.size), state.encoding, () => { | ||
state.manager.programPush(null, state.encoding); | ||
@@ -155,2 +148,4 @@ }); | ||
FirstChunkStream.stop = stop; | ||
module.exports = FirstChunkStream; |
{ | ||
"name": "first-chunk-stream", | ||
"version": "3.0.0", | ||
"description": "Transform the first chunk in a stream", | ||
"version": "4.0.0", | ||
"description": "Buffer and transform the n first bytes of a stream", | ||
"license": "MIT", | ||
@@ -31,7 +31,8 @@ "repository": "sindresorhus/first-chunk-stream", | ||
"min", | ||
"minimum" | ||
"minimum", | ||
"bytes" | ||
], | ||
"devDependencies": { | ||
"@types/node": "^12.0.0", | ||
"ava": "^1.4.1", | ||
"@types/node": "^12.0.8", | ||
"ava": "^2.1.0", | ||
"nyc": "^14.0.0", | ||
@@ -38,0 +39,0 @@ "streamtest": "^1.2.1", |
@@ -22,9 +22,4 @@ # first-chunk-stream [![Build Status](https://travis-ci.org/sindresorhus/first-chunk-stream.svg?branch=master)](https://travis-ci.org/sindresorhus/first-chunk-stream) | ||
const stream = fs.createReadStream('unicorn.txt') | ||
.pipe(new FirstChunkStream({chunkLength: 7}, (error, chunk, encoding, callback) => { | ||
if (error) { | ||
callback(error); | ||
return; | ||
} | ||
callback(null, chunk.toString(encoding).toUpperCase()); | ||
.pipe(new FirstChunkStream({chunkSize: 7}, async (chunk, encoding) => { | ||
return chunk.toString(encoding).toUpperCase(); | ||
})); | ||
@@ -47,14 +42,43 @@ | ||
### firstChunkStream(options, transform) | ||
### FirstChunkStream(options, transform) | ||
Returns a `FirstChunkStream` instance. | ||
`FirstChunkStream` constructor. | ||
#### transform(error, chunk, encoding, callback) | ||
#### transform(chunk, encoding) | ||
Type: `Function` | ||
The function that gets the required `options.chunkLength` bytes. | ||
Async function that receives the required `options.chunkSize` bytes. | ||
Note that the buffer can have a smaller length than the required one. In that case, it will be due to the fact that the complete stream contents has a length less than the `options.chunkLength` value. You should check for this yourself if you strictly depend on the length. | ||
Expected to return an buffer-like object or `string` or object of form {buffer: `Buffer`, encoding: `string`} to send to stream or `firstChunkStream.stop` to end stream right away. | ||
An error thrown from this function will be emitted as stream errors. | ||
Note that the buffer can have a smaller length than the required one. In that case, it will be due to the fact that the complete stream contents has a length less than the `options.chunkSize` value. You should check for this yourself if you strictly depend on the length. | ||
```js | ||
new FirstChunkStream({chunkSize: 7}, async (chunk, encoding) => { | ||
return chunk.toString(encoding).toUpperCase(); // Send string to stream | ||
}); | ||
new FirstChunkStream({chunkSize: 7}, async (chunk, encoding) => { | ||
return chunk; // Send buffer to stream | ||
}); | ||
new FirstChunkStream({chunkSize: 7}, async (chunk, encoding) => { | ||
return { | ||
buffer: chunk, | ||
encoding: encoding, | ||
}; // Send buffer with encoding to stream | ||
}); | ||
new FirstChunkStream({chunkSize: 7}, async (chunk, encoding) => { | ||
return FirstChunkStream.stop; // End the stream early | ||
}); | ||
new FirstChunkStream({chunkSize: 7}, async (chunk, encoding) => { | ||
throw new Error('Unconditional error'); // Emit stream error | ||
}); | ||
``` | ||
#### options | ||
@@ -64,5 +88,5 @@ | ||
The options object is passed to the [`Duplex` stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex) constructor allowing you to customize your stream behavior. In addition you can specify the following option: | ||
The options object is passed to the [`Duplex` stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex) constructor allowing you to customize your stream behavior. In addition, you can specify the following option: | ||
###### chunkLength | ||
###### chunkSize | ||
@@ -72,6 +96,1 @@ Type: `number` | ||
How many bytes you want to buffer. | ||
## License | ||
MIT © [Sindre Sorhus](https://sindresorhus.com) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
10473
179
93