kappa-view-query
Advanced tools
Comparing version 2.0.6 to 2.0.7
@@ -14,3 +14,3 @@ const kappa = require('kappa-core') | ||
const indexes = [ | ||
{ key: 'log', value: ['value', 'timestamp'] }, | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }, | ||
@@ -29,19 +29,19 @@ { key: 'cha', value: [['value', 'type'], ['value', 'content', 'channel'], ['value', 'timestamp']] } | ||
type: 'user/about', | ||
timestamp: Date.now() + 1, | ||
timestamp: Date.now(), | ||
content: { name: 'Grace' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now() + 2, | ||
timestamp: Date.now(), | ||
content: { body: 'Second post' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now() + 3, | ||
timestamp: Date.now(), | ||
content: { channel: 'dogs', body: 'Lurchers rule' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now() + 4, | ||
timestamp: Date.now(), | ||
content: { channel: 'dogs', body: 'But sometimes I prefer labra-doodles' } | ||
}, { | ||
type: 'user/about', | ||
timestamp: Date.now() + 5, | ||
timestamp: Date.now(), | ||
content: { name: 'Poison Ivy' } | ||
@@ -48,0 +48,0 @@ }] |
27
index.js
@@ -6,2 +6,3 @@ const through = require('through2') | ||
const debug = require('debug')('kappa-view-query') | ||
const liveStream = require('level-live-stream') | ||
@@ -38,3 +39,3 @@ const Explain = require('./explain') | ||
type: 'put', | ||
key: [idx.key, ...indexKeys], | ||
key: [idx.key, ...indexKeys, msg.seq, msg.key], | ||
value: [msg.key, msg.seq].join('@'), | ||
@@ -60,3 +61,3 @@ keyEncoding, | ||
debug(`[INDEXING] ${JSON.stringify(ops)}`) | ||
debug(`indexing ${JSON.stringify(msgs, null, 2)} AS ${JSON.stringify(ops, null, 2)}`) | ||
@@ -66,3 +67,5 @@ db.batch(ops, next) | ||
indexed: (msgs) => { | ||
msgs.forEach((msg) => events.emit('update', msg)) | ||
msgs.forEach((msg) => { | ||
events.emit('update', msg) | ||
}) | ||
}, | ||
@@ -80,2 +83,4 @@ api: { | ||
var thru = through.obj(function (msg, enc, next) { | ||
if (msg.sync) return next() | ||
var msgId = msg.value | ||
@@ -88,3 +93,9 @@ var [ feedId, sequence ] = msgId.split('@') | ||
if (err) return next() | ||
var msg = validator({ key: feed.key.toString('hex'), seq, value }) | ||
var msg = validator({ | ||
key: feed.key.toString('hex'), | ||
seq, | ||
value | ||
}) | ||
if (!msg) return next() | ||
@@ -96,3 +107,3 @@ this.push(msg) | ||
var stream = db.createReadStream(Object.assign(__opts, { | ||
var streamOpts = Object.assign(__opts, { | ||
lte: [idx.key, ...__opts.lte], | ||
@@ -103,4 +114,8 @@ gte: [idx.key, ...__opts.gte], | ||
values: true | ||
})) | ||
}) | ||
var stream = __opts.live | ||
? liveStream(db, streamOpts) | ||
: db.createReadStream(streamOpts) | ||
stream.pipe(thru) | ||
@@ -107,0 +122,0 @@ |
{ | ||
"name": "kappa-view-query", | ||
"version": "2.0.6", | ||
"version": "2.0.7", | ||
"description": "", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "nyc tape test/**/*.test.js | tap-spec" | ||
"test": "tape test/**/*.test.js | tap-spec", | ||
"cover": "nyc tape test/**/*.test.js | tap-spec" | ||
}, | ||
@@ -14,2 +15,3 @@ "dependencies": { | ||
"events": "^3.0.0", | ||
"level-live-stream": "^1.4.12", | ||
"map-filter-reduce": "^3.2.2", | ||
@@ -16,0 +18,0 @@ "pull-stream-to-stream": "^1.3.4", |
@@ -63,3 +63,3 @@ # Kappa View Query | ||
// Define a validator to determine if a message should be indexed or not | ||
// Define a validator or a message decoder to determine if a message should be indexed or not | ||
function validator (msg) { | ||
@@ -73,5 +73,14 @@ if (typeof msg !== 'object') return null | ||
// here's an alternative using protocol buffers, assuming a message schema exists | ||
const { Message } = protobuf(fs.readFileSync(path.join(path.resolve(__dirname), 'message.proto'))) | ||
function validator (msg) { | ||
try { msg.value = Message.decode(msg.value) } | ||
catch (err) { return console.error(err) && false } | ||
return msg | ||
} | ||
// Define a set of indexes under a namespace | ||
const indexes = [ | ||
{ key: 'log', value: ['value', 'timestamp'] }, | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }, | ||
@@ -163,10 +172,14 @@ { key: 'cha', value: [['value', 'type'], ['value', 'content', 'channel']] } | ||
## Acknowledgments | ||
kappa-view-query was built by [@kyphae](https://github.com/kyphae/) and assisted by [@dominictarr](https://github.com/dominictarr). It uses [@dominictarr](https://github.com/dominictarr)'s scoring system and query interface. | ||
kappa-view-query was built by [@kyphae](https://github.com/kyphae/) and assisted by [@dominictarr](https://github.com/dominictarr). It uses [@dominictarr](https://github.com/dominictarr)'s scoring system and query interface from [flumeview-query](https://github.com/flumedb/flumeview-query). | ||
## Version Changes | ||
Version 2 - Updated to remove `pull-stream` and `flumeview-query` as a core dependency for better compatibility with the rest of the `kappa-db` ecosystem. `api.query.read` now returns a regular readable node stream. In order to continue to use pull-streams, check out an updated fork of V1: [kappa-view-pull-query](https://github.com/kappa-db/kappa-view-pull-query) | ||
## Releases | ||
### 2.0.0 | ||
- Updated to remove the need for `pull-stream` and `flumeview-query` as an external dependency, providing better compatibility with the rest of the `kappa-db` ecosystem. | ||
- `core.api.query.read` returns a regular readable node stream. | ||
- In order to continue to use pull-streams: | ||
- Use the updated fork of v1 at [kappa-view-pull-query](https://github.com/kappa-db/kappa-view-pull-query), | ||
- Or make use of [stream-to-pull-stream](https://github.com/pull-stream/pull-stream-to-stream/). | ||
## Todos | ||
* [ ] write more comprehensive tests to ensure we're properly using map-filter-reduce | ||
* [ ] write tests for allowing different kinds of validators | ||
* [ ] make it so we can use multiple message schemas, and it selects the most appropriate schema based on the query | ||
### 2.0.7 | ||
- Fixed an outstanding issue where live streams were not working. Queries with `{ live: true }` setup will now properly pipe messages through as they are indexed. | ||
- Fixed an outstanding issue where messages with a matching timestamp were colliding, where the last indexed would over-writing the previous. Messages are now indexed, on top of provided values, on the sequence number and the feed id, for guaranteed uniqueness. |
@@ -11,2 +11,8 @@ const { describe } = require('tape-plus') | ||
const seeds = require('./seeds.json') | ||
.sort((a, b) => a.timestamp > b.timestamp ? +1 : -1) | ||
const drive = require('./drive.json') | ||
.sort((a, b) => a.timestamp > b.timestamp ? +1 : -1) | ||
const { cleanup, tmp, replicate } = require('./util') | ||
@@ -21,3 +27,7 @@ | ||
indexes = [{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }] | ||
indexes = [ | ||
{ key: 'log', value: [['value', 'timestamp']] }, | ||
{ key: 'typ', value: [['value', 'type'], ['value', 'timestamp']] }, | ||
{ key: 'fil', value: [['value', 'filename'], ['value', 'timestamp']] } | ||
] | ||
@@ -28,27 +38,5 @@ core.use('query', Query(db, { indexes })) | ||
context('perform a query', (assert, next) => { | ||
let data = [{ | ||
type: 'chat/message', | ||
timestamp: Date.now(), | ||
content: { body: 'First message' } | ||
}, { | ||
type: 'user/about', | ||
timestamp: Date.now(), | ||
content: { name: 'Grace' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now() + 3, | ||
content: { body: 'Third message' } | ||
}, { | ||
type: 'chat/message', | ||
timestamp: Date.now() + 2, | ||
content: { body: 'Second message' } | ||
}, { | ||
type: 'user/about', | ||
timestamp: Date.now() + 1, | ||
content: { name: 'Poison Ivy' } | ||
}] | ||
core.writer('local', (err, feed) => { | ||
feed.append(data, (err, _) => { | ||
assert.error(err) | ||
feed.append(seeds, (err, _) => { | ||
assert.error(err, 'no error') | ||
@@ -58,6 +46,4 @@ let query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
core.ready('query', () => { | ||
collect(core.api.query.read({ reverse: true, query }), (err, msgs) => { | ||
var check = data | ||
.filter((msg) => msg.type === 'chat/message') | ||
.sort((a, b) => a.timestamp < b.timestamp ? +1 : -1) | ||
collect(core.api.query.read({ query }), (err, msgs) => { | ||
var check = seeds.filter((msg) => msg.type === 'chat/message') | ||
@@ -71,2 +57,80 @@ assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index') | ||
}) | ||
context('get all messages', (assert, next) => { | ||
core.writer('local', (err, feed) => { | ||
feed.append(seeds, (err, _) => { | ||
assert.error(err, 'no error') | ||
let query = [{ $filter: { value: { timestamp: { $gt: 0 } } } }] | ||
core.ready('query', () => { | ||
collect(core.api.query.read({ query }), (err, msgs) => { | ||
var check = seeds | ||
assert.equal(msgs.length, check.length, 'gets the same number of messages') | ||
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index') | ||
next() | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
context('fil index - get all changes to a specific file, then get all changes to all files', (assert, next) => { | ||
core.writer('local', (err, feed) => { | ||
feed.append(drive, (err, _) => { | ||
assert.error(err, 'no error') | ||
let filename = 'hello.txt' | ||
let helloQuery = [{ $filter: { value: { filename, timestamp: { $gt: 0 } } } }] | ||
core.ready('query', () => { | ||
collect(core.api.query.read({ query: helloQuery }), (err, msgs) => { | ||
var check = drive.filter((msg) => msg.filename === filename) | ||
assert.equal(msgs.length, check.length, 'gets the same number of messages') | ||
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index') | ||
let fileQuery = [{ $filter: { value: { timestamp: { $gt: 0 } } } }] | ||
collect(core.api.query.read({ query: fileQuery }), (err, msgs) => { | ||
var check = drive | ||
assert.equal(msgs.length, check.length, 'gets the same number of messages') | ||
assert.same(msgs.map((msg) => msg.value), check, 'querys messages using correct index') | ||
next() | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
context('live', (assert, next) => { | ||
core.writer('local', (err, feed) => { | ||
assert.error(err, 'no error') | ||
feed.append(seeds.slice(0, 2), (err, _) => { | ||
assert.error(err, 'no error') | ||
let count = 0 | ||
let check = seeds.filter((msg) => msg.type === 'chat/message') | ||
let query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
core.ready('query', () => { | ||
var stream = core.api.query.read({ live: true, query }) | ||
stream.on('data', (msg) => { | ||
assert.same(check[count], msg.value, 'streams each message live') | ||
++count | ||
done() | ||
}) | ||
feed.append(seeds.slice(3, 5), (err, _) => { | ||
assert.error(err, 'no error') | ||
}) | ||
function done (err) { | ||
if (count === check.length) return next() | ||
} | ||
}) | ||
}) | ||
}) | ||
}) | ||
}) | ||
@@ -90,3 +154,3 @@ | ||
context('aggregates all valid messages from all feeds when querying', (assert, next) => { | ||
context('aggregates all feeds', (assert, next) => { | ||
var query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
@@ -96,4 +160,8 @@ var timestamp = Date.now() | ||
setup(name1, (feed1) => { | ||
setup(name2, (feed2) => { | ||
setup(name1, (err, feed1) => { | ||
assert.error(err, 'no error') | ||
setup(name2, (err, feed2) => { | ||
assert.error(err, 'no error') | ||
debug(`initialised feed1: ${feed1.key.toString('hex')} feed2: ${feed2.key.toString('hex')}`) | ||
@@ -118,3 +186,2 @@ assert.same(2, core.feeds().length, 'two local feeds') | ||
core.writer(name, (err, feed) => { | ||
assert.error(err, 'no error') | ||
feed.append({ | ||
@@ -125,5 +192,4 @@ type: 'chat/message', | ||
}, (err, seq) => { | ||
assert.error(err, 'no error') | ||
count++ | ||
cb(feed) | ||
cb(null, feed) | ||
}) | ||
@@ -133,2 +199,84 @@ }) | ||
}) | ||
context('aggregates all feeds, colliding timestamps', (assert, next) => { | ||
var query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
var timestamp = Date.now() | ||
setup(name1, (err, feed1) => { | ||
assert.error(err, 'no error') | ||
setup(name2, (err, feed2) => { | ||
assert.error(err, 'no error') | ||
core.ready('query', () => { | ||
collect(core.api.query.read({ query }), (err, msgs) => { | ||
assert.error(err, 'no error') | ||
assert.ok(msgs.length === 2, 'returns two messages') | ||
assert.same(msgs, [ | ||
{ key: feed1.key.toString('hex'), seq: 0, value: { type: 'chat/message', timestamp, content: { body: name1 } } }, | ||
{ key: feed2.key.toString('hex'), seq: 1, value: { type: 'chat/message', timestamp, content: { body: name2 } }} | ||
], 'aggregates all feeds') | ||
next() | ||
}) | ||
}) | ||
}) | ||
}) | ||
function setup (name, cb) { | ||
core.writer(name1, (err, feed) => { | ||
feed.append({ | ||
type: 'chat/message', | ||
timestamp, | ||
content: { body: name } | ||
}, (err, seq) => { | ||
cb(null, feed) | ||
}) | ||
}) | ||
} | ||
}) | ||
context('live', (assert, next) => { | ||
var query = [{ $filter: { value: { type: 'chat/message' } } }] | ||
let timestamp = Date.now() | ||
core.writer(name1, (err, feed1) => { | ||
assert.error(err, 'no error') | ||
core.writer(name2, (err, feed2) => { | ||
assert.error(err, 'no error') | ||
let count = 0 | ||
let check = seeds | ||
.map((msg) => Object.assign(msg, { timestamp })) | ||
.filter((msg) => msg.type === 'chat/message') | ||
core.ready('query', () => { | ||
var stream = core.api.query.read({ live: true, old: false, query }) | ||
stream.on('data', (msg) => { | ||
assert.same(msg.value, check[count], 'streams each message live') | ||
++count | ||
done() | ||
}) | ||
}) | ||
core.ready('query', () => { | ||
var batch2 = seeds.slice(3, 5) | ||
feed2.append(batch2, (err, _) => { | ||
assert.error(err, 'no error') | ||
}) | ||
}) | ||
core.ready('query', () => { | ||
var batch1 = seeds.slice(0, 3) | ||
feed1.append(batch1, (err, _) => { | ||
assert.error(err, 'no error') | ||
}) | ||
}) | ||
function done () { | ||
if (count === check.length) return next() | ||
} | ||
}) | ||
}) | ||
}) | ||
}) | ||
@@ -155,4 +303,7 @@ | ||
setup(core1, (feed1) => { | ||
setup(core2, (feed2) => { | ||
setup(core1, (err, feed1) => { | ||
assert.error(err, 'no error') | ||
setup(core2, (err, feed2) => { | ||
assert.error(err, 'no error') | ||
debug(`initialised core1: ${feed1.key.toString('hex')} core2: ${feed2.key.toString('hex')}`) | ||
@@ -178,3 +329,3 @@ assert.same(1, core1.feeds().length, 'one feed') | ||
{ key: feed1.key.toString('hex'), seq: 0, value: { type: 'chat/message', timestamp } }, | ||
{ key: feed2.key.toString('hex'), seq: 0, value: { type: 'chat/message', timestamp: timestamp + 1 } } | ||
{ key: feed2.key.toString('hex'), seq: 0, value: { type: 'chat/message', timestamp: timestamp + 1} } | ||
], 'query aggregates messages from all feeds') | ||
@@ -192,10 +343,8 @@ next() | ||
kcore.writer('local', (err, feed) => { | ||
assert.error(err, 'no error') | ||
feed.append({ | ||
type: 'chat/message', | ||
timestamp: timestamp + count | ||
timestamp: timestamp + count, | ||
}, (err, seq) => { | ||
count++ | ||
assert.error(err, 'no error') | ||
cb(feed) | ||
++count | ||
cb(null, feed) | ||
}) | ||
@@ -205,2 +354,79 @@ }) | ||
}) | ||
context('live', (assert, next) => { | ||
let query = [{ $filter: { value: { type: 'user/about' } } }] | ||
let timestamp = Date.now() | ||
let feed1Name = { type: 'user/about', timestamp, content: { name: 'Magpie' } } | ||
let feed2Name = { type: 'user/about', timestamp, content: { name: 'Jackdaw' } } | ||
let count1 = 0 | ||
let count2 = 0 | ||
let check1 = [feed1Name, feed2Name] | ||
let check2 = [feed2Name, feed1Name] | ||
setup(core1, (err, feed1) => { | ||
assert.error(err, 'no error') | ||
setup(core2, (err, feed2) => { | ||
assert.error(err, 'no error') | ||
let core1ready, core2ready | ||
debug(`initialised core1: ${feed1.key.toString('hex')} core2: ${feed2.key.toString('hex')}`) | ||
core1.ready('query', () => { | ||
let stream1 = core1.api.query.read({ live: true, old: false, query }) | ||
stream1.on('data', (msg) => { | ||
debug(`stream 1: ${JSON.stringify(msg, null, 2)}` ) | ||
assert.same(msg.value, check1[count1], 'streams each message live') | ||
++count1 | ||
done() | ||
}) | ||
core1ready = true | ||
doReplication() | ||
}) | ||
core2.ready('query', () => { | ||
let stream2 = core2.api.query.read({ live: true, old: false, query }) | ||
stream2.on('data', (msg) => { | ||
debug(`stream 2: ${JSON.stringify(msg, null, 2)}` ) | ||
assert.same(msg.value, check2[count2], 'streams each message live') | ||
++count2 | ||
done() | ||
}) | ||
core2ready = true | ||
doReplication() | ||
}) | ||
function doReplication () { | ||
if (!(core1ready && core2ready)) return | ||
feed1.append(feed1Name, (err, seq) => { | ||
assert.error(err, 'no error') | ||
feed2.append(feed2Name, (err, seq) => { | ||
assert.error(err, 'no error') | ||
debug('replicating') | ||
replicate(core1, core2, (err) => { | ||
assert.error(err, 'no error') | ||
assert.same(2, core1.feeds().length, `first core has replicated second core's feed`) | ||
assert.same(2, core2.feeds().length, `second core has replicated first core's feed`) | ||
}) | ||
}) | ||
}) | ||
} | ||
}) | ||
}) | ||
function done () { | ||
if (count1 === 2 && count2 === 2) return next() | ||
} | ||
function setup (kcore, cb) { | ||
kcore.writer('local', cb) | ||
} | ||
}) | ||
}) |
@@ -10,13 +10,12 @@ const rimraf = require('rimraf') | ||
if (!Array.isArray(dirs)) dirs = [dirs] | ||
var pending = 1 | ||
var pending = dirs.length | ||
function next (n) { | ||
var dir = dirs[n] | ||
if (!dir) return | ||
if (!dir) return done() | ||
++pending | ||
process.nextTick(next, n + 1) | ||
rimraf(dir, (err) => { | ||
debug(`[CLEANUP] ${dir} : ${ err ? 'failed' : 'success'}`) | ||
if (err) return done(err) | ||
process.nextTick(next, n + 1) | ||
done() | ||
@@ -38,5 +37,4 @@ }) | ||
function tmp () { | ||
var tmpDir = `./${tmpdir().name}` | ||
var tmpDir = tmpdir().name | ||
mkdirp.sync(tmpDir) | ||
debug(`[TEMP] creating temp directory ${tmpDir}`) | ||
return tmpDir | ||
@@ -51,3 +49,2 @@ } | ||
function replicate (core1, core2, cb) { | ||
debug(`[REPLICATE] replicating...`) | ||
var stream = core1.replicate() | ||
@@ -60,2 +57,2 @@ stream.pipe(core2.replicate()).pipe(stream) | ||
module.exports = { cleanup, tmp, uniq, replicate, noop } | ||
module.exports = { cleanup, tmp, uniq, replicate } |
136575
14
853
183
9
+ Addedlevel-live-stream@^1.4.12
+ Addedlevel-live-stream@1.4.13(transitive)
+ Addedlevel-post@1.0.7(transitive)
+ Addedlooper@2.0.0(transitive)
+ Addedltgt@2.2.1(transitive)
+ Addedpull-cat@1.1.11(transitive)
+ Addedpull-level@2.0.4(transitive)
+ Addedpull-live@1.0.1(transitive)
+ Addedpull-pushable@2.2.0(transitive)
+ Addedpull-stream-to-stream@2.0.0(transitive)
+ Addedpull-window@2.1.4(transitive)