@openfn/language-common
Advanced tools
Comparing version 1.10.2 to 1.10.3
@@ -46,7 +46,20 @@ var __defProp = Object.defineProperty; | ||
import { parse as parse2 } from "csv-parse"; | ||
import { Readable as Readable2 } from "stream"; | ||
// src/util.js | ||
import { Readable, Writable } from "stream"; | ||
function expandReferences(state, ...args) { | ||
return args.map((value) => expandReference(state, value)); | ||
} | ||
var isStream = (value) => { | ||
if (value && typeof value == "object") { | ||
if (value instanceof Readable || value instanceof Writable) { | ||
return true; | ||
} | ||
if (value.pipeTo || value.pipe) { | ||
return true; | ||
} | ||
} | ||
return false; | ||
}; | ||
function expandReference(state, value) { | ||
@@ -56,3 +69,3 @@ if (Array.isArray(value)) { | ||
} | ||
if (typeof value == "object" && !!value) { | ||
if (typeof value == "object" && !!value && !isStream(value)) { | ||
return Object.keys(value).reduce((acc, key) => { | ||
@@ -393,2 +406,12 @@ return { ...acc, [key]: expandReference(state, value[key]) }; | ||
} | ||
var getParser = (csvData, options2) => { | ||
if (typeof csvData === "string") { | ||
return parse2(csvData, options2); | ||
} | ||
let stream = csvData; | ||
if (csvData instanceof ReadableStream) { | ||
stream = Readable2.from(csvData); | ||
} | ||
return stream.pipe(parse2(options2)); | ||
}; | ||
function parseCsv(csvData, parsingOptions = {}, callback) { | ||
@@ -423,3 +446,3 @@ const defaultOptions = { | ||
let buffer = []; | ||
const parser = typeof resolvedCsvData === "string" ? parse2(resolvedCsvData, options2) : resolvedCsvData.pipe(parse2(options2)); | ||
const parser = getParser(resolvedCsvData, options2); | ||
const flushBuffer = async (currentState) => { | ||
@@ -430,12 +453,15 @@ const nextState = callback ? await callback(currentState, buffer) : composeNextState(currentState, buffer); | ||
}; | ||
let result = state; | ||
for await (const record of parser) { | ||
buffer.push(record); | ||
if (buffer.length === options2.chunkSize) { | ||
const [nextState, nextBuffer] = await flushBuffer(state); | ||
state = nextState; | ||
const [nextState, nextBuffer] = await flushBuffer(result); | ||
result = nextState; | ||
buffer = nextBuffer; | ||
} | ||
} | ||
const [finalState] = await flushBuffer(state); | ||
return finalState; | ||
if (buffer.length) { | ||
[result] = await flushBuffer(result); | ||
} | ||
return result; | ||
}; | ||
@@ -442,0 +468,0 @@ } |
// src/util.js | ||
import { Readable, Writable } from "stream"; | ||
function expandReferences(state, ...args) { | ||
return args.map((value) => expandReference(state, value)); | ||
} | ||
var isStream = (value) => { | ||
if (value && typeof value == "object") { | ||
if (value instanceof Readable || value instanceof Writable) { | ||
return true; | ||
} | ||
if (value.pipeTo || value.pipe) { | ||
return true; | ||
} | ||
} | ||
return false; | ||
}; | ||
function expandReference(state, value) { | ||
@@ -9,3 +21,3 @@ if (Array.isArray(value)) { | ||
} | ||
if (typeof value == "object" && !!value) { | ||
if (typeof value == "object" && !!value && !isStream(value)) { | ||
return Object.keys(value).reduce((acc, key) => { | ||
@@ -12,0 +24,0 @@ return { ...acc, [key]: expandReference(state, value[key]) }; |
{ | ||
"name": "@openfn/language-common", | ||
"version": "1.10.2", | ||
"version": "1.10.3", | ||
"description": "Common Expressions for OpenFn", | ||
@@ -51,3 +51,4 @@ "homepage": "https://docs.openfn.org", | ||
"nock": "13.2.9", | ||
"rimraf": "^3.0.2" | ||
"rimraf": "^3.0.2", | ||
"undici": "^5.22.1" | ||
}, | ||
@@ -54,0 +55,0 @@ "main": "dist/index.cjs", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
134780
3155
9