Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

flumeview-level

Package Overview
Dependencies
Maintainers
5
Versions
33
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

flumeview-level - npm Package Compare versions

Comparing version
3.0.14
to
4.0.0
+200
-133
index.js

@@ -1,149 +0,201 @@

'use strict'
var pull = require('pull-stream')
var Level = require('level')
var charwise = require('charwise')
var Write = require('pull-write')
var pl = require('pull-level')
var Obv = require('obv')
var path = require('path')
var Paramap = require('pull-paramap')
var ltgt = require('ltgt')
var explain = require('explain-error')
var mkdirp = require('mkdirp')
"use strict";
var pull = require("pull-stream");
var Level = require("level");
var charwise = require("charwise");
var Write = require("pull-write");
var pl = require("pull-level");
var Obv = require("obv");
var path = require("path");
var Paramap = require("pull-paramap");
var ltgt = require("ltgt");
var explain = require("explain-error");
var mkdirp = require("mkdirp");
module.exports = function (version, map) {
return function (log, name) {
var dir = path.dirname(log.filename)
var dbPath = path.join(dir, name)
var db, writer
var dir = path.dirname(log.filename);
var dbPath = path.join(dir, name);
var db, writer;
var META = '\x00', since = Obv()
var META = "\x00";
var since = Obv();
var written = 0, closed, outdated
var closed;
var outdated;
function create() {
closed = false
if(!log.filename)
throw new Error('flumeview-level can only be used with a log that provides a directory')
return Level(path.join(dir, name), {keyEncoding: charwise, valueEncoding: 'json'})
closed = false;
if (!log.filename) {
throw new Error(
"flumeview-level can only be used with a log that provides a directory"
);
}
return Level(path.join(dir, name), {
keyEncoding: charwise,
valueEncoding: "json",
});
}
function close (cb) {
closed = true
//todo: move this bit into pull-write
if (outdated) db.close(cb)
else if(writer) writer.abort(function () { db.close(cb) })
else if(!db) cb()
else since.once(function () {
db.close(cb)
})
function close(cb) {
closed = true;
// todo: move this bit into pull-write
if (outdated) db.close(cb);
else if (writer) {
writer.abort(function () {
db.close(cb);
});
} else if (!db) cb();
else {
since.once(function () {
db.close(cb);
});
}
}
function destroy (cb) {
function destroy(cb) {
close(function () {
Level.destroy(dbPath, cb)
})
Level.destroy(dbPath, cb);
});
}
function dirReady() {
if(closed) return
db = create()
db.get(META, {keyEncoding: 'utf8'}, function (err, value) {
if(err) since.set(-1)
else if(value.version === version)
since.set(value.since)
if (closed) return;
db = create();
db.get(META, { keyEncoding: "utf8" }, function (err, value) {
if (err) since.set(-1);
else if (value.version === version) since.set(value.since);
else {
//version has changed, wipe db and start over.
outdated = true
// version has changed, wipe db and start over.
outdated = true;
destroy(function () {
db = create()
since.set(-1)
})
db = create();
since.set(-1);
});
}
})
});
}
if (process.title == 'browser') {
if (process.title === "browser") {
// in browser level is stored inside IndexedDB
dirReady()
} else
mkdirp(path.join(dir, name), dirReady)
dirReady();
} else mkdirp(path.join(dir, name)).then(dirReady);
return {
since: since,
methods: { get: 'async', read: 'source'},
methods: { get: "async", read: "source" },
createSink: function (cb) {
return writer = Write(function (batch, cb) {
if(closed) return cb(new Error('database closed while index was building'))
db.batch(batch, function (err) {
if(err) return cb(err)
since.set(batch[0].value.since)
//callback to anyone waiting for this point.
cb()
})
}, function reduce (batch, data) {
if(data.sync) return batch
var seq = data.seq
return (writer = Write(
function (batch, cb) {
if (closed) {
return cb(new Error("database closed while index was building"));
}
db.batch(batch, function (err) {
if (err) return cb(err);
since.set(batch[0].value.since);
// callback to anyone waiting for this point.
cb();
});
},
function reduce(batch, data) {
if (data.sync) return batch;
var seq = data.seq;
if(!batch)
batch = [{
key: META,
value: {version: version, since: seq},
valueEncoding: 'json', keyEncoding:'utf8', type: 'put'
}]
if (!batch) {
batch = [
{
key: META,
value: { version: version, since: seq },
valueEncoding: "json",
keyEncoding: "utf8",
type: "put",
},
];
}
//map must return an array (like flatmap) with zero or more values
var indexed = map(data.value, data.seq)
batch = batch.concat(indexed.map(function (key) { return { key: key, value: seq, type: 'put' }}))
batch[0].value.since = Math.max(batch[0].value.since, seq)
return batch
}, 512, cb)
// map must return an array (like flatmap) with zero or more values
var indexed = map(data.value, data.seq);
batch = batch.concat(
indexed.map(function (key) {
return { key: key, value: seq, type: "put" };
})
);
batch[0].value.since = Math.max(batch[0].value.since, seq);
return batch;
},
512,
cb
));
},
get: function (key, cb) {
//wait until the log has been processed up to the current point.
// wait until the log has been processed up to the current point.
db.get(key, function (err, seq) {
if (err && err.name === 'NotFoundError')
return cb(err)
if (err)
return cb(explain(err, 'flumeview-level.get: key not found:'+key))
if (err && err.name === "NotFoundError") return cb(err);
if (err) {
return cb(
explain(err, "flumeview-level.get: key not found:" + key)
);
}
log.get(seq, function (err, value) {
if (err) {
if (err.code === 'flumelog:deleted') {
if (err.code === "flumelog:deleted") {
return db.del(key, (delErr) => {
if (delErr) {
return cb(explain(delErr, 'when trying to delete:'+key+'at since:'+log.since.value))
return cb(
explain(
delErr,
"when trying to delete:" +
key +
"at since:" +
log.since.value
)
);
}
cb(err, null, seq)
})
cb(err, null, seq);
});
}
return cb(explain(err, 'flumeview-level.get: index for: ' +key+'pointed at:'+seq+'but log error'))
return cb(
explain(
err,
"flumeview-level.get: index for: " +
key +
"pointed at:" +
seq +
"but log error"
)
);
} else {
cb(null, value, seq)
cb(null, value, seq);
}
})
})
});
});
},
read: function (opts) {
var keys = opts.keys !== false
var values = opts.values !== false
var seqs = opts.seqs !== false
opts.keys = true; opts.values = true
//TODO: preserve whatever the user passed in on opts...
var keys = opts.keys !== false;
var values = opts.values !== false;
var seqs = opts.seqs !== false;
opts.keys = true;
opts.values = true;
// TODO: preserve whatever the user passed in on opts...
var lower = ltgt.lowerBound(opts)
if(lower == null) opts.gt = null
var lower = ltgt.lowerBound(opts);
if (lower == null) opts.gt = null;
function format (key, seq, value) {
return (
keys && values && seqs ? {key: key, seq: seq, value: value}
: keys && values ? {key: key, value: value}
: keys && seqs ? {key: key, seq: seq}
: seqs && values ? {seq: seq, value: value}
: keys ? key : seqs ? seq : value
)
function format(key, seq, value) {
return keys && values && seqs
? { key: key, seq: seq, value: value }
: keys && values
? { key: key, value: value }
: keys && seqs
? { key: key, seq: seq }
: seqs && values
? { seq: seq, value: value }
: keys
? key
: seqs
? seq
: value;
}

@@ -154,40 +206,55 @@

pull.filter(function (op) {
//this is an ugly hack! ); but it stops the index metadata appearing in the live stream
return op.key !== META
// this is an ugly hack! ); but it stops the index metadata appearing in the live stream
return op.key !== META;
}),
values
? pull(
Paramap(function (data, cb) {
if (data.sync) return cb(null, data)
if (data.type === 'del') return cb(null, null)
? pull(
Paramap(function (data, cb) {
if (data.sync) return cb(null, data);
if (data.type === "del") return cb(null, null);
log.get(data.value, function (err, value) {
if(err) {
if (err.code === 'flumelog:deleted') {
return db.del(data.key, (delErr) => {
if (delErr) {
return cb(explain(err, 'when trying to delete:'+data.key+'at since:'+log.since.value))
log.get(data.value, function (err, value) {
if (err) {
if (err.code === "flumelog:deleted") {
return db.del(data.key, (delErr) => {
if (delErr) {
return cb(
explain(
err,
"when trying to delete:" +
data.key +
"at since:" +
log.since.value
)
);
}
cb(null, null);
});
}
cb(null,null)
})
}
cb(explain(err, 'when trying to retrive:'+data.key+'at since:'+log.since.value))
}
else cb(null, format(data.key, data.value, value))
cb(
explain(
err,
"when trying to retrive:" +
data.key +
"at since:" +
log.since.value
)
);
} else cb(null, format(data.key, data.value, value));
});
}),
pull.filter()
)
: pull.map(function (data) {
return format(data.key, data.value, null);
})
}),
pull.filter()
)
: pull.map(function (data) {
return format(data.key, data.value, null)
})
)
);
},
close: close,
destroy: destroy
//put, del, batch - leave these out for now, since the indexes just map.
}
}
}
destroy: destroy,
// put, del, batch - leave these out for now, since the indexes just map.
};
};
};
{
"name": "flumeview-level",
"description": "a flumeview on level",
"version": "3.0.14",
"version": "4.0.0",
"homepage": "https://github.com/flumedb/flumeview-level",

@@ -13,9 +13,9 @@ "repository": {

"explain-error": "^1.0.4",
"level": "^5.0.0",
"level": "^6.0.0",
"ltgt": "^2.1.3",
"mkdirp": "^0.5.1",
"mkdirp": "^1.0.3",
"obv": "0.0.1",
"pull-level": "^2.0.3",
"pull-paramap": "^1.2.1",
"pull-stream": "^3.5.0",
"pull-stream": "^3.6.14",
"pull-write": "^1.1.1"

@@ -25,6 +25,6 @@ },

"flumecodec": "0.0.1",
"flumedb": "^1.0.0",
"flumelog-offset": "^3.2.6",
"tape": "^4.10.1",
"test-flumeview-index": "^2.2.0"
"flumedb": "^2.0.0",
"flumelog-offset": "^3.4.4",
"tape": "^4.13.0",
"test-flumeview-index": "^2.2.4"
},

