dat - npm package version comparison

Comparing version 2.0.0 to 2.1.1

lib/write-stream.js


index.js

@@ -36,2 +36,5 @@ var path = require('path')

}
if (!onReady) onReady = function(){}
this.dir = dir

@@ -41,3 +44,4 @@ this.opts = opts

this.meta = meta(this.dir, function(err) {
if (onReady) onReady(err)
if (err) return onReady()
self._storage(opts, onReady)
})

@@ -44,0 +48,0 @@ }
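
The index.js change above reworks the ready callback: the meta callback now returns early on error and otherwise hands onReady to self._storage(opts, onReady), so the callback fires only once both meta and storage have initialized. A minimal usage sketch, assuming the module can be constructed as dat(dir, opts, onReady); the directory name is illustrative and the exact public signature is not part of this diff:

var dat = require('dat')

// Hypothetical caller: onReady fires after meta *and* storage are set up,
// and receives any meta error directly.
var db = dat('./my-dataset', {}, function onReady(err) {
  if (err) return console.error('dat failed to initialize', err)
  console.log('dat is ready')
})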


lib/commands.js
// this module assumes it will be used as a .prototype (e.g. uses `this`)
var EOL = require('os').EOL
var fs = require('fs')
var path = require('path')
var http = require('http')
var EOL = require('os').EOL
var events = require('events')
var http = require('http')
var bops = require('bops')
var through = require('through')
var rimraf = require('rimraf')
var mkdirp = require('mkdirp')

@@ -14,29 +15,20 @@ var extend = require('extend')

var level = require('level-hyper')
var sleepRef = require('sleep-ref')
var LiveStream = require('level-live-stream')
var sleepRef = require('sleep-ref')
var rimraf = require('rimraf')
var byteStream = require('byte-stream')
var combiner = require('stream-combiner')
var binaryCSV = require('binary-csv')
var multibuffer = require('multibuffer')
var mbstream = require('multibuffer-stream')
var split = require('binary-split')
var levelBackup = require('hyperlevel-backup')
var connectionManager = require(path.join(__dirname, 'connection-manager'))
var storage = require(path.join(__dirname, 'storage'))
var restHandler = require(path.join(__dirname, 'rest-handler'))
var csvBuffEncoder = require(path.join(__dirname, 'csv-buff-encoder'))
var jsonBuffEncoder = require(path.join(__dirname, 'json-buff-encoder'))
var storage = require(path.join(__dirname, 'storage'))
var headStream = require(path.join(__dirname, 'head-stream'))
var writeStream = require(path.join(__dirname, 'write-stream'))
var jsonLogStream = require(path.join(__dirname, 'json-log-stream'))
var connectionManager = require(path.join(__dirname, 'connection-manager'))
var sleepPrefix = 'd'
var dbOptions = {
var dat = {}
module.exports = dat
dat.dbOptions = {
writeBufferSize: 1024 * 1024 * 16 // 16MB
}
var dat = {}
module.exports = dat
dat.paths = function(root) {

@@ -168,3 +160,3 @@ root = root || this.dir || process.cwd()

path = path || this.paths(path).level
var db = level(path, extend({}, opts, dbOptions), cb)
var db = level(path, extend({}, opts, this.dbOptions), cb)
LiveStream.install(db)
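
The LevelDB options now live on the commands object as dat.dbOptions rather than in a module-local dbOptions variable, and the database is opened with extend({}, opts, this.dbOptions), so the defaults can be overridden before the level instance is created. A minimal sketch, assuming db is a dat instance whose prototype is the commands object above; the 32MB value is purely illustrative:

// Hypothetical override applied before .level() opens the database.
db.dbOptions = { writeBufferSize: 1024 * 1024 * 32 } // default shown above is 16MB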

@@ -255,3 +247,3 @@ this.db = db

if (err) return cb(err)
store.currentData().pipe(jsonLogStream(cb))
self.createReadStream().pipe(jsonLogStream(cb))
})

@@ -281,232 +273,4 @@ })

// TODO split this function up into modules
dat.createWriteStream = function(options) {
if (typeof options === 'undefined') options = {}
if (options.argv) options = options.argv
// grab columns from options
var columns = options.c || options.columns
if (columns && !(columns instanceof Array)) columns = [columns]
var self = this
var store = self.storage
var ended = false
var writing = false
if (Object.keys(options).indexOf('overwrite') === -1) {
// if db is empty then use overwrite mode (faster)
if (store.seq === 0) options.overwrite = true
}
var primary
var primaryKeys = []
var primaryIndex
if (!options.overwrite) primary = '_id'
if (options.primary) primary = options.primary
if (primary) {
var currentColumns = self.meta.json.columns.length ? self.meta.json.columns : columns
if (currentColumns) primaryIndex = currentColumns.indexOf(primary)
var onRow = function (row) {
var primaryVal
if (primary === '_id' || primaryIndex > -1) {
if (Array.isArray(row)) primaryVal = row[primaryIndex]
else if (bops.is(row)) primaryVal = bufferAt(row, primaryIndex)
else primaryVal = row[primary]
}
if (primary === '_id' && !primaryVal) primaryVal = store.uuid()
primaryKeys.push(primaryVal)
if (this.queue) this.queue(row)
}
}
if (columns) {
var newColumns = self.meta.getNewColumns(columns)
if (newColumns.length > 0) {
self.meta.addColumns(newColumns, function(err) {
if (err) console.error('error updating columns', err)
})
}
}
var batchStream = byteStream(dbOptions.writeBufferSize)
var writeStream = through(onWrite, onEnd)
var pipeChain = [batchStream, writeStream]
if (options.csv || options.f == 'csv') { // raw csv
var delim = options.d || options.delim
var csvParser = binaryCSV(delim)
// grab first row of csv and store columns
function onFirstCSV(buf, next) {
var csvColumns = csvParser.line(buf)
for (var i = 0; i < csvColumns.length; i++) csvColumns[i] = csvParser.cell(csvColumns[i])
csvColumns = csvColumns.map(function(i) { return i.toString() })
if (primary) { primaryIndex = csvColumns.indexOf(primary) }
var newColumns = self.meta.getNewColumns(csvColumns)
if (newColumns.length > 0) {
self.meta.addColumns(newColumns, function(err) {
if (err) console.error('error updating columns', err)
if (primary) primaryIndex = self.meta.json.columns.indexOf(primary)
next()
})
} else {
next()
}
}
if (typeof options.headerRow === 'undefined') options.headerRow = true
pipeChain.unshift(csvBuffEncoder(onRow))
if (options.headerRow) pipeChain.unshift(headStream(onFirstCSV)) // skip first line of csv
pipeChain.unshift(csvParser)
} else if (options.json || options.f == 'json') { // raw ndjson
var newlineParser = split()
var jsonEncoder = jsonBuffEncoder(store, onRow)
function onFirstJSON(obj, next) {
var newColumns = self.meta.getNewColumns(Object.keys(JSON.parse(obj)))
if (newColumns.length > 0) {
self.meta.addColumns(newColumns, function(err) {
if (err) console.error('error updating columns', err)
if (primary) primaryIndex = self.meta.json.columns.indexOf(primary)
next()
})
} else {
next()
}
}
pipeChain.unshift(jsonEncoder)
pipeChain.unshift(headStream(onFirstJSON, {includeHead: true}))
pipeChain.unshift(newlineParser)
} else if (options.objects || options.f == 'objects') { // stream of JS Objects (not JSON)
var jsonEncoder = jsonBuffEncoder(store, onRow)
function onFirstObject(obj, next) {
var newColumns = self.meta.getNewColumns(Object.keys(obj))
if (newColumns.length > 0) {
self.meta.addColumns(newColumns, function(err) {
if (err) console.error('error updating columns', err)
if (primary) primaryIndex = self.meta.json.columns.indexOf(primary)
next()
})
} else {
next()
}
}
pipeChain.unshift(jsonEncoder)
pipeChain.unshift(headStream(onFirstObject, {includeHead: true}))
} else { // if no specific format is specified then assume .buff
if (primary) {
var primaryExtractor = through(onRow)
pipeChain.unshift(primaryExtractor)
}
pipeChain.unshift(mbstream.unpackStream())
}
return combiner.apply(combiner, pipeChain)
function writeBatch(rows) {
var batch = store.db.batch()
var len = rows.length
var pending = len
if (pending > 0) writing = true
for (var i = 0; i < len; i++) {
var row = rows[i]
var doc = {}
if (row._rev) {
doc._rev = row._rev
row = row.buffer
}
if (primary) doc._id = primaryKeys.shift().toString()
var meta = store.updateRevision(doc, row)
if (!meta) {
rows[i] = {success: true, row: doc, existed: true}
pending--
if (pending === 0) commit()
continue
}
var seq = store.seq = store.seq + 1
var keys = store.rowKeys(meta._id, meta._rev, seq)
batch.put(keys.seq, JSON.stringify([seq, meta._id, meta._rev]))
batch.put(keys.row, row)
rows[i] = {success: true, row: meta}
pending--
if (pending === 0) commit()
}
function commit() {
if (batch.ops.length === 0) return next()
batch.write(function(err) {
if (err) console.error('batch write err', err)
next()
})
function next() {
writing = false
for (var i = 0; i < len; i++) writeStream.queue(rows[i])
batchStream.next()
if (ended) writeStream.queue(null)
}
}
}
function checkRows(rows, cb) {
var len = rows.length
var pending = len
var results = []
var errors = []
for (var i = 0; i < len; i++) {
var key = primaryKeys[i].toString()
store.get(key, onRow)
}
function onRow(err, row) {
results.push([err, row])
pending--
if (pending === 0) finish()
}
function finish() {
for (var i = 0; i < results.length; i++) {
var err = results[i][0]
var row = results[i][1]
var result = {}
if (err && err.message !== 'range not found') {
result.key = key
result.error = err.message
errors.push(result)
}
if (row) {
result._rev = row._rev
result.buffer = rows[i]
rows[i] = result
}
}
cb(errors.length > 0 ? errors : null, rows)
}
}
function onWrite(rows) {
if (options.overwrite) {
writeBatch(rows)
} else {
checkRows(rows, function(errs, updatedRows) {
if (errs) return console.error('fatal write errors', errs)
writeBatch(updatedRows)
})
}
}
function onEnd() {
ended = true
if (!writing) writeStream.queue(null)
}
return writeStream(this, options)
}
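
createWriteStream above assembles a decode pipeline based on the options (csv, json, objects, or raw multibuffers), extracts a primary-key value per row via onRow, groups rows with byteStream, and commits each group with store.db.batch(); without overwrite it first runs checkRows to pick up existing _rev values. A usage sketch, not taken from the diff: the instance name, file name, and column name are illustrative, while the option names (json, primary) come from the code above:

var fs = require('fs')

// Stream newline-delimited JSON into dat, keyed on a "name" column
// instead of a generated _id.
var ws = dat.createWriteStream({ json: true, primary: 'name' })
ws.on('close', function() { console.log('import finished') })
fs.createReadStream('people.ndjson').pipe(ws)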

@@ -546,7 +310,1 @@

}
function bufferAt(mb, idx) {
var data = [null, mb]
for (var i = 0; i < idx + 1; i++) data = multibuffer.readPartial(data[1])
return data[0]
}
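
bufferAt walks a packed multibuffer with multibuffer.readPartial, discarding cells until it reaches index idx, and returns that cell; it is what pulls the primary-key value out of a binary row in onRow above. A small illustration, assuming rows are framed with multibuffer.pack:

var multibuffer = require('multibuffer')
var bops = require('bops')

var packed = multibuffer.pack([bops.from('foo'), bops.from('bar'), bops.from('hello')])
console.log(bufferAt(packed, 1).toString()) // -> 'bar'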

@@ -9,2 +9,3 @@ var through = require('through')

var rest = false
var ended = false
var stream = through(write, end)

@@ -22,2 +23,3 @@

self.resume()
if (ended) self.queue(null)
})

@@ -27,5 +29,5 @@ }

function end() {
this.resume()
this.queue(null)
ended = true
if (!this.paused) this.queue(null)
}
}

@@ -19,3 +19,3 @@ var datapackage = require('datapackage-json')

if (!self.json.columns) self.json.columns = []
ready()
ready(err)
})

@@ -22,0 +22,0 @@ }

{
"name": "dat",
"version": "2.0.0",
"version": "2.1.1",
"description": "real-time replication and versioning for large tabular data sets",

@@ -5,0 +5,0 @@ "main": "index.js",

var path = require('path')
var os = require('os')
var crypto = require('crypto')
var datPath = path.join(__dirname, '..')

@@ -150,3 +151,3 @@ var Dat = require(datPath)

getDat(t, function(dat, done) {
var ws = dat.createWriteStream({ json: true })

@@ -471,2 +472,55 @@

test('composite primary key', function(t) {
getDat(t, function(dat, done) {
var ws = dat.createWriteStream({ objects: true, primary: ['a', 'b'] })
ws.on('close', function() {
dat.get('foobar', function(err, data) {
t.false(err, 'no error')
t.equal(data.c, "hello")
done()
})
})
ws.write({"a": "foo", "b": "bar", "c": "hello"})
ws.end()
})
})
test('composite primary key w/ custom keySeparator', function(t) {
getDat(t, function(dat, done) {
var ws = dat.createWriteStream({ objects: true, primary: ['a', 'b'], separator: '@' })
ws.on('close', function() {
dat.get('foo@bar', function(err, data) {
t.false(err, 'no error')
t.equal(data.c, "hello")
done()
})
})
ws.write({"a": "foo", "b": "bar", "c": "hello"})
ws.end()
})
})
test('composite primary key w/ composite hashing enabled', function(t) {
getDat(t, function(dat, done) {
var ws = dat.createWriteStream({ objects: true, primary: ['a', 'b'], hash: true })
ws.on('close', function() {
var key = crypto.createHash('md5').update('foobar').digest("hex")
dat.get(key, function(err, data) {
t.false(err, 'no error')
t.equal(data.c, "hello")
done()
})
})
ws.write({"a": "foo", "b": "bar", "c": "hello"})
ws.end()
})
})
test('csv writeStream w/ headerRow false', function(t) {

@@ -473,0 +527,0 @@ getDat(t, function(dat, done) {
