Comparing version 2.0.1 to 3.0.0
@@ -28,4 +28,7 @@ /** | ||
this.ottype = ottype | ||
this.content = null | ||
this.history = new History(this) | ||
this.initialized = false | ||
this.slaves = [] | ||
@@ -53,2 +56,3 @@ this.links = [] | ||
doc.content = content | ||
doc.initialized = true | ||
doc.history.createDocument({contents: ottype.serialize? ottype.serialize(content) : content, edit: Edit.newInitial(ottype)}, function(er) { | ||
@@ -64,2 +68,3 @@ if(er) return cb(er) | ||
if(er) return cb(er) | ||
doc.initialized = true | ||
doc.content = ottype.deserialize? ottype.deserialize(snapshot.contents) : snapshot.contents | ||
@@ -151,2 +156,6 @@ cb(null, doc) | ||
link.on('link:requestHistorySince', function(since) { | ||
this.receiveRequestHistorySince(since, link) | ||
}.bind(this)) | ||
// Other side sends edit. | ||
@@ -169,2 +178,3 @@ link.on('link:edit', function onedit(edit) { | ||
* @param data {Object} Example: {contents: "", edit: <Edit..>} | ||
* @param fromLink | ||
*/ | ||
@@ -195,2 +205,3 @@ Document.prototype.receiveInit = function(data, fromLink) { | ||
this.initialized = true | ||
this.emit('init') | ||
@@ -202,2 +213,34 @@ | ||
/** | ||
* Receive a requestHistorySince message | ||
* | ||
* @param sinceEditId String The last known edit id by the slave | ||
* @param fromLink | ||
*/ | ||
Document.prototype.receiveRequestHistorySince = function(sinceEditId, fromLink) { | ||
// Check to see if we know that edit | ||
this.history.remembers(sinceEditId, function(er, remembersEdit) { | ||
if(er) return this.emit('error', er) | ||
if(!remembersEdit) { | ||
// Sorry pal. | ||
// XXX: Maybe we should send an 'init' here? | ||
return | ||
} | ||
this.history.getAllAfter(sinceEditId, function(er, snapshots) { | ||
if(er) return this.emit('error', er) | ||
fromLink.reset() | ||
snapshots | ||
.map(function(s) { | ||
return s.edit | ||
}) | ||
.forEach(fromLink.sendEdit.bind(fromLink)) | ||
}.bind(this)) | ||
}.bind(this)) | ||
} | ||
/** | ||
* Receive an edit | ||
@@ -241,2 +284,16 @@ * | ||
Document.prototype.dispatchEdit = function(edit, fromLink, cb) { | ||
// Also check if this might be sentEdit, cause if we've requested History, then | ||
// the other side has reset their queue and thus destroyed all acks. | ||
// So, fromLink.sentEdit might have been accepted, but we might not havw got the ACK | ||
if(fromLink.sentEdit && fromLink.sentEdit.id === edit.id) { | ||
fromLink.sentEdit.callback(null, fromLink.sentEdit) | ||
fromLink.sentEdit = null | ||
setImmediate(function() { | ||
fromLink._read(0) | ||
}) | ||
cb() | ||
return | ||
} | ||
this.history.remembers(edit.id, function(er, remembers) { | ||
@@ -247,3 +304,8 @@ if(er) return this.emit('error',er) | ||
// We've got this edit already. | ||
if(fromLink) fromLink.send('ack', edit.id) | ||
// If I'm master then we need to queue the ack | ||
// Slaves have to send it straight away | ||
if(fromLink === this.master) fromLink.send('ack' ,edit.id) | ||
else if (fromLink) fromLink.sendAck(edit.id) | ||
return cb(null, edit) | ||
@@ -255,8 +317,19 @@ } | ||
if (!remembersParent) { | ||
var e = new Error('Edit "'+edit.id+'" has unknown parent "'+edit.parent+'"') | ||
fromLink && fromLink.emit('editError', e) | ||
return cb(e) | ||
if(fromLink === this.master) { | ||
// we probably missed some edits, let's ask master! | ||
this.history.latest(function(er, latestSnapshot) { | ||
if(er) return this.emit('error', er) | ||
this.master.send('requestHistorySince', latestSnapshot.edit.id) | ||
}.bind(this)) | ||
return cb() | ||
}else { | ||
// I'm master, I can't have missed that edit. So, throw and re-init! | ||
var e = new Error('Edit "'+edit.id+'" has unknown parent "'+edit.parent+'"') | ||
fromLink && fromLink.emit('editError', e) | ||
return cb(e) | ||
} | ||
} | ||
this.sanitizeEdit(edit, fromLink, function(er, edit) { | ||
this.sanitizeEdit(edit, fromLink, function apply(er, edit) { | ||
if(er) { | ||
@@ -267,20 +340,33 @@ fromLink && fromLink.emit('editError', er) | ||
try { | ||
this.applyEdit(edit) | ||
}catch(er) { | ||
fromLink && fromLink.emit('editError', er) | ||
return cb(er) | ||
// EditableDocuments are initialized asynchronously, we have to wait | ||
// for their callback event... | ||
if(!this.initialized) { | ||
this.once('editableInitialized', apply.bind(this, null, edit)) | ||
return | ||
} | ||
// add to history | ||
var content = this.ottype.serialize? this.ottype.serialize(this.content) : this.content | ||
this.history.storeSnapshot({id: edit.id, contents: content, edit: edit}, function(er) { | ||
this.applyEdit(edit, false, function(er) { | ||
if(er) { | ||
this.emit('error', er) | ||
fromLink && fromLink.emit('editError', er) | ||
return cb(er) | ||
} | ||
if(fromLink) fromLink.send('ack', edit.id) | ||
this.distributeEdit(edit, fromLink) | ||
this.emit('edit', edit) | ||
cb(null, edit) | ||
// add to history | ||
var content = this.ottype.serialize? this.ottype.serialize(this.content) : this.content | ||
this.history.storeSnapshot({id: edit.id, contents: content, edit: edit}, function(er) { | ||
if(er) { | ||
this.emit('error', er) | ||
return cb(er) | ||
} | ||
// If I'm master then we need to queue the ack | ||
// Slaves have to send it straight away | ||
if(fromLink === this.master) fromLink.send('ack' ,edit.id) | ||
else if (fromLink) fromLink.sendAck(edit.id) | ||
this.distributeEdit(edit, fromLink) | ||
this.emit('edit', edit) | ||
cb(null, edit) | ||
}.bind(this)) | ||
}.bind(this)) | ||
@@ -324,3 +410,3 @@ }.bind(this)) | ||
Document.prototype.applyEdit = function(edit) { | ||
Document.prototype.applyEdit = function(edit, ownEdit, cb) { | ||
// apply changes | ||
@@ -330,5 +416,6 @@ console.log('Document: apply edit', edit) | ||
this.content = edit.apply(this.content) | ||
cb() | ||
}catch(e) { | ||
e.message = 'Applying edit failed: '+e.message | ||
throw e | ||
cb(e) | ||
} | ||
@@ -335,0 +422,0 @@ } |
@@ -82,5 +82,5 @@ /** | ||
*/ | ||
Edit.prototype.follow = function(edit) { | ||
Edit.prototype.follow = function(edit, left) { | ||
if(this.parent != edit.parent) throw new Error('Trying to follow an edit that is not a direct sibling.') | ||
this.transformAgainst(edit) | ||
this.transformAgainst(edit, left) | ||
this.parent = edit.id | ||
@@ -87,0 +87,0 @@ } |
@@ -22,2 +22,3 @@ /** | ||
function EditableDocument() { | ||
this.initialized = false | ||
Document.apply(this, arguments) | ||
@@ -40,3 +41,9 @@ } | ||
EditableDocument.prototype.receiveInit = function(data, fromLink) { | ||
this._change(Document.prototype.receiveInit.call(this, data, fromLink)) | ||
var content = Document.prototype.receiveInit.call(this, data, fromLink) | ||
this.initialized = false | ||
this._setContents(content, function(er) { | ||
if(er) return this.emit('error', er) | ||
this.initialized = true | ||
this.emit('editableInitialized') | ||
}.bind(this)) | ||
} | ||
@@ -62,8 +69,10 @@ | ||
// Merge into the queue for increased collab speed | ||
if(this.master.queue.length == 1) { | ||
var parent = this.master.queue[0].parent | ||
, callback =this.master.queue[0].callback | ||
this.master.queue[0] = this.master.queue[0].merge(edit) | ||
this.master.queue[0].callback = callback | ||
this.master.queue[0].parent = parent | ||
if(this.master.queue.length && 'edit' === this.master.queue[this.master.queue.length-1][0]) { | ||
var pendingEdit = this.master.queue.pop()[1] | ||
, parent = pendingEdit.parent | ||
, callback = pendingEdit.callback | ||
pendingEdit = pendingEdit.merge(edit) | ||
pendingEdit.callback = callback | ||
pendingEdit.parent = parent | ||
this.master.queue.push(['edit', pendingEdit]) | ||
return | ||
@@ -74,8 +83,11 @@ } | ||
// Update queue | ||
this.master.queue.forEach(function(queuedEdit) { | ||
queuedEdit.parent = edit.id | ||
this.master.queue.forEach(function(pending) { | ||
if('edit' === pending[0]) { | ||
pending[1].parent = edit.id | ||
} | ||
}) | ||
this.applyEdit(edit, true) | ||
//this.distributeEdit(edit) // Unnecessary round trip | ||
this.history.storeSnapshot({contents: this.content, edit: edit}) | ||
this.applyEdit(edit, true, function() { | ||
//this.distributeEdit(edit) // Unnecessary round trip | ||
this.history.storeSnapshot({contents: this.content, edit: edit}) | ||
}.bind(this)) | ||
}.bind(this)) | ||
@@ -85,51 +97,57 @@ }.bind(this)) | ||
// overrides Document#sanitizeEdit | ||
EditableDocument.prototype.sanitizeEdit = function(incoming, fromLink, cb) { | ||
// Collect undetected local changes, before applying the new edit | ||
this._collectChanges() | ||
// overrides Document#applyEdit | ||
EditableDocument.prototype.applyEdit = function(edit, ownEdit, cb) { | ||
// apply changes | ||
console.log('EditableDocument: apply edit', edit, ownEdit) | ||
try { | ||
this.content = edit.apply(this.content) | ||
// Transform against possibly missed edits that have happened in the meantime, | ||
// so that we can apply it | ||
if(!ownEdit) { | ||
// Collect undetected local changes, before applying the new edit | ||
this._collectChanges(function(er) { | ||
if(er) return cb(er) | ||
var incomingOriginal | ||
// Transform against possibly missed edits that have happened in the meantime, | ||
// so that we can apply it | ||
if(this.master.sentEdit) { | ||
incomingOriginal = incoming.clone() | ||
incoming.transformAgainst(this.master.sentEdit) | ||
this.master.sentEdit.follow(incomingOriginal) // Why!? | ||
} | ||
var incoming = edit | ||
, incomingOriginal | ||
incomingOriginal = incoming.clone() | ||
if(this.master.sentEdit) { | ||
incomingOriginal = incoming.clone() | ||
incoming.transformAgainst(this.master.sentEdit, true) | ||
this.master.sentEdit.follow(incomingOriginal) // Why!? So that our history is correct | ||
} | ||
// transform incoming against pending | ||
this.master.queue.forEach(function(pendingEdit) { | ||
incoming.transformAgainst(pendingEdit) | ||
}) | ||
incomingOriginal = incoming.clone() | ||
// Transform pending edits against the incoming one | ||
this.master.queue.forEach(function(pendingEdit, i) { | ||
if(i === 0) { | ||
pendingEdit.follow(incomingOriginal) // transform + adjust parentage for the first in the line | ||
} | ||
else { | ||
pendingEdit.transformAgainst(incomingOriginal) // all others have their predecessors as parents | ||
} | ||
// transform incoming against pending | ||
this.master.queue.forEach(function(pending) { | ||
if('edit' === pending[0]) incoming.transformAgainst(pending[1], true) | ||
}) | ||
incomingOriginal.transformAgainst(pendingEdit) | ||
}) | ||
// Transform pending edits against the incoming one | ||
var first = true | ||
this.master.queue.forEach(function(pending) { | ||
if(pending[0] !== 'edit') return | ||
var pendingEdit = pending[1] | ||
if(first) { | ||
pendingEdit.follow(incomingOriginal) // transform + adjust parentage for the first in the line | ||
first = false | ||
} | ||
else { | ||
pendingEdit.transformAgainst(incomingOriginal) // all others have their predecessors as parents | ||
} | ||
cb(null, incoming) | ||
} | ||
// overrides Document#applyEdit | ||
EditableDocument.prototype.applyEdit = function(edit, ownEdit) { | ||
// apply changes | ||
console.log('EditableDocument: apply edit', edit, ownEdit) | ||
try { | ||
this.content = edit.apply(this.content) | ||
if(!ownEdit) this._change(this.content, edit.changeset) | ||
incomingOriginal.transformAgainst(pendingEdit) | ||
}) | ||
this._change(incoming.changeset, cb) | ||
}.bind(this)) | ||
}else{ | ||
cb() | ||
} | ||
}catch(e) { | ||
e.message = 'Applying edit failed: '+e.message | ||
throw e | ||
cb(e) | ||
} | ||
} |
@@ -87,3 +87,3 @@ /** | ||
if(this.queue.length || this.sentEdit) { | ||
this.queue.push(edit) | ||
this.queue.push(['edit', edit]) | ||
} | ||
@@ -96,2 +96,11 @@ else { | ||
Link.prototype.sendAck = function(editId) { | ||
if(this.queue.length || this.sentEdit) { | ||
this.queue.push(['ack', editId]) | ||
} | ||
else { | ||
this.send('ack', editId) | ||
} | ||
} | ||
// This is only used to push edits from the queue into the pipeline. | ||
@@ -102,4 +111,12 @@ // All other events are pushed directly in .send() | ||
if(!this.queue[0]) return | ||
this.sentEdit = this.queue.shift() | ||
this.send('edit', this.sentEdit.pack()) | ||
var msg | ||
while(msg = this.queue.shift()) { | ||
if('edit' === msg[0]) { | ||
this.sentEdit = msg[1] | ||
msg[1] = msg[1].pack() | ||
} | ||
this.send.apply(this, msg) | ||
if('edit' === msg[0]) break | ||
} | ||
} | ||
@@ -139,3 +156,3 @@ | ||
if(this.sentEdit && this.sentEdit.callback) { | ||
if(this.sentEdit && typeof(this.sentEdit.callback) == 'function') { | ||
// Callback | ||
@@ -142,0 +159,0 @@ this.sentEdit.id = id |
{ | ||
"name": "gulf", | ||
"version": "2.0.1", | ||
"version": "3.0.0", | ||
"description": "transport-agnostic operational transformation control layer", | ||
@@ -30,3 +30,5 @@ "repository": { | ||
"expect.js": "*", | ||
"ottypes": "*" | ||
"ottypes": "*", | ||
"mux-dmx": "*", | ||
"through2": "*" | ||
}, | ||
@@ -33,0 +35,0 @@ "author": "Marcel Klehr <mklehr@gmx.net>", |
/* global xdescribe, describe, it, xit */ | ||
var gulf, expect | ||
, ottype = require('ottypes').text | ||
, MuxDmx = require('mux-dmx') | ||
, through = require('through2') | ||
@@ -52,6 +54,12 @@ | ||
content = '' | ||
docB._change = function(newcontent, cs) { | ||
docB._collectChanges = function(cb) { cb() } | ||
docB._setContents = function(newcontent, cb) { | ||
content = newcontent | ||
console.log('_change: ', newcontent) | ||
cb() | ||
} | ||
docB._change = function(cs, cb) { | ||
content = ottype.apply(content, cs) | ||
console.log('_change: ', content) | ||
cb() | ||
} | ||
@@ -144,15 +152,25 @@ linkA = docA.slaveLink() | ||
contentA = '' | ||
docA._collectChanges = function() {} | ||
docA._change = function(newcontent, cs) { | ||
docA._collectChanges = function(cb) {cb()} | ||
docA._setContents = function(newcontent, cb) { | ||
contentA = newcontent | ||
console.log('_change(A): ', newcontent, cs) | ||
cb() | ||
} | ||
docA._change = function(cs, cb) { | ||
contentA = ottype.apply(contentA, cs) | ||
console.log('_change(A): ', cs, contentA) | ||
cb() | ||
} | ||
docB = new gulf.EditableDocument(new gulf.MemoryAdapter, ottype) | ||
contentB = '' | ||
docB._collectChanges = function() {} | ||
docB._change = function(newcontent, cs) { | ||
docB._collectChanges = function(cb) {cb()} | ||
docB._setContents = function(newcontent, cb) { | ||
contentB = newcontent | ||
console.log('_change(B): ', newcontent, cs) | ||
cb() | ||
} | ||
docB._change = function(cs, cb) { | ||
contentB = ottype.apply(contentB, cs) | ||
console.log('_change(B): ', cs, contentB) | ||
cb() | ||
} | ||
@@ -194,10 +212,18 @@ linkA = docA.masterLink() | ||
contentA = 'abcd1' | ||
docA.update([4, '1']) | ||
contentA = 'abcd12' | ||
docA.update([4, '1']) // this edit will be sent | ||
setImmediate(function() { | ||
docA.update([5, '2']) // this edit will be queued | ||
}) | ||
contentB = 'abcd2' | ||
docB.update([4, '2']) | ||
contentB = 'abcd34' | ||
docB.update([4, '3']) // this edit will be sent | ||
setImmediate(function() { | ||
docB.update([5, '4']) // this edit will be queued | ||
}) | ||
linkA.pipe(masterDoc.slaveLink()).pipe(linkA) | ||
linkB.pipe(masterDoc.slaveLink()).pipe(linkB) | ||
setImmediate(function() { | ||
linkA.pipe(masterDoc.slaveLink()).pipe(linkA) | ||
linkB.pipe(masterDoc.slaveLink()).pipe(linkB) | ||
}) | ||
@@ -207,4 +233,32 @@ setTimeout(function() { | ||
cb() | ||
}, 200) | ||
}, 500) | ||
}) | ||
it('should catch up on reconnect', function(cb) { | ||
// disconnect B | ||
linkB.unpipe() | ||
masterDoc.links[3].unpipe() | ||
contentA = 'abcdx1324' | ||
docA.update([4, 'x']) // this edit will be sent | ||
contentB = 'abcd1324QR' | ||
docB.update([8, 'Q']) | ||
docB.update([9, 'R']) | ||
setTimeout(function() { | ||
// reconnect B | ||
console.log('reconnect B') | ||
linkB.pipe(masterDoc.slaveLink()).pipe(linkB) | ||
// change A | ||
contentA = 'abcdxy1324' | ||
docA.update([5, 'y']) | ||
setTimeout(function() { | ||
expect(contentB).to.equal('abcdxy1324QR') | ||
expect(contentB).to.equal(contentA) | ||
cb() | ||
}, 100) | ||
}, 100) | ||
}) | ||
}) | ||
@@ -254,2 +308,107 @@ | ||
}) | ||
describe('Linking documents in parallel environments', function() { | ||
var initialContent = 'abc' | ||
var master, docA, docB | ||
var linkA, linkB | ||
var contentA, contentB | ||
before(function(cb) { | ||
docA = new gulf.EditableDocument(new gulf.MemoryAdapter, ottype) | ||
contentA = '' | ||
docA._collectChanges = function(cb) {cb()} | ||
docA._setContents = function(newcontent, cb) { | ||
contentA = newcontent | ||
cb() | ||
} | ||
docA._change = function(cs, cb) { | ||
contentA = ottype.apply(contentA,cs ) | ||
console.log('_change(A): ', cs, contentA) | ||
cb() | ||
} | ||
docB = new gulf.EditableDocument(new gulf.MemoryAdapter, ottype) | ||
contentB = '' | ||
docB._collectChanges = function(cb) {cb()} | ||
docB._setContents = function(newcontent, cb) { | ||
contentB = newcontent | ||
cb() | ||
} | ||
docB._change = function(cs, cb) { | ||
contentB = ottype.apply(contentB, cs) | ||
console.log('_change(B): ', cs, contentB) | ||
cb() | ||
} | ||
master = require('child_process') | ||
.fork(__dirname+'/helper/parallelmaster_fork.js', [initialContent], | ||
{silent: true}) | ||
master.stderr.pipe(process.stdout) | ||
master.on('error', function(e) { | ||
throw e | ||
}) | ||
linkA = docA.masterLink() | ||
linkB = docB.masterLink() | ||
setTimeout(cb, 100) | ||
}) | ||
it('should propagate initial contents correctly', function(cb) { | ||
var mux = MuxDmx() | ||
master.stdout.pipe(mux).pipe(master.stdin) | ||
linkA.pipe(mux.createDuplexStream(new Buffer('a'))) | ||
// add 100ms latency | ||
.pipe(through(function(chunk, enc, cb) { | ||
setTimeout(function() { | ||
this.push(chunk) | ||
cb() | ||
}.bind(this), 100) | ||
})).pipe(linkA) | ||
linkB.pipe(mux.createDuplexStream(new Buffer('b'))).pipe(linkB) | ||
setTimeout(function() { | ||
expect(contentA).to.equal(initialContent) | ||
expect(contentB).to.equal(initialContent) | ||
cb() | ||
}, 700) | ||
}) | ||
it('should correctly propagate the first edit from one end to the other end', function(cb) { | ||
contentA = 'abcd' | ||
docA.update([3, 'd']) | ||
setTimeout(function() { | ||
expect(docA.content).to.eql(contentA) | ||
expect(docB.content).to.eql(contentA) | ||
expect(contentB).to.eql(contentA) | ||
cb() | ||
}, 500) | ||
}) | ||
it('should correctly propagate edits from one end to the other end', function(cb) { | ||
contentA = 'abcd123' | ||
docA.update([4, '1']) // this edit will be sent | ||
contentB = 'abcd45' | ||
docB.update([4, '4']) // this edit will be sent | ||
setImmediate(function() { | ||
docA.update([5, '2']) // this edit will be queued | ||
docA.update([6, '3']) | ||
docB.update([5, '5']) // this edit will be queued | ||
}) | ||
setTimeout(function() { | ||
console.log(contentA, contentB) | ||
expect(contentB).to.eql(contentA) | ||
cb() | ||
}, 500) | ||
}) | ||
after(function() { | ||
master.kill() | ||
}) | ||
}) | ||
}) |
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
90581
14
1261
5
1