Comparing version 0.8.5 to 0.9.0
367
lib/agent.js
@@ -1,25 +0,1 @@ | ||
// This implements the network API for ShareJS. | ||
// | ||
// The wire protocol is speccced out here: | ||
// https://github.com/josephg/ShareJS/wiki/Wire-Protocol | ||
// | ||
// When a client connects the server first authenticates it and sends: | ||
// | ||
// S: {id:<agent clientId>} | ||
// | ||
// After that, the client can open documents: | ||
// | ||
// C: {c:'users', d:'fred', sub:true, snapshot:null, create:true, type:'text'} | ||
// S: {c:'users', d:'fred', sub:true, snapshot:{snapshot:'hi there', v:5, meta:{}}, create:false} | ||
// | ||
// ... | ||
// | ||
// The client can send open requests as soon as the socket has opened - it doesn't need to | ||
// wait for its id. | ||
// | ||
// The wire protocol is documented here: | ||
// https://github.com/josephg/ShareJS/wiki/Wire-Protocol | ||
// | ||
var async = require('async'); | ||
var hat = require('hat'); | ||
@@ -67,3 +43,3 @@ var util = require('./util'); | ||
// Initialize the remote client by sending it its agent Id. | ||
this._send({a: 'init', protocol: 0, id: this.clientId}); | ||
this._send({a: 'init', protocol: 1, id: this.clientId}); | ||
} | ||
@@ -187,3 +163,3 @@ module.exports = Agent; | ||
// them in the streams for subscribed documents or queries | ||
return this.clientId === op.src && collection === (op.i || op.c); | ||
return (this.clientId === op.src) && (collection === (op.i || op.c)); | ||
}; | ||
@@ -214,4 +190,15 @@ | ||
Agent.prototype._sendOps = function(collection, id, ops) { | ||
for (var i = 0; i < ops.length; i++) { | ||
this._sendOp(collection, id, ops[i]); | ||
} | ||
}; | ||
Agent.prototype._reply = function(req, err, msg) { | ||
var msg = (err) ? {error: err} : msg || {}; | ||
if (err) { | ||
req.error = err; | ||
this._send(req); | ||
return; | ||
} | ||
if (!msg) msg = {}; | ||
@@ -222,2 +209,3 @@ msg.a = req.a; | ||
if (req.id) msg.id = req.id; | ||
if (req.b && !msg.data) msg.b = req.b; | ||
@@ -258,6 +246,6 @@ this._send(msg); | ||
Agent.prototype._checkRequest = function(req) { | ||
if (req.a === 'qsub' || req.a === 'qfetch' || req.a === 'qunsub') { | ||
if (req.a === 'qf' || req.a === 'qs' || req.a === 'qu') { | ||
// Query messages need an ID property. | ||
if (typeof req.id !== 'number') return 'Missing query ID'; | ||
} else if (req.a === 'op' || req.a === 'sub' || req.a === 'unsub' || req.a === 'fetch') { | ||
} else if (req.a === 'op' || req.a === 'f' || req.a === 's' || req.a === 'u') { | ||
// Doc-based request. | ||
@@ -270,7 +258,6 @@ if (req.c != null && typeof req.c !== 'string') return 'Invalid collection'; | ||
} | ||
} else if (req.a === 'bs') { | ||
// Bulk subscribe | ||
if (typeof req.s !== 'object') return 'Invalid bulk subscribe data'; | ||
} else { | ||
return 'Invalid action'; | ||
} else if (req.a === 'bf' || req.a === 'bs' || req.a === 'bu') { | ||
// Bulk request | ||
if (req.c != null && typeof req.c !== 'string') return 'Invalid collection'; | ||
if (typeof req.b !== 'object') return 'Invalid bulk subscribe data'; | ||
} | ||
@@ -286,33 +273,55 @@ }; | ||
var errMessage = this._checkRequest(req); | ||
if (errMessage) return callback({code: 4000, message: errMessage}); | ||
try { | ||
var errMessage = this._checkRequest(req); | ||
if (errMessage) return callback({code: 4000, message: errMessage}); | ||
switch (req.a) { | ||
case 'qsub': | ||
return this._querySubscribe(req, callback); | ||
case 'qunsub': | ||
return this._queryUnsubscribe(req, callback); | ||
case 'qfetch': | ||
return this._queryFetch(req, callback); | ||
case 'bs': | ||
return this._bulkSubscribe(req, callback); | ||
case 'sub': | ||
return this._subscribe(req, callback); | ||
case 'unsub': | ||
return this._unsubscribe(req, callback); | ||
case 'fetch': | ||
return this._fetch(req, callback); | ||
case 'op': | ||
return this._submit(req, callback); | ||
default: | ||
callback({ | ||
code: 4000, | ||
message: 'Invalid or unknown message' | ||
}); | ||
switch (req.a) { | ||
case 'qf': | ||
return this._queryFetch(req.id, req.c, req.q, getQueryOptions(req), callback); | ||
case 'qs': | ||
return this._querySubscribe(req.id, req.c, req.q, getQueryOptions(req), callback); | ||
case 'qu': | ||
return this._queryUnsubscribe(req.id, callback); | ||
case 'bf': | ||
return this._fetchBulk(req.c, req.b, callback); | ||
case 'bs': | ||
return this._subscribeBulk(req.c, req.b, callback); | ||
case 'bu': | ||
return this._unsubscribeBulk(req.c, req.b, callback); | ||
case 'f': | ||
return this._fetch(req.c, req.d, req.v, callback); | ||
case 's': | ||
return this._subscribe(req.c, req.d, req.v, callback); | ||
case 'u': | ||
return this._unsubscribe(req.c, req.d, callback); | ||
case 'op': | ||
var op = this._createOp(req); | ||
return this._submit(req.c, req.d, op, callback); | ||
default: | ||
callback({ | ||
code: 4000, | ||
message: 'Invalid or unknown message' | ||
}); | ||
} | ||
} catch (err) { | ||
callback(err); | ||
} | ||
}; | ||
function getQueryOptions(req) { | ||
var results = req.r; | ||
var ids, versions; | ||
if (results) { | ||
ids = []; | ||
versions = (results.length) ? {} : null; | ||
for (var i = 0; i < results.length; i++) { | ||
var result = results[i]; | ||
var id = result[0]; | ||
var version = result[1]; | ||
ids.push(id); | ||
versions[id] = version; | ||
} | ||
} | ||
return { | ||
versions: req.vs, | ||
ids: ids, | ||
versions: versions, | ||
db: req.db | ||
@@ -322,59 +331,49 @@ }; | ||
Agent.prototype._querySubscribe = function(req, callback) { | ||
// Subscribe to a query. The client is sent the query results and its | ||
// notified whenever there's a change | ||
var queryId = req.id; | ||
var collection = req.c; | ||
var query = req.q; | ||
var options = getQueryOptions(req); | ||
var agent = this; | ||
this.backend.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) { | ||
if (err) return callback(err); | ||
agent._subscribeToQuery(emitter, queryId, collection, query); | ||
agent._sendQueryResults(queryId, collection, options, results, extra, callback); | ||
}); | ||
}; | ||
Agent.prototype._queryUnsubscribe = function(req, callback) { | ||
var queryId = req.id; | ||
var emitter = this.subscribedQueries[queryId]; | ||
if (emitter) { | ||
emitter.destroy(); | ||
delete this.subscribedQueries[queryId]; | ||
} | ||
process.nextTick(callback); | ||
}; | ||
Agent.prototype._queryFetch = function(req, callback) { | ||
Agent.prototype._queryFetch = function(queryId, collection, query, options, callback) { | ||
// Fetch the results of a query once | ||
var queryId = req.id; | ||
var collection = req.c; | ||
var query = req.q; | ||
var options = getQueryOptions(req); | ||
var agent = this; | ||
this.backend.queryFetch(this, collection, query, options, function(err, results, extra) { | ||
if (err) return callback(err); | ||
agent._sendQueryResults(queryId, collection, options, results, extra, callback); | ||
var message = { | ||
data: getResultsData(results), | ||
extra: extra | ||
}; | ||
callback(null, message); | ||
}); | ||
}; | ||
Agent.prototype._sendQueryResults = function(queryId, collection, options, results, extra, callback) { | ||
var versions = options.versions; | ||
var data = getResultsData(results, versions); | ||
var res = {id: queryId, data: data, extra: extra}; | ||
var opsRequest = getResultsOpsRequest(results, versions); | ||
if (!opsRequest) return callback(null, res); | ||
Agent.prototype._querySubscribe = function(queryId, collection, query, options, callback) { | ||
// Subscribe to a query. The client is sent the query results and its | ||
// notified whenever there's a change | ||
var agent = this; | ||
this.backend.getOpsBulk(this, collection, opsRequest, null, function(err, results) { | ||
var wait = 1; | ||
var message; | ||
function finish(err) { | ||
if (err) return callback(err); | ||
for (var id in results) { | ||
var ops = results[id]; | ||
for (var i = 0; i < ops.length; i++) { | ||
agent._sendOp(collection, id, ops[i]); | ||
} | ||
if (--wait) return; | ||
callback(null, message); | ||
} | ||
if (options.versions) { | ||
wait++; | ||
this._fetchBulk(collection, options.versions, finish); | ||
} | ||
this.backend.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) { | ||
if (err) return finish(err); | ||
agent._subscribeToQuery(emitter, queryId, collection, query); | ||
// No results are returned when ids are passed in as an option. Instead, | ||
// want to re-poll the entire query once we've established listeners to | ||
// emit any diff in results | ||
if (!results) { | ||
emitter.queryPoll(finish); | ||
return; | ||
} | ||
callback(null, res); | ||
message = { | ||
data: getResultsData(results), | ||
extra: extra | ||
}; | ||
finish(); | ||
}); | ||
}; | ||
function getResultsData(results, versions) { | ||
function getResultsData(results) { | ||
var items = []; | ||
@@ -384,9 +383,10 @@ var lastType = null; | ||
var result = results[i]; | ||
var item = {d: result.id, v: result.v}; | ||
var item = { | ||
d: result.id, | ||
v: result.v, | ||
data: result.data | ||
}; | ||
if (lastType !== result.type) { | ||
lastType = item.type = result.type; | ||
} | ||
if (!versions || versions[result.id] == null) { | ||
item.data = result.data; | ||
} | ||
items.push(item); | ||
@@ -396,59 +396,51 @@ } | ||
} | ||
function getResultsOpsRequest(results, versions) { | ||
if (!versions) return; | ||
var request; | ||
for (var i = 0; i < results.length; i++) { | ||
var result = results[i]; | ||
var from = versions[result.id]; | ||
if (from != null && result.v > from) { | ||
if (!request) request = {}; | ||
request[result.id] = from; | ||
} | ||
Agent.prototype._queryUnsubscribe = function(queryId, callback) { | ||
var emitter = this.subscribedQueries[queryId]; | ||
if (emitter) { | ||
emitter.destroy(); | ||
delete this.subscribedQueries[queryId]; | ||
} | ||
return request; | ||
} | ||
process.nextTick(callback); | ||
}; | ||
// Bulk subscribe. The message is: | ||
// {a:'bs', s:{users:{fred:100, george:5, carl:null}}} | ||
Agent.prototype._bulkSubscribe = function(req, callback) { | ||
var request = req.s; | ||
var response = {}; | ||
Agent.prototype._fetch = function(collection, id, version, callback) { | ||
if (version == null) { | ||
// Fetch a snapshot | ||
this.backend.fetch(this, collection, id, function(err, data) { | ||
if (err) return callback(err); | ||
callback(null, {data: data}); | ||
}); | ||
} else { | ||
// It says fetch on the tin, but if a version is specified the client | ||
// actually wants me to fetch some ops | ||
var agent = this; | ||
this.backend.getOps(this, collection, id, version, null, function(err, ops) { | ||
if (err) return callback(err); | ||
agent._sendOps(collection, id, ops); | ||
callback(); | ||
}); | ||
} | ||
}; | ||
var agent = this; | ||
async.forEachOf(request, function(versions, collection, eachCb) { | ||
agent.backend.subscribeBulk(agent, collection, versions, function(err, streams, snapshotMap) { | ||
if (err) return eachCb(err); | ||
for (var id in streams) { | ||
agent._subscribeToStream(collection, id, streams[id]); | ||
// Give a thumbs up for the subscription | ||
if (!snapshotMap[id]) snapshotMap[id] = true; | ||
Agent.prototype._fetchBulk = function(collection, versions, callback) { | ||
if (Array.isArray(versions)) { | ||
this.backend.fetchBulk(this, collection, versions, function(err, snapshotMap) { | ||
if (err) return callback(err); | ||
callback(null, {data: snapshotMap}); | ||
}); | ||
} else { | ||
var agent = this; | ||
this.backend.getOpsBulk(this, collection, versions, null, function(err, opsMap) { | ||
if (err) return callback(err); | ||
for (var id in opsMap) { | ||
var ops = opsMap[id]; | ||
agent._sendOps(collection, id, ops); | ||
} | ||
response[collection] = snapshotMap; | ||
eachCb(); | ||
callback(); | ||
}); | ||
}, function(err) { | ||
if (err) { | ||
// Close any streams we may have already subscribed before erroring to | ||
// avoid leaking memory if earlier calls to backend.subscribeBulk succeed | ||
// and others fail | ||
for (var collection in request) { | ||
var docs = agent.subscribedDocs[collection]; | ||
if (!docs) continue; | ||
for (var id in request[collection]) { | ||
var stream = docs[id]; | ||
if (stream) stream.destroy(); | ||
} | ||
} | ||
return callback(err); | ||
} | ||
callback(null, {s: response}); | ||
}); | ||
} | ||
}; | ||
Agent.prototype._subscribe = function(req, callback) { | ||
// Subscribe to a document | ||
var collection = req.c; | ||
var id = req.d; | ||
var version = req.v; | ||
Agent.prototype._subscribe = function(collection, id, version, callback) { | ||
// If the version is specified, catch the client up by sending all ops | ||
@@ -470,7 +462,20 @@ // since the specified version | ||
Agent.prototype._unsubscribe = function(req, callback) { | ||
Agent.prototype._subscribeBulk = function(collection, versions, callback) { | ||
var agent = this; | ||
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap) { | ||
if (err) return callback(err); | ||
for (var id in streams) { | ||
agent._subscribeToStream(collection, id, streams[id]); | ||
} | ||
if (snapshotMap) { | ||
callback(null, {data: snapshotMap}); | ||
} else { | ||
callback(); | ||
} | ||
}); | ||
}; | ||
Agent.prototype._unsubscribe = function(collection, id, callback) { | ||
// Unsubscribe from the specified document. This cancels the active | ||
// stream or an inflight subscribing state | ||
var collection = req.c; | ||
var id = req.d; | ||
var docs = this.subscribedDocs[collection]; | ||
@@ -482,30 +487,14 @@ var stream = docs && docs[id]; | ||
Agent.prototype._fetch = function(req, callback) { | ||
var collection = req.c; | ||
var id = req.d; | ||
var version = req.v; | ||
if (version == null) { | ||
// Fetch a snapshot | ||
this.backend.fetch(this, collection, id, function(err, data) { | ||
if (err) return callback(err); | ||
callback(null, {data: data}); | ||
}); | ||
} else { | ||
// It says fetch on the tin, but if a version is specified the client | ||
// actually wants me to fetch some ops | ||
var agent = this; | ||
this.backend.getOps(this, collection, id, version, null, function(err, results) { | ||
if (err) return callback(err); | ||
for (var i = 0; i < results.length; i++) { | ||
agent._sendOp(collection, id, results[i]); | ||
} | ||
callback(); | ||
}); | ||
Agent.prototype._unsubscribeBulk = function(collection, ids, callback) { | ||
var docs = this.subscribedDocs[collection]; | ||
if (!docs) return process.nextTick(callback); | ||
for (var i = 0; i < ids.length; i++) { | ||
var id = ids[i]; | ||
var stream = docs[id]; | ||
if (stream) stream.destroy(); | ||
} | ||
process.nextTick(callback); | ||
}; | ||
Agent.prototype._submit = function(req, callback) { | ||
var collection = req.c; | ||
var id = req.d; | ||
var op = this._createOp(req); | ||
Agent.prototype._submit = function(collection, id, op, callback) { | ||
var agent = this; | ||
@@ -524,5 +513,3 @@ this.backend.submit(this, collection, id, op, function(err, ops) { | ||
// Reply with any operations that the client is missing. | ||
for (var i = 0; i < ops.length; i++) { | ||
agent._sendOp(collection, id, ops[i]); | ||
} | ||
agent._sendOps(collection, id, ops); | ||
callback(null, ack); | ||
@@ -529,0 +516,0 @@ }); |
@@ -0,3 +1,5 @@ | ||
var Duplex = require('stream').Duplex; | ||
var async = require('async'); | ||
var Agent = require('./agent'); | ||
var Connection = require('./client/connection'); | ||
var emitter = require('./emitter'); | ||
@@ -9,2 +11,3 @@ var MemoryDB = require('./db/memory'); | ||
var QueryEmitter = require('./query-emitter'); | ||
var StreamSocket = require('./stream-socket'); | ||
var SubmitRequest = require('./submit-request'); | ||
@@ -42,2 +45,11 @@ | ||
Backend.prototype.createConnection = function() { | ||
var stream = new Duplex({objectMode: true}); | ||
var socket = new StreamSocket(stream); | ||
var connection = new Connection(socket); | ||
socket.connect(); | ||
this.listen(stream); | ||
return connection; | ||
}; | ||
/** A client has connected through the specified stream. Listen for messages. | ||
@@ -117,2 +129,28 @@ * | ||
// Submit an operation on the named collection/docname. op should contain a | ||
// {op:}, {create:} or {del:} field. It should probably contain a v: field (if | ||
// it doesn't, it defaults to the current version). | ||
// | ||
// callback called with (err, snapshot, ops) | ||
Backend.prototype.submit = function(agent, index, id, op, callback) { | ||
var err = ot.checkOp(op); | ||
if (err) return callback(err); | ||
var request = new SubmitRequest(this, agent, index, id, op); | ||
var backend = this; | ||
backend.trigger('submit', agent, request, function(err) { | ||
if (err) return callback(err); | ||
request.run(function(err, snapshot, ops) { | ||
if (err) return callback(err); | ||
backend.trigger('after submit', agent, request, function(err) { | ||
if (err) return callback(err); | ||
backend._sanitizeOps(agent, request.projection, request.collection, id, ops, function(err) { | ||
if (err) return callback(err); | ||
backend.emit('timing', 'submit.total', Date.now() - request.start); | ||
callback(err, ops); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}; | ||
Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) { | ||
@@ -200,28 +238,2 @@ if (projection) { | ||
// Submit an operation on the named collection/docname. op should contain a | ||
// {op:}, {create:} or {del:} field. It should probably contain a v: field (if | ||
// it doesn't, it defaults to the current version). | ||
// | ||
// callback called with (err, snapshot, ops) | ||
Backend.prototype.submit = function(agent, index, id, op, callback) { | ||
var err = ot.checkOp(op); | ||
if (err) return callback(err); | ||
var request = new SubmitRequest(this, agent, index, id, op); | ||
var backend = this; | ||
backend.trigger('submit', agent, request, function(err) { | ||
if (err) return callback(err); | ||
request.run(function(err, snapshot, ops) { | ||
if (err) return callback(err); | ||
backend.trigger('after submit', agent, request, function(err) { | ||
if (err) return callback(err); | ||
backend._sanitizeOps(agent, request.projection, request.collection, id, ops, function(err) { | ||
if (err) return callback(err); | ||
backend.emit('timing', 'submit.total', Date.now() - request.start); | ||
callback(err, ops); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}; | ||
Backend.prototype.fetch = function(agent, index, id, callback) { | ||
@@ -270,3 +282,3 @@ var projection = this.projections[index]; | ||
if (err) return callback(err); | ||
stream.initDocSubscribe(backend, agent, projection, version); | ||
stream.initProjection(backend, agent, projection); | ||
if (version == null) { | ||
@@ -283,3 +295,3 @@ // Subscribing from null means that the agent doesn't have a document | ||
if (err) return callback(err); | ||
stream.pack(version, ops); | ||
stream.pushOps(ops); | ||
backend.emit('timing', 'subscribe', Date.now() - start); | ||
@@ -298,15 +310,9 @@ callback(null, stream); | ||
var streams = {}; | ||
var fetchIds = []; | ||
var opsVersions = null; | ||
async.forEachOf(versions, function(version, id, eachCb) { | ||
if (version == null) { | ||
fetchIds.push(version); | ||
} else { | ||
if (!opsVersions) opsVersions = {}; | ||
opsVersions[id] = version; | ||
} | ||
var doFetch = Array.isArray(versions); | ||
var ids = (doFetch) ? versions : Object.keys(versions); | ||
async.each(ids, function(id, eachCb) { | ||
var channel = backend.getDocChannel(collection, id); | ||
backend.pubsub.subscribe(channel, function(err, stream) { | ||
if (err) return eachCb(err); | ||
stream.initDocSubscribe(backend, agent, projection, version); | ||
stream.initProjection(backend, agent, projection); | ||
streams[id] = stream; | ||
@@ -320,27 +326,27 @@ eachCb(); | ||
} | ||
async.parallel({ | ||
snapshotMap: function(parallelCb) { | ||
if (!fetchIds.length) return parallelCb(null, {}); | ||
backend.fetchBulk(agent, index, fetchIds, parallelCb); | ||
}, | ||
ops: function(parallelCb) { | ||
if (!opsVersions) return parallelCb(); | ||
backend.db.getOpsBulk(collection, opsVersions, null, function(err, opsMap) { | ||
if (err) return parallelCb(err); | ||
for (var id in opsVersions) { | ||
var version = opsVersions[id]; | ||
var ops = opsMap[id]; | ||
streams[id].pack(version, ops); | ||
} | ||
parallelCb(); | ||
}); | ||
} | ||
}, function(err, results) { | ||
if (err) { | ||
destroyStreams(streams); | ||
return callback(err); | ||
} | ||
backend.emit('timing', 'subscribeBulk', Date.now() - start); | ||
callback(null, streams, results.snapshotMap); | ||
}); | ||
if (doFetch) { | ||
// If an array of ids, get current snapshots | ||
backend.fetchBulk(agent, index, ids, function(err, snapshotMap) { | ||
if (err) { | ||
destroyStreams(streams); | ||
return callback(err); | ||
} | ||
backend.emit('timing', 'subscribeBulk', Date.now() - start); | ||
callback(null, streams, snapshotMap); | ||
}); | ||
} else { | ||
// If a versions map, get ops since requested versions | ||
backend.db.getOpsBulk(collection, versions, null, function(err, opsMap) { | ||
if (err) { | ||
destroyStreams(streams); | ||
return callback(err); | ||
} | ||
for (var id in opsMap) { | ||
var ops = opsMap[id]; | ||
streams[id].pushOps(ops); | ||
} | ||
backend.emit('timing', 'subscribeBulk', Date.now() - start); | ||
callback(null, streams); | ||
}); | ||
} | ||
}); | ||
@@ -383,6 +389,10 @@ }; | ||
if (err) return callback(err); | ||
stream.initProjection(backend, agent, request.projection); | ||
if (options.ids) { | ||
var queryEmitter = new QueryEmitter(request, stream, options.ids); | ||
backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request); | ||
callback(null, queryEmitter); | ||
return; | ||
} | ||
// Issue query on db to get our initial results | ||
stream.projection = request.projection; | ||
stream.backend = backend; | ||
stream.agent = agent; | ||
backend._query(agent, request, function(err, snapshots, extra) { | ||
@@ -393,3 +403,4 @@ if (err) { | ||
} | ||
var queryEmitter = new QueryEmitter(request, stream, snapshots, extra); | ||
var ids = pluckIds(snapshots); | ||
var queryEmitter = new QueryEmitter(request, stream, ids, extra); | ||
backend.emit('timing', 'querySubscribe', Date.now() - start, request); | ||
@@ -453,1 +464,9 @@ callback(null, queryEmitter, snapshots, extra); | ||
}; | ||
function pluckIds(snapshots) { | ||
var ids = []; | ||
for (var i = 0; i < snapshots.length; i++) { | ||
ids.push(snapshots[i].id); | ||
} | ||
return ids; | ||
} |
@@ -25,3 +25,3 @@ var Doc = require('./doc'); | ||
// Map of collection -> docName -> doc object for created documents. | ||
// Map of collection -> id -> doc object for created documents. | ||
// (created documents MUST BE UNIQUE) | ||
@@ -88,6 +88,6 @@ this.collections = {}; | ||
if (this.socket) { | ||
delete this.socket.onopen | ||
delete this.socket.onclose | ||
delete this.socket.onmessage | ||
delete this.socket.onerror | ||
this.socket.onmessage = null; | ||
this.socket.onopen = null; | ||
this.socket.onerror = null; | ||
this.socket.onclose = null; | ||
} | ||
@@ -109,6 +109,2 @@ | ||
// Fall back to supporting old browserchannel 1.x API which implemented the | ||
// websocket API incorrectly. This will be removed at some point | ||
if (!data) data = msg; | ||
// Some transports don't need parsing. | ||
@@ -121,3 +117,3 @@ if (typeof data === 'string') data = JSON.parse(data); | ||
t: (new Date()).toTimeString(), | ||
recv:JSON.stringify(data) | ||
recv: JSON.stringify(data) | ||
}); | ||
@@ -162,3 +158,2 @@ while (connection.messageBuffer.length > 100) { | ||
/** | ||
@@ -175,3 +170,3 @@ * @param {object} msg | ||
// ID. | ||
if (msg.protocol !== 0) throw new Error('Invalid protocol version'); | ||
if (msg.protocol !== 1) throw new Error('Invalid protocol version'); | ||
if (typeof msg.id != 'string') throw new Error('Invalid client id'); | ||
@@ -181,43 +176,73 @@ | ||
this._setState('connected'); | ||
break; | ||
return; | ||
case 'qfetch': | ||
case 'qsub': | ||
case 'qf': | ||
var query = this.queries[msg.id]; | ||
if (query) query._handleFetch(msg.error, msg.data, msg.extra); | ||
return; | ||
case 'qs': | ||
var query = this.queries[msg.id]; | ||
if (query) query._handleSubscribe(msg.error, msg.data, msg.extra); | ||
return; | ||
case 'qu': | ||
// Queries are removed immediately on calls to destroy, so we ignore | ||
// replies to query unsubscribes. Perhaps there should be a callback for | ||
// destroy, but this is currently unimplemented | ||
return; | ||
case 'q': | ||
case 'qunsub': | ||
// Query message. Pass this to the appropriate query object. | ||
var query = this.queries[msg.id]; | ||
if (query) query._onMessage(msg); | ||
break; | ||
if (query) query._handleDiff(msg.error, msg.diff); | ||
return; | ||
case 'bf': | ||
return this._handleBulkMessage(msg, '_handleFetch'); | ||
case 'bs': | ||
// Bulk subscribe response. The responses for each document are contained within. | ||
var result = msg.s; | ||
for (var cName in result) { | ||
for (var docName in result[cName]) { | ||
var doc = this.get(cName, docName); | ||
if (!doc) { | ||
console.warn('Message for unknown doc. Ignoring.', msg); | ||
break; | ||
} | ||
return this._handleBulkMessage(msg, '_handleSubscribe'); | ||
case 'bu': | ||
return this._handleBulkMessage(msg, '_handleUnsubscribe'); | ||
var msg = result[cName][docName]; | ||
if (typeof msg === 'object') { | ||
doc._handleSubscribe(msg.error, msg); | ||
} else { | ||
// The msg will be true if we simply resubscribed. | ||
doc._handleSubscribe(null, null); | ||
} | ||
} | ||
} | ||
break; | ||
case 'f': | ||
var doc = this.getExisting(msg.c, msg.d); | ||
if (doc) doc._handleFetch(msg.error, msg.data); | ||
return; | ||
case 's': | ||
var doc = this.getExisting(msg.c, msg.d); | ||
if (doc) doc._handleSubscribe(msg.error, msg.data); | ||
return; | ||
case 'u': | ||
var doc = this.getExisting(msg.c, msg.d); | ||
if (doc) doc._handleUnsubscribe(msg.error); | ||
return; | ||
case 'op': | ||
var doc = this.getExisting(msg.c, msg.d); | ||
if (doc) doc._handleOp(msg.error, msg); | ||
return; | ||
default: | ||
// Document message. Pull out the referenced document and forward the | ||
// message. | ||
var doc = this.getExisting(msg.c, msg.d); | ||
if (doc) doc._onMessage(msg); | ||
console.warn('Ignorning unrecognized message', msg); | ||
} | ||
}; | ||
Connection.prototype._handleBulkMessage = function(msg, method) { | ||
if (msg.data) { | ||
for (var id in msg.data) { | ||
var doc = this.getExisting(msg.c, id); | ||
if (doc) doc[method](msg.error, msg.data[id]); | ||
} | ||
} else if (Array.isArray(msg.b)) { | ||
for (var i = 0; i < msg.b.length; i++) { | ||
var id = msg.b[i]; | ||
var doc = this.getExisting(msg.c, id); | ||
if (doc) doc[method](msg.error); | ||
} | ||
} else if (msg.b) { | ||
for (var id in msg.b) { | ||
var doc = this.getExisting(msg.c, id); | ||
if (doc) doc[method](msg.error); | ||
} | ||
} else { | ||
console.error('Invalid bulk message', msg); | ||
} | ||
}; | ||
@@ -240,6 +265,6 @@ Connection.prototype.reset = function() { | ||
this._retryInterval = setInterval(function() { | ||
for (var collectionName in connection.collections) { | ||
var collection = connection.collections[collectionName]; | ||
for (var docName in collection) { | ||
collection[docName].retry(); | ||
for (var collection in connection.collections) { | ||
var docs = connection.collections[collection]; | ||
for (var id in docs) { | ||
docs[id].retry(); | ||
} | ||
@@ -252,3 +277,3 @@ } | ||
// Set the connection's state. The connection is basically a state machine. | ||
Connection.prototype._setState = function(newState, data) { | ||
Connection.prototype._setState = function(newState, reason) { | ||
if (this.state === newState) return; | ||
@@ -274,45 +299,55 @@ | ||
this.emit(newState, data); | ||
this.emit(newState, reason); | ||
// Group all subscribes together to help server make more efficient calls | ||
this.bsStart(); | ||
this.startBulk(); | ||
// Emit the event to all queries | ||
for (var id in this.queries) { | ||
var query = this.queries[id]; | ||
query._onConnectionStateChanged(newState, data); | ||
query._onConnectionStateChanged(); | ||
} | ||
// Emit the event to all documents | ||
for (var c in this.collections) { | ||
var collection = this.collections[c]; | ||
for (var docName in collection) { | ||
collection[docName]._onConnectionStateChanged(newState, data); | ||
for (var collection in this.collections) { | ||
var docs = this.collections[collection]; | ||
for (var id in docs) { | ||
docs[id]._onConnectionStateChanged(); | ||
} | ||
} | ||
this.bsEnd(); | ||
this.endBulk(); | ||
}; | ||
Connection.prototype.bsStart = function() { | ||
this.subscribeData = this.subscribeData || {}; | ||
Connection.prototype.startBulk = function() { | ||
if (!this.bulk) this.bulk = {}; | ||
}; | ||
Connection.prototype.bsEnd = function() { | ||
// Only send bulk subscribe if not empty | ||
if (util.hasKeys(this.subscribeData)) { | ||
this.send({a:'bs', s:this.subscribeData}); | ||
Connection.prototype.endBulk = function() { | ||
if (this.bulk) { | ||
for (var collection in this.bulk) { | ||
var actions = this.bulk[collection]; | ||
if (actions.fetch) this.send({a: 'bf', c: collection, b: actions.fetch}); | ||
if (actions.fetchFrom) this.send({a: 'bf', c: collection, b: actions.fetchFrom}); | ||
if (actions.subscribe) this.send({a: 'bs', c: collection, b: actions.subscribe}); | ||
if (actions.subscribeFrom) this.send({a: 'bs', c: collection, b: actions.subscribeFrom}); | ||
if (actions.unsubscribe) this.send({a: 'bu', c: collection, b: actions.unsubscribe}); | ||
} | ||
} | ||
this.subscribeData = null; | ||
this.bulk = null; | ||
}; | ||
Connection.prototype.sendSubscribe = function(doc, version) { | ||
Connection.prototype.sendSubscribe = function(doc) { | ||
// Ensure the doc is registered so that it receives the reply message | ||
this._addDoc(doc); | ||
if (this.subscribeData) { | ||
if (this.bulk) { | ||
// Bulk subscribe | ||
var data = this.subscribeData; | ||
if (!data[doc.collection]) data[doc.collection] = {}; | ||
data[doc.collection][doc.name] = version || null; | ||
var actions = this.bulk[doc.collection] || (this.bulk[doc.collection] = {}); | ||
if (doc.version == null) { | ||
var subscribe = actions.subscribe || (actions.subscribe = []); | ||
subscribe.push(doc.id); | ||
} else { | ||
var subscribeFrom = actions.subscribeFrom || (actions.subscribeFrom = {}); | ||
subscribeFrom[doc.id] = doc.version; | ||
} | ||
} else { | ||
// Send single subscribe message | ||
var msg = {a: 'sub', c: doc.collection, d: doc.name}; | ||
if (version != null) msg.v = version; | ||
// Send single doc subscribe message | ||
var msg = {a: 's', c: doc.collection, d: doc.id, v: doc.version}; | ||
this.send(msg); | ||
@@ -322,8 +357,20 @@ } | ||
Connection.prototype.sendFetch = function(doc, version) { | ||
Connection.prototype.sendFetch = function(doc) { | ||
// Ensure the doc is registered so that it receives the reply message | ||
this._addDoc(doc); | ||
var msg = {a: 'fetch', c: doc.collection, d: doc.name}; | ||
if (version != null) msg.v = version; | ||
this.send(msg); | ||
if (this.bulk) { | ||
// Bulk fetch | ||
var actions = this.bulk[doc.collection] || (this.bulk[doc.collection] = {}); | ||
if (doc.version == null) { | ||
var fetch = actions.fetch || (actions.fetch = []); | ||
fetch.push(doc.id); | ||
} else { | ||
var fetchFrom = actions.fetchFrom || (actions.fetchFrom = {}); | ||
fetchFrom[doc.id] = doc.version; | ||
} | ||
} else { | ||
// Send single doc fetch message | ||
var msg = {a: 'f', c: doc.collection, d: doc.id, v: doc.version}; | ||
this.send(msg); | ||
} | ||
}; | ||
@@ -334,4 +381,12 @@ | ||
this._addDoc(doc); | ||
var msg = {a: 'unsub', c: doc.collection, d: doc.name}; | ||
this.send(msg); | ||
if (this.bulk) { | ||
// Bulk unsubscribe | ||
var actions = this.bulk[doc.collection] || (this.bulk[doc.collection] = {}); | ||
var unsubscribe = actions.unsubscribe || (actions.unsubscribe = []); | ||
unsubscribe.push(doc.id); | ||
} else { | ||
// Send single doc unsubscribe message | ||
var msg = {a: 'u', c: doc.collection, d: doc.id}; | ||
this.send(msg); | ||
} | ||
}; | ||
@@ -345,3 +400,3 @@ | ||
c: doc.collection, | ||
d: doc.name, | ||
d: doc.id, | ||
v: doc.version, | ||
@@ -362,3 +417,3 @@ src: data.src, | ||
Connection.prototype.send = function(msg) { | ||
if (this.debug) console.log("SEND", JSON.stringify(msg)); | ||
if (this.debug) console.log('SEND', JSON.stringify(msg)); | ||
@@ -384,4 +439,4 @@ this.messageBuffer.push({t:Date.now(), send:JSON.stringify(msg)}); | ||
Connection.prototype.getExisting = function(collection, name) { | ||
if (this.collections[collection]) return this.collections[collection][name]; | ||
Connection.prototype.getExisting = function(collection, id) { | ||
if (this.collections[collection]) return this.collections[collection][id]; | ||
}; | ||
@@ -394,13 +449,13 @@ | ||
* @param collection | ||
* @param name | ||
* @param id | ||
* @param [data] ingested into document if created | ||
* @return {Doc} | ||
*/ | ||
Connection.prototype.get = function(collection, name, data) { | ||
var collectionObject = this.collections[collection] || | ||
Connection.prototype.get = function(collection, id, data) { | ||
var docs = this.collections[collection] || | ||
(this.collections[collection] = {}); | ||
var doc = collectionObject[name]; | ||
var doc = docs[id]; | ||
if (!doc) { | ||
doc = collectionObject[name] = new Doc(this, collection, name); | ||
doc = docs[id] = new Doc(this, collection, id); | ||
this.emit('doc', doc); | ||
@@ -428,6 +483,6 @@ } | ||
Connection.prototype._destroyDoc = function(doc) { | ||
var collectionObject = this.collections[doc.collection]; | ||
if (!collectionObject) return; | ||
var docs = this.collections[doc.collection]; | ||
if (!docs) return; | ||
delete collectionObject[doc.name]; | ||
delete docs[doc.id]; | ||
@@ -437,3 +492,3 @@ // Delete the collection container if its empty. This could be a source of | ||
// won't do anyway, but whatever. | ||
if (!util.hasKeys(collectionObject)) { | ||
if (!util.hasKeys(docs)) { | ||
delete this.collections[doc.collection]; | ||
@@ -444,8 +499,8 @@ } | ||
Connection.prototype._addDoc = function(doc) { | ||
var collectionObject = this.collections[doc.collection]; | ||
if (!collectionObject) { | ||
collectionObject = this.collections[doc.collection] = {}; | ||
var docs = this.collections[doc.collection]; | ||
if (!docs) { | ||
docs = this.collections[doc.collection] = {}; | ||
} | ||
if (collectionObject[doc.name] !== doc) { | ||
collectionObject[doc.name] = doc; | ||
if (docs[doc.id] !== doc) { | ||
docs[doc.id] = doc; | ||
} | ||
@@ -455,12 +510,9 @@ }; | ||
// Helper for createFetchQuery and createSubscribeQuery, below. | ||
Connection.prototype._createQuery = function(type, collection, q, options, callback) { | ||
if (type !== 'fetch' && type !== 'sub') { | ||
throw new Error('Invalid query type: ' + type); | ||
} | ||
Connection.prototype._createQuery = function(action, collection, q, options, callback) { | ||
if (!options) options = {}; | ||
var id = this.nextQueryId++; | ||
var query = new Query(type, this, id, collection, q, options, callback); | ||
var query = new Query(action, this, id, collection, q, options, callback); | ||
this.queries[id] = query; | ||
query._execute(); | ||
query.send(); | ||
return query; | ||
@@ -487,3 +539,3 @@ }; | ||
Connection.prototype.createFetchQuery = function(index, q, options, callback) { | ||
return this._createQuery('fetch', index, q, options, callback); | ||
return this._createQuery('qf', index, q, options, callback); | ||
}; | ||
@@ -498,3 +550,70 @@ | ||
Connection.prototype.createSubscribeQuery = function(index, q, options, callback) { | ||
return this._createQuery('sub', index, q, options, callback); | ||
return this._createQuery('qs', index, q, options, callback); | ||
}; | ||
Connection.prototype.hasPending = function() { | ||
return !!( | ||
this._firstDoc(hasPending) || | ||
this._firstQuery(callbackPending) | ||
); | ||
}; | ||
function hasPending(doc) { | ||
return doc.hasPending(); | ||
} | ||
function callbackPending(query) { | ||
return query.callback; | ||
} | ||
Connection.prototype.hasWritePending = function() { | ||
return !!this._firstDoc(hasWritePending); | ||
}; | ||
function hasWritePending(doc) { | ||
return doc.hasWritePending(); | ||
} | ||
Connection.prototype.whenNothingPending = function(callback) { | ||
var doc = this._firstDoc(hasPending); | ||
if (doc) { | ||
// If a document is found with a pending operation, wait for it to emit | ||
// that nothing is pending anymore, and then recheck all documents again. | ||
// We have to recheck all documents, just in case another mutation has | ||
// been made in the meantime as a result of an event callback | ||
doc.once('nothing pending', this._nothingPendingRetry(callback)); | ||
return; | ||
} | ||
var query = this._firstQuery(callbackPending); | ||
if (query) { | ||
query.once('ready', this._nothingPendingRetry(callback)); | ||
} | ||
// Call back when no pending operations | ||
process.nextTick(callback); | ||
}; | ||
Connection.prototype._nothingPendingRetry = function(callback) { | ||
var connection = this; | ||
return function() { | ||
process.nextTick(function() { | ||
connection.whenNothingPending(callback); | ||
}); | ||
}; | ||
}; | ||
Connection.prototype._firstDoc = function(fn) { | ||
for (var collection in this.collections) { | ||
var docs = this.collections[collection]; | ||
for (var id in docs) { | ||
var doc = docs[id]; | ||
if (fn(doc)) { | ||
return doc; | ||
} | ||
} | ||
} | ||
}; | ||
Connection.prototype._firstQuery = function(fn) { | ||
for (var id in this.queries) { | ||
var query = this.queries[id]; | ||
if (fn(query)) { | ||
return query; | ||
} | ||
} | ||
}; |
@@ -7,3 +7,3 @@ var types = require('../types').map; | ||
* | ||
* It is is uniquely identified by its `name` and `collection`. Documents | ||
* It is is uniquely identified by its `id` and `collection`. Documents | ||
* should not be created directly. Create them with Connection.get() | ||
@@ -17,3 +17,2 @@ * | ||
* doc.subscribe(function(error) { | ||
* doc.state // = 'ready' | ||
* doc.subscribed // = true | ||
@@ -48,4 +47,3 @@ * }) | ||
* locked state which only allows reading operations. | ||
* - `subscribed (error)` The document was subscribed | ||
* - `created ()` The document was created. That means its type was | ||
* - `create ()` The document was created. That means its type was | ||
* set and it has some initial data. | ||
@@ -55,3 +53,2 @@ * - `del (snapshot)` Fired after the document is deleted, that is | ||
* arguments | ||
* - `error` | ||
* | ||
@@ -61,3 +58,3 @@ */ | ||
module.exports = Doc; | ||
function Doc(connection, collection, name) { | ||
function Doc(connection, collection, id) { | ||
emitter.EventEmitter.call(this); | ||
@@ -68,3 +65,3 @@ | ||
this.collection = collection; | ||
this.name = name; | ||
this.id = id; | ||
@@ -74,35 +71,13 @@ this.version = this.type = null; | ||
// **** State in document: | ||
// Array of callbacks or nulls as placeholders | ||
this.inflightFetch = []; | ||
this.inflightSubscribe = []; | ||
this.inflightUnsubscribe = []; | ||
this.pendingFetch = []; | ||
// The action the document tries to perform with the server | ||
// | ||
// - subscribe | ||
// - unsubscribe | ||
// - fetch | ||
// - submit: send an operation | ||
this.action = null; | ||
// The data the document object stores can be in one of the following three states: | ||
// - No data. (null) We honestly don't know whats going on. | ||
// - Floating ('floating'): we have a locally created document that hasn't | ||
// been created on the server yet) | ||
// - Live ('ready') (we have data thats current on the server at some version). | ||
this.state = null; | ||
// Our subscription status. Either we're subscribed on the server, or we aren't. | ||
// Whether we think we are subscribed on the server | ||
this.subscribed = false; | ||
// Either we want to be subscribed (true), we want a new snapshot from the | ||
// server ('fetch'), or we don't care (false). This is also used when we | ||
// disconnect & reconnect to decide what to do. | ||
// Whether to re-establish the subscription on reconnect | ||
this.wantSubscribe = false; | ||
// This list is used for subscribe and unsubscribe, since we'll only want to | ||
// do one thing at a time. | ||
this._subscribeCallbacks = []; | ||
// *** end state stuff. | ||
// This doesn't provide any standard API access right now. | ||
this.provides = {}; | ||
// The op that is currently roundtripping to the server, or null. | ||
@@ -112,7 +87,6 @@ // | ||
// | ||
// This has the same format as an entry in pendingData, which is: | ||
// {[create:{...}], [del:true], [op:...], callbacks:[...], src:, seq:} | ||
this.inflightData = null; | ||
// This has the same format as an entry in pendingOps | ||
this.inflightOp = null; | ||
// All ops that are waiting for the server to acknowledge this.inflightData | ||
// All ops that are waiting for the server to acknowledge this.inflightOp | ||
// This used to just be a single operation, but creates & deletes can't be | ||
@@ -122,3 +96,3 @@ // composed with regular operations. | ||
// This is a list of {[create:{...}], [del:true], [op:...], callbacks:[...]} | ||
this.pendingData = []; | ||
this.pendingOps = []; | ||
@@ -130,21 +104,13 @@ // The OT type of this document. | ||
// For debouncing getLatestOps calls | ||
this._getLatestTimeout = null; | ||
// Prevents submitOp from accepting operations | ||
this.locked = false; | ||
} | ||
emitter.mixin(Doc); | ||
/** | ||
* Unsubscribe | ||
*/ | ||
Doc.prototype.destroy = function(callback) { | ||
var doc = this; | ||
this.unsubscribe(function() { | ||
// Don't care if there's an error unsubscribing. | ||
if (doc.hasPending()) { | ||
doc.once('nothing pending', function() { | ||
doc.connection._destroyDoc(doc); | ||
}); | ||
} else { | ||
doc.connection._destroyDoc(doc); | ||
doc.whenNothingPending(function() { | ||
doc.connection._destroyDoc(doc); | ||
if (doc.wantSubscribe) { | ||
return doc.unsubscribe(callback); | ||
} | ||
@@ -165,3 +131,3 @@ if (callback) callback(); | ||
if (typeof newType === 'string') { | ||
if (!types[newType]) throw new Error('Missing type ' + newType + ' ' + this.collection + ' ' + this.name); | ||
if (!types[newType]) throw new Error('Missing type ' + newType); | ||
newType = types[newType]; | ||
@@ -175,11 +141,7 @@ } | ||
if (!newType) { | ||
this.provides = {}; | ||
this.snapshot = undefined; | ||
} else if (newType.api) { | ||
// Register the new type's API. | ||
this.provides = newType.api.provides; | ||
} | ||
}; | ||
// Injest snapshot data. This data must include a version, snapshot and type. | ||
// Ingest snapshot data. This data must include a version, snapshot and type. | ||
// This is used both to ingest data that was exported with a webpage and data | ||
@@ -191,47 +153,41 @@ // that was received from the server during a fetch. | ||
// @param data.type | ||
// @fires ready | ||
Doc.prototype.ingestData = function(data) { | ||
// Ignore if the document is already created | ||
if (this.type) return; | ||
if (typeof data.v !== 'number') { | ||
throw new Error('Missing version in ingested data ' + this.collection + ' ' + this.name); | ||
throw new Error('Missing version in ingested data ' + this.collection + ' ' + this.id); | ||
} | ||
if (this.state) { | ||
// Silently ignore if doc snapshot version is equal or newer | ||
// TODO: Investigate whether this should happen in practice or not | ||
if (this.version >= data.v) return; | ||
console.warn('Ignoring ingest data for', this.collection, this.name, | ||
'\n in state:', this.state, '\n version:', this.version, | ||
'\n snapshot:\n', this.snapshot, '\n incoming data:\n', data); | ||
return; | ||
} | ||
this.version = data.v; | ||
// data.data is what the server will actually send. data.snapshot is the old | ||
// field name - supported now for backwards compatibility. | ||
this.snapshot = data.data; | ||
this._setType(data.type); | ||
this.state = 'ready'; | ||
this.emit('ready'); | ||
}; | ||
// Get and return the current document snapshot. | ||
Doc.prototype.getSnapshot = function() { | ||
return this.snapshot; | ||
}; | ||
// The callback will be called at a time when the document has a snapshot and | ||
// you can start applying operations. This may be immediately. | ||
Doc.prototype.whenReady = function(fn) { | ||
if (this.state === 'ready') { | ||
fn(); | ||
} else { | ||
this.once('ready', fn); | ||
Doc.prototype.whenNothingPending = function(callback) { | ||
if (this.hasPending()) { | ||
this.once('nothing pending', callback); | ||
return; | ||
} | ||
callback(); | ||
}; | ||
Doc.prototype.hasPending = function() { | ||
return this.action != null || this.inflightData != null || !!this.pendingData.length; | ||
return !!( | ||
this.inflightOp || | ||
this.pendingOps.length || | ||
this.inflightFetch.length || | ||
this.inflightSubscribe.length || | ||
this.inflightUnsubscribe.length || | ||
this.pendingFetch.length | ||
); | ||
}; | ||
Doc.prototype.hasWritePending = function() { | ||
return !!(this.inflightOp || this.pendingOps.length); | ||
}; | ||
Doc.prototype._emitNothingPending = function() { | ||
if (this.hasWritePending()) return; | ||
this.emit('no write pending'); | ||
if (this.hasPending()) return; | ||
@@ -241,13 +197,21 @@ this.emit('nothing pending'); | ||
// **** Helpers for network messages | ||
// This function exists so connection can call it directly for bulk subscribes. | ||
// It could just make a temporary object literal, thats pretty slow. | ||
Doc.prototype._handleFetch = function(err, data) { | ||
var callback = this.inflightFetch.shift(); | ||
if (err) { | ||
callback && callback(err); | ||
this._emitNothingPending(); | ||
return; | ||
} | ||
if (data) this.ingestData(data); | ||
callback && callback(); | ||
this._emitNothingPending(); | ||
}; | ||
Doc.prototype._handleSubscribe = function(err, data) { | ||
if (err && err !== 'Already subscribed') { | ||
console.error('Could not subscribe:', err, this.collection, this.name); | ||
this.emit('error', err); | ||
// There's probably a reason we couldn't subscribe. Don't retry. | ||
this._setWantSubscribe(false, null, err); | ||
var callback = this.inflightSubscribe.shift(); | ||
if (err) { | ||
callback && callback(err); | ||
this._emitNothingPending(); | ||
return; | ||
@@ -257,128 +221,58 @@ } | ||
this.subscribed = true; | ||
this._clearAction(); | ||
this.emit('subscribe'); | ||
this._finishSub(); | ||
callback && callback(); | ||
this._emitNothingPending(); | ||
}; | ||
// This is called by the connection when it receives a message for the document. | ||
Doc.prototype._onMessage = function(msg) { | ||
if (msg.c !== this.collection || msg.d !== this.name) { | ||
// This should never happen - its a sanity check for bugs in the connection code. | ||
throw new Error('Got message for wrong document'); | ||
Doc.prototype._handleUnsubscribe = function(err) { | ||
var callback = this.inflightUnsubscribe.shift(); | ||
if (err) { | ||
callback && callback(err); | ||
this._emitNothingPending(); | ||
return; | ||
} | ||
this.subscribed = false; | ||
callback && callback(); | ||
this._emitNothingPending(); | ||
}; | ||
// msg.a = the action. | ||
switch (msg.a) { | ||
case 'fetch': | ||
// We're done fetching. This message has no other information. | ||
if (msg.data) this.ingestData(msg.data); | ||
if (this.wantSubscribe === 'fetch') this.wantSubscribe = false; | ||
this._clearAction(); | ||
this._finishSub(msg.error); | ||
return; | ||
Doc.prototype._handleOp = function(err, msg) { | ||
if (this.inflightOp && err) { | ||
this._rollback(err); | ||
return; | ||
} | ||
case 'sub': | ||
// Subscribe reply. | ||
this._handleSubscribe(msg.error, msg.data); | ||
return; | ||
if (this.inflightOp && | ||
msg.src === this.inflightOp.src && | ||
msg.seq === this.inflightOp.seq) { | ||
// The op has already been applied locally. Just update the version | ||
// and pending state appropriately | ||
this._opAcknowledged(msg); | ||
return; | ||
} | ||
case 'unsub': | ||
// Unsubscribe reply | ||
this.subscribed = false; | ||
this.emit('unsubscribe'); | ||
if (this.version == null || msg.v > this.version) { | ||
// This will happen in normal operation if we become subscribed to a | ||
// new document via a query. It can also happen if we get an op for | ||
// a future version beyond the version we are expecting next. This | ||
// could happen if the server doesn't publish an op for whatever reason | ||
// or because of a race condition. In any case, we can send a fetch | ||
// command to catch back up. | ||
if (this.inflightFetch.length || this.inflightSubscribe.length) return; | ||
this.fetch(); | ||
return; | ||
} | ||
this._clearAction(); | ||
this._finishSub(msg.error); | ||
return; | ||
if (msg.v < this.version) { | ||
// We can safely ignore the old (duplicate) operation. | ||
return; | ||
} | ||
case 'op': | ||
if (msg.error) { | ||
// The server has rejected an op from the client for an unexpected reason. | ||
// We'll send the error message to the user and try to roll back the change. | ||
if (this.inflightData) { | ||
console.warn('Operation was rejected (' + msg.error + '). Trying to rollback change locally.'); | ||
this._tryRollback(this.inflightData); | ||
this._clearInflightOp(msg.error); | ||
} else { | ||
// I managed to get into this state once. I'm not sure how it happened. | ||
// The op was maybe double-acknowledged? | ||
console.warn('Second acknowledgement message (error) received', msg, this); | ||
} | ||
return; | ||
} | ||
if (this.inflightOp) transformX(this.inflightOp, msg); | ||
if (this.inflightData && | ||
msg.src === this.inflightData.src && | ||
msg.seq === this.inflightData.seq) { | ||
// The op has already been applied locally. Just update the version | ||
// and pending state appropriately | ||
this._opAcknowledged(msg); | ||
return; | ||
} | ||
if (this.version == null || msg.v > this.version) { | ||
// This will happen in normal operation if we become subscribed to a | ||
// new document via a query. It can also happen if we get an op for | ||
// a future version beyond the version we are expecting next. This | ||
// could happen if the server doesn't publish an op for whatever reason | ||
// or because of a race condition. In any case, we can send a fetch | ||
// command to catch back up. | ||
this._getLatestOps(); | ||
return; | ||
} | ||
if (msg.v < this.version) { | ||
// This will happen naturally in the following (or similar) cases: | ||
// | ||
// Client is not subscribed to document. | ||
// -> client submits an operation (v=10) | ||
// -> client subscribes to a query which matches this document. Says we | ||
// have v=10 of the doc. | ||
// | ||
// <- server acknowledges the operation (v=11). Server acknowledges the | ||
// operation because the doc isn't subscribed | ||
// <- server processes the query, which says the client only has v=10. | ||
// Server subscribes at v=10 not v=11, so we get another copy of the | ||
// v=10 operation. | ||
// | ||
// In this case, we can safely ignore the old (duplicate) operation. | ||
return; | ||
} | ||
if (this.inflightData) xf(this.inflightData, msg); | ||
for (var i = 0; i < this.pendingData.length; i++) { | ||
xf(this.pendingData[i], msg); | ||
} | ||
this.version++; | ||
this._otApply(msg, false); | ||
return; | ||
default: | ||
console.warn('Unhandled document message:', msg); | ||
for (var i = 0; i < this.pendingOps.length; i++) { | ||
transformX(this.pendingOps[i], msg); | ||
} | ||
}; | ||
Doc.prototype._getLatestOps = function() { | ||
var doc = this; | ||
var debounced = false; | ||
if (doc._getLatestTimeout) { | ||
debounced = true; | ||
} else { | ||
// Send a fetch command, which will get us the missing ops to catch back up | ||
// or the full doc if our version is currently null | ||
doc.connection.sendFetch(doc, doc.version); | ||
} | ||
// Debounce calls, since we are likely to get multiple future operations | ||
// in a rapid sequence | ||
clearTimeout(doc._getLatestTimeout); | ||
doc._getLatestTimeout = setTimeout(function() { | ||
doc._getLatestTimeout = null; | ||
// Send another fetch at the end of the final timeout interval if we were | ||
// debounced to make sure we didn't miss anything | ||
if (debounced) { | ||
doc.connection.sendFetch(doc, doc.version); | ||
} | ||
}, 5000); | ||
this.version++; | ||
this._otApply(msg, false); | ||
return; | ||
@@ -392,119 +286,105 @@ }; | ||
this.flush(); | ||
this._resubscribe(); | ||
} else { | ||
this.subscribed = false; | ||
this._clearAction(); | ||
if (this.inflightFetch.length || this.inflightSubscribe.length) { | ||
this.pendingFetch = this.pendingFetch.concat(this.inflightFetch, this.inflightSubscribe); | ||
this.inflightFetch.length = 0; | ||
this.inflightSubscribe.length = 0; | ||
} | ||
if (this.inflightUnsubscribe.length) { | ||
var callbacks = this.inflightUnsubscribe; | ||
this.inflightUnsubscribe = []; | ||
callEach(callbacks); | ||
} | ||
} | ||
}; | ||
Doc.prototype._clearAction = function() { | ||
this.action = null; | ||
this.flush(); | ||
this._emitNothingPending(); | ||
}; | ||
Doc.prototype._resubscribe = function() { | ||
var callbacks = this.pendingFetch; | ||
this.pendingFetch = []; | ||
// Send the next pending op to the server, if we can. | ||
// | ||
// Only one operation can be in-flight at a time. If an operation is already on | ||
// its way, or we're not currently connected, this method does nothing. | ||
Doc.prototype.flush = function() { | ||
// Ignore if we can't send or we are already sending an op | ||
if (!this.connection.canSend || this.inflightData) return; | ||
// Pump and dump any no-ops from the front of the pending op list. | ||
var op; | ||
while (this.pendingData.length && isNoOp(op = this.pendingData[0])) { | ||
var callbacks = op.callbacks; | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i](op.error); | ||
if (this.wantSubscribe) { | ||
if (callbacks.length) { | ||
this.subscribe(function(err) { | ||
callEach(callbacks, err); | ||
}); | ||
return; | ||
} | ||
this.pendingData.shift(); | ||
} | ||
// Send first pending op unless paused | ||
if (!this.paused && this.pendingData.length) { | ||
this._sendOp(); | ||
if (this.subscribed || this.inflightSubscribe.length) return; | ||
this.subscribe(); | ||
return; | ||
} | ||
// Ignore if an action is already in process | ||
if (this.action) return; | ||
// Once all ops are sent, perform subscriptions and fetches | ||
var version = (this.state === 'ready') ? this.version : null; | ||
if (this.subscribed && !this.wantSubscribe) { | ||
this.action = 'unsubscribe'; | ||
this.connection.sendUnsubscribe(this); | ||
} else if (!this.subscribed && this.wantSubscribe === 'fetch') { | ||
this.action = 'fetch'; | ||
this.connection.sendFetch(this, version); | ||
} else if (!this.subscribed && this.wantSubscribe) { | ||
this.action = 'subscribe'; | ||
this.connection.sendSubscribe(this, version); | ||
if (callbacks.length) { | ||
this.fetch(function(err) { | ||
callEach(callbacks, err); | ||
}); | ||
} | ||
}; | ||
// ****** Subscribing, unsubscribing and fetching | ||
// Value is true, false or 'fetch'. | ||
Doc.prototype._setWantSubscribe = function(value, callback, err) { | ||
if (this.subscribed === this.wantSubscribe && | ||
(this.subscribed === value || value === 'fetch' && this.subscribed)) { | ||
if (callback) callback(err); | ||
// Fetch the initial document and keep receiving updates | ||
Doc.prototype.subscribe = function(callback) { | ||
this.wantSubscribe = true; | ||
if (this.connection.canSend) { | ||
this.inflightSubscribe.push(callback); | ||
this.connection.sendSubscribe(this); | ||
return; | ||
} | ||
// If we want to subscribe, don't weaken it to a fetch. | ||
if (value !== 'fetch' || this.wantSubscribe !== true) { | ||
this.wantSubscribe = value; | ||
} | ||
if (callback) this._subscribeCallbacks.push(callback); | ||
this.flush(); | ||
if (callback) this.pendingFetch.push(callback); | ||
}; | ||
// Open the document. There is no callback and no error handling if you're | ||
// already connected. | ||
// | ||
// Only call this once per document. | ||
Doc.prototype.subscribe = function(callback) { | ||
this._setWantSubscribe(true, callback); | ||
}; | ||
// Unsubscribe. The data will stay around in local memory, but we'll stop | ||
// receiving updates. | ||
// receiving updates | ||
Doc.prototype.unsubscribe = function(callback) { | ||
this._setWantSubscribe(false, callback); | ||
this.wantSubscribe = false; | ||
if (this.connection.canSend) { | ||
this.inflightUnsubscribe.push(callback); | ||
this.connection.sendUnsubscribe(this); | ||
return; | ||
} | ||
if (callback) process.nextTick(callback); | ||
}; | ||
// Call to request fresh data from the server. | ||
// Request the current document snapshot or ops that bring us up to date | ||
Doc.prototype.fetch = function(callback) { | ||
this._setWantSubscribe('fetch', callback); | ||
}; | ||
// Called when our subscribe, fetch or unsubscribe messages are acknowledged. | ||
Doc.prototype._finishSub = function(err) { | ||
if (!this._subscribeCallbacks.length) return; | ||
for (var i = 0; i < this._subscribeCallbacks.length; i++) { | ||
this._subscribeCallbacks[i](err); | ||
if (this.connection.canSend) { | ||
this.inflightFetch.push(callback); | ||
this.connection.sendFetch(this); | ||
return; | ||
} | ||
this._subscribeCallbacks.length = 0; | ||
if (callback) this.pendingFetch.push(callback); | ||
}; | ||
// Operations | ||
// Operations // | ||
// Send the next pending op to the server, if we can. | ||
// | ||
// Only one operation can be in-flight at a time. If an operation is already on | ||
// its way, or we're not currently connected, this method does nothing. | ||
Doc.prototype.flush = function() { | ||
// Ignore if we can't send or we are already sending an op | ||
if (!this.connection.canSend || this.inflightOp) return; | ||
// ************ Dealing with operations. | ||
// Clear any no-ops from the front of the pending op list. | ||
while (this.pendingOps.length && isNoOp(this.pendingOps[0])) { | ||
var op = this.pendingOps.shift(); | ||
callEach(op.callbacks); | ||
} | ||
// Send first pending op unless paused | ||
if (!this.paused && this.pendingOps.length) { | ||
this._sendOp(); | ||
} | ||
}; | ||
// Helper function to set op to contain a no-op. | ||
var setNoOp = function(op) { | ||
function setNoOp(op) { | ||
delete op.op; | ||
delete op.create; | ||
delete op.del; | ||
}; | ||
} | ||
var isNoOp = function(op) { | ||
function isNoOp(op) { | ||
return !op.op && !op.create && !op.del; | ||
@@ -514,15 +394,11 @@ } | ||
// Try to compose data2 into data1. Returns truthy if it succeeds, otherwise falsy. | ||
var tryCompose = function(type, data1, data2) { | ||
if (data1.create && data2.del) { | ||
setNoOp(data1); | ||
} else if (data1.create && data2.op) { | ||
// Compose the data into the create data. | ||
var data = (data1.create.data === undefined) ? type.create() : data1.create.data; | ||
data1.create.data = type.apply(data, data2.op); | ||
} else if (isNoOp(data1)) { | ||
function tryCompose(type, data1, data2) { | ||
if (data1.create && data2.op) { | ||
data1.create.data = type.apply(data1.create.data, data2.op); | ||
} else if (data1.op && data2.op && type.compose) { | ||
data1.op = type.compose(data1.op, data2.op); | ||
} else if (data2.del || isNoOp(data1)) { | ||
data1.create = data2.create; | ||
data1.del = data2.del; | ||
data1.op = data2.op; | ||
} else if (data1.op && data2.op && type.compose) { | ||
data1.op = type.compose(data1.op, data2.op); | ||
} else { | ||
@@ -532,6 +408,6 @@ return false; | ||
return true; | ||
}; | ||
} | ||
// Transform server op data by a client op, and vice versa. Ops are edited in place. | ||
var xf = function(client, server) { | ||
function transformX(client, server) { | ||
// In this case, we're in for some fun. There are some local operations | ||
@@ -545,3 +421,3 @@ // which are totally invalid - either the client continued editing a | ||
if (server.create || server.del) return setNoOp(client); | ||
if (client.create) throw new Error('Invalid state. This is a bug. ' + this.collection + ' ' + this.name); | ||
if (client.create) throw new Error('Invalid state. This is a bug. ' + this.collection + ' ' + this.id); | ||
@@ -552,7 +428,5 @@ // The client has deleted the document while the server edited it. Kill the | ||
var clientEdit = client.op; | ||
var serverEdit = server.op; | ||
// We only get here if either the server or client ops are no-op. Carry on, | ||
// nothing to see here. | ||
if (!serverEdit || !clientEdit) return; | ||
if (!server.op || !client.op) return; | ||
@@ -566,11 +440,6 @@ // They both edited the document. This is the normal case for this function - | ||
// op data, we make sure the right type has its transform function called. | ||
if (client.type.transformX) { | ||
var result = client.type.transformX(clientEdit, serverEdit); | ||
client.op = result[0]; | ||
server.op = result[1]; | ||
} else { | ||
client.op = client.type.transform(clientEdit, serverEdit, 'left'); | ||
server.op = client.type.transform(serverEdit, clientEdit, 'right'); | ||
} | ||
}; | ||
var result = client.type.transformX(client.op, server.op); | ||
client.op = result[0]; | ||
server.op = result[1]; | ||
} | ||
@@ -590,33 +459,10 @@ /** | ||
Doc.prototype._otApply = function(op, context) { | ||
this.locked = true; | ||
if (op.create) { | ||
// If the type is currently set, it means we tried creating the document | ||
// and someone else won. client create x server create = server create. | ||
var create = op.create; | ||
this._setType(create.type); | ||
this.snapshot = this.type.create(create.data); | ||
// This is a bit heavyweight, but I want the created event to fire outside of the lock. | ||
this.once('unlock', function() { | ||
this.emit('create', context); | ||
}); | ||
} else if (op.del) { | ||
// The type should always exist in this case. del x _ = del | ||
var oldSnapshot = this.snapshot; | ||
this._setType(null); | ||
this.once('unlock', function() { | ||
this.emit('del', oldSnapshot, context); | ||
}); | ||
} else if (op.op) { | ||
if (!this.type) throw new Error('Document does not exist. ' + this.collection + ' ' + this.name); | ||
if (op.op) { | ||
if (!this.type) throw new Error('Cannot apply op to uncreated document ' + this.collection + '.' + this.id); | ||
var type = this.type; | ||
// This exists so clients can pull any necessary data out of the snapshot | ||
// before it gets changed. | ||
this.emit('before op', op.op, context); | ||
// This exists so clients can pull any necessary data out of the snapshot | ||
// before it gets changed. Previously we kept the old snapshot object and | ||
// passed it to the op event handler. However, apply no longer guarantees | ||
// the old object is still valid. | ||
// | ||
// Because this could be totally unnecessary work, its behind a flag. set | ||
@@ -626,2 +472,3 @@ // doc.incremental to enable. | ||
var doc = this; | ||
this.locked = true; | ||
type.incrementalApply(this.snapshot, op.op, function(component, snapshot) { | ||
@@ -631,2 +478,3 @@ doc.snapshot = snapshot; | ||
}); | ||
this.locked = false; | ||
} else { | ||
@@ -637,12 +485,22 @@ // This is the default case, simply applying the operation to the local snapshot. | ||
} | ||
this.emit('after op', op.op, context); | ||
return; | ||
} | ||
// Its possible for none of the above cases to match, in which case the op is | ||
// a no-op. This will happen when a document has been deleted locally and | ||
// remote ops edit the document. | ||
this.locked = false; | ||
this.emit('unlock'); | ||
if (op.create) { | ||
// If the type is currently set, it means we tried creating the document | ||
// and someone else won. client create x server create = server create. | ||
this._setType(op.create.type); | ||
this.snapshot = this.type.create(op.create.data); | ||
this.emit('create', context); | ||
return; | ||
} | ||
if (op.op) { | ||
this.emit('after op', op.op); | ||
if (op.del) { | ||
// The type should always exist in this case. del x _ = del | ||
var oldSnapshot = this.snapshot; | ||
this._setType(null); | ||
this.emit('del', oldSnapshot, context); | ||
return; | ||
} | ||
@@ -655,5 +513,5 @@ }; | ||
Doc.prototype.retry = function() { | ||
if (!this.inflightData) return; | ||
var threshold = 5000 * Math.pow(2, this.inflightData.retries); | ||
if (this.inflightData.sentAt < Date.now() - threshold) { | ||
if (!this.inflightOp) return; | ||
var threshold = 5000 * Math.pow(2, this.inflightOp.retries); | ||
if (this.inflightOp.sentAt < Date.now() - threshold) { | ||
this.connection.emit('retry', this); | ||
@@ -670,9 +528,9 @@ this._sendOp(); | ||
// When there is no inflightData, send the first item in pendingData. If | ||
// there is inflightData, try sending it again | ||
if (!this.inflightData) { | ||
// When there is no inflightOp, send the first item in pendingOps. If | ||
// there is inflightOp, try sending it again | ||
if (!this.inflightOp) { | ||
// Send first pending op | ||
this.inflightData = this.pendingData.shift(); | ||
this.inflightOp = this.pendingOps.shift(); | ||
} | ||
var data = this.inflightData; | ||
var data = this.inflightOp; | ||
if (!data) { | ||
@@ -688,3 +546,3 @@ throw new Error('no data to send on call to _sendOp'); | ||
// is used on the server to detect when ops have been sent multiple times and | ||
// on the client to match acknowledgement of an op back to the inflightData. | ||
// on the client to match acknowledgement of an op back to the inflightOp. | ||
// Note that the src could be different from this.connection.id after a | ||
@@ -699,3 +557,3 @@ // reconnect, since an op may still be pending after the reconnection and | ||
// src isn't needed on the first try, since the server session will have the | ||
// same id, but it must be set on the inflightData in case it is sent again | ||
// same id, but it must be set on the inflightOp in case it is sent again | ||
// after a reconnect and the connection's id has changed by then | ||
@@ -725,3 +583,3 @@ if (data.src == null) data.src = src; | ||
if (this.locked) { | ||
var err = new Error('Cannot call submitOp from inside an op event handler. ' + this.collection + ' ' + this.name); | ||
var err = new Error('Cannot call submitOp from inside an op event handler. ' + this.collection + ' ' + this.id); | ||
if (callback) return callback(err); | ||
@@ -742,6 +600,2 @@ throw err; | ||
if (!this.state) { | ||
this.state = 'floating'; | ||
} | ||
op.type = this.type; | ||
@@ -753,3 +607,3 @@ op.callbacks = []; | ||
var operation; | ||
var previous = this.pendingData[this.pendingData.length - 1]; | ||
var previous = this.pendingOps[this.pendingOps.length - 1]; | ||
@@ -760,3 +614,3 @@ if (previous && tryCompose(this.type, previous, op)) { | ||
operation = op; | ||
this.pendingData.push(op); | ||
this.pendingOps.push(op); | ||
} | ||
@@ -767,9 +621,8 @@ if (callback) operation.callbacks.push(callback); | ||
// The call to flush is in a timeout so if submitOp() is called multiple | ||
// times in a closure all the ops are combined before being sent to the | ||
// server. It doesn't matter if flush is called a bunch of times. | ||
// The call to flush is delayed so if submitOp() is called multiple times | ||
// synchronously, all the ops are combined before being sent to the server. | ||
var doc = this; | ||
setTimeout(function() { | ||
process.nextTick(function() { | ||
doc.flush(); | ||
}, 0); | ||
}); | ||
}; | ||
@@ -840,27 +693,34 @@ | ||
// This is called when the server acknowledges an operation from the client. | ||
Doc.prototype._opAcknowledged = function(msg) { | ||
if (this.inflightOp.create) { | ||
this.version = msg.v; | ||
// This will be called when the server rejects our operations for some reason. | ||
// There's not much we can do here if the OT type is noninvertable, but that | ||
// shouldn't happen too much in real life because readonly documents should be | ||
// flagged as such. (I should probably figure out a flag for that). | ||
// | ||
// This does NOT get called if our op fails to reach the server for some reason | ||
// - we optimistically assume it'll make it there eventually. | ||
Doc.prototype._tryRollback = function(op) { | ||
// This is probably horribly broken. | ||
if (op.create) { | ||
this._setType(null); | ||
} else if (msg.v !== this.version) { | ||
// We should already be at the same version, because the server should | ||
// have sent all the ops that have happened before acknowledging our op | ||
console.warn('Invalid version from server. Expected: ' + this.version + ' Received: ' + msg.v, msg); | ||
// I don't think its possible to get here if we aren't in a floating state. | ||
if (this.state === 'floating') | ||
this.state = null; | ||
else | ||
console.warn('Rollback a create from state ' + this.state); | ||
// Fetching should get us back to a working document state | ||
return this.fetch(); | ||
} | ||
} else if (op.op && op.type.invert) { | ||
// The op was committed successfully. Increment the version number | ||
this.version++; | ||
this._clearInflightOp(); | ||
}; | ||
// This will be called when the server rejects our operation for some reason. | ||
Doc.prototype._rollback = function(err) { | ||
// The server has rejected an op from the client for an unexpected reason. | ||
// We'll send the error message to the user and try to roll back the change. | ||
var op = this.inflightOp; | ||
if (op.op && op.type.invert) { | ||
op.op = op.type.invert(op.op); | ||
// Transform the undo operation by any pending ops. | ||
for (var i = 0; i < this.pendingData.length; i++) { | ||
xf(this.pendingData[i], op); | ||
for (var i = 0; i < this.pendingOps.length; i++) { | ||
transformX(this.pendingOps[i], op); | ||
} | ||
@@ -875,23 +735,27 @@ | ||
this._otApply(op, false); | ||
} else if (op.op || op.del) { | ||
// This is where an undo stack would come in handy. | ||
this._setType(null); | ||
this.version = null; | ||
this.state = null; | ||
this.subscribed = false; | ||
this.emit('error', 'Op apply failed and the operation could not be reverted'); | ||
// Trigger a fetch. In our invalid state, we can't really do anything. | ||
this.fetch(); | ||
this.flush(); | ||
this._clearInflightOp(err); | ||
return; | ||
} | ||
// Cancel all pending ops and reset if we can't invert | ||
var pending = this.pendingOps; | ||
this._setType(null); | ||
this.version = null; | ||
this.inflightOp = null; | ||
this.pendingOps = []; | ||
// Fetch the latest from the server to get us back into a working state | ||
this.fetch(function() { | ||
callEach(op.callbacks, err); | ||
for (var i = 0; i < pending.length; i++) { | ||
callEach(pending[i].callbacks, err); | ||
} | ||
}); | ||
}; | ||
Doc.prototype._clearInflightOp = function(error) { | ||
var callbacks = this.inflightData.callbacks; | ||
for (var i = 0; i < callbacks.length; i++) { | ||
callbacks[i](error || this.inflightData.error); | ||
} | ||
Doc.prototype._clearInflightOp = function(err) { | ||
callEach(this.inflightOp.callbacks, err); | ||
this.inflightData = null; | ||
this.inflightOp = null; | ||
this.flush(); | ||
@@ -901,34 +765,7 @@ this._emitNothingPending(); | ||
// This is called when the server acknowledges an operation from the client. | ||
Doc.prototype._opAcknowledged = function(msg) { | ||
// Our inflight op has been acknowledged, so we can throw away the inflight data. | ||
// (We were only holding on to it incase we needed to resend the op.) | ||
if (!this.state) { | ||
throw new Error('opAcknowledged called from a null state. This should never happen. ' + this.collection + '.' + this.name); | ||
} else if (this.state === 'floating') { | ||
if (!this.inflightData.create) throw new Error('Cannot acknowledge an op. ' + this.collection + '.' + this.name); | ||
// Our create has been acknowledged. This is the same as ingesting some data. | ||
this.version = msg.v; | ||
this.state = 'ready'; | ||
var doc = this; | ||
setTimeout(function() { | ||
doc.emit('ready'); | ||
}, 0); | ||
} else { | ||
// We already have a snapshot. The snapshot should be at the acknowledged | ||
// version, because the server has sent us all the ops that have happened | ||
// before acknowledging our op. | ||
// This should never happen - something is out of order. | ||
if (msg.v !== this.version) { | ||
throw new Error('Invalid version from server. This can happen when you submit ops in a submitOp callback. ' + | ||
'Expected: ' + this.version + ' Message version: ' + msg.v + ' ' + this.collection + '.' + this.name); | ||
} | ||
function callEach(callbacks, err) { | ||
for (var i = 0; i < callbacks.length; i++) { | ||
var callback = callbacks[i]; | ||
if (callback) callback(err); | ||
} | ||
// The op was committed successfully. Increment the version number | ||
this.version++; | ||
this._clearInflightOp(); | ||
}; | ||
} |
@@ -8,7 +8,7 @@ var emitter = require('../emitter'); | ||
module.exports = Query; | ||
function Query(type, connection, id, collection, query, options, callback) { | ||
function Query(action, connection, id, collection, query, options, callback) { | ||
emitter.EventEmitter.call(this); | ||
// 'fetch' or 'sub' | ||
this.type = type; | ||
// 'qf' or 'qs' | ||
this.action = action; | ||
@@ -30,21 +30,19 @@ this.connection = connection; | ||
// so that a query can be serialized and then re-established | ||
this.results = options.results || []; | ||
this.results = options.results; | ||
this.extra = undefined; | ||
// Do we have some initial data? | ||
this.ready = false; | ||
this.callback = callback; | ||
this.callback = callback; | ||
this.sent = false; | ||
} | ||
emitter.mixin(Query); | ||
Query.prototype.action = 'qsub'; | ||
// Helper for subscribe & fetch, since they share the same message format. | ||
// | ||
// This function actually issues the query. | ||
Query.prototype._execute = function() { | ||
Query.prototype.send = function() { | ||
if (!this.connection.canSend) return; | ||
var msg = { | ||
a: 'q' + this.type, | ||
a: this.action, | ||
id: this.id, | ||
@@ -55,145 +53,135 @@ c: this.collection, | ||
if (this.db != null) msg.db = this.db; | ||
if (this.results.length) { | ||
if (this.results) { | ||
// Collect the version of all the documents in the current result set so we | ||
// don't need to be sent their snapshots again. | ||
var versions = {}; | ||
var results = []; | ||
for (var i = 0; i < this.results.length; i++) { | ||
var doc = this.results[i]; | ||
if (doc.version == null || doc.collection !== this.collection) continue; | ||
versions[doc.name] = doc.version; | ||
results.push([doc.id, doc.version]); | ||
} | ||
msg.vs = versions; | ||
msg.r = results; | ||
} | ||
this.connection.send(msg); | ||
this.sent = true; | ||
}; | ||
// Make a list of documents from the list of server-returned data objects | ||
Query.prototype._dataToDocs = function(data) { | ||
var results = []; | ||
var lastType; | ||
for (var i = 0; i < data.length; i++) { | ||
var docData = data[i]; | ||
// Types are only put in for the first result in the set and every time the type changes in the list. | ||
if (docData.type) { | ||
lastType = docData.type; | ||
} else { | ||
docData.type = lastType; | ||
} | ||
// This will ultimately call doc.ingestData(), which is what populates | ||
// the doc snapshot and version with the data returned by the query | ||
var doc = this.connection.get(docData.c || this.collection, docData.d, docData); | ||
results.push(doc); | ||
// Destroy the query object. Any subsequent messages for the query will be | ||
// ignored by the connection. | ||
Query.prototype.destroy = function(callback) { | ||
if (this.connection.canSend && this.action === 'qs') { | ||
this.connection.send({a: 'qu', id: this.id}); | ||
} | ||
return results; | ||
this.connection._destroyQuery(this); | ||
}; | ||
// Destroy the query object. Any subsequent messages for the query will be | ||
// ignored by the connection. You should unsubscribe from the query before | ||
// destroying it. | ||
Query.prototype.destroy = function() { | ||
if (this.connection.canSend && this.type === 'sub') { | ||
this.connection.send({a:'qunsub', id:this.id}); | ||
Query.prototype._onConnectionStateChanged = function() { | ||
if (this.connection.canSend && !this.sent) { | ||
this.send(); | ||
} else { | ||
this.sent = false; | ||
} | ||
}; | ||
Query.prototype._handleFetch = function(err, data, extra) { | ||
var callback = this.callback; | ||
this.callback = null; | ||
// Once a fetch query gets its data, it is destroyed. | ||
this.connection._destroyQuery(this); | ||
if (err) { | ||
this.emit('ready'); | ||
if (callback) return callback(err); | ||
return this.emit('error', err); | ||
} | ||
var results = this._dataToDocs(data); | ||
if (callback) callback(null, results, extra); | ||
this.emit('ready'); | ||
}; | ||
Query.prototype._onConnectionStateChanged = function(state, reason) { | ||
if (this.connection.state === 'connecting') { | ||
this._execute(); | ||
Query.prototype._handleSubscribe = function(err, data, extra) { | ||
var callback = this.callback; | ||
this.callback = null; | ||
if (err) { | ||
// Cleanup the query if the initial subscribe returns an error | ||
this.connection._destroyQuery(this); | ||
this.emit('ready'); | ||
if (callback) return callback(err); | ||
return this.emit('error', err); | ||
} | ||
// Subscribe will only return results if issuing a new query without | ||
// previous results. On a resubscribe, changes to the results or ops will | ||
// have already been sent as individual diff events | ||
if (data) { | ||
this.results = this._dataToDocs(data); | ||
this.extra = extra | ||
} | ||
if (callback) callback(null, this.results, this.extra); | ||
this.emit('ready'); | ||
}; | ||
// Internal method called from connection to pass server messages to the query. | ||
Query.prototype._onMessage = function(msg) { | ||
if ((msg.a === 'qfetch') !== (this.type === 'fetch')) { | ||
console.warn('Invalid message sent to query', msg, this); | ||
return; | ||
Query.prototype._handleDiff = function(err, diff, extra) { | ||
if (err) { | ||
return this.emit('error', err); | ||
} | ||
if (msg.error) this.emit('error', msg.error); | ||
// Query diff data (inserts and removes) | ||
if (diff) { | ||
// We need to go through the list twice. First, we'll ingest all the | ||
// new documents and set them as subscribed. After that we'll emit | ||
// events and actually update our list. This avoids race conditions | ||
// around setting documents to be subscribed & unsubscribing documents | ||
// in event callbacks. | ||
for (var i = 0; i < diff.length; i++) { | ||
var d = diff[i]; | ||
if (d.type === 'insert') d.values = this._dataToDocs(d.values); | ||
} | ||
switch (msg.a) { | ||
case 'qfetch': | ||
var results = msg.data ? this._dataToDocs(msg.data) : undefined; | ||
if (this.callback) this.callback(msg.error, results, msg.extra); | ||
// Once a fetch query gets its data, it is destroyed. | ||
this.connection._destroyQuery(this); | ||
break; | ||
case 'q': | ||
// Query diff data (inserts and removes) | ||
if (msg.diff) { | ||
// We need to go through the list twice. First, we'll ingest all the | ||
// new documents and set them as subscribed. After that we'll emit | ||
// events and actually update our list. This avoids race conditions | ||
// around setting documents to be subscribed & unsubscribing documents | ||
// in event callbacks. | ||
for (var i = 0; i < msg.diff.length; i++) { | ||
var d = msg.diff[i]; | ||
if (d.type === 'insert') d.values = this._dataToDocs(d.values); | ||
} | ||
for (var i = 0; i < msg.diff.length; i++) { | ||
var d = msg.diff[i]; | ||
switch (d.type) { | ||
case 'insert': | ||
var newDocs = d.values; | ||
Array.prototype.splice.apply(this.results, [d.index, 0].concat(newDocs)); | ||
this.emit('insert', newDocs, d.index); | ||
break; | ||
case 'remove': | ||
var howMany = d.howMany || 1; | ||
var removed = this.results.splice(d.index, howMany); | ||
this.emit('remove', removed, d.index); | ||
break; | ||
case 'move': | ||
var howMany = d.howMany || 1; | ||
var docs = this.results.splice(d.from, howMany); | ||
Array.prototype.splice.apply(this.results, [d.to, 0].concat(docs)); | ||
this.emit('move', docs, d.from, d.to); | ||
break; | ||
} | ||
} | ||
for (var i = 0; i < diff.length; i++) { | ||
var d = diff[i]; | ||
switch (d.type) { | ||
case 'insert': | ||
var newDocs = d.values; | ||
Array.prototype.splice.apply(this.results, [d.index, 0].concat(newDocs)); | ||
this.emit('insert', newDocs, d.index); | ||
break; | ||
case 'remove': | ||
var howMany = d.howMany || 1; | ||
var removed = this.results.splice(d.index, howMany); | ||
this.emit('remove', removed, d.index); | ||
break; | ||
case 'move': | ||
var howMany = d.howMany || 1; | ||
var docs = this.results.splice(d.from, howMany); | ||
Array.prototype.splice.apply(this.results, [d.to, 0].concat(docs)); | ||
this.emit('move', docs, d.from, d.to); | ||
break; | ||
} | ||
} | ||
} | ||
if (msg.extra !== void 0) { | ||
this.emit('extra', msg.extra); | ||
} | ||
break; | ||
case 'qsub': | ||
// This message replaces the entire result set with the set passed. | ||
if (!msg.error) { | ||
var previous = this.results; | ||
// Then add everything in the new result set. | ||
this.results = this._dataToDocs(msg.data); | ||
this.extra = msg.extra; | ||
this.ready = true; | ||
this.emit('change', this.results, previous); | ||
} | ||
if (this.callback) { | ||
this.callback(msg.error, this.results, this.extra); | ||
delete this.callback; | ||
} | ||
break; | ||
if (extra !== undefined) { | ||
this.emit('extra', extra); | ||
} | ||
}; | ||
// Change the thing we're searching for. This isn't fully supported on the | ||
// backend (it destroys the old query and makes a new one) - but its | ||
// programatically useful and I might add backend support at some point. | ||
Query.prototype.setQuery = function(q) { | ||
if (this.type !== 'sub') throw new Error('cannot change a fetch query'); | ||
// Make a list of documents from the list of server-returned data objects | ||
Query.prototype._dataToDocs = function(data) { | ||
var results = []; | ||
var lastType; | ||
for (var i = 0; i < data.length; i++) { | ||
var docData = data[i]; | ||
this.query = q; | ||
if (this.connection.canSend) { | ||
// There's no 'change' message to send to the server. Just resubscribe. | ||
this.connection.send({a:'qunsub', id:this.id}); | ||
this._execute(); | ||
// Types are only put in for the first result in the set and every time the type changes in the list. | ||
if (docData.type) { | ||
lastType = docData.type; | ||
} else { | ||
docData.type = lastType; | ||
} | ||
// This will ultimately call doc.ingestData(), which is what populates | ||
// the doc snapshot and version with the data returned by the query | ||
var doc = this.connection.get(docData.c || this.collection, docData.d, docData); | ||
results.push(doc); | ||
} | ||
return results; | ||
}; |
var async = require('async'); | ||
var UNIMPLEMENTED = {code: 5000, message: 'DB method unimplemented'}; | ||
function DB(options) { | ||
@@ -17,7 +15,7 @@ // pollDebounce is the minimum time in ms between query polls | ||
DB.prototype.commit = function(collection, id, op, snapshot, callback) { | ||
callback(UNIMPLEMENTED); | ||
callback(new Error('commit DB method unimplemented')); | ||
}; | ||
DB.prototype.getSnapshot = function(collection, id, fields, callback) { | ||
callback(UNIMPLEMENTED); | ||
callback(new Error('getSnapshot DB method unimplemented')); | ||
}; | ||
@@ -35,3 +33,4 @@ | ||
}, function(err) { | ||
callback(err, err ? null : results); | ||
if (err) return callback(err); | ||
callback(null, results); | ||
}); | ||
@@ -41,3 +40,3 @@ }; | ||
DB.prototype.getOps = function(collection, id, from, to, callback) { | ||
callback(UNIMPLEMENTED); | ||
callback(new Error('getOps DB method unimplemented')); | ||
}; | ||
@@ -61,3 +60,4 @@ | ||
}, function(err) { | ||
callback(err, err ? null : results); | ||
if (err) return callback(err); | ||
callback(null, results); | ||
}); | ||
@@ -67,3 +67,3 @@ }; | ||
DB.prototype.query = function(collection, query, fields, options, callback) { | ||
callback(UNIMPLEMENTED); | ||
callback(new Error('query DB method unimplemented')); | ||
}; | ||
@@ -73,3 +73,3 @@ | ||
var fields = {}; | ||
this.query(collection, query, fields, function(err, snapshots, extra) { | ||
this.query(collection, query, fields, options, function(err, snapshots, extra) { | ||
if (err) return callback(err); | ||
@@ -85,3 +85,3 @@ var ids = []; | ||
DB.prototype.queryPollDoc = function(collection, id, query, options, callback) { | ||
callback(UNIMPLEMENTED); | ||
callback(new Error('queryPollDoc DB method unimplemented')); | ||
}; | ||
@@ -88,0 +88,0 @@ |
@@ -5,5 +5,3 @@ var inherits = require('util').inherits; | ||
// Stream of operations. Subscribe returns one of these. Passing a version | ||
// makes the stream not emit any operations that are earlier than the | ||
// specified version. This will be updated as the stream emits ops. | ||
// Stream of operations. Subscribe returns one of these | ||
function OpStream() { | ||
@@ -16,4 +14,2 @@ Readable.call(this, {objectMode: true}); | ||
this.projection = null; | ||
// Version number of the next op we expect to see | ||
this.v = null; | ||
@@ -33,23 +29,12 @@ this.open = true; | ||
OpStream.prototype.initDocSubscribe = function(backend, agent, projection, version) { | ||
OpStream.prototype.initProjection = function(backend, agent, projection) { | ||
this.backend = backend; | ||
this.agent = agent; | ||
this.projection = projection; | ||
this.v = version; | ||
}; | ||
OpStream.prototype.pushOp = function(op) { | ||
// We shouldn't get messages after unsubscribe, but it's happened | ||
// Ignore any messages after unsubscribe | ||
if (!this.open) return; | ||
if (this.v && op.v) { | ||
// op.v will usually be == stream.v, except if we're subscribing & | ||
// buffering ops | ||
if (op.v >= this.v) { | ||
this.v = op.v + 1; | ||
} else { | ||
return; | ||
} | ||
} | ||
if (this.backend) { | ||
@@ -65,2 +50,8 @@ var stream = this; | ||
OpStream.prototype.pushOps = function(ops) { | ||
for (var i = 0; i < ops.length; i++) { | ||
this.pushOp(ops[i]); | ||
} | ||
}; | ||
OpStream.prototype.destroy = function() { | ||
@@ -73,52 +64,1 @@ if (!this.open) return; | ||
}; | ||
// Helper for subscribe & bulkSubscribe to repack the start of a stream given | ||
// potential operations which happened while the listeners were getting | ||
// established | ||
OpStream.prototype.pack = function(v, ops) { | ||
// If there's no ops to pack, we're good - just return the stream as-is. | ||
if (!ops.length) return; | ||
// Ok, so if there's anything in the stream right now, it might overlap with | ||
// the historical operations. We'll pump the reader and (probably!) prefix | ||
// it with the getOps result. | ||
var op; | ||
var queue = []; | ||
while (op = this.read()) { | ||
queue.push(op); | ||
} | ||
// First send all the operations between v and when we called getOps | ||
for (var i = 0; i < ops.length; i++) { | ||
op = ops[i]; | ||
var err = checkOpVersion(op, v); | ||
if (err) return this.push({error: err}); | ||
v++; | ||
// console.log("stream push from preloaded ops", op); | ||
this.push(op); | ||
} | ||
// Then all the ops between then and now.. | ||
for (i = 0; i < queue.length; i++) { | ||
op = queue[i]; | ||
if (op.v >= v) { | ||
var err = checkOpVersion(op, v); | ||
if (err) return this.push({error: err}); | ||
v++; | ||
// console.log("stream push from early stream", op); | ||
this.push(op); | ||
} | ||
} | ||
// if (queue.length || ops.length) console.log("Queue " + queue.length + " ops " + ops.length); | ||
this.v = v; | ||
}; | ||
function checkOpVersion(op, v) { | ||
if (op.v === v) return; | ||
return { | ||
code: 5000, | ||
message: 'subscribe stream.pack op version inconsistent', | ||
op: op, | ||
v: v | ||
}; | ||
} |
var OpStream = require('../op-stream'); | ||
var util = require('../util'); | ||
var UNIMPLEMENTED_MESSAGE = 'Required PubSub method unimplemented'; | ||
function PubSub(options) { | ||
@@ -30,12 +28,12 @@ this.prefix = options && options.prefix; | ||
PubSub.prototype._subscribe = function() { | ||
throw new Error(UNIMPLEMENTED_MESSAGE); | ||
PubSub.prototype._subscribe = function(channel, callback) { | ||
callback(new Error('_subscribe PubSub method unimplemented')); | ||
}; | ||
PubSub.prototype._unsubscribe = function() { | ||
throw new Error(UNIMPLEMENTED_MESSAGE); | ||
PubSub.prototype._unsubscribe = function(channel, callback) { | ||
callback(new Error('_unsubscribe PubSub method unimplemented')); | ||
}; | ||
PubSub.prototype._publish = function() { | ||
throw new Error(UNIMPLEMENTED_MESSAGE); | ||
PubSub.prototype._publish = function(channels, data, callback) { | ||
callback(new Error('_publish PubSub method unimplemented')); | ||
}; | ||
@@ -113,3 +111,5 @@ | ||
this._unsubscribe(channel); | ||
this._unsubscribe(channel, function(err) { | ||
if (err) throw err; | ||
}); | ||
}; |
@@ -5,3 +5,3 @@ var deepEquals = require('deep-is'); | ||
function QueryEmitter(request, stream, snapshots, extra) { | ||
function QueryEmitter(request, stream, ids, extra) { | ||
this.backend = request.backend; | ||
@@ -17,3 +17,3 @@ this.agent = request.agent; | ||
this.stream = stream; | ||
this.ids = pluckIds(snapshots); | ||
this.ids = ids; | ||
this.extra = extra; | ||
@@ -28,15 +28,14 @@ | ||
this._polling = false; | ||
this._pollAgain = false; | ||
this._pollTimeout = null; | ||
this._pendingPoll = null; | ||
this.startStream(); | ||
this.init(); | ||
} | ||
module.exports = QueryEmitter; | ||
QueryEmitter.prototype.destroy = function() { | ||
this.stream.destroy(); | ||
}; | ||
QueryEmitter.prototype.startStream = function() { | ||
QueryEmitter.prototype.init = function() { | ||
var emitter = this; | ||
this._defaultCallback = function(err) { | ||
if (err) emitter.onError(err); | ||
} | ||
function readStream() { | ||
@@ -46,7 +45,5 @@ var data; | ||
if (data.error) { | ||
console.error('Error in query op stream:', emitter.index, emitter.query); | ||
this.emitError(data.error); | ||
emitter.onError(data.error); | ||
continue; | ||
} | ||
emitter.emitOp(data); | ||
emitter.update(data); | ||
@@ -59,2 +56,6 @@ } | ||
QueryEmitter.prototype.destroy = function() { | ||
this.stream.destroy(); | ||
}; | ||
QueryEmitter.prototype._emitTiming = function(action, start) { | ||
@@ -66,9 +67,37 @@ this.backend.emit('timing', action, Date.now() - start, this.index, this.query); | ||
var id = op.d; | ||
// Ignore if the user or database say we don't need to poll | ||
// Check if the op's id matches the query before updating the query results | ||
// and send it through immediately if it does. The current snapshot | ||
// (including the op) for a newly matched document will get sent in the | ||
// insert diff, so we don't need to send the op that caused the doc to | ||
// match. If the doc already exists in the client and isn't otherwise | ||
// subscribed, the client will need to request the op when it receives the | ||
// snapshot from the query to bring itself up to date. | ||
// | ||
// The client may see the result of the op get reflected before the query | ||
// results update. This might prove janky in some cases, since a doc could | ||
// get deleted before it is removed from the results, for example. However, | ||
// it will mean that ops which don't end up changing the results are | ||
// received sooner even if query polling takes a while. | ||
// | ||
// Alternatively, we could send the op message only after the query has | ||
// updated, and it would perhaps be ideal to send in the same message to | ||
// avoid the user seeing transitional states where the doc is updated but | ||
// the results order is not. | ||
// | ||
// We should send the op even if it is the op that causes the document to no | ||
// longer match the query. If client-side filters are applied to the model | ||
// to figure out which documents to render in a list, we will want the op | ||
// that removed the doc from the query to cause the client-side computed | ||
// list to update. | ||
if (this.ids.indexOf(id) !== -1) { | ||
this.onOp(op); | ||
} | ||
// Ignore if the database or user function says we don't need to poll | ||
try { | ||
if (this.skipPoll(this.collection, id, op, this.query)) return; | ||
if (this.db.skipPoll(this.collection, id, op, this.query)) return; | ||
if (this.db.skipPoll(this.collection, id, op, this.query)) return this._defaultCallback(); | ||
if (this.skipPoll(this.collection, id, op, this.query)) return this._defaultCallback(); | ||
} catch (err) { | ||
console.error('Error evaluating skipPoll:', this.collection, id, op, this.query); | ||
return this.emitError(err); | ||
return this._defaultCallback(err); | ||
} | ||
@@ -78,7 +107,7 @@ if (this.canPollDoc) { | ||
// op has changed whether or not it matches the results | ||
this.queryPollDoc(id); | ||
this.queryPollDoc(id, this._defaultCallback); | ||
} else { | ||
// We need to do a full poll of the query, because the query uses limits, | ||
// sorts, or something special | ||
this.queryPoll(); | ||
this.queryPoll(this._defaultCallback); | ||
} | ||
@@ -89,12 +118,6 @@ }; | ||
if (this._polling || this._pollTimeout) return; | ||
if (this._pollAgain) this.queryPoll(); | ||
if (this._pendingPoll) this.queryPoll(); | ||
}; | ||
QueryEmitter.prototype._finishPoll = function(err) { | ||
this._polling = false; | ||
if (err) this.emitError(err); | ||
this._flushPoll(); | ||
}; | ||
QueryEmitter.prototype.queryPoll = function() { | ||
QueryEmitter.prototype.queryPoll = function(callback) { | ||
var emitter = this; | ||
@@ -106,3 +129,3 @@ | ||
// could end up with results in a funky order and the wrong results being | ||
// removed from the query. Second, only having one query executed | ||
// mutated in the query. Second, only having one query executed | ||
// simultaneously per emitter will act as a natural adaptive rate limiting | ||
@@ -113,10 +136,15 @@ // in case the db is under load. | ||
// on a given id and won't accidentally modify the wrong doc. Also, those | ||
// queries should be faster and we have to run all of them eventually, so | ||
// there is less benefit to load reduction. | ||
// queries should be faster and are less likely to be the same, so there is | ||
// less benefit to possible load reduction. | ||
if (this._polling || this._pollTimeout) { | ||
this._pollAgain = true; | ||
if (this._pendingPoll) { | ||
this._pendingPoll.push(callback); | ||
} else { | ||
this._pendingPoll = [callback]; | ||
} | ||
return; | ||
} | ||
this._polling = true; | ||
this._pollAgain = false; | ||
var pending = this._pendingPoll; | ||
this._pendingPoll = null; | ||
if (this.pollDebounce) { | ||
@@ -131,5 +159,11 @@ this._pollTimeout = setTimeout(function() { | ||
this.db.queryPoll(this.collection, this.query, this.options, function(err, ids, extra) { | ||
if (err) return emitter._finishPoll(err); | ||
if (err) return emitter._finishPoll(err, callback, pending); | ||
emitter._emitTiming('query.poll', start); | ||
// Be nice to not have to do this in such a brute force way | ||
if (!deepEquals(emitter.extra, extra)) { | ||
emitter.extra = extra; | ||
emitter.onExtra(extra); | ||
} | ||
var idsDiff = arraydiff(emitter.ids, ids); | ||
@@ -141,29 +175,37 @@ if (idsDiff.length) { | ||
emitter.db.getSnapshotBulk(emitter.collection, inserted, emitter.fields, function(err, snapshotMap) { | ||
if (err) return emitter._finishPoll(err); | ||
if (err) return emitter._finishPoll(err, callback, pending); | ||
emitter.backend._sanitizeSnapshotBulk(emitter.agent, emitter.snapshotProjection, emitter.collection, snapshotMap, function(err) { | ||
if (err) return emitter._finishPoll(err); | ||
if (err) return emitter._finishPoll(err, callback, pending); | ||
emitter._emitTiming('query.pollGetSnapshotBulk', start); | ||
var diff = mapDiff(idsDiff, snapshotMap); | ||
emitter.emitDiff(diff); | ||
emitter._finishPoll(); | ||
emitter.onDiff(diff); | ||
emitter._finishPoll(err, callback, pending); | ||
}); | ||
}); | ||
} else { | ||
emitter.emitDiff(idsDiff); | ||
emitter._finishPoll(); | ||
emitter.onDiff(idsDiff); | ||
emitter._finishPoll(err, callback, pending); | ||
} | ||
} else { | ||
emitter._finishPoll(err, callback, pending); | ||
} | ||
// Be nice to not have to do this in such a brute force way | ||
if (!deepEquals(emitter.extra, extra)) { | ||
emitter.extra = extra; | ||
emitter.emitExtra(extra); | ||
} | ||
}); | ||
}; | ||
QueryEmitter.prototype._finishPoll = function(err, callback, pending) { | ||
this._polling = false; | ||
if (callback) callback(err); | ||
if (pending) { | ||
for (var i = 0; i < pending.length; i++) { | ||
callback = pending[i]; | ||
if (callback) callback(err); | ||
} | ||
} | ||
this._flushPoll(); | ||
}; | ||
QueryEmitter.prototype.queryPollDoc = function(id) { | ||
QueryEmitter.prototype.queryPollDoc = function(id, callback) { | ||
var emitter = this; | ||
var start = Date.now(); | ||
this.db.queryPollDoc(this.collection, id, this.query, this.options, function(err, matches) { | ||
if (err) return emitter.emitError(err); | ||
if (err) return callback(err); | ||
emitter._emitTiming('query.pollDoc', start); | ||
@@ -181,44 +223,35 @@ | ||
emitter.db.getSnapshot(emitter.collection, id, emitter.fields, function(err, snapshot) { | ||
if (err) return emitter.emitError(err); | ||
if (err) return callback(err); | ||
emitter._emitTiming('query.pollDocGetSnapshot', start); | ||
var values = [snapshot]; | ||
emitter.emitDiff([new arraydiff.InsertDiff(index, values)]); | ||
emitter.onDiff([new arraydiff.InsertDiff(index, values)]); | ||
callback(); | ||
}); | ||
return; | ||
} | ||
} else if (i !== -1 && !matches) { | ||
if (i !== -1 && !matches) { | ||
emitter.ids.splice(i, 1); | ||
emitter.emitDiff([new arraydiff.RemoveDiff(i, 1)]); | ||
emitter.onDiff([new arraydiff.RemoveDiff(i, 1)]); | ||
return callback(); | ||
} | ||
callback(); | ||
}); | ||
}; | ||
// Emit functions are called in response to operation events | ||
QueryEmitter.prototype.emitError = function(err) { | ||
this.onError(err); | ||
// Clients must assign each of these functions syncronously after constructing | ||
// an instance of QueryEmitter. The instance is subscribed to an op stream at | ||
// construction time, and does not buffer emitted events. Diff events assume | ||
// all messages are received and applied in order, so it is critical that none | ||
// are dropped. | ||
QueryEmitter.prototype.onError = | ||
QueryEmitter.prototype.onDiff = | ||
QueryEmitter.prototype.onExtra = | ||
QueryEmitter.prototype.onOp = function() { | ||
// Silently ignore if the op stream was destroyed already | ||
if (!this.stream.open) return; | ||
throw new Error('Required QueryEmitter listener not assigned'); | ||
}; | ||
QueryEmitter.prototype.emitDiff = function(diff) { | ||
this.onDiff(diff); | ||
}; | ||
QueryEmitter.prototype.emitExtra = function(extra) { | ||
this.onExtra(extra); | ||
}; | ||
QueryEmitter.prototype.emitOp = function(op) { | ||
if (this.ids.indexOf(op.d) === -1) return; | ||
this.onOp(op); | ||
}; | ||
// Clients should define these functions | ||
QueryEmitter.prototype.onError = util.doNothing; | ||
QueryEmitter.prototype.onDiff = util.doNothing; | ||
QueryEmitter.prototype.onExtra = util.doNothing; | ||
QueryEmitter.prototype.onOp = util.doNothing; | ||
function pluckIds(snapshots) { | ||
var ids = []; | ||
for (var i = 0; i < snapshots.length; i++) { | ||
ids.push(snapshots[i].id); | ||
} | ||
return ids; | ||
} | ||
function getInserted(diff) { | ||
@@ -225,0 +258,0 @@ var inserted = []; |
{ | ||
"name": "sharedb", | ||
"version": "0.8.5", | ||
"version": "0.9.0", | ||
"description": "JSON OT database backend", | ||
@@ -15,3 +15,4 @@ "main": "lib/index.js", | ||
"expect.js": "^0.3.1", | ||
"mocha": "^2.3.3" | ||
"mocha": "^2.3.3", | ||
"sinon": "^1.17.2" | ||
}, | ||
@@ -18,0 +19,0 @@ "scripts": { |
253
test/db.js
@@ -20,2 +20,16 @@ var async = require('async'); | ||
// Simplified mock of how submit request applies operations. The | ||
// noteworthy dependency is that it always calls getSnapshot with | ||
// the 'submit' projection and applies the op to the returned | ||
// snapshot before committing. Thus, commit may rely on the behavior | ||
// of getSnapshot with this special projection | ||
function submit(db, collection, id, op, callback) { | ||
db.getSnapshot(collection, id, 'submit', function(err, snapshot) { | ||
if (err) return callback(err); | ||
var err = ot.apply(snapshot, op); | ||
if (err) return callback(err); | ||
db.commit(collection, id, op, snapshot, callback); | ||
}); | ||
} | ||
describe('commit', function() { | ||
@@ -25,16 +39,6 @@ function commitConcurrent(db, ops, test, done) { | ||
async.each(ops, function(op, eachCb) { | ||
// Simplified mock of how submit request applies operations. The | ||
// noteworthy dependency is that it always calls getSnapshot with | ||
// the 'submit' projection and applies the op to the returned | ||
// snapshot before committing. Thus, commit may rely on the behavior | ||
// of getSnapshot with this special projection | ||
db.getSnapshot('testcollection', 'foo', 'submit', function(err, snapshot) { | ||
submit(db, 'testcollection', 'foo', op, function(err, succeeded) { | ||
if (err) return eachCb(err); | ||
var err = ot.apply(snapshot, op); | ||
if (err) return eachCb(err); | ||
db.commit('testcollection', 'foo', op, snapshot, function(err, succeeded) { | ||
if (err) return eachCb(err); | ||
if (succeeded) numSucceeded++; | ||
eachCb(); | ||
}); | ||
if (succeeded) numSucceeded++; | ||
eachCb(); | ||
}); | ||
@@ -181,2 +185,202 @@ }, function(err) { | ||
describe('getSnapshot', function() { | ||
it('getSnapshot returns v0 snapshot', function(done) { | ||
this.db.getSnapshot('testcollection', 'test', null, function(err, result) { | ||
if (err) throw err; | ||
expect(result).eql({id: 'test', type: null, v: 0, data: null}); | ||
done(); | ||
}); | ||
}); | ||
it('getSnapshot returns committed data', function(done) { | ||
var data = {x: 5, y: 6}; | ||
var op = {v: 0, create: {type: 'json0', data: data}}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getSnapshot('testcollection', 'test', null, function(err, result) { | ||
if (err) throw err; | ||
expect(result).eql({id: 'test', type: 'http://sharejs.org/types/JSONv0', v: 1, data: data}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe('getSnapshotBulk', function() { | ||
it('getSnapshotBulk returns committed and v0 snapshots', function(done) { | ||
var data = {x: 5, y: 6}; | ||
var op = {v: 0, create: {type: 'json0', data: data}}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getSnapshotBulk('testcollection', ['test2', 'test'], null, function(err, resultMap) { | ||
if (err) throw err; | ||
expect(resultMap).eql({ | ||
test: {id: 'test', type: 'http://sharejs.org/types/JSONv0', v: 1, data: data}, | ||
test2: {id: 'test2', type: null, v: 0, data: null} | ||
}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe('getOps', function() { | ||
it('getOps returns 1 committed op', function(done) { | ||
var op = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getOps('testcollection', 'test', 0, null, function(err, ops) { | ||
if (err) throw err; | ||
expect(ops).eql([op]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('getOps returns 2 committed ops', function(done) { | ||
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}}; | ||
var op1 = {v: 1, op: [{p: ['x'], na: 1}]}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op0, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test', op1, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getOps('testcollection', 'test', 0, null, function(err, ops) { | ||
if (err) throw err; | ||
expect(ops).eql([op0, op1]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('getOps returns from specific op number', function(done) { | ||
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}}; | ||
var op1 = {v: 1, op: [{p: ['x'], na: 1}]}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op0, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test', op1, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getOps('testcollection', 'test', 1, null, function(err, ops) { | ||
if (err) throw err; | ||
expect(ops).eql([op1]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('getOps returns to specific op number', function(done) { | ||
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}}; | ||
var op1 = {v: 1, op: [{p: ['x'], na: 1}]}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op0, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test', op1, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getOps('testcollection', 'test', 0, 1, function(err, ops) { | ||
if (err) throw err; | ||
expect(ops).eql([op0]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe('getOpsBulk', function() { | ||
it('getOpsBulk returns committed ops', function(done) { | ||
var op = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test2', op, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getOpsBulk('testcollection', {test: 0, test2: 0}, null, function(err, opsMap) { | ||
if (err) throw err; | ||
expect(opsMap).eql({ | ||
test: [op], | ||
test2: [op] | ||
}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('getOpsBulk returns committed ops with specific froms', function(done) { | ||
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}}; | ||
var op1 = {v: 1, op: [{p: ['x'], na: 1}]}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op0, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test2', op0, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test', op1, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test2', op1, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getOpsBulk('testcollection', {test: 0, test2: 1}, null, function(err, opsMap) { | ||
if (err) throw err; | ||
expect(opsMap).eql({ | ||
test: [op0, op1], | ||
test2: [op1] | ||
}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('getOpsBulk returns committed ops with specific to', function(done) { | ||
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}}; | ||
var op1 = {v: 1, op: [{p: ['x'], na: 1}]}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op0, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test2', op0, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test', op1, function(err, succeeded) { | ||
if (err) throw err; | ||
submit(db, 'testcollection', 'test2', op1, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getOpsBulk('testcollection', {test: 1, test2: 0}, {test2: 1}, function(err, opsMap) { | ||
if (err) throw err; | ||
expect(opsMap).eql({ | ||
test: [op1], | ||
test2: [op0] | ||
}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe('getOpsToSnapshot', function() { | ||
it('getOpsToSnapshot returns committed op', function(done) { | ||
var op = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}}; | ||
var db = this.db; | ||
submit(db, 'testcollection', 'test', op, function(err, succeeded) { | ||
if (err) throw err; | ||
db.getSnapshot('testcollection', 'test', 'submit', function(err, snapshot) { | ||
if (err) throw err; | ||
db.getOpsToSnapshot('testcollection', 'test', 0, snapshot, function(err, ops) { | ||
if (err) throw err; | ||
expect(ops).eql([op]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe('query', function() { | ||
@@ -236,2 +440,25 @@ it('returns data in the collection', function(done) { | ||
describe('queryPoll', function() { | ||
it('returns data in the collection', function(done) { | ||
var snapshot = {v: 1, type: 'json0', data: {x: 5, y: 6}}; | ||
var db = this.db; | ||
db.commit('testcollection', 'test', {v: 0, create: {}}, snapshot, function(err, succeeded) { | ||
if (err) throw err; | ||
db.queryPoll('testcollection', {x: 5}, null, function(err, ids) { | ||
if (err) throw err; | ||
expect(ids).eql(['test']); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('returns nothing when there is no data', function(done) { | ||
this.db.queryPoll('testcollection', {x: 5}, null, function(err, ids) { | ||
if (err) throw err; | ||
expect(ids).eql([]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
describe('queryPollDoc', function() { | ||
@@ -238,0 +465,0 @@ it('returns false when the document does not exist', function(done) { |
var MemoryPubSub = require('../lib/pubsub/memory'); | ||
var PubSub = require('../lib/pubsub'); | ||
var expect = require('expect.js'); | ||
@@ -6,1 +8,33 @@ require('./pubsub')(function(callback) { | ||
}); | ||
require('./pubsub')(function(callback) { | ||
callback(null, MemoryPubSub({prefix: 'foo'})); | ||
}); | ||
describe('PubSub base class', function() { | ||
it('returns an error if _subscribe is unimplemented', function() { | ||
var pubsub = new PubSub(); | ||
pubsub.subscribe('x', function(err) { | ||
expect(err).an(Error); | ||
}); | ||
}); | ||
it('throws an error if _unsubscribe is unimplemented', function() { | ||
var pubsub = new PubSub(); | ||
pubsub._subscribe = function(channel, callback) { | ||
callback(); | ||
}; | ||
pubsub.subscribe('x', function(err, stream) { | ||
if (err) throw err; | ||
expect(function() { | ||
stream.destroy(); | ||
}).throwException(); | ||
}); | ||
}); | ||
it('returns an error if _publish is unimplemented', function() { | ||
var pubsub = new PubSub(); | ||
pubsub.publish(['x', 'y'], {test: true}, function(err) { | ||
expect(err).an(Error); | ||
}); | ||
}); | ||
}); |
@@ -26,6 +26,39 @@ var expect = require('expect.js'); | ||
}); | ||
pubsub.publish(['x', 'y'], {test: true}); | ||
expect(pubsub.streamsCount).equal(1); | ||
pubsub.publish(['x'], {test: true}); | ||
}); | ||
}); | ||
it('publish optional callback waits', function(done) { | ||
var pubsub = this.pubsub; | ||
pubsub.subscribe('x', function(err, stream) { | ||
if (err) throw err; | ||
var emitted; | ||
stream.on('data', function(data) { | ||
emitted = data; | ||
}); | ||
pubsub.publish(['x'], {test: true}, function(err) { | ||
if (err) throw err; | ||
expect(emitted).eql({test: true}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('can subscribe to a channel twice', function(done) { | ||
var pubsub = this.pubsub; | ||
pubsub.subscribe('y', function(err, stream) { | ||
pubsub.subscribe('y', function(err, stream) { | ||
if (err) throw err; | ||
var emitted; | ||
stream.on('data', function(data) { | ||
expect(data).eql({test: true}); | ||
done(); | ||
}); | ||
expect(pubsub.streamsCount).equal(2); | ||
pubsub.publish(['x', 'y'], {test: true}); | ||
}); | ||
}); | ||
}); | ||
it('stream.destroy() unsubscribes from a channel', function(done) { | ||
@@ -32,0 +65,0 @@ var pubsub = this.pubsub; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
260733
47
4405
3
1