Socket
Socket
Sign inDemoInstall

level-ws

Package Overview
Dependencies
9
Maintainers
4
Versions
7
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.1.0 to 1.0.0

CHANGELOG.md

187

level-ws.js

@@ -1,69 +0,28 @@

/* Copyright (c) 2013 LevelUP contributors
* See list at <https://github.com/rvagg/node-levelup#contributing>
* MIT +no-false-attribs License
* <https://github.com/Level/level-ws/master/LICENSE>
*/
var Writable = require('readable-stream').Writable
var inherits = require('inherits')
var extend = require('xtend')
var Writable = require('stream').Writable || require('readable-stream').Writable
, inherits = require('util').inherits
, extend = require('xtend')
var defaultOptions = { type: 'put' }
, defaultOptions = {
type : 'put'
, keyEncoding : 'utf8'
, valueEncoding : 'utf8'
}
function WriteStream (db, options) {
if (!(this instanceof WriteStream)) {
return new WriteStream(db, options)
}
// copied from LevelUP
, encodingNames = [
'hex'
, 'utf8'
, 'utf-8'
, 'ascii'
, 'binary'
, 'base64'
, 'ucs2'
, 'ucs-2'
, 'utf16le'
, 'utf-16le'
]
options = extend(defaultOptions, options)
// copied from LevelUP
, encodingOpts = (function () {
var eo = {}
encodingNames.forEach(function (e) {
eo[e] = { valueEncoding : e }
})
return eo
}())
Writable.call(this, {
objectMode: true,
highWaterMark: options.highWaterMark || 16
})
// copied from LevelUP
function getOptions (levelup, options) {
var s = typeof options == 'string' // just an encoding
if (!s && options && options.encoding && !options.valueEncoding)
options.valueEncoding = options.encoding
return extend(
(levelup && levelup.options) || {}
, s ? encodingOpts[options] || encodingOpts[defaultOptions.valueEncoding]
: options
)
}
function WriteStream (options, db) {
if (!(this instanceof WriteStream))
return new WriteStream(options, db)
Writable.call(this, { objectMode: true })
this._options = extend(defaultOptions, getOptions(db, options))
this._db = db
this._options = options
this._db = db
this._buffer = []
this.writable = true
this.readable = false
this._flushing = false
this._maxBufferLength = options.maxBufferLength || Infinity
var self = this
this.on('finish', function f () {
if (self._buffer && self._buffer.length) {
return self._flush(f)
}
self.writable = false
this.on('finish', function () {
self.emit('close')

@@ -75,19 +34,18 @@ })

WriteStream.prototype._write = function write (d, enc, next) {
WriteStream.prototype._write = function (data, enc, next) {
var self = this
if (self._destroyed)
return
if (!self._db.isOpen())
return self._db.once('ready', function () {
write.call(self, d, enc, next)
})
if (self.destroyed) return
if (self._options.maxBufferLength &&
self._buffer.length > self._options.maxBufferLength) {
self.once('_flush', next)
if (!self._flushing) {
self._flushing = true
process.nextTick(function () { self._flush() })
}
else {
if (self._buffer.length === 0)
process.nextTick(function () { self._flush() })
self._buffer.push(d)
if (self._buffer.length >= self._maxBufferLength) {
self.once('_flush', function (err) {
if (err) return self.destroy(err)
self._write(data, enc, next)
})
} else {
self._buffer.push(extend({ type: self._options.type }, data))
next()

@@ -97,60 +55,53 @@ }

WriteStream.prototype._flush = function (f) {
WriteStream.prototype._flush = function () {
var self = this
, buffer = self._buffer
var buffer = self._buffer
if (self._destroyed || !buffer) return
if (!self._db.isOpen()) {
return self._db.on('ready', function () { self._flush(f) })
}
if (self.destroyed) return
self._buffer = []
self._db.batch(buffer, cb)
self._db.batch(buffer.map(function (d) {
return {
type : d.type || self._options.type
, key : d.key
, value : d.value
, keyEncoding : d.keyEncoding || self._options.keyEncoding
, valueEncoding : d.valueEncoding
|| d.encoding
|| self._options.valueEncoding
}
}), cb)
function cb (err) {
self._flushing = false
function cb (err) {
if (err) {
self.writable = false
self.emit('error', err)
if (!self.emit('_flush', err) && err) {
// There was no _flush listener.
self.destroy(err)
}
else {
if (f) f()
self.emit('_flush')
}
}
}
WriteStream.prototype.toString = function () {
return 'LevelUP.WriteStream'
WriteStream.prototype._final = function (cb) {
var self = this
if (this._flushing) {
// Wait for scheduled or in-progress _flush()
this.once('_flush', function (err) {
if (err) return cb(err)
// There could be additional buffered writes
self._final(cb)
})
} else if (this._buffer && this._buffer.length) {
this.once('_flush', cb)
this._flush()
} else {
cb()
}
}
WriteStream.prototype.destroy = function () {
if (this._destroyed) return
WriteStream.prototype._destroy = function (err, cb) {
var self = this
this._buffer = null
this._destroyed = true
this.writable = false
this.emit('close')
}
cb(err)
WriteStream.prototype.destroySoon = function () {
this.end()
// TODO when the next readable-stream (mirroring node v10) is out:
// remove this. Since nodejs/node#19836, streams always emit close.
process.nextTick(function () {
self.emit('close')
})
}
module.exports = function (db) {
db.writeStream = db.createWriteStream = function (options) {
return new WriteStream(options, db)
}
return db
}
module.exports.WriteStream = WriteStream
module.exports = WriteStream
{
"name": "level-ws",
"description": "A basic WriteStream implementation for LevelUP",
"version": "0.1.0",
"contributors": [
"Rod Vagg <r@va.gg> (https://github.com/rvagg)",
"John Chesley <john@chesl.es> (https://github.com/chesles/)",
"Jake Verbaten <raynos2@gmail.com> (https://github.com/raynos)",
"Dominic Tarr <dominic.tarr@gmail.com> (https://github.com/dominictarr)",
"Max Ogden <max@maxogden.com> (https://github.com/maxogden)",
"Lars-Magnus Skog <ralphtheninja@riseup.net> (https://github.com/ralphtheninja)",
"David Björklund <david.bjorklund@gmail.com> (https://github.com/kesla)",
"Julian Gruber <julian@juliangruber.com> (https://github.com/juliangruber)",
"Paolo Fragomeni <paolo@async.ly> (https://github.com/hij1nx)",
"Anton Whalley <anton.whalley@nearform.com> (https://github.com/No9)",
"Matteo Collina <matteo.collina@gmail.com> (https://github.com/mcollina)",
"Pedro Teixeira <pedro.teixeira@gmail.com> (https://github.com/pgte)",
"James Halliday <mail@substack.net> (https://github.com/substack)"
],
"version": "1.0.0",
"repository": {

@@ -32,15 +17,24 @@ "type": "git",

"dependencies": {
"inherits": "^2.0.3",
"readable-stream": "^2.2.8",
"xtend": "^4.0.0"
"xtend": "^4.0.1"
},
"devDependencies": {
"tape": "*",
"level": "*",
"after": "*",
"rimraf": "*"
"coveralls": "~3.0.1",
"level": "^4.0.0",
"level-concat-iterator": "^2.0.0",
"nyc": "~12.0.2",
"secret-event-listener": "~1.0.0",
"standard": "^11.0.1",
"tape": "^4.9.0",
"tempy": "^0.2.1"
},
"scripts": {
"test": "node test.js"
"test": "standard && nyc node test.js",
"coverage": "nyc report --reporter=text-lcov | coveralls"
},
"license": "MIT"
"license": "MIT",
"engines": {
"node": ">=6"
}
}

@@ -1,41 +0,44 @@

level-ws
========
# level-ws
<img alt="LevelDB Logo" height="100" src="http://leveldb.org/img/logo.svg">
> A basic WriteStream implementation for [levelup](https://github.com/level/levelup)
**A basic WriteStream implementation for [LevelUP](https://github.com/rvagg/node-levelup)**
[![level badge][level-badge]](https://github.com/level/awesome)
[![npm](https://img.shields.io/npm/v/level-ws.svg)](https://www.npmjs.com/package/level-ws)
![Node version](https://img.shields.io/node/v/level-ws.svg)
[![Build Status](https://img.shields.io/travis/Level/level-ws.svg)](http://travis-ci.org/Level/level-ws)
[![dependencies](https://david-dm.org/Level/level-ws.svg)](https://david-dm.org/level/level-ws)
[![npm](https://img.shields.io/npm/dm/level-ws.svg)](https://www.npmjs.com/package/level-ws)
[![Coverage Status](https://coveralls.io/repos/github/Level/level-ws/badge.svg)](https://coveralls.io/github/Level/level-ws)
[![JavaScript Style Guide](https://img.shields.io/badge/code_style-standard-brightgreen.svg)](https://standardjs.com)
[![Build Status](https://secure.travis-ci.org/Level/level-ws.png)](http://travis-ci.org/Level/level-ws)
`level-ws` provides the most basic general-case WriteStream for `levelup`. It was extracted from the core `levelup` at version 0.18.0.
[![NPM](https://nodei.co/npm/level-ws.png?downloads)](https://nodei.co/npm/level-ws/)
`level-ws` is not a high-performance WriteStream. If your benchmarking shows that your particular usage pattern and data types do not perform well with this WriteStream then you should try one of the alternative WriteStreams available for `levelup` that are optimised for different use-cases.
**level-ws** provides the most basic general-case WriteStream for LevelUP. It was extracted from the core LevelUP at version 0.18.0 but is bundled with [level](https://github.com/Level/level) and similar packages as it provides a general symmetry to the ReadStream in LevelUP.
**If you are upgrading:** please see [`UPGRADING.md`](UPGRADING.md).
**level-ws** is not a high-performance WriteStream, if your benchmarking shows that your particular usage pattern and data types do not perform well with this WriteStream then you should try one of the alternative WriteStreams available for LevelUP that are optimised for different use-cases.
## Alternative WriteStream packages
***TODO***
## Usage
To use **level-ws** you simply need to wrap a LevelUP instance and you get a `createWriteStream()` method on it.
```js
var level = require('level')
var levelws = require('level-ws')
var WriteStream = require('level-ws')
var db = level('/path/to/db')
db = levelws(db)
db.createWriteStream() // ...
var ws = WriteStream(db) // ...
```
### db.createWriteStream([options])
## API
A **WriteStream** can be obtained by calling the `createWriteStream()` method. The resulting stream is a Node.js **streams2** [Writable](http://nodejs.org/docs/latest/api/stream.html#stream_class_stream_writable_1) which operates in **objectMode**, accepting objects with `'key'` and `'value'` pairs on its `write()` method.
### `ws = WriteStream(db[, options])`
Creates a [Writable](https://nodejs.org/dist/latest-v8.x/docs/api/stream.html#stream_class_stream_writable) stream which operates in **objectMode**, accepting objects with `'key'` and `'value'` pairs on its `write()` method.
The optional `options` argument may contain:
* `type` *(string, default: `'put'`)*: Default batch operation for missing `type` property during `ws.write()`.
The WriteStream will buffer writes and submit them as a `batch()` operations where writes occur *within the same tick*.
```js
var ws = db.createWriteStream()
var ws = WriteStream(db)

@@ -56,25 +59,19 @@ ws.on('error', function (err) {

The standard `write()`, `end()`, `destroy()` and `destroySoon()` methods are implemented on the WriteStream. `'drain'`, `'error'`, `'close'` and `'pipe'` events are emitted.
The standard `write()`, `end()` and `destroy()` methods are implemented on the WriteStream. `'drain'`, `'error'`, `'close'` and `'pipe'` events are emitted.
You can specify encodings both for the whole stream and individual entries:
You can specify encodings for individual entries by setting `.keyEncoding` and/or `.valueEncoding`:
To set the encoding for the whole stream, provide an options object as the first parameter to `createWriteStream()` with `'keyEncoding'` and/or `'valueEncoding'`.
To set the encoding for an individual entry:
```js
writeStream.write({
key : new Buffer([1, 2, 3])
, value : { some: 'json' }
, keyEncoding : 'binary'
, valueEncoding : 'json'
key: new Buffer([1, 2, 3]),
value: { some: 'json' },
keyEncoding: 'binary',
valueEncoding : 'json'
})
```
#### write({ type: 'put' })
If individual `write()` operations are performed with a `'type'` property of `'del'`, they will be passed on as `'del'` operations to the batch.
```js
var ws = db.createWriteStream()
var ws = WriteStream(db)

@@ -95,8 +92,6 @@ ws.on('error', function (err) {

#### db.createWriteStream({ type: 'del' })
If the *WriteStream* is created with a `'type'` option of `'del'`, all `write()` operations will be interpreted as `'del'`, unless explicitly specified as `'put'`.
```js
var ws = db.createWriteStream({ type: 'del' })
var ws = WriteStream(db, { type: 'del' })

@@ -118,29 +113,6 @@ ws.on('error', function (err) {

## License
### Contributors
[MIT](./LICENSE.md) © 2012-present `level-ws` [Contributors](./CONTRIBUTORS.md).
**level-ws** is only possible due to the excellent work of the following contributors:
<table><tbody>
<tr><th align="left">Rod Vagg</th><td><a href="https://github.com/rvagg">GitHub/rvagg</a></td><td><a href="http://twitter.com/rvagg">Twitter/@rvagg</a></td></tr>
<tr><th align="left">John Chesley</th><td><a href="https://github.com/chesles/">GitHub/chesles</a></td><td><a href="http://twitter.com/chesles">Twitter/@chesles</a></td></tr>
<tr><th align="left">Jake Verbaten</th><td><a href="https://github.com/raynos">GitHub/raynos</a></td><td><a href="http://twitter.com/raynos2">Twitter/@raynos2</a></td></tr>
<tr><th align="left">Dominic Tarr</th><td><a href="https://github.com/dominictarr">GitHub/dominictarr</a></td><td><a href="http://twitter.com/dominictarr">Twitter/@dominictarr</a></td></tr>
<tr><th align="left">Max Ogden</th><td><a href="https://github.com/maxogden">GitHub/maxogden</a></td><td><a href="http://twitter.com/maxogden">Twitter/@maxogden</a></td></tr>
<tr><th align="left">Lars-Magnus Skog</th><td><a href="https://github.com/ralphtheninja">GitHub/ralphtheninja</a></td><td><a href="http://twitter.com/ralphtheninja">Twitter/@ralphtheninja</a></td></tr>
<tr><th align="left">David Björklund</th><td><a href="https://github.com/kesla">GitHub/kesla</a></td><td><a href="http://twitter.com/david_bjorklund">Twitter/@david_bjorklund</a></td></tr>
<tr><th align="left">Julian Gruber</th><td><a href="https://github.com/juliangruber">GitHub/juliangruber</a></td><td><a href="http://twitter.com/juliangruber">Twitter/@juliangruber</a></td></tr>
<tr><th align="left">Paolo Fragomeni</th><td><a href="https://github.com/hij1nx">GitHub/hij1nx</a></td><td><a href="http://twitter.com/hij1nx">Twitter/@hij1nx</a></td></tr>
<tr><th align="left">Anton Whalley</th><td><a href="https://github.com/No9">GitHub/No9</a></td><td><a href="https://twitter.com/antonwhalley">Twitter/@antonwhalley</a></td></tr>
<tr><th align="left">Matteo Collina</th><td><a href="https://github.com/mcollina">GitHub/mcollina</a></td><td><a href="https://twitter.com/matteocollina">Twitter/@matteocollina</a></td></tr>
<tr><th align="left">Pedro Teixeira</th><td><a href="https://github.com/pgte">GitHub/pgte</a></td><td><a href="https://twitter.com/pgte">Twitter/@pgte</a></td></tr>
<tr><th align="left">James Halliday</th><td><a href="https://github.com/substack">GitHub/substack</a></td><td><a href="https://twitter.com/substack">Twitter/@substack</a></td></tr>
</tbody></table>
<a name="licence"></a>
Licence &amp; copyright
-------------------
Copyright (c) 2012-2015 **level-ws** contributors (listed above).
**level-ws** is licensed under an MIT +no-false-attribs license. All rights not explicitly granted in the MIT license are reserved. See the included LICENSE file for more details.
[level-badge]: http://leveldb.org/img/badge.svg

@@ -1,283 +0,266 @@

/* Copyright (c) 2012-2013 LevelUP contributors
* See list at <https://github.com/rvagg/node-levelup#contributing>
* MIT +no-false-attribs License <https://github.com/rvagg/node-levelup/blob/master/LICENSE>
*/
var tape = require('tape')
var level = require('level')
var WriteStream = require('.')
var concat = require('level-concat-iterator')
var secretListener = require('secret-event-listener')
var tempy = require('tempy')
var after = require('after')
, tape = require('tape')
, path = require('path')
, fs = require('fs')
, level = require('level')
, rimraf = require('rimraf')
, ws = require('./')
function monitor (stream) {
var order = []
function cleanup (callback) {
fs.readdir(__dirname, function (err, list) {
if (err) return callback(err)
list = list.filter(function (f) {
return (/^_level-ws_test_db\./).test(f)
;['error', 'finish', 'close'].forEach(function (event) {
secretListener(stream, event, function () {
order.push(event)
})
})
if (!list.length)
return callback()
return order
}
var ret = 0
function monkeyBatch (db, fn) {
var down = db.db
var original = down._batch.bind(down)
down._batch = fn.bind(down, original)
}
list.forEach(function (f) {
rimraf(path.join(__dirname, f), function () {
if (++ret == list.length)
callback()
})
})
function slowdown (db) {
monkeyBatch(db, function (original, ops, options, cb) {
setTimeout(function () {
original(ops, options, cb)
}, 500)
})
}
function openTestDatabase (t, options, callback) {
var location = path.join(__dirname, '_level-ws_test_db.' + Math.random())
if (typeof options == 'function') {
callback = options
options = { createIfMissing: true, errorIfExists: true }
function test (label, options, fn) {
if (typeof options === 'function') {
fn = options
options = {}
}
rimraf(location, function (err) {
t.notOk(err, 'no error')
level(location, options, function (err, db) {
t.notOk(err, 'no error')
if (!err) {
this.db = ws(db) // invoke ws!
callback(this.db)
}
}.bind(this))
}.bind(this))
}
options.createIfMissing = true
options.errorIfExists = true
function setUp (t) {
this.openTestDatabase = openTestDatabase.bind(this, t)
tape(label, function (t) {
var ctx = {}
this.timeout = 1000
var sourceData = ctx.sourceData = []
for (var i = 0; i < 2; i++) {
ctx.sourceData.push({ key: String(i), value: 'value' })
}
this.sourceData = []
ctx.verify = function (ws, done, data) {
concat(ctx.db.iterator(), function (err, result) {
t.error(err, 'no error')
t.same(result, data || sourceData, 'correct data')
done()
})
}
for (var i = 0; i < 10; i++) {
this.sourceData.push({
type : 'put'
, key : i
, value : Math.random()
})
}
this.verify = function (ws, db, done, data) {
if (!data) data = this.sourceData // can pass alternative data array for verification
t.ok(ws.writable === false, 'not writable')
t.ok(ws.readable === false, 'not readable')
var _done = after(data.length, done)
data.forEach(function (data) {
db.get(data.key, function (err, value) {
t.notOk(err, 'no error')
if (typeof value == 'object')
t.deepEqual(value, data.value, 'WriteStream data #' + data.key + ' has correct value')
else
t.equal(+value, +data.value, 'WriteStream data #' + data.key + ' has correct value')
_done()
level(tempy.directory(), options, function (err, db) {
t.notOk(err, 'no error')
ctx.db = db
fn(t, ctx, function () {
ctx.db.close(function (err) {
t.notOk(err, 'no error')
t.end()
})
})
})
}
})
}
// TODO: test various encodings
function test (label, fn) {
tape(label, function (t) {
var ctx = {}
setUp.call(ctx, t)
fn.call(ctx, t, function () {
var _cleanup = cleanup.bind(ctx, t.end.bind(t))
if (ctx.db)
return ctx.db.close(_cleanup)
_cleanup()
})
test('test simple WriteStream', function (t, ctx, done) {
var ws = WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
}
//TODO: test various encodings
test('test simple WriteStream', function (t, done) {
this.openTestDatabase(function (db) {
var ws = db.createWriteStream()
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', this.verify.bind(this, ws, db, done))
this.sourceData.forEach(function (d) {
ws.write(d)
})
ws.end()
}.bind(this))
ws.on('close', ctx.verify.bind(ctx, ws, done))
ctx.sourceData.forEach(function (d) {
ws.write(d)
})
ws.end()
})
test('test WriteStream with async writes', function (t, done) {
this.openTestDatabase(function (db) {
var ws = db.createWriteStream()
, sourceData = this.sourceData
, i = -1
test('test WriteStream with async writes', function (t, ctx, done) {
var ws = WriteStream(ctx.db)
var sourceData = ctx.sourceData
var i = -1
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', this.verify.bind(this, ws, db, done))
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', ctx.verify.bind(ctx, ws, done))
function write () {
if (++i >= sourceData.length)
return ws.end()
function write () {
if (++i >= sourceData.length) return ws.end()
var d = sourceData[i]
// some should batch() and some should put()
if (d.key % 3) {
setTimeout(function () {
ws.write(d)
process.nextTick(write)
}, 10)
} else {
var d = sourceData[i]
// some should batch() and some should put()
if (d.key % 3) {
setTimeout(function () {
ws.write(d)
process.nextTick(write)
}
}, 10)
} else {
ws.write(d)
process.nextTick(write)
}
}
write()
}.bind(this))
write()
})
test('test end accepts data', function (t, done) {
this.openTestDatabase(function (db) {
var ws = db.createWriteStream()
, i = 0
test('race condition between batch callback and close event', function (t, ctx, done) {
// Delaying the batch should not be a problem
slowdown(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', this.verify.bind(this, ws, db, done))
this.sourceData.forEach(function (d) {
i++
if (i < this.sourceData.length) {
ws.write(d)
} else {
ws.end(d)
}
}.bind(this))
}.bind(this))
var ws = WriteStream(ctx.db)
var i = 0
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', ctx.verify.bind(ctx, ws, done))
ctx.sourceData.forEach(function (d) {
i++
if (i < ctx.sourceData.length) {
ws.write(d)
} else {
ws.end(d)
}
})
})
// at the moment, destroySoon() is basically just end()
test('test destroySoon()', function (t, done) {
this.openTestDatabase(function (db) {
var ws = db.createWriteStream()
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', this.verify.bind(this, ws, db, done))
this.sourceData.forEach(function (d) {
test('race condition between two flushes', function (t, ctx, done) {
slowdown(ctx.db)
var ws = WriteStream(ctx.db)
var order = monitor(ws)
ws.on('close', function () {
t.same(order, ['batch', 'batch', 'close'])
ctx.verify(ws, done, [
{ key: 'a', value: 'a' },
{ key: 'b', value: 'b' }
])
})
ctx.db.on('batch', function () {
order.push('batch')
})
ws.write({ key: 'a', value: 'a' })
// Schedule another flush while the first is in progress
ctx.db.once('batch', function (ops) {
ws.end({ key: 'b', value: 'b' })
})
})
test('test end accepts data', function (t, ctx, done) {
var ws = WriteStream(ctx.db)
var i = 0
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', ctx.verify.bind(ctx, ws, done))
ctx.sourceData.forEach(function (d) {
i++
if (i < ctx.sourceData.length) {
ws.write(d)
})
ws.destroySoon()
}.bind(this))
} else {
ws.end(d)
}
})
})
test('test destroy()', function (t, done) {
var verify = function (ws, db) {
t.ok(ws.writable === false, 'not writable')
var _done = after(this.sourceData.length, done)
this.sourceData.forEach(function (data) {
db.get(data.key, function (err, value) {
// none of them should exist
t.ok(err, 'got expected error')
t.notOk(value, 'did not get value')
_done()
})
test('test destroy()', function (t, ctx, done) {
var ws = WriteStream(ctx.db)
var verify = function () {
concat(ctx.db.iterator(), function (err, result) {
t.error(err, 'no error')
t.same(result, [], 'results should be empty')
done()
})
}
this.openTestDatabase(function (db) {
var ws = db.createWriteStream()
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
t.ok(ws.writable === true, 'is writable')
t.ok(ws.readable === false, 'not readable')
ws.on('close', verify.bind(this, ws, db))
this.sourceData.forEach(function (d) {
ws.write(d)
t.ok(ws.writable === true, 'is writable')
t.ok(ws.readable === false, 'not readable')
})
t.ok(ws.writable === true, 'is writable')
t.ok(ws.readable === false, 'not readable')
ws.destroy()
}.bind(this))
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', verify.bind(null))
ctx.sourceData.forEach(function (d) {
ws.write(d)
})
ws.destroy()
})
test('test json encoding', function (t, done) {
var options = { createIfMissing: true, errorIfExists: true, keyEncoding: 'utf8', valueEncoding: 'json' }
, data = [
{ type: 'put', key: 'aa', value: { a: 'complex', obj: 100 } }
, { type: 'put', key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { type: 'put', key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
, { type: 'put', key: 'ba', value: { a: 'complex', obj: 100 } }
, { type: 'put', key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { type: 'put', key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
, { type: 'put', key: 'ca', value: { a: 'complex', obj: 100 } }
, { type: 'put', key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { type: 'put', key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
]
test('test destroy(err)', function (t, ctx, done) {
var ws = WriteStream(ctx.db)
var order = monitor(ws)
this.openTestDatabase(options, function (db) {
var ws = db.createWriteStream()
ws.on('error', function (err) {
t.notOk(err, 'no error')
ws.on('error', function (err) {
t.is(err.message, 'user error', 'got error')
})
ws.on('close', function () {
t.same(order, ['error', 'close'])
concat(ctx.db.iterator(), function (err, result) {
t.error(err, 'no error')
t.same(result, [], 'results should be empty')
done()
})
ws.on('close', this.verify.bind(this, ws, db, done, data))
data.forEach(function (d) {
ws.write(d)
})
ws.end()
}.bind(this))
})
ctx.sourceData.forEach(function (d) {
ws.write(d)
})
ws.destroy(new Error('user error'))
})
test('test del capabilities for each key/value', function (t, done) {
var options = { createIfMissing: true, errorIfExists: true, keyEncoding: 'utf8', valueEncoding: 'json' }
, data = [
{ type: 'put', key: 'aa', value: { a: 'complex', obj: 100 } }
, { type: 'put', key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { type: 'put', key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
, { type: 'put', key: 'ba', value: { a: 'complex', obj: 100 } }
, { type: 'put', key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { type: 'put', key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
, { type: 'put', key: 'ca', value: { a: 'complex', obj: 100 } }
, { type: 'put', key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { type: 'put', key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
]
, self = this
test('test json encoding', { keyEncoding: 'utf8', valueEncoding: 'json' }, function (t, ctx, done) {
var data = [
{ key: 'aa', value: { a: 'complex', obj: 100 } },
{ key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } },
{ key: 'ba', value: { a: 'complex', obj: 100 } },
{ key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } },
{ key: 'ca', value: { a: 'complex', obj: 100 } },
{ key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
]
function open () {
self.openTestDatabase(options, function (db) {
write(db);
});
}
var ws = WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', ctx.verify.bind(ctx, ws, done, data))
data.forEach(function (d) {
ws.write(d)
})
ws.end()
})
function write (db) {
var ws = db.createWriteStream()
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', function () {
del(db);
})
data.forEach(function (d) {
ws.write(d)
})
test('test del capabilities for each key/value', { keyEncoding: 'utf8', valueEncoding: 'json' }, function (t, ctx, done) {
var data = [
{ key: 'aa', value: { a: 'complex', obj: 100 } },
{ key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } },
{ key: 'ba', value: { a: 'complex', obj: 100 } },
{ key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } },
{ key: 'ca', value: { a: 'complex', obj: 100 } },
{ key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
]
ws.end()
}
function del (db) {
var delStream = db.createWriteStream()
function del () {
var delStream = WriteStream(ctx.db)
delStream.on('error', function (err) {

@@ -287,3 +270,3 @@ t.notOk(err, 'no error')

delStream.on('close', function () {
verify(db);
verify()
})

@@ -298,56 +281,38 @@ data.forEach(function (d) {

function verify (db) {
var _done = after(data.length, done)
data.forEach(function (data) {
db.get(data.key, function (err, value) {
// none of them should exist
t.ok(err, 'got expected error')
t.notOk(value, 'did not get value')
_done()
})
function verify () {
concat(ctx.db.iterator(), function (err, result) {
t.error(err, 'no error')
t.same(result, [], 'results should be empty')
done()
})
}
open()
var ws = WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', function () {
del()
})
data.forEach(function (d) {
ws.write(d)
})
ws.end()
})
test('test del capabilities as constructor option', function (t, done) {
test('test del capabilities as constructor option', { keyEncoding: 'utf8', valueEncoding: 'json' }, function (t, ctx, done) {
var data = [
{ key: 'aa', value: { a: 'complex', obj: 100 } },
{ key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } },
{ key: 'ba', value: { a: 'complex', obj: 100 } },
{ key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } },
{ key: 'ca', value: { a: 'complex', obj: 100 } },
{ key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
]
var options = { createIfMissing: true, errorIfExists: true, keyEncoding: 'utf8', valueEncoding: 'json' }
, data = [
{ key: 'aa', value: { a: 'complex', obj: 100 } }
, { key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
, { key: 'ba', value: { a: 'complex', obj: 100 } }
, { key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
, { key: 'ca', value: { a: 'complex', obj: 100 } }
, { key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
]
, self = this
function open () {
self.openTestDatabase(options, function (db) {
write(db);
});
}
function write (db) {
var ws = db.createWriteStream()
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', function () {
del(db);
})
data.forEach(function (d) {
ws.write(d)
})
ws.end()
}
function del (db) {
var delStream = db.createWriteStream({ type: 'del' })
function del () {
var delStream = WriteStream(ctx.db, { type: 'del' })
delStream.on('error', function (err) {

@@ -357,3 +322,3 @@ t.notOk(err, 'no error')

delStream.on('close', function () {
verify(db);
verify()
})

@@ -367,58 +332,41 @@ data.forEach(function (d) {

function verify (db) {
var _done = after(data.length, done)
data.forEach(function (data) {
db.get(data.key, function (err, value) {
// none of them should exist
t.ok(err, 'got expected error')
t.notOk(value, 'did not get value')
_done()
})
function verify () {
concat(ctx.db.iterator(), function (err, result) {
t.error(err, 'no error')
t.same(result, [], 'results should be empty')
done()
})
}
open()
var ws = WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', function () {
del()
})
data.forEach(function (d) {
ws.write(d)
})
ws.end()
})
test('test type at key/value level must take precedence on the constructor', function (t, done) {
var options = { createIfMissing: true, errorIfExists: true, keyEncoding: 'utf8', valueEncoding: 'json' }
, data = [
{ key: 'aa', value: { a: 'complex', obj: 100 } }
, { key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
, { key: 'ba', value: { a: 'complex', obj: 100 } }
, { key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
, { key: 'ca', value: { a: 'complex', obj: 100 } }
, { key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }
, { key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
]
, exception = data[0]
, self = this
test('test type at key/value level must take precedence on the constructor', { keyEncoding: 'utf8', valueEncoding: 'json' }, function (t, ctx, done) {
var data = [
{ key: 'aa', value: { a: 'complex', obj: 100 } },
{ key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } },
{ key: 'ba', value: { a: 'complex', obj: 100 } },
{ key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } },
{ key: 'ca', value: { a: 'complex', obj: 100 } },
{ key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } },
{ key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }
]
var exception = data[0]
exception['type'] = 'put'
function open () {
self.openTestDatabase(options, function (db) {
write(db);
});
}
function write (db) {
var ws = db.createWriteStream()
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', function () {
del(db);
})
data.forEach(function (d) {
ws.write(d)
})
ws.end()
}
function del (db) {
var delStream = db.createWriteStream({ type: 'del' })
function del () {
var delStream = WriteStream(ctx.db, { type: 'del' })
delStream.on('error', function (err) {

@@ -428,3 +376,3 @@ t.notOk(err, 'no error')

delStream.on('close', function () {
verify(db);
verify()
})

@@ -438,55 +386,167 @@ data.forEach(function (d) {

function verify (db) {
var _done = after(data.length, done)
data.forEach(function (data) {
db.get(data.key, function (err, value) {
if (data.type === 'put') {
t.ok(value, 'got value')
_done()
} else {
t.ok(err, 'got expected error')
t.notOk(value, 'did not get value')
_done()
}
})
function verify () {
concat(ctx.db.iterator(), function (err, result) {
t.error(err, 'no error')
var expected = [ { key: data[0].key, value: data[0].value } ]
t.same(result, expected, 'only one element')
done()
})
}
open()
var ws = WriteStream(ctx.db)
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
ws.on('close', function () {
del()
})
data.forEach(function (d) {
ws.write(d)
})
ws.end()
})
test('test ignoring pairs with the wrong type', function (t, done) {
var self = this
test('test that missing type errors', function (t, ctx, done) {
var data = { key: 314, type: 'foo' }
var errored = false
function open () {
self.openTestDatabase(write)
function verify () {
ctx.db.get(data.key, function (err, value) {
t.equal(errored, true, 'error received in stream')
t.ok(err, 'got expected error')
t.equal(err.notFound, true, 'not found error')
t.notOk(value, 'did not get value')
done()
})
}
function write (db) {
var ws = db.createWriteStream()
ws.on('error', function (err) {
t.notOk(err, 'no error')
})
var ws = WriteStream(ctx.db)
ws.on('error', function (err) {
t.equal(err.message, '`type` must be \'put\' or \'del\'', 'should error')
errored = true
})
ws.on('close', function () {
verify()
})
ws.write(data)
ws.end()
})
test('test limbo batch error', function (t, ctx, done) {
var ws = WriteStream(ctx.db)
var order = monitor(ws)
monkeyBatch(ctx.db, function (original, ops, options, cb) {
process.nextTick(cb, new Error('batch error'))
})
ws.on('error', function (err) {
t.is(err.message, 'batch error')
})
ws.on('close', function () {
t.same(order, ['error', 'close'])
t.end()
})
// Don't end(), because we want the error to follow a
// specific code path (when there is no _flush listener).
ws.write({ key: 'a', value: 'a' })
})
test('test batch error when buffer is full', function (t, ctx, done) {
var ws = WriteStream(ctx.db, { maxBufferLength: 1 })
var order = monitor(ws)
monkeyBatch(ctx.db, function (original, ops, options, cb) {
process.nextTick(cb, new Error('batch error'))
})
ws.on('error', function (err) {
t.is(err.message, 'batch error', 'got error')
})
ws.on('close', function () {
t.same(order, ['error', 'close'])
t.end()
})
// Don't end(), because we want the error to follow a
// specific code path (when we're waiting to drain).
ws.write({ key: 'a', value: 'a' })
ws.write({ key: 'b', value: 'b' })
})
test('test destroy while waiting to drain', function (t, ctx, done) {
var ws = WriteStream(ctx.db, { maxBufferLength: 1 })
var order = monitor(ws)
ws.on('error', function (err) {
t.is(err.message, 'user error', 'got error')
})
ws.on('close', function () {
t.same(order, ['error', 'close'])
t.end()
})
ws.prependListener('_flush', function (err) {
t.ifError(err, 'no _flush error')
ws.destroy(new Error('user error'))
})
// Don't end.
ws.write({ key: 'a', value: 'a' })
ws.write({ key: 'b', value: 'b' })
})
;[0, 1, 2, 10, 20, 100].forEach(function (max) {
test('test maxBufferLength: ' + max, testMaxBuffer(max, false))
test('test maxBufferLength: ' + max + ' (random)', testMaxBuffer(max, true))
})
function testMaxBuffer (max, randomize) {
return function (t, ctx, done) {
var ws = WriteStream(ctx.db, { maxBufferLength: max })
var sourceData = []
var batches = []
for (var i = 0; i < 20; i++) {
sourceData.push({ key: i < 10 ? '0' + i : String(i), value: 'value' })
}
var expectedSize = max || sourceData.length
var remaining = sourceData.slice()
ws.on('close', function () {
verify(db)
t.ok(batches.every(function (size, index) {
// Last batch may contain additional items
return size <= expectedSize || index === batches.length - 1
}), 'batch sizes are <= max')
ctx.verify(ws, done, sourceData)
})
self.sourceData.forEach(function (d) {
d.type = 'x' + Math.random()
ws.write(d)
ctx.db.on('batch', function (ops) {
batches.push(ops.length)
})
ws.end()
}
function verify (db) {
var _done = after(self.sourceData.length, done)
self.sourceData.forEach(function (data) {
db.get(data.key, function (err, value) {
t.ok(err, 'got expected error')
t.notOk(value, 'did not get value')
_done()
loop()
function loop () {
var toWrite = randomize
? Math.floor(Math.random() * remaining.length + 1)
: remaining.length
remaining.splice(0, toWrite).forEach(function (d) {
ws.write(d)
})
})
if (remaining.length) {
setImmediate(loop)
} else {
ws.end()
}
}
}
open()
})
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc