Comparing hyparquet version 0.9.3 to 0.9.4
{ | ||
"name": "hyparquet", | ||
"version": "0.9.3", | ||
"version": "0.9.4", | ||
"description": "parquet file parser for javascript", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -177,2 +177,13 @@ # hyparquet | ||
Parquet encodings: | ||
- [X] PLAIN | ||
- [X] PLAIN_DICTIONARY | ||
- [X] RLE_DICTIONARY | ||
- [X] RLE | ||
- [X] BIT_PACKED | ||
- [X] DELTA_BINARY_PACKED | ||
- [X] DELTA_BYTE_ARRAY | ||
- [ ] DELTA_LENGTH_BYTE_ARRAY | ||
- [ ] BYTE_STREAM_SPLIT | ||
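Whichever of these encodings a column uses, decoding happens transparently when the file is read. A minimal usage sketch (assuming the `parquetRead({ file, onComplete })` entry point with an `ArrayBuffer` input; adjust to the API documented in this README):

```js
import { readFileSync } from 'fs'
import { parquetRead } from 'hyparquet'

// Load a local parquet file into an ArrayBuffer
const buf = readFileSync('example.parquet')
const file = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength)

await parquetRead({
  file, // ArrayBuffer containing the parquet file
  onComplete: rows => console.log(rows), // one entry per row
})
```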
## Hysnappy | ||
@@ -179,0 +190,0 @@ |
@@ -0,1 +1,3 @@ | ||
import { isListLike, isMapLike } from './schema.js' | ||
/** | ||
@@ -7,43 +9,56 @@ * Dremel-assembly of arrays of values into lists | ||
* | ||
* @param {number[] | undefined} definitionLevels definition levels | ||
* @param {number[]} repetitionLevels repetition levels | ||
* @param {ArrayLike<any>} values values to process | ||
* @param {boolean} isNullable can entries be null? | ||
* @typedef {import('./types.d.ts').DecodedArray} DecodedArray | ||
* @typedef {import('./types.d.ts').FieldRepetitionType} FieldRepetitionType | ||
* @param {number[] | undefined} definitionLevels | ||
* @param {number[]} repetitionLevels | ||
* @param {DecodedArray} values | ||
* @param {(FieldRepetitionType | undefined)[]} repetitionPath | ||
* @param {number} maxDefinitionLevel definition level that corresponds to non-null | ||
* @param {number} maxRepetitionLevel repetition level that corresponds to a new row | ||
* @returns {any[]} array of values | ||
* @returns {DecodedArray} array of values | ||
*/ | ||
export function assembleObjects( | ||
definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel | ||
export function assembleLists( | ||
definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel, maxRepetitionLevel | ||
) { | ||
const n = definitionLevels?.length || repetitionLevels.length | ||
let valueIndex = 0 | ||
/** @type {any[]} */ | ||
const output = [] | ||
let currentContainer = output | ||
// Trackers for nested structures. | ||
// Track state of nested structures | ||
const containerStack = [output] | ||
let currentContainer = output | ||
let currentDepth = 0 // schema depth | ||
let currentDefLevel = 0 // list depth | ||
let currentRepLevel = 0 | ||
for (let i = 0; i < repetitionLevels.length; i++) { | ||
for (let i = 0; i < n; i++) { | ||
// assert(currentDefLevel === containerStack.length - 1) | ||
const def = definitionLevels?.length ? definitionLevels[i] : maxDefinitionLevel | ||
const rep = repetitionLevels[i] | ||
if (rep !== maxRepetitionLevel) { | ||
// Move back to the parent container | ||
while (rep < containerStack.length - 1) { | ||
// Pop up to start of rep level | ||
while (currentDepth && (rep < currentRepLevel || repetitionPath[currentDepth] === 'OPTIONAL')) { | ||
if (repetitionPath[currentDepth] !== 'REQUIRED') { | ||
containerStack.pop() | ||
currentDefLevel-- | ||
} | ||
// Construct new lists up to max repetition level | ||
// @ts-expect-error won't be empty | ||
currentContainer = containerStack.at(-1) | ||
if (repetitionPath[currentDepth] === 'REPEATED') currentRepLevel-- | ||
currentDepth-- | ||
} | ||
// @ts-expect-error won't be empty | ||
currentContainer = containerStack.at(-1) | ||
// Add lists up to definition level | ||
const targetDepth = isNullable ? (def + 1) / 2 : maxRepetitionLevel + 1 | ||
for (let j = containerStack.length; j < targetDepth; j++) { | ||
/** @type {any[]} */ | ||
const newList = [] | ||
currentContainer.push(newList) | ||
currentContainer = newList | ||
containerStack.push(newList) | ||
// Go deeper to end of definition level | ||
while (currentDepth < repetitionPath.length - 2 && currentDefLevel < def) { | ||
currentDepth++ | ||
if (repetitionPath[currentDepth] !== 'REQUIRED') { | ||
/** @type {any[]} */ | ||
const newList = [] | ||
currentContainer.push(newList) | ||
currentContainer = newList | ||
containerStack.push(newList) | ||
currentDefLevel++ | ||
} | ||
if (repetitionPath[currentDepth] === 'REPEATED') currentRepLevel++ | ||
} | ||
@@ -53,10 +68,8 @@ | ||
if (def === maxDefinitionLevel) { | ||
// assert(currentDepth === maxDefinitionLevel || currentDepth === repetitionPath.length - 2) | ||
currentContainer.push(values[valueIndex++]) | ||
} else if (isNullable) { | ||
// TODO: actually depends on level required or not | ||
if (def % 2 === 0) { | ||
currentContainer.push(undefined) | ||
} else { | ||
currentContainer.push([]) | ||
} | ||
} else if (currentDepth === repetitionPath.length - 2) { | ||
currentContainer.push(null) | ||
} else { | ||
currentContainer.push([]) | ||
} | ||
@@ -68,7 +81,5 @@ } | ||
if (values.length > 0 && maxRepetitionLevel === 0) { | ||
// All values belong to the same (root) list | ||
return [values] | ||
return values // flat list | ||
} | ||
// return max definition level of nested lists | ||
/** @type {any[]} */ | ||
for (let i = 0; i < maxDefinitionLevel; i++) { | ||
@@ -85,2 +96,145 @@ /** @type {any[]} */ | ||
// TODO: depends on prior def level | ||
/** | ||
* Assemble a nested structure from subcolumn data. | ||
* https://github.com/apache/parquet-format/blob/apache-parquet-format-2.10.0/LogicalTypes.md#nested-types | ||
* | ||
* @typedef {import('./types.d.ts').SchemaTree} SchemaTree | ||
* @param {Map<string, any[]>} subcolumnData | ||
* @param {SchemaTree} schema top-level schema element | ||
* @param {number} [depth] depth of nested structure | ||
*/ | ||
export function assembleNested(subcolumnData, schema, depth = 0) { | ||
const path = schema.path.join('.') | ||
const optional = schema.element.repetition_type === 'OPTIONAL' | ||
const nextDepth = optional ? depth + 1 : depth | ||
if (isListLike(schema)) { | ||
const sublist = schema.children[0].children[0] | ||
assembleNested(subcolumnData, sublist, nextDepth + 1) | ||
const subcolumn = sublist.path.join('.') | ||
const values = subcolumnData.get(subcolumn) | ||
if (!values) throw new Error('parquet list-like column missing values') | ||
if (optional) flattenAtDepth(values, depth) | ||
subcolumnData.set(path, values) | ||
subcolumnData.delete(subcolumn) | ||
return | ||
} | ||
if (isMapLike(schema)) { | ||
const mapName = schema.children[0].element.name | ||
// Assemble keys and values | ||
assembleNested(subcolumnData, schema.children[0].children[0], nextDepth + 1) | ||
assembleNested(subcolumnData, schema.children[0].children[1], nextDepth + 1) | ||
const keys = subcolumnData.get(`${path}.${mapName}.key`) | ||
const values = subcolumnData.get(`${path}.${mapName}.value`) | ||
if (!keys) throw new Error('parquet map-like column missing keys') | ||
if (!values) throw new Error('parquet map-like column missing values') | ||
if (keys.length !== values.length) { | ||
throw new Error('parquet map-like column key/value length mismatch') | ||
} | ||
const out = assembleMaps(keys, values, nextDepth) | ||
if (optional) flattenAtDepth(out, depth) | ||
subcolumnData.delete(`${path}.${mapName}.key`) | ||
subcolumnData.delete(`${path}.${mapName}.value`) | ||
subcolumnData.set(path, out) | ||
return | ||
} | ||
// Struct-like column | ||
if (schema.children.length) { | ||
// construct a meta struct and then invert | ||
/** @type {Record<string, any>} */ | ||
const struct = {} | ||
for (const child of schema.children) { | ||
assembleNested(subcolumnData, child, nextDepth) | ||
const childData = subcolumnData.get(child.path.join('.')) | ||
if (!childData) throw new Error('parquet struct-like column missing child data') | ||
if (child.element.repetition_type === 'OPTIONAL') { | ||
flattenAtDepth(childData, depth) | ||
} | ||
struct[child.element.name] = childData | ||
} | ||
// remove children | ||
for (const child of schema.children) { | ||
subcolumnData.delete(child.path.join('.')) | ||
} | ||
// invert struct by depth | ||
subcolumnData.set(path, invertStruct(struct, depth)) | ||
return | ||
} | ||
// assert(schema.element.repetition_type !== 'REPEATED') | ||
} | ||
/** | ||
* @param {any[]} arr | ||
* @param {number} depth | ||
*/ | ||
function flattenAtDepth(arr, depth) { | ||
for (let i = 0; i < arr.length; i++) { | ||
if (depth) { | ||
flattenAtDepth(arr[i], depth - 1) | ||
} else { | ||
arr[i] = arr[i][0] | ||
} | ||
} | ||
} | ||
/** | ||
* @param {any[]} keys | ||
* @param {any[]} values | ||
* @param {number} depth | ||
* @returns {any[]} | ||
*/ | ||
function assembleMaps(keys, values, depth) { | ||
const out = [] | ||
for (let i = 0; i < keys.length; i++) { | ||
if (depth) { | ||
out.push(assembleMaps(keys[i], values[i], depth - 1)) // go deeper | ||
} else { | ||
if (keys[i]) { | ||
/** @type {Record<string, any>} */ | ||
const obj = {} | ||
for (let j = 0; j < keys[i].length; j++) { | ||
const value = values[i][j] | ||
obj[keys[i][j]] = value === undefined ? null : value | ||
} | ||
out.push(obj) | ||
} else { | ||
out.push(undefined) | ||
} | ||
} | ||
} | ||
return out | ||
} | ||
/** | ||
* Invert a struct-like object by depth. | ||
* | ||
* @param {Record<string, any[]>} struct | ||
* @param {number} depth | ||
* @returns {any[]} | ||
*/ | ||
function invertStruct(struct, depth) { | ||
const keys = Object.keys(struct) | ||
const length = struct[keys[0]]?.length | ||
const out = [] | ||
for (let i = 0; i < length; i++) { | ||
/** @type {Record<string, any>} */ | ||
const obj = {} | ||
for (const key of keys) { | ||
obj[key] = struct[key][i] | ||
} | ||
if (depth) { | ||
out.push(invertStruct(obj, depth - 1)) // deeper | ||
} else { | ||
out.push(obj) | ||
} | ||
} | ||
return out | ||
} |
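For reference, the Dremel model that `assembleLists` and `assembleNested` implement encodes nesting entirely in definition and repetition levels. A worked example for an optional list of required int32 values (illustrative only; the intermediate shape `assembleLists` returns before `flattenAtDepth` strips the wrappers added for OPTIONAL levels is not shown):

```js
// Logical column, one entry per row: [ [1, 2], [], null, [3] ]
//
// maxDefinitionLevel = 2   (OPTIONAL list + REPEATED element group)
// maxRepetitionLevel = 1   (one repeated level)
//
// entry           definitionLevel  repetitionLevel
//  1                     2                0       // starts a new row
//  2                     2                1       // continues the same list
//  (empty list)          1                0       // list present, no elements
//  (null list)           0                0       // list itself is null
//  3                     2                0
//
// Only non-null leaf values are stored in the data page:
const values = [1, 2, 3]
const definitionLevels = [2, 2, 1, 0, 2]
const repetitionLevels = [0, 1, 0, 0, 0]
```

`assembleLists` walks the two level arrays in lockstep to rebuild the nested arrays, and `assembleNested` then removes the extra wrapper contributed by each OPTIONAL ancestor (via `flattenAtDepth`) and reassembles map and struct groups.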
@@ -1,2 +0,2 @@ | ||
import { assembleObjects } from './assemble.js' | ||
import { assembleLists } from './assemble.js' | ||
import { convert } from './convert.js' | ||
@@ -6,3 +6,3 @@ import { readDataPage, readDictionaryPage } from './datapage.js' | ||
import { parquetHeader } from './header.js' | ||
import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js' | ||
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' | ||
import { snappyUncompress } from './snappy.js' | ||
@@ -43,3 +43,3 @@ import { concat } from './utils.js' | ||
if (header.compressed_page_size === undefined) { | ||
throw new Error(`parquet compressed page size is undefined in column '${columnMetadata.path_in_schema}'`) | ||
throw new Error('parquet compressed page size is undefined') | ||
} | ||
@@ -67,34 +67,22 @@ | ||
// construct output values: skip nulls and construct lists | ||
if (repetitionLevels.length) { | ||
dereferenceDictionary(dictionary, dataPage) | ||
dereferenceDictionary(dictionary, dataPage) | ||
values = convert(dataPage, element) | ||
if (repetitionLevels.length || definitionLevels?.length) { | ||
// Use repetition levels to construct lists | ||
const isNullable = columnMetadata && !isRequired(schemaPath.slice(0, 2)) | ||
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) | ||
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) | ||
// convert primitive types to rich types | ||
values = convert(dataPage, element) | ||
values = assembleObjects( | ||
definitionLevels, repetitionLevels, values, isNullable, maxDefinitionLevel, maxRepetitionLevel | ||
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type) | ||
values = assembleLists( | ||
definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel, maxRepetitionLevel | ||
) | ||
} else if (definitionLevels?.length) { | ||
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) | ||
// Use definition levels to skip nulls | ||
values = [] | ||
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values) | ||
values = convert(values, element) | ||
} else { | ||
dereferenceDictionary(dictionary, dataPage) | ||
values = convert(dataPage, element) | ||
// wrap nested flat data by depth | ||
for (let i = 2; i < schemaPath.length; i++) { | ||
if (schemaPath[i].element.repetition_type !== 'REQUIRED') { | ||
values = [values] | ||
} | ||
} | ||
} | ||
// assert(BigInt(values.length) === rowGroup.num_rows) | ||
concat(rowData, values) | ||
} else if (header.type === 'DICTIONARY_PAGE') { | ||
const diph = header.dictionary_page_header | ||
if (!diph) throw new Error('parquet dictionary page header is undefined') | ||
const page = decompressPage( | ||
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors | ||
) | ||
dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length) | ||
} else if (header.type === 'DATA_PAGE_V2') { | ||
@@ -109,22 +97,22 @@ const daph2 = header.data_page_header_v2 | ||
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) | ||
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) | ||
if (repetitionLevels.length) { | ||
dereferenceDictionary(dictionary, dataPage) | ||
values = convert(dataPage, element) | ||
dereferenceDictionary(dictionary, dataPage) | ||
values = convert(dataPage, element) | ||
if (repetitionLevels.length || definitionLevels?.length) { | ||
// Use repetition levels to construct lists | ||
values = assembleObjects( | ||
definitionLevels, repetitionLevels, values, true, maxDefinitionLevel, maxRepetitionLevel | ||
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath) | ||
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath) | ||
const repetitionPath = schemaPath.map(({ element }) => element.repetition_type) | ||
values = assembleLists( | ||
definitionLevels, repetitionLevels, values, repetitionPath, maxDefinitionLevel, maxRepetitionLevel | ||
) | ||
} else if (daph2.num_nulls) { | ||
// skip nulls | ||
if (!definitionLevels) throw new Error('parquet data page v2 nulls missing definition levels') | ||
values = [] // TODO: copy straight into rowData, combine convert into skipNulls | ||
skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values) | ||
values = convert(values, element) | ||
} else { | ||
dereferenceDictionary(dictionary, dataPage) | ||
values = convert(dataPage, element) | ||
} | ||
concat(rowData, values) | ||
} else if (header.type === 'DICTIONARY_PAGE') { | ||
const diph = header.dictionary_page_header | ||
if (!diph) throw new Error('parquet dictionary page header is undefined') | ||
const page = decompressPage( | ||
compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors | ||
) | ||
dictionary = readDictionaryPage(page, diph, columnMetadata, element.type_length) | ||
} else { | ||
@@ -197,32 +185,1 @@ throw new Error(`parquet unsupported page type: ${header.type}`) | ||
} | ||
/** | ||
* Expand data page list with nulls. | ||
* | ||
* @param {number[]} definitionLevels | ||
* @param {number} maxDefinitionLevel | ||
* @param {ArrayLike<any>} dataPage | ||
* @param {any} dictionary | ||
* @param {any[]} output | ||
*/ | ||
function skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, output) { | ||
if (output.length) throw new Error('parquet output array is not empty') | ||
// Use definition levels to skip nulls | ||
let index = 0 | ||
for (let i = 0; i < definitionLevels.length; i++) { | ||
if (definitionLevels[i] === maxDefinitionLevel) { | ||
if (index > dataPage.length) { | ||
throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`) | ||
} | ||
let v = dataPage[index++] | ||
// map to dictionary value | ||
if (dictionary) { | ||
v = dictionary[v] | ||
} | ||
output[i] = v | ||
} else { | ||
output[i] = undefined | ||
} | ||
} | ||
} |
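The removed `skipNulls` helper expanded a dense data page into one slot per row using definition levels; that job now falls to `assembleLists`. A conceptual sketch of the old behaviour (hypothetical helper name, not part of the library):

```js
// Expand a dense page into one slot per row; definition levels below the
// maximum mark nulls, which are not stored in the page itself.
function expandNulls(definitionLevels, maxDefinitionLevel, dataPage) {
  const output = []
  let index = 0
  for (const def of definitionLevels) {
    output.push(def === maxDefinitionLevel ? dataPage[index++] : undefined)
  }
  return output
}

// expandNulls([1, 0, 1, 0], 1, ['a', 'b'])
//   -> ['a', undefined, 'b', undefined]
```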
@@ -92,5 +92,5 @@ const dayMillis = 86400000 // 1 day in milliseconds | ||
if (!bytes) return undefined | ||
const int16 = (bytes[1] << 8) | bytes[0] | ||
const int16 = bytes[1] << 8 | bytes[0] | ||
const sign = int16 >> 15 ? -1 : 1 | ||
const exp = (int16 >> 10) & 0x1f | ||
const exp = int16 >> 10 & 0x1f | ||
const frac = int16 & 0x3ff | ||
@@ -97,0 +97,0 @@ if (exp === 0) return sign * Math.pow(2, -14) * (frac / 1024) // subnormals |
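The parenthesis removals here (and in the files below) are behavior-preserving: in JavaScript the shift operators bind tighter than `&`, which binds tighter than `^` and `|`. A worked FLOAT16 example for the normal-number path (subnormals and infinities are handled separately above):

```js
// 0x3C00 is +1.0 in half precision, stored little-endian as [0x00, 0x3c]
const bytes = [0x00, 0x3c]
const int16 = bytes[1] << 8 | bytes[0]   // 0x3c00
const sign = int16 >> 15 ? -1 : 1        // +1
const exp = int16 >> 10 & 0x1f           // 15
const frac = int16 & 0x3ff               // 0
const value = sign * Math.pow(2, exp - 15) * (1 + frac / 1024) // 1.0
```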
import { decompressPage } from './column.js' | ||
import { deltaBinaryUnpack, deltaByteArray } from './delta.js' | ||
import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js' | ||
import { readPlain } from './plain.js' | ||
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' | ||
import { readVarInt, readZigZagBigInt } from './thrift.js' | ||
@@ -15,3 +15,3 @@ /** | ||
* @typedef {import("./types.d.ts").SchemaTree} SchemaTree | ||
* @param {Uint8Array} compressedBytes raw page data (should already be decompressed) | ||
* @param {Uint8Array} compressedBytes raw page data | ||
* @param {import("./types.d.ts").PageHeader} ph page header | ||
@@ -46,16 +46,15 @@ * @param {SchemaTree[]} schemaPath | ||
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength) | ||
const pageReader = { view: pageView, offset: 0 } | ||
// read values based on encoding | ||
/** @type {import('./types.d.ts').DecodedArray} */ | ||
let dataPage = [] | ||
let dataPage | ||
const nValues = daph2.num_values - daph2.num_nulls | ||
if (daph2.encoding === 'PLAIN') { | ||
const pageReader = { view: pageView, offset: 0 } | ||
const { type_length } = schemaPath[schemaPath.length - 1].element | ||
dataPage = readPlain(pageReader, columnMetadata.type, nValues, type_length) | ||
} else if (daph2.encoding === 'RLE') { | ||
const bitWidth = 1 | ||
const pageReader = { view: pageView, offset: 4 } | ||
pageReader.offset = 4 | ||
dataPage = new Array(nValues) | ||
readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, dataPage) | ||
readRleBitPackedHybrid(pageReader, 1, uncompressedPageSize, dataPage) | ||
} else if ( | ||
@@ -66,3 +65,3 @@ daph2.encoding === 'PLAIN_DICTIONARY' || | ||
const bitWidth = pageView.getUint8(0) | ||
const pageReader = { view: pageView, offset: 1 } | ||
pageReader.offset = 1 | ||
dataPage = new Array(nValues) | ||
@@ -73,3 +72,6 @@ readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize, dataPage) | ||
dataPage = int32 ? new Int32Array(nValues) : new BigInt64Array(nValues) | ||
deltaBinaryUnpack(page, nValues, dataPage) | ||
deltaBinaryUnpack(pageReader, nValues, dataPage) | ||
} else if (daph2.encoding === 'DELTA_BYTE_ARRAY') { | ||
dataPage = new Array(nValues) | ||
deltaByteArray(pageReader, nValues, dataPage) | ||
} else { | ||
@@ -83,9 +85,7 @@ throw new Error(`parquet unsupported encoding: ${daph2.encoding}`) | ||
/** | ||
* Read the repetition levels from this page, if any. | ||
* | ||
* @typedef {import("./types.d.ts").DataReader} DataReader | ||
* @param {DataReader} reader data view for the page | ||
* @param {DataPageHeaderV2} daph2 data page header | ||
* @param {DataReader} reader | ||
* @param {DataPageHeaderV2} daph2 data page header v2 | ||
* @param {SchemaTree[]} schemaPath | ||
* @returns {any[]} repetition levels and number of bytes read | ||
* @returns {any[]} repetition levels | ||
*/ | ||
@@ -106,8 +106,6 @@ export function readRepetitionLevelsV2(reader, daph2, schemaPath) { | ||
/** | ||
* Read the definition levels from this page, if any. | ||
* | ||
* @param {DataReader} reader data view for the page | ||
* @param {DataReader} reader | ||
* @param {DataPageHeaderV2} daph2 data page header v2 | ||
* @param {number} maxDefinitionLevel | ||
* @returns {number[] | undefined} definition levels and number of bytes read | ||
* @returns {number[] | undefined} definition levels | ||
*/ | ||
@@ -123,60 +121,1 @@ function readDefinitionLevelsV2(reader, daph2, maxDefinitionLevel) { | ||
} | ||
/** | ||
* Unpack the delta binary packed encoding. | ||
* | ||
* @param {Uint8Array} page page data | ||
* @param {number} nValues number of values to read | ||
* @param {Int32Array | BigInt64Array} values array to write to | ||
*/ | ||
function deltaBinaryUnpack(page, nValues, values) { | ||
const int32 = values instanceof Int32Array | ||
const view = new DataView(page.buffer, page.byteOffset, page.byteLength) | ||
const reader = { view, offset: 0 } | ||
const blockSize = readVarInt(reader) | ||
const miniblockPerBlock = readVarInt(reader) | ||
let count = readVarInt(reader) | ||
let value = readZigZagBigInt(reader) // first value | ||
let valueIndex = 0 | ||
values[valueIndex++] = int32 ? Number(value) : value | ||
const valuesPerMiniblock = blockSize / miniblockPerBlock | ||
while (valueIndex < nValues) { | ||
const minDelta = readZigZagBigInt(reader) | ||
const bitWidths = new Uint8Array(miniblockPerBlock) | ||
for (let i = 0; i < miniblockPerBlock; i++) { | ||
bitWidths[i] = page[reader.offset++] | ||
} | ||
for (let i = 0; i < miniblockPerBlock; i++) { | ||
let miniblockCount = Math.min(count, valuesPerMiniblock) | ||
const bitWidth = BigInt(bitWidths[i]) | ||
if (bitWidth) { | ||
if (count > 1) { | ||
const mask = (1n << bitWidth) - 1n | ||
let bitpackPos = 0n | ||
while (count && miniblockCount) { | ||
let bits = (BigInt(view.getUint8(reader.offset)) >> bitpackPos) & mask // TODO: don't re-read value every time | ||
bitpackPos += bitWidth | ||
while (bitpackPos >= 8) { | ||
bitpackPos -= 8n | ||
reader.offset++ | ||
bits |= (BigInt(view.getUint8(reader.offset)) << bitWidth - bitpackPos) & mask | ||
} | ||
const delta = minDelta + bits | ||
value += delta | ||
values[valueIndex++] = int32 ? Number(value) : value | ||
count-- | ||
miniblockCount-- | ||
} | ||
} | ||
} else { | ||
for (let j = 0; j < valuesPerMiniblock && valueIndex < nValues; j++) { | ||
value += minDelta | ||
values[valueIndex++] = int32 ? Number(value) : value | ||
} | ||
} | ||
} | ||
} | ||
} |
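For orientation, DELTA_BINARY_PACKED stores a varint header (block size, miniblocks per block, total value count) plus a zigzag-varint first value; each block then stores a zigzag-varint minimum delta and bit-packed deltas relative to it. A small sketch of the final reconstruction step, assuming the per-value bits have already been unpacked (hypothetical helper, not library code):

```js
// value[0] = first value from the header
// value[i] = value[i-1] + minDelta + unpackedBits[i]
function applyDeltas(firstValue, minDelta, unpackedBits) {
  const values = [firstValue]
  for (const bits of unpackedBits) {
    values.push(values[values.length - 1] + minDelta + bits)
  }
  return values
}

// applyDeltas(10, -2, [3, 0, 5])  ->  [10, 11, 9, 12]
```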
@@ -60,3 +60,3 @@ import { readVarInt } from './thrift.js' | ||
function readRle(reader, count, bitWidth, values, seen) { | ||
const width = (bitWidth + 7) >> 3 | ||
const width = bitWidth + 7 >> 3 | ||
let value = 0 | ||
@@ -93,3 +93,3 @@ if (width === 1) { | ||
// extract number of values to read from header | ||
let count = (header >> 1) << 3 | ||
let count = header >> 1 << 3 | ||
// mask for bitWidth number of bits | ||
@@ -123,3 +123,3 @@ const mask = (1 << bitWidth) - 1 | ||
// emit value by shifting off to the right and masking | ||
values[seen++] = (data >> right) & mask | ||
values[seen++] = data >> right & mask | ||
} | ||
@@ -126,0 +126,0 @@ count-- |
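The run headers decoded here come from Parquet's RLE/bit-packing hybrid: the low bit of each header selects the run type, and the remaining bits carry the count. A small worked example (binary literals for clarity):

```js
// Bit-packed run: low bit 1, remaining bits = number of 8-value groups
const bitPackedHeader = 0b101
const groups = bitPackedHeader >> 1      // 2
const packedCount = groups << 3          // 16 bit-packed values follow

// RLE run: low bit 0, remaining bits = repeat count of a single value
const rleHeader = 0b1100
const rleCount = rleHeader >> 1          // 6 copies of one value follow
```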
@@ -62,3 +62,3 @@ import { CompressionCodec, ConvertedType, Encoding, FieldRepetitionType, ParquetType } from './constants.js' | ||
const combinedView = new Uint8Array(combinedBuffer) | ||
combinedView.set(new Uint8Array(metadataBuffer), 0) | ||
combinedView.set(new Uint8Array(metadataBuffer)) | ||
combinedView.set(new Uint8Array(footerBuffer), footerOffset - metadataOffset) | ||
@@ -65,0 +65,0 @@ return parquetMetadata(combinedBuffer) |
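Dropping the explicit `0` is safe because `TypedArray.prototype.set` defaults its target offset to zero:

```js
const view = new Uint8Array(8)
view.set([1, 2, 3])      // equivalent to view.set([1, 2, 3], 0)
```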
@@ -50,3 +50,3 @@ /** | ||
const byte = reader.view.getUint8(byteOffset) | ||
values[i] = (byte & (1 << bitOffset)) !== 0 | ||
values[i] = (byte & 1 << bitOffset) !== 0 | ||
} | ||
@@ -99,3 +99,3 @@ reader.offset += Math.ceil(count / 8) | ||
const high = reader.view.getInt32(reader.offset + i * 12 + 8, true) | ||
values[i] = (BigInt(high) << 64n) | low | ||
values[i] = BigInt(high) << 64n | low | ||
} | ||
@@ -102,0 +102,0 @@ reader.offset += count * 12 |
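PLAIN booleans are bit-packed eight per byte, least-significant bit first; the mask above extracts one bit per value. A worked example:

```js
const byte = 0b00000101
const bools = []
for (let bitOffset = 0; bitOffset < 8; bitOffset++) {
  bools.push((byte & 1 << bitOffset) !== 0)
}
// bools -> [true, false, true, false, false, false, false, false]
```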
import { assembleNested } from './assemble.js' | ||
import { getColumnOffset, readColumn } from './column.js' | ||
import { parquetMetadataAsync } from './metadata.js' | ||
import { getSchemaPath, isMapLike } from './schema.js' | ||
import { getSchemaPath } from './schema.js' | ||
import { concat } from './utils.js' | ||
@@ -114,3 +115,6 @@ | ||
const promises = [] | ||
const maps = new Map() | ||
// Top-level columns to assemble | ||
const { children } = getSchemaPath(metadata.schema, [])[0] | ||
const subcolumnNames = new Map(children.map(child => [child.element.name, getSubcolumns(child)])) | ||
const subcolumnData = new Map() // columns to assemble as maps | ||
// read column data | ||
@@ -156,44 +160,17 @@ for (let columnIndex = 0; columnIndex < rowGroup.columns.length; columnIndex++) { | ||
) | ||
if (columnData.length !== Number(rowGroup.num_rows)) { | ||
throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`) | ||
} | ||
// assert(columnData.length === Number(rowGroup.num_rows)) | ||
if (isMapLike(schemaPath[schemaPath.length - 3])) { | ||
const name = columnMetadata.path_in_schema.slice(0, -2).join('.') | ||
if (!maps.has(name)) { | ||
maps.set(name, columnData) | ||
columnData = undefined // do not emit column data until both key and value are read | ||
} else { | ||
if (columnMetadata.path_in_schema[0] === 'key') { | ||
throw new Error('parquet map-like column key is not first') // TODO: support value-first | ||
} else { | ||
const values = columnData | ||
const keys = maps.get(name) | ||
const out = [] | ||
if (keys.length !== values.length) { | ||
throw new Error('parquet map-like column key/value length mismatch') | ||
} | ||
// assemble map-like column data | ||
for (let i = 0; i < keys.length; i++) { | ||
// keys will be empty for {} and undefined for null | ||
if (keys[i]) { | ||
/** @type {Record<string, any>} */ | ||
const obj = {} | ||
for (let j = 0; j < keys[i].length; j++) { | ||
if (Array.isArray(keys[i][j])) { | ||
// TODO: key should not be an array, this is an assemble bug? | ||
keys[i][j] = keys[i][j][0] | ||
values[i][j] = values[i][j][0] | ||
} | ||
if (!keys[i][j]) continue | ||
obj[keys[i][j]] = values[i][j] === undefined ? null : values[i][j] | ||
} | ||
out.push(obj) | ||
} else { | ||
out.push(undefined) | ||
} | ||
} | ||
columnData = out | ||
} | ||
maps.delete(name) | ||
// TODO: fast path for non-nested columns | ||
// Save column data for assembly | ||
const subcolumn = columnMetadata.path_in_schema.join('.') | ||
subcolumnData.set(subcolumn, columnData) | ||
columnData = undefined | ||
const subcolumns = subcolumnNames.get(columnName) | ||
if (subcolumns?.every(name => subcolumnData.has(name))) { | ||
// We have all data needed to assemble a top level column | ||
assembleNested(subcolumnData, schemaPath[1]) | ||
columnData = subcolumnData.get(columnName) | ||
if (!columnData) { | ||
throw new Error(`parquet column data not assembled: ${columnName}`) | ||
} | ||
@@ -222,1 +199,20 @@ } | ||
} | ||
/** | ||
* Return a list of sub-columns needed to construct a top-level column. | ||
* | ||
* @param {import('./types.js').SchemaTree} schema | ||
* @param {string[]} output | ||
* @returns {string[]} | ||
*/ | ||
function getSubcolumns(schema, output = []) { | ||
if (schema.children.length) { | ||
for (const child of schema.children) { | ||
getSubcolumns(child, output) | ||
} | ||
} else { | ||
output.push(schema.path.join('.')) | ||
} | ||
return output | ||
} |
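`getSubcolumns` collects the leaf paths that must all be read before a top-level column can be assembled. An illustrative sketch for a MAP column named `tags` (path names here are assumptions; the real names come from the file schema, where the inner group is conventionally `key_value`):

```js
// Leaf sub-columns that feed one top-level MAP column:
const subcolumns = ['tags.key_value.key', 'tags.key_value.value']

// Each leaf is staged in subcolumnData as it is read. Once every leaf is
// present, the column is assembled in one step and emitted under 'tags':
//   assembleNested(subcolumnData, schemaPath[1])
//   columnData = subcolumnData.get('tags')   // e.g. [{ a: 1 }, {}, undefined]
```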
@@ -104,4 +104,4 @@ /** | ||
// Copy with 1-byte offset | ||
len = ((c >>> 2) & 0x7) + 4 | ||
offset = input[pos] + ((c >>> 5) << 8) | ||
len = (c >>> 2 & 0x7) + 4 | ||
offset = input[pos] + (c >>> 5 << 8) | ||
pos++ | ||
@@ -108,0 +108,0 @@ break |
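The tag being decoded here is Snappy's copy-with-1-byte-offset element: the low two bits of the tag select the element type, bits 2-4 encode the copy length minus 4, and bits 5-7 are the high byte of the offset. A worked example:

```js
const c = 0x4d                      // 0b010_011_01
const len = (c >>> 2 & 0x7) + 4     // 3 + 4 = 7 bytes to copy
const offsetHigh = c >>> 5 << 8     // 0b010 << 8 = 512
// full offset = offsetHigh + next input byte
```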
@@ -22,2 +22,3 @@ // TCompactProtocol types | ||
* | ||
* @typedef {import("./types.d.ts").DataReader} DataReader | ||
* @param {DataReader} reader | ||
@@ -123,3 +124,2 @@ * @returns {Record<string, any>} | ||
* | ||
* @typedef {import("./types.d.ts").DataReader} DataReader | ||
* @param {DataReader} reader | ||
@@ -170,3 +170,3 @@ * @returns {number} value | ||
// convert zigzag to int | ||
return (zigzag >>> 1) ^ -(zigzag & 1) | ||
return zigzag >>> 1 ^ -(zigzag & 1) | ||
} | ||
@@ -184,3 +184,3 @@ | ||
// convert zigzag to int | ||
return (zigzag >> BigInt(1)) ^ -(zigzag & BigInt(1)) | ||
return zigzag >> BigInt(1) ^ -(zigzag & BigInt(1)) | ||
} | ||
@@ -254,3 +254,3 @@ | ||
} else { | ||
varInt[idx++] = (n & 0x7f) | 0x80 | ||
varInt[idx++] = n & 0x7f | 0x80 | ||
n >>>= 7 | ||
@@ -257,0 +257,0 @@ } |
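Zigzag encoding maps signed integers to small unsigned values (0→0, −1→1, 1→2, −2→3, ...) so they varint-encode compactly; the expression above reverses it, and the parentheses are unnecessary because `>>>` binds tighter than `^`. A quick check:

```js
const decodeZigZag = zigzag => zigzag >>> 1 ^ -(zigzag & 1)

decodeZigZag(0) // 0
decodeZigZag(3) // -2
decodeZigZag(4) // 2
```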