Comparing version 3.1.3 to 4.0.0
@@ -26,2 +26,3 @@ /** | ||
EventEmitter.apply(this) | ||
this.id | ||
this.adapter = adapter | ||
@@ -55,6 +56,10 @@ this.ottype = ottype | ||
var doc = new Document(adapter, ottype) | ||
doc.history.createDocument({contents: ottype.serialize? ottype.serialize(content) : content, edit: Edit.newInitial(ottype)}, function(er) { | ||
doc.history.createDocument({ | ||
contents: ottype.serialize? ottype.serialize(content) : content | ||
, edit: Edit.newInitial(ottype) | ||
}, function(er, id) { | ||
if(er) return cb(er) | ||
doc.initialized = true | ||
doc.content = content | ||
doc.id = id | ||
doc.emit('init') | ||
@@ -66,8 +71,11 @@ cb(null, doc) | ||
Document.load = function(adapter, ottype, cb) { | ||
Document.load = function(adapter, ottype, id, cb) { | ||
var doc = new Document(adapter, ottype) | ||
doc.adapter.getLatestSnapshot(function(er, snapshot) { | ||
doc.adapter.getLatestSnapshot(id, function(er, snapshot) { | ||
if(er) return cb(er) | ||
doc.initialized = true | ||
doc.content = ottype.deserialize? ottype.deserialize(snapshot.contents) : snapshot.contents | ||
doc.content = ottype.deserialize? | ||
ottype.deserialize(snapshot.contents) | ||
: snapshot.contents | ||
doc.id = id | ||
doc.emit('init') | ||
@@ -159,3 +167,3 @@ cb(null, doc) | ||
link.on('link:edit', function onedit(edit) { | ||
this.receiveEdit(edit, link) | ||
this.receiveEdit(edit, link.authenticated, link) | ||
}.bind(this)) | ||
@@ -258,3 +266,3 @@ | ||
*/ | ||
Document.prototype.receiveEdit = function(edit, fromLink, callback) { | ||
Document.prototype.receiveEdit = function(edit, author, fromLink, callback) { | ||
console.log('receiveEdit', edit) | ||
@@ -265,3 +273,3 @@ edit = Edit.unpack(edit, this.ottype) | ||
this.queue.push(function(cb) { | ||
this.dispatchEdit(edit, fromLink, function(er, edit) { | ||
this.dispatchEdit(edit, author, fromLink, function(er, edit) { | ||
cb() | ||
@@ -276,3 +284,3 @@ callback && callback(er, edit) | ||
this.queue.push(function(cb) { | ||
this.dispatchEdit(edit, fromLink, function(er, edit) { | ||
this.dispatchEdit(edit, author,fromLink, function(er, edit) { | ||
cb() | ||
@@ -293,3 +301,3 @@ callback && callback(er, edit) | ||
*/ | ||
Document.prototype.dispatchEdit = function(edit, fromLink, cb) { | ||
Document.prototype.dispatchEdit = function(edit, author, fromLink, cb) { | ||
@@ -362,4 +370,11 @@ // Also check if this might be sentEdit, cause if we've requested History, then | ||
// add to history | ||
var content = this.ottype.serialize? this.ottype.serialize(this.content) : this.content | ||
this.history.storeSnapshot({id: edit.id, contents: content, edit: edit}, function(er) { | ||
var content = this.ottype.serialize? | ||
this.ottype.serialize(this.content) | ||
: this.content | ||
this.history.storeSnapshot({ | ||
id: edit.id | ||
, contents: content | ||
, edit: edit | ||
, author: author | ||
}, function(er) { | ||
if(er) { | ||
@@ -366,0 +381,0 @@ this.emit('error', er) |
@@ -22,3 +22,2 @@ /** | ||
this.document = document | ||
this.idCounter = 0 | ||
} | ||
@@ -32,3 +31,8 @@ module.exports = History | ||
History.prototype.createDocument = function(snapshot, cb) { | ||
this.document.adapter.createDocument({id: snapshot.edit.id, changes: JSON.stringify(snapshot.edit.changeset), contents: snapshot.contents}, function(er) { | ||
this.document.adapter.createDocument({ | ||
id: snapshot.edit.id | ||
, changes: JSON.stringify(snapshot.edit.changeset) | ||
, contents: snapshot.contents | ||
, author: snapshot.author | ||
}, function(er) { | ||
if(er) return cb(er) | ||
@@ -40,3 +44,3 @@ cb() | ||
History.prototype.earliest = function(cb) { | ||
this.document.adapter.getFirstSnapshot(function(er, snapshot) { | ||
this.document.adapter.getFirstSnapshot(this.document.id, function(er, snapshot) { | ||
if(er) return cb && cb(er) | ||
@@ -51,3 +55,3 @@ snapshot = { edit: Edit.fromSnapshot(snapshot, this.document.ottype) | ||
History.prototype.latest = function(cb) { | ||
this.document.adapter.getLatestSnapshot(function(er, snapshot) { | ||
this.document.adapter.getLatestSnapshot(this.document.id, function(er, snapshot) { | ||
if(er) return cb && cb(er) | ||
@@ -67,3 +71,9 @@ if(!snapshot) return cb(new Error('No snapshot found')) | ||
if(!snapshot.edit.id) snapshot.edit.id = Edit.randString() | ||
this.document.adapter.storeSnapshot({id: snapshot.edit.id, changes: JSON.stringify(snapshot.edit.changeset), parent: snapshot.edit.parent, contents: snapshot.contents}, function(er) { | ||
this.document.adapter.storeSnapshot(this.document.id, { | ||
id: snapshot.edit.id | ||
, changes: JSON.stringify(snapshot.edit.changeset) | ||
, parent: snapshot.edit.parent | ||
, contents: snapshot.contents | ||
, author: snapshot.author | ||
}, function(er) { | ||
cb && cb(er) | ||
@@ -75,3 +85,3 @@ }) | ||
History.prototype.remembers = function(snapshotId, cb) { | ||
this.document.adapter.existsSnapshot(snapshotId, function(er, remembers) { | ||
this.document.adapter.existsSnapshot(this.document.id, snapshotId, function(er, remembers) { | ||
cb && cb(er, remembers) | ||
@@ -82,3 +92,3 @@ }) | ||
History.prototype.getAllAfter = function(snapshotId, cb) { | ||
this.document.adapter.getSnapshotsAfter(snapshotId, function(er, snapshots) { | ||
this.document.adapter.getSnapshotsAfter(this.document.id, snapshotId, function(er, snapshots) { | ||
if(er) return cb && cb(er) | ||
@@ -85,0 +95,0 @@ snapshots = snapshots.map(function(snapshot) { |
104
lib/Link.js
@@ -20,4 +20,4 @@ /** | ||
require('setimmediate') | ||
var SECONDS = 1000 | ||
/** | ||
@@ -30,9 +30,10 @@ * This is a Link | ||
if(!opts) opts = {} | ||
this.timeout = opts.timeout || 10*SECONDS | ||
this.credentials = opts.credentials | ||
this.sentCredentials | ||
this.receivedCredentials | ||
this.authenticateFn = opts.authenticate | ||
this.authorizeReadFn = opts.authorizeRead | ||
this.authorizeWriteFn = opts.authorizeWrite | ||
this.needInit | ||
this.authenticated | ||
this.sentEdit | ||
this.sentRequestInit | ||
this.queue | ||
@@ -68,7 +69,28 @@ this.callbacks | ||
Link.prototype.send = function(event/*, args..*/) { | ||
if('requestInit' === event) this.needInit = true | ||
var data = JSON.stringify(Array.prototype.slice.call(arguments)) | ||
this.authorizeRead(Array.prototype.slice.apply(arguments), function(er, authorized) { | ||
if('requestInit' === event) this.sentRequestInit = true | ||
if(!this.authenticated && this.authenticateFn) return | ||
var msg = Array.prototype.slice.call(arguments) | ||
// Authorize message | ||
this.authorizeRead(msg, function(er, authorized) { | ||
if(er) return this.emit('error', er) | ||
// If unauthorized, tell them | ||
if(!authorized) return this.sendUnauthorized() | ||
// If this is an edit, add a timeout, after which we retry | ||
if('edit' === event) { | ||
var edit = msg[1] | ||
, cb = edit.callback | ||
var timeout = setTimeout(function() { | ||
this.send('edit', edit) | ||
}.bind(this), this.timeout) | ||
edit.callback = function() { | ||
clearTimeout(timeout) | ||
cb && cb.apply(null, arguments) | ||
} | ||
msg[1] = edit.pack() | ||
} | ||
var data = JSON.stringify(msg) | ||
console.log('->', data) | ||
@@ -79,2 +101,14 @@ this.push(data) | ||
Link.prototype.sendUnauthenticated = function() { | ||
this.push(JSON.stringify(['unauthenticated'])) | ||
} | ||
Link.prototype.sendAuthenticate = function() { | ||
this.push(JSON.stringify(['authenticate', this.credentials])) | ||
} | ||
Link.prototype.sendAuthenticated = function(status) { | ||
this.push(JSON.stringify(['authenticated', status])) | ||
} | ||
Link.prototype.sendUnauthorized = function() { | ||
@@ -97,3 +131,3 @@ this.push(JSON.stringify(['unauthorized'])) | ||
this.sentEdit = edit | ||
this.send('edit', edit.pack()) | ||
this.send('edit', edit) | ||
} | ||
@@ -120,3 +154,2 @@ } | ||
this.sentEdit = msg[1] | ||
msg[1] = msg[1].pack() | ||
} | ||
@@ -133,24 +166,45 @@ | ||
// ['authenticate', Mixed] | ||
if(args[0] === 'authenticate') { | ||
this.receivedCredentials = args[1] | ||
this.authenticate(args[1], function(er, authed) { | ||
this.authenticated = authed | ||
this.sendAuthenticated(!!(!er && authed)) | ||
cb() | ||
}.bind(this)) | ||
return | ||
} | ||
// ['authenticated', Bool] | ||
if(args[0] === 'authenticated') { | ||
if(!args[1]) return this.emit('error', new Error('Authentication failed')) | ||
if(this.sentRequestInit) this.send('requestInit') | ||
else if(this.sentEdit) this.send('edit', this.sentEdit) | ||
cb() | ||
return | ||
} | ||
// ['unauthenticated'] | ||
if(args[0] === 'unauthenticated') { | ||
this.sendAuthenticate() | ||
cb() | ||
return | ||
} | ||
// ['unauthorized'] | ||
if(args[0] === 'unauthorized') { | ||
if(this.sentCredentials) return this.emit('error', new Error('Authentication failed')) | ||
this.send('authenticate', this.credentials) | ||
if(this.needInit) this.send('requestInit') | ||
else if(this.sentEdit) this.send('edit', this.sentEdit.pack()) // this is a reconnect! | ||
this.sentCredentials = true | ||
this.send('requestInit') | ||
cb() | ||
return | ||
} | ||
if(!this.authenticated && this.authenticateFn) { | ||
this.sendUnauthenticated() | ||
cb() | ||
return | ||
} | ||
if(args[0] === 'init') { | ||
this.sentRequestInit = false | ||
} | ||
// Allow reconnecting: Once we receive data from the other end | ||
// sentCredentials will be false again, to allow sending an authenticate msg | ||
this.sentCredentials = false | ||
if(args[0] === 'init') this.needInit = false | ||
this.authorizeWrite(args, function(er, authorized) { | ||
@@ -195,5 +249,9 @@ | ||
Link.prototype.authenticate = function(credentials, cb) { | ||
this.authenticateFn(credentials, cb) | ||
} | ||
Link.prototype.authorizeWrite = function(msg, cb) { | ||
if(!this.authorizeWriteFn) return cb(null, true) | ||
this.authorizeWriteFn(msg, this.receivedCredentials, cb) | ||
this.authorizeWriteFn(msg, this.authenticated, cb) | ||
} | ||
@@ -203,3 +261,3 @@ | ||
if(!this.authorizeReadFn) return cb(null, true) | ||
this.authorizeReadFn(msg, this.receivedCredentials, cb) | ||
this.authorizeReadFn(msg, this.authenticated, cb) | ||
} |
@@ -26,6 +26,6 @@ /** | ||
this.reset() | ||
this.storeSnapshot(initialSnapshot, cb) | ||
this.storeSnapshot(null, initialSnapshot, cb) | ||
} | ||
MemoryAdapter.prototype.getFirstSnapshot = function(cb) { | ||
MemoryAdapter.prototype.getFirstSnapshot = function(docId, cb) { | ||
if(!this.history.length) return cb() | ||
@@ -37,3 +37,3 @@ var snapshot = this.snapshots[this.history[0]] | ||
MemoryAdapter.prototype.getLatestSnapshot = function(cb) { | ||
MemoryAdapter.prototype.getLatestSnapshot = function(docId, cb) { | ||
if(!this.history.length) return cb() | ||
@@ -45,3 +45,3 @@ var snapshot = this.snapshots[this.history[this.history.length-1]] | ||
MemoryAdapter.prototype.storeSnapshot = function(snapshot, cb) { | ||
MemoryAdapter.prototype.storeSnapshot = function(docId, snapshot, cb) { | ||
this.history.push(snapshot.id) | ||
@@ -57,7 +57,7 @@ this.snapshots[snapshot.id] = JSON.parse(JSON.stringify(snapshot)) | ||
MemoryAdapter.prototype.existsSnapshot = function(editId, cb) { | ||
MemoryAdapter.prototype.existsSnapshot = function(docId, editId, cb) { | ||
cb(null, !!this.snapshots[editId]) | ||
} | ||
MemoryAdapter.prototype.getSnapshotsAfter = function(editId, cb) { | ||
MemoryAdapter.prototype.getSnapshotsAfter = function(docId, editId, cb) { | ||
var arr = [] | ||
@@ -68,2 +68,2 @@ for(var i = this.history.indexOf(editId)+1; i < this.history.length; i++) { | ||
cb(null, arr) | ||
} | ||
} |
{ | ||
"name": "gulf", | ||
"version": "3.1.3", | ||
"version": "4.0.0", | ||
"description": "transport-agnostic operational transformation control layer", | ||
@@ -5,0 +5,0 @@ "repository": { |
@@ -153,2 +153,4 @@ # Gulf [![Build Status](https://travis-ci.org/marcelklehr/gulf.png)](https://travis-ci.org/marcelklehr/gulf) | ||
If you'd like to write your own storage adapter, head on to the API docs. | ||
## API | ||
@@ -161,5 +163,52 @@ | ||
* `opts.credentials` The credentials to be sent to the other end for authentication purposes. | ||
* `opts.authorizeWrite` A function which gets called when the other end writes a message, and has the following signature: `function (msg, receivedCredentials, cb)` | ||
* `opts.authorizeRead` A function which gets called when this side of the link writes a message, and has the following signature: `function (msg, receivedCredentials, cb)` | ||
* `opts.authenticate` A functon which gets called with the credentials from the other side and has the following signature: `(credentials, cb(er, user))` | ||
* `opts.authorizeWrite` A function which gets called when the other end writes a message, and has the following signature: `(msg, user, cb(er, granted))`; `user` is the value returned by your `authenticate` hook. | ||
* `opts.authorizeRead` A function which gets called when this side of the link writes a message, and has the following signature: `(msg, user, cb(er, granted))`; `user` is the value returned by your `authenticate` hook. | ||
The return value of `opts.authenticate` is also used as the author field when saving snapshots. | ||
Here's an example of how to setup link authentication and authorization: | ||
```js | ||
var link = new gulf.Link({ | ||
authenticate: function(credentials, cb) { | ||
authenticate('token', credentials) | ||
.then((userId) => { | ||
cb(null, userId) | ||
}) | ||
.catch(cb) | ||
} | ||
, authorizeWrite: function(msg, userId, cb) { | ||
switch(msg[0]) { | ||
case 'edit': | ||
authorize(userId, 'document:change') | ||
.then(allowed => cb(null, allowed)) | ||
.catch(cb) | ||
break; | ||
case 'ack': | ||
case 'requestInit': | ||
authorize(userId, 'document:read') | ||
.then(allowed => cb(null, allowed)) | ||
.catch(cb) | ||
break; | ||
} | ||
} | ||
, authorizeRead:function(msg, userId, cb) { | ||
switch(msg[0]) { | ||
case 'init': | ||
case 'edit': | ||
authorize(userId, 'document:read') | ||
.then(allowed => cb(null, allowed)) | ||
.catch(cb) | ||
break; | ||
case 'ack': | ||
authorize(userId, 'document:change') | ||
.then(allowed => cb(null, allowed)) | ||
.catch(cb) | ||
break; | ||
} | ||
} | ||
}) | ||
``` | ||
### Class: gulf.Document | ||
@@ -265,9 +314,23 @@ | ||
## FAQ | ||
### Adapter | ||
A snapshot is an object that looks like this: | ||
How does it work? Gulf uses operational transformation, which is all about making edits fit. Node.js streams make sure linking documents is a pure joy. Everything else is in teh codez. | ||
```js | ||
{ | ||
id: 'sdgh684eb68eth' | ||
, changes: '[0, "h"]' | ||
, parent: '5dfhg68aefh65ae' // ID of another snapshot | ||
, contents: '"Hello world"' // stringified representation of the new contents | ||
, author: 12 // The id of the author, as returned by `opts.authenticate` in the Link options (or the value you passed to gulf.Document#receiveEdit, if you passed in the edit directly) | ||
} | ||
``` | ||
Does it support peer-to-peer linking? No. | ||
If you're having trouble writing your own adapter, check out [the in-memory adapter](https://github.com/marcelklehr/gulf/blob/master/lib/MemoryAdapter.js) and the [mongoDB adapter](https://github.com/marcelklehr/gulf-mongodb). | ||
Why? Well, Peer-to-peer is a pain-in-the-ass scenario with operational transformation and not at all performant. If you have a peer-to-peer scenario electing a master might be easier. | ||
#### Adapter#createDocument(initialSnapshot, cb(er, docId)) | ||
#### Adapter#getFirstSnapshot(docId, cb(er, snapshot)) | ||
#### Adapter#getLatestSnapshot(docId, cb(er, snapshot)) | ||
#### Adapter#storeSnapshot(docId, snapshot, cb(er)) | ||
#### Adapter#existsSnapshot(docId, editId, cb(er, exists:Bool)) | ||
#### Adapter#getSnapshotsAfter(docId, editId, cb(er, snapshots:Array)) | ||
@@ -274,0 +337,0 @@ ## Tests? |
@@ -1,2 +0,2 @@ | ||
/* global xdescribe, describe, it, xit */ | ||
/* global describe, xdescribe, it, xit */ | ||
var gulf, expect | ||
@@ -175,4 +175,4 @@ , ottype = require('ottypes').text | ||
linkA = docA.masterLink() | ||
linkB = docB.masterLink() | ||
linkA = docA.masterLink(/*{timeout: 3000}*/) | ||
linkB = docB.masterLink(/*{timeout: 3000}*/) | ||
cb() | ||
@@ -205,2 +205,3 @@ }) | ||
var slaveB | ||
it('should correctly propagate edits from one end to the other end', function(cb) { | ||
@@ -226,3 +227,3 @@ linkA.unpipe() | ||
linkA.pipe(masterDoc.slaveLink()).pipe(linkA) | ||
linkB.pipe(masterDoc.slaveLink()).pipe(linkB) | ||
linkB.pipe(slaveB = masterDoc.slaveLink()).pipe(linkB) | ||
}) | ||
@@ -237,31 +238,32 @@ | ||
it('should catch up on reconnect', function(cb) { | ||
this.timeout(12500) | ||
// disconnect B | ||
linkB.unpipe() | ||
masterDoc.links[3].unpipe() | ||
slaveB.unpipe() | ||
contentA = 'abcdx1324' | ||
docA.update([4, 'x']) // this edit will be sent | ||
docA.update([4, 'x']) // this edit will be sent from A -> Master |-> B | ||
contentB = 'abcd1324QR' | ||
docB.update([8, 'Q']) | ||
docB.update([9, 'R']) | ||
docB.update([8, 'Q']) // these edits will be sent from B |-> Master -> A | ||
setImmediate(function() { | ||
docB.update([9, 'R']) | ||
}) | ||
setTimeout(function() { | ||
expect(masterDoc.content).to.equal('abcdx1324') | ||
// reconnect B | ||
console.log('reconnect B') | ||
linkB.pipe(masterDoc.slaveLink()).pipe(linkB) | ||
// change A | ||
contentA = 'abcdxy1324' | ||
docA.update([5, 'y']) | ||
setTimeout(function() { | ||
expect(contentB).to.equal('abcdxy1324QR') | ||
expect(contentB).to.equal(contentA) | ||
expect(contentA).to.equal('abcdx1324RQ') | ||
cb() | ||
}, 500) | ||
}, 500) | ||
}, 1000) | ||
}, 1000) | ||
}) | ||
}) | ||
describe('Linking to protected documents', function() { | ||
describe('Linking to documents protected by authentication', function() { | ||
var docA, docB | ||
@@ -275,10 +277,5 @@ var linkA, linkB | ||
linkA = docA.slaveLink({ | ||
authorizeRead: function(msg, credentials, cb) { | ||
if(credentials == 'rightCredentials') return cb(null, true) | ||
else return cb(null, false) | ||
authenticate: function(credentials, cb) { | ||
cb(null, credentials == 'rightCredentials') | ||
} | ||
, authorizeWrite: function(msg, credentials, cb) { | ||
if(credentials == 'rightCredentials') return cb(null, true) | ||
else return cb(null, false) | ||
} | ||
}) | ||
@@ -299,3 +296,3 @@ cb() | ||
it('should not adopt the current document state if athentication failed', function(done) { | ||
it('should not adopt the current document state if authentication failed', function(done) { | ||
linkB = docB.masterLink({credentials: 'wrongCredentials'}) | ||
@@ -310,3 +307,53 @@ linkA.pipe(linkB).pipe(linkA) | ||
}) | ||
describe('Linking to documents protected by write authorization', function() { | ||
var docA, docB | ||
var linkA, linkB | ||
var initialContents = 'abc' | ||
beforeEach(function(cb) { | ||
gulf.Document.create(new gulf.MemoryAdapter, ottype, initialContents, function(er, doc) { | ||
docA = doc | ||
docB = new gulf.EditableDocument(new gulf.MemoryAdapter, ottype) | ||
docB._setContents = function(content, cb) {cb()} | ||
docB._change = function(cs, cb) {cb()} | ||
linkA = docA.slaveLink({ | ||
authenticate: function(credentials, cb) { | ||
cb(null, credentials == 'rightCredentials') | ||
} | ||
, authorizeWrite: function(msg, user, cb) { | ||
if(msg[0] === 'requestInit'|| msg[0] === 'ack') return cb(null, true) | ||
cb(null, false) | ||
} | ||
}) | ||
cb() | ||
}) | ||
}) | ||
it('should adopt the current document state correctly', function(done) { | ||
linkB = docB.masterLink({credentials: 'rightCredentials'}) | ||
linkA.pipe(linkB).pipe(linkA) | ||
setTimeout(function() { | ||
expect(docA.content).to.eql(docB.content) | ||
done() | ||
}, 100) | ||
}) | ||
it('should not accept edits', function(done) { | ||
linkB = docB.masterLink({credentials: 'rightCredentials'}) | ||
linkA.pipe(linkB).pipe(linkA) | ||
setImmediate(function() { | ||
docB.update([3,'d']) | ||
}) | ||
setTimeout(function() { | ||
expect(docB.content).to.eql(initialContents) | ||
done() | ||
}, 100) | ||
}) | ||
}) | ||
describe('Linking documents in parallel environments', function() { | ||
@@ -313,0 +360,0 @@ var initialContent = 'abc' |
97717
1399
343