kappa-view-query

Comparing version 2.0.6 to 2.0.7

test/drive.json

example.js

@@ -14,3 +14,3 @@ const kappa = require('kappa-core')

const indexes = [
- { key: 'log', value: ['value', 'timestamp'] },
+ { key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] },

@@ -29,19 +29,19 @@ { key: 'cha', value: [['value', 'type'], ['value', 'content', 'channel'], ['value', 'timestamp']] }

type: 'user/about',
- timestamp: Date.now() + 1,
+ timestamp: Date.now(),
content: { name: 'Grace' }
}, {
type: 'chat/message',
- timestamp: Date.now() + 2,
+ timestamp: Date.now(),
content: { body: 'Second post' }
}, {
type: 'chat/message',
- timestamp: Date.now() + 3,
+ timestamp: Date.now(),
content: { channel: 'dogs', body: 'Lurchers rule' }
}, {
type: 'chat/message',
- timestamp: Date.now() + 4,
+ timestamp: Date.now(),
content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' }
}, {
type: 'user/about',
- timestamp: Date.now() + 5,
+ timestamp: Date.now(),
content: { name: 'Poison Ivy' }

@@ -48,0 +48,0 @@ }]

@@ -6,2 +6,3 @@ const through = require('through2')

const debug = require('debug')('kappa-view-query')
+ const liveStream = require('level-live-stream')

@@ -38,3 +39,3 @@ const Explain = require('./explain')

type: 'put',
- key: [idx.key, ...indexKeys],
+ key: [idx.key, ...indexKeys, msg.seq, msg.key],
value: [msg.key, msg.seq].join('@'),

@@ -60,3 +61,3 @@ keyEncoding,

- debug(`[INDEXING] ${JSON.stringify(ops)}`)
+ debug(`indexing ${JSON.stringify(msgs, null, 2)} AS ${JSON.stringify(ops, null, 2)}`)

@@ -66,3 +67,5 @@ db.batch(ops, next)

indexed: (msgs) => {
- msgs.forEach((msg) => events.emit('update', msg))
+ msgs.forEach((msg) => {
+ events.emit('update', msg)
+ })
},

@@ -80,2 +83,4 @@ api: {

var thru = through.obj(function (msg, enc, next) {
+ if (msg.sync) return next()
var msgId = msg.value

@@ -88,3 +93,9 @@ var [ feedId, sequence ] = msgId.split('@')

if (err) return next()
- var msg = validator({ key: feed.key.toString('hex'), seq, value })
+ var msg = validator({
+ key: feed.key.toString('hex'),
+ seq,
+ value
+ })
if (!msg) return next()

@@ -96,3 +107,3 @@ this.push(msg)

- var stream = db.createReadStream(Object.assign(__opts, {
+ var streamOpts = Object.assign(__opts, {
lte: [idx.key, ...__opts.lte],

@@ -103,4 +114,8 @@ gte: [idx.key, ...__opts.gte],

values: true
- }))
+ })
+ var stream = __opts.live
+ ? liveStream(db, streamOpts)
+ : db.createReadStream(streamOpts)
stream.pipe(thru)

@@ -107,0 +122,0 @@

{
"name": "kappa-view-query",
"version": "2.0.6",
"version": "2.0.7",
"description": "",
"main": "index.js",
"scripts": {
"test": "nyc tape test/**/*.test.js | tap-spec"
"test": "tape test/**/*.test.js | tap-spec",
"cover": "nyc tape test/**/*.test.js | tap-spec"
},

@@ -14,2 +15,3 @@ "dependencies": {

"events": "^3.0.0",
"level-live-stream": "^1.4.12",
"map-filter-reduce": "^3.2.2",

@@ -16,0 +18,0 @@ "pull-stream-to-stream": "^1.3.4",

@@ -63,3 +63,3 @@ # Kappa View Query

- // Define a validator to determine if a message should be indexed or not
+ // Define a validator or a message decoder to determine if a message should be indexed or not
function validator (msg) {

@@ -73,5 +73,14 @@ if (typeof msg !== 'object') return null

// here's an alternative using protocol buffers, assuming a message schema exists
const { Message } = protobuf(fs.readFileSync(path.join(path.resolve(__dirname), 'message.proto')))
function validator (msg) {
try { msg.value = Message.decode(msg.value) }
catch (err) { return console.error(err) && false }
return msg
}
// Define a set of indexes under a namespace
const indexes = [
- { key: 'log', value: ['value', 'timestamp'] },
+ { key: 'log', value: [['value', 'timestamp']] },
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] },
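
For orientation, here is a minimal sketch of how the validator and indexes above might be wired into a kappa core. The `validator` option name is an assumption inferred from the view code earlier in this diff (which calls `validator(...)` on each message before indexing), and `memdb` merely stands in for any LevelUP-compatible store:

```js
const kappa = require('kappa-core')
const Query = require('kappa-view-query')
const memdb = require('memdb') // stand-in for any LevelUP-compatible store

const core = kappa('./log', { valueEncoding: 'json' })
const db = memdb()

// `indexes` and `validator` refer to the definitions above; the view is assumed
// to call `validator` on each message and skip anything it returns falsy for.
core.use('query', Query(db, { indexes, validator }))

core.ready('query', () => {
  const query = [{ $filter: { value: { type: 'chat/message' } } }]
  core.api.query.read({ query }).on('data', (msg) => {
    console.log(msg.value)
  })
})
```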

@@ -163,10 +172,14 @@ { key: 'cha', value: [['value', 'type'], ['value', 'content', 'channel']] }

## Acknowledgments
kappa-view-query was built by [@kyphae](https://github.com/kyphae/) and assisted by [@dominictarr](https://github.com/dominictarr). It uses [@dominictarr](https://github.com/dominictarr)'s scoring system and query interface.
kappa-view-query was built by [@kyphae](https://github.com/kyphae/) and assisted by [@dominictarr](https://github.com/dominictarr). It uses [@dominictarr](https://github.com/dominictarr)'s scoring system and query interface from [flumeview-query](https://github.com/flumedb/flumeview-query).
## Version Changes
Version 2 - Updated to remove `pull-stream` and `flumeview-query` as a core dependency for better compatibility with the rest of the `kappa-db` ecosystem. `api.query.read` now returns a regular readable node stream. In order to continue to use pull-streams, check out an updated fork of V1: [kappa-view-pull-query](https://github.com/kappa-db/kappa-view-pull-query)
## Releases
### 2.0.0
- Updated to remove the need for `pull-stream` and `flumeview-query` as external dependencies, providing better compatibility with the rest of the `kappa-db` ecosystem.
- `core.api.query.read` returns a regular readable node stream.
- In order to continue to use pull-streams:
- Use the updated fork of v1 at [kappa-view-pull-query](https://github.com/kappa-db/kappa-view-pull-query),
- Or make use of [stream-to-pull-stream](https://github.com/pull-stream/stream-to-pull-stream), as in the sketch below.
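
As a rough illustration of that second option (a sketch only, assuming `core` is a kappa-core instance with the query view configured as above, not an excerpt from the package's docs):

```js
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')

const query = [{ $filter: { value: { type: 'chat/message' } } }]

pull(
  // wrap the node readable stream returned by read() as a pull-stream source
  toPull.source(core.api.query.read({ query })),
  pull.collect((err, msgs) => {
    if (err) throw err
    console.log(msgs.map((msg) => msg.value))
  })
)
```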
## Todos
* [ ] write more comprehensive tests to ensure we're properly using map-filter-reduce
* [ ] write tests for allowing different kinds of validators
* [ ] make it possible to use multiple message schemas, selecting the most appropriate schema based on the query
### 2.0.7
- Fixed an outstanding issue where live streams were not working. Queries set up with `{ live: true }` now properly pipe messages through as they are indexed (see the sketch below).
- Fixed an outstanding issue where messages with matching timestamps collided, with the last message indexed overwriting the previous one. On top of the provided values, messages are now indexed on their sequence number and feed id, guaranteeing unique index keys.
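
A small sketch of what these two fixes look like in use, modelled on the tests further down; `core` and the view setup are assumed from the earlier examples:

```js
const query = [{ $filter: { value: { type: 'chat/message' } } }]

core.ready('query', () => {
  // { live: true } keeps the stream open and emits each matching message as it is
  // indexed; { old: false } skips messages indexed before the call.
  core.api.query.read({ live: true, old: false, query })
    .on('data', (msg) => console.log('live:', msg.value))

  core.writer('local', (err, feed) => {
    if (err) throw err
    // Messages sharing a timestamp no longer collide: index entries now end with
    // the message's seq and feed key ([idx.key, ...indexKeys, msg.seq, msg.key]).
    const timestamp = Date.now()
    feed.append([
      { type: 'chat/message', timestamp, content: { body: 'first' } },
      { type: 'chat/message', timestamp, content: { body: 'second' } }
    ])
  })
})
```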

@@ -11,2 +11,8 @@ const { describe } = require('tape-plus')

+ const seeds = require('./seeds.json')
+ .sort((a, b) => a.timestamp > b.timestamp ? +1 : -1)
+ const drive = require('./drive.json')
+ .sort((a, b) => a.timestamp > b.timestamp ? +1 : -1)
const { cleanup, tmp, replicate } = require('./util')

@@ -21,3 +27,7 @@

- indexes = [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }]
+ indexes = [
+ { key: 'log', value: [['value', 'timestamp']] },
+ { key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] },
+ { key: 'fil', value: [['value', 'filename'], ['value', 'timestamp']] }
+ ]

@@ -28,27 +38,5 @@ core.use('query', Query(db, { indexes }))

context('perform a query', (assert, next) => {
- let data = [{
- type: 'chat/message',
- timestamp: Date.now(),
- content: { body: 'First message' }
- }, {
- type: 'user/about',
- timestamp: Date.now(),
- content: { name: 'Grace' }
- }, {
- type: 'chat/message',
- timestamp: Date.now() + 3,
- content: { body: 'Third message' }
- }, {
- type: 'chat/message',
- timestamp: Date.now() + 2,
- content: { body: 'Second message' }
- }, {
- type: 'user/about',
- timestamp: Date.now() + 1,
- content: { name: 'Poison Ivy' }
- }]
core.writer('local', (err, feed) => {
- feed.append(data, (err, _) => {
- assert.error(err)
+ feed.append(seeds, (err, _) => {
+ assert.error(err, 'no error')

@@ -58,6 +46,4 @@ let query = [{ $filter: { value: { type: 'chat/message' } } }]

core.ready('query', () => {
- collect(core.api.query.read({ reverse: true, query }), (err, msgs) => {
- var check = data
- .filter((msg) => msg.type === 'chat/message')
- .sort((a, b) => a.timestamp < b.timestamp ? +1 : -1)
+ collect(core.api.query.read({ query }), (err, msgs) => {
+ var check = seeds.filter((msg) => msg.type === 'chat/message')

@@ -71,2 +57,80 @@ assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index')

})
context('get all messages', (assert, next) => {
core.writer('local', (err, feed) => {
feed.append(seeds, (err, _) => {
assert.error(err, 'no error')
let query = [{ $filter: { value: { timestamp: { $gt: 0 } } } }]
core.ready('query', () => {
collect(core.api.query.read({ query }), (err, msgs) => {
var check = seeds
assert.equal(msgs.length, check.length, 'gets the same number of messages')
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index')
next()
})
})
})
})
})
context('fil index - get all changes to a specific file, then get all changes to all files', (assert, next) => {
core.writer('local', (err, feed) => {
feed.append(drive, (err, _) => {
assert.error(err, 'no error')
let filename = 'hello.txt'
let helloQuery = [{ $filter: { value: { filename, timestamp: { $gt: 0 } } } }]
core.ready('query', () => {
collect(core.api.query.read({ query: helloQuery }), (err, msgs) => {
var check = drive.filter((msg) => msg.filename === filename)
assert.equal(msgs.length, check.length, 'gets the same number of messages')
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index')
let fileQuery = [{ $filter: { value: { timestamp: { $gt: 0 } } } }]
collect(core.api.query.read({ query: fileQuery }), (err, msgs) => {
var check = drive
assert.equal(msgs.length, check.length, 'gets the same number of messages')
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index')
next()
})
})
})
})
})
})
context('live', (assert, next) => {
core.writer('local', (err, feed) => {
assert.error(err, 'no error')
feed.append(seeds.slice(0, 2), (err, _) => {
assert.error(err, 'no error')
let count = 0
let check = seeds.filter((msg) => msg.type === 'chat/message')
let query = [{ $filter: { value: { type: 'chat/message' } } }]
core.ready('query', () => {
var stream = core.api.query.read({ live: true, query })
stream.on('data', (msg) => {
assert.same(check[count], msg.value, 'streams each message live')
++count
done()
})
feed.append(seeds.slice(3, 5), (err, _) => {
assert.error(err, 'no error')
})
function done (err) {
if (count === check.length) return next()
}
})
})
})
})
})

@@ -90,3 +154,3 @@

- context('aggregates all valid messages from all feeds when querying', (assert, next) => {
+ context('aggregates all feeds', (assert, next) => {
var query = [{ $filter: { value: { type: 'chat/message' } } }]

@@ -96,4 +160,8 @@ var timestamp = Date.now()

- setup(name1, (feed1) => {
- setup(name2, (feed2) => {
+ setup(name1, (err, feed1) => {
+ assert.error(err, 'no error')
+ setup(name2, (err, feed2) => {
+ assert.error(err, 'no error')
+ debug(`initialised feed1: ${feed1.key.toString('hex')} feed2: ${feed2.key.toString('hex')}`)

@@ -118,3 +186,2 @@ assert.same(2, core.feeds().length, 'two local feeds')

core.writer(name, (err, feed) => {
assert.error(err, 'no error')
feed.append({

@@ -125,5 +192,4 @@ type: 'chat/message',

}, (err, seq) => {
assert.error(err, 'no error')
count++
- cb(feed)
+ cb(null, feed)
})

@@ -133,2 +199,84 @@ })

})
context('aggregates all feeds, colliding timestamps', (assert, next) => {
var query = [{ $filter: { value: { type: 'chat/message' } } }]
var timestamp = Date.now()
setup(name1, (err, feed1) => {
assert.error(err, 'no error')
setup(name2, (err, feed2) => {
assert.error(err, 'no error')
core.ready('query', () => {
collect(core.api.query.read({ query }), (err, msgs) => {
assert.error(err, 'no error')
assert.ok(msgs.length === 2, 'returns two messages')
assert.same(msgs, [
{ key: feed1.key.toString('hex'), seq: 0, value: { type: 'chat/message', timestamp, content: { body: name1 } } },
{ key: feed2.key.toString('hex'), seq: 1, value: { type: 'chat/message', timestamp, content: { body: name2 } }}
], 'aggregates all feeds')
next()
})
})
})
})
function setup (name, cb) {
core.writer(name1, (err, feed) => {
feed.append({
type: 'chat/message',
timestamp,
content: { body: name }
}, (err, seq) => {
cb(null, feed)
})
})
}
})
context('live', (assert, next) => {
var query = [{ $filter: { value: { type: 'chat/message' } } }]
let timestamp = Date.now()
core.writer(name1, (err, feed1) => {
assert.error(err, 'no error')
core.writer(name2, (err, feed2) => {
assert.error(err, 'no error')
let count = 0
let check = seeds
.map((msg) => Object.assign(msg, { timestamp }))
.filter((msg) => msg.type === 'chat/message')
core.ready('query', () => {
var stream = core.api.query.read({ live: true, old: false, query })
stream.on('data', (msg) => {
assert.same(msg.value, check[count], 'streams each message live')
++count
done()
})
})
core.ready('query', () => {
var batch2 = seeds.slice(3, 5)
feed2.append(batch2, (err, _) => {
assert.error(err, 'no error')
})
})
core.ready('query', () => {
var batch1 = seeds.slice(0, 3)
feed1.append(batch1, (err, _) => {
assert.error(err, 'no error')
})
})
function done () {
if (count === check.length) return next()
}
})
})
})
})

@@ -155,4 +303,7 @@

- setup(core1, (feed1) => {
- setup(core2, (feed2) => {
+ setup(core1, (err, feed1) => {
+ assert.error(err, 'no error')
+ setup(core2, (err, feed2) => {
+ assert.error(err, 'no error')
+ debug(`initialised core1: ${feed1.key.toString('hex')} core2: ${feed2.key.toString('hex')}`)

@@ -178,3 +329,3 @@ assert.same(1, core1.feeds().length, 'one feed')

{ key: feed1.key.toString('hex'), seq: 0, value: { type: 'chat/message', timestamp } },
- { key: feed2.key.toString('hex'), seq: 0, value: { type: 'chat/message', timestamp: timestamp + 1 } }
+ { key: feed2.key.toString('hex'), seq: 0, value: { type: 'chat/message', timestamp: timestamp + 1} }
], 'query aggregates messages from all feeds')

@@ -192,10 +343,8 @@ next()

kcore.writer('local', (err, feed) => {
assert.error(err, 'no error')
feed.append({
type: 'chat/message',
- timestamp: timestamp + count
+ timestamp: timestamp + count,
}, (err, seq) => {
- count++
assert.error(err, 'no error')
- cb(feed)
+ ++count
+ cb(null, feed)
})

@@ -205,2 +354,79 @@ })

})
context('live', (assert, next) => {
let query = [{ $filter: { value: { type: 'user/about' } } }]
let timestamp = Date.now()
let feed1Name = { type: 'user/about', timestamp, content: { name: 'Magpie' } }
let feed2Name = { type: 'user/about', timestamp, content: { name: 'Jackdaw' } }
let count1 = 0
let count2 = 0
let check1 = [feed1Name, feed2Name]
let check2 = [feed2Name, feed1Name]
setup(core1, (err, feed1) => {
assert.error(err, 'no error')
setup(core2, (err, feed2) => {
assert.error(err, 'no error')
let core1ready, core2ready
debug(`initialised core1: ${feed1.key.toString('hex')} core2: ${feed2.key.toString('hex')}`)
core1.ready('query', () => {
let stream1 = core1.api.query.read({ live: true, old: false, query })
stream1.on('data', (msg) => {
debug(`stream 1: ${JSON.stringify(msg, null, 2)}` )
assert.same(msg.value, check1[count1], 'streams each message live')
++count1
done()
})
core1ready = true
doReplication()
})
core2.ready('query', () => {
let stream2 = core2.api.query.read({ live: true, old: false, query })
stream2.on('data', (msg) => {
debug(`stream 2: ${JSON.stringify(msg, null, 2)}` )
assert.same(msg.value, check2[count2], 'streams each message live')
++count2
done()
})
core2ready = true
doReplication()
})
function doReplication () {
if (!(core1ready && core2ready)) return
feed1.append(feed1Name, (err, seq) => {
assert.error(err, 'no error')
feed2.append(feed2Name, (err, seq) => {
assert.error(err, 'no error')
debug('replicating')
replicate(core1, core2, (err) => {
assert.error(err, 'no error')
assert.same(2, core1.feeds().length, `first core has replicated second core's feed`)
assert.same(2, core2.feeds().length, `second core has replicated first core's feed`)
})
})
})
}
})
})
function done () {
if (count1 === 2 && count2 === 2) return next()
}
function setup (kcore, cb) {
kcore.writer('local', cb)
}
})
})

@@ -10,13 +10,12 @@ const rimraf = require('rimraf')

if (!Array.isArray(dirs)) dirs = [dirs]
- var pending = 1
+ var pending = dirs.length
function next (n) {
var dir = dirs[n]
- if (!dir) return
+ if (!dir) return done()
- ++pending
process.nextTick(next, n + 1)
rimraf(dir, (err) => {
debug(`[CLEANUP] ${dir} : ${ err ? 'failed' : 'success'}`)
if (err) return done(err)
process.nextTick(next, n + 1)
done()

@@ -38,5 +37,4 @@ })

function tmp () {
- var tmpDir = `./${tmpdir().name}`
+ var tmpDir = tmpdir().name
mkdirp.sync(tmpDir)
debug(`[TEMP] creating temp directory ${tmpDir}`)
return tmpDir

@@ -51,3 +49,2 @@ }

function replicate (core1, core2, cb) {
- debug(`[REPLICATE] replicating...`)
var stream = core1.replicate()

@@ -60,2 +57,2 @@ stream.pipe(core2.replicate()).pipe(stream)

- module.exports = { cleanup, tmp, uniq, replicate, noop }
+ module.exports = { cleanup, tmp, uniq, replicate }