@live-change/db
Advanced tools
Comparing version 0.3.60 to 0.3.61
@@ -8,8 +8,10 @@ class ChangeStream { | ||
to(output) { | ||
this.onChange((obj, oldObj, id, timestamp) => output.change(obj, oldObj, id, timestamp)) | ||
return this.onChange((obj, oldObj, id, timestamp) => output.change(obj, oldObj, id, timestamp)) | ||
} | ||
filter(func) { | ||
const pipe = new ChangeStreamPipe() | ||
this.onChange((obj, oldObj, id, timestamp) => | ||
const observerPromise = this.onChange((obj, oldObj, id, timestamp) => | ||
pipe.change(obj && func(obj) ? obj : null, oldObj && func(oldObj) ? oldObj : null, id, timestamp)) | ||
pipe.master = this | ||
pipe.observerPromise = observerPromise | ||
return pipe | ||
@@ -19,4 +21,6 @@ } | ||
const pipe = new ChangeStreamPipe() | ||
this.onChange((obj, oldObj, id, timestamp) => | ||
const observerPromise = this.onChange((obj, oldObj, id, timestamp) => | ||
pipe.change(obj && func(obj), oldObj && func(oldObj), id, timestamp)) | ||
pipe.master = this | ||
pipe.observerPromise = observerPromise | ||
return pipe | ||
@@ -26,5 +30,5 @@ } | ||
const pipe = new ChangeStreamPipe() | ||
this.onChange((obj, oldObj, id, timestamp) => { | ||
const observerPromise = this.onChange((obj, oldObj, id, timestamp) => { | ||
const indList = obj && func(obj) | ||
const oldIndList = oldObj && func(obj) | ||
const oldIndList = oldObj && func(oldObj) | ||
const ind = indList && indList.map(v => JSON.stringify(v)).join(':')+'_'+id | ||
@@ -40,2 +44,4 @@ const oldInd = oldIndList && oldIndList.map(v => JSON.stringify(v)).join(':')+'_'+id | ||
}) | ||
pipe.master = this | ||
pipe.observerPromise = observerPromise | ||
return pipe | ||
@@ -52,3 +58,11 @@ } | ||
this.callbacks.push(cb) | ||
return cb | ||
} | ||
async unobserve(cb) { | ||
const cbIndex = this.callbacks.indexOf(cb) | ||
if(cbIndex == -1) throw new Error("observer not found") | ||
if(this.callbacks.length == 0) { | ||
this.master.unobservePromise = await this.observerPromise | ||
} | ||
} | ||
async change(obj, oldObj, id, timestamp) { | ||
@@ -55,0 +69,0 @@ for(const callback of this.callbacks) await callback(obj, oldObj, id, timestamp) |
@@ -89,3 +89,3 @@ const IntervalTree = require('node-interval-tree').default | ||
}, 1000)*/ | ||
/* if(this.prefix == 'table_triggers'){ | ||
/* //if(this.prefix == 'table_triggers'){ | ||
let triggersDataJson = "" | ||
@@ -226,3 +226,3 @@ setInterval(() => { | ||
async readTo(endKey) { | ||
//console.log("RT", endKey, "IN", this.opLogBuffer) | ||
//if(this.prefix == 'table_triggers') console.log("RT", endKey, "IN", this.opLogBuffer) | ||
let lastKey = null | ||
@@ -236,3 +236,3 @@ while(this.opLogBuffer[0] && this.opLogBuffer[0].id <= endKey) { | ||
const op = next.operation | ||
//console.log("HANDLE OP LOG OPERATION", next) | ||
//if(this.prefix == 'table_triggers') console.log("HANDLE OP LOG OPERATION", next) | ||
if(op) { | ||
@@ -305,6 +305,6 @@ if(op.type == 'put') { | ||
if(this.readingMore) { | ||
//console.log("STORE SIGNAL") | ||
//if(this.indexName == 'triggers_new') console.log("STORE SIGNAL") | ||
this.gotSignals = true | ||
} else { | ||
//console.log("READ MORE ON SIGNAL") | ||
//if(this.indexName == 'triggers_new') console.log("READ MORE ON SIGNAL") | ||
this.readMore() | ||
@@ -315,2 +315,3 @@ } | ||
this.readingMore = true | ||
//if(this.indexName == 'triggers_new') console.log("READING TRIGGERS STARTED!") | ||
do { | ||
@@ -325,5 +326,5 @@ while(true) { | ||
) | ||
//console.log("GOT NEXT KEYS") | ||
//if(this.indexName == 'triggers_new') console.log("GOT NEXT KEYS") | ||
if(this.disposed) return | ||
//console.log("POSSIBLE NEXT KEYS", possibleNextKeys.map(({key, reader}) => [reader.prefix, key])) | ||
//if(this.indexName == 'triggers_new') console.log("POSSIBLE NEXT KEYS", possibleNextKeys.map(({key, reader}) => [reader.prefix, key])) | ||
if(possibleNextKeys.length == 0) { /// It could happen when oplog is cleared | ||
@@ -338,5 +339,5 @@ return | ||
} | ||
//console.log("NEXT KEY", next && next.reader && next.reader.prefix, next && next.key) | ||
//if(this.indexName == 'triggers_new') console.log("NEXT KEY", next && next.reader && next.reader.prefix, next && next.key) | ||
const lastKey = '\xFF\xFF\xFF\xFF' | ||
//console.log("NEXT", !!next, "KEY", next && next.key, lastKey) | ||
//if(this.indexName == 'triggers_new') console.log("NEXT", !!next, "KEY", next && next.key, lastKey) | ||
if(!next || next.key == lastKey) break // nothing to read | ||
@@ -349,6 +350,10 @@ let otherReaderNext = null | ||
} | ||
//console.log("OTHER READ NEXT", otherReaderNext && otherReaderNext.reader && otherReaderNext.reader.prefix, | ||
// otherReaderNext && otherReaderNext.key) | ||
const readEnd = (otherReaderNext && otherReaderNext.key) // Read to next other reader key | ||
//if(this.indexName == 'triggers_new') | ||
// console.log("OTHER READ NEXT", otherReaderNext && otherReaderNext.reader && otherReaderNext.reader.prefix, | ||
// otherReaderNext && otherReaderNext.key) | ||
let readEnd = (otherReaderNext && otherReaderNext.key) // Read to next other reader key | ||
|| (((''+(now - 1))).padStart(16, '0'))+':' // or to current timestamp | ||
if(readEnd < next) { | ||
readEnd = next+'\xff' | ||
} | ||
@@ -360,5 +365,7 @@ if((next.key||'') < this.currentKey) { | ||
} | ||
//console.log("CKN", this.currentKey, '=>', next.key) | ||
//if(this.indexName == 'triggers_new') console.log("CKN", this.currentKey, '=>', next.key) | ||
this.currentKey = next.key | ||
//if(this.indexName == 'triggers_new') console.log("READ TO", readEnd) | ||
const readKey = await next.reader.readTo(readEnd) | ||
//if(this.indexName == 'triggers_new') console.log("READED") | ||
if(readKey) { | ||
@@ -370,3 +377,3 @@ if((readKey||'') < this.currentKey) { | ||
} | ||
//console.log("CKR", this.currentKey, '=>', readKey) | ||
//if(this.indexName == 'triggers_new') console.log("CKR", this.currentKey, '=>', readKey) | ||
this.currentKey = readKey | ||
@@ -377,2 +384,3 @@ } | ||
this.readingMore = false | ||
//if(this.indexName == 'triggers_new') console.log("READING TRIGGERS FINISHED!") | ||
} | ||
@@ -407,3 +415,3 @@ dispose() { | ||
change(obj, oldObj) { | ||
//console.log("INDEX WRITE", obj, oldObj) | ||
//if(this.index.name == 'triggers_new') console.log("INDEX WRITE", obj, oldObj) | ||
if(obj) { | ||
@@ -410,0 +418,0 @@ if(oldObj && oldObj.id != obj.id) { |
@@ -94,3 +94,6 @@ const ReactiveDao = require("@live-change/dao") | ||
const index = this.#observers.indexOf(observer) | ||
if(index == -1) throw new Error("observer not found") | ||
if(index == -1) { | ||
console.error("OBSERVER NOT FOUND", observer) | ||
throw new Error("observer not found") | ||
} | ||
this.#observers.splice(index, 1) | ||
@@ -97,0 +100,0 @@ ;(await this.#observable).unobserve(observer) |
{ | ||
"name": "@live-change/db", | ||
"version": "0.3.60", | ||
"version": "0.3.61", | ||
"description": "Database with observable data for live queries", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
96338
2840