level-ws
Advanced tools
Comparing version 0.1.0 to 1.0.0
187
level-ws.js
@@ -1,69 +0,28 @@ | ||
/* Copyright (c) 2013 LevelUP contributors | ||
* See list at <https://github.com/rvagg/node-levelup#contributing> | ||
* MIT +no-false-attribs License | ||
* <https://github.com/Level/level-ws/master/LICENSE> | ||
*/ | ||
var Writable = require('readable-stream').Writable | ||
var inherits = require('inherits') | ||
var extend = require('xtend') | ||
var Writable = require('stream').Writable || require('readable-stream').Writable | ||
, inherits = require('util').inherits | ||
, extend = require('xtend') | ||
var defaultOptions = { type: 'put' } | ||
, defaultOptions = { | ||
type : 'put' | ||
, keyEncoding : 'utf8' | ||
, valueEncoding : 'utf8' | ||
} | ||
function WriteStream (db, options) { | ||
if (!(this instanceof WriteStream)) { | ||
return new WriteStream(db, options) | ||
} | ||
// copied from LevelUP | ||
, encodingNames = [ | ||
'hex' | ||
, 'utf8' | ||
, 'utf-8' | ||
, 'ascii' | ||
, 'binary' | ||
, 'base64' | ||
, 'ucs2' | ||
, 'ucs-2' | ||
, 'utf16le' | ||
, 'utf-16le' | ||
] | ||
options = extend(defaultOptions, options) | ||
// copied from LevelUP | ||
, encodingOpts = (function () { | ||
var eo = {} | ||
encodingNames.forEach(function (e) { | ||
eo[e] = { valueEncoding : e } | ||
}) | ||
return eo | ||
}()) | ||
Writable.call(this, { | ||
objectMode: true, | ||
highWaterMark: options.highWaterMark || 16 | ||
}) | ||
// copied from LevelUP | ||
function getOptions (levelup, options) { | ||
var s = typeof options == 'string' // just an encoding | ||
if (!s && options && options.encoding && !options.valueEncoding) | ||
options.valueEncoding = options.encoding | ||
return extend( | ||
(levelup && levelup.options) || {} | ||
, s ? encodingOpts[options] || encodingOpts[defaultOptions.valueEncoding] | ||
: options | ||
) | ||
} | ||
function WriteStream (options, db) { | ||
if (!(this instanceof WriteStream)) | ||
return new WriteStream(options, db) | ||
Writable.call(this, { objectMode: true }) | ||
this._options = extend(defaultOptions, getOptions(db, options)) | ||
this._db = db | ||
this._options = options | ||
this._db = db | ||
this._buffer = [] | ||
this.writable = true | ||
this.readable = false | ||
this._flushing = false | ||
this._maxBufferLength = options.maxBufferLength || Infinity | ||
var self = this | ||
this.on('finish', function f () { | ||
if (self._buffer && self._buffer.length) { | ||
return self._flush(f) | ||
} | ||
self.writable = false | ||
this.on('finish', function () { | ||
self.emit('close') | ||
@@ -75,19 +34,18 @@ }) | ||
WriteStream.prototype._write = function write (d, enc, next) { | ||
WriteStream.prototype._write = function (data, enc, next) { | ||
var self = this | ||
if (self._destroyed) | ||
return | ||
if (!self._db.isOpen()) | ||
return self._db.once('ready', function () { | ||
write.call(self, d, enc, next) | ||
}) | ||
if (self.destroyed) return | ||
if (self._options.maxBufferLength && | ||
self._buffer.length > self._options.maxBufferLength) { | ||
self.once('_flush', next) | ||
if (!self._flushing) { | ||
self._flushing = true | ||
process.nextTick(function () { self._flush() }) | ||
} | ||
else { | ||
if (self._buffer.length === 0) | ||
process.nextTick(function () { self._flush() }) | ||
self._buffer.push(d) | ||
if (self._buffer.length >= self._maxBufferLength) { | ||
self.once('_flush', function (err) { | ||
if (err) return self.destroy(err) | ||
self._write(data, enc, next) | ||
}) | ||
} else { | ||
self._buffer.push(extend({ type: self._options.type }, data)) | ||
next() | ||
@@ -97,60 +55,53 @@ } | ||
WriteStream.prototype._flush = function (f) { | ||
WriteStream.prototype._flush = function () { | ||
var self = this | ||
, buffer = self._buffer | ||
var buffer = self._buffer | ||
if (self._destroyed || !buffer) return | ||
if (!self._db.isOpen()) { | ||
return self._db.on('ready', function () { self._flush(f) }) | ||
} | ||
if (self.destroyed) return | ||
self._buffer = [] | ||
self._db.batch(buffer, cb) | ||
self._db.batch(buffer.map(function (d) { | ||
return { | ||
type : d.type || self._options.type | ||
, key : d.key | ||
, value : d.value | ||
, keyEncoding : d.keyEncoding || self._options.keyEncoding | ||
, valueEncoding : d.valueEncoding | ||
|| d.encoding | ||
|| self._options.valueEncoding | ||
} | ||
}), cb) | ||
function cb (err) { | ||
self._flushing = false | ||
function cb (err) { | ||
if (err) { | ||
self.writable = false | ||
self.emit('error', err) | ||
if (!self.emit('_flush', err) && err) { | ||
// There was no _flush listener. | ||
self.destroy(err) | ||
} | ||
else { | ||
if (f) f() | ||
self.emit('_flush') | ||
} | ||
} | ||
} | ||
WriteStream.prototype.toString = function () { | ||
return 'LevelUP.WriteStream' | ||
WriteStream.prototype._final = function (cb) { | ||
var self = this | ||
if (this._flushing) { | ||
// Wait for scheduled or in-progress _flush() | ||
this.once('_flush', function (err) { | ||
if (err) return cb(err) | ||
// There could be additional buffered writes | ||
self._final(cb) | ||
}) | ||
} else if (this._buffer && this._buffer.length) { | ||
this.once('_flush', cb) | ||
this._flush() | ||
} else { | ||
cb() | ||
} | ||
} | ||
WriteStream.prototype.destroy = function () { | ||
if (this._destroyed) return | ||
WriteStream.prototype._destroy = function (err, cb) { | ||
var self = this | ||
this._buffer = null | ||
this._destroyed = true | ||
this.writable = false | ||
this.emit('close') | ||
} | ||
cb(err) | ||
WriteStream.prototype.destroySoon = function () { | ||
this.end() | ||
// TODO when the next readable-stream (mirroring node v10) is out: | ||
// remove this. Since nodejs/node#19836, streams always emit close. | ||
process.nextTick(function () { | ||
self.emit('close') | ||
}) | ||
} | ||
module.exports = function (db) { | ||
db.writeStream = db.createWriteStream = function (options) { | ||
return new WriteStream(options, db) | ||
} | ||
return db | ||
} | ||
module.exports.WriteStream = WriteStream | ||
module.exports = WriteStream |
{ | ||
"name": "level-ws", | ||
"description": "A basic WriteStream implementation for LevelUP", | ||
"version": "0.1.0", | ||
"contributors": [ | ||
"Rod Vagg <r@va.gg> (https://github.com/rvagg)", | ||
"John Chesley <john@chesl.es> (https://github.com/chesles/)", | ||
"Jake Verbaten <raynos2@gmail.com> (https://github.com/raynos)", | ||
"Dominic Tarr <dominic.tarr@gmail.com> (https://github.com/dominictarr)", | ||
"Max Ogden <max@maxogden.com> (https://github.com/maxogden)", | ||
"Lars-Magnus Skog <ralphtheninja@riseup.net> (https://github.com/ralphtheninja)", | ||
"David Björklund <david.bjorklund@gmail.com> (https://github.com/kesla)", | ||
"Julian Gruber <julian@juliangruber.com> (https://github.com/juliangruber)", | ||
"Paolo Fragomeni <paolo@async.ly> (https://github.com/hij1nx)", | ||
"Anton Whalley <anton.whalley@nearform.com> (https://github.com/No9)", | ||
"Matteo Collina <matteo.collina@gmail.com> (https://github.com/mcollina)", | ||
"Pedro Teixeira <pedro.teixeira@gmail.com> (https://github.com/pgte)", | ||
"James Halliday <mail@substack.net> (https://github.com/substack)" | ||
], | ||
"version": "1.0.0", | ||
"repository": { | ||
@@ -32,15 +17,24 @@ "type": "git", | ||
"dependencies": { | ||
"inherits": "^2.0.3", | ||
"readable-stream": "^2.2.8", | ||
"xtend": "^4.0.0" | ||
"xtend": "^4.0.1" | ||
}, | ||
"devDependencies": { | ||
"tape": "*", | ||
"level": "*", | ||
"after": "*", | ||
"rimraf": "*" | ||
"coveralls": "~3.0.1", | ||
"level": "^4.0.0", | ||
"level-concat-iterator": "^2.0.0", | ||
"nyc": "~12.0.2", | ||
"secret-event-listener": "~1.0.0", | ||
"standard": "^11.0.1", | ||
"tape": "^4.9.0", | ||
"tempy": "^0.2.1" | ||
}, | ||
"scripts": { | ||
"test": "node test.js" | ||
"test": "standard && nyc node test.js", | ||
"coverage": "nyc report --reporter=text-lcov | coveralls" | ||
}, | ||
"license": "MIT" | ||
"license": "MIT", | ||
"engines": { | ||
"node": ">=6" | ||
} | ||
} |
100
README.md
@@ -1,41 +0,44 @@ | ||
level-ws | ||
======== | ||
# level-ws | ||
<img alt="LevelDB Logo" height="100" src="http://leveldb.org/img/logo.svg"> | ||
> A basic WriteStream implementation for [levelup](https://github.com/level/levelup) | ||
**A basic WriteStream implementation for [LevelUP](https://github.com/rvagg/node-levelup)** | ||
[![level badge][level-badge]](https://github.com/level/awesome) | ||
[![npm](https://img.shields.io/npm/v/level-ws.svg)](https://www.npmjs.com/package/level-ws) | ||
![Node version](https://img.shields.io/node/v/level-ws.svg) | ||
[![Build Status](https://img.shields.io/travis/Level/level-ws.svg)](http://travis-ci.org/Level/level-ws) | ||
[![dependencies](https://david-dm.org/Level/level-ws.svg)](https://david-dm.org/level/level-ws) | ||
[![npm](https://img.shields.io/npm/dm/level-ws.svg)](https://www.npmjs.com/package/level-ws) | ||
[![Coverage Status](https://coveralls.io/repos/github/Level/level-ws/badge.svg)](https://coveralls.io/github/Level/level-ws) | ||
[![JavaScript Style Guide](https://img.shields.io/badge/code_style-standard-brightgreen.svg)](https://standardjs.com) | ||
[![Build Status](https://secure.travis-ci.org/Level/level-ws.png)](http://travis-ci.org/Level/level-ws) | ||
`level-ws` provides the most basic general-case WriteStream for `levelup`. It was extracted from the core `levelup` at version 0.18.0. | ||
[![NPM](https://nodei.co/npm/level-ws.png?downloads)](https://nodei.co/npm/level-ws/) | ||
`level-ws` is not a high-performance WriteStream. If your benchmarking shows that your particular usage pattern and data types do not perform well with this WriteStream then you should try one of the alternative WriteStreams available for `levelup` that are optimised for different use-cases. | ||
**level-ws** provides the most basic general-case WriteStream for LevelUP. It was extracted from the core LevelUP at version 0.18.0 but is bundled with [level](https://github.com/Level/level) and similar packages as it provides a general symmetry to the ReadStream in LevelUP. | ||
**If you are upgrading:** please see [`UPGRADING.md`](UPGRADING.md). | ||
**level-ws** is not a high-performance WriteStream, if your benchmarking shows that your particular usage pattern and data types do not perform well with this WriteStream then you should try one of the alternative WriteStreams available for LevelUP that are optimised for different use-cases. | ||
## Alternative WriteStream packages | ||
***TODO*** | ||
## Usage | ||
To use **level-ws** you simply need to wrap a LevelUP instance and you get a `createWriteStream()` method on it. | ||
```js | ||
var level = require('level') | ||
var levelws = require('level-ws') | ||
var WriteStream = require('level-ws') | ||
var db = level('/path/to/db') | ||
db = levelws(db) | ||
db.createWriteStream() // ... | ||
var ws = WriteStream(db) // ... | ||
``` | ||
### db.createWriteStream([options]) | ||
## API | ||
A **WriteStream** can be obtained by calling the `createWriteStream()` method. The resulting stream is a Node.js **streams2** [Writable](http://nodejs.org/docs/latest/api/stream.html#stream_class_stream_writable_1) which operates in **objectMode**, accepting objects with `'key'` and `'value'` pairs on its `write()` method. | ||
### `ws = WriteStream(db[, options])` | ||
Creates a [Writable](https://nodejs.org/dist/latest-v8.x/docs/api/stream.html#stream_class_stream_writable) stream which operates in **objectMode**, accepting objects with `'key'` and `'value'` pairs on its `write()` method. | ||
The optional `options` argument may contain: | ||
* `type` *(string, default: `'put'`)*: Default batch operation for missing `type` property during `ws.write()`. | ||
The WriteStream will buffer writes and submit them as a `batch()` operations where writes occur *within the same tick*. | ||
```js | ||
var ws = db.createWriteStream() | ||
var ws = WriteStream(db) | ||
@@ -56,25 +59,19 @@ ws.on('error', function (err) { | ||
The standard `write()`, `end()`, `destroy()` and `destroySoon()` methods are implemented on the WriteStream. `'drain'`, `'error'`, `'close'` and `'pipe'` events are emitted. | ||
The standard `write()`, `end()` and `destroy()` methods are implemented on the WriteStream. `'drain'`, `'error'`, `'close'` and `'pipe'` events are emitted. | ||
You can specify encodings both for the whole stream and individual entries: | ||
You can specify encodings for individual entries by setting `.keyEncoding` and/or `.valueEncoding`: | ||
To set the encoding for the whole stream, provide an options object as the first parameter to `createWriteStream()` with `'keyEncoding'` and/or `'valueEncoding'`. | ||
To set the encoding for an individual entry: | ||
```js | ||
writeStream.write({ | ||
key : new Buffer([1, 2, 3]) | ||
, value : { some: 'json' } | ||
, keyEncoding : 'binary' | ||
, valueEncoding : 'json' | ||
key: new Buffer([1, 2, 3]), | ||
value: { some: 'json' }, | ||
keyEncoding: 'binary', | ||
valueEncoding : 'json' | ||
}) | ||
``` | ||
#### write({ type: 'put' }) | ||
If individual `write()` operations are performed with a `'type'` property of `'del'`, they will be passed on as `'del'` operations to the batch. | ||
```js | ||
var ws = db.createWriteStream() | ||
var ws = WriteStream(db) | ||
@@ -95,8 +92,6 @@ ws.on('error', function (err) { | ||
#### db.createWriteStream({ type: 'del' }) | ||
If the *WriteStream* is created with a `'type'` option of `'del'`, all `write()` operations will be interpreted as `'del'`, unless explicitly specified as `'put'`. | ||
```js | ||
var ws = db.createWriteStream({ type: 'del' }) | ||
var ws = WriteStream(db, { type: 'del' }) | ||
@@ -118,29 +113,6 @@ ws.on('error', function (err) { | ||
## License | ||
### Contributors | ||
[MIT](./LICENSE.md) © 2012-present `level-ws` [Contributors](./CONTRIBUTORS.md). | ||
**level-ws** is only possible due to the excellent work of the following contributors: | ||
<table><tbody> | ||
<tr><th align="left">Rod Vagg</th><td><a href="https://github.com/rvagg">GitHub/rvagg</a></td><td><a href="http://twitter.com/rvagg">Twitter/@rvagg</a></td></tr> | ||
<tr><th align="left">John Chesley</th><td><a href="https://github.com/chesles/">GitHub/chesles</a></td><td><a href="http://twitter.com/chesles">Twitter/@chesles</a></td></tr> | ||
<tr><th align="left">Jake Verbaten</th><td><a href="https://github.com/raynos">GitHub/raynos</a></td><td><a href="http://twitter.com/raynos2">Twitter/@raynos2</a></td></tr> | ||
<tr><th align="left">Dominic Tarr</th><td><a href="https://github.com/dominictarr">GitHub/dominictarr</a></td><td><a href="http://twitter.com/dominictarr">Twitter/@dominictarr</a></td></tr> | ||
<tr><th align="left">Max Ogden</th><td><a href="https://github.com/maxogden">GitHub/maxogden</a></td><td><a href="http://twitter.com/maxogden">Twitter/@maxogden</a></td></tr> | ||
<tr><th align="left">Lars-Magnus Skog</th><td><a href="https://github.com/ralphtheninja">GitHub/ralphtheninja</a></td><td><a href="http://twitter.com/ralphtheninja">Twitter/@ralphtheninja</a></td></tr> | ||
<tr><th align="left">David Björklund</th><td><a href="https://github.com/kesla">GitHub/kesla</a></td><td><a href="http://twitter.com/david_bjorklund">Twitter/@david_bjorklund</a></td></tr> | ||
<tr><th align="left">Julian Gruber</th><td><a href="https://github.com/juliangruber">GitHub/juliangruber</a></td><td><a href="http://twitter.com/juliangruber">Twitter/@juliangruber</a></td></tr> | ||
<tr><th align="left">Paolo Fragomeni</th><td><a href="https://github.com/hij1nx">GitHub/hij1nx</a></td><td><a href="http://twitter.com/hij1nx">Twitter/@hij1nx</a></td></tr> | ||
<tr><th align="left">Anton Whalley</th><td><a href="https://github.com/No9">GitHub/No9</a></td><td><a href="https://twitter.com/antonwhalley">Twitter/@antonwhalley</a></td></tr> | ||
<tr><th align="left">Matteo Collina</th><td><a href="https://github.com/mcollina">GitHub/mcollina</a></td><td><a href="https://twitter.com/matteocollina">Twitter/@matteocollina</a></td></tr> | ||
<tr><th align="left">Pedro Teixeira</th><td><a href="https://github.com/pgte">GitHub/pgte</a></td><td><a href="https://twitter.com/pgte">Twitter/@pgte</a></td></tr> | ||
<tr><th align="left">James Halliday</th><td><a href="https://github.com/substack">GitHub/substack</a></td><td><a href="https://twitter.com/substack">Twitter/@substack</a></td></tr> | ||
</tbody></table> | ||
<a name="licence"></a> | ||
Licence & copyright | ||
------------------- | ||
Copyright (c) 2012-2015 **level-ws** contributors (listed above). | ||
**level-ws** is licensed under an MIT +no-false-attribs license. All rights not explicitly granted in the MIT license are reserved. See the included LICENSE file for more details. | ||
[level-badge]: http://leveldb.org/img/badge.svg |
802
test.js
@@ -1,283 +0,266 @@ | ||
/* Copyright (c) 2012-2013 LevelUP contributors | ||
* See list at <https://github.com/rvagg/node-levelup#contributing> | ||
* MIT +no-false-attribs License <https://github.com/rvagg/node-levelup/blob/master/LICENSE> | ||
*/ | ||
var tape = require('tape') | ||
var level = require('level') | ||
var WriteStream = require('.') | ||
var concat = require('level-concat-iterator') | ||
var secretListener = require('secret-event-listener') | ||
var tempy = require('tempy') | ||
var after = require('after') | ||
, tape = require('tape') | ||
, path = require('path') | ||
, fs = require('fs') | ||
, level = require('level') | ||
, rimraf = require('rimraf') | ||
, ws = require('./') | ||
function monitor (stream) { | ||
var order = [] | ||
function cleanup (callback) { | ||
fs.readdir(__dirname, function (err, list) { | ||
if (err) return callback(err) | ||
list = list.filter(function (f) { | ||
return (/^_level-ws_test_db\./).test(f) | ||
;['error', 'finish', 'close'].forEach(function (event) { | ||
secretListener(stream, event, function () { | ||
order.push(event) | ||
}) | ||
}) | ||
if (!list.length) | ||
return callback() | ||
return order | ||
} | ||
var ret = 0 | ||
function monkeyBatch (db, fn) { | ||
var down = db.db | ||
var original = down._batch.bind(down) | ||
down._batch = fn.bind(down, original) | ||
} | ||
list.forEach(function (f) { | ||
rimraf(path.join(__dirname, f), function () { | ||
if (++ret == list.length) | ||
callback() | ||
}) | ||
}) | ||
function slowdown (db) { | ||
monkeyBatch(db, function (original, ops, options, cb) { | ||
setTimeout(function () { | ||
original(ops, options, cb) | ||
}, 500) | ||
}) | ||
} | ||
function openTestDatabase (t, options, callback) { | ||
var location = path.join(__dirname, '_level-ws_test_db.' + Math.random()) | ||
if (typeof options == 'function') { | ||
callback = options | ||
options = { createIfMissing: true, errorIfExists: true } | ||
function test (label, options, fn) { | ||
if (typeof options === 'function') { | ||
fn = options | ||
options = {} | ||
} | ||
rimraf(location, function (err) { | ||
t.notOk(err, 'no error') | ||
level(location, options, function (err, db) { | ||
t.notOk(err, 'no error') | ||
if (!err) { | ||
this.db = ws(db) // invoke ws! | ||
callback(this.db) | ||
} | ||
}.bind(this)) | ||
}.bind(this)) | ||
} | ||
options.createIfMissing = true | ||
options.errorIfExists = true | ||
function setUp (t) { | ||
this.openTestDatabase = openTestDatabase.bind(this, t) | ||
tape(label, function (t) { | ||
var ctx = {} | ||
this.timeout = 1000 | ||
var sourceData = ctx.sourceData = [] | ||
for (var i = 0; i < 2; i++) { | ||
ctx.sourceData.push({ key: String(i), value: 'value' }) | ||
} | ||
this.sourceData = [] | ||
ctx.verify = function (ws, done, data) { | ||
concat(ctx.db.iterator(), function (err, result) { | ||
t.error(err, 'no error') | ||
t.same(result, data || sourceData, 'correct data') | ||
done() | ||
}) | ||
} | ||
for (var i = 0; i < 10; i++) { | ||
this.sourceData.push({ | ||
type : 'put' | ||
, key : i | ||
, value : Math.random() | ||
}) | ||
} | ||
this.verify = function (ws, db, done, data) { | ||
if (!data) data = this.sourceData // can pass alternative data array for verification | ||
t.ok(ws.writable === false, 'not writable') | ||
t.ok(ws.readable === false, 'not readable') | ||
var _done = after(data.length, done) | ||
data.forEach(function (data) { | ||
db.get(data.key, function (err, value) { | ||
t.notOk(err, 'no error') | ||
if (typeof value == 'object') | ||
t.deepEqual(value, data.value, 'WriteStream data #' + data.key + ' has correct value') | ||
else | ||
t.equal(+value, +data.value, 'WriteStream data #' + data.key + ' has correct value') | ||
_done() | ||
level(tempy.directory(), options, function (err, db) { | ||
t.notOk(err, 'no error') | ||
ctx.db = db | ||
fn(t, ctx, function () { | ||
ctx.db.close(function (err) { | ||
t.notOk(err, 'no error') | ||
t.end() | ||
}) | ||
}) | ||
}) | ||
} | ||
}) | ||
} | ||
// TODO: test various encodings | ||
function test (label, fn) { | ||
tape(label, function (t) { | ||
var ctx = {} | ||
setUp.call(ctx, t) | ||
fn.call(ctx, t, function () { | ||
var _cleanup = cleanup.bind(ctx, t.end.bind(t)) | ||
if (ctx.db) | ||
return ctx.db.close(_cleanup) | ||
_cleanup() | ||
}) | ||
test('test simple WriteStream', function (t, ctx, done) { | ||
var ws = WriteStream(ctx.db) | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
} | ||
//TODO: test various encodings | ||
test('test simple WriteStream', function (t, done) { | ||
this.openTestDatabase(function (db) { | ||
var ws = db.createWriteStream() | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', this.verify.bind(this, ws, db, done)) | ||
this.sourceData.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.end() | ||
}.bind(this)) | ||
ws.on('close', ctx.verify.bind(ctx, ws, done)) | ||
ctx.sourceData.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.end() | ||
}) | ||
test('test WriteStream with async writes', function (t, done) { | ||
this.openTestDatabase(function (db) { | ||
var ws = db.createWriteStream() | ||
, sourceData = this.sourceData | ||
, i = -1 | ||
test('test WriteStream with async writes', function (t, ctx, done) { | ||
var ws = WriteStream(ctx.db) | ||
var sourceData = ctx.sourceData | ||
var i = -1 | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', this.verify.bind(this, ws, db, done)) | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', ctx.verify.bind(ctx, ws, done)) | ||
function write () { | ||
if (++i >= sourceData.length) | ||
return ws.end() | ||
function write () { | ||
if (++i >= sourceData.length) return ws.end() | ||
var d = sourceData[i] | ||
// some should batch() and some should put() | ||
if (d.key % 3) { | ||
setTimeout(function () { | ||
ws.write(d) | ||
process.nextTick(write) | ||
}, 10) | ||
} else { | ||
var d = sourceData[i] | ||
// some should batch() and some should put() | ||
if (d.key % 3) { | ||
setTimeout(function () { | ||
ws.write(d) | ||
process.nextTick(write) | ||
} | ||
}, 10) | ||
} else { | ||
ws.write(d) | ||
process.nextTick(write) | ||
} | ||
} | ||
write() | ||
}.bind(this)) | ||
write() | ||
}) | ||
test('test end accepts data', function (t, done) { | ||
this.openTestDatabase(function (db) { | ||
var ws = db.createWriteStream() | ||
, i = 0 | ||
test('race condition between batch callback and close event', function (t, ctx, done) { | ||
// Delaying the batch should not be a problem | ||
slowdown(ctx.db) | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', this.verify.bind(this, ws, db, done)) | ||
this.sourceData.forEach(function (d) { | ||
i++ | ||
if (i < this.sourceData.length) { | ||
ws.write(d) | ||
} else { | ||
ws.end(d) | ||
} | ||
}.bind(this)) | ||
}.bind(this)) | ||
var ws = WriteStream(ctx.db) | ||
var i = 0 | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', ctx.verify.bind(ctx, ws, done)) | ||
ctx.sourceData.forEach(function (d) { | ||
i++ | ||
if (i < ctx.sourceData.length) { | ||
ws.write(d) | ||
} else { | ||
ws.end(d) | ||
} | ||
}) | ||
}) | ||
// at the moment, destroySoon() is basically just end() | ||
test('test destroySoon()', function (t, done) { | ||
this.openTestDatabase(function (db) { | ||
var ws = db.createWriteStream() | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', this.verify.bind(this, ws, db, done)) | ||
this.sourceData.forEach(function (d) { | ||
test('race condition between two flushes', function (t, ctx, done) { | ||
slowdown(ctx.db) | ||
var ws = WriteStream(ctx.db) | ||
var order = monitor(ws) | ||
ws.on('close', function () { | ||
t.same(order, ['batch', 'batch', 'close']) | ||
ctx.verify(ws, done, [ | ||
{ key: 'a', value: 'a' }, | ||
{ key: 'b', value: 'b' } | ||
]) | ||
}) | ||
ctx.db.on('batch', function () { | ||
order.push('batch') | ||
}) | ||
ws.write({ key: 'a', value: 'a' }) | ||
// Schedule another flush while the first is in progress | ||
ctx.db.once('batch', function (ops) { | ||
ws.end({ key: 'b', value: 'b' }) | ||
}) | ||
}) | ||
test('test end accepts data', function (t, ctx, done) { | ||
var ws = WriteStream(ctx.db) | ||
var i = 0 | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', ctx.verify.bind(ctx, ws, done)) | ||
ctx.sourceData.forEach(function (d) { | ||
i++ | ||
if (i < ctx.sourceData.length) { | ||
ws.write(d) | ||
}) | ||
ws.destroySoon() | ||
}.bind(this)) | ||
} else { | ||
ws.end(d) | ||
} | ||
}) | ||
}) | ||
test('test destroy()', function (t, done) { | ||
var verify = function (ws, db) { | ||
t.ok(ws.writable === false, 'not writable') | ||
var _done = after(this.sourceData.length, done) | ||
this.sourceData.forEach(function (data) { | ||
db.get(data.key, function (err, value) { | ||
// none of them should exist | ||
t.ok(err, 'got expected error') | ||
t.notOk(value, 'did not get value') | ||
_done() | ||
}) | ||
test('test destroy()', function (t, ctx, done) { | ||
var ws = WriteStream(ctx.db) | ||
var verify = function () { | ||
concat(ctx.db.iterator(), function (err, result) { | ||
t.error(err, 'no error') | ||
t.same(result, [], 'results should be empty') | ||
done() | ||
}) | ||
} | ||
this.openTestDatabase(function (db) { | ||
var ws = db.createWriteStream() | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
t.ok(ws.writable === true, 'is writable') | ||
t.ok(ws.readable === false, 'not readable') | ||
ws.on('close', verify.bind(this, ws, db)) | ||
this.sourceData.forEach(function (d) { | ||
ws.write(d) | ||
t.ok(ws.writable === true, 'is writable') | ||
t.ok(ws.readable === false, 'not readable') | ||
}) | ||
t.ok(ws.writable === true, 'is writable') | ||
t.ok(ws.readable === false, 'not readable') | ||
ws.destroy() | ||
}.bind(this)) | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', verify.bind(null)) | ||
ctx.sourceData.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.destroy() | ||
}) | ||
test('test json encoding', function (t, done) { | ||
var options = { createIfMissing: true, errorIfExists: true, keyEncoding: 'utf8', valueEncoding: 'json' } | ||
, data = [ | ||
{ type: 'put', key: 'aa', value: { a: 'complex', obj: 100 } } | ||
, { type: 'put', key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { type: 'put', key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
, { type: 'put', key: 'ba', value: { a: 'complex', obj: 100 } } | ||
, { type: 'put', key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { type: 'put', key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
, { type: 'put', key: 'ca', value: { a: 'complex', obj: 100 } } | ||
, { type: 'put', key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { type: 'put', key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
] | ||
test('test destroy(err)', function (t, ctx, done) { | ||
var ws = WriteStream(ctx.db) | ||
var order = monitor(ws) | ||
this.openTestDatabase(options, function (db) { | ||
var ws = db.createWriteStream() | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
ws.on('error', function (err) { | ||
t.is(err.message, 'user error', 'got error') | ||
}) | ||
ws.on('close', function () { | ||
t.same(order, ['error', 'close']) | ||
concat(ctx.db.iterator(), function (err, result) { | ||
t.error(err, 'no error') | ||
t.same(result, [], 'results should be empty') | ||
done() | ||
}) | ||
ws.on('close', this.verify.bind(this, ws, db, done, data)) | ||
data.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.end() | ||
}.bind(this)) | ||
}) | ||
ctx.sourceData.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.destroy(new Error('user error')) | ||
}) | ||
test('test del capabilities for each key/value', function (t, done) { | ||
var options = { createIfMissing: true, errorIfExists: true, keyEncoding: 'utf8', valueEncoding: 'json' } | ||
, data = [ | ||
{ type: 'put', key: 'aa', value: { a: 'complex', obj: 100 } } | ||
, { type: 'put', key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { type: 'put', key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
, { type: 'put', key: 'ba', value: { a: 'complex', obj: 100 } } | ||
, { type: 'put', key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { type: 'put', key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
, { type: 'put', key: 'ca', value: { a: 'complex', obj: 100 } } | ||
, { type: 'put', key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { type: 'put', key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
] | ||
, self = this | ||
test('test json encoding', { keyEncoding: 'utf8', valueEncoding: 'json' }, function (t, ctx, done) { | ||
var data = [ | ||
{ key: 'aa', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }, | ||
{ key: 'ba', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }, | ||
{ key: 'ca', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
] | ||
function open () { | ||
self.openTestDatabase(options, function (db) { | ||
write(db); | ||
}); | ||
} | ||
var ws = WriteStream(ctx.db) | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', ctx.verify.bind(ctx, ws, done, data)) | ||
data.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.end() | ||
}) | ||
function write (db) { | ||
var ws = db.createWriteStream() | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', function () { | ||
del(db); | ||
}) | ||
data.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
test('test del capabilities for each key/value', { keyEncoding: 'utf8', valueEncoding: 'json' }, function (t, ctx, done) { | ||
var data = [ | ||
{ key: 'aa', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }, | ||
{ key: 'ba', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }, | ||
{ key: 'ca', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
] | ||
ws.end() | ||
} | ||
function del (db) { | ||
var delStream = db.createWriteStream() | ||
function del () { | ||
var delStream = WriteStream(ctx.db) | ||
delStream.on('error', function (err) { | ||
@@ -287,3 +270,3 @@ t.notOk(err, 'no error') | ||
delStream.on('close', function () { | ||
verify(db); | ||
verify() | ||
}) | ||
@@ -298,56 +281,38 @@ data.forEach(function (d) { | ||
function verify (db) { | ||
var _done = after(data.length, done) | ||
data.forEach(function (data) { | ||
db.get(data.key, function (err, value) { | ||
// none of them should exist | ||
t.ok(err, 'got expected error') | ||
t.notOk(value, 'did not get value') | ||
_done() | ||
}) | ||
function verify () { | ||
concat(ctx.db.iterator(), function (err, result) { | ||
t.error(err, 'no error') | ||
t.same(result, [], 'results should be empty') | ||
done() | ||
}) | ||
} | ||
open() | ||
var ws = WriteStream(ctx.db) | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', function () { | ||
del() | ||
}) | ||
data.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.end() | ||
}) | ||
test('test del capabilities as constructor option', function (t, done) { | ||
test('test del capabilities as constructor option', { keyEncoding: 'utf8', valueEncoding: 'json' }, function (t, ctx, done) { | ||
var data = [ | ||
{ key: 'aa', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }, | ||
{ key: 'ba', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }, | ||
{ key: 'ca', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
] | ||
var options = { createIfMissing: true, errorIfExists: true, keyEncoding: 'utf8', valueEncoding: 'json' } | ||
, data = [ | ||
{ key: 'aa', value: { a: 'complex', obj: 100 } } | ||
, { key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
, { key: 'ba', value: { a: 'complex', obj: 100 } } | ||
, { key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
, { key: 'ca', value: { a: 'complex', obj: 100 } } | ||
, { key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
] | ||
, self = this | ||
function open () { | ||
self.openTestDatabase(options, function (db) { | ||
write(db); | ||
}); | ||
} | ||
function write (db) { | ||
var ws = db.createWriteStream() | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', function () { | ||
del(db); | ||
}) | ||
data.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.end() | ||
} | ||
function del (db) { | ||
var delStream = db.createWriteStream({ type: 'del' }) | ||
function del () { | ||
var delStream = WriteStream(ctx.db, { type: 'del' }) | ||
delStream.on('error', function (err) { | ||
@@ -357,3 +322,3 @@ t.notOk(err, 'no error') | ||
delStream.on('close', function () { | ||
verify(db); | ||
verify() | ||
}) | ||
@@ -367,58 +332,41 @@ data.forEach(function (d) { | ||
function verify (db) { | ||
var _done = after(data.length, done) | ||
data.forEach(function (data) { | ||
db.get(data.key, function (err, value) { | ||
// none of them should exist | ||
t.ok(err, 'got expected error') | ||
t.notOk(value, 'did not get value') | ||
_done() | ||
}) | ||
function verify () { | ||
concat(ctx.db.iterator(), function (err, result) { | ||
t.error(err, 'no error') | ||
t.same(result, [], 'results should be empty') | ||
done() | ||
}) | ||
} | ||
open() | ||
var ws = WriteStream(ctx.db) | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', function () { | ||
del() | ||
}) | ||
data.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.end() | ||
}) | ||
test('test type at key/value level must take precedence on the constructor', function (t, done) { | ||
var options = { createIfMissing: true, errorIfExists: true, keyEncoding: 'utf8', valueEncoding: 'json' } | ||
, data = [ | ||
{ key: 'aa', value: { a: 'complex', obj: 100 } } | ||
, { key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
, { key: 'ba', value: { a: 'complex', obj: 100 } } | ||
, { key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
, { key: 'ca', value: { a: 'complex', obj: 100 } } | ||
, { key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } } | ||
, { key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
] | ||
, exception = data[0] | ||
, self = this | ||
test('test type at key/value level must take precedence on the constructor', { keyEncoding: 'utf8', valueEncoding: 'json' }, function (t, ctx, done) { | ||
var data = [ | ||
{ key: 'aa', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'ab', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'ac', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }, | ||
{ key: 'ba', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'bb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'bc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } }, | ||
{ key: 'ca', value: { a: 'complex', obj: 100 } }, | ||
{ key: 'cb', value: { b: 'foo', bar: [ 1, 2, 3 ] } }, | ||
{ key: 'cc', value: { c: 'w00t', d: { e: [ 0, 10, 20, 30 ], f: 1, g: 'wow' } } } | ||
] | ||
var exception = data[0] | ||
exception['type'] = 'put' | ||
function open () { | ||
self.openTestDatabase(options, function (db) { | ||
write(db); | ||
}); | ||
} | ||
function write (db) { | ||
var ws = db.createWriteStream() | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', function () { | ||
del(db); | ||
}) | ||
data.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.end() | ||
} | ||
function del (db) { | ||
var delStream = db.createWriteStream({ type: 'del' }) | ||
function del () { | ||
var delStream = WriteStream(ctx.db, { type: 'del' }) | ||
delStream.on('error', function (err) { | ||
@@ -428,3 +376,3 @@ t.notOk(err, 'no error') | ||
delStream.on('close', function () { | ||
verify(db); | ||
verify() | ||
}) | ||
@@ -438,55 +386,167 @@ data.forEach(function (d) { | ||
function verify (db) { | ||
var _done = after(data.length, done) | ||
data.forEach(function (data) { | ||
db.get(data.key, function (err, value) { | ||
if (data.type === 'put') { | ||
t.ok(value, 'got value') | ||
_done() | ||
} else { | ||
t.ok(err, 'got expected error') | ||
t.notOk(value, 'did not get value') | ||
_done() | ||
} | ||
}) | ||
function verify () { | ||
concat(ctx.db.iterator(), function (err, result) { | ||
t.error(err, 'no error') | ||
var expected = [ { key: data[0].key, value: data[0].value } ] | ||
t.same(result, expected, 'only one element') | ||
done() | ||
}) | ||
} | ||
open() | ||
var ws = WriteStream(ctx.db) | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
ws.on('close', function () { | ||
del() | ||
}) | ||
data.forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
ws.end() | ||
}) | ||
test('test ignoring pairs with the wrong type', function (t, done) { | ||
var self = this | ||
test('test that missing type errors', function (t, ctx, done) { | ||
var data = { key: 314, type: 'foo' } | ||
var errored = false | ||
function open () { | ||
self.openTestDatabase(write) | ||
function verify () { | ||
ctx.db.get(data.key, function (err, value) { | ||
t.equal(errored, true, 'error received in stream') | ||
t.ok(err, 'got expected error') | ||
t.equal(err.notFound, true, 'not found error') | ||
t.notOk(value, 'did not get value') | ||
done() | ||
}) | ||
} | ||
function write (db) { | ||
var ws = db.createWriteStream() | ||
ws.on('error', function (err) { | ||
t.notOk(err, 'no error') | ||
}) | ||
var ws = WriteStream(ctx.db) | ||
ws.on('error', function (err) { | ||
t.equal(err.message, '`type` must be \'put\' or \'del\'', 'should error') | ||
errored = true | ||
}) | ||
ws.on('close', function () { | ||
verify() | ||
}) | ||
ws.write(data) | ||
ws.end() | ||
}) | ||
test('test limbo batch error', function (t, ctx, done) { | ||
var ws = WriteStream(ctx.db) | ||
var order = monitor(ws) | ||
monkeyBatch(ctx.db, function (original, ops, options, cb) { | ||
process.nextTick(cb, new Error('batch error')) | ||
}) | ||
ws.on('error', function (err) { | ||
t.is(err.message, 'batch error') | ||
}) | ||
ws.on('close', function () { | ||
t.same(order, ['error', 'close']) | ||
t.end() | ||
}) | ||
// Don't end(), because we want the error to follow a | ||
// specific code path (when there is no _flush listener). | ||
ws.write({ key: 'a', value: 'a' }) | ||
}) | ||
test('test batch error when buffer is full', function (t, ctx, done) { | ||
var ws = WriteStream(ctx.db, { maxBufferLength: 1 }) | ||
var order = monitor(ws) | ||
monkeyBatch(ctx.db, function (original, ops, options, cb) { | ||
process.nextTick(cb, new Error('batch error')) | ||
}) | ||
ws.on('error', function (err) { | ||
t.is(err.message, 'batch error', 'got error') | ||
}) | ||
ws.on('close', function () { | ||
t.same(order, ['error', 'close']) | ||
t.end() | ||
}) | ||
// Don't end(), because we want the error to follow a | ||
// specific code path (when we're waiting to drain). | ||
ws.write({ key: 'a', value: 'a' }) | ||
ws.write({ key: 'b', value: 'b' }) | ||
}) | ||
test('test destroy while waiting to drain', function (t, ctx, done) { | ||
var ws = WriteStream(ctx.db, { maxBufferLength: 1 }) | ||
var order = monitor(ws) | ||
ws.on('error', function (err) { | ||
t.is(err.message, 'user error', 'got error') | ||
}) | ||
ws.on('close', function () { | ||
t.same(order, ['error', 'close']) | ||
t.end() | ||
}) | ||
ws.prependListener('_flush', function (err) { | ||
t.ifError(err, 'no _flush error') | ||
ws.destroy(new Error('user error')) | ||
}) | ||
// Don't end. | ||
ws.write({ key: 'a', value: 'a' }) | ||
ws.write({ key: 'b', value: 'b' }) | ||
}) | ||
;[0, 1, 2, 10, 20, 100].forEach(function (max) { | ||
test('test maxBufferLength: ' + max, testMaxBuffer(max, false)) | ||
test('test maxBufferLength: ' + max + ' (random)', testMaxBuffer(max, true)) | ||
}) | ||
function testMaxBuffer (max, randomize) { | ||
return function (t, ctx, done) { | ||
var ws = WriteStream(ctx.db, { maxBufferLength: max }) | ||
var sourceData = [] | ||
var batches = [] | ||
for (var i = 0; i < 20; i++) { | ||
sourceData.push({ key: i < 10 ? '0' + i : String(i), value: 'value' }) | ||
} | ||
var expectedSize = max || sourceData.length | ||
var remaining = sourceData.slice() | ||
ws.on('close', function () { | ||
verify(db) | ||
t.ok(batches.every(function (size, index) { | ||
// Last batch may contain additional items | ||
return size <= expectedSize || index === batches.length - 1 | ||
}), 'batch sizes are <= max') | ||
ctx.verify(ws, done, sourceData) | ||
}) | ||
self.sourceData.forEach(function (d) { | ||
d.type = 'x' + Math.random() | ||
ws.write(d) | ||
ctx.db.on('batch', function (ops) { | ||
batches.push(ops.length) | ||
}) | ||
ws.end() | ||
} | ||
function verify (db) { | ||
var _done = after(self.sourceData.length, done) | ||
self.sourceData.forEach(function (data) { | ||
db.get(data.key, function (err, value) { | ||
t.ok(err, 'got expected error') | ||
t.notOk(value, 'did not get value') | ||
_done() | ||
loop() | ||
function loop () { | ||
var toWrite = randomize | ||
? Math.floor(Math.random() * remaining.length + 1) | ||
: remaining.length | ||
remaining.splice(0, toWrite).forEach(function (d) { | ||
ws.write(d) | ||
}) | ||
}) | ||
if (remaining.length) { | ||
setImmediate(loop) | ||
} else { | ||
ws.end() | ||
} | ||
} | ||
} | ||
open() | ||
}) | ||
} |
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
SPDX disjunction
LicenseSPDX disjunction for an artifact's license information
Found 1 instance in 1 package
No contributors or author data
MaintenancePackage does not specify a list of contributors or an author in package.json.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
SPDX disjunction
LicenseSPDX disjunction for an artifact's license information
Found 1 instance in 1 package
Mixed license
License(Experimental) Package contains multiple licenses.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
30037
9
1
0
0
3
8
542
2
116
+ Addedinherits@^2.0.3
Updatedxtend@^4.0.1