kappa-view-query
Advanced tools
Comparing version 2.0.9 to 3.0.0-alpha1
119
example.js
@@ -1,57 +0,44 @@ | ||
const kappa = require('kappa-core') | ||
const Kappa = require('kappa-core') | ||
const multifeed = require('multifeed') | ||
const hypercore = require('hypercore') | ||
const ram = require('random-access-memory') | ||
const collect = require('collect-stream') | ||
const memdb = require('memdb') | ||
const sub = require('subleveldown') | ||
const Query = require('./') | ||
const { validator } = require('./util') | ||
const { validator, fromMultifeed, fromHypercore } = require('./util') | ||
const { cleaup, tmp } = require('./test/util') | ||
const core = kappa(ram, { valueEncoding: 'json' }) | ||
const db = memdb() | ||
const seedData = require('./test/seeds.json') | ||
const indexes = [ | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }, | ||
{ key: 'cha', value: [['value', 'type'], ['value', 'content', 'channel'], ['value', 'timestamp']] } | ||
] | ||
// An example using a single hypercore | ||
function HypercoreExample () { | ||
const core = new Kappa() | ||
const feed = hypercore(ram, { valueEncoding: 'json' }) | ||
const db = memdb() | ||
core.use('query', Query(db, { indexes, validator })) | ||
core.use('query', createHypercoreSource({ feed, db: sub(db, 'state') }), Query(sub(db, 'view'), { | ||
// define a set of indexes | ||
indexes: [ | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] } | ||
], | ||
// you can pass a custom validator function to ensure all messages entering a feed match a specific format | ||
validator, | ||
// implement your own getMessage function, and perform any desired validation on each message returned by the query | ||
getMessage: fromHypercore(feed) | ||
})) | ||
core.writer('local', (err, feed) => { | ||
const data = [{ | ||
type: 'chat/message', | ||
timestamp: Date.now(), | ||
content: { body: 'Hi im new here...' } | ||
}, { | ||
type: 'user/about', | ||
timestamp: Date.now(), | ||
content: { name: 'Grace' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now(), | ||
content: { body: 'Second post' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now(), | ||
content: { channel: 'dogs', body: 'Lurchers rule' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now(), | ||
content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' } | ||
}, { | ||
type: 'user/about', | ||
timestamp: Date.now(), | ||
content: { name: 'Poison Ivy' } | ||
}] | ||
feed.append(data, (err, _) => { | ||
feed.append(seedData, (err, _) => { | ||
core.ready('query', () => { | ||
const query = [{ $filter: { value: { type: 'chat/message', content: { channel: 'dogs' } } } }] | ||
const query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
collect(core.api.query.read({ query }), (err, chats) => { | ||
// grab then log all chat/message message types up until this point | ||
collect(core.view.query.read({ query }), (err, chats) => { | ||
if (err) return console.error(err) | ||
console.log(chats) | ||
collect(core.api.query.read({ query: [{ $filter: { value: { type: 'user/about' } } }] }), (err, users) => { | ||
// grab then log all user/about message types up until this point | ||
collect(core.view.query.read({ query: [{ $filter: { value: { type: 'user/about' } } }] }), (err, users) => { | ||
if (err) return console.error(err) | ||
@@ -63,2 +50,50 @@ console.log(users) | ||
}) | ||
}) | ||
} | ||
// an example using multifeed for aggregating and querying all feeds | ||
function MultifeedExample () { | ||
const core = new Kappa() | ||
const feeds = multifeed(ram, { valueEncoding: 'json' }) | ||
const db = memdb() | ||
core.use('query', createMultifeedSource({ feeds, db: sub(db, 'state') }), Query(sub(db, 'view'), { | ||
indexes: [ | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] } | ||
], | ||
validator, | ||
getMessage: fromMultifeed(feeds) | ||
})) | ||
core.ready('query', () => { | ||
// setup a live query to first log all chat/message | ||
core.view.query.ready({ | ||
query: [{ $filter: { value: { type: 'chat/message' } } }], | ||
live: true, | ||
old: false | ||
}).on('data', (msg) => { | ||
if (msg.sync) return next() | ||
console.log(msg) | ||
}) | ||
function next () { | ||
// then to first log all user/about | ||
core.view.query.read({ | ||
query: [{ $filter: { value: { type: 'user/about' } } }], | ||
live: true, | ||
old: false | ||
}).on('data', (msg) => { | ||
console.log(msg) | ||
}) | ||
} | ||
}) | ||
// then append a bunch of data to two different feeds in a multifeed | ||
feeds.writer('one', (err, one) => { | ||
feeds.writer('two', (err, two) => { | ||
one.append(seedData.slice(0, 3)) | ||
two.append(seedData.slice(3, 5)) | ||
}) | ||
}) | ||
} |
203
index.js
@@ -7,2 +7,3 @@ const through = require('through2') | ||
const liveStream = require('level-live-stream') | ||
const { isFunction } = require('util') | ||
@@ -12,4 +13,2 @@ const Explain = require('./explain') | ||
const { isFunction } = require('./util') | ||
module.exports = function KappaViewQuery (db = memdb(), opts = {}) { | ||
@@ -21,132 +20,112 @@ const events = new EventEmitter() | ||
validator = (msg) => msg, | ||
keyEncoding = charwise | ||
keyEncoding = charwise, | ||
getMessage, | ||
} = opts | ||
const view = { | ||
return { | ||
maxBatch: opts.maxBatch || 100, | ||
map, | ||
indexed, | ||
api: { | ||
read, | ||
explain, | ||
add, | ||
onUpdate: (core, cb) => events.on('update', cb), | ||
events | ||
} | ||
} | ||
map: (msgs, next) => { | ||
var ops = [] | ||
function explain (core, _opts) { | ||
var expl = Explain(indexes.map((idx) => Object.assign(idx, { | ||
exact: typeof idx.exact === 'boolean' ? idx.exact : false, | ||
createStream: (__opts) => { | ||
var thru = through.obj(function (msg, enc, next) { | ||
if (msg.sync) { | ||
this.push(msg) | ||
return next() | ||
} | ||
msgs.forEach((msg) => { | ||
msg = validator(msg) | ||
if (!msg) return | ||
getMessage(msg, (err, msg) => { | ||
if (err) return next() | ||
this.push(msg) | ||
next() | ||
}) | ||
}) | ||
indexes.forEach((idx) => { | ||
var indexKeys = getIndexValues(msg, idx.value) | ||
var streamOpts = Object.assign(__opts, { | ||
lte: [idx.key, ...__opts.lte], | ||
gte: [idx.key, ...__opts.gte], | ||
keyEncoding, | ||
keys: true, | ||
values: true | ||
}) | ||
if (indexKeys.length) { | ||
ops.push({ | ||
type: 'put', | ||
key: [idx.key, ...indexKeys, msg.seq, msg.key], | ||
value: [msg.key, msg.seq].join('@'), | ||
keyEncoding, | ||
}) | ||
} | ||
var stream = __opts.live | ||
? liveStream(db, streamOpts) | ||
: db.createReadStream(streamOpts) | ||
function getIndexValues (msg, value) { | ||
var child = value[0] | ||
if (Array.isArray(child)) { | ||
return value | ||
.map((val) => getIndexValues(msg, val)) | ||
.reduce((acc, arr) => [...acc, ...arr], []) | ||
.filter(Boolean) | ||
} else if (typeof child === 'string') { | ||
return [value.reduce((obj, val) => obj[val], msg)] | ||
.filter(Boolean) | ||
} else return [] | ||
} | ||
}) | ||
}) | ||
stream.pipe(thru) | ||
db.batch(ops, next) | ||
}, | ||
indexed: (msgs) => { | ||
debug(`indexing ${JSON.stringify(msgs, null, 2)}`) | ||
return thru | ||
} | ||
}))) | ||
msgs.forEach((msg) => { | ||
events.emit('update', msg) | ||
}) | ||
}, | ||
api: { | ||
read: (core, _opts) => { | ||
var __opts = view.api.explain(core, _opts) | ||
var source = __opts.createStream(__opts) | ||
return Filter(source, _opts) | ||
}, | ||
explain: (core, _opts) => { | ||
var explain = Explain(indexes.map((idx) => Object.assign(idx, { | ||
exact: typeof idx.exact === 'boolean' ? idx.exact : false, | ||
createStream: (__opts) => { | ||
var thru = through.obj(function (msg, enc, next) { | ||
if (msg.sync) { | ||
this.push(msg) | ||
return next() | ||
} | ||
return expl(_opts) | ||
} | ||
var msgId = msg.value | ||
var [ feedId, sequence ] = msgId.split('@') | ||
var feed = core._logs.feed(feedId) | ||
var seq = Number(sequence) | ||
function read (core, _opts) { | ||
var __opts = explain(core, _opts) | ||
var source = __opts.createStream(__opts) | ||
return Filter(source, _opts) | ||
} | ||
feed.get(seq, (err, value) => { | ||
if (err) return next() | ||
function add (core, _opts) { | ||
var isValid = _opts && isFunction(_opts.createStream) && Array.isArray(_opts.index || _opts.value) | ||
if(!isValid) throw new Error('kappa-view-query.add: expected opts { index, createStream }') | ||
_opts.value = _opts.index || _opts.value | ||
indexes.push(_opts) | ||
} | ||
var msg = validator({ | ||
key: feed.key.toString('hex'), | ||
seq, | ||
value | ||
}) | ||
function map (msgs, next) { | ||
var ops = [] | ||
if (!msg) return next() | ||
this.push(msg) | ||
next() | ||
}) | ||
}) | ||
msgs.forEach((msg) => { | ||
msg = validator(msg) | ||
if (!msg) return | ||
var streamOpts = Object.assign(__opts, { | ||
lte: [idx.key, ...__opts.lte], | ||
gte: [idx.key, ...__opts.gte], | ||
keyEncoding, | ||
keys: true, | ||
values: true | ||
}) | ||
indexes.forEach((idx) => { | ||
var indexKeys = getIndexValues(msg, idx.value) | ||
var stream = __opts.live | ||
? liveStream(db, streamOpts) | ||
: db.createReadStream(streamOpts) | ||
if (indexKeys.length) { | ||
ops.push({ | ||
type: 'put', | ||
key: [idx.key, ...indexKeys, msg.seq, msg.key], | ||
value: [msg.key, msg.seq].join('@'), | ||
keyEncoding, | ||
}) | ||
} | ||
stream.pipe(thru) | ||
function getIndexValues (msg, value) { | ||
var child = value[0] | ||
if (Array.isArray(child)) { | ||
return value | ||
.map((val) => getIndexValues(msg, val)) | ||
.reduce((acc, arr) => [...acc, ...arr], []) | ||
.filter(Boolean) | ||
} else if (typeof child === 'string') { | ||
return [value.reduce((obj, val) => obj[val], msg)] | ||
.filter(Boolean) | ||
} else return [] | ||
} | ||
}) | ||
}) | ||
return thru | ||
} | ||
}))) | ||
db.batch(ops, next) | ||
} | ||
return explain(_opts) | ||
}, | ||
add: (core, _opts) => { | ||
var isValid = _opts && isFunction(_opts.createStream) && Array.isArray(_opts.index || _opts.value) | ||
if(!isValid) throw new Error('kappa-view-query.add: expected opts { index, createStream }') | ||
_opts.value = _opts.index || _opts.value | ||
indexes.push(_opts) | ||
}, | ||
onUpdate: (core, cb) => { | ||
events.on('update', cb) | ||
}, | ||
storeState: (state, cb) => { | ||
state = state.toString('base64') | ||
db.put('state', state, cb) | ||
}, | ||
fetchState: (cb) => { | ||
db.get('state', function (err, state) { | ||
if (err && err.notFound) cb() | ||
else if (err) cb(err) | ||
else cb(null, Buffer.from(state, 'base64')) | ||
}) | ||
}, | ||
events | ||
} | ||
function indexed (msgs) { | ||
msgs.forEach((msg) => { | ||
events.emit('update', msg) | ||
}) | ||
} | ||
return view | ||
} |
{ | ||
"name": "kappa-view-query", | ||
"version": "2.0.9", | ||
"description": "", | ||
"version": "3.0.0-alpha1", | ||
"description": "define your own indexes and execute map-filter-reduce queries over a set of hypercores using kappa-core", | ||
"main": "index.js", | ||
"repository": { | ||
"type": "git", | ||
"url": "git+https://github.com/kappa-db/kappa-view-query.git" | ||
}, | ||
"scripts": { | ||
@@ -23,9 +27,12 @@ "test": "tape test/**/*.test.js | tap-spec", | ||
"collect-stream": "^1.2.1", | ||
"kappa-core": "^3.0.2", | ||
"hypercore": "^8.4.1", | ||
"kappa-core": "github:Frando/kappa-core#kappa5-new", | ||
"level": "^5.0.1", | ||
"memdb": "^1.3.1", | ||
"mkdirp": "^0.5.1", | ||
"multifeed": "5.1.1", | ||
"nyc": "^14.1.1", | ||
"random-access-memory": "^3.1.1", | ||
"rimraf": "^2.6.3", | ||
"subleveldown": "^4.1.4", | ||
"tap-spec": "^5.0.0", | ||
@@ -32,0 +39,0 @@ "tape": "^4.11.0", |
176
README.md
# Kappa View Query | ||
`kappa-view-query` is a materialised view to be used with kappa-core. It provides an API that allows you to define your own indexes and execute custom [map-filter-reduce](https://github.com/dominictarr/map-filter-reduce) queries over a collection of hypercores. | ||
`kappa-view-query` is a materialised view to be used with kappa-core. It provides an API that allows you to define your own indexes and execute custom [map-filter-reduce](https://github.com/dominictarr/map-filter-reduce) queries over your indexes. | ||
@@ -53,89 +53,115 @@ `kappa-view-query` is inspired by [flumeview-query](https://github.com/flumedb/flumeview-query). It uses the same scoring system for determining the most efficient index relevant to the provided query. | ||
### Hypercore | ||
This example uses a single hypercore and collects all messages at a given point in time. | ||
```js | ||
const kappa = require('kappa-core') | ||
const Query = require('./') | ||
const Kappa = require('kappa-core') | ||
const hypercore = require('hypercore') | ||
const ram = require('random-access-memory') | ||
const collect = require('collect-stream') | ||
const memdb = require('memdb') | ||
const sub = require('subleveldown') | ||
// Initialised your kappa-core back-end | ||
const core = kappa(ram, { valueEncoding: 'json' }) | ||
const Query = require('./') | ||
const { validator, fromHypercore } = require('./util') | ||
const { cleaup, tmp } = require('./test/util') | ||
const seedData = require('./test/seeds.json') | ||
const core = new Kappa() | ||
const feed = hypercore(ram, { valueEncoding: 'json' }) | ||
const db = memdb() | ||
// Define a validator or a message decoder to determine if a message should be indexed or not | ||
function validator (msg) { | ||
if (typeof msg !== 'object') return null | ||
if (typeof msg.value !== 'object') return null | ||
if (typeof msg.value.timestamp !== 'number') return null | ||
if (typeof msg.value.type !== 'string') return null | ||
return msg | ||
} | ||
core.use('query', createHypercoreSource({ feed, db: sub(db, 'state') }), Query(sub(db, 'view'), { | ||
indexes: [ | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] } | ||
], | ||
// you can pass a custom validator function to ensure all messages entering a feed match a specific format | ||
validator, | ||
// implement your own getMessage function, and perform any desired validation on each message returned by the query | ||
getMessage: fromHypercore(feed) | ||
})) | ||
// here's an alternative using protocol buffers, assuming a message schema exists | ||
const { Message } = protobuf(fs.readFileSync(path.join(path.resolve(__dirname), 'message.proto'))) | ||
feed.append(seedData, (err, _) => { | ||
core.ready('query', () => { | ||
const query = [{ $filter: { value: { type: 'chat/message' } }] | ||
function validator (msg) { | ||
try { msg.value = Message.decode(msg.value) } | ||
catch (err) { return console.error(err) && false } | ||
return msg | ||
} | ||
// grab then log all chat/message message types up until this point | ||
collect(core.view.query.read({ query }), (err, chats) => { | ||
if (err) return console.error(err) | ||
console.log(chats) | ||
// Define a set of indexes under a namespace | ||
const indexes = [ | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }, | ||
{ key: 'cha', value: [['value', 'type'], ['value', 'content', 'channel']] } | ||
] | ||
// grab then log all user/about message types up until this point | ||
collect(core.view.query.read({ query: [{ $filter: { value: { type: 'user/about' } } }] }), (err, users) => { | ||
if (err) return console.error(err) | ||
console.log(users) | ||
}) | ||
}) | ||
}) | ||
}) | ||
``` | ||
core.use('query', Query(db, { indexes, validator })) | ||
### Multifeed | ||
core.writer('local', (err, feed) => { | ||
// Populate our feed with some messages | ||
const data = [{ | ||
type: 'chat/message', | ||
timestamp: Date.now(), | ||
content: { body: 'Hi im new here...' } | ||
}, { | ||
type: 'user/about', | ||
timestamp: Date.now(), | ||
content: { name: 'Grace' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now(), | ||
content: { body: 'Second post' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now(), | ||
content: { channel: 'dogs', body: 'Lurchers rule' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now(), | ||
content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' } | ||
}, { | ||
type: 'user/about', | ||
timestamp: Date.now(), | ||
content: { name: 'Poison Ivy' } | ||
}] | ||
This example uses a multifeed instance for managing hypercores and sets up two live streams to dump messages to the console as they arrive. | ||
feed.append(data, (err, seq) => { | ||
// Define a query: filter where the message value contains type 'chat/message', and the content references the channel 'dogs' | ||
const query = [{ $filter: { value: { type: 'chat/message', content: { channel: 'dogs' } } } }] | ||
```js | ||
const Kappa = require('kappa-core') | ||
const multifeed = require('multifeed') | ||
const ram = require('random-access-memory') | ||
const collect = require('collect-stream') | ||
const memdb = require('memdb') | ||
const sub = require('subleveldown') | ||
core.ready('query', () => { | ||
// For static queries | ||
collect(core.api.query.read({ query }), (err, msgs) => { | ||
console.log(msgs) | ||
const Query = require('./') | ||
const { validator, fromMultifeed } = require('./util') | ||
const { cleaup, tmp } = require('./test/util') | ||
// Logs all messages of type chat/message that reference the dogs channel, and order by timestamp... | ||
// { | ||
// type: 'chat/message', | ||
// timestamp: 1561996331743, | ||
// content: { channel: 'dogs', body: 'Lurchers rule' } | ||
// } | ||
// { | ||
// type: 'chat/message', | ||
// timestamp: Date.now(), | ||
// content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' } | ||
// } | ||
}) | ||
const seedData = require('./test/seeds.json') | ||
const core = new Kappa() | ||
const feeds = multifeed(ram, { valueEncoding: 'json' }) | ||
const db = memdb() | ||
core.use('query', createMultifeedSource({ feeds, db: sub(db, 'state') }), Query(sub(db, 'view'), { | ||
indexes: [ | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] } | ||
], | ||
validator, | ||
// make sure you define your own getMessage function, otherwise nothing will be returned by your queries | ||
getMessage: fromMultifeed(feeds) | ||
})) | ||
core.ready('query', () => { | ||
// setup a live query to first log all chat/message | ||
core.view.query.ready({ | ||
query: [{ $filter: { value: { type: 'chat/message' } } }], | ||
live: true, | ||
old: false | ||
}).on('data', (msg) => { | ||
if (msg.sync) return next() | ||
console.log(msg) | ||
}) | ||
function next () { | ||
// then to first log all user/about | ||
core.view.query.read({ | ||
query: [{ $filter: { value: { type: 'user/about' } } }], | ||
live: true, | ||
old: false | ||
}).on('data', (msg) => { | ||
console.log(msg) | ||
}) | ||
} | ||
}) | ||
// then append a bunch of data to two different feeds in a multifeed | ||
feeds.writer('one', (err, one) => { | ||
feeds.writer('two', (err, two) => { | ||
one.append(seedData.slice(0, 3)) | ||
two.append(seedData.slice(3, 5)) | ||
}) | ||
@@ -152,2 +178,3 @@ }) | ||
Expects a LevelUP or LevelDOWN instance `leveldb`. | ||
Expects a `getMessage` function to use your defined index to grab the message from the feed. | ||
@@ -185,1 +212,4 @@ ```js | ||
- Fixed an outstanding issue where messages with a matching timestamp were colliding, where the last indexed would over-writing the previous. Messages are now indexed, on top of provided values, on the sequence number and the feed id, for guaranteed uniqueness. | ||
### 3.0.0 (not yet released) | ||
- Updated to use the experimental version of kappa-core which includes breaking API changes. See Frando's [kappa-core fork](https://github.com/Frando/kappa-core/tree/kappa5-new). |
const { describe } = require('tape-plus') | ||
const kappa = require('kappa-core') | ||
const Query = require('../') | ||
const Kappa = require('kappa-core') | ||
const createMultifeedSource = require('kappa-core/sources/multifeed') | ||
const createHypercoreSource = require('kappa-core/sources/hypercore') | ||
const multifeed = require('multifeed') | ||
const hypercore = require('hypercore') | ||
const ram = require('random-access-memory') | ||
const memdb = require('memdb') | ||
const sub = require('subleveldown') | ||
const level = require('level') | ||
@@ -11,28 +15,23 @@ const collect = require('collect-stream') | ||
const seeds = require('./seeds.json') | ||
.sort((a, b) => a.timestamp > b.timestamp ? +1 : -1) | ||
const Query = require('../') | ||
const drive = require('./drive.json') | ||
.sort((a, b) => a.timestamp > b.timestamp ? +1 : -1) | ||
const { fromMultifeed, fromHypercore } = require('../util') | ||
const { cleanup, tmp, replicate } = require('./util') | ||
describe('basic', (context) => { | ||
let core, db, indexes | ||
const seeds = require('./seeds.json').sort((a, b) => a.timestamp > b.timestamp ? +1 : -1) | ||
const drive = require('./drive.json').sort((a, b) => a.timestamp > b.timestamp ? +1 : -1) | ||
context.beforeEach((c) => { | ||
core = kappa(ram, { valueEncoding: 'json' }) | ||
db = memdb() | ||
describe('hypercore', (context) => { | ||
context('perform a query', (assert, next) => { | ||
var core = new Kappa() | ||
var feed = hypercore(ram, { valueEncoding: 'json' }) | ||
var db = memdb() | ||
indexes = [ | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }, | ||
{ key: 'fil', value: [['value', 'filename'], ['value', 'timestamp']] } | ||
] | ||
var source = createHypercoreSource({ feed, db: sub(db, 'state') }) | ||
core.use('query', source, Query(sub(db, 'view'), { | ||
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }], | ||
getMessage: fromHypercore(feed) | ||
})) | ||
core.use('query', Query(db, { indexes })) | ||
}) | ||
context('perform a query', (assert, next) => { | ||
core.writer('local', (err, feed) => { | ||
core.ready('query', () => { | ||
feed.append(seeds, (err, _) => { | ||
@@ -44,3 +43,3 @@ assert.error(err, 'no error') | ||
core.ready('query', () => { | ||
collect(core.api.query.read({ query }), (err, msgs) => { | ||
collect(core.view.query.read({ query }), (err, msgs) => { | ||
var check = seeds.filter((msg) => msg.type === 'chat/message') | ||
@@ -57,15 +56,23 @@ | ||
context('get all messages', (assert, next) => { | ||
core.writer('local', (err, feed) => { | ||
feed.append(seeds, (err, _) => { | ||
assert.error(err, 'no error') | ||
var core = new Kappa() | ||
var feed = hypercore(ram, { valueEncoding: 'json' }) | ||
var db = memdb() | ||
let query = [{ $filter: { value: { timestamp: { $gt: 0 } } } }] | ||
var source = createHypercoreSource({ feed, db: sub(db, 'state') }) | ||
core.use('query', source, Query(sub(db, 'view'), { | ||
indexes: [{ key: 'log', value: [['value', 'timestamp']] }], | ||
getMessage: fromHypercore(feed) | ||
})) | ||
core.ready('query', () => { | ||
collect(core.api.query.read({ query }), (err, msgs) => { | ||
var check = seeds | ||
assert.equal(msgs.length, check.length, 'gets the same number of messages') | ||
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index') | ||
next() | ||
}) | ||
feed.append(seeds, (err, _) => { | ||
assert.error(err, 'no error') | ||
let query = [{ $filter: { value: { timestamp: { $gt: 0 } } } }] | ||
core.ready('query', () => { | ||
collect(core.view.query.read({ query }), (err, msgs) => { | ||
var check = seeds | ||
assert.equal(msgs.length, check.length, 'gets the same number of messages') | ||
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index') | ||
next() | ||
}) | ||
@@ -76,23 +83,34 @@ }) | ||
context('fil index - get all changes to a specific file, then get all changes to all files', (assert, next) => { | ||
core.writer('local', (err, feed) => { | ||
feed.append(drive, (err, _) => { | ||
assert.error(err, 'no error') | ||
let filename = 'hello.txt' | ||
let helloQuery = [{ $filter: { value: { filename, timestamp: { $gt: 0 } } } }] | ||
context('fil (used by cobox state feed)', (assert, next) => { | ||
var core = new Kappa() | ||
var feed = hypercore(ram, { valueEncoding: 'json' }) | ||
var db = memdb() | ||
core.ready('query', () => { | ||
collect(core.api.query.read({ query: helloQuery }), (err, msgs) => { | ||
var check = drive.filter((msg) => msg.filename === filename) | ||
var source = createHypercoreSource({ feed, db: sub(db, 'state') }) | ||
core.use('query', source, Query(sub(db, 'view'), { | ||
indexes: [ | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'fil', value: [['value', 'filename']] }, | ||
], | ||
getMessage: fromHypercore(feed) | ||
})) | ||
feed.append(drive, (err, _) => { | ||
assert.error(err, 'no error') | ||
let filename = 'hello.txt' | ||
let helloQuery = [{ $filter: { value: { filename, timestamp: { $gt: 0 } } } }] | ||
core.ready('query', () => { | ||
collect(core.view.query.read({ query: helloQuery }), (err, msgs) => { | ||
var check = drive.filter((msg) => msg.filename === filename) | ||
assert.equal(msgs.length, check.length, 'gets the same number of messages') | ||
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index') | ||
let fileQuery = [{ $filter: { value: { timestamp: { $gt: 0 } } } }] | ||
collect(core.view.query.read({ query: fileQuery }), (err, msgs) => { | ||
var check = drive | ||
assert.equal(msgs.length, check.length, 'gets the same number of messages') | ||
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index') | ||
let fileQuery = [{ $filter: { value: { timestamp: { $gt: 0 } } } }] | ||
collect(core.api.query.read({ query: fileQuery }), (err, msgs) => { | ||
var check = drive | ||
assert.equal(msgs.length, check.length, 'gets the same number of messages') | ||
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index') | ||
next() | ||
}) | ||
next() | ||
}) | ||
@@ -105,30 +123,37 @@ }) | ||
context('live', (assert, next) => { | ||
core.writer('local', (err, feed) => { | ||
var core = new Kappa() | ||
var feed = hypercore(ram, { valueEncoding: 'json' }) | ||
var db = memdb() | ||
var source = createHypercoreSource({ feed, db: sub(db, 'state') }) | ||
core.use('query', source, Query(sub(db, 'view'), { | ||
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }], | ||
getMessage: fromHypercore(feed) | ||
})) | ||
feed.append(seeds.slice(0, 2), (err, _) => { | ||
assert.error(err, 'no error') | ||
feed.append(seeds.slice(0, 2), (err, _) => { | ||
assert.error(err, 'no error') | ||
let count = 0 | ||
let check = seeds.filter((msg) => msg.type === 'chat/message') | ||
let count = 0 | ||
let check = seeds.filter((msg) => msg.type === 'chat/message') | ||
let query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
let query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
core.ready('query', () => { | ||
var stream = core.api.query.read({ live: true, query }) | ||
core.ready('query', () => { | ||
var stream = core.view.query.read({ live: true, query }) | ||
stream.on('data', (msg) => { | ||
if (msg.sync) return done() | ||
assert.same(check[count], msg.value, 'streams each message live') | ||
++count | ||
done() | ||
}) | ||
stream.on('data', (msg) => { | ||
if (msg.sync) return done() | ||
assert.same(check[count], msg.value, 'streams each message live') | ||
++count | ||
done() | ||
}) | ||
feed.append(seeds.slice(3, 5), (err, _) => { | ||
assert.error(err, 'no error') | ||
}) | ||
feed.append(seeds.slice(3, 5), (err, _) => { | ||
assert.error(err, 'no error') | ||
}) | ||
function done (err) { | ||
if (count === check.length) return next() | ||
} | ||
}) | ||
function done (err) { | ||
if (count === check.length) return next() | ||
} | ||
}) | ||
@@ -139,19 +164,16 @@ }) | ||
describe('multiple feeds', (context) => { | ||
let core, db | ||
let name1, name2 | ||
describe('multifeed', (context) => { | ||
context('aggregates all feeds', (assert, next) => { | ||
var core = new Kappa() | ||
var feeds = multifeed(ram, { valueEncoding: 'json' }) | ||
var db = memdb() | ||
context.beforeEach((c) => { | ||
core = kappa(ram, { valueEncoding: 'json' }) | ||
db = memdb() | ||
var source = createMultifeedSource({ feeds, db: sub(db, 'state') }) | ||
core.use('query', source, Query(sub(db, 'view'), { | ||
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }], | ||
getMessage: fromMultifeed(feeds) | ||
})) | ||
indexes = [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }] | ||
name1 = crypto.randomBytes(16).toString('hex') | ||
name2 = crypto.randomBytes(16).toString('hex') | ||
core.use('query', Query(db, { indexes })) | ||
}) | ||
context('aggregates all feeds', (assert, next) => { | ||
var name1 = crypto.randomBytes(16).toString('hex') | ||
var name2 = crypto.randomBytes(16).toString('hex') | ||
var query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
@@ -168,6 +190,6 @@ var timestamp = Date.now() | ||
debug(`initialised feed1: ${feed1.key.toString('hex')} feed2: ${feed2.key.toString('hex')}`) | ||
assert.same(2, core.feeds().length, 'two local feeds') | ||
assert.same(2, feeds.feeds().length, 'two local feeds') | ||
core.ready('query', () => { | ||
collect(core.api.query.read({ query }), (err, msgs) => { | ||
collect(core.view.query.read({ query }), (err, msgs) => { | ||
assert.error(err, 'no error') | ||
@@ -186,3 +208,3 @@ assert.ok(msgs.length === 2, 'returns two messages') | ||
function setup (name, cb) { | ||
core.writer(name, (err, feed) => { | ||
feeds.writer(name, (err, feed) => { | ||
feed.append({ | ||
@@ -201,2 +223,14 @@ type: 'chat/message', | ||
context('aggregates all feeds, colliding timestamps', (assert, next) => { | ||
var core = new Kappa() | ||
var feeds = multifeed(ram, { valueEncoding: 'json' }) | ||
var db = memdb() | ||
var source = createMultifeedSource({ feeds, db: sub(db, 'state') }) | ||
core.use('query', source, Query(sub(db, 'view'), { | ||
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }], | ||
getMessage: fromMultifeed(feeds) | ||
})) | ||
var name1 = crypto.randomBytes(16).toString('hex') | ||
var name2 = crypto.randomBytes(16).toString('hex') | ||
var query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
@@ -211,3 +245,3 @@ var timestamp = Date.now() | ||
core.ready('query', () => { | ||
collect(core.api.query.read({ query }), (err, msgs) => { | ||
collect(core.view.query.read({ query }), (err, msgs) => { | ||
assert.error(err, 'no error') | ||
@@ -226,3 +260,3 @@ assert.ok(msgs.length === 2, 'returns two messages') | ||
function setup (name, cb) { | ||
core.writer(name1, (err, feed) => { | ||
feeds.writer(name1, (err, feed) => { | ||
feed.append({ | ||
@@ -240,9 +274,21 @@ type: 'chat/message', | ||
context('live', (assert, next) => { | ||
var core = new Kappa() | ||
var feeds = multifeed(ram, { valueEncoding: 'json' }) | ||
var db = memdb() | ||
var source = createMultifeedSource({ feeds, db: sub(db, 'state') }) | ||
core.use('query', source, Query(sub(db, 'view'), { | ||
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }], | ||
getMessage: fromMultifeed(feeds) | ||
})) | ||
var name1 = crypto.randomBytes(16).toString('hex') | ||
var name2 = crypto.randomBytes(16).toString('hex') | ||
var query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
let timestamp = Date.now() | ||
core.writer(name1, (err, feed1) => { | ||
feeds.writer(name1, (err, feed1) => { | ||
assert.error(err, 'no error') | ||
core.writer(name2, (err, feed2) => { | ||
feeds.writer(name2, (err, feed2) => { | ||
assert.error(err, 'no error') | ||
@@ -256,3 +302,3 @@ | ||
core.ready('query', () => { | ||
var stream = core.api.query.read({ live: true, old: false, query }) | ||
var stream = core.view.query.read({ live: true, old: false, query }) | ||
@@ -288,17 +334,23 @@ stream.on('data', (msg) => { | ||
describe('multiple cores', (context) => { | ||
let core1, db1 | ||
let core2, db2 | ||
describe('multiple multifeeds', (context) => { | ||
context('aggregates all valid messages from all feeds when querying', (assert, next) => { | ||
var core1 = new Kappa() | ||
var core2 = new Kappa() | ||
var feeds1 = multifeed(ram, { valueEncoding: 'json' }) | ||
var feeds2 = multifeed(ram, { valueEncoding: 'json' }) | ||
var db1 = memdb() | ||
var db2 = memdb() | ||
context.beforeEach((c) => { | ||
core1 = kappa(ram, { valueEncoding: 'json' }) | ||
core2 = kappa(ram, { valueEncoding: 'json' }) | ||
var source1 = createMultifeedSource({ feeds: feeds1, db: sub(db1, 'state') }) | ||
core1.use('query', source1, Query(sub(db1, 'view'), { | ||
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }], | ||
getMessage: fromMultifeed(feeds1) | ||
})) | ||
indexes = [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }] | ||
var source2 = createMultifeedSource({ feeds: feeds2, db: sub(db2, 'state') }) | ||
core2.use('query', source2, Query(sub(db2, 'view'), { | ||
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }], | ||
getMessage: fromMultifeed(feeds2) | ||
})) | ||
core1.use('query', Query(memdb(), { indexes })) | ||
core2.use('query', Query(memdb(), { indexes })) | ||
}) | ||
context('aggregates all valid messages from all feeds when querying', (assert, next) => { | ||
var query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
@@ -308,23 +360,23 @@ var timestamp = Date.now() | ||
setup(core1, (err, feed1) => { | ||
setup(feeds1, (err, feed1) => { | ||
assert.error(err, 'no error') | ||
setup(core2, (err, feed2) => { | ||
setup(feeds2, (err, feed2) => { | ||
assert.error(err, 'no error') | ||
debug(`initialised core1: ${feed1.key.toString('hex')} core2: ${feed2.key.toString('hex')}`) | ||
assert.same(1, core1.feeds().length, 'one feed') | ||
assert.same(1, core2.feeds().length, 'one feed') | ||
assert.same(1, feeds1.feeds().length, 'one feed') | ||
assert.same(1, feeds2.feeds().length, 'one feed') | ||
core1.ready('query', () => { | ||
collect(core1.api.query.read({ query }), (err, msgs) => { | ||
collect(core1.view.query.read({ query }), (err, msgs) => { | ||
assert.error(err, 'no error') | ||
assert.ok(msgs.length === 1, 'returns a single message') | ||
replicate(core1, core2, (err) => { | ||
replicate(feeds1, feeds2, (err) => { | ||
assert.error(err, 'no error') | ||
assert.same(2, core1.feeds().length, `first core has second core's feed`) | ||
assert.same(2, core2.feeds().length, `second core has first core's feed`) | ||
assert.same(2, feeds1.feeds().length, `first core has second core's feed`) | ||
assert.same(2, feeds2.feeds().length, `second core has first core's feed`) | ||
core2.ready('query', () => { | ||
collect(core2.api.query.read({ query }), (err, msgs) => { | ||
collect(core2.view.query.read({ query }), (err, msgs) => { | ||
assert.error(err, 'no error') | ||
@@ -345,4 +397,4 @@ assert.ok(msgs.length === 2, 'returns two messages') | ||
function setup (kcore, cb) { | ||
kcore.writer('local', (err, feed) => { | ||
function setup (multifeed, cb) { | ||
multifeed.writer('local', (err, feed) => { | ||
feed.append({ | ||
@@ -360,2 +412,21 @@ type: 'chat/message', | ||
context('live', (assert, next) => { | ||
var core1 = new Kappa() | ||
var core2 = new Kappa() | ||
var feeds1 = multifeed(ram, { valueEncoding: 'json' }) | ||
var feeds2 = multifeed(ram, { valueEncoding: 'json' }) | ||
var db1 = memdb() | ||
var db2 = memdb() | ||
var source1 = createMultifeedSource({ feeds: feeds1, db: sub(db1, 'state') }) | ||
core1.use('query', source1, Query(sub(db1, 'view'), { | ||
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }], | ||
getMessage: fromMultifeed(feeds1) | ||
})) | ||
var source2 = createMultifeedSource({ feeds: feeds2, db: sub(db2, 'state') }) | ||
core2.use('query', source2, Query(sub(db2, 'view'), { | ||
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }], | ||
getMessage: fromMultifeed(feeds2) | ||
})) | ||
let query = [{ $filter: { value: { type: 'user/about' } } }] | ||
@@ -370,6 +441,6 @@ let timestamp = Date.now() | ||
setup(core1, (err, feed1) => { | ||
setup(feeds1, (err, feed1) => { | ||
assert.error(err, 'no error') | ||
setup(core2, (err, feed2) => { | ||
setup(feeds2, (err, feed2) => { | ||
assert.error(err, 'no error') | ||
@@ -381,3 +452,3 @@ let core1ready, core2ready | ||
core1.ready('query', () => { | ||
let stream1 = core1.api.query.read({ live: true, old: false, query }) | ||
let stream1 = core1.view.query.read({ live: true, old: false, query }) | ||
@@ -396,3 +467,3 @@ stream1.on('data', (msg) => { | ||
core2.ready('query', () => { | ||
let stream2 = core2.api.query.read({ live: true, old: false, query }) | ||
let stream2 = core2.view.query.read({ live: true, old: false, query }) | ||
@@ -420,6 +491,6 @@ stream2.on('data', (msg) => { | ||
debug('replicating') | ||
replicate(core1, core2, (err) => { | ||
replicate(feeds1, feeds2, (err) => { | ||
assert.error(err, 'no error') | ||
assert.same(2, core1.feeds().length, `first core has replicated second core's feed`) | ||
assert.same(2, core2.feeds().length, `second core has replicated first core's feed`) | ||
assert.same(2, feeds1.feeds().length, `first core has replicated second core's feed`) | ||
assert.same(2, feeds2.feeds().length, `second core has replicated first core's feed`) | ||
}) | ||
@@ -436,6 +507,6 @@ }) | ||
function setup (kcore, cb) { | ||
kcore.writer('local', cb) | ||
function setup (multifeed, cb) { | ||
multifeed.writer('local', cb) | ||
} | ||
}) | ||
}) |
@@ -46,6 +46,16 @@ const rimraf = require('rimraf') | ||
function replicate (core1, core2, cb) { | ||
var stream = core1.replicate() | ||
stream.pipe(core2.replicate()).pipe(stream) | ||
stream.on('end', cb) | ||
// Replicate two feeds/multifeeds over a piped duplex stream pair and invoke
// `cb` exactly once when replication completes or fails.
//
// a, b - objects exposing `replicate(isInitiator, opts)` (hypercore 8+ API);
//        `a` initiates, `b` responds.
// opts - optional replication options, merged over `{ live: false }` so the
//        streams terminate once both sides are in sync.
// cb   - optional callback (err); defaults to a no-op.
function replicate (a, b, opts, cb) {
  // Support the shorter (a, b, cb) call shape.
  if (typeof opts === 'function') return replicate(a, b, {}, opts)
  if (!cb) cb = function () {}

  var s = a.replicate(true, Object.assign({ live: false }, opts))
  var d = b.replicate(false, Object.assign({ live: false }, opts))
  s.pipe(d).pipe(s)

  // Guard so the callback fires at most once, whichever event lands first
  // (an error followed by 'end' previously invoked `cb` twice).
  var called = false
  function done (err) {
    if (called) return
    called = true
    cb(err || null)
  }

  // Listen on BOTH sides of the pipeline; the original only listened on `s`,
  // so failures on `d` crashed with an unhandled 'error' event.
  s.on('error', done)
  d.on('error', done)
  s.on('end', function () { done(null) })
}
@@ -52,0 +62,0 @@ |
51
util.js
@@ -41,6 +41,2 @@ var deepEqual = require('deep-equal') | ||
function isFunction (variable) { | ||
return typeof variable === 'function' | ||
} | ||
function validator (msg) { | ||
@@ -50,6 +46,46 @@ if (typeof msg !== 'object') return null | ||
if (typeof msg.value.timestamp !== 'number') return null | ||
if (typeof msg.value.type !== 'string') return null | ||
return msg | ||
} | ||
// Build a `getMessage` resolver for a multifeed: maps an index entry whose
// value is '<feedKey>@<seq>' back to the full message stored in that feed.
//
// feeds - a multifeed instance (anything exposing `feed(key)`)
// opts.validator - optional function applied to each fetched message; a falsy
//                  return marks the message invalid. Defaults to identity.
//
// Returns `getMessage(msg, cb)` where cb is (err, message).
function fromMultifeed (feeds, opts = {}) {
  var validate = opts.validator || function (msg) { return msg }
  return function getMessage (msg, cb) {
    var msgId = msg.value
    var [ feedId, sequence ] = msgId.split('@')
    var feed = feeds.feed(feedId)
    // Fail fast with a descriptive error instead of throwing on `feed.get`
    // when the referenced feed is not present locally.
    if (!feed) return cb(new Error('feed not found: ' + feedId))
    var seq = Number(sequence)
    feed.get(seq, (err, value) => {
      // Propagate lookup errors; the original ignored `err`, turning every
      // failed read into a misleading "message failed to validate".
      if (err) return cb(err)
      // `entry` (not `msg`) avoids shadowing the outer parameter.
      var entry = validate({
        key: feed.key.toString('hex'),
        seq,
        value
      })
      if (!entry) return cb(new Error('message failed to validate'))
      return cb(null, entry)
    })
  }
}
// Build a `getMessage` resolver for a single hypercore: maps an index entry
// whose value is '<key>@<seq>' back to the message at `seq` in `feed`.
//
// feed - a hypercore instance (exposes `get(seq, cb)` and `key`)
// opts.validator - optional function applied to each fetched message; a falsy
//                  return marks the message invalid. Defaults to identity.
//
// Returns `getMessage(msg, cb)` where cb is (err, message).
function fromHypercore (feed, opts = {}) {
  var validate = opts.validator || function (msg) { return msg }
  return function getMessage (msg, cb) {
    var msgId = msg.value
    // Only the sequence part matters here: a single feed has one key.
    var sequence = msgId.split('@')[1]
    var seq = Number(sequence)
    feed.get(seq, (err, value) => {
      // Propagate lookup errors; the original ignored `err`, turning every
      // failed read into a misleading "message failed to validate".
      if (err) return cb(err)
      // `entry` (not `msg`) avoids shadowing the outer parameter.
      var entry = validate({
        key: feed.key.toString('hex'),
        seq,
        value
      })
      if (!entry) return cb(new Error('message failed to validate'))
      return cb(null, entry)
    })
  }
}
module.exports = { | ||
@@ -60,4 +96,5 @@ has, | ||
findByPath, | ||
isFunction, | ||
validator | ||
validator, | ||
fromMultifeed, | ||
fromHypercore | ||
} |
Sorry, the diff of this file is not supported yet
No v1
Quality: Package is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
No bug tracker
Maintenance: Package does not have a linked bug tracker in package.json.
Found 1 instance in 1 package
No repository
Supply chain risk: Package does not have a linked source code repository. Without this field, a package will have no reference to the location of the source code used to generate the package.
Found 1 instance in 1 package
No website
Quality: Package does not have a website.
Found 1 instance in 1 package
148143
970
1
213
0
15