csv-to-pouch
Advanced tools
Comparing version 3.1.2 to 3.3.0
121
cmd.js
@@ -1,2 +0,2 @@ | ||
const parseArgs = require('minimist'); | ||
const parseArgs = require("minimist"); | ||
@@ -19,64 +19,75 @@ const helpText = ` | ||
if (require.main === module) { | ||
const args = parseArgs(process.argv, { | ||
alias: { | ||
i: 'input', | ||
h: 'help', | ||
u: 'username', | ||
p: 'password', | ||
}, | ||
string: ['input', 'username', 'password', 'db'], | ||
boolean: ['help'], | ||
}); | ||
const args = parseArgs(process.argv, { | ||
alias: { | ||
i: "input", | ||
h: "help", | ||
u: "username", | ||
p: "password" | ||
}, | ||
string: ["input", "username", "password", "db"], | ||
boolean: ["help"] | ||
}); | ||
if (args.help) { | ||
console.log(helpText); | ||
return process.exit(0); | ||
} | ||
if (args.help) { | ||
console.log(helpText); | ||
return process.exit(0); | ||
} | ||
let dbName = args.db || args._[0]; | ||
if (!dbName) { | ||
console.error('You need to supply a database URL or filepath. -h for help'); | ||
return process.exit(1); | ||
} | ||
/** @type {string} */ | ||
let dbName = args.db || args._[0]; | ||
if (!dbName) { | ||
console.error( | ||
"You need to supply a database URL or filepath. -h for help" | ||
); | ||
return process.exit(1); | ||
} | ||
const { input } = args; | ||
if (process.stdin.isTTY && !input) { | ||
console.error( | ||
'Missing --input path, either provide it or pipe in a file from stdin.'); | ||
return process.exit(1); | ||
} | ||
const { input } = args; | ||
if (process.stdin.isTTY && !input) { | ||
console.error( | ||
"Missing --input path, either provide it or pipe in a file from stdin." | ||
); | ||
return process.exit(1); | ||
} | ||
const { password, username } = args; | ||
if ((password && !username) || (username && !password)) { | ||
console.error('You must either supply both a username and password, or neither'); | ||
return process.exit(1); | ||
} else if (password) { | ||
const { parse } = require('url'); | ||
const parsedURL = parse(dbName); | ||
if (!parsedURL.protocol) { | ||
console.error('Usernames/passwords are only for remote databases'); | ||
console.error('Is ' + dbName + ' a remote database?'); | ||
return process.exit(1); | ||
} | ||
const { password, username } = args; | ||
if ((password && !username) || (username && !password)) { | ||
console.error( | ||
"You must either supply both a username and password, or neither" | ||
); | ||
return process.exit(1); | ||
} else if (password) { | ||
const { parse } = require("url"); | ||
const parsedURL = parse(dbName); | ||
if (!parsedURL.protocol) { | ||
console.error("Usernames/passwords are only for remote databases"); | ||
console.error("Is " + dbName + " a remote database?"); | ||
return process.exit(1); | ||
} | ||
dbName = parsedURL.protocol + '//' + encodeURIComponent(username) + | ||
':' + encodeURIComponent(password) + '@' + parsedURL.host + | ||
parsedURL.path; | ||
} | ||
dbName = | ||
parsedURL.protocol + | ||
"//" + | ||
encodeURIComponent(username) + | ||
":" + | ||
encodeURIComponent(password) + | ||
"@" + | ||
parsedURL.host + | ||
parsedURL.path; | ||
} | ||
const PouchDB = require('pouchdb'); | ||
const { createReadStream } = require('fs'); | ||
const parseCSVFile = require('./index.js'); | ||
const PouchDB = require("pouchdb"); | ||
const { createReadStream } = require("fs"); | ||
const parseCSVFile = require("./index.js"); | ||
const inputStream = input ? createReadStream(input) : process.stdin; | ||
const db = new PouchDB(dbName, { ajax: { timeout: 60000 } }); | ||
const inputStream = input ? createReadStream(input) : process.stdin; | ||
const db = new PouchDB(dbName, { ajax: { timeout: 60000 } }); | ||
parseCSVFile(db, inputStream) | ||
.then(() => process.exit(0)) | ||
.catch(err => { | ||
console.error('unexpected error'); | ||
console.error(err); | ||
process.exit(1); | ||
}); | ||
parseCSVFile(db, inputStream) | ||
.then(() => process.exit(0)) | ||
.catch(err => { | ||
console.error("unexpected error"); | ||
console.error(err); | ||
process.exit(1); | ||
}); | ||
} | ||
/** | ||
* Creates the output dump file for the given GTFS file, and resolves once | ||
* complete. Tries to use 'memory' adapter for PouchDB, which requires a plugin. | ||
* Parse CSV data from the input stream and save it to the PouchDB database. | ||
* Each row is saved as a separate PouchDB document. If the `_id` preexists in | ||
* the database, the existing document will be updated with the new version | ||
* and the revision version will change. | ||
* @param db Database to save results to. | ||
* @param input Stream representing the CSV file, | ||
* such as `fs.createReadableStream('data.csv')` | ||
* @param transformer Optional function to transform CSV rows. | ||
* Input `row` represents the CSV data, and the returned object will be used as | ||
* a PouchDB document. | ||
* @returns Promise that resolves when db has been written to | ||
*/ | ||
function parseCSVFile<T>( | ||
db: PouchDB.Database<T>, | ||
input: NodeJS.ReadableStream, | ||
transformer?: (row: any) => T, | ||
): Promise<void> | ||
declare function parseCSVFile<T>( | ||
db: PouchDB.Database<T>, | ||
input: NodeJS.ReadableStream, | ||
transformer?: (row: any) => T | ||
): Promise<void>; | ||
export default parseCSVFile | ||
export default parseCSVFile; | ||
export { parseCSVFile }; |
368
index.es.js
import parse from 'csv-parse'; | ||
import stream, { Writable } from 'stream'; | ||
import util from 'util'; | ||
import transform from 'stream-transform'; | ||
import batch from 'stream-batch'; | ||
import { Writable } from 'stream'; | ||
// Generated by CoffeeScript 1.9.2 | ||
var Transformer; | ||
var stream$1; | ||
var util$1; | ||
var slice = [].slice; | ||
stream$1 = stream; | ||
util$1 = util; | ||
var index = function() { | ||
var argument, callback, data, error, handler, i, j, k, len, options, result, transform, type, v; | ||
options = {}; | ||
for (i = j = 0, len = arguments.length; j < len; i = ++j) { | ||
argument = arguments[i]; | ||
type = typeof argument; | ||
if (argument === null) { | ||
type = 'null'; | ||
} else if (type === 'object' && Array.isArray(argument)) { | ||
type = 'array'; | ||
} | ||
if (i === 0) { | ||
if (type === 'function') { | ||
handler = argument; | ||
} else if (type !== null) { | ||
data = argument; | ||
} | ||
continue; | ||
} | ||
if (type === 'object') { | ||
for (k in argument) { | ||
v = argument[k]; | ||
options[k] = v; | ||
} | ||
} else if (type === 'function') { | ||
if (handler && i === arguments.length - 1) { | ||
callback = argument; | ||
} else { | ||
handler = argument; | ||
} | ||
} else if (type !== 'null') { | ||
throw new Error('Invalid arguments'); | ||
} | ||
} | ||
transform = new Transformer(options, handler); | ||
error = false; | ||
if (data) { | ||
process.nextTick(function() { | ||
var len1, m, row; | ||
for (m = 0, len1 = data.length; m < len1; m++) { | ||
row = data[m]; | ||
if (error) { | ||
break; | ||
} | ||
transform.write(row); | ||
} | ||
return transform.end(); | ||
/** | ||
* Resolves when the `'finish'` event fires, rejects when the `'error'` event fires | ||
* @param {import("stream").Writable} stream | ||
* @returns {Promise<void>} | ||
*/ | ||
function finished(stream$$1) { | ||
return new Promise((resolve, reject) => { | ||
stream$$1.once('finish', resolve).once('error', reject); | ||
}); | ||
} | ||
if (callback || options.consume) { | ||
result = []; | ||
transform.on('readable', function() { | ||
var r, results; | ||
results = []; | ||
while ((r = transform.read())) { | ||
if (callback) { | ||
results.push(result.push(r)); | ||
} else { | ||
results.push(void 0); | ||
} | ||
} | ||
return results; | ||
}); | ||
transform.on('error', function(err) { | ||
error = true; | ||
if (callback) { | ||
return callback(err); | ||
} | ||
}); | ||
transform.on('end', function() { | ||
if (callback && !error) { | ||
return callback(null, result); | ||
} | ||
}); | ||
} | ||
return transform; | ||
}; | ||
Transformer = function(options1, transform1) { | ||
var base; | ||
this.options = options1 != null ? options1 : {}; | ||
this.transform = transform1; | ||
this.options.objectMode = true; | ||
if ((base = this.options).parallel == null) { | ||
base.parallel = 100; | ||
} | ||
stream$1.Transform.call(this, this.options); | ||
this.running = 0; | ||
this.started = 0; | ||
this.finished = 0; | ||
return this; | ||
}; | ||
util$1.inherits(Transformer, stream$1.Transform); | ||
var Transformer_1 = Transformer; | ||
Transformer.prototype._transform = function(chunk, encoding, cb) { | ||
var callback, err, l; | ||
this.started++; | ||
this.running++; | ||
if (this.running < this.options.parallel) { | ||
cb(); | ||
cb = null; | ||
} | ||
try { | ||
l = this.transform.length; | ||
if (this.options.params != null) { | ||
l--; | ||
} | ||
if (l === 1) { | ||
this._done(null, [this.transform.call(null, chunk, this.options.params)], cb); | ||
} else if (l === 2) { | ||
callback = (function(_this) { | ||
return function() { | ||
var chunks, err; | ||
err = arguments[0], chunks = 2 <= arguments.length ? slice.call(arguments, 1) : []; | ||
return _this._done(err, chunks, cb); | ||
}; | ||
})(this); | ||
this.transform.call(null, chunk, callback, this.options.params); | ||
} else { | ||
throw Error("Invalid handler arguments"); | ||
} | ||
return false; | ||
} catch (_error) { | ||
err = _error; | ||
return this._done(err); | ||
} | ||
}; | ||
Transformer.prototype._flush = function(cb) { | ||
this._ending = function() { | ||
if (this.running === 0) { | ||
return cb(); | ||
} | ||
}; | ||
return this._ending(); | ||
}; | ||
Transformer.prototype._done = function(err, chunks, cb) { | ||
var chunk, j, len; | ||
this.running--; | ||
if (err) { | ||
return this.emit('error', err); | ||
} | ||
this.finished++; | ||
for (j = 0, len = chunks.length; j < len; j++) { | ||
chunk = chunks[j]; | ||
if (typeof chunk === 'number') { | ||
chunk = "" + chunk; | ||
} | ||
if (chunk != null) { | ||
this.push(chunk); | ||
} | ||
} | ||
if (cb) { | ||
cb(); | ||
} | ||
if (this._ending) { | ||
return this._ending(); | ||
} | ||
}; | ||
index.Transformer = Transformer_1; | ||
} | ||
function wrapWrite(writeFunc) { | ||
return (chunk, encoding, next) => | ||
Promise.resolve(writeFunc(chunk, encoding)).then(() => next(), next); | ||
return (chunk, encoding, next) => | ||
Promise.resolve(writeFunc(chunk, encoding)).then(() => next(), next); | ||
} | ||
function wrapWritev(writeFunc) { | ||
return (chunks, next) => | ||
Promise.resolve(writeFunc(chunks)).then(() => next(), next); | ||
return (chunks, next) => | ||
Promise.resolve(writeFunc(chunks)).then(() => next(), next); | ||
} | ||
/** | ||
@@ -195,74 +30,92 @@ * Writable stream constructor that uses async functions for write rather | ||
class PromiseWritable extends Writable { | ||
constructor(options) { | ||
const superOpts = Object.assign({}, options); | ||
if (options.write) superOpts.write = wrapWrite(options.write); | ||
if (options.writev) superOpts.writev = wrapWritev(options.writev); | ||
constructor(options) { | ||
const superOpts = Object.assign({}, options); | ||
if (options.write) superOpts.write = wrapWrite(options.write); | ||
if (options.writev) superOpts.writev = wrapWritev(options.writev); | ||
super(superOpts); | ||
} | ||
super(superOpts); | ||
} | ||
} | ||
// @ts-check | ||
/** | ||
* Checks if a PouchDB row from `allDocs` is for a nonexistent document. | ||
* @param {any} row | ||
*/ | ||
function isErrorRow(row) { | ||
return row && row.error === "not_found"; | ||
} | ||
/** | ||
* Creates a Writable stream that saves data into a PouchDB database. | ||
* @param {PouchDB.Database} db - database to save to | ||
* @template T | ||
* @param {PouchDB.Database<T>} db - database to save to | ||
* @param {object} [pouchOpts] - options passed to PouchDB's put or bulkDocs | ||
* function | ||
* @returns {stream.Writable} | ||
* @returns {NodeJS.WritableStream} | ||
*/ | ||
function createPouchStream(db, pouchOpts) { | ||
return new PromiseWritable({ | ||
objectMode: true, | ||
decodeStrings: false, | ||
async write(chunk) { | ||
// Data should be in object format. Convert buffers and strings | ||
// by parsing the JSON they should represent | ||
let data = chunk; | ||
if (Buffer.isBuffer(data)) data = data.toString(); | ||
if (typeof data === 'string') data = JSON.parse(data); | ||
return new PromiseWritable({ | ||
objectMode: true, | ||
decodeStrings: false, | ||
async write(chunk) { | ||
// Data should be in object format. Convert buffers and strings | ||
// by parsing the JSON they should represent | ||
let data = /** @type {any} */ (chunk); | ||
if (Buffer.isBuffer(data)) data = data.toString(); | ||
if (typeof data === "string") data = JSON.parse(data); | ||
if (Array.isArray(data)) { | ||
// Find all objects in the array without a `_rev` field | ||
const noRevs = data | ||
.filter(doc => !doc._rev) | ||
.reduce((map, doc) => map.set(doc._id, doc), new Map()); | ||
if (Array.isArray(data)) { | ||
// Find all objects in the array without a `_rev` field | ||
const noRevs = data | ||
.filter(doc => !doc._rev) | ||
.reduce((map, doc) => map.set(doc._id, doc), new Map()); | ||
if (noRevs.size > 0) { | ||
// If there are objects without a `_rev` key, | ||
// check if they already exist in the database. | ||
// If so, add the existing rev property to the doc. | ||
const existing = await db.allDocs({ keys: [...noRevs.keys()] }); | ||
existing.rows | ||
.filter(row => !row.error) | ||
.forEach((row) => { noRevs.get(row.id)._rev = row.value.rev; }); | ||
} | ||
if (noRevs.size > 0) { | ||
// If there are objects without a `_rev` key, | ||
// check if they already exist in the database. | ||
// If so, add the existing rev property to the doc. | ||
const existing = await db.allDocs({ | ||
keys: [...noRevs.keys()] | ||
}); | ||
existing.rows | ||
.filter(row => !isErrorRow(row)) | ||
.forEach(row => { | ||
noRevs.get(row.id)._rev = row.value.rev; | ||
}); | ||
} | ||
// Save the results in the database | ||
return db.bulkDocs(data, pouchOpts); | ||
} else { | ||
if (!data._rev) { | ||
try { | ||
// If there is no `_rev` key, check if the doc already exists | ||
const doc = await db.get(data._id, { latest: true }); | ||
// Save the rev from the database onto the object if the doc exists | ||
data._rev = doc._rev; | ||
} catch (err) { | ||
// If the doc doesn't already exist, just keep going | ||
if (err.status !== 404) throw err; | ||
} | ||
} | ||
return db.put(data, pouchOpts); | ||
} | ||
} | ||
}); | ||
// Save the results in the database | ||
await db.bulkDocs(data, pouchOpts); | ||
} else { | ||
if (!data._rev) { | ||
try { | ||
// If there is no `_rev` key, check if the doc already exists | ||
const doc = await db.get(data._id, { latest: true }); | ||
// Save the rev from the database onto the object if the doc exists | ||
data._rev = doc._rev; | ||
} catch (err) { | ||
// If the doc doesn't already exist, just keep going | ||
if (err.status !== 404) throw err; | ||
} | ||
} | ||
await db.put(data, pouchOpts); | ||
} | ||
} | ||
}); | ||
} | ||
// @ts-check | ||
/** | ||
* Parse CSV data from the input stream and save it to the PouchDB database. | ||
* Each row is saved as a seperate PouchDB document. If the `_id` prexists in | ||
* Each row is saved as a separate PouchDB document. If the `_id` preexists in | ||
* the database, the existing document will be updated with the new version | ||
* and the revision version will change. | ||
* @param {PouchDB.Database} db Database to save results to. | ||
* @param {stream.Readable} input Stream representing the CSV file, | ||
* @template T | ||
* @param {PouchDB.Database<T>} db Database to save results to. | ||
* @param {NodeJS.ReadableStream} input Stream representing the CSV file, | ||
* such as `fs.createReadableStream('data.csv')` | ||
* @param {function} [transformer] Optional function to transform CSV rows. | ||
* @param {(row: any) => T} [transformer] Optional function to transform CSV rows. | ||
* Input `row` represents the CSV data, and the returned object will be used as | ||
@@ -273,35 +126,24 @@ * a PouchDB document. | ||
function parseCSVFile(db, input, transformer = doc => doc) { | ||
return new Promise((resolve, reject) => { | ||
const csvParser = parse({ | ||
columns: true, | ||
ltrim: true, rtrim: true, | ||
skip_empty_lines: true, | ||
}); | ||
const _transformer = index(transformer); | ||
const batcher = batch({ maxWait: 100, maxItems: 50 }); | ||
const dbStream = createPouchStream(db); | ||
const csvParser = parse({ | ||
columns: true, | ||
ltrim: true, | ||
rtrim: true, | ||
skip_empty_lines: true, | ||
relax_column_count: true | ||
}); | ||
const _transformer = transform(transformer); | ||
const batcher = batch({ maxWait: 100, maxItems: 50 }); | ||
const dbStream = createPouchStream(db); | ||
function rejectAndClear(err) { | ||
reject(err); | ||
csvParser.removeListener('error', rejectAndClear); | ||
_transformer.removeListener('error', rejectAndClear); | ||
batcher.removeListener('error', rejectAndClear); | ||
dbStream.removeListener('error', rejectAndClear); | ||
} | ||
const parsingStream = input | ||
.pipe(csvParser) | ||
.pipe(_transformer) | ||
.pipe(batcher) | ||
.pipe(dbStream); | ||
csvParser.on('error', rejectAndClear); | ||
_transformer.on('error', rejectAndClear); | ||
batcher.on('error', rejectAndClear); | ||
dbStream.on('error', rejectAndClear); | ||
const parsingStream = input | ||
.pipe(csvParser) | ||
.pipe(_transformer) | ||
.pipe(batcher) | ||
.pipe(dbStream) | ||
.once('finish', resolve); | ||
}); | ||
return finished(parsingStream); | ||
} | ||
export { parseCSVFile };export default parseCSVFile; | ||
export default parseCSVFile; | ||
export { parseCSVFile }; | ||
//# sourceMappingURL=index.es.js.map |
368
index.js
@@ -8,191 +8,25 @@ 'use strict'; | ||
var parse = _interopDefault(require('csv-parse')); | ||
var transform = _interopDefault(require('stream-transform')); | ||
var batch = _interopDefault(require('stream-batch')); | ||
var stream = require('stream'); | ||
var stream__default = _interopDefault(stream); | ||
var util = _interopDefault(require('util')); | ||
var batch = _interopDefault(require('stream-batch')); | ||
// Generated by CoffeeScript 1.9.2 | ||
var Transformer; | ||
var stream$1; | ||
var util$1; | ||
var slice = [].slice; | ||
stream$1 = stream__default; | ||
util$1 = util; | ||
var index = function() { | ||
var argument, callback, data, error, handler, i, j, k, len, options, result, transform, type, v; | ||
options = {}; | ||
for (i = j = 0, len = arguments.length; j < len; i = ++j) { | ||
argument = arguments[i]; | ||
type = typeof argument; | ||
if (argument === null) { | ||
type = 'null'; | ||
} else if (type === 'object' && Array.isArray(argument)) { | ||
type = 'array'; | ||
} | ||
if (i === 0) { | ||
if (type === 'function') { | ||
handler = argument; | ||
} else if (type !== null) { | ||
data = argument; | ||
} | ||
continue; | ||
} | ||
if (type === 'object') { | ||
for (k in argument) { | ||
v = argument[k]; | ||
options[k] = v; | ||
} | ||
} else if (type === 'function') { | ||
if (handler && i === arguments.length - 1) { | ||
callback = argument; | ||
} else { | ||
handler = argument; | ||
} | ||
} else if (type !== 'null') { | ||
throw new Error('Invalid arguments'); | ||
} | ||
} | ||
transform = new Transformer(options, handler); | ||
error = false; | ||
if (data) { | ||
process.nextTick(function() { | ||
var len1, m, row; | ||
for (m = 0, len1 = data.length; m < len1; m++) { | ||
row = data[m]; | ||
if (error) { | ||
break; | ||
} | ||
transform.write(row); | ||
} | ||
return transform.end(); | ||
/** | ||
* Resolves when the `'finish'` event fires, rejects when the `'error'` event fires | ||
* @param {import("stream").Writable} stream | ||
* @returns {Promise<void>} | ||
*/ | ||
function finished(stream$$1) { | ||
return new Promise((resolve, reject) => { | ||
stream$$1.once('finish', resolve).once('error', reject); | ||
}); | ||
} | ||
if (callback || options.consume) { | ||
result = []; | ||
transform.on('readable', function() { | ||
var r, results; | ||
results = []; | ||
while ((r = transform.read())) { | ||
if (callback) { | ||
results.push(result.push(r)); | ||
} else { | ||
results.push(void 0); | ||
} | ||
} | ||
return results; | ||
}); | ||
transform.on('error', function(err) { | ||
error = true; | ||
if (callback) { | ||
return callback(err); | ||
} | ||
}); | ||
transform.on('end', function() { | ||
if (callback && !error) { | ||
return callback(null, result); | ||
} | ||
}); | ||
} | ||
return transform; | ||
}; | ||
Transformer = function(options1, transform1) { | ||
var base; | ||
this.options = options1 != null ? options1 : {}; | ||
this.transform = transform1; | ||
this.options.objectMode = true; | ||
if ((base = this.options).parallel == null) { | ||
base.parallel = 100; | ||
} | ||
stream$1.Transform.call(this, this.options); | ||
this.running = 0; | ||
this.started = 0; | ||
this.finished = 0; | ||
return this; | ||
}; | ||
util$1.inherits(Transformer, stream$1.Transform); | ||
var Transformer_1 = Transformer; | ||
Transformer.prototype._transform = function(chunk, encoding, cb) { | ||
var callback, err, l; | ||
this.started++; | ||
this.running++; | ||
if (this.running < this.options.parallel) { | ||
cb(); | ||
cb = null; | ||
} | ||
try { | ||
l = this.transform.length; | ||
if (this.options.params != null) { | ||
l--; | ||
} | ||
if (l === 1) { | ||
this._done(null, [this.transform.call(null, chunk, this.options.params)], cb); | ||
} else if (l === 2) { | ||
callback = (function(_this) { | ||
return function() { | ||
var chunks, err; | ||
err = arguments[0], chunks = 2 <= arguments.length ? slice.call(arguments, 1) : []; | ||
return _this._done(err, chunks, cb); | ||
}; | ||
})(this); | ||
this.transform.call(null, chunk, callback, this.options.params); | ||
} else { | ||
throw Error("Invalid handler arguments"); | ||
} | ||
return false; | ||
} catch (_error) { | ||
err = _error; | ||
return this._done(err); | ||
} | ||
}; | ||
Transformer.prototype._flush = function(cb) { | ||
this._ending = function() { | ||
if (this.running === 0) { | ||
return cb(); | ||
} | ||
}; | ||
return this._ending(); | ||
}; | ||
Transformer.prototype._done = function(err, chunks, cb) { | ||
var chunk, j, len; | ||
this.running--; | ||
if (err) { | ||
return this.emit('error', err); | ||
} | ||
this.finished++; | ||
for (j = 0, len = chunks.length; j < len; j++) { | ||
chunk = chunks[j]; | ||
if (typeof chunk === 'number') { | ||
chunk = "" + chunk; | ||
} | ||
if (chunk != null) { | ||
this.push(chunk); | ||
} | ||
} | ||
if (cb) { | ||
cb(); | ||
} | ||
if (this._ending) { | ||
return this._ending(); | ||
} | ||
}; | ||
index.Transformer = Transformer_1; | ||
} | ||
function wrapWrite(writeFunc) { | ||
return (chunk, encoding, next) => | ||
Promise.resolve(writeFunc(chunk, encoding)).then(() => next(), next); | ||
return (chunk, encoding, next) => | ||
Promise.resolve(writeFunc(chunk, encoding)).then(() => next(), next); | ||
} | ||
function wrapWritev(writeFunc) { | ||
return (chunks, next) => | ||
Promise.resolve(writeFunc(chunks)).then(() => next(), next); | ||
return (chunks, next) => | ||
Promise.resolve(writeFunc(chunks)).then(() => next(), next); | ||
} | ||
/** | ||
@@ -203,74 +37,92 @@ * Writable stream constructor that uses async functions for write rather | ||
class PromiseWritable extends stream.Writable { | ||
constructor(options) { | ||
const superOpts = Object.assign({}, options); | ||
if (options.write) superOpts.write = wrapWrite(options.write); | ||
if (options.writev) superOpts.writev = wrapWritev(options.writev); | ||
constructor(options) { | ||
const superOpts = Object.assign({}, options); | ||
if (options.write) superOpts.write = wrapWrite(options.write); | ||
if (options.writev) superOpts.writev = wrapWritev(options.writev); | ||
super(superOpts); | ||
} | ||
super(superOpts); | ||
} | ||
} | ||
// @ts-check | ||
/** | ||
* Checks if a PouchDB row from `allDocs` is for a nonexistent document. | ||
* @param {any} row | ||
*/ | ||
function isErrorRow(row) { | ||
return row && row.error === "not_found"; | ||
} | ||
/** | ||
* Creates a Writable stream that saves data into a PouchDB database. | ||
* @param {PouchDB.Database} db - database to save to | ||
* @template T | ||
* @param {PouchDB.Database<T>} db - database to save to | ||
* @param {object} [pouchOpts] - options passed to PouchDB's put or bulkDocs | ||
* function | ||
* @returns {stream.Writable} | ||
* @returns {NodeJS.WritableStream} | ||
*/ | ||
function createPouchStream(db, pouchOpts) { | ||
return new PromiseWritable({ | ||
objectMode: true, | ||
decodeStrings: false, | ||
async write(chunk) { | ||
// Data should be in object format. Convert buffers and strings | ||
// by parsing the JSON they should represent | ||
let data = chunk; | ||
if (Buffer.isBuffer(data)) data = data.toString(); | ||
if (typeof data === 'string') data = JSON.parse(data); | ||
return new PromiseWritable({ | ||
objectMode: true, | ||
decodeStrings: false, | ||
async write(chunk) { | ||
// Data should be in object format. Convert buffers and strings | ||
// by parsing the JSON they should represent | ||
let data = /** @type {any} */ (chunk); | ||
if (Buffer.isBuffer(data)) data = data.toString(); | ||
if (typeof data === "string") data = JSON.parse(data); | ||
if (Array.isArray(data)) { | ||
// Find all objects in the array without a `_rev` field | ||
const noRevs = data | ||
.filter(doc => !doc._rev) | ||
.reduce((map, doc) => map.set(doc._id, doc), new Map()); | ||
if (Array.isArray(data)) { | ||
// Find all objects in the array without a `_rev` field | ||
const noRevs = data | ||
.filter(doc => !doc._rev) | ||
.reduce((map, doc) => map.set(doc._id, doc), new Map()); | ||
if (noRevs.size > 0) { | ||
// If there are objects without a `_rev` key, | ||
// check if they already exist in the database. | ||
// If so, add the existing rev property to the doc. | ||
const existing = await db.allDocs({ keys: [...noRevs.keys()] }); | ||
existing.rows | ||
.filter(row => !row.error) | ||
.forEach((row) => { noRevs.get(row.id)._rev = row.value.rev; }); | ||
} | ||
if (noRevs.size > 0) { | ||
// If there are objects without a `_rev` key, | ||
// check if they already exist in the database. | ||
// If so, add the existing rev property to the doc. | ||
const existing = await db.allDocs({ | ||
keys: [...noRevs.keys()] | ||
}); | ||
existing.rows | ||
.filter(row => !isErrorRow(row)) | ||
.forEach(row => { | ||
noRevs.get(row.id)._rev = row.value.rev; | ||
}); | ||
} | ||
// Save the results in the database | ||
return db.bulkDocs(data, pouchOpts); | ||
} else { | ||
if (!data._rev) { | ||
try { | ||
// If there is no `_rev` key, check if the doc already exists | ||
const doc = await db.get(data._id, { latest: true }); | ||
// Save the rev from the database onto the object if the doc exists | ||
data._rev = doc._rev; | ||
} catch (err) { | ||
// If the doc doesn't already exist, just keep going | ||
if (err.status !== 404) throw err; | ||
} | ||
} | ||
return db.put(data, pouchOpts); | ||
} | ||
} | ||
}); | ||
// Save the results in the database | ||
await db.bulkDocs(data, pouchOpts); | ||
} else { | ||
if (!data._rev) { | ||
try { | ||
// If there is no `_rev` key, check if the doc already exists | ||
const doc = await db.get(data._id, { latest: true }); | ||
// Save the rev from the database onto the object if the doc exists | ||
data._rev = doc._rev; | ||
} catch (err) { | ||
// If the doc doesn't already exist, just keep going | ||
if (err.status !== 404) throw err; | ||
} | ||
} | ||
await db.put(data, pouchOpts); | ||
} | ||
} | ||
}); | ||
} | ||
// @ts-check | ||
/** | ||
* Parse CSV data from the input stream and save it to the PouchDB database. | ||
* Each row is saved as a seperate PouchDB document. If the `_id` prexists in | ||
* Each row is saved as a separate PouchDB document. If the `_id` preexists in | ||
* the database, the existing document will be updated with the new version | ||
* and the revision version will change. | ||
* @param {PouchDB.Database} db Database to save results to. | ||
* @param {stream.Readable} input Stream representing the CSV file, | ||
* @template T | ||
* @param {PouchDB.Database<T>} db Database to save results to. | ||
* @param {NodeJS.ReadableStream} input Stream representing the CSV file, | ||
* such as `fs.createReadableStream('data.csv')` | ||
* @param {function} [transformer] Optional function to transform CSV rows. | ||
* @param {(row: any) => T} [transformer] Optional function to transform CSV rows. | ||
* Input `row` represents the CSV data, and the returned object will be used as | ||
@@ -281,36 +133,24 @@ * a PouchDB document. | ||
function parseCSVFile(db, input, transformer = doc => doc) { | ||
return new Promise((resolve, reject) => { | ||
const csvParser = parse({ | ||
columns: true, | ||
ltrim: true, rtrim: true, | ||
skip_empty_lines: true, | ||
}); | ||
const _transformer = index(transformer); | ||
const batcher = batch({ maxWait: 100, maxItems: 50 }); | ||
const dbStream = createPouchStream(db); | ||
const csvParser = parse({ | ||
columns: true, | ||
ltrim: true, | ||
rtrim: true, | ||
skip_empty_lines: true, | ||
relax_column_count: true | ||
}); | ||
const _transformer = transform(transformer); | ||
const batcher = batch({ maxWait: 100, maxItems: 50 }); | ||
const dbStream = createPouchStream(db); | ||
function rejectAndClear(err) { | ||
reject(err); | ||
csvParser.removeListener('error', rejectAndClear); | ||
_transformer.removeListener('error', rejectAndClear); | ||
batcher.removeListener('error', rejectAndClear); | ||
dbStream.removeListener('error', rejectAndClear); | ||
} | ||
const parsingStream = input | ||
.pipe(csvParser) | ||
.pipe(_transformer) | ||
.pipe(batcher) | ||
.pipe(dbStream); | ||
csvParser.on('error', rejectAndClear); | ||
_transformer.on('error', rejectAndClear); | ||
batcher.on('error', rejectAndClear); | ||
dbStream.on('error', rejectAndClear); | ||
const parsingStream = input | ||
.pipe(csvParser) | ||
.pipe(_transformer) | ||
.pipe(batcher) | ||
.pipe(dbStream) | ||
.once('finish', resolve); | ||
}); | ||
return finished(parsingStream); | ||
} | ||
exports['default'] = parseCSVFile; | ||
exports.default = parseCSVFile; | ||
exports.parseCSVFile = parseCSVFile; | ||
//# sourceMappingURL=index.js.map |
{ | ||
"name": "csv-to-pouch", | ||
"version": "3.1.2", | ||
"description": "Parse CSV files and save their data to a PouchDB database.", | ||
"main": "index.js", | ||
"module": "index.es.js", | ||
"license": "MIT", | ||
"repository": "NotWoods/csv-to-pouch", | ||
"bin": { | ||
"csv-to-pouch": "./cmd.js" | ||
}, | ||
"dependencies": { | ||
"csv-parse": "^1.2.0", | ||
"minimist": "^1.2.0", | ||
"stream-batch": "^1.0.0" | ||
}, | ||
"devDependencies": { | ||
"promise-stream-utils": "^1.0.7", | ||
"rollup": "^0.41.5", | ||
"rollup-plugin-commonjs": "^8.0.2", | ||
"rollup-plugin-node-resolve": "^2.0.0", | ||
"stream-transform": "^0.1.2" | ||
}, | ||
"scripts": { | ||
"prepare": "rollup -c", | ||
"prepublish": "npm run prepare" | ||
}, | ||
"files": [ | ||
"index.js", | ||
"index.js.map", | ||
"index.es.js", | ||
"index.es.js.map", | ||
"index.d.ts", | ||
"cmd.js" | ||
] | ||
"name": "csv-to-pouch", | ||
"version": "3.3.0", | ||
"description": "Parse CSV files and save their data to a PouchDB database.", | ||
"license": "MIT", | ||
"author": "Tiger Oakes <contact@tigeroakes.com> (https://tigeroakes.com)", | ||
"repository": "NotWoods/csv-to-pouch", | ||
"bugs": "https://github.com/NotWoods/csv-to-pouch/issues", | ||
"homepage": "https://github.com/NotWoods/csv-to-pouch#readme", | ||
"main": "index.js", | ||
"module": "index.es.js", | ||
"types": "index.d.ts", | ||
"bin": { | ||
"csv-to-pouch": "./cmd.js" | ||
}, | ||
"scripts": { | ||
"build": "rollup -c", | ||
"lint": "prettier --parser babel \"src/**/*.js\" \"cmd.js\" --write", | ||
"lint:check": "tsc --noEmit && prettier --parser babylon \"src/**/*.js\" \"cmd.js\" --list-different" | ||
}, | ||
"dependencies": { | ||
"csv-parse": "^1.2.0", | ||
"minimist": "^1.2.0", | ||
"stream-batch": "^1.0.0", | ||
"stream-transform": "^1.0.8" | ||
}, | ||
"devDependencies": { | ||
"@types/pouchdb": "^6.3.2", | ||
"husky": "^1.2.0", | ||
"prettier": "^1.19.1", | ||
"promise-stream-utils": "^1.1.0", | ||
"rollup": "^0.67.4", | ||
"rollup-plugin-node-resolve": "^4.0.0", | ||
"typescript": "^3.7.2" | ||
}, | ||
"files": [ | ||
"index.js", | ||
"index.js.map", | ||
"index.es.js", | ||
"index.es.js.map", | ||
"index.d.ts", | ||
"cmd.js" | ||
] | ||
} |
@@ -0,0 +0,0 @@ # csv-to-pouch |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
No contributors or author data
MaintenancePackage does not specify a list of contributors or an author in package.json.
Found 1 instance in 1 package
No bug tracker
MaintenancePackage does not have a linked bug tracker in package.json.
Found 1 instance in 1 package
No website
QualityPackage does not have a website.
Found 1 instance in 1 package
2
0
42986
4
7
370
1
+ Addedstream-transform@^1.0.8
+ Addedstream-transform@1.0.8(transitive)