arangochair
Advanced tools
Comparing version 3.0.1 to 4.0.1
99
index.js
@@ -30,2 +30,7 @@ 'use strict'; | ||
const db = '/' === adbUrl.pathname ? '/_system' : adbUrl.pathname; | ||
this._loggerStatePath = `/_db${db}/_api/replication/logger-state`; | ||
this._loggerFollowPath = `/_db${db}/_api/replication/logger-follow`; | ||
this.collectionsMap = new Map(); | ||
@@ -45,3 +50,3 @@ this._stopped = false; | ||
_startLoggerState() { | ||
this.req.get({path:'/_api/replication/logger-state'}, (status, headers, body) => { | ||
this.req.get({path:this._loggerStatePath}, (status, headers, body) => { | ||
if (200 !== status) { | ||
@@ -58,6 +63,44 @@ this.emit('error', new Error('E_LOGGERSTATE'), status, headers, body); | ||
let type = 0; | ||
let tid = 0; | ||
let entry = 0; | ||
let typeStartIdx = 0; | ||
let typeEndIdx = 0; | ||
let idx0 = 0; | ||
let idx1 = 0; | ||
const typeStartBuffer = Buffer.from('type":'); | ||
const cnameStartBuffer = Buffer.from('cname":"'); | ||
const keyStartBuffer = Buffer.from('_key":"'); | ||
const commaDoubleTickBuffer = Buffer.from(',"'); | ||
const txns = new Map(); | ||
const handleEntry = () => { | ||
idx0 = entry.indexOf(cnameStartBuffer, idx0 + 2) + 8; | ||
idx1 = entry.indexOf(commaDoubleTickBuffer, idx0) - 1; | ||
const colName = entry.slice(idx0, idx1).toString(); | ||
const colConf = this.collectionsMap.get(colName); | ||
if (undefined === colConf) return; | ||
const events = colConf.get('events'); | ||
if (0 !== events.size && !events.has(type)) return; | ||
idx0 = entry.indexOf(keyStartBuffer, idx1 + 9); | ||
const key = entry.slice(idx0+7, entry.indexOf(commaDoubleTickBuffer, idx0+7)-1).toString(); | ||
const keys = colConf.get('keys'); | ||
if (0 !== keys.size && !events.has(key)) return; | ||
this.emit(colName, entry.slice(idx1 + 9, -1), mapTypeToText[type]); | ||
}; | ||
const ticktock = () => { | ||
if (this._stopped) return; | ||
this.req.get({path:`/_api/replication/logger-follow?from=${lastLogTick}`}, (status, headers, body) => { | ||
this.req.get({path:`${this._loggerFollowPath}?from=${lastLogTick}`}, (status, headers, body) => { | ||
if (204 < status || 0 === status) { | ||
@@ -80,27 +123,39 @@ this.emit('error', new Error('E_LOGGERFOLLOW'), status, headers, body); | ||
const entry = body.toString('utf8', start, idx); | ||
entry = body.slice(start, idx); | ||
start = idx+1; | ||
try { | ||
// insert/update {"tick":"514092205556","type":2300,"tid":"0","database":"1","cid":"513417247371","cname":"test","data":{"_id":"test/testkey","_key":"testkey","_rev":"514092205554",...}} | ||
// delete . {"tick":"514092206277","type":2302,"tid":"0","database":"1","cid":"513417247371","cname":"test","data":{"_key":"abcdef","_rev":"514092206275"}} | ||
/* | ||
operation type transaction id collection name document id optional select id (!in del ops) | ||
| | | | | document key | ||
| | | | | | */ | ||
const [str, type, tid, colName, strFix, id, key] = entry.match(/\{.*type\"\:(.*?)\,\"tid\"\:\"(.*?)\".*cname\"\:\"(.*?)\"[^{]*(\{(\"_id\"\:\".*?\"\,)?\"\_key\"\:\"(.*?)\")/); | ||
const strLen = str.length - strFix.length; | ||
// transaction {"tick":"514132959101","type":2200,"tid":"514132959099","database":"1"} | ||
// insert/update {"tick":"514092205556","type":2300,"tid":"0","database":"1","cid":"513417247371","cname":"test","data":{"_id":"test/testkey","_key":"testkey","_rev":"514092205554",...}} | ||
// delete {"tick":"514092206277","type":2302,"tid":"0","database":"1","cid":"513417247371","cname":"test","data":{"_key":"abcdef","_rev":"514092206275"}} | ||
const colConf = this.collectionsMap.get(colName); | ||
if (undefined === colConf) continue; | ||
const events = colConf.get('events'); | ||
const keys = colConf.get('keys'); | ||
idx0 = entry.indexOf(typeStartBuffer) + 6; // find type": | ||
idx1 = entry.indexOf(commaDoubleTickBuffer, idx0); // find ," | ||
type = entry.slice(idx0, idx1).toString(); | ||
if ( (0 === events.size || events.has(type)) && | ||
(0 === keys.size || keys.has(key)) ) { | ||
this.emit(colName, entry.slice(strLen, -1), mapTypeToText[type]); | ||
idx0 = entry.indexOf(commaDoubleTickBuffer, idx1+8) - 1; // find ," | ||
tid = entry.slice(idx1+8, idx0).toString(); | ||
if ('2200' === type) { // txn start | ||
txns.set(tid, new Set()); | ||
} else if ('2201' === type) { // txn commit and replay docs | ||
for(const data of txns.get(tid)) { | ||
idx0 = 0; | ||
[type, entry] = data; | ||
handleEntry(); | ||
} // for | ||
} else if ('2002' === type) { // txn abort | ||
txns.delete(tid); | ||
} else { | ||
if ('2300' !== type && '2302' !== type) continue; | ||
if ('0' !== tid) { | ||
txns.get(tid).add([type,entry.slice(idx0+14)]); | ||
continue; | ||
} // if | ||
}catch(e) { | ||
; // handle transactions 2200 / 2201 | ||
} | ||
handleEntry(); | ||
} // else | ||
} // while | ||
@@ -107,0 +162,0 @@ ticktock(); |
@@ -6,4 +6,4 @@ { | ||
"name": "arangochair", | ||
"version": "3.0.1", | ||
"description": "3rd party library; get notified when ArangoDBs documents change'", | ||
"version": "4.0.1", | ||
"description": "`arangochair` pushs ArangoDB changes in realtime to you", | ||
"main": "index.js", | ||
@@ -10,0 +10,0 @@ "devDependencies": {}, |
# arangochair | ||
get notified in realtime when ArangoDBs collections / documents change. | ||
`arangochair` pushs ArangoDB changes in realtime to you. | ||
## install | ||
@@ -15,4 +16,6 @@ | ||
no4 = new arangochair('http://127.0.0.1:8529'); | ||
const no4 = new arangochair('http://127.0.0.1:8529/'); // ArangoDB node to monitor | ||
const no4 = new arangochair('http://127.0.0.1:8529/myDb'); // ArangoDB node to monitor, with database name | ||
no4.subscribe({collection:'users'}); | ||
@@ -22,2 +25,5 @@ no4.start(); | ||
// do something awesome | ||
// doc:Buffer | ||
// type:'insert/update'|'delete' | ||
}); | ||
@@ -24,0 +30,0 @@ |
9865
168
64