New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

sharedb

Package Overview
Dependencies
Maintainers
6
Versions
141
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sharedb - npm Package Compare versions

Comparing version 5.0.3 to 5.0.4

109

lib/agent.js

@@ -51,2 +51,6 @@ var hat = require('hat');

this.presenceRequests = Object.create(null);
// Keep track of the latest known Doc version, so that we can avoid fetching
// ops to transform presence if not needed
this.latestDocVersionStreams = Object.create(null);
this.latestDocVersions = Object.create(null);

@@ -112,2 +116,8 @@ // We need to track this manually to make sure we don't reply to messages

this.subscribedQueries = Object.create(null);
for (var collection in this.latestDocVersionStreams) {
var streams = this.latestDocVersionStreams[collection];
for (var id in streams) streams[id].destroy();
}
this.latestDocVersionStreams = Object.create(null);
};

@@ -120,13 +130,4 @@

Agent.prototype._subscribeToStream = function(collection, id, stream) {
if (this.closed) return stream.destroy();
var streams = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = Object.create(null));
// If already subscribed to this document, destroy the previously subscribed stream
var previous = streams[id];
if (previous) previous.destroy();
streams[id] = stream;
var agent = this;
stream.on('data', function(data) {
this._subscribeMapToStream(this.subscribedDocs, collection, id, stream, function(data) {
if (data.error) {

@@ -141,9 +142,22 @@ // Log then silently ignore errors in a subscription stream, since these

});
};
Agent.prototype._subscribeMapToStream = function(map, collection, id, stream, dataHandler) {
if (this.closed) return stream.destroy();
var streams = map[collection] || (map[collection] = Object.create(null));
// If already subscribed to this document, destroy the previously subscribed stream
var previous = streams[id];
if (previous) previous.destroy();
streams[id] = stream;
stream.on('data', dataHandler);
stream.on('end', function() {
// The op stream is done sending, so release its reference
var streams = agent.subscribedDocs[collection];
var streams = map[collection];
if (!streams || streams[id] !== stream) return;
delete streams[id];
if (util.hasKeys(streams)) return;
delete agent.subscribedDocs[collection];
delete map[collection];
});

@@ -801,16 +815,35 @@ };

var start = Date.now();
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, this, context, function(error) {
var subscriptionUpdater = presence.p === null ?
this._unsubscribeDocVersion.bind(this) :
this._subscribeDocVersion.bind(this);
subscriptionUpdater(presence.c, presence.d, function(error) {
if (error) return callback(error);
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null));
var previousRequest = requests[presence.id];
if (!previousRequest || previousRequest.pv < presence.pv) {
presenceRequests[presence.ch][presence.id] = presence;
}
backend.transformPresenceToLatestVersion(agent, presence, function(error, presence) {
backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, agent, context, function(error) {
if (error) return callback(error);
var channel = agent._getPresenceChannel(presence.ch);
agent.backend.pubsub.publish([channel], presence, function(error) {
var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null));
var previousRequest = requests[presence.id];
if (!previousRequest || previousRequest.pv < presence.pv) {
presenceRequests[presence.ch][presence.id] = presence;
}
var transformer = function(agent, presence, callback) {
callback(null, presence);
};
var latestDocVersion = util.dig(agent.latestDocVersions, presence.c, presence.d);
var presenceIsUpToDate = presence.v === latestDocVersion;
if (!presenceIsUpToDate) {
transformer = backend.transformPresenceToLatestVersion.bind(backend);
}
transformer(agent, presence, function(error, presence) {
if (error) return callback(error);
backend.emit('timing', 'presence.broadcast', Date.now() - start, context);
callback(null, presence);
var channel = agent._getPresenceChannel(presence.ch);
agent.backend.pubsub.publish([channel], presence, function(error) {
if (error) return callback(error);
backend.emit('timing', 'presence.broadcast', Date.now() - start, context);
callback(null, presence);
});
});

@@ -821,2 +854,32 @@ });

Agent.prototype._subscribeDocVersion = function(collection, id, callback) {
if (!collection || !id) return callback();
var latestDocVersions = this.latestDocVersions;
var isSubscribed = util.dig(latestDocVersions, collection, id) !== undefined;
if (isSubscribed) return callback();
var agent = this;
this.backend.subscribe(this, collection, id, null, function(error, stream, snapshot) {
if (error) return callback(error);
var versions = latestDocVersions[collection] || (latestDocVersions[collection] = Object.create(null));
versions[id] = snapshot.v;
agent._subscribeMapToStream(agent.latestDocVersionStreams, collection, id, stream, function(op) {
// op.v behind snapshot.v by 1
latestDocVersions[collection][id] = op.v + 1;
});
callback();
});
};
Agent.prototype._unsubscribeDocVersion = function(collection, id, callback) {
var stream = util.dig(this.latestDocVersionStreams, collection, id);
if (stream) stream.destroy();
util.digAndRemove(this.latestDocVersions, collection, id);
util.nextTick(callback);
};
Agent.prototype._createPresence = function(request) {

@@ -823,0 +886,0 @@ return {

{
"name": "sharedb",
"version": "5.0.3",
"version": "5.0.4",
"description": "JSON OT database backend",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -8,2 +8,3 @@ var Backend = require('../../../lib/backend');

var PresencePauser = require('./presence-pauser');
var sinon = require('sinon');
types.register(presenceTestType.type);

@@ -301,2 +302,19 @@

it('does not call getOps() when presence is already up-to-date', function(done) {
var localPresence1 = presence1.create('presence-1');
async.series([
doc1.fetch.bind(doc1), // Ensure up-to-date
function(next) {
sinon.spy(Backend.prototype, 'getOps');
next();
},
localPresence1.submit.bind(localPresence1, {index: 1}),
function(next) {
expect(Backend.prototype.getOps).not.to.have.been.called;
next();
}
], done);
});
// This test case attempts to force us into a tight race condition corner case:

@@ -303,0 +321,0 @@ // 1. doc1 sends presence, as well as submits an op

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc