ipfs-unixfs-exporter
Advanced tools
Comparing version
@@ -5,2 +5,3 @@ export default fileContent; | ||
export type PBNode = import('@ipld/dag-pb').PBNode; | ||
export type PBLink = import('@ipld/dag-pb').PBLink; | ||
/** | ||
@@ -7,0 +8,0 @@ * @type {import('../').UnixfsV1Resolver} |
{ | ||
"name": "ipfs-unixfs-exporter", | ||
"version": "8.0.1", | ||
"version": "8.0.2", | ||
"description": "JavaScript implementation of the UnixFs exporter used by IPFS", | ||
@@ -162,2 +162,6 @@ "license": "Apache-2.0 OR MIT", | ||
"it-last": "^1.0.5", | ||
"it-parallel": "^2.0.1", | ||
"it-pipe": "^2.0.4", | ||
"it-pushable": "^3.1.0", | ||
"it-map": "^1.0.6", | ||
"multiformats": "^9.4.2", | ||
@@ -172,2 +176,3 @@ "uint8arrays": "^3.0.0" | ||
"crypto-browserify": "^3.12.0", | ||
"delay": "^5.0.0", | ||
"ipfs-unixfs-importer": "^10.0.0", | ||
@@ -174,0 +179,0 @@ "it-all": "^1.0.5", |
@@ -6,4 +6,7 @@ import extractDataFromBlock from '../../../utils/extract-data-from-block.js' | ||
import * as dagPb from '@ipld/dag-pb' | ||
import * as dagCbor from '@ipld/dag-cbor' | ||
import * as raw from 'multiformats/codecs/raw' | ||
import { pushable } from 'it-pushable' | ||
import parallel from 'it-parallel' | ||
import { pipe } from 'it-pipe' | ||
import map from 'it-map' | ||
@@ -14,23 +17,21 @@ /** | ||
* @typedef {import('@ipld/dag-pb').PBNode} PBNode | ||
* | ||
* @typedef {import('@ipld/dag-pb').PBLink} PBLink | ||
*/ | ||
/** | ||
* @param {Blockstore} blockstore | ||
* @param {PBNode} node | ||
* @param {PBNode | Uint8Array} node | ||
* @param {import('it-pushable').Pushable<Uint8Array | undefined>} queue | ||
* @param {number} streamPosition | ||
* @param {number} start | ||
* @param {number} end | ||
* @param {number} streamPosition | ||
* @param {ExporterOptions} options | ||
* @returns {AsyncIterable<Uint8Array>} | ||
* @returns {Promise<void>} | ||
*/ | ||
async function * emitBytes (blockstore, node, start, end, streamPosition = 0, options) { | ||
async function walkDAG (blockstore, node, queue, streamPosition, start, end, options) { | ||
// a `raw` node | ||
if (node instanceof Uint8Array) { | ||
const buf = extractDataFromBlock(node, streamPosition, start, end) | ||
queue.push(extractDataFromBlock(node, streamPosition, start, end)) | ||
if (buf.length) { | ||
yield buf | ||
} | ||
streamPosition += buf.length | ||
return streamPosition | ||
return | ||
} | ||
@@ -42,2 +43,3 @@ | ||
/** @type {UnixFS} */ | ||
let file | ||
@@ -52,50 +54,70 @@ | ||
// might be a unixfs `raw` node or have data on intermediate nodes | ||
if (file.data && file.data.length) { | ||
const buf = extractDataFromBlock(file.data, streamPosition, start, end) | ||
if (file.data != null) { | ||
const data = file.data | ||
const buf = extractDataFromBlock(data, streamPosition, start, end) | ||
if (buf.length) { | ||
yield buf | ||
} | ||
queue.push(buf) | ||
streamPosition += file.data.length | ||
streamPosition += buf.byteLength | ||
} | ||
let childStart = streamPosition | ||
/** @type {Array<{ link: PBLink, blockStart: number }>} */ | ||
const childOps = [] | ||
// work out which child nodes contain the requested data | ||
for (let i = 0; i < node.Links.length; i++) { | ||
const childLink = node.Links[i] | ||
const childEnd = streamPosition + file.blockSizes[i] | ||
const childStart = streamPosition // inclusive | ||
const childEnd = childStart + file.blockSizes[i] // exclusive | ||
if ((start >= childStart && start < childEnd) || // child has offset byte | ||
(end > childStart && end <= childEnd) || // child has end byte | ||
(end >= childStart && end <= childEnd) || // child has end byte | ||
(start < childStart && end > childEnd)) { // child is between offset and end bytes | ||
const block = await blockstore.get(childLink.Hash, { | ||
signal: options.signal | ||
childOps.push({ | ||
link: childLink, | ||
blockStart: streamPosition | ||
}) | ||
let child | ||
switch (childLink.Hash.code) { | ||
case dagPb.code: | ||
child = await dagPb.decode(block) | ||
break | ||
case raw.code: | ||
child = block | ||
break | ||
case dagCbor.code: | ||
child = await dagCbor.decode(block) | ||
break | ||
default: | ||
throw Error(`Unsupported codec: ${childLink.Hash.code}`) | ||
} | ||
} | ||
for await (const buf of emitBytes(blockstore, child, start, end, streamPosition, options)) { | ||
streamPosition += buf.length | ||
streamPosition = childEnd | ||
yield buf | ||
if (streamPosition > end) { | ||
break | ||
} | ||
} | ||
await pipe( | ||
childOps, | ||
(source) => map(source, (op) => { | ||
return async () => { | ||
const block = await blockstore.get(op.link.Hash, { | ||
signal: options.signal | ||
}) | ||
return { | ||
...op, | ||
block | ||
} | ||
} | ||
}), | ||
(source) => parallel(source, { | ||
ordered: true | ||
}), | ||
async (source) => { | ||
for await (const { link, block, blockStart } of source) { | ||
let child | ||
switch (link.Hash.code) { | ||
case dagPb.code: | ||
child = await dagPb.decode(block) | ||
break | ||
case raw.code: | ||
child = block | ||
break | ||
default: | ||
throw errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS') | ||
} | ||
await walkDAG(blockstore, child, queue, blockStart, start, end, options) | ||
} | ||
} | ||
streamPosition = childEnd | ||
childStart = childEnd + 1 | ||
} | ||
) | ||
} | ||
@@ -110,3 +132,3 @@ | ||
*/ | ||
function yieldFileContent (options = {}) { | ||
async function * yieldFileContent (options = {}) { | ||
const fileSize = unixfs.fileSize() | ||
@@ -123,6 +145,24 @@ | ||
const start = offset | ||
const end = offset + length | ||
const queue = pushable({ | ||
objectMode: true | ||
}) | ||
return emitBytes(blockstore, node, start, end, 0, options) | ||
walkDAG(blockstore, node, queue, 0, offset, offset + length, options) | ||
.catch(err => { | ||
queue.end(err) | ||
}) | ||
let read = 0 | ||
for await (const buf of queue) { | ||
if (buf != null) { | ||
yield buf | ||
read += buf.byteLength | ||
if (read === length) { | ||
queue.end() | ||
} | ||
} | ||
} | ||
} | ||
@@ -129,0 +169,0 @@ |
@@ -19,3 +19,3 @@ /** | ||
// If the end byte is in the current block, truncate the block to the end byte | ||
block = block.slice(0, requestedEnd - blockStart) | ||
block = block.subarray(0, requestedEnd - blockStart) | ||
} | ||
@@ -25,3 +25,3 @@ | ||
// If the start byte is in the current block, skip to the start byte | ||
block = block.slice(requestedStart - blockStart) | ||
block = block.subarray(requestedStart - blockStart) | ||
} | ||
@@ -28,0 +28,0 @@ |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Unidentified License
License(Experimental) Something that seems like a license was found, but its contents could not be matched with a known license.
Found 1 instance in 1 package
Unidentified License
License(Experimental) Something that seems like a license was found, but its contents could not be matched with a known license.
Found 1 instance in 1 package
163339
9.26%1546
5.75%14
40%13
8.33%+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added