make-fetch-happen
Advanced tools
Comparing version 10.1.3 to 10.1.4
const { Request, Response } = require('minipass-fetch') | ||
const Minipass = require('minipass') | ||
const MinipassCollect = require('minipass-collect') | ||
const MinipassFlush = require('minipass-flush') | ||
@@ -15,6 +14,2 @@ const MinipassPipeline = require('minipass-pipeline') | ||
// maximum amount of data we will buffer into memory | ||
// if we'll exceed this, we switch to streaming | ||
const MAX_MEM_SIZE = 5 * 1024 * 1024 // 5MB | ||
// allow list for request headers that will be written to the cache index | ||
@@ -260,4 +255,2 @@ // note: we will also store any request headers | ||
const size = this.response.headers.get('content-length') | ||
const fitsInMemory = !!size && Number(size) < MAX_MEM_SIZE | ||
const shouldBuffer = this.options.memoize !== false && fitsInMemory | ||
const cacheOpts = { | ||
@@ -267,3 +260,4 @@ algorithms: this.options.algorithms, | ||
size, | ||
memoize: fitsInMemory && this.options.memoize, | ||
integrity: this.options.integrity, | ||
integrityEmitter: this.response.body.hasIntegrityEmitter && this.response.body, | ||
} | ||
@@ -286,32 +280,18 @@ | ||
})) | ||
// this is always true since if we aren't reusing the one from the remote fetch, we | ||
// are using the one from cacache | ||
body.hasIntegrityEmitter = true | ||
let abortStream, onResume | ||
if (shouldBuffer) { | ||
// if the result fits in memory, use a collect stream to gather | ||
// the response and write it to cacache while also passing it through | ||
// to the user | ||
onResume = () => { | ||
const collector = new MinipassCollect.PassThrough() | ||
abortStream = collector | ||
collector.on('collect', (data) => { | ||
// TODO if the cache write fails, log a warning but return the response anyway | ||
cacache.put(this.options.cachePath, this.key, data, cacheOpts) | ||
.then(cacheWriteResolve, cacheWriteReject) | ||
}) | ||
body.unshift(collector) | ||
body.unshift(this.response.body) | ||
} | ||
} else { | ||
// if it does not fit in memory, create a tee stream and use | ||
// that to pipe to both the cache and the user simultaneously | ||
onResume = () => { | ||
const tee = new Minipass() | ||
const cacheStream = cacache.put.stream(this.options.cachePath, this.key, cacheOpts) | ||
abortStream = cacheStream | ||
tee.pipe(cacheStream) | ||
// TODO if the cache write fails, log a warning but return the response anyway | ||
cacheStream.promise().then(cacheWriteResolve, cacheWriteReject) | ||
body.unshift(tee) | ||
body.unshift(this.response.body) | ||
} | ||
const onResume = () => { | ||
const tee = new Minipass() | ||
const cacheStream = cacache.put.stream(this.options.cachePath, this.key, cacheOpts) | ||
// re-emit the integrity and size events on our new response body so they can be reused | ||
cacheStream.on('integrity', i => body.emit('integrity', i)) | ||
cacheStream.on('size', s => body.emit('size', s)) | ||
// stick a flag on here so downstream users will know if they can expect integrity events | ||
tee.pipe(cacheStream) | ||
// TODO if the cache write fails, log a warning but return the response anyway | ||
cacheStream.promise().then(cacheWriteResolve, cacheWriteReject) | ||
body.unshift(tee) | ||
body.unshift(this.response.body) | ||
} | ||
@@ -321,9 +301,2 @@ | ||
body.once('end', () => body.removeListener('resume', onResume)) | ||
this.response.body.on('error', (err) => { | ||
// the abortStream will either be a MinipassCollect if we buffer | ||
// or a cacache write stream, either way be sure to listen for | ||
// errors from the actual response and avoid writing data that we | ||
// know to be invalid to the cache | ||
abortStream.destroy(err) | ||
}) | ||
} else { | ||
@@ -339,3 +312,3 @@ await cacache.index.insert(this.options.cachePath, this.key, null, cacheOpts) | ||
this.response.headers.set('x-local-cache-key', encodeURIComponent(this.key)) | ||
this.response.headers.set('x-local-cache-mode', shouldBuffer ? 'buffer' : 'stream') | ||
this.response.headers.set('x-local-cache-mode', 'stream') | ||
this.response.headers.set('x-local-cache-status', status) | ||
@@ -355,5 +328,2 @@ this.response.headers.set('x-local-cache-time', new Date().toISOString()) | ||
let response | ||
const size = Number(this.response.headers.get('content-length')) | ||
const fitsInMemory = !!size && size < MAX_MEM_SIZE | ||
const shouldBuffer = this.options.memoize !== false && fitsInMemory | ||
if (method === 'HEAD' || [301, 308].includes(this.response.status)) { | ||
@@ -368,48 +338,28 @@ // if the request is a HEAD, or the response is a redirect, | ||
const body = new Minipass() | ||
const removeOnResume = () => body.removeListener('resume', onResume) | ||
let onResume | ||
if (shouldBuffer) { | ||
onResume = async () => { | ||
removeOnResume() | ||
try { | ||
const content = await cacache.get.byDigest( | ||
const headers = { ...this.policy.responseHeaders() } | ||
const onResume = () => { | ||
const cacheStream = cacache.get.stream.byDigest( | ||
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } | ||
) | ||
cacheStream.on('error', async (err) => { | ||
cacheStream.pause() | ||
if (err.code === 'EINTEGRITY') { | ||
await cacache.rm.content( | ||
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } | ||
) | ||
body.end(content) | ||
} catch (err) { | ||
if (err.code === 'EINTEGRITY') { | ||
await cacache.rm.content( | ||
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } | ||
) | ||
} | ||
if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') { | ||
await CacheEntry.invalidate(this.request, this.options) | ||
} | ||
body.emit('error', err) | ||
} | ||
} | ||
} else { | ||
onResume = () => { | ||
const cacheStream = cacache.get.stream.byDigest( | ||
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } | ||
) | ||
cacheStream.on('error', async (err) => { | ||
cacheStream.pause() | ||
if (err.code === 'EINTEGRITY') { | ||
await cacache.rm.content( | ||
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } | ||
) | ||
} | ||
if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') { | ||
await CacheEntry.invalidate(this.request, this.options) | ||
} | ||
body.emit('error', err) | ||
cacheStream.resume() | ||
}) | ||
cacheStream.pipe(body) | ||
} | ||
if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') { | ||
await CacheEntry.invalidate(this.request, this.options) | ||
} | ||
body.emit('error', err) | ||
cacheStream.resume() | ||
}) | ||
// emit the integrity and size events based on our metadata so we're consistent | ||
body.emit('integrity', this.entry.integrity) | ||
body.emit('size', Number(headers['content-length'])) | ||
cacheStream.pipe(body) | ||
} | ||
body.once('resume', onResume) | ||
body.once('end', removeOnResume) | ||
body.once('end', () => body.removeListener('resume', onResume)) | ||
response = new Response(body, { | ||
@@ -419,5 +369,3 @@ url: this.entry.metadata.url, | ||
status: 200, | ||
headers: { | ||
...this.policy.responseHeaders(), | ||
}, | ||
headers, | ||
}) | ||
@@ -429,3 +377,3 @@ } | ||
response.headers.set('x-local-cache-key', encodeURIComponent(this.key)) | ||
response.headers.set('x-local-cache-mode', shouldBuffer ? 'buffer' : 'stream') | ||
response.headers.set('x-local-cache-mode', 'stream') | ||
response.headers.set('x-local-cache-status', status) | ||
@@ -432,0 +380,0 @@ response.headers.set('x-local-cache-time', new Date(this.entry.metadata.time).toUTCString()) |
@@ -56,3 +56,10 @@ const Minipass = require('minipass') | ||
const integrityStream = ssri.integrityStream({ integrity: _opts.integrity }) | ||
res = new fetch.Response(new MinipassPipeline(res.body, integrityStream), res) | ||
const pipeline = new MinipassPipeline(res.body, integrityStream) | ||
// we also propagate the integrity and size events out to the pipeline so we can use | ||
// this new response body as an integrityEmitter for cacache | ||
integrityStream.on('integrity', i => pipeline.emit('integrity', i)) | ||
integrityStream.on('size', s => pipeline.emit('size', s)) | ||
res = new fetch.Response(pipeline, res) | ||
// set an explicit flag so we know if our response body will emit integrity and size | ||
res.body.hasIntegrityEmitter = true | ||
} | ||
@@ -59,0 +66,0 @@ |
{ | ||
"name": "make-fetch-happen", | ||
"version": "10.1.3", | ||
"version": "10.1.4", | ||
"description": "Opinionated, caching, retrying fetch client", | ||
@@ -40,3 +40,3 @@ "main": "lib/index.js", | ||
"agentkeepalive": "^4.2.1", | ||
"cacache": "^16.0.2", | ||
"cacache": "^16.1.0", | ||
"http-cache-semantics": "^4.1.0", | ||
@@ -43,0 +43,0 @@ "http-proxy-agent": "^5.0.0", |
@@ -66,3 +66,2 @@ # make-fetch-happen | ||
* Built-in request caching following full HTTP caching rules (`Cache-Control`, `ETag`, `304`s, cache fallback on error, etc). | ||
* Customize cache storage with any [Cache API](https://developer.mozilla.org/en-US/docs/Web/API/Cache)-compliant `Cache` instance. Cache to Redis! | ||
* Node.js Stream support | ||
@@ -164,3 +163,3 @@ * Transparent gzip and deflate support | ||
* `X-Local-Cache-Key`: Unique cache entry key for this response | ||
* `X-Local-Cache-Mode`: Either `stream` or `buffer` to indicate how the response was read from cacache | ||
* `X-Local-Cache-Mode`: Always `stream` to indicate how the response was read from cacache | ||
* `X-Local-Cache-Hash`: Specific integrity hash for the cached entry | ||
@@ -167,0 +166,0 @@ * `X-Local-Cache-Status`: One of `miss`, `hit`, `stale`, `revalidated`, `updated`, or `skip` to signal how the response was created |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
57824
1088
362
Updatedcacache@^16.1.0