New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

gulf

Package Overview
Dependencies
Maintainers
1
Versions
43
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

gulf - npm Package Compare versions

Comparing version 2.0.1 to 3.0.0

test/helper/parallelmaster_fork.js

127

lib/Document.js

@@ -28,4 +28,7 @@ /**

this.ottype = ottype
this.content = null
this.history = new History(this)
this.initialized = false
this.slaves = []

@@ -53,2 +56,3 @@ this.links = []

doc.content = content
doc.initialized = true
doc.history.createDocument({contents: ottype.serialize? ottype.serialize(content) : content, edit: Edit.newInitial(ottype)}, function(er) {

@@ -64,2 +68,3 @@ if(er) return cb(er)

if(er) return cb(er)
doc.initialized = true
doc.content = ottype.deserialize? ottype.deserialize(snapshot.contents) : snapshot.contents

@@ -151,2 +156,6 @@ cb(null, doc)

link.on('link:requestHistorySince', function(since) {
this.receiveRequestHistorySince(since, link)
}.bind(this))
// Other side sends edit.

@@ -169,2 +178,3 @@ link.on('link:edit', function onedit(edit) {

* @param data {Object} Example: {contents: "", edit: <Edit..>}
* @param fromLink
*/

@@ -195,2 +205,3 @@ Document.prototype.receiveInit = function(data, fromLink) {

this.initialized = true
this.emit('init')

@@ -202,2 +213,34 @@

/**
* Receive a requestHistorySince message
*
* @param sinceEditId String The last known edit id by the slave
* @param fromLink
*/
Document.prototype.receiveRequestHistorySince = function(sinceEditId, fromLink) {
// Check to see if we know that edit
this.history.remembers(sinceEditId, function(er, remembersEdit) {
if(er) return this.emit('error', er)
if(!remembersEdit) {
// Sorry pal.
// XXX: Maybe we should send an 'init' here?
return
}
this.history.getAllAfter(sinceEditId, function(er, snapshots) {
if(er) return this.emit('error', er)
fromLink.reset()
snapshots
.map(function(s) {
return s.edit
})
.forEach(fromLink.sendEdit.bind(fromLink))
}.bind(this))
}.bind(this))
}
/**
* Receive an edit

@@ -241,2 +284,16 @@ *

Document.prototype.dispatchEdit = function(edit, fromLink, cb) {
// Also check if this might be sentEdit, cause if we've requested History, then
// the other side has reset their queue and thus destroyed all acks.
// So, fromLink.sentEdit might have been accepted, but we might not havw got the ACK
if(fromLink.sentEdit && fromLink.sentEdit.id === edit.id) {
fromLink.sentEdit.callback(null, fromLink.sentEdit)
fromLink.sentEdit = null
setImmediate(function() {
fromLink._read(0)
})
cb()
return
}
this.history.remembers(edit.id, function(er, remembers) {

@@ -247,3 +304,8 @@ if(er) return this.emit('error',er)

// We've got this edit already.
if(fromLink) fromLink.send('ack', edit.id)
// If I'm master then we need to queue the ack
// Slaves have to send it straight away
if(fromLink === this.master) fromLink.send('ack' ,edit.id)
else if (fromLink) fromLink.sendAck(edit.id)
return cb(null, edit)

@@ -255,8 +317,19 @@ }

if (!remembersParent) {
var e = new Error('Edit "'+edit.id+'" has unknown parent "'+edit.parent+'"')
fromLink && fromLink.emit('editError', e)
return cb(e)
if(fromLink === this.master) {
// we probably missed some edits, let's ask master!
this.history.latest(function(er, latestSnapshot) {
if(er) return this.emit('error', er)
this.master.send('requestHistorySince', latestSnapshot.edit.id)
}.bind(this))
return cb()
}else {
// I'm master, I can't have missed that edit. So, throw and re-init!
var e = new Error('Edit "'+edit.id+'" has unknown parent "'+edit.parent+'"')
fromLink && fromLink.emit('editError', e)
return cb(e)
}
}
this.sanitizeEdit(edit, fromLink, function(er, edit) {
this.sanitizeEdit(edit, fromLink, function apply(er, edit) {
if(er) {

@@ -267,20 +340,33 @@ fromLink && fromLink.emit('editError', er)

try {
this.applyEdit(edit)
}catch(er) {
fromLink && fromLink.emit('editError', er)
return cb(er)
// EditableDocuments are initialized asynchronously, we have to wait
// for their callback event...
if(!this.initialized) {
this.once('editableInitialized', apply.bind(this, null, edit))
return
}
// add to history
var content = this.ottype.serialize? this.ottype.serialize(this.content) : this.content
this.history.storeSnapshot({id: edit.id, contents: content, edit: edit}, function(er) {
this.applyEdit(edit, false, function(er) {
if(er) {
this.emit('error', er)
fromLink && fromLink.emit('editError', er)
return cb(er)
}
if(fromLink) fromLink.send('ack', edit.id)
this.distributeEdit(edit, fromLink)
this.emit('edit', edit)
cb(null, edit)
// add to history
var content = this.ottype.serialize? this.ottype.serialize(this.content) : this.content
this.history.storeSnapshot({id: edit.id, contents: content, edit: edit}, function(er) {
if(er) {
this.emit('error', er)
return cb(er)
}
// If I'm master then we need to queue the ack
// Slaves have to send it straight away
if(fromLink === this.master) fromLink.send('ack' ,edit.id)
else if (fromLink) fromLink.sendAck(edit.id)
this.distributeEdit(edit, fromLink)
this.emit('edit', edit)
cb(null, edit)
}.bind(this))
}.bind(this))

@@ -324,3 +410,3 @@ }.bind(this))

Document.prototype.applyEdit = function(edit) {
Document.prototype.applyEdit = function(edit, ownEdit, cb) {
// apply changes

@@ -330,5 +416,6 @@ console.log('Document: apply edit', edit)

this.content = edit.apply(this.content)
cb()
}catch(e) {
e.message = 'Applying edit failed: '+e.message
throw e
cb(e)
}

@@ -335,0 +422,0 @@ }

4

lib/Edit.js

@@ -82,5 +82,5 @@ /**

*/
Edit.prototype.follow = function(edit) {
Edit.prototype.follow = function(edit, left) {
if(this.parent != edit.parent) throw new Error('Trying to follow an edit that is not a direct sibling.')
this.transformAgainst(edit)
this.transformAgainst(edit, left)
this.parent = edit.id

@@ -87,0 +87,0 @@ }

@@ -22,2 +22,3 @@ /**

function EditableDocument() {
this.initialized = false
Document.apply(this, arguments)

@@ -40,3 +41,9 @@ }

EditableDocument.prototype.receiveInit = function(data, fromLink) {
this._change(Document.prototype.receiveInit.call(this, data, fromLink))
var content = Document.prototype.receiveInit.call(this, data, fromLink)
this.initialized = false
this._setContents(content, function(er) {
if(er) return this.emit('error', er)
this.initialized = true
this.emit('editableInitialized')
}.bind(this))
}

@@ -62,8 +69,10 @@

// Merge into the queue for increased collab speed
if(this.master.queue.length == 1) {
var parent = this.master.queue[0].parent
, callback =this.master.queue[0].callback
this.master.queue[0] = this.master.queue[0].merge(edit)
this.master.queue[0].callback = callback
this.master.queue[0].parent = parent
if(this.master.queue.length && 'edit' === this.master.queue[this.master.queue.length-1][0]) {
var pendingEdit = this.master.queue.pop()[1]
, parent = pendingEdit.parent
, callback = pendingEdit.callback
pendingEdit = pendingEdit.merge(edit)
pendingEdit.callback = callback
pendingEdit.parent = parent
this.master.queue.push(['edit', pendingEdit])
return

@@ -74,8 +83,11 @@ }

// Update queue
this.master.queue.forEach(function(queuedEdit) {
queuedEdit.parent = edit.id
this.master.queue.forEach(function(pending) {
if('edit' === pending[0]) {
pending[1].parent = edit.id
}
})
this.applyEdit(edit, true)
//this.distributeEdit(edit) // Unnecessary round trip
this.history.storeSnapshot({contents: this.content, edit: edit})
this.applyEdit(edit, true, function() {
//this.distributeEdit(edit) // Unnecessary round trip
this.history.storeSnapshot({contents: this.content, edit: edit})
}.bind(this))
}.bind(this))

@@ -85,51 +97,57 @@ }.bind(this))

// overrides Document#sanitizeEdit
EditableDocument.prototype.sanitizeEdit = function(incoming, fromLink, cb) {
// Collect undetected local changes, before applying the new edit
this._collectChanges()
// overrides Document#applyEdit
EditableDocument.prototype.applyEdit = function(edit, ownEdit, cb) {
// apply changes
console.log('EditableDocument: apply edit', edit, ownEdit)
try {
this.content = edit.apply(this.content)
// Transform against possibly missed edits that have happened in the meantime,
// so that we can apply it
if(!ownEdit) {
// Collect undetected local changes, before applying the new edit
this._collectChanges(function(er) {
if(er) return cb(er)
var incomingOriginal
// Transform against possibly missed edits that have happened in the meantime,
// so that we can apply it
if(this.master.sentEdit) {
incomingOriginal = incoming.clone()
incoming.transformAgainst(this.master.sentEdit)
this.master.sentEdit.follow(incomingOriginal) // Why!?
}
var incoming = edit
, incomingOriginal
incomingOriginal = incoming.clone()
if(this.master.sentEdit) {
incomingOriginal = incoming.clone()
incoming.transformAgainst(this.master.sentEdit, true)
this.master.sentEdit.follow(incomingOriginal) // Why!? So that our history is correct
}
// transform incoming against pending
this.master.queue.forEach(function(pendingEdit) {
incoming.transformAgainst(pendingEdit)
})
incomingOriginal = incoming.clone()
// Transform pending edits against the incoming one
this.master.queue.forEach(function(pendingEdit, i) {
if(i === 0) {
pendingEdit.follow(incomingOriginal) // transform + adjust parentage for the first in the line
}
else {
pendingEdit.transformAgainst(incomingOriginal) // all others have their predecessors as parents
}
// transform incoming against pending
this.master.queue.forEach(function(pending) {
if('edit' === pending[0]) incoming.transformAgainst(pending[1], true)
})
incomingOriginal.transformAgainst(pendingEdit)
})
// Transform pending edits against the incoming one
var first = true
this.master.queue.forEach(function(pending) {
if(pending[0] !== 'edit') return
var pendingEdit = pending[1]
if(first) {
pendingEdit.follow(incomingOriginal) // transform + adjust parentage for the first in the line
first = false
}
else {
pendingEdit.transformAgainst(incomingOriginal) // all others have their predecessors as parents
}
cb(null, incoming)
}
// overrides Document#applyEdit
EditableDocument.prototype.applyEdit = function(edit, ownEdit) {
// apply changes
console.log('EditableDocument: apply edit', edit, ownEdit)
try {
this.content = edit.apply(this.content)
if(!ownEdit) this._change(this.content, edit.changeset)
incomingOriginal.transformAgainst(pendingEdit)
})
this._change(incoming.changeset, cb)
}.bind(this))
}else{
cb()
}
}catch(e) {
e.message = 'Applying edit failed: '+e.message
throw e
cb(e)
}
}

@@ -87,3 +87,3 @@ /**

if(this.queue.length || this.sentEdit) {
this.queue.push(edit)
this.queue.push(['edit', edit])
}

@@ -96,2 +96,11 @@ else {

Link.prototype.sendAck = function(editId) {
if(this.queue.length || this.sentEdit) {
this.queue.push(['ack', editId])
}
else {
this.send('ack', editId)
}
}
// This is only used to push edits from the queue into the pipeline.

@@ -102,4 +111,12 @@ // All other events are pushed directly in .send()

if(!this.queue[0]) return
this.sentEdit = this.queue.shift()
this.send('edit', this.sentEdit.pack())
var msg
while(msg = this.queue.shift()) {
if('edit' === msg[0]) {
this.sentEdit = msg[1]
msg[1] = msg[1].pack()
}
this.send.apply(this, msg)
if('edit' === msg[0]) break
}
}

@@ -139,3 +156,3 @@

if(this.sentEdit && this.sentEdit.callback) {
if(this.sentEdit && typeof(this.sentEdit.callback) == 'function') {
// Callback

@@ -142,0 +159,0 @@ this.sentEdit.id = id

{
"name": "gulf",
"version": "2.0.1",
"version": "3.0.0",
"description": "transport-agnostic operational transformation control layer",

@@ -30,3 +30,5 @@ "repository": {

"expect.js": "*",
"ottypes": "*"
"ottypes": "*",
"mux-dmx": "*",
"through2": "*"
},

@@ -33,0 +35,0 @@ "author": "Marcel Klehr <mklehr@gmx.net>",

/* global xdescribe, describe, it, xit */
var gulf, expect
, ottype = require('ottypes').text
, MuxDmx = require('mux-dmx')
, through = require('through2')

@@ -52,6 +54,12 @@

content = ''
docB._change = function(newcontent, cs) {
docB._collectChanges = function(cb) { cb() }
docB._setContents = function(newcontent, cb) {
content = newcontent
console.log('_change: ', newcontent)
cb()
}
docB._change = function(cs, cb) {
content = ottype.apply(content, cs)
console.log('_change: ', content)
cb()
}

@@ -144,15 +152,25 @@ linkA = docA.slaveLink()

contentA = ''
docA._collectChanges = function() {}
docA._change = function(newcontent, cs) {
docA._collectChanges = function(cb) {cb()}
docA._setContents = function(newcontent, cb) {
contentA = newcontent
console.log('_change(A): ', newcontent, cs)
cb()
}
docA._change = function(cs, cb) {
contentA = ottype.apply(contentA, cs)
console.log('_change(A): ', cs, contentA)
cb()
}
docB = new gulf.EditableDocument(new gulf.MemoryAdapter, ottype)
contentB = ''
docB._collectChanges = function() {}
docB._change = function(newcontent, cs) {
docB._collectChanges = function(cb) {cb()}
docB._setContents = function(newcontent, cb) {
contentB = newcontent
console.log('_change(B): ', newcontent, cs)
cb()
}
docB._change = function(cs, cb) {
contentB = ottype.apply(contentB, cs)
console.log('_change(B): ', cs, contentB)
cb()
}

@@ -194,10 +212,18 @@ linkA = docA.masterLink()

contentA = 'abcd1'
docA.update([4, '1'])
contentA = 'abcd12'
docA.update([4, '1']) // this edit will be sent
setImmediate(function() {
docA.update([5, '2']) // this edit will be queued
})
contentB = 'abcd2'
docB.update([4, '2'])
contentB = 'abcd34'
docB.update([4, '3']) // this edit will be sent
setImmediate(function() {
docB.update([5, '4']) // this edit will be queued
})
linkA.pipe(masterDoc.slaveLink()).pipe(linkA)
linkB.pipe(masterDoc.slaveLink()).pipe(linkB)
setImmediate(function() {
linkA.pipe(masterDoc.slaveLink()).pipe(linkA)
linkB.pipe(masterDoc.slaveLink()).pipe(linkB)
})

@@ -207,4 +233,32 @@ setTimeout(function() {

cb()
}, 200)
}, 500)
})
it('should catch up on reconnect', function(cb) {
// disconnect B
linkB.unpipe()
masterDoc.links[3].unpipe()
contentA = 'abcdx1324'
docA.update([4, 'x']) // this edit will be sent
contentB = 'abcd1324QR'
docB.update([8, 'Q'])
docB.update([9, 'R'])
setTimeout(function() {
// reconnect B
console.log('reconnect B')
linkB.pipe(masterDoc.slaveLink()).pipe(linkB)
// change A
contentA = 'abcdxy1324'
docA.update([5, 'y'])
setTimeout(function() {
expect(contentB).to.equal('abcdxy1324QR')
expect(contentB).to.equal(contentA)
cb()
}, 100)
}, 100)
})
})

