Socket
Socket
Sign inDemoInstall

kappa-view-query

Package Overview
Dependencies
124
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 2.0.9 to 3.0.0-alpha1

119

example.js

@@ -1,57 +0,44 @@

const kappa = require('kappa-core')
const Kappa = require('kappa-core')
const multifeed = require('multifeed')
const hypercore = require('hypercore')
const ram = require('random-access-memory')
const collect = require('collect-stream')
const memdb = require('memdb')
const sub = require('subleveldown')
const Query = require('./')
const { validator } = require('./util')
const { validator, fromMultifeed, fromHypercore } = require('./util')
const { cleaup, tmp } = require('./test/util')
const core = kappa(ram, { valueEncoding: 'json' })
const db = memdb()
const seedData = require('./test/seeds.json')
const indexes = [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] },
{ key: 'cha', value: [['value', 'type'], ['value', 'content', 'channel'], ['value', 'timestamp']] }
]
// An example using a single hypercore
function HypercoreExample () {
const core = new Kappa()
const feed = hypercore(ram, { valueEncoding: 'json' })
const db = memdb()
core.use('query', Query(db, { indexes, validator }))
core.use('query', createHypercoreSource({ feed, db: sub(db, 'state') }), Query(sub(db, 'view'), {
// define a set of indexes
indexes: [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }
],
// you can pass a custom validator function to ensure all messages entering a feed match a specific format
validator,
// implement your own getMessage function, and perform any desired validation on each message returned by the query
getMessage: fromHypercore(feed)
}))
core.writer('local', (err, feed) => {
const data = [{
type: 'chat/message',
timestamp: Date.now(),
content: { body: 'Hi im new here...' }
}, {
type: 'user/about',
timestamp: Date.now(),
content: { name: 'Grace' }
}, {
type: 'chat/message',
timestamp: Date.now(),
content: { body: 'Second post' }
}, {
type: 'chat/message',
timestamp: Date.now(),
content: { channel: 'dogs', body: 'Lurchers rule' }
}, {
type: 'chat/message',
timestamp: Date.now(),
content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' }
}, {
type: 'user/about',
timestamp: Date.now(),
content: { name: 'Poison Ivy' }
}]
feed.append(data, (err, _) => {
feed.append(seedData, (err, _) => {
core.ready('query', () => {
const query = [{ $filter: { value: { type: 'chat/message', content: { channel: 'dogs' } } } }]
const query = [{ $filter: { value: { type: 'chat/message' } } }]
collect(core.api.query.read({ query }), (err, chats) => {
// grab then log all chat/message message types up until this point
collect(core.view.query.read({ query }), (err, chats) => {
if (err) return console.error(err)
console.log(chats)
collect(core.api.query.read({ query: [{ $filter: { value: { type: 'user/about' } } }] }), (err, users) => {
// grab then log all user/about message types up until this point
collect(core.view.query.read({ query: [{ $filter: { value: { type: 'user/about' } } }] }), (err, users) => {
if (err) return console.error(err)

@@ -63,2 +50,50 @@ console.log(users)

})
})
}
// an example using multifeed for aggregating and querying all feeds
function MultifeedExample () {
const core = new Kappa()
const feeds = multifeed(ram, { valueEncoding: 'json' })
const db = memdb()
core.use('query', createMultifeedSource({ feeds, db: sub(db, 'state') }), Query(sub(db, 'view'), {
indexes: [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }
],
validator,
getMessage: fromMultifeed(feeds)
}))
core.ready('query', () => {
// setup a live query to first log all chat/message
core.view.query.ready({
query: [{ $filter: { value: { type: 'chat/message' } } }],
live: true,
old: false
}).on('data', (msg) => {
if (msg.sync) return next()
console.log(msg)
})
function next () {
// then to first log all user/about
core.view.query.read({
query: [{ $filter: { value: { type: 'user/about' } } }],
live: true,
old: false
}).on('data', (msg) => {
console.log(msg)
})
}
})
// then append a bunch of data to two different feeds in a multifeed
feeds.writer('one', (err, one) => {
feeds.writer('two', (err, two) => {
one.append(seedData.slice(0, 3))
two.append(seedData.slice(3, 5))
})
})
}

@@ -7,2 +7,3 @@ const through = require('through2')

const liveStream = require('level-live-stream')
const { isFunction } = require('util')

@@ -12,4 +13,2 @@ const Explain = require('./explain')

const { isFunction } = require('./util')
module.exports = function KappaViewQuery (db = memdb(), opts = {}) {

@@ -21,132 +20,112 @@ const events = new EventEmitter()

validator = (msg) => msg,
keyEncoding = charwise
keyEncoding = charwise,
getMessage,
} = opts
const view = {
return {
maxBatch: opts.maxBatch || 100,
map,
indexed,
api: {
read,
explain,
add,
onUpdate: (core, cb) => events.on('update', cb),
events
}
}
map: (msgs, next) => {
var ops = []
function explain (core, _opts) {
var expl = Explain(indexes.map((idx) => Object.assign(idx, {
exact: typeof idx.exact === 'boolean' ? idx.exact : false,
createStream: (__opts) => {
var thru = through.obj(function (msg, enc, next) {
if (msg.sync) {
this.push(msg)
return next()
}
msgs.forEach((msg) => {
msg = validator(msg)
if (!msg) return
getMessage(msg, (err, msg) => {
if (err) return next()
this.push(msg)
next()
})
})
indexes.forEach((idx) => {
var indexKeys = getIndexValues(msg, idx.value)
var streamOpts = Object.assign(__opts, {
lte: [idx.key, ...__opts.lte],
gte: [idx.key, ...__opts.gte],
keyEncoding,
keys: true,
values: true
})
if (indexKeys.length) {
ops.push({
type: 'put',
key: [idx.key, ...indexKeys, msg.seq, msg.key],
value: [msg.key, msg.seq].join('@'),
keyEncoding,
})
}
var stream = __opts.live
? liveStream(db, streamOpts)
: db.createReadStream(streamOpts)
function getIndexValues (msg, value) {
var child = value[0]
if (Array.isArray(child)) {
return value
.map((val) => getIndexValues(msg, val))
.reduce((acc, arr) => [...acc, ...arr], [])
.filter(Boolean)
} else if (typeof child === 'string') {
return [value.reduce((obj, val) => obj[val], msg)]
.filter(Boolean)
} else return []
}
})
})
stream.pipe(thru)
db.batch(ops, next)
},
indexed: (msgs) => {
debug(`indexing ${JSON.stringify(msgs, null, 2)}`)
return thru
}
})))
msgs.forEach((msg) => {
events.emit('update', msg)
})
},
api: {
read: (core, _opts) => {
var __opts = view.api.explain(core, _opts)
var source = __opts.createStream(__opts)
return Filter(source, _opts)
},
explain: (core, _opts) => {
var explain = Explain(indexes.map((idx) => Object.assign(idx, {
exact: typeof idx.exact === 'boolean' ? idx.exact : false,
createStream: (__opts) => {
var thru = through.obj(function (msg, enc, next) {
if (msg.sync) {
this.push(msg)
return next()
}
return expl(_opts)
}
var msgId = msg.value
var [ feedId, sequence ] = msgId.split('@')
var feed = core._logs.feed(feedId)
var seq = Number(sequence)
function read (core, _opts) {
var __opts = explain(core, _opts)
var source = __opts.createStream(__opts)
return Filter(source, _opts)
}
feed.get(seq, (err, value) => {
if (err) return next()
function add (core, _opts) {
var isValid = _opts && isFunction(_opts.createStream) && Array.isArray(_opts.index || _opts.value)
if(!isValid) throw new Error('kappa-view-query.add: expected opts { index, createStream }')
_opts.value = _opts.index || _opts.value
indexes.push(_opts)
}
var msg = validator({
key: feed.key.toString('hex'),
seq,
value
})
function map (msgs, next) {
var ops = []
if (!msg) return next()
this.push(msg)
next()
})
})
msgs.forEach((msg) => {
msg = validator(msg)
if (!msg) return
var streamOpts = Object.assign(__opts, {
lte: [idx.key, ...__opts.lte],
gte: [idx.key, ...__opts.gte],
keyEncoding,
keys: true,
values: true
})
indexes.forEach((idx) => {
var indexKeys = getIndexValues(msg, idx.value)
var stream = __opts.live
? liveStream(db, streamOpts)
: db.createReadStream(streamOpts)
if (indexKeys.length) {
ops.push({
type: 'put',
key: [idx.key, ...indexKeys, msg.seq, msg.key],
value: [msg.key, msg.seq].join('@'),
keyEncoding,
})
}
stream.pipe(thru)
function getIndexValues (msg, value) {
var child = value[0]
if (Array.isArray(child)) {
return value
.map((val) => getIndexValues(msg, val))
.reduce((acc, arr) => [...acc, ...arr], [])
.filter(Boolean)
} else if (typeof child === 'string') {
return [value.reduce((obj, val) => obj[val], msg)]
.filter(Boolean)
} else return []
}
})
})
return thru
}
})))
db.batch(ops, next)
}
return explain(_opts)
},
add: (core, _opts) => {
var isValid = _opts && isFunction(_opts.createStream) && Array.isArray(_opts.index || _opts.value)
if(!isValid) throw new Error('kappa-view-query.add: expected opts { index, createStream }')
_opts.value = _opts.index || _opts.value
indexes.push(_opts)
},
onUpdate: (core, cb) => {
events.on('update', cb)
},
storeState: (state, cb) => {
state = state.toString('base64')
db.put('state', state, cb)
},
fetchState: (cb) => {
db.get('state', function (err, state) {
if (err && err.notFound) cb()
else if (err) cb(err)
else cb(null, Buffer.from(state, 'base64'))
})
},
events
}
function indexed (msgs) {
msgs.forEach((msg) => {
events.emit('update', msg)
})
}
return view
}
{
"name": "kappa-view-query",
"version": "2.0.9",
"description": "",
"version": "3.0.0-alpha1",
"description": "define your own indexes and execute map-filter-reduce queries over a set of hypercores using kappa-core",
"main": "index.js",
"repository": {
"type": "git",
"url": "git+https://github.com/kappa-db/kappa-view-query.git"
},
"scripts": {

@@ -23,9 +27,12 @@ "test": "tape test/**/*.test.js | tap-spec",

"collect-stream": "^1.2.1",
"kappa-core": "^3.0.2",
"hypercore": "^8.4.1",
"kappa-core": "github:Frando/kappa-core#kappa5-new",
"level": "^5.0.1",
"memdb": "^1.3.1",
"mkdirp": "^0.5.1",
"multifeed": "5.1.1",
"nyc": "^14.1.1",
"random-access-memory": "^3.1.1",
"rimraf": "^2.6.3",
"subleveldown": "^4.1.4",
"tap-spec": "^5.0.0",

@@ -32,0 +39,0 @@ "tape": "^4.11.0",

# Kappa View Query
`kappa-view-query` is a materialised view to be used with kappa-core. It provides an API that allows you to define your own indexes and execute custom [map-filter-reduce](https://github.com/dominictarr/map-filter-reduce) queries over a collection of hypercores.
`kappa-view-query` is a materialised view to be used with kappa-core. It provides an API that allows you to define your own indexes and execute custom [map-filter-reduce](https://github.com/dominictarr/map-filter-reduce) queries over your indexes.

@@ -53,89 +53,115 @@ `kappa-view-query` is inspired by [flumeview-query](https://github.com/flumedb/flumeview-query). It uses the same scoring system for determining the most efficient index relevant to the provided query.

### Hypercore
This example uses a single hypercore and collects all messages at a given point in time.
```js
const kappa = require('kappa-core')
const Query = require('./')
const Kappa = require('kappa-core')
const hypercore = require('hypercore')
const ram = require('random-access-memory')
const collect = require('collect-stream')
const memdb = require('memdb')
const sub = require('subleveldown')
// Initialised your kappa-core back-end
const core = kappa(ram, { valueEncoding: 'json' })
const Query = require('./')
const { validator, fromHypercore } = require('./util')
const { cleaup, tmp } = require('./test/util')
const seedData = require('./test/seeds.json')
const core = new Kappa()
const feed = hypercore(ram, { valueEncoding: 'json' })
const db = memdb()
// Define a validator or a message decoder to determine if a message should be indexed or not
function validator (msg) {
if (typeof msg !== 'object') return null
if (typeof msg.value !== 'object') return null
if (typeof msg.value.timestamp !== 'number') return null
if (typeof msg.value.type !== 'string') return null
return msg
}
core.use('query', createHypercoreSource({ feed, db: sub(db, 'state') }), Query(sub(db, 'view'), {
indexes: [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }
],
// you can pass a custom validator function to ensure all messages entering a feed match a specific format
validator,
// implement your own getMessage function, and perform any desired validation on each message returned by the query
getMessage: fromHypercore(feed)
}))
// here's an alternative using protocol buffers, assuming a message schema exists
const { Message } = protobuf(fs.readFileSync(path.join(path.resolve(__dirname), 'message.proto')))
feed.append(seedData, (err, _) => {
core.ready('query', () => {
const query = [{ $filter: { value: { type: 'chat/message' } }]
function validator (msg) {
try { msg.value = Message.decode(msg.value) }
catch (err) { return console.error(err) && false }
return msg
}
// grab then log all chat/message message types up until this point
collect(core.view.query.read({ query }), (err, chats) => {
if (err) return console.error(err)
console.log(chats)
// Define a set of indexes under a namespace
const indexes = [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] },
{ key: 'cha', value: [['value', 'type'], ['value', 'content', 'channel']] }
]
// grab then log all user/about message types up until this point
collect(core.view.query.read({ query: [{ $filter: { value: { type: 'user/about' } } }] }), (err, users) => {
if (err) return console.error(err)
console.log(users)
})
})
})
})
```
core.use('query', Query(db, { indexes, validator }))
### Multifeed
core.writer('local', (err, feed) => {
// Populate our feed with some messages
const data = [{
type: 'chat/message',
timestamp: Date.now(),
content: { body: 'Hi im new here...' }
}, {
type: 'user/about',
timestamp: Date.now(),
content: { name: 'Grace' }
}, {
type: 'chat/message',
timestamp: Date.now(),
content: { body: 'Second post' }
}, {
type: 'chat/message',
timestamp: Date.now(),
content: { channel: 'dogs', body: 'Lurchers rule' }
}, {
type: 'chat/message',
timestamp: Date.now(),
content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' }
}, {
type: 'user/about',
timestamp: Date.now(),
content: { name: 'Poison Ivy' }
}]
This example uses a multifeed instance for managing hypercores and sets up two live streams to dump messages to the console as they arrive.
feed.append(data, (err, seq) => {
// Define a query: filter where the message value contains type 'chat/message', and the content references the channel 'dogs'
const query = [{ $filter: { value: { type: 'chat/message', content: { channel: 'dogs' } } } }]
```js
const Kappa = require('kappa-core')
const multifeed = require('multifeed')
const ram = require('random-access-memory')
const collect = require('collect-stream')
const memdb = require('memdb')
const sub = require('subleveldown')
core.ready('query', () => {
// For static queries
collect(core.api.query.read({ query }), (err, msgs) => {
console.log(msgs)
const Query = require('./')
const { validator, fromMultifeed } = require('./util')
const { cleaup, tmp } = require('./test/util')
// Logs all messages of type chat/message that reference the dogs channel, and order by timestamp...
// {
// type: 'chat/message',
// timestamp: 1561996331743,
// content: { channel: 'dogs', body: 'Lurchers rule' }
// }
// {
// type: 'chat/message',
// timestamp: Date.now(),
// content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' }
// }
})
const seedData = require('./test/seeds.json')
const core = new Kappa()
const feeds = multifeed(ram, { valueEncoding: 'json' })
const db = memdb()
core.use('query', createMultifeedSource({ feeds, db: sub(db, 'state') }), Query(sub(db, 'view'), {
indexes: [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }
],
validator,
// make sure you define your own getMessage function, otherwise nothing will be returned by your queries
getMessage: fromMultifeed(feeds)
}))
core.ready('query', () => {
// setup a live query to first log all chat/message
core.view.query.ready({
query: [{ $filter: { value: { type: 'chat/message' } } }],
live: true,
old: false
}).on('data', (msg) => {
if (msg.sync) return next()
console.log(msg)
})
function next () {
// then to first log all user/about
core.view.query.read({
query: [{ $filter: { value: { type: 'user/about' } } }],
live: true,
old: false
}).on('data', (msg) => {
console.log(msg)
})
}
})
// then append a bunch of data to two different feeds in a multifeed
feeds.writer('one', (err, one) => {
feeds.writer('two', (err, two) => {
one.append(seedData.slice(0, 3))
two.append(seedData.slice(3, 5))
})

@@ -152,2 +178,3 @@ })

Expects a LevelUP or LevelDOWN instance `leveldb`.
Expects a `getMessage` function to use your defined index to grab the message from the feed.

@@ -185,1 +212,4 @@ ```js

- Fixed an outstanding issue where messages with a matching timestamp were colliding, where the last indexed would over-writing the previous. Messages are now indexed, on top of provided values, on the sequence number and the feed id, for guaranteed uniqueness.
### 3.0.0 (not yet released)
- Updated to use the experimental version of kappa-core which includes breaking API changes. See Frando's [kappa-core fork](https://github.com/Frando/kappa-core/tree/kappa5-new).
const { describe } = require('tape-plus')
const kappa = require('kappa-core')
const Query = require('../')
const Kappa = require('kappa-core')
const createMultifeedSource = require('kappa-core/sources/multifeed')
const createHypercoreSource = require('kappa-core/sources/hypercore')
const multifeed = require('multifeed')
const hypercore = require('hypercore')
const ram = require('random-access-memory')
const memdb = require('memdb')
const sub = require('subleveldown')
const level = require('level')

@@ -11,28 +15,23 @@ const collect = require('collect-stream')

const seeds = require('./seeds.json')
.sort((a, b) => a.timestamp > b.timestamp ? +1 : -1)
const Query = require('../')
const drive = require('./drive.json')
.sort((a, b) => a.timestamp > b.timestamp ? +1 : -1)
const { fromMultifeed, fromHypercore } = require('../util')
const { cleanup, tmp, replicate } = require('./util')
describe('basic', (context) => {
let core, db, indexes
const seeds = require('./seeds.json').sort((a, b) => a.timestamp > b.timestamp ? +1 : -1)
const drive = require('./drive.json').sort((a, b) => a.timestamp > b.timestamp ? +1 : -1)
context.beforeEach((c) => {
core = kappa(ram, { valueEncoding: 'json' })
db = memdb()
describe('hypercore', (context) => {
context('perform a query', (assert, next) => {
var core = new Kappa()
var feed = hypercore(ram, { valueEncoding: 'json' })
var db = memdb()
indexes = [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] },
{ key: 'fil', value: [['value', 'filename'], ['value', 'timestamp']] }
]
var source = createHypercoreSource({ feed, db: sub(db, 'state') })
core.use('query', source, Query(sub(db, 'view'), {
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }],
getMessage: fromHypercore(feed)
}))
core.use('query', Query(db, { indexes }))
})
context('perform a query', (assert, next) => {
core.writer('local', (err, feed) => {
core.ready('query', () => {
feed.append(seeds, (err, _) => {

@@ -44,3 +43,3 @@ assert.error(err, 'no error')

core.ready('query', () => {
collect(core.api.query.read({ query }), (err, msgs) => {
collect(core.view.query.read({ query }), (err, msgs) => {
var check = seeds.filter((msg) => msg.type === 'chat/message')

@@ -57,15 +56,23 @@

context('get all messages', (assert, next) => {
core.writer('local', (err, feed) => {
feed.append(seeds, (err, _) => {
assert.error(err, 'no error')
var core = new Kappa()
var feed = hypercore(ram, { valueEncoding: 'json' })
var db = memdb()
let query = [{ $filter: { value: { timestamp: { $gt: 0 } } } }]
var source = createHypercoreSource({ feed, db: sub(db, 'state') })
core.use('query', source, Query(sub(db, 'view'), {
indexes: [{ key: 'log', value: [['value', 'timestamp']] }],
getMessage: fromHypercore(feed)
}))
core.ready('query', () => {
collect(core.api.query.read({ query }), (err, msgs) => {
var check = seeds
assert.equal(msgs.length, check.length, 'gets the same number of messages')
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index')
next()
})
feed.append(seeds, (err, _) => {
assert.error(err, 'no error')
let query = [{ $filter: { value: { timestamp: { $gt: 0 } } } }]
core.ready('query', () => {
collect(core.view.query.read({ query }), (err, msgs) => {
var check = seeds
assert.equal(msgs.length, check.length, 'gets the same number of messages')
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index')
next()
})

@@ -76,23 +83,34 @@ })

context('fil index - get all changes to a specific file, then get all changes to all files', (assert, next) => {
core.writer('local', (err, feed) => {
feed.append(drive, (err, _) => {
assert.error(err, 'no error')
let filename = 'hello.txt'
let helloQuery = [{ $filter: { value: { filename, timestamp: { $gt: 0 } } } }]
context('fil (used by cobox state feed)', (assert, next) => {
var core = new Kappa()
var feed = hypercore(ram, { valueEncoding: 'json' })
var db = memdb()
core.ready('query', () => {
collect(core.api.query.read({ query: helloQuery }), (err, msgs) => {
var check = drive.filter((msg) => msg.filename === filename)
var source = createHypercoreSource({ feed, db: sub(db, 'state') })
core.use('query', source, Query(sub(db, 'view'), {
indexes: [
{ key: 'log', value: [['value', 'timestamp']] },
{ key: 'fil', value: [['value', 'filename']] },
],
getMessage: fromHypercore(feed)
}))
feed.append(drive, (err, _) => {
assert.error(err, 'no error')
let filename = 'hello.txt'
let helloQuery = [{ $filter: { value: { filename, timestamp: { $gt: 0 } } } }]
core.ready('query', () => {
collect(core.view.query.read({ query: helloQuery }), (err, msgs) => {
var check = drive.filter((msg) => msg.filename === filename)
assert.equal(msgs.length, check.length, 'gets the same number of messages')
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index')
let fileQuery = [{ $filter: { value: { timestamp: { $gt: 0 } } } }]
collect(core.view.query.read({ query: fileQuery }), (err, msgs) => {
var check = drive
assert.equal(msgs.length, check.length, 'gets the same number of messages')
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index')
let fileQuery = [{ $filter: { value: { timestamp: { $gt: 0 } } } }]
collect(core.api.query.read({ query: fileQuery }), (err, msgs) => {
var check = drive
assert.equal(msgs.length, check.length, 'gets the same number of messages')
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index')
next()
})
next()
})

@@ -105,30 +123,37 @@ })

context('live', (assert, next) => {
core.writer('local', (err, feed) => {
var core = new Kappa()
var feed = hypercore(ram, { valueEncoding: 'json' })
var db = memdb()
var source = createHypercoreSource({ feed, db: sub(db, 'state') })
core.use('query', source, Query(sub(db, 'view'), {
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }],
getMessage: fromHypercore(feed)
}))
feed.append(seeds.slice(0, 2), (err, _) => {
assert.error(err, 'no error')
feed.append(seeds.slice(0, 2), (err, _) => {
assert.error(err, 'no error')
let count = 0
let check = seeds.filter((msg) => msg.type === 'chat/message')
let count = 0
let check = seeds.filter((msg) => msg.type === 'chat/message')
let query = [{ $filter: { value: { type: 'chat/message' } } }]
let query = [{ $filter: { value: { type: 'chat/message' } } }]
core.ready('query', () => {
var stream = core.api.query.read({ live: true, query })
core.ready('query', () => {
var stream = core.view.query.read({ live: true, query })
stream.on('data', (msg) => {
if (msg.sync) return done()
assert.same(check[count], msg.value, 'streams each message live')
++count
done()
})
stream.on('data', (msg) => {
if (msg.sync) return done()
assert.same(check[count], msg.value, 'streams each message live')
++count
done()
})
feed.append(seeds.slice(3, 5), (err, _) => {
assert.error(err, 'no error')
})
feed.append(seeds.slice(3, 5), (err, _) => {
assert.error(err, 'no error')
})
function done (err) {
if (count === check.length) return next()
}
})
function done (err) {
if (count === check.length) return next()
}
})

@@ -139,19 +164,16 @@ })

describe('multiple feeds', (context) => {
let core, db
let name1, name2
describe('multifeed', (context) => {
context('aggregates all feeds', (assert, next) => {
var core = new Kappa()
var feeds = multifeed(ram, { valueEncoding: 'json' })
var db = memdb()
context.beforeEach((c) => {
core = kappa(ram, { valueEncoding: 'json' })
db = memdb()
var source = createMultifeedSource({ feeds, db: sub(db, 'state') })
core.use('query', source, Query(sub(db, 'view'), {
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }],
getMessage: fromMultifeed(feeds)
}))
indexes = [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }]
name1 = crypto.randomBytes(16).toString('hex')
name2 = crypto.randomBytes(16).toString('hex')
core.use('query', Query(db, { indexes }))
})
context('aggregates all feeds', (assert, next) => {
var name1 = crypto.randomBytes(16).toString('hex')
var name2 = crypto.randomBytes(16).toString('hex')
var query = [{ $filter: { value: { type: 'chat/message' } } }]

@@ -168,6 +190,6 @@ var timestamp = Date.now()

debug(`initialised feed1: ${feed1.key.toString('hex')} feed2: ${feed2.key.toString('hex')}`)
assert.same(2, core.feeds().length, 'two local feeds')
assert.same(2, feeds.feeds().length, 'two local feeds')
core.ready('query', () => {
collect(core.api.query.read({ query }), (err, msgs) => {
collect(core.view.query.read({ query }), (err, msgs) => {
assert.error(err, 'no error')

@@ -186,3 +208,3 @@ assert.ok(msgs.length === 2, 'returns two messages')

function setup (name, cb) {
core.writer(name, (err, feed) => {
feeds.writer(name, (err, feed) => {
feed.append({

@@ -201,2 +223,14 @@ type: 'chat/message',

context('aggregates all feeds, colliding timestamps', (assert, next) => {
var core = new Kappa()
var feeds = multifeed(ram, { valueEncoding: 'json' })
var db = memdb()
var source = createMultifeedSource({ feeds, db: sub(db, 'state') })
core.use('query', source, Query(sub(db, 'view'), {
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }],
getMessage: fromMultifeed(feeds)
}))
var name1 = crypto.randomBytes(16).toString('hex')
var name2 = crypto.randomBytes(16).toString('hex')
var query = [{ $filter: { value: { type: 'chat/message' } } }]

@@ -211,3 +245,3 @@ var timestamp = Date.now()

core.ready('query', () => {
collect(core.api.query.read({ query }), (err, msgs) => {
collect(core.view.query.read({ query }), (err, msgs) => {
assert.error(err, 'no error')

@@ -226,3 +260,3 @@ assert.ok(msgs.length === 2, 'returns two messages')

function setup (name, cb) {
core.writer(name1, (err, feed) => {
feeds.writer(name1, (err, feed) => {
feed.append({

@@ -240,9 +274,21 @@ type: 'chat/message',

context('live', (assert, next) => {
var core = new Kappa()
var feeds = multifeed(ram, { valueEncoding: 'json' })
var db = memdb()
var source = createMultifeedSource({ feeds, db: sub(db, 'state') })
core.use('query', source, Query(sub(db, 'view'), {
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }],
getMessage: fromMultifeed(feeds)
}))
var name1 = crypto.randomBytes(16).toString('hex')
var name2 = crypto.randomBytes(16).toString('hex')
var query = [{ $filter: { value: { type: 'chat/message' } } }]
let timestamp = Date.now()
core.writer(name1, (err, feed1) => {
feeds.writer(name1, (err, feed1) => {
assert.error(err, 'no error')
core.writer(name2, (err, feed2) => {
feeds.writer(name2, (err, feed2) => {
assert.error(err, 'no error')

@@ -256,3 +302,3 @@

core.ready('query', () => {
var stream = core.api.query.read({ live: true, old: false, query })
var stream = core.view.query.read({ live: true, old: false, query })

@@ -288,17 +334,23 @@ stream.on('data', (msg) => {

describe('multiple cores', (context) => {
let core1, db1
let core2, db2
describe('multiple multifeeds', (context) => {
context('aggregates all valid messages from all feeds when querying', (assert, next) => {
var core1 = new Kappa()
var core2 = new Kappa()
var feeds1 = multifeed(ram, { valueEncoding: 'json' })
var feeds2 = multifeed(ram, { valueEncoding: 'json' })
var db1 = memdb()
var db2 = memdb()
context.beforeEach((c) => {
core1 = kappa(ram, { valueEncoding: 'json' })
core2 = kappa(ram, { valueEncoding: 'json' })
var source1 = createMultifeedSource({ feeds: feeds1, db: sub(db1, 'state') })
core1.use('query', source1, Query(sub(db1, 'view'), {
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }],
getMessage: fromMultifeed(feeds1)
}))
indexes = [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }]
var source2 = createMultifeedSource({ feeds: feeds2, db: sub(db2, 'state') })
core2.use('query', source2, Query(sub(db2, 'view'), {
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }],
getMessage: fromMultifeed(feeds2)
}))
core1.use('query', Query(memdb(), { indexes }))
core2.use('query', Query(memdb(), { indexes }))
})
context('aggregates all valid messages from all feeds when querying', (assert, next) => {
var query = [{ $filter: { value: { type: 'chat/message' } } }]

@@ -308,23 +360,23 @@ var timestamp = Date.now()

setup(core1, (err, feed1) => {
setup(feeds1, (err, feed1) => {
assert.error(err, 'no error')
setup(core2, (err, feed2) => {
setup(feeds2, (err, feed2) => {
assert.error(err, 'no error')
debug(`initialised core1: ${feed1.key.toString('hex')} core2: ${feed2.key.toString('hex')}`)
assert.same(1, core1.feeds().length, 'one feed')
assert.same(1, core2.feeds().length, 'one feed')
assert.same(1, feeds1.feeds().length, 'one feed')
assert.same(1, feeds2.feeds().length, 'one feed')
core1.ready('query', () => {
collect(core1.api.query.read({ query }), (err, msgs) => {
collect(core1.view.query.read({ query }), (err, msgs) => {
assert.error(err, 'no error')
assert.ok(msgs.length === 1, 'returns a single message')
replicate(core1, core2, (err) => {
replicate(feeds1, feeds2, (err) => {
assert.error(err, 'no error')
assert.same(2, core1.feeds().length, `first core has second core's feed`)
assert.same(2, core2.feeds().length, `second core has first core's feed`)
assert.same(2, feeds1.feeds().length, `first core has second core's feed`)
assert.same(2, feeds2.feeds().length, `second core has first core's feed`)
core2.ready('query', () => {
collect(core2.api.query.read({ query }), (err, msgs) => {
collect(core2.view.query.read({ query }), (err, msgs) => {
assert.error(err, 'no error')

@@ -345,4 +397,4 @@ assert.ok(msgs.length === 2, 'returns two messages')

function setup (kcore, cb) {
kcore.writer('local', (err, feed) => {
function setup (multifeed, cb) {
multifeed.writer('local', (err, feed) => {
feed.append({

@@ -360,2 +412,21 @@ type: 'chat/message',

context('live', (assert, next) => {
var core1 = new Kappa()
var core2 = new Kappa()
var feeds1 = multifeed(ram, { valueEncoding: 'json' })
var feeds2 = multifeed(ram, { valueEncoding: 'json' })
var db1 = memdb()
var db2 = memdb()
var source1 = createMultifeedSource({ feeds: feeds1, db: sub(db1, 'state') })
core1.use('query', source1, Query(sub(db1, 'view'), {
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }],
getMessage: fromMultifeed(feeds1)
}))
var source2 = createMultifeedSource({ feeds: feeds2, db: sub(db2, 'state') })
core2.use('query', source2, Query(sub(db2, 'view'), {
indexes: [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }],
getMessage: fromMultifeed(feeds2)
}))
let query = [{ $filter: { value: { type: 'user/about' } } }]

@@ -370,6 +441,6 @@ let timestamp = Date.now()

setup(core1, (err, feed1) => {
setup(feeds1, (err, feed1) => {
assert.error(err, 'no error')
setup(core2, (err, feed2) => {
setup(feeds2, (err, feed2) => {
assert.error(err, 'no error')

@@ -381,3 +452,3 @@ let core1ready, core2ready

core1.ready('query', () => {
let stream1 = core1.api.query.read({ live: true, old: false, query })
let stream1 = core1.view.query.read({ live: true, old: false, query })

@@ -396,3 +467,3 @@ stream1.on('data', (msg) => {

core2.ready('query', () => {
let stream2 = core2.api.query.read({ live: true, old: false, query })
let stream2 = core2.view.query.read({ live: true, old: false, query })

@@ -420,6 +491,6 @@ stream2.on('data', (msg) => {

debug('replicating')
replicate(core1, core2, (err) => {
replicate(feeds1, feeds2, (err) => {
assert.error(err, 'no error')
assert.same(2, core1.feeds().length, `first core has replicated second core's feed`)
assert.same(2, core2.feeds().length, `second core has replicated first core's feed`)
assert.same(2, feeds1.feeds().length, `first core has replicated second core's feed`)
assert.same(2, feeds2.feeds().length, `second core has replicated first core's feed`)
})

@@ -436,6 +507,6 @@ })

function setup (kcore, cb) {
kcore.writer('local', cb)
function setup (multifeed, cb) {
multifeed.writer('local', cb)
}
})
})

@@ -46,6 +46,16 @@ const rimraf = require('rimraf')

function replicate (core1, core2, cb) {
var stream = core1.replicate()
stream.pipe(core2.replicate()).pipe(stream)
stream.on('end', cb)
function replicate (a, b, opts, cb) {
if (typeof opts === 'function') return replicate(a, b, {}, opts)
if (!cb) cb = noop
var s = a.replicate(true, Object.assign({ live: false }, opts))
var d = b.replicate(false, Object.assign({ live: false }, opts))
s.pipe(d).pipe(s)
s.on('error', (err) => {
if (err) return cb(err)
})
s.on('end', cb)
}

@@ -52,0 +62,0 @@

@@ -41,6 +41,2 @@ var deepEqual = require('deep-equal')

function isFunction (variable) {
return typeof variable === 'function'
}
function validator (msg) {

@@ -50,6 +46,46 @@ if (typeof msg !== 'object') return null

if (typeof msg.value.timestamp !== 'number') return null
if (typeof msg.value.type !== 'string') return null
return msg
}
function fromMultifeed (feeds, opts = {}) {
var validate = opts.validator || function (msg) { return msg }
return function getMessage (msg, cb) {
var msgId = msg.value
var [ feedId, sequence ] = msgId.split('@')
var feed = feeds.feed(feedId)
var seq = Number(sequence)
feed.get(seq, (err, value) => {
var msg = validate({
key: feed.key.toString('hex'),
seq,
value
})
if (!msg) return cb(new Error('message failed to validate'))
return cb(null, msg)
})
}
}
function fromHypercore (feed, opts = {}) {
var validate = opts.validator || function (msg) { return msg }
return function getMessage (msg, cb) {
var msgId = msg.value
var sequence = msgId.split('@')[1]
var seq = Number(sequence)
feed.get(seq, (err, value) => {
var msg = validate({
key: feed.key.toString('hex'),
seq,
value
})
if (!msg) return cb(new Error('message failed to validate'))
return cb(null, msg)
})
}
}
module.exports = {

@@ -60,4 +96,5 @@ has,

findByPath,
isFunction,
validator
validator,
fromMultifeed,
fromHypercore
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc