Comparing version 2.0.0 to 2.1.1
@@ -36,2 +36,5 @@ var path = require('path')
  }
  if (!onReady) onReady = function(){}
  this.dir = dir
@@ -41,3 +44,4 @@ this.opts = opts
  this.meta = meta(this.dir, function(err) {
    if (onReady) onReady(err)
    if (err) return onReady()
    self._storage(opts, onReady)
  })
@@ -44,0 +48,0 @@ }
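With the lines above, the ready callback becomes optional (it defaults to a no-op) and storage is only initialized once the metadata loads without error. A minimal calling sketch, assuming the initializer shown here is exposed as a constructor taking (dir, opts, onReady) — the export shape is not visible in this hunk:

var Dat = require('dat')
var db = new Dat('./my-dataset')                 // ready callback now optional
var db2 = new Dat('./my-dataset', {}, function (err) {
  if (err) return console.error('init failed', err)
  // metadata is loaded and storage is initialized here
})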
// this module assumes it will be used as a .prototype (e.g. uses `this`)
var EOL = require('os').EOL
var fs = require('fs')
var path = require('path')
var http = require('http')
var EOL = require('os').EOL
var events = require('events')
var http = require('http')
var bops = require('bops')
var through = require('through')
var rimraf = require('rimraf')
var mkdirp = require('mkdirp')
@@ -14,29 +15,20 @@ var extend = require('extend')
var level = require('level-hyper')
var sleepRef = require('sleep-ref')
var LiveStream = require('level-live-stream')
var sleepRef = require('sleep-ref')
var rimraf = require('rimraf')
var byteStream = require('byte-stream')
var combiner = require('stream-combiner')
var binaryCSV = require('binary-csv')
var multibuffer = require('multibuffer')
var mbstream = require('multibuffer-stream')
var split = require('binary-split')
var levelBackup = require('hyperlevel-backup')
var connectionManager = require(path.join(__dirname, 'connection-manager'))
var storage = require(path.join(__dirname, 'storage'))
var restHandler = require(path.join(__dirname, 'rest-handler'))
var csvBuffEncoder = require(path.join(__dirname, 'csv-buff-encoder'))
var jsonBuffEncoder = require(path.join(__dirname, 'json-buff-encoder'))
var storage = require(path.join(__dirname, 'storage'))
var headStream = require(path.join(__dirname, 'head-stream'))
var writeStream = require(path.join(__dirname, 'write-stream'))
var jsonLogStream = require(path.join(__dirname, 'json-log-stream'))
var connectionManager = require(path.join(__dirname, 'connection-manager'))
var sleepPrefix = 'd'
var dbOptions = {
var dat = {}
module.exports = dat
dat.dbOptions = {
  writeBufferSize: 1024 * 1024 * 16 // 16MB
}
var dat = {}
module.exports = dat
dat.paths = function(root) {
@@ -168,3 +160,3 @@ root = root || this.dir || process.cwd()
  path = path || this.paths(path).level
  var db = level(path, extend({}, opts, dbOptions), cb)
  var db = level(path, extend({}, opts, this.dbOptions), cb)
  LiveStream.install(db)
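Since the LevelDB options now hang off the exported object as dat.dbOptions (and are merged into level() via this.dbOptions above), they can in principle be tuned before a database is opened. A small sketch under that assumption; the 64MB figure is made up:

var dat = require('dat')
// dbOptions is merged into the level-hyper open options (see the hunk above),
// so changing it before the db is opened adjusts the LevelDB write buffer
dat.dbOptions.writeBufferSize = 1024 * 1024 * 64 // hypothetical: 64MB instead of the default 16MB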
@@ -255,3 +247,3 @@ this.db = db
    if (err) return cb(err)
    store.currentData().pipe(jsonLogStream(cb))
    self.createReadStream().pipe(jsonLogStream(cb))
  })
@@ -281,232 +273,4 @@ })
// TODO split this function up into modules
dat.createWriteStream = function(options) {
  if (typeof options === 'undefined') options = {}
  if (options.argv) options = options.argv
  // grab columns from options
  var columns = options.c || options.columns
  if (columns && !(columns instanceof Array)) columns = [columns]
  var self = this
  var store = self.storage
  var ended = false
  var writing = false
  if (Object.keys(options).indexOf('overwrite') === -1) {
    // if db is empty then use overwrite mode (faster)
    if (store.seq === 0) options.overwrite = true
  }
  var primary
  var primaryKeys = []
  var primaryIndex
  if (!options.overwrite) primary = '_id'
  if (options.primary) primary = options.primary
  if (primary) {
    var currentColumns = self.meta.json.columns.length ? self.meta.json.columns : columns
    if (currentColumns) primaryIndex = currentColumns.indexOf(primary)
    var onRow = function (row) {
      var primaryVal
      if (primary === '_id' || primaryIndex > -1) {
        if (Array.isArray(row)) primaryVal = row[primaryIndex]
        else if (bops.is(row)) primaryVal = bufferAt(row, primaryIndex)
        else primaryVal = row[primary]
      }
      if (primary === '_id' && !primaryVal) primaryVal = store.uuid()
      primaryKeys.push(primaryVal)
      if (this.queue) this.queue(row)
    }
  }
  if (columns) {
    var newColumns = self.meta.getNewColumns(columns)
    if (newColumns.length > 0) {
      self.meta.addColumns(newColumns, function(err) {
        if (err) console.error('error updating columns', err)
      })
    }
  }
  var batchStream = byteStream(dbOptions.writeBufferSize)
  var writeStream = through(onWrite, onEnd)
  var pipeChain = [batchStream, writeStream]
  if (options.csv || options.f == 'csv') { // raw csv
    var delim = options.d || options.delim
    var csvParser = binaryCSV(delim)
    // grab first row of csv and store columns
    function onFirstCSV(buf, next) {
      var csvColumns = csvParser.line(buf)
      for (var i = 0; i < csvColumns.length; i++) csvColumns[i] = csvParser.cell(csvColumns[i])
      csvColumns = csvColumns.map(function(i) { return i.toString() })
      if (primary) { primaryIndex = csvColumns.indexOf(primary) }
      var newColumns = self.meta.getNewColumns(csvColumns)
      if (newColumns.length > 0) {
        self.meta.addColumns(newColumns, function(err) {
          if (err) console.error('error updating columns', err)
          if (primary) primaryIndex = self.meta.json.columns.indexOf(primary)
          next()
        })
      } else {
        next()
      }
    }
    if (typeof options.headerRow === 'undefined') options.headerRow = true
    pipeChain.unshift(csvBuffEncoder(onRow))
    if (options.headerRow) pipeChain.unshift(headStream(onFirstCSV)) // skip first line of csv
    pipeChain.unshift(csvParser)
  } else if (options.json || options.f == 'json') { // raw ndjson
    var newlineParser = split()
    var jsonEncoder = jsonBuffEncoder(store, onRow)
    function onFirstJSON(obj, next) {
      var newColumns = self.meta.getNewColumns(Object.keys(JSON.parse(obj)))
      if (newColumns.length > 0) {
        self.meta.addColumns(newColumns, function(err) {
          if (err) console.error('error updating columns', err)
          if (primary) primaryIndex = self.meta.json.columns.indexOf(primary)
          next()
        })
      } else {
        next()
      }
    }
    pipeChain.unshift(jsonEncoder)
    pipeChain.unshift(headStream(onFirstJSON, {includeHead: true}))
    pipeChain.unshift(newlineParser)
  } else if (options.objects || options.f == 'objects') { // stream of JS Objects (not JSON)
    var jsonEncoder = jsonBuffEncoder(store, onRow)
    function onFirstObject(obj, next) {
      var newColumns = self.meta.getNewColumns(Object.keys(obj))
      if (newColumns.length > 0) {
        self.meta.addColumns(newColumns, function(err) {
          if (err) console.error('error updating columns', err)
          if (primary) primaryIndex = self.meta.json.columns.indexOf(primary)
          next()
        })
      } else {
        next()
      }
    }
    pipeChain.unshift(jsonEncoder)
    pipeChain.unshift(headStream(onFirstObject, {includeHead: true}))
  } else { // if no specific format is specified then assume .buff
    if (primary) {
      var primaryExtractor = through(onRow)
      pipeChain.unshift(primaryExtractor)
    }
    pipeChain.unshift(mbstream.unpackStream())
  }
  return combiner.apply(combiner, pipeChain)
  function writeBatch(rows) {
    var batch = store.db.batch()
    var len = rows.length
    var pending = len
    if (pending > 0) writing = true
    for (var i = 0; i < len; i++) {
      var row = rows[i]
      var doc = {}
      if (row._rev) {
        doc._rev = row._rev
        row = row.buffer
      }
      if (primary) doc._id = primaryKeys.shift().toString()
      var meta = store.updateRevision(doc, row)
      if (!meta) {
        rows[i] = {success: true, row: doc, existed: true}
        pending--
        if (pending === 0) commit()
        continue
      }
      var seq = store.seq = store.seq + 1
      var keys = store.rowKeys(meta._id, meta._rev, seq)
      batch.put(keys.seq, JSON.stringify([seq, meta._id, meta._rev]))
      batch.put(keys.row, row)
      rows[i] = {success: true, row: meta}
      pending--
      if (pending === 0) commit()
    }
    function commit() {
      if (batch.ops.length === 0) return next()
      batch.write(function(err) {
        if (err) console.error('batch write err', err)
        next()
      })
      function next() {
        writing = false
        for (var i = 0; i < len; i++) writeStream.queue(rows[i])
        batchStream.next()
        if (ended) writeStream.queue(null)
      }
    }
  }
  function checkRows(rows, cb) {
    var len = rows.length
    var pending = len
    var results = []
    var errors = []
    for (var i = 0; i < len; i++) {
      var key = primaryKeys[i].toString()
      store.get(key, onRow)
    }
    function onRow(err, row) {
      results.push([err, row])
      pending--
      if (pending === 0) finish()
    }
    function finish() {
      for (var i = 0; i < results.length; i++) {
        var err = results[i][0]
        var row = results[i][1]
        var result = {}
        if (err && err.message !== 'range not found') {
          result.key = key
          result.error = err.message
          errors.push(result)
        }
        if (row) {
          result._rev = row._rev
          result.buffer = rows[i]
          rows[i] = result
        }
      }
      cb(errors.length > 0 ? errors : null, rows)
    }
  }
  function onWrite(rows) {
    if (options.overwrite) {
      writeBatch(rows)
    } else {
      checkRows(rows, function(errs, updatedRows) {
        if (errs) return console.error('fatal write errors', errs)
        writeBatch(updatedRows)
      })
    }
  }
  function onEnd() {
    ended = true
    if (!writing) writeStream.queue(null)
  }
  return writeStream(this, options)
}
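The write stream above is assembled from smaller streams (parser, encoder, batcher, writer) that are glued together with stream-combiner, so callers see a single duplex stream. A minimal, self-contained sketch of that pattern using the same through/stream-combiner modules — the parser and encoder here are hypothetical stand-ins, not the ones from this file:

var through = require('through')
var combiner = require('stream-combiner')

// stand-in parser: newline-delimited JSON text -> objects
var parser = through(function (line) { this.queue(JSON.parse(line)) })
// stand-in encoder: objects -> printable rows
var encoder = through(function (obj) { this.queue(JSON.stringify(obj) + '\n') })

// combiner returns one duplex stream: writes go into parser, reads come out of encoder
var pipeline = combiner(parser, encoder)
pipeline.pipe(process.stdout)
pipeline.write('{"hello":"world"}')
pipeline.end()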
@@ -546,7 +310,1 @@
}
function bufferAt(mb, idx) {
  var data = [null, mb]
  for (var i = 0; i < idx + 1; i++) data = multibuffer.readPartial(data[1])
  return data[0]
}
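bufferAt walks a packed multibuffer with multibuffer.readPartial, peeling off one cell per iteration and returning the cell at idx. A small usage sketch, assuming the row was packed with multibuffer.pack (bops is already required in this file):

var bops = require('bops')
var multibuffer = require('multibuffer')

var packed = multibuffer.pack([bops.from('foo'), bops.from('bar'), bops.from('baz')])
// readPartial returns [cell, rest], so the loop above advances one cell per iteration
console.log(bufferAt(packed, 1).toString()) // 'bar'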
@@ -9,2 +9,3 @@ var through = require('through')
  var rest = false
  var ended = false
  var stream = through(write, end)
@@ -22,2 +23,3 @@
    self.resume()
    if (ended) self.queue(null)
  })
@@ -27,5 +29,5 @@ }
  function end() {
    this.resume()
    this.queue(null)
    ended = true
    if (!this.paused) this.queue(null)
  }
}
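For context, head-stream hands the first chunk to a callback and (optionally) forwards it downstream; index.js uses it to peel off CSV header rows and first JSON objects. A rough usage sketch based on those call sites — the data here is hypothetical:

var headStream = require('./head-stream')

var hs = headStream(function (firstLine, next) {
  // inspect the header row, then let the rest of the stream flow
  console.log('header:', firstLine.toString())
  next()
}, {includeHead: false}) // with includeHead: true the first chunk is also emitted downstream

hs.pipe(process.stdout)
hs.write('a,b,c\n')
hs.write('1,2,3\n')
hs.end()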
@@ -19,3 +19,3 @@ var datapackage = require('datapackage-json')
    if (!self.json.columns) self.json.columns = []
    ready()
    ready(err)
  })
@@ -22,0 +22,0 @@ }
{
  "name": "dat",
  "version": "2.0.0",
  "version": "2.1.1",
  "description": "real-time replication and versioning for large tabular data sets",
@@ -5,0 +5,0 @@ "main": "index.js",
var path = require('path')
var os = require('os')
var crypto = require('crypto')
var datPath = path.join(__dirname, '..')
@@ -150,3 +151,3 @@ var Dat = require(datPath)
  getDat(t, function(dat, done) {
    var ws = dat.createWriteStream({ json: true })
@@ -471,2 +472,55 @@
test('composite primary key', function(t) {
  getDat(t, function(dat, done) {
    var ws = dat.createWriteStream({ objects: true, primary: ['a', 'b'] })
    ws.on('close', function() {
      dat.get('foobar', function(err, data) {
        t.false(err, 'no error')
        t.equal(data.c, "hello")
        done()
      })
    })
    ws.write({"a": "foo", "b": "bar", "c": "hello"})
    ws.end()
  })
})
test('composite primary key w/ custom keySeparator', function(t) {
  getDat(t, function(dat, done) {
    var ws = dat.createWriteStream({ objects: true, primary: ['a', 'b'], separator: '@' })
    ws.on('close', function() {
      dat.get('foo@bar', function(err, data) {
        t.false(err, 'no error')
        t.equal(data.c, "hello")
        done()
      })
    })
    ws.write({"a": "foo", "b": "bar", "c": "hello"})
    ws.end()
  })
})
test('composite primary key w/ composite hashing enabled', function(t) {
  getDat(t, function(dat, done) {
    var ws = dat.createWriteStream({ objects: true, primary: ['a', 'b'], hash: true })
    ws.on('close', function() {
      var key = crypto.createHash('md5').update('foobar').digest("hex")
      dat.get(key, function(err, data) {
        t.false(err, 'no error')
        t.equal(data.c, "hello")
        done()
      })
    })
    ws.write({"a": "foo", "b": "bar", "c": "hello"})
    ws.end()
  })
})
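These three tests pin down how the new composite primary keys behave: the values of the listed primary columns are joined in order to form the row key, the separator option inserts a delimiter between them, and hash: true stores the md5 hex digest of the joined key instead. The expected keys can be reproduced directly:

var crypto = require('crypto')

// row is {"a": "foo", "b": "bar", "c": "hello"} in each test above
var plain = ['foo', 'bar'].join('')            // 'foobar'  -> default composite key
var custom = ['foo', 'bar'].join('@')          // 'foo@bar' -> with separator: '@'
var hashed = crypto.createHash('md5')
  .update('foobar')
  .digest('hex')                               // -> key looked up when hash: true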
test('csv writeStream w/ headerRow false', function(t) {
@@ -473,0 +527,0 @@ getDat(t, function(dat, done) {
Dynamic require
Supply chain risk: Dynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package