@@ -31,0 +31,0 @@ "scripts": {

+28
-22

@@ -7,27 +7,35 @@ # flumeview-level

## example
``` js
var FlumeviewLevel = require('flumeview-level')
```js
const Flume = require('flumedb')
const FlumelogOffset = require('flumelog-offset')
const FlumeviewLevel = require('flumeview-level')
const pull = require('pull-stream')
flumedb.use(name, FlumeviewLevel(1, function map (value) {
return [data.foo] // must return an array
}))
const flumedb = Flume(FlumelogOffset('/tmp/log.offset'))
flumedb.append({foo: 'bar'}, function (err) {
if(err) throw err
const name = 'foo'
//query items from the index directly
flumedb.use(
name,
FlumeviewLevel(1, function map (value) {
return [value.foo] // must return an array
})
)
flumedb.append({ foo: 'bar' }, function (err) {
if (err) throw err
// Query items from the index directly
flumedb[name].get('bar', function (err, value) {
if(err) throw err
console.log(value) // => {foo: 'bar'})
if (err) throw err
console.log(value) // => { foo: 'bar' }
})
//or query ranges via pull-streams
// Or query ranges via pull-streams
pull(
flumedb[name].read({gte: 'bar', live: true}),
...
flumedb[name].read({ gte: 'bar', live: true }),
pull.drain(console.log)
)
})

@@ -41,5 +49,7 @@ ```

#### `version`
The version of the view. Incrementing this number will cause the view to be rebuilt.
#### `map`
A function with signature `(value, seq)`, where `value` is the item from the log coming past, and `seq` is the location of that value in the flume log.

@@ -51,2 +61,3 @@

Examples of index key(s) you might return:
- `[]` - i.e. don't add any indexes for this `value`

@@ -65,7 +76,6 @@ - `['@mix']` - make an index entry for this value under string `@mix`

#### `function`
#### `function`
flumeview-level returns a function which follows the flumeview pattern, enabling it to be installed into a flumedb.
### `get(key, cb)`

@@ -77,3 +87,2 @@

### `read(opts) => pull-stream`

@@ -111,3 +120,3 @@

Assume this is an index where the keys are of the form `[@mentions, timestamp]`, then this query will get all mentions which are _exactly_ '@mix', and happened more recently than 2018-04-27 5pm NZT (note `undefined` is the highest value in [bytewise](https://github.com/deanlandolt/bytewise#order-of-supported-structures) comparator)
Assume this is an index where the keys are of the form `[@mentions, timestamp]`, then this query will get all mentions which are _exactly_ '@mix', and happened more recently than 2018-04-27 5pm NZT (note `undefined` is the highest value in [bytewise](https://github.com/deanlandolt/bytewise#order-of-supported-structures) comparator)

@@ -128,7 +137,4 @@ If you wanted to get all mentions which _started with_ `@m` you could use:

## License
MIT

@@ -0,35 +1,26 @@

var Flume = require("flumedb");
var Log = require("flumelog-offset");
var Index = require("../");
var Flume = require('flumedb')
var Log = require('flumelog-offset')
var Index = require('../')
var codec = require('flumecodec')
var decodes = 0, time = 0, start = Date.now()
var codec = {
encode: function (o) {
var s = JSON.stringify(o)
return s
return JSON.stringify(o);
},
decode: function (s) {
decodes ++
// var start = Date.now()
var start = process.hrtime()
var v = JSON.parse(s.toString())
time += process.hrtime(start)[1]
// time += Date.now()-start
return v
return JSON.parse(s.toString());
},
buffer: false,
}
};
process.on('exit', function () {
console.error('memory', process.memoryUsage())
})
process.on("exit", function () {
console.error("memory", process.memoryUsage());
});
require('test-flumeview-index/bench')(function (file, seed) {
return Flume(Log(file+'log.offset', 1024, codec))
.use('index', Index(1, function (e) { return [e.key] }))
}, 5e4)
require("test-flumeview-index/bench")(function (file, seed) {
return Flume(Log(file + "log.offset", 1024, codec)).use(
"index",
Index(1, function (e) {
return [e.key];
})
);
}, 5e4);

@@ -0,15 +1,14 @@

var Flume = require("flumedb");
var Log = require("flumelog-offset");
var Index = require("../");
var codec = require("flumecodec");
var Flume = require('flumedb')
var Log = require('flumelog-offset')
var Index = require('../')
var codec = require('flumecodec')
require('test-flumeview-index')(function (file, seed) {
return Flume(Log(file+'/log.offset', 1024, codec.json))
.use('index', Index(1, function (e) {
console.log(e)
return [e.key]
}))
})
require("test-flumeview-index")(function (file, seed) {
return Flume(Log(file + "/log.offset", 1024, codec.json)).use(
"index",
Index(1, function (e) {
console.log(e);
return [e.key];
})
);
});

@@ -0,16 +1,14 @@

var Flume = require("flumedb");
var Log = require("flumelog-offset");
var Index = require("../");
var codec = require("flumecodec");
var Flume = require('flumedb')
var Log = require('flumelog-offset')
var Index = require('../')
var codec = require('flumecodec')
require('test-flumeview-index/live')(function (file, seed) {
return Flume(Log(file+'/log.offset', 1024, codec.json))
.use('index', Index(1, function (e) {
console.log(e)
return [e.key]
}))
})
require("test-flumeview-index/live")(function (file, seed) {
return Flume(Log(file + "/log.offset", 1024, codec.json)).use(
"index",
Index(1, function (e) {
console.log(e);
return [e.key];
})
);
});

@@ -0,15 +1,14 @@

var Flume = require("flumedb");
var Log = require("flumelog-offset");
var Index = require("../");
var codec = require("flumecodec");
var Flume = require('flumedb')
var Log = require('flumelog-offset')
var Index = require('../')
var codec = require('flumecodec')
require('test-flumeview-index/read')(function (file, seed) {
return Flume(Log(file+'/log.offset', 1024, codec.json))
.use('index', Index(1, function (e) {
console.log(e)
return [e.key]
}))
})
require("test-flumeview-index/read")(function (file, seed) {
return Flume(Log(file + "/log.offset", 1024, codec.json)).use(
"index",
Index(1, function (e) {
console.log(e);
return [e.key];
})
);
});