
hyparquet - npm Package Compare versions

Comparing version 0.3.3 to 0.4.0

src/convert.js (4 changes not shown)

package.json
{
"name": "hyparquet",
- "version": "0.3.3",
+ "version": "0.4.0",
"description": "parquet file parser for javascript",

@@ -31,3 +31,3 @@ "keywords": [

"@types/node": "20.11.20",
- "@typescript-eslint/eslint-plugin": "7.0.2",
+ "@typescript-eslint/eslint-plugin": "7.1.0",
"@vitest/coverage-v8": "1.3.1",


@@ -72,2 +72,7 @@ # hyparquet

+ ## Async
+ Hyparquet supports asynchronous fetching of parquet files, over a network.
+ You can provide an `AsyncBuffer` which is like a js `ArrayBuffer` but the `slice` method returns `Promise<ArrayBuffer>`.
## Supported Parquet Files
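
For illustration only (this sketch is not part of the hyparquet README or its API; the helper name and fetch details are assumptions): an `AsyncBuffer` can be backed by HTTP Range requests when the server reports Content-Length and honors Range headers.

async function asyncBufferFromUrl(url) {
  // HEAD request to learn the total byte length of the remote parquet file
  const head = await fetch(url, { method: 'HEAD' })
  const byteLength = Number(head.headers.get('Content-Length'))
  return {
    byteLength,
    // slice returns Promise<ArrayBuffer>, matching the AsyncBuffer contract
    async slice(start, end) {
      // HTTP Range is inclusive at both ends, so the exclusive end maps to end - 1
      const range = end === undefined ? `bytes=${start}-` : `bytes=${start}-${end - 1}`
      const res = await fetch(url, { headers: { Range: range } })
      return res.arrayBuffer()
    },
  }
}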

@@ -92,3 +97,3 @@

- [X] Dictionary Page
- - [ ] Data Page V2
+ - [X] Data Page V2


import { Encoding, PageType } from './constants.js'
+ import { convert } from './convert.js'
import { assembleObjects, readDataPage, readDictionaryPage } from './datapage.js'
+ import { readDataPageV2 } from './datapageV2.js'
import { parquetHeader } from './header.js'

@@ -8,3 +10,2 @@ import { getMaxDefinitionLevel, isRequired, schemaElement } from './schema.js'

/**
- * @typedef {import('./types.js').ArrayBufferLike} ArrayBufferLike
* @typedef {import('./types.js').SchemaElement} SchemaElement

@@ -15,4 +16,2 @@ * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData

- const dayMillis = 86400000000000 // 1 day in milliseconds
/**

@@ -62,11 +61,7 @@ * Parse column data from a buffer.

// construct output values: skip nulls and construct lists
/** @type {any[]} */
let values
if (repetitionLevels.length) {
+ dereferenceDictionary(dictionary, dataPage)
// Use repetition levels to construct lists
- if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
- // dereference dictionary values
- for (let i = 0; i < dataPage.length; i++) {
- dataPage[i] = dictionary[dataPage[i]]
- }
- }
const isNull = columnMetadata && !isRequired(schema, [columnMetadata.path_in_schema[0]])

@@ -79,36 +74,8 @@ const nullValue = false // TODO: unused?

- // Use definition levels to skip nulls
- let index = 0
- values = []
- const decoder = new TextDecoder()
- for (let i = 0; i < definitionLevels.length; i++) {
- if (definitionLevels[i] === maxDefinitionLevel) {
- if (index > dataPage.length) {
- throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`)
- }
- let v = dataPage[index++]
- // map to dictionary value
- if (dictionary) {
- v = dictionary[v]
- if (v instanceof Uint8Array) {
- try {
- v = decoder.decode(v)
- } catch (e) {
- console.warn('parquet failed to decode byte array as string', e)
- }
- }
- }
- values[i] = v
- } else {
- values[i] = undefined
- }
- }
+ skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
} else {
- if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
- // dereference dictionary values
- values = []
- for (let i = 0; i < dataPage.length; i++) {
- values[i] = dictionary[dataPage[i]]
- }
- values = convert(values, schemaElement(schema, columnMetadata.path_in_schema))
+ if (dictionaryEncoding && dictionary) {
+ dereferenceDictionary(dictionary, dataPage)
+ values = convert(dataPage, schemaElement(schema, columnMetadata.path_in_schema))
} else if (Array.isArray(dataPage)) {

@@ -131,6 +98,31 @@ // convert primitive types to rich types

- const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec)
+ const page = decompressPage(
+ compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec
+ )
dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
} else if (header.type === PageType.DATA_PAGE_V2) {
- throw new Error('parquet data page v2 not supported')
+ const daph2 = header.data_page_header_v2
+ if (!daph2) throw new Error('parquet data page header v2 is undefined')
+ const { definitionLevels, repetitionLevels, value: dataPage } = readDataPageV2(
+ compressedBytes, header, schema, columnMetadata
+ )
+ valuesSeen += daph2.num_values
+ const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
+ if (repetitionLevels.length) {
+ dereferenceDictionary(dictionary, dataPage)
+ // Use repetition levels to construct lists
+ rowData.push(...assembleObjects(
+ definitionLevels, repetitionLevels, dataPage, true, false, maxDefinitionLevel, rowIndex[0]
+ ))
+ } else if (daph2.num_nulls) {
+ // skip nulls
+ if (!definitionLevels) throw new Error('parquet data page v2 nulls missing definition levels')
+ skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, rowData)
+ } else {
+ dereferenceDictionary(dictionary, dataPage)
+ rowData.push(...dataPage)
+ }
+ // TODO: convert?
} else {

@@ -142,3 +134,3 @@ throw new Error(`parquet unsupported page type: ${header.type}`)

if (rowData.length !== Number(rowGroup.num_rows)) {
- throw new Error(`parquet column length ${rowData.length} does not match row group length ${rowGroup.num_rows}}`)
+ throw new Error(`parquet row data length ${rowData.length} does not match row group length ${rowGroup.num_rows}}`)
}

@@ -149,2 +141,16 @@ return rowData

+ /**
+ * Map data to dictionary values in place.
+ *
+ * @param {ArrayLike<any> | undefined} dictionary
+ * @param {number[]} dataPage
+ */
+ function dereferenceDictionary(dictionary, dataPage) {
+ if (dictionary) {
+ for (let i = 0; i < dataPage.length; i++) {
+ dataPage[i] = dictionary[dataPage[i]]
+ }
+ }
+ }
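
For intuition, a hedged illustration with made-up values (not from the package): dictionary-encoded pages hold integer indexes, and dereferenceDictionary rewrites them in place as the values they reference.

const dictionary = ['red', 'green', 'blue'] // hypothetical dictionary page
const dataPage = [2, 0, 0, 1] // indexes as read from a dictionary-encoded data page
dereferenceDictionary(dictionary, dataPage)
// dataPage is now ['blue', 'red', 'red', 'green']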
/**
* Find the start byte offset for a column chunk.

@@ -165,59 +171,2 @@ *

- /**
- * Convert known types from primitive to rich.
- *
- * @param {any[]} data series of primitive types
- * @param {SchemaElement} schemaElement schema element for the data
- * @returns {any[]} series of rich types
- */
- function convert(data, schemaElement) {
- const ctype = schemaElement.converted_type
- if (ctype === undefined) return data
- if (ctype === 'UTF8') {
- const decoder = new TextDecoder()
- return data.map(v => decoder.decode(v))
- }
- if (ctype === 'DECIMAL') {
- const scaleFactor = Math.pow(10, schemaElement.scale || 0)
- if (typeof data[0] === 'number') {
- return scaleFactor === 1 ? data : data.map(v => v * scaleFactor)
- } else if (typeof data[0] === 'bigint') {
- return scaleFactor === 1 ? data : data.map(v => Number(v) * scaleFactor)
- } else {
- return data.map(v => parseDecimal(v) * scaleFactor)
- }
- }
- if (ctype === 'DATE') {
- return data.map(v => new Date(v * dayMillis))
- }
- if (ctype === 'TIME_MILLIS') {
- return data.map(v => new Date(v))
- }
- if (ctype === 'JSON') {
- return data.map(v => JSON.parse(v))
- }
- if (ctype === 'BSON') {
- throw new Error('parquet bson not supported')
- }
- if (ctype === 'INTERVAL') {
- throw new Error('parquet interval not supported')
- }
- return data
- }
- /**
- * Parse decimal from byte array.
- *
- * @param {Uint8Array} bytes
- * @returns {number}
- */
- function parseDecimal(bytes) {
- // TODO: handle signed
- let value = 0
- for (const byte of bytes) {
- value = value << 8 | byte
- }
- return value
- }
/**
* @typedef {import('./types.js').PageHeader} PageHeader
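
As a worked illustration of the parseDecimal helper removed above (in 0.4.0 it leaves this file, presumably relocated alongside convert): it folds big-endian bytes into an unsigned integer, and the TODO notes that signed values are not yet handled.

parseDecimal(new Uint8Array([0x01, 0x00])) // (0 << 8 | 1) = 1, then (1 << 8 | 0) = 256
parseDecimal(new Uint8Array([0x02, 0x0a])) // (2 << 8 | 10) = 522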

@@ -230,3 +179,3 @@ * @typedef {import('./types.js').CompressionCodec} CompressionCodec

*/
- function decompressPage(compressedBytes, uncompressed_page_size, codec) {
+ export function decompressPage(compressedBytes, uncompressed_page_size, codec) {
/** @type {Uint8Array | undefined} */

@@ -247,1 +196,39 @@ let page

}
+ /**
+ * Expand data page list with nulls and convert to utf8.
+ * @param {number[]} definitionLevels
+ * @param {number} maxDefinitionLevel
+ * @param {ArrayLike<any>} dataPage
+ * @param {any} dictionary
+ * @param {any[]} output
+ */
+ function skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, output) {
+ if (output.length) throw new Error('parquet output array is not empty')
+ // Use definition levels to skip nulls
+ let index = 0
+ const decoder = new TextDecoder()
+ for (let i = 0; i < definitionLevels.length; i++) {
+ if (definitionLevels[i] === maxDefinitionLevel) {
+ if (index > dataPage.length) {
+ throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`)
+ }
+ let v = dataPage[index++]
+ // map to dictionary value
+ if (dictionary) {
+ v = dictionary[v]
+ if (v instanceof Uint8Array) {
+ try {
+ v = decoder.decode(v)
+ } catch (e) {
+ console.warn('parquet failed to decode byte array as string', e)
+ }
+ }
+ }
+ output[i] = v
+ } else {
+ output[i] = undefined
+ }
+ }
+ }
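
For intuition, a hedged illustration with made-up values (not from the package): with maxDefinitionLevel = 1, any definition level below the max marks a null, so a packed data page expands into an output array with undefined gaps.

const output = []
// definition levels [1, 0, 1, 1] mean rows 0, 2, and 3 carry values and row 1 is null
skipNulls([1, 0, 1, 1], 1, ['a', 'b', 'c'], undefined, output)
// output is now ['a', undefined, 'b', 'c']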

@@ -14,3 +14,4 @@ import { Encoding, ParquetType } from './constants.js'

/**
- * @typedef {{ definitionLevels: number[] | undefined, repetitionLevels: number[], value: ArrayLike<any> }} DataPage
- * @typedef {{ byteLength: number, definitionLevels: number[], numNulls: number }} DefinitionLevels
+ * @typedef {import("./types.d.ts").DataPage} DataPage
+ * @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData

@@ -38,3 +39,3 @@ * @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader

let offset = 0
- /** @type {ArrayLike<any>} */
+ /** @type {any[]} */
let values = []

@@ -52,2 +53,3 @@

// let maxDefinitionLevel = -1
+ // TODO: move into readDefinitionLevels
if (skipNulls && !isRequired(schema, columnMetadata.path_in_schema)) {

@@ -57,3 +59,3 @@ // skip_definition_bytes

} else {
- const dl = readDefinitionLevels(dataView, offset, daph, schema, columnMetadata)
+ const dl = readDefinitionLevels(dataView, offset, daph, schema, columnMetadata.path_in_schema)
definitionLevels = dl.definitionLevels

@@ -65,8 +67,8 @@ numNulls = dl.numNulls

// read values based on encoding
- const nval = daph.num_values - numNulls
+ const nValues = daph.num_values - numNulls
if (daph.encoding === Encoding.PLAIN) {
const se = schemaElement(schema, columnMetadata.path_in_schema)
const utf8 = se.converted_type === 'UTF8'
- const plainObj = readPlain(dataView, columnMetadata.type, nval, offset, utf8)
- values = plainObj.value
+ const plainObj = readPlain(dataView, columnMetadata.type, nValues, offset, utf8)
+ values = Array.isArray(plainObj.value) ? plainObj.value : Array.from(plainObj.value)
offset += plainObj.byteLength

@@ -89,9 +91,9 @@ } else if (

const { value, byteLength } = readRleBitPackedHybrid(
- dataView, offset, bitWidth, dataView.byteLength - offset, nval
+ dataView, offset, bitWidth, dataView.byteLength - offset, nValues
)
offset += byteLength
- values = value
+ values = Array.isArray(value) ? value : Array.from(value)
} else {
// nval zeros
- values = new Array(nval).fill(0)
+ values = new Array(nValues).fill(0)
}

@@ -134,5 +136,4 @@ } else {

const maxRepetitionLevel = getMaxRepetitionLevel(schema, columnMetadata.path_in_schema)
- if (maxRepetitionLevel !== 0) {
+ if (maxRepetitionLevel) {
const bitWidth = widthFromMaxInt(maxRepetitionLevel)
- // num_values is index 1 for either type of page header
return readData(

@@ -146,4 +147,2 @@ dataView, daph.repetition_level_encoding, offset, daph.num_values, bitWidth

- /** @typedef {{ byteLength: number, definitionLevels: number[], numNulls: number }} DefinitionLevels */
/**

@@ -156,8 +155,8 @@ * Read the definition levels from this page, if any.

* @param {SchemaElement[]} schema schema for the file
- * @param {ColumnMetaData} columnMetadata metadata for the column
+ * @param {string[]} path_in_schema path in the schema
* @returns {DefinitionLevels} definition levels and number of bytes read
*/
- function readDefinitionLevels(dataView, offset, daph, schema, columnMetadata) {
- if (!isRequired(schema, columnMetadata.path_in_schema)) {
- const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
+ function readDefinitionLevels(dataView, offset, daph, schema, path_in_schema) {
+ if (!isRequired(schema, path_in_schema)) {
+ const maxDefinitionLevel = getMaxDefinitionLevel(schema, path_in_schema)
const bitWidth = widthFromMaxInt(maxDefinitionLevel)


@@ -152,2 +152,3 @@ import { Encoding, ParquetType } from './constants.js'

*
* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @param {DataView} dataView - buffer to read data from

@@ -158,3 +159,3 @@ * @param {number} type - parquet type of the data

* @param {boolean} utf8 - whether to decode byte arrays as UTF-8
- * @returns {Decoded<ArrayLike<any>>} array of values
+ * @returns {Decoded<DecodedArray>} array of values
*/

@@ -253,3 +254,3 @@ export function readPlain(dataView, type, count, offset, utf8) {

const startByteLength = byteLength
- while (byteLength - startByteLength < length) {
+ while (offset + byteLength - startByteLength < length) {
const [header, newOffset] = readVarInt(dataView, offset + byteLength)

@@ -329,3 +330,9 @@ byteLength = newOffset - offset

- let data = dataView.getUint8(offset)
+ // Sometimes it tries to read outside of available memory, but it will be masked out anyway
+ let data = 0
+ if (offset < dataView.byteLength) {
+ data = dataView.getUint8(offset)
+ } else if (mask) {
+ throw new Error(`parquet bitpack offset ${offset} out of range`)
+ }
let byteLength = 1


@@ -14,3 +14,2 @@ import { deserializeTCompactProtocol } from './thrift.js'

*
* @typedef {import("./types.d.ts").ArrayBufferLike} ArrayBufferLike
* @typedef {import("./types.d.ts").PageHeader} PageHeader

@@ -56,3 +55,3 @@ * @param {ArrayBuffer} arrayBuffer parquet file contents

repetition_levels_byte_length: header.field_8.field_6,
- is_compressed: header.field_8.field_7,
+ is_compressed: header.field_8.field_7 === undefined ? true : header.field_8.field_7, // default to true
statistics: header.field_8.field_8,


@@ -141,3 +141,3 @@

if (columnData.length !== Number(rowGroup.num_rows)) {
- throw new Error('parquet column length does not match row group length')
+ throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`)
}


@@ -179,3 +179,3 @@ // TCompactProtocol types

*/
- function readZigZag(view, index) {
+ export function readZigZag(view, index) {
const [zigzag, newIndex] = readVarInt(view, index)
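The conversion that follows the varint read can be sketched standalone (an illustration, not necessarily the package's exact code): zigzag encoding interleaves signed integers as 0 → 0, -1 → 1, 1 → 2, -2 → 3, so decoding is an unsigned right shift plus a conditional negate.

// decode a zigzag-encoded unsigned int back to a signed int
function zigzagToInt(zigzag) {
  return (zigzag >>> 1) ^ -(zigzag & 1)
}
zigzagToInt(0) // 0
zigzagToInt(1) // -1
zigzagToInt(2) // 1
zigzagToInt(3) // -2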


@@ -10,10 +10,2 @@ /**

- /**
- * Just like an ArrayBuffer, but an interface
- */
- export interface ArrayBufferLike {
- byteLength: number
- slice(start: number, end?: number): ArrayBuffer
- }
/**
* Represents a decoded value, and includes the number of bytes read.

@@ -216,1 +208,9 @@ * This is used to read data from the file and advance a virtual file pointer.

}
+ type DecodedArray = any[] | Uint8Array
+ interface DataPage {
+ definitionLevels: number[] | undefined
+ repetitionLevels: number[]
+ value: any[]
+ }