Comparing version 5.0.3 to 5.0.4
109
lib/agent.js
@@ -51,2 +51,6 @@ var hat = require('hat'); | ||
this.presenceRequests = Object.create(null); | ||
// Keep track of the latest known Doc version, so that we can avoid fetching | ||
// ops to transform presence if not needed | ||
this.latestDocVersionStreams = Object.create(null); | ||
this.latestDocVersions = Object.create(null); | ||
@@ -112,2 +116,8 @@ // We need to track this manually to make sure we don't reply to messages | ||
this.subscribedQueries = Object.create(null); | ||
for (var collection in this.latestDocVersionStreams) { | ||
var streams = this.latestDocVersionStreams[collection]; | ||
for (var id in streams) streams[id].destroy(); | ||
} | ||
this.latestDocVersionStreams = Object.create(null); | ||
}; | ||
@@ -120,13 +130,4 @@ | ||
Agent.prototype._subscribeToStream = function(collection, id, stream) { | ||
if (this.closed) return stream.destroy(); | ||
var streams = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = Object.create(null)); | ||
// If already subscribed to this document, destroy the previously subscribed stream | ||
var previous = streams[id]; | ||
if (previous) previous.destroy(); | ||
streams[id] = stream; | ||
var agent = this; | ||
stream.on('data', function(data) { | ||
this._subscribeMapToStream(this.subscribedDocs, collection, id, stream, function(data) { | ||
if (data.error) { | ||
@@ -141,9 +142,22 @@ // Log then silently ignore errors in a subscription stream, since these | ||
}); | ||
}; | ||
Agent.prototype._subscribeMapToStream = function(map, collection, id, stream, dataHandler) { | ||
if (this.closed) return stream.destroy(); | ||
var streams = map[collection] || (map[collection] = Object.create(null)); | ||
// If already subscribed to this document, destroy the previously subscribed stream | ||
var previous = streams[id]; | ||
if (previous) previous.destroy(); | ||
streams[id] = stream; | ||
stream.on('data', dataHandler); | ||
stream.on('end', function() { | ||
// The op stream is done sending, so release its reference | ||
var streams = agent.subscribedDocs[collection]; | ||
var streams = map[collection]; | ||
if (!streams || streams[id] !== stream) return; | ||
delete streams[id]; | ||
if (util.hasKeys(streams)) return; | ||
delete agent.subscribedDocs[collection]; | ||
delete map[collection]; | ||
}); | ||
@@ -801,16 +815,35 @@ }; | ||
var start = Date.now(); | ||
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, this, context, function(error) { | ||
var subscriptionUpdater = presence.p === null ? | ||
this._unsubscribeDocVersion.bind(this) : | ||
this._subscribeDocVersion.bind(this); | ||
subscriptionUpdater(presence.c, presence.d, function(error) { | ||
if (error) return callback(error); | ||
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null)); | ||
var previousRequest = requests[presence.id]; | ||
if (!previousRequest || previousRequest.pv < presence.pv) { | ||
presenceRequests[presence.ch][presence.id] = presence; | ||
} | ||
backend.transformPresenceToLatestVersion(agent, presence, function(error, presence) { | ||
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, agent, context, function(error) { | ||
if (error) return callback(error); | ||
var channel = agent._getPresenceChannel(presence.ch); | ||
agent.backend.pubsub.publish([channel], presence, function(error) { | ||
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null)); | ||
var previousRequest = requests[presence.id]; | ||
if (!previousRequest || previousRequest.pv < presence.pv) { | ||
presenceRequests[presence.ch][presence.id] = presence; | ||
} | ||
var transformer = function(agent, presence, callback) { | ||
callback(null, presence); | ||
}; | ||
var latestDocVersion = util.dig(agent.latestDocVersions, presence.c, presence.d); | ||
var presenceIsUpToDate = presence.v === latestDocVersion; | ||
if (!presenceIsUpToDate) { | ||
transformer = backend.transformPresenceToLatestVersion.bind(backend); | ||
} | ||
transformer(agent, presence, function(error, presence) { | ||
if (error) return callback(error); | ||
backend.emit('timing', 'presence.broadcast', Date.now() - start, context); | ||
callback(null, presence); | ||
var channel = agent._getPresenceChannel(presence.ch); | ||
agent.backend.pubsub.publish([channel], presence, function(error) { | ||
if (error) return callback(error); | ||
backend.emit('timing', 'presence.broadcast', Date.now() - start, context); | ||
callback(null, presence); | ||
}); | ||
}); | ||
@@ -821,2 +854,32 @@ }); | ||
Agent.prototype._subscribeDocVersion = function(collection, id, callback) { | ||
if (!collection || !id) return callback(); | ||
var latestDocVersions = this.latestDocVersions; | ||
var isSubscribed = util.dig(latestDocVersions, collection, id) !== undefined; | ||
if (isSubscribed) return callback(); | ||
var agent = this; | ||
this.backend.subscribe(this, collection, id, null, function(error, stream, snapshot) { | ||
if (error) return callback(error); | ||
var versions = latestDocVersions[collection] || (latestDocVersions[collection] = Object.create(null)); | ||
versions[id] = snapshot.v; | ||
agent._subscribeMapToStream(agent.latestDocVersionStreams, collection, id, stream, function(op) { | ||
// op.v behind snapshot.v by 1 | ||
latestDocVersions[collection][id] = op.v + 1; | ||
}); | ||
callback(); | ||
}); | ||
}; | ||
Agent.prototype._unsubscribeDocVersion = function(collection, id, callback) { | ||
var stream = util.dig(this.latestDocVersionStreams, collection, id); | ||
if (stream) stream.destroy(); | ||
util.digAndRemove(this.latestDocVersions, collection, id); | ||
util.nextTick(callback); | ||
}; | ||
Agent.prototype._createPresence = function(request) { | ||
@@ -823,0 +886,0 @@ return { |
{ | ||
"name": "sharedb", | ||
"version": "5.0.3", | ||
"version": "5.0.4", | ||
"description": "JSON OT database backend", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -8,2 +8,3 @@ var Backend = require('../../../lib/backend'); | ||
var PresencePauser = require('./presence-pauser'); | ||
var sinon = require('sinon'); | ||
types.register(presenceTestType.type); | ||
@@ -301,2 +302,19 @@ | ||
it('does not call getOps() when presence is already up-to-date', function(done) { | ||
var localPresence1 = presence1.create('presence-1'); | ||
async.series([ | ||
doc1.fetch.bind(doc1), // Ensure up-to-date | ||
function(next) { | ||
sinon.spy(Backend.prototype, 'getOps'); | ||
next(); | ||
}, | ||
localPresence1.submit.bind(localPresence1, {index: 1}), | ||
function(next) { | ||
expect(Backend.prototype.getOps).not.to.have.been.called; | ||
next(); | ||
} | ||
], done); | ||
}); | ||
// This test case attempts to force us into a tight race condition corner case: | ||
@@ -303,0 +321,0 @@ // 1. doc1 sends presence, as well as submits an op |
668775
17807