ipfs-unixfs-exporter
Advanced tools
Comparing version
{ | ||
"name": "ipfs-unixfs-exporter", | ||
"version": "8.0.5", | ||
"version": "8.0.6", | ||
"description": "JavaScript implementation of the UnixFs exporter used by IPFS", | ||
@@ -166,2 +166,3 @@ "license": "Apache-2.0 OR MIT", | ||
"it-map": "^1.0.6", | ||
"p-queue": "^7.3.0", | ||
"multiformats": "^9.4.2", | ||
@@ -168,0 +169,0 @@ "uint8arrays": "^3.0.0" |
@@ -11,2 +11,3 @@ import extractDataFromBlock from '../../../utils/extract-data-from-block.js' | ||
import map from 'it-map' | ||
import PQueue from 'p-queue' | ||
@@ -23,10 +24,11 @@ /** | ||
* @param {PBNode | Uint8Array} node | ||
* @param {import('it-pushable').Pushable<Uint8Array | undefined>} queue | ||
* @param {import('it-pushable').Pushable<Uint8Array>} queue | ||
* @param {number} streamPosition | ||
* @param {number} start | ||
* @param {number} end | ||
* @param {PQueue} walkQueue | ||
* @param {ExporterOptions} options | ||
* @returns {Promise<void>} | ||
*/ | ||
async function walkDAG (blockstore, node, queue, streamPosition, start, end, options) { | ||
async function walkDAG (blockstore, node, queue, streamPosition, start, end, walkQueue, options) { | ||
// a `raw` node | ||
@@ -105,6 +107,7 @@ if (node instanceof Uint8Array) { | ||
for await (const { link, block, blockStart } of source) { | ||
/** @type {PBNode | Uint8Array} */ | ||
let child | ||
switch (link.Hash.code) { | ||
case dagPb.code: | ||
child = await dagPb.decode(block) | ||
child = dagPb.decode(block) | ||
break | ||
@@ -115,6 +118,9 @@ case raw.code: | ||
default: | ||
throw errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS') | ||
queue.end(errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS')) | ||
return | ||
} | ||
await walkDAG(blockstore, child, queue, blockStart, start, end, options) | ||
walkQueue.add(async () => { | ||
await walkDAG(blockstore, child, queue, blockStart, start, end, walkQueue, options) | ||
}) | ||
} | ||
@@ -148,11 +154,17 @@ } | ||
const queue = pushable({ | ||
objectMode: true | ||
// use a queue to walk the DAG instead of recursion to ensure very deep DAGs | ||
// don't overflow the stack | ||
const walkQueue = new PQueue({ | ||
concurrency: 1 | ||
}) | ||
const queue = pushable() | ||
walkDAG(blockstore, node, queue, 0, offset, offset + length, options) | ||
.catch(err => { | ||
queue.end(err) | ||
}) | ||
walkQueue.add(async () => { | ||
await walkDAG(blockstore, node, queue, 0, offset, offset + length, walkQueue, options) | ||
}) | ||
walkQueue.on('error', error => { | ||
queue.end(error) | ||
}) | ||
let read = 0 | ||
@@ -159,0 +171,0 @@ |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
174640
6.67%1603
3.29%15
7.14%+ Added
+ Added
+ Added
+ Added