Comparing version 0.8.0 to 0.8.1
254
lib/agent.js
@@ -24,12 +24,7 @@ // This implements the network API for ShareJS. | ||
var async = require('async'); | ||
var hat = require('hat'); | ||
var util = require('./util'); | ||
// stream is a nodejs 0.10 stream object. | ||
/** | ||
* @param {Shareshare} share | ||
* @param {Duplex} stream | ||
* @param {Http.Request} req | ||
*/ | ||
/** | ||
* Agent deserializes the wire protocol messages received from the stream and | ||
@@ -40,8 +35,8 @@ * calls the corresponding functions on its Agent. It uses the return values | ||
* | ||
* @param {Shareshare} share | ||
* @param {Backend} backend | ||
* @param {Duplex} stream connection to a client | ||
*/ | ||
function Agent(share, stream) { | ||
function Agent(backend, stream) { | ||
// The stream passed in should be a nodejs 0.10-style stream. | ||
this.share = share; | ||
this.backend = backend; | ||
this.stream = stream; | ||
@@ -53,6 +48,6 @@ | ||
// We need to track which documents are subscribed by the client. This is a | ||
// map of collection name -> {id: stream || true || false} | ||
// map of collection -> id -> stream | ||
this.subscribedDocs = {}; | ||
// Map from query ID -> emitter. | ||
// Map from queryId -> emitter | ||
this.subscribedQueries = {}; | ||
@@ -99,4 +94,5 @@ | ||
for (var collection in this.subscribedDocs) { | ||
for (var id in this.subscribedDocs[collection]) { | ||
var stream = this.subscribedDocs[collection][id]; | ||
var docs = this.subscribedDocs[collection]; | ||
for (var id in docs) { | ||
var stream = docs[id]; | ||
stream.destroy(); | ||
@@ -121,21 +117,29 @@ } | ||
Agent.prototype._subscribeToStream = function(collection, id, stream) { | ||
var docs = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = {}); | ||
if (this.closed) return stream.destroy(); | ||
var streams = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = {}); | ||
// If already subscribed to this document, destroy the previously subscribed stream | ||
var previous = docs[id]; | ||
var previous = streams[id]; | ||
if (previous) previous.destroy(); | ||
if (this.closed) return stream.destroy(); | ||
docs[id] = stream; | ||
streams[id] = stream; | ||
var agent = this; | ||
stream.on('data', onData); | ||
function onData(data) { | ||
stream.on('data', function(data) { | ||
if (data.error) { | ||
// Log then silently ignore errors in a subscription stream, since these | ||
// may not be the client's fault, and they were not the result of a | ||
// direct request by the client | ||
console.error('Doc subscription stream error', collection, id, data.error); | ||
return; | ||
} | ||
if (agent._isOwnOp(collection, data)) return; | ||
agent._sendOp(collection, id, data); | ||
} | ||
}); | ||
stream.on('end', function() { | ||
// Livedb has closed the op stream, so release its reference | ||
var docs = agent.subscribedDocs[collection]; | ||
if (!docs) return; | ||
delete docs[id]; | ||
if (util.hasKeys(docs)) return; | ||
var streams = agent.subscribedDocs[collection]; | ||
if (!streams) return; | ||
delete streams[id]; | ||
if (util.hasKeys(streams)) return; | ||
delete agent.subscribedDocs[collection]; | ||
@@ -146,5 +150,6 @@ }); | ||
Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) { | ||
if (this.closed) return emitter.destroy(); | ||
var previous = this.subscribedQueries[queryId]; | ||
if (previous) previous.destroy(); | ||
if (this.closed) return emitter.destroy(); | ||
this.subscribedQueries[queryId] = emitter; | ||
@@ -170,11 +175,11 @@ | ||
emitter.onError = function(err) { | ||
// Should we destroy the emitter here? | ||
agent._send({a: 'q', id: queryId, error: err}); | ||
console.warn('Query ' + collection + '.' + JSON.stringify(query) + ' emitted an error:', err); | ||
emitter.destroy(); | ||
delete agent.subscribedQueries[queryId]; | ||
// Log then silently ignore errors in a subscription stream, since these | ||
// may not be the client's fault, and they were not the result of a | ||
// direct request by the client | ||
console.error('Query subscription stream error', collection, query, err); | ||
}; | ||
emitter.onOp = function(op) { | ||
var id = op.id; | ||
var id = op.d; | ||
if (agent._isOwnOp(collection, op)) return; | ||
agent._sendOp(collection, id, op); | ||
@@ -184,3 +189,9 @@ }; | ||
// Send a message to the remote client. | ||
Agent.prototype._isOwnOp = function(collection, op) { | ||
// Detect ops from this client on the same projection. Since the client sent | ||
// these in, the submit reply will be sufficient and we can silently ignore | ||
// them in the streams for subscribed documents or queries | ||
return this.clientId === op.src && collection === (op.i || op.c); | ||
}; | ||
Agent.prototype._send = function(msg) { | ||
@@ -193,3 +204,3 @@ // Quietly drop replies if the stream was closed | ||
Agent.prototype._sendOp = function(collection, id, data) { | ||
Agent.prototype._sendOp = function(collection, id, op) { | ||
var msg = { | ||
@@ -199,18 +210,10 @@ a: 'op', | ||
d: id, | ||
v: data.v, | ||
src: data.src, | ||
seq: data.seq | ||
v: op.v, | ||
src: op.src, | ||
seq: op.seq | ||
}; | ||
if (op.op) msg.op = op.op; | ||
if (op.create) msg.create = op.create; | ||
if (op.del) msg.del = true; | ||
// In theory, we only need to send the operation data if data.src !== | ||
// this.clientId. However, this doesn't work with projections because | ||
// the client needs to see their own operations in the projected collection. | ||
// | ||
// I'd like to reinstate this optimization, but I can't think of a good way to | ||
// do it while making projections work. For now, you get your own operations | ||
// back. | ||
if (data.op) msg.op = data.op; | ||
if (data.create) msg.create = data.create; | ||
if (data.del) msg.del = true; | ||
this._send(msg); | ||
@@ -220,10 +223,7 @@ }; | ||
Agent.prototype._reply = function(req, err, msg) { | ||
if (err) { | ||
msg = {a:req.a, error:err}; | ||
} else { | ||
if (!msg.a) msg.a = req.a; | ||
} | ||
var msg = (err) ? {error: err} : msg || {}; | ||
if (req.c) msg.c = req.c; // collection | ||
if (req.d) msg.d = req.d; // id | ||
msg.a = req.a; | ||
if (req.c) msg.c = req.c; | ||
if (req.d) msg.d = req.d; | ||
if (req.id) msg.id = req.id; | ||
@@ -241,22 +241,3 @@ | ||
var agent = this; | ||
if (req != null) { | ||
if (typeof req === 'string') { | ||
try { | ||
req = JSON.parse(req); | ||
} catch(e) { | ||
console.warn('Client sent invalid JSON', e.stack); | ||
agent.close(e); | ||
} | ||
} | ||
this._handleMessage(req, function(err, msg) { | ||
if (err || msg) agent._reply(req, err, msg); | ||
// This is in a process.nextTick to avoid stack smashing attacks (since | ||
// sometimes this callback function is called synchronously). | ||
process.nextTick(function() { | ||
agent.pump(); | ||
}); | ||
}); | ||
} else { | ||
if (req == null) { | ||
// Retry when there's a message waiting for us. | ||
@@ -266,3 +247,17 @@ this.stream.once('readable', function() { | ||
}); | ||
return; | ||
} | ||
if (typeof req === 'string') { | ||
try { | ||
req = JSON.parse(req); | ||
} catch(e) { | ||
console.warn('Client sent invalid JSON', e.stack); | ||
this.close(e); | ||
} | ||
} | ||
this._handleMessage(req); | ||
// Clean up the stack then read the next message | ||
process.nextTick(function() { | ||
agent.pump(); | ||
}); | ||
}; | ||
@@ -292,9 +287,11 @@ | ||
// Handle an incoming message from the client | ||
Agent.prototype._handleMessage = function(req, callback) { | ||
var err = this._checkRequest(req); | ||
if (err) { | ||
console.warn('Warning: Invalid request from ', this.clientId, req, 'Error: ', err); | ||
return callback(err); | ||
} | ||
Agent.prototype._handleMessage = function(req) { | ||
var agent = this; | ||
var callback = function(err, message) { | ||
agent._reply(req, err, message); | ||
}; | ||
var errMessage = this._checkRequest(req); | ||
if (errMessage) return callback({code: 4000, message: errMessage}); | ||
switch (req.a) { | ||
@@ -318,4 +315,6 @@ case 'qsub': | ||
default: | ||
console.warn('invalid message', req); | ||
callback('invalid or unknown message'); | ||
callback({ | ||
code: 4000, | ||
message: 'Invalid or unknown message' | ||
}); | ||
} | ||
@@ -325,10 +324,6 @@ }; | ||
function getQueryOptions(req) { | ||
var options = {}; | ||
if (req.o) { | ||
// The client tells us what versions it already has | ||
options.versions = req.o.vs; | ||
// Set the DB name for the request (useful if you have alternate databases) | ||
options.db = req.o.b; | ||
} | ||
return options; | ||
return { | ||
versions: req.vs, | ||
db: req.db | ||
}; | ||
} | ||
@@ -338,9 +333,9 @@ | ||
// Subscribe to a query. The client is sent the query results and its | ||
// notified whenever there's a change. | ||
// notified whenever there's a change | ||
var queryId = req.id; | ||
var collection = req.collection; | ||
var collection = req.c; | ||
var query = req.q; | ||
var options = getQueryOptions(req); | ||
var agent = this; | ||
this.share.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) { | ||
this.backend.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) { | ||
if (err) return callback(err); | ||
@@ -354,6 +349,6 @@ agent._subscribeToQuery(emitter, queryId, collection, query); | ||
var queryId = req.id; | ||
var emitter = agent.subscribedQueries[queryId]; | ||
var emitter = this.subscribedQueries[queryId]; | ||
if (emitter) { | ||
emitter.destroy(); | ||
delete agent.subscribedQueries[queryId]; | ||
delete this.subscribedQueries[queryId]; | ||
} | ||
@@ -364,9 +359,9 @@ process.nextTick(callback); | ||
Agent.prototype._queryFetch = function(req, callback) { | ||
// Fetch the results of a query. This does not subscribe to the query or | ||
// anything, its just a once-off query fetch. | ||
// Fetch the results of a query once | ||
var queryId = req.id; | ||
var collection = req.collection; | ||
var collection = req.c; | ||
var query = req.q; | ||
var options = getQueryOptions(req); | ||
this.share.queryFetch(this, collection, query, options, function(err, results, extra) { | ||
var agent = this; | ||
this.backend.queryFetch(this, collection, query, options, function(err, results, extra) { | ||
if (err) return callback(err); | ||
@@ -378,9 +373,9 @@ agent._sendQueryResults(queryId, collection, options, results, extra, callback); | ||
Agent.prototype._sendQueryResults = function(queryId, collection, options, results, extra, callback) { | ||
var versions = options.versions && options.versions[collection]; | ||
var versions = options.versions; | ||
var data = getResultsData(results, versions); | ||
var res = {id: queryId, data: data, extra: extra}; | ||
var opsRequest = getResultsOpsRequest(results, versions); | ||
if (!opsRequest) callback(null, res); | ||
if (!opsRequest) return callback(null, res); | ||
var agent = this; | ||
this.share.getOpsBulk(this, collection, opsRequest, null, function(err, results) { | ||
this.backend.getOpsBulk(this, collection, opsRequest, null, function(err, results) { | ||
if (err) return callback(err); | ||
@@ -405,3 +400,3 @@ for (var id in results) { | ||
} | ||
if (!versions || versions[item.id] == null) { | ||
if (!versions || versions[result.id] == null) { | ||
item.data = result.data; | ||
@@ -414,3 +409,4 @@ } | ||
function getResultsOpsRequest(results, versions) { | ||
var request = null; | ||
if (!versions) return; | ||
var request; | ||
for (var i = 0; i < results.length; i++) { | ||
@@ -431,7 +427,7 @@ var result = results[i]; | ||
var request = req.s; | ||
var agent = this; | ||
var response = {}; | ||
var agent = this; | ||
async.forEachOf(request, function(versions, collection, eachCb) { | ||
agent.share.subscribeBulk(agent, collection, versions, function(err, streams, snapshotMap) { | ||
agent.backend.subscribeBulk(agent, collection, versions, function(err, streams, snapshotMap) { | ||
if (err) return eachCb(err); | ||
@@ -449,6 +445,6 @@ for (var id in streams) { | ||
// Close any streams we may have already subscribed before erroring to | ||
// avoid leaking memory if earlier calls to share.subscribeBulk succeed | ||
// avoid leaking memory if earlier calls to backend.subscribeBulk succeed | ||
// and others fail | ||
for (var collection in request) { | ||
var docs = this.subscribedDocs[collection]; | ||
var docs = agent.subscribedDocs[collection]; | ||
if (!docs) continue; | ||
@@ -468,3 +464,2 @@ for (var id in request[collection]) { | ||
// Subscribe to a document | ||
var agent = this; | ||
var collection = req.c; | ||
@@ -476,3 +471,4 @@ var id = req.d; | ||
// since the specified version | ||
this.share.subscribe(this, collection, id, version, function(err, stream, data) { | ||
var agent = this; | ||
this.backend.subscribe(this, collection, id, version, function(err, stream, data) { | ||
if (err) return callback(err); | ||
@@ -502,3 +498,2 @@ agent._subscribeToStream(collection, id, stream); | ||
Agent.prototype._fetch = function(req, callback) { | ||
var agent = this; | ||
var collection = req.c; | ||
@@ -509,3 +504,3 @@ var id = req.d; | ||
// Fetch a snapshot | ||
this.share.fetch(this, collection, id, function(err, data) { | ||
this.backend.fetch(this, collection, id, function(err, data) { | ||
if (err) return callback(err); | ||
@@ -517,3 +512,4 @@ callback(null, {data: data}); | ||
// actually wants me to fetch some ops | ||
this.share.getOps(this, collection, id, version, null, function(err, results) { | ||
var agent = this; | ||
this.backend.getOps(this, collection, id, version, null, function(err, results) { | ||
if (err) return callback(err); | ||
@@ -529,17 +525,15 @@ for (var i = 0; i < results.length; i++) { | ||
Agent.prototype._submit = function(req, callback) { | ||
var agent = this; | ||
var collection = req.c; | ||
var id = req.d; | ||
var op = this._createOp(req); | ||
this.share.submit(this, collection, id, op, function(err, ops) { | ||
// Occassional 'Op already submitted' errors are expected to happen | ||
// as part of normal operation, since inflight ops need to be resent | ||
// after disconnect | ||
var agent = this; | ||
this.backend.submit(this, collection, id, op, function(err, ops) { | ||
// Message to acknowledge the op was successfully submitted | ||
var ack = {src: op.src, seq: op.seq, v: op.v}; | ||
if (err) { | ||
if (err === 'Op already submitted') { | ||
agent._sendOp(collection, id, op); | ||
} | ||
console.error('Op error:', err, collection, id, op); | ||
callback(null, {a: 'ack', error: err}); | ||
return; | ||
// Occassional 'Op already submitted' errors are expected to happen as | ||
// part of normal operation, since inflight ops need to be resent after | ||
// disconnect. In this case, ack the op so the client can proceed | ||
if (err.code === 4001) return callback(null, ack); | ||
return callback(err); | ||
} | ||
@@ -551,9 +545,7 @@ | ||
} | ||
// Luckily, the op is transformed & etc in place. | ||
agent._sendOp(collection, id, op); | ||
callback(); | ||
callback(null, ack); | ||
}); | ||
}; | ||
function CreateOp(src, seq, v, m, create) { | ||
function CreateOp(src, seq, v, create) { | ||
this.src = src; | ||
@@ -565,3 +557,3 @@ this.seq = seq; | ||
} | ||
function EditOp(src, seq, v, m, op) { | ||
function EditOp(src, seq, v, op) { | ||
this.src = src; | ||
@@ -573,3 +565,3 @@ this.seq = seq; | ||
} | ||
function DeleteOp(src, seq, v, m, del) { | ||
function DeleteOp(src, seq, v, del) { | ||
this.src = src; | ||
@@ -576,0 +568,0 @@ this.seq = seq; |
@@ -462,3 +462,3 @@ var Doc = require('./doc'); | ||
// | ||
// The index is specific to the source, but if you're using mongodb it'll be | ||
// The index is specific to the db, but if you're using mongodb it'll be | ||
// the collection to which the query is made. | ||
@@ -465,0 +465,0 @@ // The callback should have the signature function(error, results, extraData) |
@@ -11,3 +11,2 @@ var types = require('../types').map; | ||
* | ||
* | ||
* Subscriptions | ||
@@ -32,8 +31,3 @@ * ------------- | ||
* | ||
* TODO What happens when the document does not exist yet. | ||
* | ||
* | ||
* | ||
* | ||
* | ||
* Events | ||
@@ -43,3 +37,3 @@ * ------ | ||
* You can use doc.on(eventName, callback) to subscribe to the following events: | ||
* - `before op (op, localContext)` Fired before an operation is applied to the | ||
* - `before op (op)` Fired before an operation is applied to the | ||
* snapshot. The document is already in locked state, so it is not allowed to | ||
@@ -49,5 +43,5 @@ * submit further operations. It may be used to read the old snapshot just | ||
* operation originated locally and `false` otherwise | ||
* - `after op (op, localContext)` Fired after an operation has been applied to | ||
* - `after op (op)` Fired after an operation has been applied to | ||
* the snapshot. The arguments are the same as for `before op` | ||
* - `op (op, localContext)` The same as `after op` unless incremental updates | ||
* - `op (op)` The same as `after op` unless incremental updates | ||
* are enabled. In this case it is fired after every partial operation with | ||
@@ -57,5 +51,5 @@ * this operation as the first argument. When fired the document is in a | ||
* - `subscribed (error)` The document was subscribed | ||
* - `created (localContext)` The document was created. That means its type was | ||
* - `created ()` The document was created. That means its type was | ||
* set and it has some initial data. | ||
* - `del (localContext, snapshot)` Fired after the document is deleted, that is | ||
* - `del (snapshot)` Fired after the document is deleted, that is | ||
* the snapshot is null. It is passed the snapshot before delteion as an | ||
@@ -66,2 +60,3 @@ * arguments | ||
*/ | ||
module.exports = Doc; | ||
@@ -169,3 +164,2 @@ function Doc(connection, collection, name) { | ||
} | ||
this.removeContexts(); | ||
@@ -263,7 +257,5 @@ // Set the new type | ||
Doc.prototype._onMessage = function(msg) { | ||
if (!(msg.c === this.collection && msg.d === this.name)) { | ||
if (msg.c !== this.collection || msg.d !== this.name) { | ||
// This should never happen - its a sanity check for bugs in the connection code. | ||
var err = 'Got message for wrong document.'; | ||
console.error(err, this.collection, this.name, msg); | ||
throw new Error(err); | ||
throw new Error('Got message for wrong document'); | ||
} | ||
@@ -279,3 +271,3 @@ | ||
this._finishSub(msg.error); | ||
break; | ||
return; | ||
@@ -285,3 +277,3 @@ case 'sub': | ||
this._handleSubscribe(msg.error, msg.data); | ||
break; | ||
return; | ||
@@ -295,11 +287,6 @@ case 'unsub': | ||
this._finishSub(msg.error); | ||
break; | ||
return; | ||
case 'ack': | ||
// Acknowledge a locally submitted operation. | ||
// | ||
// Usually we do nothing here - all the interesting logic happens when we | ||
// get sent our op back in the op stream (which happens even if we aren't | ||
// subscribed) | ||
if (msg.error && msg.error !== 'Op already submitted') { | ||
case 'op': | ||
if (msg.error) { | ||
// The server has rejected an op from the client for an unexpected reason. | ||
@@ -316,6 +303,5 @@ // We'll send the error message to the user and try to roll back the change. | ||
} | ||
return; | ||
} | ||
break; | ||
case 'op': | ||
if (this.inflightData && | ||
@@ -327,3 +313,3 @@ msg.src === this.inflightData.src && | ||
this._opAcknowledged(msg); | ||
break; | ||
return; | ||
} | ||
@@ -339,3 +325,3 @@ | ||
this._getLatestOps(); | ||
break; | ||
return; | ||
} | ||
@@ -358,29 +344,20 @@ | ||
// In this case, we can safely ignore the old (duplicate) operation. | ||
break; | ||
return; | ||
} | ||
this._transformPendingOps(msg); | ||
if (this.inflightData) xf(this.inflightData, msg); | ||
for (var i = 0; i < this.pendingData.length; i++) { | ||
xf(this.pendingData[i], msg); | ||
} | ||
this.version++; | ||
this._otApply(msg); | ||
break; | ||
this._otApply(msg, false); | ||
return; | ||
case 'meta': | ||
console.warn('Unhandled meta op:', msg); | ||
break; | ||
default: | ||
console.warn('Unhandled document message:', msg); | ||
break; | ||
} | ||
}; | ||
Doc.prototype._transformPendingOps = function(op) { | ||
if (this.inflightData) { | ||
xf(this.inflightData, op); | ||
} | ||
for (var i = 0; i < this.pendingData.length; i++) { | ||
xf(this.pendingData[i], op); | ||
} | ||
}; | ||
Doc.prototype._getLatestOps = function() { | ||
@@ -606,3 +583,3 @@ var doc = this; | ||
*/ | ||
Doc.prototype._otApply = function(op) { | ||
Doc.prototype._otApply = function(op, context) { | ||
this.locked = true; | ||
@@ -619,3 +596,3 @@ | ||
this.once('unlock', function() { | ||
this.emit('create'); | ||
this.emit('create', context); | ||
}); | ||
@@ -627,3 +604,3 @@ } else if (op.del) { | ||
this.once('unlock', function() { | ||
this.emit('del', oldSnapshot); | ||
this.emit('del', oldSnapshot, context); | ||
}); | ||
@@ -634,3 +611,3 @@ } else if (op.op) { | ||
this.emit('before op', op.op); | ||
this.emit('before op', op.op, context); | ||
@@ -648,3 +625,3 @@ // This exists so clients can pull any necessary data out of the snapshot | ||
doc.snapshot = snapshot; | ||
doc.emit('op', component); | ||
doc.emit('op', component, context); | ||
}); | ||
@@ -654,3 +631,3 @@ } else { | ||
this.snapshot = type.apply(this.snapshot, op.op); | ||
this.emit('op', op.op); | ||
this.emit('op', op.op, context); | ||
} | ||
@@ -666,3 +643,3 @@ } | ||
if (op.op) { | ||
return this.emit('after op', op.op); | ||
this.emit('after op', op.op); | ||
} | ||
@@ -732,7 +709,14 @@ }; | ||
// @param [callback] called when operation is submitted | ||
Doc.prototype._submitOp = function(op, callback) { | ||
Doc.prototype._submitOp = function(op, context, callback) { | ||
if (typeof context === 'function') { | ||
callback = context; | ||
context = true; // The default context is true | ||
} else if (context == null) { | ||
context = true; | ||
} | ||
if (this.locked) { | ||
var err = 'Cannot call submitOp from inside an op event handler. ' + this.collection + ' ' + this.name; | ||
var err = new Error('Cannot call submitOp from inside an op event handler. ' + this.collection + ' ' + this.name); | ||
if (callback) return callback(err); | ||
throw new Error(err); | ||
throw err; | ||
} | ||
@@ -743,5 +727,5 @@ | ||
if (!this.type) { | ||
var err = 'Document has not been created'; | ||
var err = new Error('Document has not been created'); | ||
if (callback) return callback(err); | ||
throw new Error(err); | ||
throw err; | ||
} | ||
@@ -772,3 +756,3 @@ // Try to normalize the op. This removes trailing skip:0's and things like that. | ||
this._otApply(op); | ||
this._otApply(op, context); | ||
@@ -793,4 +777,4 @@ // The call to flush is in a timeout so if submitOp() is called multiple | ||
// @fires before op, op, after op | ||
Doc.prototype.submitOp = function(op, callback) { | ||
this._submitOp({op: op}, callback); | ||
Doc.prototype.submitOp = function(op, context, callback) { | ||
this._submitOp({op: op}, context, callback); | ||
}; | ||
@@ -806,11 +790,11 @@ | ||
// @param callback called when operation submitted | ||
Doc.prototype.create = function(type, data, callback) { | ||
Doc.prototype.create = function(type, data, context, callback) { | ||
if (this.type) { | ||
var err = 'Document already exists'; | ||
var err = new Error('Document already exists'); | ||
if (callback) return callback(err); | ||
throw new Error(err); | ||
throw err; | ||
} | ||
var op = {create: {type:type, data:data}}; | ||
this._submitOp(op, callback); | ||
this._submitOp(op, context, callback); | ||
}; | ||
@@ -824,10 +808,10 @@ | ||
// @param callback called when operation submitted | ||
Doc.prototype.del = function(callback) { | ||
Doc.prototype.del = function(context, callback) { | ||
if (!this.type) { | ||
var err = 'Document does not exist'; | ||
var err = new Error('Document does not exist'); | ||
if (callback) return callback(err); | ||
throw new Error(err); | ||
throw err; | ||
} | ||
this._submitOp({del: true}, callback); | ||
this._submitOp({del: true}, context, callback); | ||
}; | ||
@@ -883,3 +867,3 @@ | ||
// by the server, the editor window should update to reflect the undo. | ||
this._otApply(op); | ||
this._otApply(op, false); | ||
} else if (op.op || op.del) { | ||
@@ -915,5 +899,5 @@ // This is where an undo stack would come in handy. | ||
if (!this.state) { | ||
throw new Error('opAcknowledged called from a null state. This should never happen. ' + this.collection + ' ' + this.name); | ||
throw new Error('opAcknowledged called from a null state. This should never happen. ' + this.collection + '.' + this.name); | ||
} else if (this.state === 'floating') { | ||
if (!this.inflightData.create) throw new Error('Cannot acknowledge an op. ' + this.collection + ' ' + this.name); | ||
if (!this.inflightData.create) throw new Error('Cannot acknowledge an op. ' + this.collection + '.' + this.name); | ||
@@ -934,3 +918,4 @@ // Our create has been acknowledged. This is the same as ingesting some data. | ||
if (msg.v !== this.version) { | ||
throw new Error('Invalid version from server. This can happen when you submit ops in a submitOp callback. Expected: ' + this.version + ' Message version: ' + msg.v + ' ' + this.collection + ' ' + this.name); | ||
throw new Error('Invalid version from server. This can happen when you submit ops in a submitOp callback. ' + | ||
'Expected: ' + this.version + ' Message version: ' + msg.v + ' ' + this.collection + '.' + this.name); | ||
} | ||
@@ -937,0 +922,0 @@ } |
@@ -21,8 +21,2 @@ var emitter = require('../emitter'); | ||
// Do we repoll the entire query whenever anything changes? (As opposed to | ||
// just polling the changed item). This needs to be enabled to be able to use | ||
// ordered queries (sortby:) and paginated queries. Set to undefined, it will | ||
// be enabled / disabled automatically based on the query's properties. | ||
this.poll = options.poll; | ||
// The db we actually hit. If this isn't defined, it hits the snapshot | ||
@@ -34,9 +28,5 @@ // database. Otherwise this can be used to hit another configured query | ||
// A list of resulting documents. These are actual documents, complete with | ||
// data and all the rest. If fetch is false, these documents will not | ||
// have any data. You should manually call fetch() or subscribe() on them. | ||
// | ||
// Calling subscribe() might be a good idea anyway, as you won't be | ||
// subscribed to the documents by default. | ||
this.knownDocs = options.knownDocs || []; | ||
this.results = []; | ||
// data and all the rest. It is possible to pass in an initial results set, | ||
// so that a query can be serialized and then re-established | ||
this.results = options.results || []; | ||
@@ -58,12 +48,2 @@ // Do we have some initial data? | ||
var versions = {}; | ||
// Collect the version of all the documents in the current result set so we | ||
// don't need to be sent their snapshots again. | ||
for (var i = 0; i < this.knownDocs.length; i++) { | ||
var doc = this.knownDocs[i]; | ||
if (doc.version == null) continue; | ||
var collectionVersions = versions[doc.collection] = (versions[doc.collection] || {}); | ||
collectionVersions[doc.name] = doc.version; | ||
} | ||
var msg = { | ||
@@ -73,9 +53,17 @@ a: 'q' + this.type, | ||
c: this.collection, | ||
o: {vs: versions}, | ||
q: this.query, | ||
q: this.query | ||
}; | ||
if (this.db != null) msg.db = this.db; | ||
if (this.results.length) { | ||
// Collect the version of all the documents in the current result set so we | ||
// don't need to be sent their snapshots again. | ||
var versions = {}; | ||
for (var i = 0; i < this.results.length; i++) { | ||
var doc = this.results[i]; | ||
if (doc.version == null || doc.collection !== this.collection) continue; | ||
versions[doc.name] = doc.version; | ||
} | ||
msg.vs = versions; | ||
} | ||
if (this.backend != null) msg.o.b = this.backend; | ||
if (this.poll !== undefined) msg.o.p = this.poll; | ||
this.connection.send(msg); | ||
@@ -186,3 +174,3 @@ }; | ||
// Then add everything in the new result set. | ||
this.results = this.knownDocs = this._dataToDocs(msg.data); | ||
this.results = this._dataToDocs(msg.data); | ||
this.extra = msg.extra; | ||
@@ -189,0 +177,0 @@ |
@@ -6,2 +6,3 @@ var async = require('async'); | ||
function DB(options) { | ||
// pollDebounce is the minimum time in ms between query polls | ||
this.pollDebounce = options && options.pollDebounce; | ||
@@ -8,0 +9,0 @@ } |
@@ -13,3 +13,3 @@ var DB = require('./index'); | ||
function MemoryDB(options) { | ||
if (!(this instanceof MemoryDB)) return new MemoryDB(); | ||
if (!(this instanceof MemoryDB)) return new MemoryDB(options); | ||
DB.call(this, options); | ||
@@ -16,0 +16,0 @@ |
460
lib/index.js
@@ -1,448 +0,14 @@ | ||
var async = require('async'); | ||
var emitter = require('./emitter'); | ||
var ot = require('./ot'); | ||
var projections = require('./projections'); | ||
var Agent = require('./agent'); | ||
var QueryEmitter = require('./query-emitter'); | ||
var SubmitRequest = require('./submit-request'); | ||
var Backend = require('./backend'); | ||
module.exports = Backend; | ||
function ShareDB(options) { | ||
if (!(this instanceof ShareDB)) return new ShareDB(options); | ||
emitter.EventEmitter.call(this); | ||
if (!options) options = {}; | ||
this.pubsub = options.pubsub || ShareDB.MemoryDB(); | ||
this.db = options.db || ShareDB.MemoryPubSub(); | ||
// This contains any extra databases that can be queried | ||
this.extraDbs = options.extraDbs || {}; | ||
// Map from projected collection -> {type, fields} | ||
this.projections = {}; | ||
this.suppressPublish = !!options.suppressPublish; | ||
this.maxSubmitRetries = options.maxSubmitRetries || null; | ||
// Map from event name to a list of middleware | ||
this.middleware = {}; | ||
} | ||
module.exports = ShareDB; | ||
emitter.mixin(ShareDB); | ||
ShareDB.ot = ot; | ||
ShareDB.projections = projections; | ||
ShareDB.Agent = Agent; | ||
ShareDB.QueryEmitter = QueryEmitter; | ||
ShareDB.SubmitRequest = SubmitRequest; | ||
ShareDB.types = require('./types'); | ||
ShareDB.DB = require('./db'); | ||
ShareDB.MemoryDB = require('./db/memory'); | ||
ShareDB.PubSub = require('./pubsub'); | ||
ShareDB.MemoryPubSub = require('./pubsub/memory'); | ||
ShareDB.prototype.close = function() { | ||
this.pubsub.close(); | ||
this.db.close(); | ||
for (var name in this.extraDbs) { | ||
this.extraDbs[name].close(); | ||
} | ||
}; | ||
/** A client has connected through the specified stream. Listen for messages. | ||
* Returns the useragent associated with the connected session. | ||
* | ||
* The optional second argument (req) is an initial request which is passed | ||
* through to any connect() middleware. This is useful for inspecting cookies | ||
* or an express session or whatever on the request object in your middleware. | ||
* | ||
* (The useragent is available through all middleware) | ||
*/ | ||
ShareDB.prototype.listen = function(stream, req) { | ||
var agent = new Agent(this, stream); | ||
this.trigger('connect', agent, {stream: stream, req: req}, function(err) { | ||
if (err) return agent.close(err); | ||
agent.pump(); | ||
}); | ||
return agent; | ||
}; | ||
ShareDB.prototype.addProjection = function(name, collection, type, fields) { | ||
if (this.projections[name]) { | ||
throw new Error('Projection ' + name + ' already exists'); | ||
} | ||
for (var k in fields) { | ||
if (fields[k] !== true) { | ||
throw new Error('Invalid field ' + k + ' - fields must be {somekey:true}. Subfields not currently supported.'); | ||
} | ||
} | ||
this.projections[name] = { | ||
target: collection, | ||
type: ot.normalizeType(type), | ||
fields: fields | ||
}; | ||
}; | ||
/** | ||
* Add middleware to an action or array of actions | ||
*/ | ||
ShareDB.prototype.use = function(action, fn) { | ||
if (Array.isArray(action)) { | ||
for (var i = 0; i < action.length; i++) { | ||
this.use(action[i], fn); | ||
} | ||
return; | ||
} | ||
var fns = this.middleware[action] || (this.middleware[action] = []); | ||
fns.push(fn); | ||
}; | ||
/** | ||
* Passes request through the middleware stack | ||
* | ||
* Middleware may modify the request object. After all middleware have been | ||
* invoked we call `callback` with `null` and the modified request. If one of | ||
* the middleware resturns an error the callback is called with that error. | ||
*/ | ||
ShareDB.prototype.trigger = function(action, agent, request, callback) { | ||
request.action = action; | ||
request.agent = agent; | ||
request.share = this; | ||
var fns = this.middleware[action]; | ||
if (!fns) return callback(); | ||
// Copying the triggers we'll fire so they don't get edited while we iterate. | ||
fns = fns.slice(); | ||
var next = function(err) { | ||
if (err) return callback(err); | ||
var fn = fns.shift(); | ||
if (!fn) return callback(null, request); | ||
fn(request, next); | ||
}; | ||
next(); | ||
}; | ||
ShareDB.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) { | ||
if (projection) { | ||
try { | ||
op = projections.projectOp(projection.fields, op); | ||
} catch (err) { | ||
return callback(err); | ||
} | ||
} | ||
this.trigger('op', agent, {collection: collection, id: id, op: op}, callback); | ||
}; | ||
ShareDB.prototype._sanitizeOps = function(agent, projection, collection, id, ops, callback) { | ||
var share = this; | ||
async.each(ops, function(op, eachCb) { | ||
share._sanitizeOp(agent, projection, collection, id, op, eachCb); | ||
}, callback); | ||
}; | ||
ShareDB.prototype._sanitizeOpsBulk = function(agent, projection, collection, opsMap, callback) { | ||
var share = this; | ||
async.forEachOf(opsMap, function(ops, id, eachCb) { | ||
share._sanitizeOps(agent, projection, collection, id, ops, eachCb); | ||
}, callback); | ||
}; | ||
ShareDB.prototype._sanitizeSnapshot = function(agent, projection, collection, id, snapshot, callback) { | ||
if (projection) { | ||
try { | ||
snapshot = projections.projectSnapshot(projection.fields, snapshot); | ||
} catch (err) { | ||
return callback(err); | ||
} | ||
} | ||
this.trigger('doc', agent, {collection: collection, id: id, snapshot: snapshot}, callback); | ||
}; | ||
ShareDB.prototype._sanitizeSnapshots = function(agent, projection, collection, snapshots, callback) { | ||
var share = this; | ||
async.each(snapshots, function(snapshot, eachCb) { | ||
share._sanitizeSnapshot(agent, projection, collection, snapshot.id, snapshot, eachCb); | ||
}, callback); | ||
}; | ||
ShareDB.prototype._sanitizeSnapshotBulk = function(agent, projection, collection, snapshotMap, callback) { | ||
var share = this; | ||
async.forEachOf(snapshotMap, function(ops, id, eachCb) { | ||
share._sanitizeSnapshot(agent, projection, collection, id, snapshot, eachCb); | ||
}, callback); | ||
}; | ||
ShareDB.prototype._getSnapshotProjection = function(db, projection) { | ||
return (db.projectsSnapshot) ? null : projection; | ||
}; | ||
// Non inclusive - gets ops from [from, to). Ie, all relevant ops. If to is | ||
// not defined (null or undefined) then it returns all ops. | ||
ShareDB.prototype.getOps = function(agent, index, id, from, to, callback) { | ||
var start = Date.now(); | ||
var projection = this.projections[index]; | ||
var collection = (projection) ? projection.target : index; | ||
var share = this; | ||
share.db.getOps(collection, id, from, to, function(err, ops) { | ||
if (err) return callback(err); | ||
share._sanitizeOps(agent, projection, collection, id, ops, function(err) { | ||
if (err) return callback(err); | ||
share.emit('timing', 'getOps', Date.now() - start); | ||
callback(err, ops); | ||
}); | ||
}); | ||
}; | ||
ShareDB.prototype.getOpsBulk = function(agent, index, fromMap, toMap, callback) { | ||
var start = Date.now(); | ||
var projection = this.projections[index]; | ||
var collection = (projection) ? projection.target : index; | ||
var share = this; | ||
share.db.getOpsBulk(collection, fromMap, toMap, function(err, opsMap) { | ||
if (err) return callback(err); | ||
share._sanitizeOpsBulk(agent, projection, collection, opsMap, function(err) { | ||
if (err) return callback(err); | ||
share.emit('timing', 'getOpsBulk', Date.now() - start); | ||
callback(err, opsMap); | ||
}); | ||
}); | ||
}; | ||
// Submit an operation on the named collection/docname. op should contain a | ||
// {op:}, {create:} or {del:} field. It should probably contain a v: field (if | ||
// it doesn't, it defaults to the current version). | ||
// | ||
// callback called with (err, snapshot, ops) | ||
ShareDB.prototype.submit = function(agent, index, id, op, callback) { | ||
var err = ot.checkOp(op); | ||
if (err) return callback(err); | ||
var request = new SubmitRequest(this, agent, index, id, op); | ||
var share = this; | ||
share.trigger('submit', agent, request, function(err) { | ||
if (err) return callback(err); | ||
request.run(function(err, snapshot, ops) { | ||
if (err) return callback(err); | ||
share.trigger('after submit', agent, request, function(err) { | ||
if (err) return callback(err); | ||
share._sanitizeOps(agent, request.projection, request.collection, id, ops, function(err) { | ||
if (err) return callback(err); | ||
share.emit('timing', 'submit.total', Date.now() - request.start); | ||
callback(err, ops); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}; | ||
ShareDB.prototype.fetch = function(agent, index, id, callback) { | ||
var projection = this.projections[index]; | ||
var collection = (projection) ? projection.target : index; | ||
var fields = projection && projection.fields; | ||
var start = Date.now(); | ||
var share = this; | ||
share.db.getSnapshot(collection, id, fields, function(err, snapshot) { | ||
if (err) return callback(err); | ||
var snapshotProjection = share._getSnapshotProjection(share.db, projection); | ||
share._sanitizeSnapshot(agent, snapshotProjection, collection, id, snapshot, function(err) { | ||
if (err) return callback(err); | ||
share.emit('timing', 'fetch', Date.now() - start); | ||
callback(null, snapshot); | ||
}); | ||
}); | ||
}; | ||
ShareDB.prototype.fetchBulk = function(agent, index, ids, callback) { | ||
var projection = this.projections[index]; | ||
var collection = (projection) ? projection.target : index; | ||
var fields = projection && projection.fields; | ||
var start = Date.now(); | ||
var share = this; | ||
share.db.getSnapshotBulk(collection, ids, fields, function(err, snapshotMap) { | ||
if (err) return done(err); | ||
var snapshotProjection = share._getSnapshotProjection(share.db, projection); | ||
share._sanitizeSnapshotBulk(agent, snapshotProjection, collection, snapshotMap, function(err) { | ||
if (err) return callback(err); | ||
share.emit('timing', 'fetchBulk', Date.now() - start); | ||
callback(null, snapshotMap); | ||
}); | ||
}); | ||
}; | ||
// Subscribe to the document from the specified version or null version | ||
ShareDB.prototype.subscribe = function(agent, index, id, version, callback) { | ||
var projection = this.projections[index]; | ||
var collection = (projection) ? projection.target : index; | ||
var channel = this.getDocChannel(collection, id); | ||
var start = Date.now(); | ||
var share = this; | ||
share.pubsub.subscribe(channel, function(err, stream) { | ||
if (err) return callback(err); | ||
stream.initDocSubscribe(share, agent, projection, version); | ||
if (version == null) { | ||
// Subscribing from null means that the agent doesn't have a document | ||
// and needs to fetch it as well as subscribing | ||
share.fetch(index, id, function(err, snapshot) { | ||
if (err) return callback(err); | ||
share.emit('timing', 'subscribe', Date.now() - start); | ||
callback(null, stream, snapshot); | ||
}); | ||
} else { | ||
share.db.getOps(collection, id, version, null, function(err, ops) { | ||
if (err) return callback(err); | ||
stream.pack(version, ops); | ||
share.emit('timing', 'subscribe', Date.now() - start); | ||
callback(null, stream); | ||
}); | ||
} | ||
}); | ||
}; | ||
ShareDB.prototype.subscribeBulk = function(agent, index, versions, callback) { | ||
var projection = this.projections[index]; | ||
var collection = (projection) ? projection.target : index; | ||
var start = Date.now(); | ||
var share = this; | ||
var streams = {}; | ||
var fetchIds = []; | ||
var opsVersions = null; | ||
async.forEachOf(versions, function(version, id, eachCb) { | ||
if (version == null) { | ||
fetchIds.push(version); | ||
} else { | ||
if (!opsVersions) opsVersions = {}; | ||
opsVersions[id] = version; | ||
} | ||
var channel = share.getDocChannel(collection, id); | ||
share.pubsub.subscribe(channel, function(err, stream) { | ||
if (err) return eachCb(err); | ||
stream.initDocSubscribe(share, agent, projection, version); | ||
streams[id] = stream; | ||
eachCb(); | ||
}); | ||
}, function(err) { | ||
if (err) { | ||
closeStreams(streams); | ||
return callback(err); | ||
} | ||
async.parallel({ | ||
snapshotMap: function(parallelCb) { | ||
if (!fetchIds.length) return parallelCb(null, {}); | ||
share.fetchBulk(agent, index, fetchIds, parallelCb); | ||
}, | ||
ops: function(parallelCb) { | ||
if (!opsVersions) return parallelCb(); | ||
this.db.getOpsBulk(collection, opsVersions, null, function(err, opsMap) { | ||
if (err) return parallelCb(err); | ||
for (var id in opsVersions) { | ||
var version = opsVersions[id]; | ||
var ops = opsMap[id]; | ||
streams[index][id].pack(version, ops); | ||
} | ||
parallelCb(); | ||
}); | ||
} | ||
}, function(err, results) { | ||
if (err) { | ||
closeStreams(streams); | ||
return callback(err); | ||
} | ||
share.emit('timing', 'subscribeBulk', Date.now() - start); | ||
callback(null, streams, results.snapshotMap); | ||
}); | ||
}); | ||
}; | ||
ShareDB.prototype.queryFetch = function(agent, index, query, options, callback) { | ||
var start = Date.now(); | ||
var share = this; | ||
share._triggerQuery(agent, index, query, options, function(err, request) { | ||
if (err) return callback(err); | ||
share._query(agent, request, function(err, snapshots, extra) { | ||
if (err) return callback(err); | ||
share.emit('timing', 'queryFetch', Date.now() - start, request); | ||
callback(null, snapshots, extra); | ||
}); | ||
}); | ||
}; | ||
// Options can contain: | ||
// db: The name of the DB (if the DB is specified in the otherDbs when the share instance is created) | ||
// skipPoll: function(collection, id, op, query) {return true or false; } | ||
// this is a syncronous function which can be used as an early filter for | ||
// operations going through the system to reduce the load on the DB. | ||
// pollDebounce: Minimum delay between subsequent database polls. This is | ||
// used to batch updates to reduce load on the database at the expense of | ||
// liveness. Defaults to 1000 (1 second) | ||
ShareDB.prototype.querySubscribe = function(agent, index, query, options, callback) { | ||
var start = Date.now(); | ||
var share = this; | ||
share._triggerQuery(agent, index, query, options, function(err, request) { | ||
if (err) return callback(err); | ||
if (request.db.disableSubscribe) return callback({message: 'DB does not support subscribe'}); | ||
share.pubsub.subscribe(request.channel, function(err, stream) { | ||
if (err) return callback(err); | ||
// Issue query on db to get our initial results | ||
stream.projection = request.projection; | ||
stream.share = share; | ||
stream.agent = agent; | ||
share._query(agent, request, function(err, snapshots, extra) { | ||
if (err) { | ||
stream.destroy(); | ||
return callback(err); | ||
} | ||
var queryEmitter = new QueryEmitter(request, stream, snapshots, extra); | ||
share.emit('timing', 'querySubscribe', Date.now() - start, request); | ||
callback(null, queryEmitter, snapshots, extra); | ||
}); | ||
}); | ||
}); | ||
}; | ||
ShareDB.prototype._triggerQuery = function(agent, index, query, options, callback) { | ||
var projection = this.projections[index]; | ||
var collection = (projection) ? projection.target : index; | ||
var fields = projection && projection.fields; | ||
var request = { | ||
index: index, | ||
collection: collection, | ||
projection: projection, | ||
fields: fields, | ||
channel: this.getCollectionChannel(collection), | ||
query: query, | ||
options: options, | ||
db: null, | ||
snapshotProjection: null, | ||
}; | ||
var share = this; | ||
share.trigger('query', agent, request, function(err) { | ||
if (err) return callback(err); | ||
// Set the DB reference for the request after the middleware trigger so | ||
// that the db option can be changed in middleware | ||
request.db = (options.db) ? this.extraDbs[options.db] : share.db; | ||
if (!request.db) return callback({message: 'DB not found'}); | ||
request.snapshotProjection = share._getSnapshotProjection(request.db, projection); | ||
callback(null, request); | ||
}); | ||
}; | ||
ShareDB.prototype._query = function(agent, request, callback) { | ||
var share = this; | ||
request.db.query(request.collection, request.query, request.fields, request.options, function(err, snapshots, extra) { | ||
if (err) return callback(err); | ||
share._sanitizeSnapshots(agent, request.snapshotProjection, request.collection, snapshots, function(err) { | ||
callback(err, snapshots, extra); | ||
}); | ||
}); | ||
}; | ||
ShareDB.prototype.getCollectionChannel = function(collection) { | ||
return collection; | ||
}; | ||
ShareDB.prototype.getDocChannel = function(collection, id) { | ||
return collection + '.' + id; | ||
}; | ||
ShareDB.prototype.getChannels = function(collection, id) { | ||
return [ | ||
this.getCollectionChannel(collection), | ||
this.getDocChannel(collection, id) | ||
]; | ||
}; | ||
Backend.Agent = require('./agent'); | ||
Backend.Backend = Backend; | ||
Backend.DB = require('./db'); | ||
Backend.MemoryDB = require('./db/memory'); | ||
Backend.MemoryPubSub = require('./pubsub/memory'); | ||
Backend.ot = require('./ot'); | ||
Backend.projections = require('./projections'); | ||
Backend.PubSub = require('./pubsub'); | ||
Backend.QueryEmitter = require('./query-emitter'); | ||
Backend.SubmitRequest = require('./submit-request'); | ||
Backend.types = require('./types'); |
@@ -13,3 +13,3 @@ var assert = require('assert'); | ||
this.id = null; | ||
this.share = null; | ||
this.backend = null; | ||
this.agent = null; | ||
@@ -33,4 +33,4 @@ this.projection = null; | ||
OpStream.prototype.initDocSubscribe = function(share, agent, projection, version) { | ||
this.share = share; | ||
OpStream.prototype.initDocSubscribe = function(backend, agent, projection, version) { | ||
this.backend = backend; | ||
this.agent = agent; | ||
@@ -55,5 +55,5 @@ this.projection = projection; | ||
if (this.share) { | ||
if (this.backend) { | ||
var stream = this; | ||
this.share._sanitizeOp(this.agent, this.projection, op.collection, op.id, op, function(err, op) { | ||
this.backend._sanitizeOp(this.agent, this.projection, op.c, op.d, op, function(err) { | ||
stream.push(err ? {error: err} : op); | ||
@@ -60,0 +60,0 @@ }); |
@@ -6,3 +6,3 @@ var deepEquals = require('deep-is'); | ||
function QueryEmitter(request, stream, snapshots, extra) { | ||
this.share = request.share; | ||
this.backend = request.backend; | ||
this.agent = request.agent; | ||
@@ -21,3 +21,3 @@ this.db = request.db; | ||
this.skipPoll = this.options.skipPoll || util.doNothing; | ||
this.canPollDoc = this.db.canPollDoc(collection, query); | ||
this.canPollDoc = this.db.canPollDoc(this.collection, this.query); | ||
this.pollDebounce = | ||
@@ -45,6 +45,4 @@ (this.options.pollDebounce != null) ? this.options.pollDebounce : | ||
if (data.error) { | ||
// Ignore errors in op stream instead of passing them through to the | ||
// client, since they likely aren't this client's fault | ||
console.error('Error in query op stream:', emitter.index, emitter.query); | ||
console.error(data.error); | ||
this.emitError(data.error); | ||
continue; | ||
@@ -61,20 +59,14 @@ } | ||
QueryEmitter.prototype._emitTiming = function(action, start) { | ||
this.share.emit('timing', action, Date.now() - start, this.index, this.query); | ||
this.backend.emit('timing', action, Date.now() - start, this.index, this.query); | ||
}; | ||
QueryEmitter.prototype.update = function(op) { | ||
// Ignore if the user or database say we don't need to poll. | ||
// | ||
// Try/catch this, since we want to make sure polling from a malformed op | ||
// doesn't crash the server. Note that we are just quietly logging the error | ||
// instead of emitting it, because any errors thrown here are likely due to | ||
// malformed ops coming from a different client. We don't want to send errors | ||
// to clients that aren't responsible for causing them. | ||
var id = op.d; | ||
// Ignore if the user or database say we don't need to poll | ||
try { | ||
if (this.skipPoll(this.collection, op.id, op, this.query)) return; | ||
if (this.db.skipPoll(this.collection, op.id, op, this.query)) return; | ||
if (this.skipPoll(this.collection, id, op, this.query)) return; | ||
if (this.db.skipPoll(this.collection, id, op, this.query)) return; | ||
} catch (err) { | ||
console.error('Error evaluating skipPoll:', this.collection, op.id, op, this.query); | ||
console.error(err.stack || err); | ||
return; | ||
console.error('Error evaluating skipPoll:', this.collection, id, op, this.query); | ||
return this.emitError(err); | ||
} | ||
@@ -84,3 +76,3 @@ if (this.canPollDoc) { | ||
// op has changed whether or not it matches the results | ||
this.queryPollDoc(op.id); | ||
this.queryPollDoc(id); | ||
} else { | ||
@@ -144,3 +136,3 @@ // We need to do a full poll of the query, because the query uses limits, | ||
if (err) return emitter._finishPoll(err); | ||
emitter.share._sanitizeSnapshotBulk(emitter.agent, emitter.snapshotProjection, emitter.collection, snapshotMap, function(err) { | ||
emitter.backend._sanitizeSnapshotBulk(emitter.agent, emitter.snapshotProjection, emitter.collection, snapshotMap, function(err) { | ||
if (err) return emitter._finishPoll(err); | ||
@@ -207,3 +199,3 @@ emitter._emitTiming('query.pollGetSnapshotBulk', start); | ||
QueryEmitter.prototype.emitOp = function(op) { | ||
if (this.ids.indexOf(op.id) === -1) return; | ||
if (this.ids.indexOf(op.d) === -1) return; | ||
this.onOp(op); | ||
@@ -210,0 +202,0 @@ }; |
var ot = require('./ot'); | ||
function SubmitRequest(share, agent, index, id, op) { | ||
this.share = share; | ||
function SubmitRequest(backend, agent, index, id, op) { | ||
this.backend = backend; | ||
this.agent = agent; | ||
// If a projection, rewrite the call into a call against the collection | ||
var projection = share.projections[index]; | ||
var projection = backend.projections[index]; | ||
this.index = index; | ||
@@ -13,2 +13,4 @@ this.projection = projection; | ||
this.op = op; | ||
this.start = Date.now(); | ||
this._addOpMeta(); | ||
@@ -21,5 +23,4 @@ | ||
this.start = Date.now(); | ||
this.suppressPublish = share.suppressPublish; | ||
this.maxRetries = share.maxSubmitRetries; | ||
this.suppressPublish = backend.suppressPublish; | ||
this.maxRetries = backend.maxSubmitRetries; | ||
this.retries = 0; | ||
@@ -40,3 +41,3 @@ | ||
var request = this; | ||
var share = this.share; | ||
var backend = this.backend; | ||
var collection = this.collection; | ||
@@ -48,3 +49,3 @@ var id = this.id; | ||
share.db.getSnapshot(collection, id, fields, function(err, snapshot) { | ||
backend.db.getSnapshot(collection, id, fields, function(err, snapshot) { | ||
if (err) return callback(err); | ||
@@ -66,5 +67,13 @@ | ||
if (op.v === snapshot.v || op.v == null) { | ||
// The snapshot hasn't changed since the op's base version, or the op | ||
// was submitted with a null version. Apply without transforming the op | ||
if (op.v == null) { | ||
// Submitting an op with a null version means that it should get the | ||
// version from the latest snapshot. Generally this will mean the op | ||
// won't be transformed, though transform could be called on it in the | ||
// case of a retry from a simultaneous submit | ||
op.v = snapshot.v; | ||
} | ||
if (op.v === snapshot.v) { | ||
// The snapshot hasn't changed since the op's base version. Apply | ||
// without transforming the op | ||
return request.apply(callback); | ||
@@ -85,3 +94,3 @@ } | ||
var from = op.v; | ||
share.db.getOpsToSnapshot(collection, id, from, snapshot, function(err, ops) { | ||
backend.db.getOpsToSnapshot(collection, id, from, snapshot, function(err, ops) { | ||
if (err) return callback(err); | ||
@@ -147,6 +156,6 @@ | ||
// modified in a middleware and we retry, we want to reset to a new array | ||
this.channels = this.share.getChannels(this.collection, this.id); | ||
this.channels = this.backend.getChannels(this.collection, this.id); | ||
var request = this; | ||
this.share.trigger('apply', this.agent, this, function(err) { | ||
this.backend.trigger('apply', this.agent, this, function(err) { | ||
if (err) return callback(err); | ||
@@ -165,9 +174,10 @@ if (!request.op) return callback(); | ||
var request = this; | ||
var share = this.share; | ||
share.trigger('commit', this.agent, this, function(err) { | ||
var backend = this.backend; | ||
backend.trigger('commit', this.agent, this, function(err) { | ||
if (err) return callback(err); | ||
if (!request.op) return callback(); | ||
var op = request.op; | ||
if (!op) return callback(); | ||
// Try committing the operation and snapshot to the database atomically | ||
share.db.commit(request.collection, request.id, request.op, request.snapshot, function(err, succeeded) { | ||
backend.db.commit(request.collection, request.id, op, request.snapshot, function(err, succeeded) { | ||
if (err) return callback(err); | ||
@@ -180,3 +190,9 @@ if (!succeeded) { | ||
if (!request.suppressPublish) { | ||
share.pubsub.publish(request.channels, request.op); | ||
op.c = request.collection; | ||
op.d = request.id; | ||
op.m = undefined; | ||
// Needed for agent to detect if it can ignore sending the op back to | ||
// the client that submitted it in subscriptions | ||
if (request.collection !== request.index) op.i = request.index; | ||
backend.pubsub.publish(request.channels, op); | ||
} | ||
@@ -197,3 +213,3 @@ callback(err, request.snapshot, request.ops); | ||
} | ||
this.share.emit('timing', 'submit.retry', Date.now() - this.start, this.retries); | ||
this.backend.emit('timing', 'submit.retry', Date.now() - this.start, this.retries); | ||
this.run(callback); | ||
@@ -213,3 +229,3 @@ }; | ||
var err = { | ||
code: 4000, | ||
code: 4001, | ||
message: 'Op already submitted' | ||
@@ -216,0 +232,0 @@ }; |
{ | ||
"name": "sharedb", | ||
"version": "0.8.0", | ||
"version": "0.8.1", | ||
"description": "JSON OT database backend", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
43
238557
3683