Comparing version 0.9.5 to 0.9.6
 {
   "name": "hyparquet",
-  "version": "0.9.5",
+  "version": "0.9.6",
   "description": "parquet file parser for javascript",
@@ -31,9 +31,9 @@ "keywords": [
     "@types/node": "20.12.12",
-    "@typescript-eslint/eslint-plugin": "7.9.0",
+    "@typescript-eslint/eslint-plugin": "7.10.0",
     "@vitest/coverage-v8": "1.6.0",
     "eslint": "8.57.0",
     "eslint-plugin-import": "2.29.1",
-    "eslint-plugin-jsdoc": "48.2.5",
+    "eslint-plugin-jsdoc": "48.2.6",
     "http-server": "14.1.1",
-    "hyparquet-compressors": "0.1.2",
+    "hyparquet-compressors": "0.1.3",
     "typescript": "5.4.5",
@@ -40,0 +40,0 @@ "vitest": "1.6.0"
@@ -9,3 +9,3 @@ # hyparquet
 [![dependencies](https://img.shields.io/badge/Dependencies-0-blueviolet)](https://www.npmjs.com/package/hyparquet?activeTab=dependencies)
-![coverage](https://img.shields.io/badge/Coverage-95-darkred)
+![coverage](https://img.shields.io/badge/Coverage-96-darkred)
@@ -12,0 +12,0 @@ Dependency free since 2023!
@@ -40,3 +40,3 @@ import { isListLike, isMapLike } from './schema.js'
     // Pop up to start of rep level
-    while (currentDepth && (rep < currentRepLevel || repetitionPath[currentDepth] === 'OPTIONAL')) {
+    while (currentDepth && (rep < currentRepLevel || repetitionPath[currentDepth] !== 'REPEATED')) {
       if (repetitionPath[currentDepth] !== 'REQUIRED') {
@@ -53,3 +53,6 @@ containerStack.pop()
     // Go deeper to end of definition level
-    while (currentDepth < repetitionPath.length - 2 && currentDefLevel < def) {
+    while (
+      (currentDepth < repetitionPath.length - 2 || repetitionPath[currentDepth + 1] === 'REPEATED') &&
+      (currentDefLevel < def || repetitionPath[currentDepth + 1] === 'REQUIRED')
+    ) {
       currentDepth++
@@ -110,4 +113,9 @@ if (repetitionPath[currentDepth] !== 'REQUIRED') {
   if (isListLike(schema)) {
-    const sublist = schema.children[0].children[0]
-    assembleNested(subcolumnData, sublist, nextDepth + 1)
+    let sublist = schema.children[0]
+    let subDepth = nextDepth
+    if (sublist.children.length === 1) {
+      sublist = sublist.children[0]
+      subDepth++
+    }
+    assembleNested(subcolumnData, sublist, subDepth)
@@ -164,6 +172,6 @@ const subcolumn = sublist.path.join('.')
     // invert struct by depth
-    const inverted = invertStruct(struct, nextDepth)
+    const invertDepth = schema.element.repetition_type === 'REQUIRED' ? depth : depth + 1
+    const inverted = invertStruct(struct, invertDepth)
     if (optional) flattenAtDepth(inverted, depth)
     subcolumnData.set(path, inverted)
     return
   }
@@ -170,0 +178,0 @@ // assert(schema.element.repetition_type !== 'REPEATED')
 import { assembleLists } from './assemble.js'
-import { convert } from './convert.js'
+import { convert, dereferenceDictionary } from './convert.js'
 import { readDataPage, readDictionaryPage } from './datapage.js'
@@ -11,39 +11,29 @@ import { readDataPageV2 } from './datapageV2.js'
 /**
- * @typedef {import('./types.js').SchemaTree} SchemaTree
- * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
- * @typedef {import('./types.js').Compressors} Compressors
- * @typedef {import('./types.js').RowGroup} RowGroup
- */
-/**
  * Parse column data from a buffer.
  *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
- * @param {number} columnOffset offset to start reading from
- * @param {RowGroup} rowGroup row group metadata
+ * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
+ * @typedef {import('./types.js').DecodedArray} DecodedArray
+ * @param {import('./types.js').DataReader} reader
+ * @param {import('./types.js').RowGroup} rowGroup row group metadata
  * @param {ColumnMetaData} columnMetadata column metadata
- * @param {SchemaTree[]} schemaPath schema path for the column
- * @param {Compressors} [compressors] custom decompressors
+ * @param {import('./types.js').SchemaTree[]} schemaPath schema path for the column
+ * @param {import('./hyparquet.js').ParquetReadOptions} options read options
 * @returns {any[]} array of values
 */
-export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schemaPath, compressors) {
-  /** @type {ArrayLike<any> | undefined} */
+export function readColumn(reader, rowGroup, columnMetadata, schemaPath, { compressors, utf8 }) {
+  const { element } = schemaPath[schemaPath.length - 1]
+  /** @type {DecodedArray | undefined} */
   let dictionary = undefined
-  let valuesSeen = 0
+  let seen = 0
   /** @type {any[]} */
   const rowData = []
-  const { element } = schemaPath[schemaPath.length - 1]
-  // column reader:
-  const reader = { view: new DataView(arrayBuffer, columnOffset), offset: 0 }
-  while (valuesSeen < rowGroup.num_rows) {
+  while (seen < rowGroup.num_rows) {
     // parse column header
     const header = parquetHeader(reader)
-    if (header.compressed_page_size === undefined) {
-      throw new Error('parquet compressed page size is undefined')
-    }
+    // assert(header.compressed_page_size !== undefined)
     // read compressed_page_size bytes starting at offset
     const compressedBytes = new Uint8Array(
-      arrayBuffer, columnOffset + reader.offset, header.compressed_page_size
+      reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
     )
@@ -58,12 +48,10 @@
-    const page = decompressPage(
-      compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
-    )
+    const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
     const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
-    valuesSeen += daph.num_values
+    seen += daph.num_values
     // assert(!daph.statistics || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
     // construct output values: skip nulls and construct lists
-    dereferenceDictionary(dictionary, dataPage)
-    values = convert(dataPage, element)
+    values = dereferenceDictionary(dictionary, dataPage)
+    values = convert(values, element, utf8)
     if (repetitionLevels.length || definitionLevels?.length) {
@@ -81,3 +69,3 @@ // Use repetition levels to construct lists
       if (schemaPath[i].element.repetition_type !== 'REQUIRED') {
-        values = [values]
+        values = Array.from(values, e => [e])
       }
@@ -95,6 +83,6 @@ }
     )
-    valuesSeen += daph2.num_values
+    seen += daph2.num_values
-    dereferenceDictionary(dictionary, dataPage)
-    values = convert(dataPage, element)
+    values = dereferenceDictionary(dictionary, dataPage)
+    values = convert(values, element, utf8)
     if (repetitionLevels.length || definitionLevels?.length) {
@@ -130,17 +118,2 @@ // Use repetition levels to construct lists
-/**
- * Map data to dictionary values in place.
- *
- * @typedef {import('./types.js').DecodedArray} DecodedArray
- * @param {ArrayLike<any> | undefined} dictionary
- * @param {DecodedArray} dataPage
- */
-function dereferenceDictionary(dictionary, dataPage) {
-  if (dictionary) {
-    for (let i = 0; i < dataPage.length; i++) {
-      dataPage[i] = dictionary[dataPage[i]]
-    }
-  }
-}
 /**
  * Find the start byte offset for a column chunk.
@@ -153,3 +126,3 @@ *
   let columnOffset = dictionary_page_offset
-  if (dictionary_page_offset === undefined || data_page_offset < dictionary_page_offset) {
+  if (!dictionary_page_offset || data_page_offset < dictionary_page_offset) {
     columnOffset = data_page_offset
@@ -164,3 +137,3 @@ }
  * @param {import('./types.js').CompressionCodec} codec
- * @param {Compressors | undefined} compressors
+ * @param {import('./types.js').Compressors | undefined} compressors
  * @returns {Uint8Array}
@@ -167,0 +140,0 @@ */
@@ -9,14 +9,7 @@ const dayMillis = 86400000 // 1 day in milliseconds
  * @param {import('./types.js').SchemaElement} schemaElement schema element for the data
+ * @param {boolean | undefined} utf8 decode bytes as utf8?
  * @returns {DecodedArray} series of rich types
  */
-export function convert(data, schemaElement) {
+export function convert(data, schemaElement, utf8 = true) {
   const ctype = schemaElement.converted_type
-  if (ctype === 'UTF8') {
-    const decoder = new TextDecoder()
-    const arr = new Array(data.length)
-    for (let i = 0; i < arr.length; i++) {
-      arr[i] = data[i] && decoder.decode(data[i])
-    }
-    return arr
-  }
   if (ctype === 'DECIMAL') {
@@ -45,4 +38,19 @@ const scale = schemaElement.scale || 0
   }
+  if (ctype === 'TIMESTAMP_MILLIS') {
+    const arr = new Array(data.length)
+    for (let i = 0; i < arr.length; i++) {
+      arr[i] = new Date(Number(data[i]))
+    }
+    return arr
+  }
+  if (ctype === 'TIMESTAMP_MICROS') {
+    const arr = new Array(data.length)
+    for (let i = 0; i < arr.length; i++) {
+      arr[i] = new Date(Number(data[i] / 1000n))
+    }
+    return arr
+  }
   if (ctype === 'JSON') {
-    return data.map(v => JSON.parse(v))
+    const decoder = new TextDecoder()
+    return data.map(v => JSON.parse(decoder.decode(v)))
   }
@@ -55,3 +63,17 @@ if (ctype === 'BSON') {
   }
-  // TODO: ctype UINT
+  if (ctype === 'UTF8' || utf8 && schemaElement.type === 'BYTE_ARRAY') {
+    const decoder = new TextDecoder()
+    const arr = new Array(data.length)
+    for (let i = 0; i < arr.length; i++) {
+      arr[i] = data[i] && decoder.decode(data[i])
+    }
+    return arr
+  }
+  if (ctype === 'UINT_64') {
+    const arr = new BigUint64Array(data.length)
+    for (let i = 0; i < arr.length; i++) {
+      arr[i] = BigInt(data[i])
+    }
+    return arr
+  }
   const logicalType = schemaElement.logical_type?.type
@@ -61,3 +83,9 @@ if (logicalType === 'FLOAT16') {
   }
-  // TODO: logical types
+  if (logicalType === 'TIMESTAMP') {
+    const arr = new Array(data.length)
+    for (let i = 0; i < arr.length; i++) {
+      arr[i] = new Date(Number(data[i]))
+    }
+    return arr
+  }
   return data
@@ -104,1 +132,23 @@ }
 }
+/**
+ * Map data to dictionary values in place.
+ *
+ * @param {DecodedArray | undefined} dictionary
+ * @param {DecodedArray} dataPage
+ * @returns {DecodedArray}
+ */
+export function dereferenceDictionary(dictionary, dataPage) {
+  let output = dataPage
+  if (dictionary) {
+    if (dataPage instanceof Uint8Array && !(dictionary instanceof Uint8Array)) {
+      // upgrade dataPage to match dictionary type
+      // @ts-expect-error not my fault typescript doesn't understand constructors
+      output = new dictionary.constructor(dataPage.length)
+    }
+    for (let i = 0; i < dataPage.length; i++) {
+      output[i] = dictionary[dataPage[i]]
+    }
+  }
+  return output
+}
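
For context (not part of the diff): dereferenceDictionary now returns the dereferenced array rather than patching dataPage in place, and it upgrades the output type when the index page is a plain Uint8Array. A rough sketch with invented values, using the function exactly as added above:

// hypothetical dictionary and index page (values invented for illustration)
const dictionary = new Float64Array([3.14, 2.71, 1.61])
const dataPage = new Uint8Array([2, 0, 0, 1]) // dictionary indexes
const values = dereferenceDictionary(dictionary, dataPage)
// dataPage is Uint8Array but the dictionary is Float64Array, so a new
// Float64Array is allocated: values -> [1.61, 3.14, 3.14, 2.71]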
@@ -1,8 +0,7 @@
-import { byteStreamSplit } from './byteStreamSplit.js'
-import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
+import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
 import { readPlain } from './plain.js'
-import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js'
+import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
 /**
- * Read a data page from the given Uint8Array.
+ * Read a data page from uncompressed reader.
  *
@@ -40,3 +39,2 @@ * @typedef {import("./types.d.ts").DataPage} DataPage
 ) {
-  // TODO: RLE encoding uses bitWidth = schemaElement.type_length
   const bitWidth = type === 'BOOLEAN' ? 1 : view.getUint8(reader.offset++)
@@ -50,6 +48,4 @@ if (bitWidth) {
   } else if (daph.encoding === 'BYTE_STREAM_SPLIT') {
-    if (type === 'FLOAT') dataPage = new Float32Array(nValues)
-    else if (type === 'DOUBLE') dataPage = new Float64Array(nValues)
-    else throw new Error(`parquet byte_stream_split unsupported type: ${type}`)
-    byteStreamSplit(reader, nValues, dataPage)
+    const { type_length } = schemaPath[schemaPath.length - 1].element
+    dataPage = byteStreamSplit(reader, nValues, type, type_length)
   } else {
@@ -63,4 +59,2 @@ throw new Error(`parquet unsupported encoding: ${daph.encoding}`)
 /**
- * Read a page containing dictionary data.
- *
  * @param {Uint8Array} bytes raw page data
@@ -70,3 +64,3 @@ * @param {import("./types.d.ts").DictionaryPageHeader} diph dictionary page header
  * @param {number | undefined} typeLength - type_length from schema
- * @returns {ArrayLike<any>} array of values
+ * @returns {DecodedArray}
  */
@@ -80,4 +74,2 @@ export function readDictionaryPage(bytes, diph, columnMetadata, typeLength) {
 /**
- * Read the repetition levels from this page, if any.
- *
  * @typedef {import("./types.d.ts").DataReader} DataReader
@@ -93,5 +85,4 @@ * @param {DataReader} reader data view for the page
   if (maxRepetitionLevel) {
-    const bitWidth = widthFromMaxInt(maxRepetitionLevel)
     const values = new Array(daph.num_values)
-    readRleBitPackedHybrid(reader, bitWidth, 0, values)
+    readRleBitPackedHybrid(reader, bitWidth(maxRepetitionLevel), 0, values)
     return values
@@ -104,4 +95,2 @@ }
 /**
- * Read the definition levels from this page, if any.
- *
  * @param {DataReader} reader data view for the page
@@ -113,23 +102,16 @@ * @param {DataPageHeader} daph data page header
 function readDefinitionLevels(reader, daph, schemaPath) {
-  if (!isRequired(schemaPath)) {
-    const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
-    const bitWidth = widthFromMaxInt(maxDefinitionLevel)
-    if (bitWidth) {
-      // num_values is index 1 for either type of page header
-      const definitionLevels = new Array(daph.num_values)
-      readRleBitPackedHybrid(reader, bitWidth, 0, definitionLevels)
+  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
+  if (!maxDefinitionLevel) return { definitionLevels: [], numNulls: 0 }
-      // count nulls
-      let numNulls = daph.num_values
-      for (const def of definitionLevels) {
-        if (def === maxDefinitionLevel) numNulls--
-      }
-      if (numNulls === 0) {
-        definitionLevels.length = 0
-      }
+  const definitionLevels = new Array(daph.num_values)
+  readRleBitPackedHybrid(reader, bitWidth(maxDefinitionLevel), 0, definitionLevels)
-      return { definitionLevels, numNulls }
-    }
+  // count nulls
+  let numNulls = daph.num_values
+  for (const def of definitionLevels) {
+    if (def === maxDefinitionLevel) numNulls--
+  }
-  return { definitionLevels: [], numNulls: 0 }
+  if (numNulls === 0) definitionLevels.length = 0
+  return { definitionLevels, numNulls }
 }
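
As an aside (my own illustration, not from the diff): with the isRequired guard gone, null detection rests entirely on maxDefinitionLevel. A small example of the null-counting loop above, using invented levels:

// hypothetical decoded definition levels for 5 values, maxDefinitionLevel = 1
const definitionLevels = [1, 0, 1, 1, 0]
const maxDefinitionLevel = 1
let numNulls = definitionLevels.length
for (const def of definitionLevels) {
  if (def === maxDefinitionLevel) numNulls--
}
// numNulls === 2: the two entries below the max level mark null values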
@@ -1,5 +0,4 @@
-import { byteStreamSplit } from './byteStreamSplit.js'
 import { decompressPage } from './column.js'
 import { deltaBinaryUnpack, deltaByteArray, deltaLengthByteArray } from './delta.js'
-import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
+import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
 import { readPlain } from './plain.js'
@@ -32,7 +31,6 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
   const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schemaPath)
+  // assert(reader.offset === daph2.repetition_levels_byte_length)
   reader.offset = daph2.repetition_levels_byte_length // readVarInt() => len for boolean v2?
   // definition levels
-  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
-  const definitionLevels = readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel)
+  const definitionLevels = readDefinitionLevelsV2(reader, daph2, schemaPath)
+  // assert(reader.offset === daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length)
@@ -43,3 +41,5 @@
   let page = compressedBytes.subarray(reader.offset)
-  page = decompressPage(page, uncompressedPageSize, codec, compressors)
+  if (daph2.is_compressed !== false) {
+    page = decompressPage(page, uncompressedPageSize, codec, compressors)
+  }
   const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
@@ -56,5 +56,6 @@ const pageReader = { view: pageView, offset: 0 }
   } else if (daph2.encoding === 'RLE') {
     pageReader.offset = 4
+    // assert(columnMetadata.type === 'BOOLEAN')
     dataPage = new Array(nValues)
-    readRleBitPackedHybrid(pageReader, 1, uncompressedPageSize, dataPage)
+    readRleBitPackedHybrid(pageReader, 1, 0, dataPage)
     dataPage = dataPage.map(x => !!x)
   } else if (
@@ -64,6 +65,5 @@ daph2.encoding === 'PLAIN_DICTIONARY' ||
   ) {
-    const bitWidth = pageView.getUint8(0)
-    pageReader.offset = 1
+    const bitWidth = pageView.getUint8(pageReader.offset++)
     dataPage = new Array(nValues)
-    readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, dataPage)
+    readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize - 1, dataPage)
   } else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
@@ -80,6 +80,4 @@ const int32 = type === 'INT32'
   } else if (daph2.encoding === 'BYTE_STREAM_SPLIT') {
-    if (type === 'FLOAT') dataPage = new Float32Array(nValues)
-    else if (type === 'DOUBLE') dataPage = new Float64Array(nValues)
-    else throw new Error(`parquet byte_stream_split unsupported type: ${type}`)
-    byteStreamSplit(pageReader, nValues, dataPage)
+    const { type_length } = schemaPath[schemaPath.length - 1].element
+    dataPage = byteStreamSplit(reader, nValues, type, type_length)
   } else {
@@ -103,7 +101,5 @@ throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
-    const bitWidth = widthFromMaxInt(maxRepetitionLevel)
-    // num_values is index 1 for either type of page header
     const values = new Array(daph2.num_values)
     readRleBitPackedHybrid(
-      reader, bitWidth, daph2.repetition_levels_byte_length, values
+      reader, bitWidth(maxRepetitionLevel), daph2.repetition_levels_byte_length, values
     )
@@ -116,13 +112,13 @@ return values
  * @param {DataPageHeaderV2} daph2 data page header v2
- * @param {number} maxDefinitionLevel
+ * @param {SchemaTree[]} schemaPath
  * @returns {number[] | undefined} definition levels
  */
-function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) {
+function readDefinitionLevelsV2(reader, daph2, schemaPath) {
+  const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
   if (maxDefinitionLevel) {
-    // not the same as V1, because we know the length
-    const bitWidth = widthFromMaxInt(maxDefinitionLevel)
+    // V2 we know the length
     const values = new Array(daph2.num_values)
-    readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values)
+    readRleBitPackedHybrid(reader, bitWidth(maxDefinitionLevel), daph2.definition_levels_byte_length, values)
     return values
   }
 }
@@ -6,10 +6,10 @@ import { readVarInt, readZigZagBigInt } from './thrift.js'
  * @param {DataReader} reader
- * @param {number} nValues number of values to read
- * @param {Int32Array | BigInt64Array} output output array
+ * @param {number} count number of values to read
+ * @param {Int32Array | BigInt64Array} output
  */
-export function deltaBinaryUnpack(reader, nValues, output) {
+export function deltaBinaryUnpack(reader, count, output) {
   const int32 = output instanceof Int32Array
   const blockSize = readVarInt(reader)
   const miniblockPerBlock = readVarInt(reader)
-  readVarInt(reader) // assert(count === nValues)
+  readVarInt(reader) // assert(=== count)
   let value = readZigZagBigInt(reader) // first value
@@ -21,3 +21,3 @@ let outputIndex = 0
-  while (outputIndex < nValues) {
+  while (outputIndex < count) {
     // new block
@@ -30,3 +30,3 @@ const minDelta = readZigZagBigInt(reader)
-    for (let i = 0; i < miniblockPerBlock && outputIndex < nValues; i++) {
+    for (let i = 0; i < miniblockPerBlock && outputIndex < count; i++) {
      // new miniblock
@@ -38,3 +38,3 @@ const bitWidth = BigInt(bitWidths[i])
      const mask = (1n << bitWidth) - 1n
-      while (miniblockCount && outputIndex < nValues) {
+      while (miniblockCount && outputIndex < count) {
        let bits = BigInt(reader.view.getUint8(reader.offset)) >> bitpackPos & mask // TODO: don't re-read value every time
@@ -59,3 +59,3 @@ bitpackPos += bitWidth
      } else {
-        for (let j = 0; j < valuesPerMiniblock && outputIndex < nValues; j++) {
+        for (let j = 0; j < valuesPerMiniblock && outputIndex < count; j++) {
          value += minDelta
@@ -71,9 +71,9 @@ output[outputIndex++] = int32 ? Number(value) : value
  * @param {DataReader} reader
- * @param {number} nValues
+ * @param {number} count
  * @param {Uint8Array[]} output
  */
-export function deltaLengthByteArray(reader, nValues, output) {
-  const lengths = new Int32Array(nValues)
-  deltaBinaryUnpack(reader, nValues, lengths)
-  for (let i = 0; i < nValues; i++) {
+export function deltaLengthByteArray(reader, count, output) {
+  const lengths = new Int32Array(count)
+  deltaBinaryUnpack(reader, count, lengths)
+  for (let i = 0; i < count; i++) {
     output[i] = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, lengths[i])
@@ -86,12 +86,12 @@ reader.offset += lengths[i]
  * @param {DataReader} reader
- * @param {number} nValues
+ * @param {number} count
  * @param {Uint8Array[]} output
  */
-export function deltaByteArray(reader, nValues, output) {
-  const prefixData = new Int32Array(nValues)
-  deltaBinaryUnpack(reader, nValues, prefixData)
-  const suffixData = new Int32Array(nValues)
-  deltaBinaryUnpack(reader, nValues, suffixData)
+export function deltaByteArray(reader, count, output) {
+  const prefixData = new Int32Array(count)
+  deltaBinaryUnpack(reader, count, prefixData)
+  const suffixData = new Int32Array(count)
+  deltaBinaryUnpack(reader, count, suffixData)
-  for (let i = 0; i < nValues; i++) {
+  for (let i = 0; i < count; i++) {
     const suffix = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, suffixData[i])
@@ -98,0 +98,0 @@ if (prefixData[i]) {
 import { readVarInt } from './thrift.js'
 /**
- * Convert the value specified to a bit width.
+ * Minimum bits needed to store value.
  *
- * @param {number} value - value to convert to bitwidth
- * @returns {number} bit width of the value
+ * @param {number} value
+ * @returns {number}
  */
-export function widthFromMaxInt(value) {
-  return Math.ceil(Math.log2(value + 1))
+export function bitWidth(value) {
+  return 32 - Math.clz32(value)
 }
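
A quick sanity check (illustrative only, not part of the package): for the non-negative integers this helper is called with, the Math.clz32 form returns the same result as the old Math.ceil(Math.log2(value + 1)):

for (const value of [0, 1, 2, 3, 7, 8, 255, 256, 1023]) {
  console.assert(32 - Math.clz32(value) === Math.ceil(Math.log2(value + 1)))
}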
@@ -16,44 +16,42 @@
  *
- * If length is zero, then read as int32 at the start of the encoded data.
+ * If length is zero, then read int32 length at the start.
  *
  * @typedef {import("./types.d.ts").DataReader} DataReader
- * @typedef {number[]} DecodedArray
- * @param {DataReader} reader - buffer to read data from
+ * @typedef {import("./types.d.ts").DecodedArray} DecodedArray
+ * @param {DataReader} reader
  * @param {number} width - width of each bit-packed group
  * @param {number} length - length of the encoded data
- * @param {DecodedArray} values - output array
+ * @param {DecodedArray} output
  */
-export function readRleBitPackedHybrid(reader, width, length, values) {
+export function readRleBitPackedHybrid(reader, width, length, output) {
   if (!length) {
-    length = reader.view.getUint32(reader.offset, true)
+    // length = reader.view.getUint32(reader.offset, true)
     reader.offset += 4
   }
   let seen = 0
-  while (seen < values.length) {
+  while (seen < output.length) {
     const header = readVarInt(reader)
     if (header & 1) {
       // bit-packed
-      seen = readBitPacked(reader, header, width, values, seen)
+      seen = readBitPacked(reader, header, width, output, seen)
     } else {
       // rle
       const count = header >>> 1
-      readRle(reader, count, width, values, seen)
+      readRle(reader, count, width, output, seen)
       seen += count
     }
   }
   // assert(reader.offset - startOffset === length)
 }
 /**
- * Read a run-length encoded value.
+ * Run-length encoding: read value with bitWidth and repeat it count times.
  *
- * The count is determined from the header and the width is used to grab the
- * value that's repeated. Yields the value repeated count times.
- *
- * @param {DataReader} reader - buffer to read data from
- * @param {number} count - number of values to read
- * @param {number} bitWidth - width of each bit-packed group
- * @param {DecodedArray} values - output array
- * @param {number} seen - number of values seen so far
+ * @param {DataReader} reader
+ * @param {number} count
+ * @param {number} bitWidth
+ * @param {DecodedArray} output
+ * @param {number} seen
  */
-function readRle(reader, count, bitWidth, values, seen) {
+function readRle(reader, count, bitWidth, output, seen) {
   const width = bitWidth + 7 >> 3
@@ -63,2 +61,3 @@ let value = 0
     value = reader.view.getUint8(reader.offset)
+    // assert(value < 1 << bitWidth)
   } else if (width === 2) {
@@ -75,3 +74,3 @@ value = reader.view.getUint16(reader.offset, true)
   for (let i = 0; i < count; i++) {
-    values[seen + i] = value
+    output[seen + i] = value
   }
@@ -84,13 +83,11 @@ }
  *
- * @param {DataReader} reader - buffer to read data from
- * @param {number} header - header information
- * @param {number} bitWidth - width of each bit-packed group
- * @param {number[]} values - output array
- * @param {number} seen - number of values seen so far
- * @returns {number} number of values seen
+ * @param {DataReader} reader
+ * @param {number} header - bit-pack header
+ * @param {number} bitWidth
+ * @param {DecodedArray} output
+ * @param {number} seen
+ * @returns {number} total output values so far
  */
-function readBitPacked(reader, header, bitWidth, values, seen) {
-  // extract number of values to read from header
-  let count = header >> 1 << 3
-  // mask for bitWidth number of bits
+function readBitPacked(reader, header, bitWidth, output, seen) {
+  let count = header >> 1 << 3 // values to read
   const mask = (1 << bitWidth) - 1
@@ -121,5 +118,5 @@
     } else {
-      if (seen < values.length) {
-        // emit value by shifting off to the right and masking
-        values[seen++] = data >> right & mask
+      if (seen < output.length) {
+        // emit value
+        output[seen++] = data >> right & mask
       }
@@ -133,1 +130,54 @@ count--
 }
+/**
+ * @typedef {import("./types.d.ts").ParquetType} ParquetType
+ * @param {DataReader} reader
+ * @param {number} count
+ * @param {ParquetType} type
+ * @param {number | undefined} typeLength
+ * @returns {DecodedArray}
+ */
+export function byteStreamSplit(reader, count, type, typeLength) {
+  const width = byteWidth(type, typeLength)
+  const bytes = new Uint8Array(count * width)
+  for (let b = 0; b < width; b++) {
+    for (let i = 0; i < count; i++) {
+      bytes[i * width + b] = reader.view.getUint8(reader.offset++)
+    }
+  }
+  // interpret bytes as typed array
+  if (type === 'FLOAT') return new Float32Array(bytes.buffer)
+  else if (type === 'DOUBLE') return new Float64Array(bytes.buffer)
+  else if (type === 'INT32') return new Int32Array(bytes.buffer)
+  else if (type === 'INT64') return new BigInt64Array(bytes.buffer)
+  else if (type === 'FIXED_LEN_BYTE_ARRAY') {
+    // split into arrays of typeLength
+    const split = new Array(count)
+    for (let i = 0; i < count; i++) {
+      split[i] = bytes.subarray(i * width, (i + 1) * width)
+    }
+    return split
+  }
+  throw new Error(`parquet byte_stream_split unsupported type: ${type}`)
+}
+/**
+ * @param {ParquetType} type
+ * @param {number | undefined} typeLength
+ * @returns {number}
+ */
+function byteWidth(type, typeLength) {
+  switch (type) {
+    case 'INT32':
+    case 'FLOAT':
+      return 4
+    case 'INT64':
+    case 'DOUBLE':
+      return 8
+    case 'FIXED_LEN_BYTE_ARRAY':
+      if (!typeLength) throw new Error('parquet byteWidth missing type_length')
+      return typeLength
+    default:
+      throw new Error(`parquet unsupported type: ${type}`)
  }
 }
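
A note on the new decoder (my own illustration, not from the diff): BYTE_STREAM_SPLIT stores byte 0 of every value first, then byte 1 of every value, and so on, so the nested loops above regroup the stream value-by-value before reinterpreting the buffer as a typed array. For two hypothetical FLOAT values 1.0 and 3.0:

// encoded stream (as read):  a0 b0 a1 b1 a2 b2 a3 b3
// regrouped `bytes` buffer:  a0 a1 a2 a3 b0 b1 b2 b3
const encoded = new Uint8Array([0x00, 0x00, 0x00, 0x00, 0x80, 0x40, 0x3f, 0x40])
const reader = { view: new DataView(encoded.buffer), offset: 0 }
byteStreamSplit(reader, 2, 'FLOAT', undefined) // -> Float32Array [1, 3] on little-endian hosts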
@@ -103,2 +103,3 @@ import type { AsyncBuffer, Compressors, FileMetaData, SchemaTree } from './types.d.ts'
   compressors?: Compressors // custom decompressors
+  utf8?: boolean // decode byte arrays as utf8 strings (default true)
 }
@@ -105,0 +106,0 @@
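
For reference, a hedged usage sketch (mine, not from the diff) of the new utf8 option; with utf8: false, plain BYTE_ARRAY columns come back as raw Uint8Array values instead of decoded strings:

import { parquetRead } from 'hyparquet'

// `file` is assumed to be an ArrayBuffer or AsyncBuffer you already have
await parquetRead({
  file,
  utf8: false, // keep BYTE_ARRAY columns as bytes
  onComplete: rows => console.log(rows),
})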
@@ -1,2 +0,2 @@
-import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, ParquetType } from './constants.js'
+import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, PageType, ParquetType } from './constants.js'
 import { parseFloat16 } from './convert.js'
@@ -142,3 +142,3 @@ import { getSchemaPath } from './schema.js'
     encoding_stats: column.field_3.field_13?.map((/** @type {any} */ encodingStat) => ({
-      page_type: encodingStat.field_1,
+      page_type: PageType[encodingStat.field_1],
       encoding: Encoding[encodingStat.field_2],
@@ -145,0 +145,0 @@ count: encodingStat.field_3,
@@ -47,3 +47,3 @@ /**
   for (let i = 0; i < count; i++) {
-    const byteOffset = reader.offset + Math.floor(i / 8)
+    const byteOffset = reader.offset + (i / 8 | 0)
     const bitOffset = i % 8
@@ -50,0 +50,0 @@ const byte = reader.view.getUint8(byteOffset)
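
Minor aside (not from the diff): `i / 8 | 0` truncates toward zero, which matches Math.floor for the non-negative loop index used here:

console.assert((13 / 8 | 0) === Math.floor(13 / 8)) // both are 1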
@@ -41,5 +41,4 @@
-  const { metadata, onComplete } = options
+  const { metadata, onComplete, rowEnd } = options
   const rowStart = options.rowStart || 0
-  const rowEnd = options.rowEnd || Number(metadata.num_rows)
   /** @type {any[][]} */
@@ -54,3 +53,3 @@ const rowData = []
     // if row group overlaps with row range, read it
-    if (groupStart + groupRows >= rowStart && groupStart < rowEnd) {
+    if (groupStart + groupRows >= rowStart && (rowEnd === undefined || groupStart < rowEnd)) {
       // read row group
@@ -61,3 +60,3 @@ const groupData = await readRowGroup(options, rowGroup, groupStart)
       const start = Math.max(rowStart - groupStart, 0)
-      const end = Math.min(rowEnd - groupStart, groupRows)
+      const end = rowEnd === undefined ? undefined : rowEnd - groupStart
       concat(rowData, groupData.slice(start, end))
@@ -88,3 +87,3 @@ }
 async function readRowGroup(options, rowGroup, groupStart) {
-  const { file, metadata, columns, compressors } = options
+  const { file, metadata, columns } = options
   if (!metadata) throw new Error('parquet metadata not found')
@@ -159,6 +158,5 @@
     const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
+    const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
     /** @type {any[] | undefined} */
-    let columnData = readColumn(
-      arrayBuffer, bufferOffset, rowGroup, columnMetadata, schemaPath, compressors
-    )
+    let columnData = readColumn(reader, rowGroup, columnMetadata, schemaPath, options)
     // assert(columnData.length === Number(rowGroup.num_rows)
@@ -165,0 +163,0 @@
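
A hedged sketch (mine, not from the diff) of how the reworked row-range handling is used; rowEnd may now be left undefined to read through the end of the file:

import { parquetRead } from 'hyparquet'

// read rows 100..199; omit rowEnd entirely to read through the last row
await parquetRead({
  file,
  rowStart: 100,
  rowEnd: 200,
  onComplete: rows => console.log(rows.length), // 100, assuming the file has at least 200 rows
})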
@@ -49,17 +49,2 @@ /**
-/**
- * Check if the schema path and all its ancestors are required.
- *
- * @param {SchemaTree[]} schemaPath
- * @returns {boolean} true if the element is required
- */
-export function isRequired(schemaPath) {
-  for (const { element } of schemaPath.slice(1)) {
-    if (element.repetition_type !== 'REQUIRED') {
-      return false
-    }
-  }
-  return true
-}
 /**
  * Get the max repetition level for a given schema path.
@@ -130,3 +115,3 @@ *
   const keyChild = firstChild.children.find(child => child.element.name === 'key')
-  if (keyChild?.element.repetition_type !== 'REQUIRED') return false
+  if (keyChild?.element.repetition_type === 'REPEATED') return false
@@ -133,0 +118,0 @@ const valueChild = firstChild.children.find(child => child.element.name === 'value')
@@ -143,5 +143,3 @@ /**
-  if (outPos !== outputLength) {
-    throw new Error('premature end of input')
-  }
+  if (outPos !== outputLength) throw new Error('premature end of input')
 }
@@ -298,4 +298,5 @@ export type Awaitable<T> = T | Promise<T>
   BigInt64Array |
+  BigUint64Array |
   Float32Array |
   Float64Array |
   any[]