@@ -254,2 +308,107 @@

})
describe('Linking documents in parallel environments', function() {
var initialContent = 'abc'
var master, docA, docB
var linkA, linkB
var contentA, contentB
before(function(cb) {
docA = new gulf.EditableDocument(new gulf.MemoryAdapter, ottype)
contentA = ''
docA._collectChanges = function(cb) {cb()}
docA._setContents = function(newcontent, cb) {
contentA = newcontent
cb()
}
docA._change = function(cs, cb) {
contentA = ottype.apply(contentA,cs )
console.log('_change(A): ', cs, contentA)
cb()
}
docB = new gulf.EditableDocument(new gulf.MemoryAdapter, ottype)
contentB = ''
docB._collectChanges = function(cb) {cb()}
docB._setContents = function(newcontent, cb) {
contentB = newcontent
cb()
}
docB._change = function(cs, cb) {
contentB = ottype.apply(contentB, cs)
console.log('_change(B): ', cs, contentB)
cb()
}
master = require('child_process')
.fork(__dirname+'/helper/parallelmaster_fork.js', [initialContent],
{silent: true})
master.stderr.pipe(process.stdout)
master.on('error', function(e) {
throw e
})
linkA = docA.masterLink()
linkB = docB.masterLink()
setTimeout(cb, 100)
})
it('should propagate initial contents correctly', function(cb) {
var mux = MuxDmx()
master.stdout.pipe(mux).pipe(master.stdin)
linkA.pipe(mux.createDuplexStream(new Buffer('a')))
// add 100ms latency
.pipe(through(function(chunk, enc, cb) {
setTimeout(function() {
this.push(chunk)
cb()
}.bind(this), 100)
})).pipe(linkA)
linkB.pipe(mux.createDuplexStream(new Buffer('b'))).pipe(linkB)
setTimeout(function() {
expect(contentA).to.equal(initialContent)
expect(contentB).to.equal(initialContent)
cb()
}, 700)
})
it('should correctly propagate the first edit from one end to the other end', function(cb) {
contentA = 'abcd'
docA.update([3, 'd'])
setTimeout(function() {
expect(docA.content).to.eql(contentA)
expect(docB.content).to.eql(contentA)
expect(contentB).to.eql(contentA)
cb()
}, 500)
})
it('should correctly propagate edits from one end to the other end', function(cb) {
contentA = 'abcd123'
docA.update([4, '1']) // this edit will be sent
contentB = 'abcd45'
docB.update([4, '4']) // this edit will be sent
setImmediate(function() {
docA.update([5, '2']) // this edit will be queued
docA.update([6, '3'])
docB.update([5, '5']) // this edit will be queued
})
setTimeout(function() {
console.log(contentA, contentB)
expect(contentB).to.eql(contentA)
cb()
}, 500)
})
after(function() {
master.kill()
})
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc