hyparquet - npm Package Compare versions

Comparing version 0.9.5 to 0.9.6


package.json
{
  "name": "hyparquet",
- "version": "0.9.5",
+ "version": "0.9.6",
  "description": "parquet file parser for javascript",

@@ -31,9 +31,9 @@ "keywords": [

"@types/node": "20.12.12",
"@typescript-eslint/eslint-plugin": "7.9.0",
"@typescript-eslint/eslint-plugin": "7.10.0",
"@vitest/coverage-v8": "1.6.0",
"eslint": "8.57.0",
"eslint-plugin-import": "2.29.1",
"eslint-plugin-jsdoc": "48.2.5",
"eslint-plugin-jsdoc": "48.2.6",
"http-server": "14.1.1",
"hyparquet-compressors": "0.1.2",
"hyparquet-compressors": "0.1.3",
"typescript": "5.4.5",

@@ -40,0 +40,0 @@ "vitest": "1.6.0"

@@ -9,3 +9,3 @@ # hyparquet

[![dependencies](https://img.shields.io/badge/Dependencies-0-blueviolet)](https://www.npmjs.com/package/hyparquet?activeTab=dependencies)
- ![coverage](https://img.shields.io/badge/Coverage-95-darkred)
+ ![coverage](https://img.shields.io/badge/Coverage-96-darkred)

@@ -12,0 +12,0 @@ Dependency free since 2023!

@@ -40,3 +40,3 @@ import { isListLike, isMapLike } from './schema.js'

// Pop up to start of rep level
- while (currentDepth && (rep < currentRepLevel || repetitionPath[currentDepth] === 'OPTIONAL')) {
+ while (currentDepth && (rep < currentRepLevel || repetitionPath[currentDepth] !== 'REPEATED')) {
if (repetitionPath[currentDepth] !== 'REQUIRED') {

@@ -53,3 +53,6 @@ containerStack.pop()

// Go deeper to end of definition level
- while (currentDepth < repetitionPath.length - 2 && currentDefLevel < def) {
+ while (
+   (currentDepth < repetitionPath.length - 2 || repetitionPath[currentDepth + 1] === 'REPEATED') &&
+   (currentDefLevel < def || repetitionPath[currentDepth + 1] === 'REQUIRED')
+ ) {
currentDepth++

@@ -110,4 +113,9 @@ if (repetitionPath[currentDepth] !== 'REQUIRED') {

if (isListLike(schema)) {
- const sublist = schema.children[0].children[0]
- assembleNested(subcolumnData, sublist, nextDepth + 1)
+ let sublist = schema.children[0]
+ let subDepth = nextDepth
+ if (sublist.children.length === 1) {
+   sublist = sublist.children[0]
+   subDepth++
+ }
+ assembleNested(subcolumnData, sublist, subDepth)

@@ -164,6 +172,6 @@ const subcolumn = sublist.path.join('.')

// invert struct by depth
- const inverted = invertStruct(struct, nextDepth)
+ const invertDepth = schema.element.repetition_type === 'REQUIRED' ? depth : depth + 1
+ const inverted = invertStruct(struct, invertDepth)
if (optional) flattenAtDepth(inverted, depth)
subcolumnData.set(path, inverted)
return
}

@@ -170,0 +178,0 @@ // assert(schema.element.repetition_type !== 'REPEATED')
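
Background for the hunks above: this is standard Dremel-style record assembly, not part of the diff itself. For an optional three-level list column

  optional group a (LIST) { repeated group list { optional int32 element } }

the max repetition level is 1 and the max definition level is 3, so the rows [1,2], [], and null encode as (repetition, definition, value) triples:

  [1,2] -> (0,3,1), (1,3,2)  // rep 0 starts a new row, rep 1 appends to the open list
  []    -> (0,1,-)           // defined down to the empty list, no element slot
  null  -> (0,0,-)           // the list itself is null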

import { assembleLists } from './assemble.js'
- import { convert } from './convert.js'
+ import { convert, dereferenceDictionary } from './convert.js'
import { readDataPage, readDictionaryPage } from './datapage.js'

@@ -11,39 +11,29 @@ import { readDataPageV2 } from './datapageV2.js'

/**
- * @typedef {import('./types.js').SchemaTree} SchemaTree
- * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
- * @typedef {import('./types.js').Compressors} Compressors
- * @typedef {import('./types.js').RowGroup} RowGroup
- */
-/**
 * Parse column data from a buffer.
 *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
- * @param {number} columnOffset offset to start reading from
- * @param {RowGroup} rowGroup row group metadata
+ * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
+ * @typedef {import('./types.js').DecodedArray} DecodedArray
+ * @param {import('./types.js').DataReader} reader
+ * @param {import('./types.js').RowGroup} rowGroup row group metadata
 * @param {ColumnMetaData} columnMetadata column metadata
- * @param {SchemaTree[]} schemaPath schema path for the column
- * @param {Compressors} [compressors] custom decompressors
+ * @param {import('./types.js').SchemaTree[]} schemaPath schema path for the column
+ * @param {import('./hyparquet.js').ParquetReadOptions} options read options
 * @returns {any[]} array of values
 */
- export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schemaPath, compressors) {
-   /** @type {ArrayLike<any> | undefined} */
+ export function readColumn(reader, rowGroup, columnMetadata, schemaPath, { compressors, utf8 }) {
+   const { element } = schemaPath[schemaPath.length - 1]
+   /** @type {DecodedArray | undefined} */
  let dictionary = undefined
-   let valuesSeen = 0
+   let seen = 0
  /** @type {any[]} */
  const rowData = []
-   const { element } = schemaPath[schemaPath.length - 1]
-   // column reader:
-   const reader = { view: new DataView(arrayBuffer, columnOffset), offset: 0 }
-   while (valuesSeen < rowGroup.num_rows) {
+   while (seen < rowGroup.num_rows) {
    // parse column header
    const header = parquetHeader(reader)
-     if (header.compressed_page_size === undefined) {
-       throw new Error('parquet compressed page size is undefined')
-     }
+     // assert(header.compressed_page_size !== undefined)
    // read compressed_page_size bytes starting at offset
    const compressedBytes = new Uint8Array(
-       arrayBuffer, columnOffset + reader.offset, header.compressed_page_size
+       reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
    )
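
The new calling convention, sketched from pieces of this diff (the reader shape matches the one constructed at the readColumn call site near the end of this diff):

  // the caller now owns the DataView and passes a cursor into it
  const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
  const columnData = readColumn(reader, rowGroup, columnMetadata, schemaPath, options)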

@@ -58,12 +48,10 @@

- const page = decompressPage(
-   compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
- )
+ const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
- valuesSeen += daph.num_values
+ seen += daph.num_values
// assert(!daph.statistics || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
// construct output values: skip nulls and construct lists
- dereferenceDictionary(dictionary, dataPage)
- values = convert(dataPage, element)
+ values = dereferenceDictionary(dictionary, dataPage)
+ values = convert(values, element, utf8)
if (repetitionLevels.length || definitionLevels?.length) {

@@ -81,3 +69,3 @@ // Use repetition levels to construct lists

if (schemaPath[i].element.repetition_type !== 'REQUIRED') {
- values = [values]
+ values = Array.from(values, e => [e])
}
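
This is a per-row fix: for values = [1, 2, 3], the old line produced a single wrapper [[1, 2, 3]], while Array.from(values, e => [e]) produces [[1], [2], [3]], wrapping each value in its own list.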

@@ -95,6 +83,6 @@ }

)
- valuesSeen += daph2.num_values
+ seen += daph2.num_values
- dereferenceDictionary(dictionary, dataPage)
- values = convert(dataPage, element)
+ values = dereferenceDictionary(dictionary, dataPage)
+ values = convert(values, element, utf8)
if (repetitionLevels.length || definitionLevels?.length) {

@@ -130,17 +118,2 @@ // Use repetition levels to construct lists

- /**
-  * Map data to dictionary values in place.
-  *
-  * @typedef {import('./types.js').DecodedArray} DecodedArray
-  * @param {ArrayLike<any> | undefined} dictionary
-  * @param {DecodedArray} dataPage
-  */
- function dereferenceDictionary(dictionary, dataPage) {
-   if (dictionary) {
-     for (let i = 0; i < dataPage.length; i++) {
-       dataPage[i] = dictionary[dataPage[i]]
-     }
-   }
- }
/**
* Find the start byte offset for a column chunk.

@@ -153,3 +126,3 @@ *

let columnOffset = dictionary_page_offset
- if (dictionary_page_offset === undefined || data_page_offset < dictionary_page_offset) {
+ if (!dictionary_page_offset || data_page_offset < dictionary_page_offset) {
columnOffset = data_page_offset

@@ -164,3 +137,3 @@ }

* @param {import('./types.js').CompressionCodec} codec
- * @param {Compressors | undefined} compressors
+ * @param {import('./types.js').Compressors | undefined} compressors
* @returns {Uint8Array}

@@ -167,0 +140,0 @@ */

@@ -9,14 +9,7 @@ const dayMillis = 86400000 // 1 day in milliseconds

* @param {import('./types.js').SchemaElement} schemaElement schema element for the data
+ * @param {boolean | undefined} utf8 decode bytes as utf8?
* @returns {DecodedArray} series of rich types
*/
- export function convert(data, schemaElement) {
+ export function convert(data, schemaElement, utf8 = true) {
const ctype = schemaElement.converted_type
- if (ctype === 'UTF8') {
-   const decoder = new TextDecoder()
-   const arr = new Array(data.length)
-   for (let i = 0; i < arr.length; i++) {
-     arr[i] = data[i] && decoder.decode(data[i])
-   }
-   return arr
- }
if (ctype === 'DECIMAL') {

@@ -45,4 +38,19 @@ const scale = schemaElement.scale || 0

}
+ if (ctype === 'TIMESTAMP_MILLIS') {
+   const arr = new Array(data.length)
+   for (let i = 0; i < arr.length; i++) {
+     arr[i] = new Date(Number(data[i]))
+   }
+   return arr
+ }
+ if (ctype === 'TIMESTAMP_MICROS') {
+   const arr = new Array(data.length)
+   for (let i = 0; i < arr.length; i++) {
+     arr[i] = new Date(Number(data[i] / 1000n))
+   }
+   return arr
+ }
if (ctype === 'JSON') {
-   return data.map(v => JSON.parse(v))
+   const decoder = new TextDecoder()
+   return data.map(v => JSON.parse(decoder.decode(v)))
}
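
Both timestamp branches receive 64-bit integers (BigInt) from the INT64 decoder; TIMESTAMP_MICROS divides by 1000n first so the millisecond value fits a JS Date. For example, new Date(Number(1716500000000000n / 1000n)) is the same instant as new Date(1716500000000).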

@@ -55,3 +63,17 @@ if (ctype === 'BSON') {

}
// TODO: ctype UINT
+ if (ctype === 'UTF8' || utf8 && schemaElement.type === 'BYTE_ARRAY') {
+   const decoder = new TextDecoder()
+   const arr = new Array(data.length)
+   for (let i = 0; i < arr.length; i++) {
+     arr[i] = data[i] && decoder.decode(data[i])
+   }
+   return arr
+ }
+ if (ctype === 'UINT_64') {
+   const arr = new BigUint64Array(data.length)
+   for (let i = 0; i < arr.length; i++) {
+     arr[i] = BigInt(data[i])
+   }
+   return arr
+ }
const logicalType = schemaElement.logical_type?.type

@@ -61,3 +83,9 @@ if (logicalType === 'FLOAT16') {

}
// TODO: logical types
+ if (logicalType === 'TIMESTAMP') {
+   const arr = new Array(data.length)
+   for (let i = 0; i < arr.length; i++) {
+     arr[i] = new Date(Number(data[i]))
+   }
+   return arr
+ }
return data

@@ -104,1 +132,23 @@ }

}
+ /**
+  * Map data to dictionary values in place.
+  *
+  * @param {DecodedArray | undefined} dictionary
+  * @param {DecodedArray} dataPage
+  * @returns {DecodedArray}
+  */
+ export function dereferenceDictionary(dictionary, dataPage) {
+   let output = dataPage
+   if (dictionary) {
+     if (dataPage instanceof Uint8Array && !(dictionary instanceof Uint8Array)) {
+       // upgrade dataPage to match dictionary type
+       // @ts-expect-error not my fault typescript doesn't understand constructors
+       output = new dictionary.constructor(dataPage.length)
+     }
+     for (let i = 0; i < dataPage.length; i++) {
+       output[i] = dictionary[dataPage[i]]
+     }
+   }
+   return output
+ }
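
A quick illustration of the new return-value contract, with made-up data:

  const dictionary = new Float64Array([1.5, 2.5, 3.5])
  const indexes = new Uint8Array([2, 0, 1]) // dictionary-encoded page
  dereferenceDictionary(dictionary, indexes) // new Float64Array [3.5, 1.5, 2.5]

The "upgrade" matters because a Uint8Array page can only hold bytes, so writing dictionary values into it in place would truncate them.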

@@ -1,8 +0,7 @@

- import { byteStreamSplit } from './byteStreamSplit.js'
- import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
+ import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
import { readPlain } from './plain.js'
- import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js'
+ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
/**
- * Read a data page from the given Uint8Array.
+ * Read a data page from uncompressed reader.
*

@@ -40,3 +39,2 @@ * @typedef {import("./types.d.ts").DataPage} DataPage

) {
- // TODO: RLE encoding uses bitWidth = schemaElement.type_length
const bitWidth = type === 'BOOLEAN' ? 1 : view.getUint8(reader.offset++)

@@ -50,6 +48,4 @@ if (bitWidth) {

} else if (daph.encoding === 'BYTE_STREAM_SPLIT') {
- if (type === 'FLOAT') dataPage = new Float32Array(nValues)
- else if (type === 'DOUBLE') dataPage = new Float64Array(nValues)
- else throw new Error(`parquet byte_stream_split unsupported type: ${type}`)
- byteStreamSplit(reader, nValues, dataPage)
+ const { type_length } = schemaPath[schemaPath.length - 1].element
+ dataPage = byteStreamSplit(reader, nValues, type, type_length)
} else {

@@ -63,4 +59,2 @@ throw new Error(`parquet unsupported encoding: ${daph.encoding}`)

/**
* Read a page containing dictionary data.
*
* @param {Uint8Array} bytes raw page data

@@ -70,3 +64,3 @@ * @param {import("./types.d.ts").DictionaryPageHeader} diph dictionary page header

* @param {number | undefined} typeLength - type_length from schema
- * @returns {ArrayLike<any>} array of values
+ * @returns {DecodedArray}
*/

@@ -80,4 +74,2 @@ export function readDictionaryPage(bytes, diph, columnMetadata, typeLength) {

/**
* Read the repetition levels from this page, if any.
*
* @typedef {import("./types.d.ts").DataReader} DataReader

@@ -93,5 +85,4 @@ * @param {DataReader} reader data view for the page

if (maxRepetitionLevel) {
- const bitWidth = widthFromMaxInt(maxRepetitionLevel)
const values = new Array(daph.num_values)
- readRleBitPackedHybrid(reader, bitWidth, 0, values)
+ readRleBitPackedHybrid(reader, bitWidth(maxRepetitionLevel), 0, values)
return values

@@ -104,4 +95,2 @@ }

/**
* Read the definition levels from this page, if any.
*
* @param {DataReader} reader data view for the page

@@ -113,23 +102,16 @@ * @param {DataPageHeader} daph data page header

function readDefinitionLevels(reader, daph, schemaPath) {
- if (!isRequired(schemaPath)) {
-   const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
-   const bitWidth = widthFromMaxInt(maxDefinitionLevel)
-   if (bitWidth) {
-     // num_values is index 1 for either type of page header
-     const definitionLevels = new Array(daph.num_values)
-     readRleBitPackedHybrid(reader, bitWidth, 0, definitionLevels)
-     // count nulls
-     let numNulls = daph.num_values
-     for (const def of definitionLevels) {
-       if (def === maxDefinitionLevel) numNulls--
-     }
-     if (numNulls === 0) {
-       definitionLevels.length = 0
-     }
-     return { definitionLevels, numNulls }
-   }
- }
- return { definitionLevels: [], numNulls: 0 }
+ const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
+ if (!maxDefinitionLevel) return { definitionLevels: [], numNulls: 0 }
+ const definitionLevels = new Array(daph.num_values)
+ readRleBitPackedHybrid(reader, bitWidth(maxDefinitionLevel), 0, definitionLevels)
+ // count nulls
+ let numNulls = daph.num_values
+ for (const def of definitionLevels) {
+   if (def === maxDefinitionLevel) numNulls--
+ }
+ if (numNulls === 0) definitionLevels.length = 0
+ return { definitionLevels, numNulls }
}

@@ -1,5 +0,4 @@

- import { byteStreamSplit } from './byteStreamSplit.js'
import { decompressPage } from './column.js'
import { deltaBinaryUnpack, deltaByteArray, deltaLengthByteArray } from './delta.js'
- import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js'
+ import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
import { readPlain } from './plain.js'

@@ -32,7 +31,6 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'

const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schemaPath)
// assert(reader.offset === daph2.repetition_levels_byte_length)
reader.offset = daph2.repetition_levels_byte_length // readVarInt() => len for boolean v2?
// definition levels
- const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
- const definitionLevels = readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel)
+ const definitionLevels = readDefinitionLevelsV2(reader, daph2, schemaPath)
// assert(reader.offset === daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length)

@@ -43,3 +41,5 @@

let page = compressedBytes.subarray(reader.offset)
- page = decompressPage(page, uncompressedPageSize, codec, compressors)
+ if (daph2.is_compressed !== false) {
+   page = decompressPage(page, uncompressedPageSize, codec, compressors)
+ }
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
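
The guard reflects the Parquet spec: a v2 data page header carries an optional is_compressed flag, and when a writer sets it to false the page bytes are stored raw even though the column chunk declares a codec, so decompression must be skipped.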

@@ -56,5 +56,6 @@ const pageReader = { view: pageView, offset: 0 }

} else if (daph2.encoding === 'RLE') {
pageReader.offset = 4
+ // assert(columnMetadata.type === 'BOOLEAN')
dataPage = new Array(nValues)
- readRleBitPackedHybrid(pageReader, 1, uncompressedPageSize, dataPage)
+ readRleBitPackedHybrid(pageReader, 1, 0, dataPage)
dataPage = dataPage.map(x => !!x)
} else if (

@@ -64,6 +65,5 @@ daph2.encoding === 'PLAIN_DICTIONARY' ||

) {
- const bitWidth = pageView.getUint8(0)
- pageReader.offset = 1
+ const bitWidth = pageView.getUint8(pageReader.offset++)
dataPage = new Array(nValues)
- readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, dataPage)
+ readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize - 1, dataPage)
} else if (daph2.encoding === 'DELTA_BINARY_PACKED') {

@@ -80,6 +80,4 @@ const int32 = type === 'INT32'

} else if (daph2.encoding === 'BYTE_STREAM_SPLIT') {
- if (type === 'FLOAT') dataPage = new Float32Array(nValues)
- else if (type === 'DOUBLE') dataPage = new Float64Array(nValues)
- else throw new Error(`parquet byte_stream_split unsupported type: ${type}`)
- byteStreamSplit(pageReader, nValues, dataPage)
+ const { type_length } = schemaPath[schemaPath.length - 1].element
+ dataPage = byteStreamSplit(reader, nValues, type, type_length)
} else {

@@ -103,7 +101,5 @@ throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)

- const bitWidth = widthFromMaxInt(maxRepetitionLevel)
- // num_values is index 1 for either type of page header
const values = new Array(daph2.num_values)
readRleBitPackedHybrid(
-   reader, bitWidth, daph2.repetition_levels_byte_length, values
+   reader, bitWidth(maxRepetitionLevel), daph2.repetition_levels_byte_length, values
)

@@ -116,13 +112,13 @@ return values

* @param {DataPageHeaderV2} daph2 data page header v2
- * @param {number} maxDefinitionLevel
+ * @param {SchemaTree[]} schemaPath
* @returns {number[] | undefined} definition levels
*/
- function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) {
+ function readDefinitionLevelsV2(reader, daph2, schemaPath) {
+   const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
if (maxDefinitionLevel) {
-   // not the same as V1, because we know the length
-   const bitWidth = widthFromMaxInt(maxDefinitionLevel)
+   // V2 we know the length
  const values = new Array(daph2.num_values)
-   readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values)
+   readRleBitPackedHybrid(reader, bitWidth(maxDefinitionLevel), daph2.definition_levels_byte_length, values)
return values
}
}

@@ -6,10 +6,10 @@ import { readVarInt, readZigZagBigInt } from './thrift.js'

* @param {DataReader} reader
- * @param {number} nValues number of values to read
- * @param {Int32Array | BigInt64Array} output output array
+ * @param {number} count number of values to read
+ * @param {Int32Array | BigInt64Array} output
*/
- export function deltaBinaryUnpack(reader, nValues, output) {
+ export function deltaBinaryUnpack(reader, count, output) {
const int32 = output instanceof Int32Array
const blockSize = readVarInt(reader)
const miniblockPerBlock = readVarInt(reader)
- readVarInt(reader) // assert(count === nValues)
+ readVarInt(reader) // assert(=== count)
let value = readZigZagBigInt(reader) // first value

@@ -21,3 +21,3 @@ let outputIndex = 0

- while (outputIndex < nValues) {
+ while (outputIndex < count) {
// new block

@@ -30,3 +30,3 @@ const minDelta = readZigZagBigInt(reader)

- for (let i = 0; i < miniblockPerBlock && outputIndex < nValues; i++) {
+ for (let i = 0; i < miniblockPerBlock && outputIndex < count; i++) {
// new miniblock

@@ -38,3 +38,3 @@ const bitWidth = BigInt(bitWidths[i])

const mask = (1n << bitWidth) - 1n
- while (miniblockCount && outputIndex < nValues) {
+ while (miniblockCount && outputIndex < count) {
let bits = BigInt(reader.view.getUint8(reader.offset)) >> bitpackPos & mask // TODO: don't re-read value every time

@@ -59,3 +59,3 @@ bitpackPos += bitWidth

} else {
- for (let j = 0; j < valuesPerMiniblock && outputIndex < nValues; j++) {
+ for (let j = 0; j < valuesPerMiniblock && outputIndex < count; j++) {
value += minDelta

@@ -71,9 +71,9 @@ output[outputIndex++] = int32 ? Number(value) : value

* @param {DataReader} reader
- * @param {number} nValues
+ * @param {number} count
* @param {Uint8Array[]} output
*/
- export function deltaLengthByteArray(reader, nValues, output) {
-   const lengths = new Int32Array(nValues)
-   deltaBinaryUnpack(reader, nValues, lengths)
-   for (let i = 0; i < nValues; i++) {
+ export function deltaLengthByteArray(reader, count, output) {
+   const lengths = new Int32Array(count)
+   deltaBinaryUnpack(reader, count, lengths)
+   for (let i = 0; i < count; i++) {
output[i] = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, lengths[i])

@@ -86,12 +86,12 @@ reader.offset += lengths[i]

* @param {DataReader} reader
- * @param {number} nValues
+ * @param {number} count
* @param {Uint8Array[]} output
*/
- export function deltaByteArray(reader, nValues, output) {
-   const prefixData = new Int32Array(nValues)
-   deltaBinaryUnpack(reader, nValues, prefixData)
-   const suffixData = new Int32Array(nValues)
-   deltaBinaryUnpack(reader, nValues, suffixData)
-   for (let i = 0; i < nValues; i++) {
+ export function deltaByteArray(reader, count, output) {
+   const prefixData = new Int32Array(count)
+   deltaBinaryUnpack(reader, count, prefixData)
+   const suffixData = new Int32Array(count)
+   deltaBinaryUnpack(reader, count, suffixData)
+   for (let i = 0; i < count; i++) {
const suffix = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, suffixData[i])

@@ -98,0 +98,0 @@ if (prefixData[i]) {
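
For reference, DELTA_BYTE_ARRAY is front coding: each value is a prefix length into the previous value plus a suffix. Prefix lengths [0, 5] with suffixes ["apple", "sauce"] reconstruct ["apple", "applesauce"].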

import { readVarInt } from './thrift.js'
/**
- * Convert the value specified to a bit width.
+ * Minimum bits needed to store value.
*
- * @param {number} value - value to convert to bitwidth
- * @returns {number} bit width of the value
+ * @param {number} value
+ * @returns {number}
*/
- export function widthFromMaxInt(value) {
-   return Math.ceil(Math.log2(value + 1))
+ export function bitWidth(value) {
+   return 32 - Math.clz32(value)
}
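
The two formulas agree on the non-negative integers used here, but Math.clz32 avoids floating-point log rounding. For example:

  bitWidth(0) // 0 (Math.clz32(0) === 32)
  bitWidth(1) // 1
  bitWidth(7) // 3
  bitWidth(8) // 4, same as Math.ceil(Math.log2(8 + 1))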

@@ -16,44 +16,42 @@

*
- * If length is zero, then read as int32 at the start of the encoded data.
+ * If length is zero, then read int32 length at the start.
*
* @typedef {import("./types.d.ts").DataReader} DataReader
- * @typedef {number[]} DecodedArray
- * @param {DataReader} reader - buffer to read data from
+ * @typedef {import("./types.d.ts").DecodedArray} DecodedArray
+ * @param {DataReader} reader
* @param {number} width - width of each bit-packed group
* @param {number} length - length of the encoded data
- * @param {DecodedArray} values - output array
+ * @param {DecodedArray} output
*/
- export function readRleBitPackedHybrid(reader, width, length, values) {
+ export function readRleBitPackedHybrid(reader, width, length, output) {
if (!length) {
-   length = reader.view.getUint32(reader.offset, true)
+   // length = reader.view.getUint32(reader.offset, true)
  reader.offset += 4
}
let seen = 0
- while (seen < values.length) {
+ while (seen < output.length) {
  const header = readVarInt(reader)
  if (header & 1) {
    // bit-packed
-     seen = readBitPacked(reader, header, width, values, seen)
+     seen = readBitPacked(reader, header, width, output, seen)
  } else {
    // rle
    const count = header >>> 1
-     readRle(reader, count, width, values, seen)
+     readRle(reader, count, width, output, seen)
    seen += count
  }
}
+ // assert(reader.offset - startOffset === length)
}
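
A worked example of the hybrid format, with a hypothetical width-1 page: each run starts with a varint header; an even header means an RLE run of (header >>> 1) copies of the next value, an odd header means ((header >> 1) * 8) bit-packed values of width bits each. So the bytes 0x0a 0x01 decode as an RLE run of five 1s.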
/**
- * Read a run-length encoded value.
+ * Run-length encoding: read value with bitWidth and repeat it count times.
*
- * The count is determined from the header and the width is used to grab the
- * value that's repeated. Yields the value repeated count times.
- *
- * @param {DataReader} reader - buffer to read data from
- * @param {number} count - number of values to read
- * @param {number} bitWidth - width of each bit-packed group
- * @param {DecodedArray} values - output array
- * @param {number} seen - number of values seen so far
+ * @param {DataReader} reader
+ * @param {number} count
+ * @param {number} bitWidth
+ * @param {DecodedArray} output
+ * @param {number} seen
*/
- function readRle(reader, count, bitWidth, values, seen) {
+ function readRle(reader, count, bitWidth, output, seen) {
const width = bitWidth + 7 >> 3

@@ -63,2 +61,3 @@ let value = 0

value = reader.view.getUint8(reader.offset)
+ // assert(value < 1 << bitWidth)
} else if (width === 2) {

@@ -75,3 +74,3 @@ value = reader.view.getUint16(reader.offset, true)

for (let i = 0; i < count; i++) {
- values[seen + i] = value
+ output[seen + i] = value
}

@@ -84,13 +83,11 @@ }

*
- * @param {DataReader} reader - buffer to read data from
- * @param {number} header - header information
- * @param {number} bitWidth - width of each bit-packed group
- * @param {number[]} values - output array
- * @param {number} seen - number of values seen so far
- * @returns {number} number of values seen
+ * @param {DataReader} reader
+ * @param {number} header - bit-pack header
+ * @param {number} bitWidth
+ * @param {DecodedArray} output
+ * @param {number} seen
+ * @returns {number} total output values so far
*/
- function readBitPacked(reader, header, bitWidth, values, seen) {
-   // extract number of values to read from header
-   let count = header >> 1 << 3
-   // mask for bitWidth number of bits
+ function readBitPacked(reader, header, bitWidth, output, seen) {
+   let count = header >> 1 << 3 // values to read
const mask = (1 << bitWidth) - 1

@@ -121,5 +118,5 @@

} else {
- if (seen < values.length) {
-   // emit value by shifting off to the right and masking
-   values[seen++] = data >> right & mask
+ if (seen < output.length) {
+   // emit value
+   output[seen++] = data >> right & mask
}

@@ -133,1 +130,54 @@ count--

}
+ /**
+  * @typedef {import("./types.d.ts").ParquetType} ParquetType
+  * @param {DataReader} reader
+  * @param {number} count
+  * @param {ParquetType} type
+  * @param {number | undefined} typeLength
+  * @returns {DecodedArray}
+  */
+ export function byteStreamSplit(reader, count, type, typeLength) {
+   const width = byteWidth(type, typeLength)
+   const bytes = new Uint8Array(count * width)
+   for (let b = 0; b < width; b++) {
+     for (let i = 0; i < count; i++) {
+       bytes[i * width + b] = reader.view.getUint8(reader.offset++)
+     }
+   }
+   // interpret bytes as typed array
+   if (type === 'FLOAT') return new Float32Array(bytes.buffer)
+   else if (type === 'DOUBLE') return new Float64Array(bytes.buffer)
+   else if (type === 'INT32') return new Int32Array(bytes.buffer)
+   else if (type === 'INT64') return new BigInt64Array(bytes.buffer)
+   else if (type === 'FIXED_LEN_BYTE_ARRAY') {
+     // split into arrays of typeLength
+     const split = new Array(count)
+     for (let i = 0; i < count; i++) {
+       split[i] = bytes.subarray(i * width, (i + 1) * width)
+     }
+     return split
+   }
+   throw new Error(`parquet byte_stream_split unsupported type: ${type}`)
+ }
+ /**
+  * @param {ParquetType} type
+  * @param {number | undefined} typeLength
+  * @returns {number}
+  */
+ function byteWidth(type, typeLength) {
+   switch (type) {
+     case 'INT32':
+     case 'FLOAT':
+       return 4
+     case 'INT64':
+     case 'DOUBLE':
+       return 8
+     case 'FIXED_LEN_BYTE_ARRAY':
+       if (!typeLength) throw new Error('parquet byteWidth missing type_length')
+       return typeLength
+     default:
+       throw new Error(`parquet unsupported type: ${type}`)
+   }
+ }
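
A hand-check of the interleaving, with a hypothetical two-float page (little-endian):

  // floats [1.0, 2.0] have bytes 00 00 80 3f / 00 00 00 40;
  // BYTE_STREAM_SPLIT stores all first bytes, then all second bytes, and so on
  const encoded = new Uint8Array([0x00, 0x00, 0x00, 0x00, 0x80, 0x00, 0x3f, 0x40])
  const reader = { view: new DataView(encoded.buffer), offset: 0 }
  byteStreamSplit(reader, 2, 'FLOAT', undefined) // Float32Array [1, 2]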

@@ -103,2 +103,3 @@ import type { AsyncBuffer, Compressors, FileMetaData, SchemaTree } from './types.d.ts'

compressors?: Compressors // custom decompressors
+ utf8?: boolean // decode byte arrays as utf8 strings (default true)
}
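
A sketch of the new option in use, assuming an AsyncBuffer file obtained elsewhere:

  import { parquetRead } from 'hyparquet'

  await parquetRead({
    file,
    utf8: false, // keep BYTE_ARRAY columns as raw Uint8Arrays instead of decoded strings
    onComplete: rows => console.log(rows),
  })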

@@ -105,0 +106,0 @@

@@ -1,2 +0,2 @@

- import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, ParquetType } from './constants.js'
+ import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, PageType, ParquetType } from './constants.js'
import { parseFloat16 } from './convert.js'

@@ -142,3 +142,3 @@ import { getSchemaPath } from './schema.js'

encoding_stats: column.field_3.field_13?.map((/** @type {any} */ encodingStat) => ({
- page_type: encodingStat.field_1,
+ page_type: PageType[encodingStat.field_1],
encoding: Encoding[encodingStat.field_2],

@@ -145,0 +145,0 @@ count: encodingStat.field_3,
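
field_1 is the raw thrift enum ordinal; indexing into PageType reports the page type by name (e.g. 'DATA_PAGE' instead of 0), matching the existing Encoding[...] lookup on the next line.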

@@ -47,3 +47,3 @@ /**

for (let i = 0; i < count; i++) {
- const byteOffset = reader.offset + Math.floor(i / 8)
+ const byteOffset = reader.offset + (i / 8 | 0)
const bitOffset = i % 8

@@ -50,0 +50,0 @@ const byte = reader.view.getUint8(byteOffset)
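
(i / 8 | 0) truncates toward zero, which equals Math.floor for the non-negative i used here and avoids a function call in this per-bit loop.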

@@ -41,5 +41,4 @@

- const { metadata, onComplete } = options
+ const { metadata, onComplete, rowEnd } = options
const rowStart = options.rowStart || 0
- const rowEnd = options.rowEnd || Number(metadata.num_rows)
/** @type {any[][]} */

@@ -54,3 +53,3 @@ const rowData = []

// if row group overlaps with row range, read it
- if (groupStart + groupRows >= rowStart && groupStart < rowEnd) {
+ if (groupStart + groupRows >= rowStart && (rowEnd === undefined || groupStart < rowEnd)) {
// read row group

@@ -61,3 +60,3 @@ const groupData = await readRowGroup(options, rowGroup, groupStart)

const start = Math.max(rowStart - groupStart, 0)
- const end = Math.min(rowEnd - groupStart, groupRows)
+ const end = rowEnd === undefined ? undefined : rowEnd - groupStart
concat(rowData, groupData.slice(start, end))
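
Usage sketch, assuming an AsyncBuffer file: rowEnd is now optional, and omitting it means "read to the end of the file", since slice(start, undefined) runs to the end.

  await parquetRead({
    file,
    rowStart: 10,
    rowEnd: 20, // rows [10, 20); omit to read all remaining rows
    onComplete: rows => console.log(rows.length), // 10
  })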

@@ -88,3 +87,3 @@ }

async function readRowGroup(options, rowGroup, groupStart) {
- const { file, metadata, columns, compressors } = options
+ const { file, metadata, columns } = options
if (!metadata) throw new Error('parquet metadata not found')

@@ -159,6 +158,5 @@

const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
+ const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
/** @type {any[] | undefined} */
- let columnData = readColumn(
-   arrayBuffer, bufferOffset, rowGroup, columnMetadata, schemaPath, compressors
- )
+ let columnData = readColumn(reader, rowGroup, columnMetadata, schemaPath, options)
// assert(columnData.length === Number(rowGroup.num_rows)

@@ -165,0 +163,0 @@

@@ -49,17 +49,2 @@ /**

- /**
-  * Check if the schema path and all its ancestors are required.
-  *
-  * @param {SchemaTree[]} schemaPath
-  * @returns {boolean} true if the element is required
-  */
- export function isRequired(schemaPath) {
-   for (const { element } of schemaPath.slice(1)) {
-     if (element.repetition_type !== 'REQUIRED') {
-       return false
-     }
-   }
-   return true
- }
/**
* Get the max repetition level for a given schema path.

@@ -130,3 +115,3 @@ *

const keyChild = firstChild.children.find(child => child.element.name === 'key')
- if (keyChild?.element.repetition_type !== 'REQUIRED') return false
+ if (keyChild?.element.repetition_type === 'REPEATED') return false

@@ -133,0 +118,0 @@ const valueChild = firstChild.children.find(child => child.element.name === 'value')

@@ -143,5 +143,3 @@ /**

- if (outPos !== outputLength) {
-   throw new Error('premature end of input')
- }
+ if (outPos !== outputLength) throw new Error('premature end of input')
}

@@ -298,4 +298,5 @@ export type Awaitable<T> = T | Promise<T>

BigInt64Array |
+ BigUint64Array |
Float32Array |
Float64Array |
any[]