New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

sharedb

Package Overview
Dependencies
Maintainers
1
Versions
141
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sharedb - npm Package Compare versions

Comparing version 0.8.5 to 0.9.0

lib/stream-socket.js

367

lib/agent.js

@@ -1,25 +0,1 @@

// This implements the network API for ShareJS.
//
// The wire protocol is speccced out here:
// https://github.com/josephg/ShareJS/wiki/Wire-Protocol
//
// When a client connects the server first authenticates it and sends:
//
// S: {id:<agent clientId>}
//
// After that, the client can open documents:
//
// C: {c:'users', d:'fred', sub:true, snapshot:null, create:true, type:'text'}
// S: {c:'users', d:'fred', sub:true, snapshot:{snapshot:'hi there', v:5, meta:{}}, create:false}
//
// ...
//
// The client can send open requests as soon as the socket has opened - it doesn't need to
// wait for its id.
//
// The wire protocol is documented here:
// https://github.com/josephg/ShareJS/wiki/Wire-Protocol
//
var async = require('async');
var hat = require('hat');

@@ -67,3 +43,3 @@ var util = require('./util');

// Initialize the remote client by sending it its agent Id.
this._send({a: 'init', protocol: 0, id: this.clientId});
this._send({a: 'init', protocol: 1, id: this.clientId});
}

@@ -187,3 +163,3 @@ module.exports = Agent;

// them in the streams for subscribed documents or queries
return this.clientId === op.src && collection === (op.i || op.c);
return (this.clientId === op.src) && (collection === (op.i || op.c));
};

@@ -214,4 +190,15 @@

Agent.prototype._sendOps = function(collection, id, ops) {
for (var i = 0; i < ops.length; i++) {
this._sendOp(collection, id, ops[i]);
}
};
Agent.prototype._reply = function(req, err, msg) {
var msg = (err) ? {error: err} : msg || {};
if (err) {
req.error = err;
this._send(req);
return;
}
if (!msg) msg = {};

@@ -222,2 +209,3 @@ msg.a = req.a;

if (req.id) msg.id = req.id;
if (req.b && !msg.data) msg.b = req.b;

@@ -258,6 +246,6 @@ this._send(msg);

Agent.prototype._checkRequest = function(req) {
if (req.a === 'qsub' || req.a === 'qfetch' || req.a === 'qunsub') {
if (req.a === 'qf' || req.a === 'qs' || req.a === 'qu') {
// Query messages need an ID property.
if (typeof req.id !== 'number') return 'Missing query ID';
} else if (req.a === 'op' || req.a === 'sub' || req.a === 'unsub' || req.a === 'fetch') {
} else if (req.a === 'op' || req.a === 'f' || req.a === 's' || req.a === 'u') {
// Doc-based request.

@@ -270,7 +258,6 @@ if (req.c != null && typeof req.c !== 'string') return 'Invalid collection';

}
} else if (req.a === 'bs') {
// Bulk subscribe
if (typeof req.s !== 'object') return 'Invalid bulk subscribe data';
} else {
return 'Invalid action';
} else if (req.a === 'bf' || req.a === 'bs' || req.a === 'bu') {
// Bulk request
if (req.c != null && typeof req.c !== 'string') return 'Invalid collection';
if (typeof req.b !== 'object') return 'Invalid bulk subscribe data';
}

@@ -286,33 +273,55 @@ };

var errMessage = this._checkRequest(req);
if (errMessage) return callback({code: 4000, message: errMessage});
try {
var errMessage = this._checkRequest(req);
if (errMessage) return callback({code: 4000, message: errMessage});
switch (req.a) {
case 'qsub':
return this._querySubscribe(req, callback);
case 'qunsub':
return this._queryUnsubscribe(req, callback);
case 'qfetch':
return this._queryFetch(req, callback);
case 'bs':
return this._bulkSubscribe(req, callback);
case 'sub':
return this._subscribe(req, callback);
case 'unsub':
return this._unsubscribe(req, callback);
case 'fetch':
return this._fetch(req, callback);
case 'op':
return this._submit(req, callback);
default:
callback({
code: 4000,
message: 'Invalid or unknown message'
});
switch (req.a) {
case 'qf':
return this._queryFetch(req.id, req.c, req.q, getQueryOptions(req), callback);
case 'qs':
return this._querySubscribe(req.id, req.c, req.q, getQueryOptions(req), callback);
case 'qu':
return this._queryUnsubscribe(req.id, callback);
case 'bf':
return this._fetchBulk(req.c, req.b, callback);
case 'bs':
return this._subscribeBulk(req.c, req.b, callback);
case 'bu':
return this._unsubscribeBulk(req.c, req.b, callback);
case 'f':
return this._fetch(req.c, req.d, req.v, callback);
case 's':
return this._subscribe(req.c, req.d, req.v, callback);
case 'u':
return this._unsubscribe(req.c, req.d, callback);
case 'op':
var op = this._createOp(req);
return this._submit(req.c, req.d, op, callback);
default:
callback({
code: 4000,
message: 'Invalid or unknown message'
});
}
} catch (err) {
callback(err);
}
};
function getQueryOptions(req) {
var results = req.r;
var ids, versions;
if (results) {
ids = [];
versions = (results.length) ? {} : null;
for (var i = 0; i < results.length; i++) {
var result = results[i];
var id = result[0];
var version = result[1];
ids.push(id);
versions[id] = version;
}
}
return {
versions: req.vs,
ids: ids,
versions: versions,
db: req.db

@@ -322,59 +331,49 @@ };

Agent.prototype._querySubscribe = function(req, callback) {
// Subscribe to a query. The client is sent the query results and its
// notified whenever there's a change
var queryId = req.id;
var collection = req.c;
var query = req.q;
var options = getQueryOptions(req);
var agent = this;
this.backend.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) {
if (err) return callback(err);
agent._subscribeToQuery(emitter, queryId, collection, query);
agent._sendQueryResults(queryId, collection, options, results, extra, callback);
});
};
Agent.prototype._queryUnsubscribe = function(req, callback) {
var queryId = req.id;
var emitter = this.subscribedQueries[queryId];
if (emitter) {
emitter.destroy();
delete this.subscribedQueries[queryId];
}
process.nextTick(callback);
};
Agent.prototype._queryFetch = function(req, callback) {
Agent.prototype._queryFetch = function(queryId, collection, query, options, callback) {
// Fetch the results of a query once
var queryId = req.id;
var collection = req.c;
var query = req.q;
var options = getQueryOptions(req);
var agent = this;
this.backend.queryFetch(this, collection, query, options, function(err, results, extra) {
if (err) return callback(err);
agent._sendQueryResults(queryId, collection, options, results, extra, callback);
var message = {
data: getResultsData(results),
extra: extra
};
callback(null, message);
});
};
Agent.prototype._sendQueryResults = function(queryId, collection, options, results, extra, callback) {
var versions = options.versions;
var data = getResultsData(results, versions);
var res = {id: queryId, data: data, extra: extra};
var opsRequest = getResultsOpsRequest(results, versions);
if (!opsRequest) return callback(null, res);
Agent.prototype._querySubscribe = function(queryId, collection, query, options, callback) {
// Subscribe to a query. The client is sent the query results and its
// notified whenever there's a change
var agent = this;
this.backend.getOpsBulk(this, collection, opsRequest, null, function(err, results) {
var wait = 1;
var message;
function finish(err) {
if (err) return callback(err);
for (var id in results) {
var ops = results[id];
for (var i = 0; i < ops.length; i++) {
agent._sendOp(collection, id, ops[i]);
}
if (--wait) return;
callback(null, message);
}
if (options.versions) {
wait++;
this._fetchBulk(collection, options.versions, finish);
}
this.backend.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) {
if (err) return finish(err);
agent._subscribeToQuery(emitter, queryId, collection, query);
// No results are returned when ids are passed in as an option. Instead,
// want to re-poll the entire query once we've established listeners to
// emit any diff in results
if (!results) {
emitter.queryPoll(finish);
return;
}
callback(null, res);
message = {
data: getResultsData(results),
extra: extra
};
finish();
});
};
function getResultsData(results, versions) {
function getResultsData(results) {
var items = [];

@@ -384,9 +383,10 @@ var lastType = null;

var result = results[i];
var item = {d: result.id, v: result.v};
var item = {
d: result.id,
v: result.v,
data: result.data
};
if (lastType !== result.type) {
lastType = item.type = result.type;
}
if (!versions || versions[result.id] == null) {
item.data = result.data;
}
items.push(item);

@@ -396,59 +396,51 @@ }

}
function getResultsOpsRequest(results, versions) {
if (!versions) return;
var request;
for (var i = 0; i < results.length; i++) {
var result = results[i];
var from = versions[result.id];
if (from != null && result.v > from) {
if (!request) request = {};
request[result.id] = from;
}
Agent.prototype._queryUnsubscribe = function(queryId, callback) {
var emitter = this.subscribedQueries[queryId];
if (emitter) {
emitter.destroy();
delete this.subscribedQueries[queryId];
}
return request;
}
process.nextTick(callback);
};
// Bulk subscribe. The message is:
// {a:'bs', s:{users:{fred:100, george:5, carl:null}}}
Agent.prototype._bulkSubscribe = function(req, callback) {
var request = req.s;
var response = {};
Agent.prototype._fetch = function(collection, id, version, callback) {
if (version == null) {
// Fetch a snapshot
this.backend.fetch(this, collection, id, function(err, data) {
if (err) return callback(err);
callback(null, {data: data});
});
} else {
// It says fetch on the tin, but if a version is specified the client
// actually wants me to fetch some ops
var agent = this;
this.backend.getOps(this, collection, id, version, null, function(err, ops) {
if (err) return callback(err);
agent._sendOps(collection, id, ops);
callback();
});
}
};
var agent = this;
async.forEachOf(request, function(versions, collection, eachCb) {
agent.backend.subscribeBulk(agent, collection, versions, function(err, streams, snapshotMap) {
if (err) return eachCb(err);
for (var id in streams) {
agent._subscribeToStream(collection, id, streams[id]);
// Give a thumbs up for the subscription
if (!snapshotMap[id]) snapshotMap[id] = true;
Agent.prototype._fetchBulk = function(collection, versions, callback) {
if (Array.isArray(versions)) {
this.backend.fetchBulk(this, collection, versions, function(err, snapshotMap) {
if (err) return callback(err);
callback(null, {data: snapshotMap});
});
} else {
var agent = this;
this.backend.getOpsBulk(this, collection, versions, null, function(err, opsMap) {
if (err) return callback(err);
for (var id in opsMap) {
var ops = opsMap[id];
agent._sendOps(collection, id, ops);
}
response[collection] = snapshotMap;
eachCb();
callback();
});
}, function(err) {
if (err) {
// Close any streams we may have already subscribed before erroring to
// avoid leaking memory if earlier calls to backend.subscribeBulk succeed
// and others fail
for (var collection in request) {
var docs = agent.subscribedDocs[collection];
if (!docs) continue;
for (var id in request[collection]) {
var stream = docs[id];
if (stream) stream.destroy();
}
}
return callback(err);
}
callback(null, {s: response});
});
}
};
Agent.prototype._subscribe = function(req, callback) {
// Subscribe to a document
var collection = req.c;
var id = req.d;
var version = req.v;
Agent.prototype._subscribe = function(collection, id, version, callback) {
// If the version is specified, catch the client up by sending all ops

@@ -470,7 +462,20 @@ // since the specified version

Agent.prototype._unsubscribe = function(req, callback) {
Agent.prototype._subscribeBulk = function(collection, versions, callback) {
var agent = this;
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap) {
if (err) return callback(err);
for (var id in streams) {
agent._subscribeToStream(collection, id, streams[id]);
}
if (snapshotMap) {
callback(null, {data: snapshotMap});
} else {
callback();
}
});
};
Agent.prototype._unsubscribe = function(collection, id, callback) {
// Unsubscribe from the specified document. This cancels the active
// stream or an inflight subscribing state
var collection = req.c;
var id = req.d;
var docs = this.subscribedDocs[collection];

@@ -482,30 +487,14 @@ var stream = docs && docs[id];

Agent.prototype._fetch = function(req, callback) {
var collection = req.c;
var id = req.d;
var version = req.v;
if (version == null) {
// Fetch a snapshot
this.backend.fetch(this, collection, id, function(err, data) {
if (err) return callback(err);
callback(null, {data: data});
});
} else {
// It says fetch on the tin, but if a version is specified the client
// actually wants me to fetch some ops
var agent = this;
this.backend.getOps(this, collection, id, version, null, function(err, results) {
if (err) return callback(err);
for (var i = 0; i < results.length; i++) {
agent._sendOp(collection, id, results[i]);
}
callback();
});
Agent.prototype._unsubscribeBulk = function(collection, ids, callback) {
var docs = this.subscribedDocs[collection];
if (!docs) return process.nextTick(callback);
for (var i = 0; i < ids.length; i++) {
var id = ids[i];
var stream = docs[id];
if (stream) stream.destroy();
}
process.nextTick(callback);
};
Agent.prototype._submit = function(req, callback) {
var collection = req.c;
var id = req.d;
var op = this._createOp(req);
Agent.prototype._submit = function(collection, id, op, callback) {
var agent = this;

@@ -524,5 +513,3 @@ this.backend.submit(this, collection, id, op, function(err, ops) {

// Reply with any operations that the client is missing.
for (var i = 0; i < ops.length; i++) {
agent._sendOp(collection, id, ops[i]);
}
agent._sendOps(collection, id, ops);
callback(null, ack);

@@ -529,0 +516,0 @@ });

@@ -0,3 +1,5 @@

var Duplex = require('stream').Duplex;
var async = require('async');
var Agent = require('./agent');
var Connection = require('./client/connection');
var emitter = require('./emitter');

@@ -9,2 +11,3 @@ var MemoryDB = require('./db/memory');

var QueryEmitter = require('./query-emitter');
var StreamSocket = require('./stream-socket');
var SubmitRequest = require('./submit-request');

@@ -42,2 +45,11 @@

Backend.prototype.createConnection = function() {
var stream = new Duplex({objectMode: true});
var socket = new StreamSocket(stream);
var connection = new Connection(socket);
socket.connect();
this.listen(stream);
return connection;
};
/** A client has connected through the specified stream. Listen for messages.

@@ -117,2 +129,28 @@ *

// Submit an operation on the named collection/docname. op should contain a
// {op:}, {create:} or {del:} field. It should probably contain a v: field (if
// it doesn't, it defaults to the current version).
//
// callback called with (err, snapshot, ops)
Backend.prototype.submit = function(agent, index, id, op, callback) {
var err = ot.checkOp(op);
if (err) return callback(err);
var request = new SubmitRequest(this, agent, index, id, op);
var backend = this;
backend.trigger('submit', agent, request, function(err) {
if (err) return callback(err);
request.run(function(err, snapshot, ops) {
if (err) return callback(err);
backend.trigger('after submit', agent, request, function(err) {
if (err) return callback(err);
backend._sanitizeOps(agent, request.projection, request.collection, id, ops, function(err) {
if (err) return callback(err);
backend.emit('timing', 'submit.total', Date.now() - request.start);
callback(err, ops);
});
});
});
});
};
Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) {

@@ -200,28 +238,2 @@ if (projection) {

// Submit an operation on the named collection/docname. op should contain a
// {op:}, {create:} or {del:} field. It should probably contain a v: field (if
// it doesn't, it defaults to the current version).
//
// callback called with (err, snapshot, ops)
Backend.prototype.submit = function(agent, index, id, op, callback) {
var err = ot.checkOp(op);
if (err) return callback(err);
var request = new SubmitRequest(this, agent, index, id, op);
var backend = this;
backend.trigger('submit', agent, request, function(err) {
if (err) return callback(err);
request.run(function(err, snapshot, ops) {
if (err) return callback(err);
backend.trigger('after submit', agent, request, function(err) {
if (err) return callback(err);
backend._sanitizeOps(agent, request.projection, request.collection, id, ops, function(err) {
if (err) return callback(err);
backend.emit('timing', 'submit.total', Date.now() - request.start);
callback(err, ops);
});
});
});
});
};
Backend.prototype.fetch = function(agent, index, id, callback) {

@@ -270,3 +282,3 @@ var projection = this.projections[index];

if (err) return callback(err);
stream.initDocSubscribe(backend, agent, projection, version);
stream.initProjection(backend, agent, projection);
if (version == null) {

@@ -283,3 +295,3 @@ // Subscribing from null means that the agent doesn't have a document

if (err) return callback(err);
stream.pack(version, ops);
stream.pushOps(ops);
backend.emit('timing', 'subscribe', Date.now() - start);

@@ -298,15 +310,9 @@ callback(null, stream);

var streams = {};
var fetchIds = [];
var opsVersions = null;
async.forEachOf(versions, function(version, id, eachCb) {
if (version == null) {
fetchIds.push(version);
} else {
if (!opsVersions) opsVersions = {};
opsVersions[id] = version;
}
var doFetch = Array.isArray(versions);
var ids = (doFetch) ? versions : Object.keys(versions);
async.each(ids, function(id, eachCb) {
var channel = backend.getDocChannel(collection, id);
backend.pubsub.subscribe(channel, function(err, stream) {
if (err) return eachCb(err);
stream.initDocSubscribe(backend, agent, projection, version);
stream.initProjection(backend, agent, projection);
streams[id] = stream;

@@ -320,27 +326,27 @@ eachCb();

}
async.parallel({
snapshotMap: function(parallelCb) {
if (!fetchIds.length) return parallelCb(null, {});
backend.fetchBulk(agent, index, fetchIds, parallelCb);
},
ops: function(parallelCb) {
if (!opsVersions) return parallelCb();
backend.db.getOpsBulk(collection, opsVersions, null, function(err, opsMap) {
if (err) return parallelCb(err);
for (var id in opsVersions) {
var version = opsVersions[id];
var ops = opsMap[id];
streams[id].pack(version, ops);
}
parallelCb();
});
}
}, function(err, results) {
if (err) {
destroyStreams(streams);
return callback(err);
}
backend.emit('timing', 'subscribeBulk', Date.now() - start);
callback(null, streams, results.snapshotMap);
});
if (doFetch) {
// If an array of ids, get current snapshots
backend.fetchBulk(agent, index, ids, function(err, snapshotMap) {
if (err) {
destroyStreams(streams);
return callback(err);
}
backend.emit('timing', 'subscribeBulk', Date.now() - start);
callback(null, streams, snapshotMap);
});
} else {
// If a versions map, get ops since requested versions
backend.db.getOpsBulk(collection, versions, null, function(err, opsMap) {
if (err) {
destroyStreams(streams);
return callback(err);
}
for (var id in opsMap) {
var ops = opsMap[id];
streams[id].pushOps(ops);
}
backend.emit('timing', 'subscribeBulk', Date.now() - start);
callback(null, streams);
});
}
});

@@ -383,6 +389,10 @@ };

if (err) return callback(err);
stream.initProjection(backend, agent, request.projection);
if (options.ids) {
var queryEmitter = new QueryEmitter(request, stream, options.ids);
backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request);
callback(null, queryEmitter);
return;
}
// Issue query on db to get our initial results
stream.projection = request.projection;
stream.backend = backend;
stream.agent = agent;
backend._query(agent, request, function(err, snapshots, extra) {

@@ -393,3 +403,4 @@ if (err) {

}
var queryEmitter = new QueryEmitter(request, stream, snapshots, extra);
var ids = pluckIds(snapshots);
var queryEmitter = new QueryEmitter(request, stream, ids, extra);
backend.emit('timing', 'querySubscribe', Date.now() - start, request);

@@ -453,1 +464,9 @@ callback(null, queryEmitter, snapshots, extra);

};
function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
ids.push(snapshots[i].id);
}
return ids;
}

@@ -25,3 +25,3 @@ var Doc = require('./doc');

// Map of collection -> docName -> doc object for created documents.
// Map of collection -> id -> doc object for created documents.
// (created documents MUST BE UNIQUE)

@@ -88,6 +88,6 @@ this.collections = {};

if (this.socket) {
delete this.socket.onopen
delete this.socket.onclose
delete this.socket.onmessage
delete this.socket.onerror
this.socket.onmessage = null;
this.socket.onopen = null;
this.socket.onerror = null;
this.socket.onclose = null;
}

@@ -109,6 +109,2 @@

// Fall back to supporting old browserchannel 1.x API which implemented the
// websocket API incorrectly. This will be removed at some point
if (!data) data = msg;
// Some transports don't need parsing.

@@ -121,3 +117,3 @@ if (typeof data === 'string') data = JSON.parse(data);

t: (new Date()).toTimeString(),
recv:JSON.stringify(data)
recv: JSON.stringify(data)
});

@@ -162,3 +158,2 @@ while (connection.messageBuffer.length > 100) {

/**

@@ -175,3 +170,3 @@ * @param {object} msg

// ID.
if (msg.protocol !== 0) throw new Error('Invalid protocol version');
if (msg.protocol !== 1) throw new Error('Invalid protocol version');
if (typeof msg.id != 'string') throw new Error('Invalid client id');

@@ -181,43 +176,73 @@

this._setState('connected');
break;
return;
case 'qfetch':
case 'qsub':
case 'qf':
var query = this.queries[msg.id];
if (query) query._handleFetch(msg.error, msg.data, msg.extra);
return;
case 'qs':
var query = this.queries[msg.id];
if (query) query._handleSubscribe(msg.error, msg.data, msg.extra);
return;
case 'qu':
// Queries are removed immediately on calls to destroy, so we ignore
// replies to query unsubscribes. Perhaps there should be a callback for
// destroy, but this is currently unimplemented
return;
case 'q':
case 'qunsub':
// Query message. Pass this to the appropriate query object.
var query = this.queries[msg.id];
if (query) query._onMessage(msg);
break;
if (query) query._handleDiff(msg.error, msg.diff);
return;
case 'bf':
return this._handleBulkMessage(msg, '_handleFetch');
case 'bs':
// Bulk subscribe response. The responses for each document are contained within.
var result = msg.s;
for (var cName in result) {
for (var docName in result[cName]) {
var doc = this.get(cName, docName);
if (!doc) {
console.warn('Message for unknown doc. Ignoring.', msg);
break;
}
return this._handleBulkMessage(msg, '_handleSubscribe');
case 'bu':
return this._handleBulkMessage(msg, '_handleUnsubscribe');
var msg = result[cName][docName];
if (typeof msg === 'object') {
doc._handleSubscribe(msg.error, msg);
} else {
// The msg will be true if we simply resubscribed.
doc._handleSubscribe(null, null);
}
}
}
break;
case 'f':
var doc = this.getExisting(msg.c, msg.d);
if (doc) doc._handleFetch(msg.error, msg.data);
return;
case 's':
var doc = this.getExisting(msg.c, msg.d);
if (doc) doc._handleSubscribe(msg.error, msg.data);
return;
case 'u':
var doc = this.getExisting(msg.c, msg.d);
if (doc) doc._handleUnsubscribe(msg.error);
return;
case 'op':
var doc = this.getExisting(msg.c, msg.d);
if (doc) doc._handleOp(msg.error, msg);
return;
default:
// Document message. Pull out the referenced document and forward the
// message.
var doc = this.getExisting(msg.c, msg.d);
if (doc) doc._onMessage(msg);
console.warn('Ignorning unrecognized message', msg);
}
};
Connection.prototype._handleBulkMessage = function(msg, method) {
if (msg.data) {
for (var id in msg.data) {
var doc = this.getExisting(msg.c, id);
if (doc) doc[method](msg.error, msg.data[id]);
}
} else if (Array.isArray(msg.b)) {
for (var i = 0; i < msg.b.length; i++) {
var id = msg.b[i];
var doc = this.getExisting(msg.c, id);
if (doc) doc[method](msg.error);
}
} else if (msg.b) {
for (var id in msg.b) {
var doc = this.getExisting(msg.c, id);
if (doc) doc[method](msg.error);
}
} else {
console.error('Invalid bulk message', msg);
}
};

@@ -240,6 +265,6 @@ Connection.prototype.reset = function() {

this._retryInterval = setInterval(function() {
for (var collectionName in connection.collections) {
var collection = connection.collections[collectionName];
for (var docName in collection) {
collection[docName].retry();
for (var collection in connection.collections) {
var docs = connection.collections[collection];
for (var id in docs) {
docs[id].retry();
}

@@ -252,3 +277,3 @@ }

// Set the connection's state. The connection is basically a state machine.
Connection.prototype._setState = function(newState, data) {
Connection.prototype._setState = function(newState, reason) {
if (this.state === newState) return;

@@ -274,45 +299,55 @@

this.emit(newState, data);
this.emit(newState, reason);
// Group all subscribes together to help server make more efficient calls
this.bsStart();
this.startBulk();
// Emit the event to all queries
for (var id in this.queries) {
var query = this.queries[id];
query._onConnectionStateChanged(newState, data);
query._onConnectionStateChanged();
}
// Emit the event to all documents
for (var c in this.collections) {
var collection = this.collections[c];
for (var docName in collection) {
collection[docName]._onConnectionStateChanged(newState, data);
for (var collection in this.collections) {
var docs = this.collections[collection];
for (var id in docs) {
docs[id]._onConnectionStateChanged();
}
}
this.bsEnd();
this.endBulk();
};
Connection.prototype.bsStart = function() {
this.subscribeData = this.subscribeData || {};
Connection.prototype.startBulk = function() {
if (!this.bulk) this.bulk = {};
};
Connection.prototype.bsEnd = function() {
// Only send bulk subscribe if not empty
if (util.hasKeys(this.subscribeData)) {
this.send({a:'bs', s:this.subscribeData});
Connection.prototype.endBulk = function() {
if (this.bulk) {
for (var collection in this.bulk) {
var actions = this.bulk[collection];
if (actions.fetch) this.send({a: 'bf', c: collection, b: actions.fetch});
if (actions.fetchFrom) this.send({a: 'bf', c: collection, b: actions.fetchFrom});
if (actions.subscribe) this.send({a: 'bs', c: collection, b: actions.subscribe});
if (actions.subscribeFrom) this.send({a: 'bs', c: collection, b: actions.subscribeFrom});
if (actions.unsubscribe) this.send({a: 'bu', c: collection, b: actions.unsubscribe});
}
}
this.subscribeData = null;
this.bulk = null;
};
Connection.prototype.sendSubscribe = function(doc, version) {
Connection.prototype.sendSubscribe = function(doc) {
// Ensure the doc is registered so that it receives the reply message
this._addDoc(doc);
if (this.subscribeData) {
if (this.bulk) {
// Bulk subscribe
var data = this.subscribeData;
if (!data[doc.collection]) data[doc.collection] = {};
data[doc.collection][doc.name] = version || null;
var actions = this.bulk[doc.collection] || (this.bulk[doc.collection] = {});
if (doc.version == null) {
var subscribe = actions.subscribe || (actions.subscribe = []);
subscribe.push(doc.id);
} else {
var subscribeFrom = actions.subscribeFrom || (actions.subscribeFrom = {});
subscribeFrom[doc.id] = doc.version;
}
} else {
// Send single subscribe message
var msg = {a: 'sub', c: doc.collection, d: doc.name};
if (version != null) msg.v = version;
// Send single doc subscribe message
var msg = {a: 's', c: doc.collection, d: doc.id, v: doc.version};
this.send(msg);

@@ -322,8 +357,20 @@ }

Connection.prototype.sendFetch = function(doc, version) {
Connection.prototype.sendFetch = function(doc) {
// Ensure the doc is registered so that it receives the reply message
this._addDoc(doc);
var msg = {a: 'fetch', c: doc.collection, d: doc.name};
if (version != null) msg.v = version;
this.send(msg);
if (this.bulk) {
// Bulk fetch
var actions = this.bulk[doc.collection] || (this.bulk[doc.collection] = {});
if (doc.version == null) {
var fetch = actions.fetch || (actions.fetch = []);
fetch.push(doc.id);
} else {
var fetchFrom = actions.fetchFrom || (actions.fetchFrom = {});
fetchFrom[doc.id] = doc.version;
}
} else {
// Send single doc fetch message
var msg = {a: 'f', c: doc.collection, d: doc.id, v: doc.version};
this.send(msg);
}
};

@@ -334,4 +381,12 @@

this._addDoc(doc);
var msg = {a: 'unsub', c: doc.collection, d: doc.name};
this.send(msg);
if (this.bulk) {
// Bulk unsubscribe
var actions = this.bulk[doc.collection] || (this.bulk[doc.collection] = {});
var unsubscribe = actions.unsubscribe || (actions.unsubscribe = []);
unsubscribe.push(doc.id);
} else {
// Send single doc unsubscribe message
var msg = {a: 'u', c: doc.collection, d: doc.id};
this.send(msg);
}
};

@@ -345,3 +400,3 @@

c: doc.collection,
d: doc.name,
d: doc.id,
v: doc.version,

@@ -362,3 +417,3 @@ src: data.src,

Connection.prototype.send = function(msg) {
if (this.debug) console.log("SEND", JSON.stringify(msg));
if (this.debug) console.log('SEND', JSON.stringify(msg));

@@ -384,4 +439,4 @@ this.messageBuffer.push({t:Date.now(), send:JSON.stringify(msg)});

Connection.prototype.getExisting = function(collection, name) {
if (this.collections[collection]) return this.collections[collection][name];
Connection.prototype.getExisting = function(collection, id) {
if (this.collections[collection]) return this.collections[collection][id];
};

@@ -394,13 +449,13 @@

* @param collection
* @param name
* @param id
* @param [data] ingested into document if created
* @return {Doc}
*/
Connection.prototype.get = function(collection, name, data) {
var collectionObject = this.collections[collection] ||
Connection.prototype.get = function(collection, id, data) {
var docs = this.collections[collection] ||
(this.collections[collection] = {});
var doc = collectionObject[name];
var doc = docs[id];
if (!doc) {
doc = collectionObject[name] = new Doc(this, collection, name);
doc = docs[id] = new Doc(this, collection, id);
this.emit('doc', doc);

@@ -428,6 +483,6 @@ }

Connection.prototype._destroyDoc = function(doc) {
var collectionObject = this.collections[doc.collection];
if (!collectionObject) return;
var docs = this.collections[doc.collection];
if (!docs) return;
delete collectionObject[doc.name];
delete docs[doc.id];

@@ -437,3 +492,3 @@ // Delete the collection container if its empty. This could be a source of

// won't do anyway, but whatever.
if (!util.hasKeys(collectionObject)) {
if (!util.hasKeys(docs)) {
delete this.collections[doc.collection];

@@ -444,8 +499,8 @@ }

Connection.prototype._addDoc = function(doc) {
var collectionObject = this.collections[doc.collection];
if (!collectionObject) {
collectionObject = this.collections[doc.collection] = {};
var docs = this.collections[doc.collection];
if (!docs) {
docs = this.collections[doc.collection] = {};
}
if (collectionObject[doc.name] !== doc) {
collectionObject[doc.name] = doc;
if (docs[doc.id] !== doc) {
docs[doc.id] = doc;
}

@@ -455,12 +510,9 @@ };

// Helper for createFetchQuery and createSubscribeQuery, below.
Connection.prototype._createQuery = function(type, collection, q, options, callback) {
if (type !== 'fetch' && type !== 'sub') {
throw new Error('Invalid query type: ' + type);
}
Connection.prototype._createQuery = function(action, collection, q, options, callback) {
if (!options) options = {};
var id = this.nextQueryId++;
var query = new Query(type, this, id, collection, q, options, callback);
var query = new Query(action, this, id, collection, q, options, callback);
this.queries[id] = query;
query._execute();
query.send();
return query;

@@ -487,3 +539,3 @@ };

Connection.prototype.createFetchQuery = function(index, q, options, callback) {
return this._createQuery('fetch', index, q, options, callback);
return this._createQuery('qf', index, q, options, callback);
};

@@ -498,3 +550,70 @@

Connection.prototype.createSubscribeQuery = function(index, q, options, callback) {
return this._createQuery('sub', index, q, options, callback);
return this._createQuery('qs', index, q, options, callback);
};
Connection.prototype.hasPending = function() {
return !!(
this._firstDoc(hasPending) ||
this._firstQuery(callbackPending)
);
};
function hasPending(doc) {
return doc.hasPending();
}
function callbackPending(query) {
return query.callback;
}
Connection.prototype.hasWritePending = function() {
return !!this._firstDoc(hasWritePending);
};
function hasWritePending(doc) {
return doc.hasWritePending();
}
Connection.prototype.whenNothingPending = function(callback) {
var doc = this._firstDoc(hasPending);
if (doc) {
// If a document is found with a pending operation, wait for it to emit
// that nothing is pending anymore, and then recheck all documents again.
// We have to recheck all documents, just in case another mutation has
// been made in the meantime as a result of an event callback
doc.once('nothing pending', this._nothingPendingRetry(callback));
return;
}
var query = this._firstQuery(callbackPending);
if (query) {
query.once('ready', this._nothingPendingRetry(callback));
}
// Call back when no pending operations
process.nextTick(callback);
};
Connection.prototype._nothingPendingRetry = function(callback) {
var connection = this;
return function() {
process.nextTick(function() {
connection.whenNothingPending(callback);
});
};
};
Connection.prototype._firstDoc = function(fn) {
for (var collection in this.collections) {
var docs = this.collections[collection];
for (var id in docs) {
var doc = docs[id];
if (fn(doc)) {
return doc;
}
}
}
};
Connection.prototype._firstQuery = function(fn) {
for (var id in this.queries) {
var query = this.queries[id];
if (fn(query)) {
return query;
}
}
};

@@ -7,3 +7,3 @@ var types = require('../types').map;

*
* It is is uniquely identified by its `name` and `collection`. Documents
* It is is uniquely identified by its `id` and `collection`. Documents
* should not be created directly. Create them with Connection.get()

@@ -17,3 +17,2 @@ *

* doc.subscribe(function(error) {
* doc.state // = 'ready'
* doc.subscribed // = true

@@ -48,4 +47,3 @@ * })

* locked state which only allows reading operations.
* - `subscribed (error)` The document was subscribed
* - `created ()` The document was created. That means its type was
* - `create ()` The document was created. That means its type was
* set and it has some initial data.

@@ -55,3 +53,2 @@ * - `del (snapshot)` Fired after the document is deleted, that is

* arguments
* - `error`
*

@@ -61,3 +58,3 @@ */

module.exports = Doc;
function Doc(connection, collection, name) {
function Doc(connection, collection, id) {
emitter.EventEmitter.call(this);

@@ -68,3 +65,3 @@

this.collection = collection;
this.name = name;
this.id = id;

@@ -74,35 +71,13 @@ this.version = this.type = null;

// **** State in document:
// Array of callbacks or nulls as placeholders
this.inflightFetch = [];
this.inflightSubscribe = [];
this.inflightUnsubscribe = [];
this.pendingFetch = [];
// The action the document tries to perform with the server
//
// - subscribe
// - unsubscribe
// - fetch
// - submit: send an operation
this.action = null;
// The data the document object stores can be in one of the following three states:
// - No data. (null) We honestly don't know whats going on.
// - Floating ('floating'): we have a locally created document that hasn't
// been created on the server yet)
// - Live ('ready') (we have data thats current on the server at some version).
this.state = null;
// Our subscription status. Either we're subscribed on the server, or we aren't.
// Whether we think we are subscribed on the server
this.subscribed = false;
// Either we want to be subscribed (true), we want a new snapshot from the
// server ('fetch'), or we don't care (false). This is also used when we
// disconnect & reconnect to decide what to do.
// Whether to re-establish the subscription on reconnect
this.wantSubscribe = false;
// This list is used for subscribe and unsubscribe, since we'll only want to
// do one thing at a time.
this._subscribeCallbacks = [];
// *** end state stuff.
// This doesn't provide any standard API access right now.
this.provides = {};
// The op that is currently roundtripping to the server, or null.

@@ -112,7 +87,6 @@ //

//
// This has the same format as an entry in pendingData, which is:
// {[create:{...}], [del:true], [op:...], callbacks:[...], src:, seq:}
this.inflightData = null;
// This has the same format as an entry in pendingOps
this.inflightOp = null;
// All ops that are waiting for the server to acknowledge this.inflightData
// All ops that are waiting for the server to acknowledge this.inflightOp
// This used to just be a single operation, but creates & deletes can't be

@@ -122,3 +96,3 @@ // composed with regular operations.

// This is a list of {[create:{...}], [del:true], [op:...], callbacks:[...]}
this.pendingData = [];
this.pendingOps = [];

@@ -130,21 +104,13 @@ // The OT type of this document.

// For debouncing getLatestOps calls
this._getLatestTimeout = null;
// Prevents submitOp from accepting operations
this.locked = false;
}
emitter.mixin(Doc);
/**
* Unsubscribe
*/
Doc.prototype.destroy = function(callback) {
var doc = this;
this.unsubscribe(function() {
// Don't care if there's an error unsubscribing.
if (doc.hasPending()) {
doc.once('nothing pending', function() {
doc.connection._destroyDoc(doc);
});
} else {
doc.connection._destroyDoc(doc);
doc.whenNothingPending(function() {
doc.connection._destroyDoc(doc);
if (doc.wantSubscribe) {
return doc.unsubscribe(callback);
}

@@ -165,3 +131,3 @@ if (callback) callback();

if (typeof newType === 'string') {
if (!types[newType]) throw new Error('Missing type ' + newType + ' ' + this.collection + ' ' + this.name);
if (!types[newType]) throw new Error('Missing type ' + newType);
newType = types[newType];

@@ -175,11 +141,7 @@ }

if (!newType) {
this.provides = {};
this.snapshot = undefined;
} else if (newType.api) {
// Register the new type's API.
this.provides = newType.api.provides;
}
};
// Injest snapshot data. This data must include a version, snapshot and type.
// Ingest snapshot data. This data must include a version, snapshot and type.
// This is used both to ingest data that was exported with a webpage and data

@@ -191,47 +153,41 @@ // that was received from the server during a fetch.

// @param data.type
// @fires ready
Doc.prototype.ingestData = function(data) {
// Ignore if the document is already created
if (this.type) return;
if (typeof data.v !== 'number') {
throw new Error('Missing version in ingested data ' + this.collection + ' ' + this.name);
throw new Error('Missing version in ingested data ' + this.collection + ' ' + this.id);
}
if (this.state) {
// Silently ignore if doc snapshot version is equal or newer
// TODO: Investigate whether this should happen in practice or not
if (this.version >= data.v) return;
console.warn('Ignoring ingest data for', this.collection, this.name,
'\n in state:', this.state, '\n version:', this.version,
'\n snapshot:\n', this.snapshot, '\n incoming data:\n', data);
return;
}
this.version = data.v;
// data.data is what the server will actually send. data.snapshot is the old
// field name - supported now for backwards compatibility.
this.snapshot = data.data;
this._setType(data.type);
this.state = 'ready';
this.emit('ready');
};
// Get and return the current document snapshot.
Doc.prototype.getSnapshot = function() {
return this.snapshot;
};
// The callback will be called at a time when the document has a snapshot and
// you can start applying operations. This may be immediately.
Doc.prototype.whenReady = function(fn) {
if (this.state === 'ready') {
fn();
} else {
this.once('ready', fn);
Doc.prototype.whenNothingPending = function(callback) {
if (this.hasPending()) {
this.once('nothing pending', callback);
return;
}
callback();
};
Doc.prototype.hasPending = function() {
return this.action != null || this.inflightData != null || !!this.pendingData.length;
return !!(
this.inflightOp ||
this.pendingOps.length ||
this.inflightFetch.length ||
this.inflightSubscribe.length ||
this.inflightUnsubscribe.length ||
this.pendingFetch.length
);
};
Doc.prototype.hasWritePending = function() {
return !!(this.inflightOp || this.pendingOps.length);
};
Doc.prototype._emitNothingPending = function() {
if (this.hasWritePending()) return;
this.emit('no write pending');
if (this.hasPending()) return;

@@ -241,13 +197,21 @@ this.emit('nothing pending');

// **** Helpers for network messages
// This function exists so connection can call it directly for bulk subscribes.
// It could just make a temporary object literal, thats pretty slow.
Doc.prototype._handleFetch = function(err, data) {
var callback = this.inflightFetch.shift();
if (err) {
callback && callback(err);
this._emitNothingPending();
return;
}
if (data) this.ingestData(data);
callback && callback();
this._emitNothingPending();
};
Doc.prototype._handleSubscribe = function(err, data) {
if (err && err !== 'Already subscribed') {
console.error('Could not subscribe:', err, this.collection, this.name);
this.emit('error', err);
// There's probably a reason we couldn't subscribe. Don't retry.
this._setWantSubscribe(false, null, err);
var callback = this.inflightSubscribe.shift();
if (err) {
callback && callback(err);
this._emitNothingPending();
return;

@@ -257,128 +221,58 @@ }

this.subscribed = true;
this._clearAction();
this.emit('subscribe');
this._finishSub();
callback && callback();
this._emitNothingPending();
};
// This is called by the connection when it receives a message for the document.
Doc.prototype._onMessage = function(msg) {
if (msg.c !== this.collection || msg.d !== this.name) {
// This should never happen - its a sanity check for bugs in the connection code.
throw new Error('Got message for wrong document');
Doc.prototype._handleUnsubscribe = function(err) {
var callback = this.inflightUnsubscribe.shift();
if (err) {
callback && callback(err);
this._emitNothingPending();
return;
}
this.subscribed = false;
callback && callback();
this._emitNothingPending();
};
// msg.a = the action.
switch (msg.a) {
case 'fetch':
// We're done fetching. This message has no other information.
if (msg.data) this.ingestData(msg.data);
if (this.wantSubscribe === 'fetch') this.wantSubscribe = false;
this._clearAction();
this._finishSub(msg.error);
return;
Doc.prototype._handleOp = function(err, msg) {
if (this.inflightOp && err) {
this._rollback(err);
return;
}
case 'sub':
// Subscribe reply.
this._handleSubscribe(msg.error, msg.data);
return;
if (this.inflightOp &&
msg.src === this.inflightOp.src &&
msg.seq === this.inflightOp.seq) {
// The op has already been applied locally. Just update the version
// and pending state appropriately
this._opAcknowledged(msg);
return;
}
case 'unsub':
// Unsubscribe reply
this.subscribed = false;
this.emit('unsubscribe');
if (this.version == null || msg.v > this.version) {
// This will happen in normal operation if we become subscribed to a
// new document via a query. It can also happen if we get an op for
// a future version beyond the version we are expecting next. This
// could happen if the server doesn't publish an op for whatever reason
// or because of a race condition. In any case, we can send a fetch
// command to catch back up.
if (this.inflightFetch.length || this.inflightSubscribe.length) return;
this.fetch();
return;
}
this._clearAction();
this._finishSub(msg.error);
return;
if (msg.v < this.version) {
// We can safely ignore the old (duplicate) operation.
return;
}
case 'op':
if (msg.error) {
// The server has rejected an op from the client for an unexpected reason.
// We'll send the error message to the user and try to roll back the change.
if (this.inflightData) {
console.warn('Operation was rejected (' + msg.error + '). Trying to rollback change locally.');
this._tryRollback(this.inflightData);
this._clearInflightOp(msg.error);
} else {
// I managed to get into this state once. I'm not sure how it happened.
// The op was maybe double-acknowledged?
console.warn('Second acknowledgement message (error) received', msg, this);
}
return;
}
if (this.inflightOp) transformX(this.inflightOp, msg);
if (this.inflightData &&
msg.src === this.inflightData.src &&
msg.seq === this.inflightData.seq) {
// The op has already been applied locally. Just update the version
// and pending state appropriately
this._opAcknowledged(msg);
return;
}
if (this.version == null || msg.v > this.version) {
// This will happen in normal operation if we become subscribed to a
// new document via a query. It can also happen if we get an op for
// a future version beyond the version we are expecting next. This
// could happen if the server doesn't publish an op for whatever reason
// or because of a race condition. In any case, we can send a fetch
// command to catch back up.
this._getLatestOps();
return;
}
if (msg.v < this.version) {
// This will happen naturally in the following (or similar) cases:
//
// Client is not subscribed to document.
// -> client submits an operation (v=10)
// -> client subscribes to a query which matches this document. Says we
// have v=10 of the doc.
//
// <- server acknowledges the operation (v=11). Server acknowledges the
// operation because the doc isn't subscribed
// <- server processes the query, which says the client only has v=10.
// Server subscribes at v=10 not v=11, so we get another copy of the
// v=10 operation.
//
// In this case, we can safely ignore the old (duplicate) operation.
return;
}
if (this.inflightData) xf(this.inflightData, msg);
for (var i = 0; i < this.pendingData.length; i++) {
xf(this.pendingData[i], msg);
}
this.version++;
this._otApply(msg, false);
return;
default:
console.warn('Unhandled document message:', msg);
for (var i = 0; i < this.pendingOps.length; i++) {
transformX(this.pendingOps[i], msg);
}
};
Doc.prototype._getLatestOps = function() {
var doc = this;
var debounced = false;
if (doc._getLatestTimeout) {
debounced = true;
} else {
// Send a fetch command, which will get us the missing ops to catch back up
// or the full doc if our version is currently null
doc.connection.sendFetch(doc, doc.version);
}
// Debounce calls, since we are likely to get multiple future operations
// in a rapid sequence
clearTimeout(doc._getLatestTimeout);
doc._getLatestTimeout = setTimeout(function() {
doc._getLatestTimeout = null;
// Send another fetch at the end of the final timeout interval if we were
// debounced to make sure we didn't miss anything
if (debounced) {
doc.connection.sendFetch(doc, doc.version);
}
}, 5000);
this.version++;
this._otApply(msg, false);
return;

@@ -392,119 +286,105 @@ };

this.flush();
this._resubscribe();
} else {
this.subscribed = false;
this._clearAction();
if (this.inflightFetch.length || this.inflightSubscribe.length) {
this.pendingFetch = this.pendingFetch.concat(this.inflightFetch, this.inflightSubscribe);
this.inflightFetch.length = 0;
this.inflightSubscribe.length = 0;
}
if (this.inflightUnsubscribe.length) {
var callbacks = this.inflightUnsubscribe;
this.inflightUnsubscribe = [];
callEach(callbacks);
}
}
};
Doc.prototype._clearAction = function() {
this.action = null;
this.flush();
this._emitNothingPending();
};
Doc.prototype._resubscribe = function() {
var callbacks = this.pendingFetch;
this.pendingFetch = [];
// Send the next pending op to the server, if we can.
//
// Only one operation can be in-flight at a time. If an operation is already on
// its way, or we're not currently connected, this method does nothing.
Doc.prototype.flush = function() {
// Ignore if we can't send or we are already sending an op
if (!this.connection.canSend || this.inflightData) return;
// Pump and dump any no-ops from the front of the pending op list.
var op;
while (this.pendingData.length && isNoOp(op = this.pendingData[0])) {
var callbacks = op.callbacks;
for (var i = 0; i < callbacks.length; i++) {
callbacks[i](op.error);
if (this.wantSubscribe) {
if (callbacks.length) {
this.subscribe(function(err) {
callEach(callbacks, err);
});
return;
}
this.pendingData.shift();
}
// Send first pending op unless paused
if (!this.paused && this.pendingData.length) {
this._sendOp();
if (this.subscribed || this.inflightSubscribe.length) return;
this.subscribe();
return;
}
// Ignore if an action is already in process
if (this.action) return;
// Once all ops are sent, perform subscriptions and fetches
var version = (this.state === 'ready') ? this.version : null;
if (this.subscribed && !this.wantSubscribe) {
this.action = 'unsubscribe';
this.connection.sendUnsubscribe(this);
} else if (!this.subscribed && this.wantSubscribe === 'fetch') {
this.action = 'fetch';
this.connection.sendFetch(this, version);
} else if (!this.subscribed && this.wantSubscribe) {
this.action = 'subscribe';
this.connection.sendSubscribe(this, version);
if (callbacks.length) {
this.fetch(function(err) {
callEach(callbacks, err);
});
}
};
// ****** Subscribing, unsubscribing and fetching
// Value is true, false or 'fetch'.
Doc.prototype._setWantSubscribe = function(value, callback, err) {
if (this.subscribed === this.wantSubscribe &&
(this.subscribed === value || value === 'fetch' && this.subscribed)) {
if (callback) callback(err);
// Fetch the initial document and keep receiving updates
Doc.prototype.subscribe = function(callback) {
this.wantSubscribe = true;
if (this.connection.canSend) {
this.inflightSubscribe.push(callback);
this.connection.sendSubscribe(this);
return;
}
// If we want to subscribe, don't weaken it to a fetch.
if (value !== 'fetch' || this.wantSubscribe !== true) {
this.wantSubscribe = value;
}
if (callback) this._subscribeCallbacks.push(callback);
this.flush();
if (callback) this.pendingFetch.push(callback);
};
// Open the document. There is no callback and no error handling if you're
// already connected.
//
// Only call this once per document.
Doc.prototype.subscribe = function(callback) {
this._setWantSubscribe(true, callback);
};
// Unsubscribe. The data will stay around in local memory, but we'll stop
// receiving updates.
// receiving updates
Doc.prototype.unsubscribe = function(callback) {
this._setWantSubscribe(false, callback);
this.wantSubscribe = false;
if (this.connection.canSend) {
this.inflightUnsubscribe.push(callback);
this.connection.sendUnsubscribe(this);
return;
}
if (callback) process.nextTick(callback);
};
// Call to request fresh data from the server.
// Request the current document snapshot or ops that bring us up to date
Doc.prototype.fetch = function(callback) {
this._setWantSubscribe('fetch', callback);
};
// Called when our subscribe, fetch or unsubscribe messages are acknowledged.
Doc.prototype._finishSub = function(err) {
if (!this._subscribeCallbacks.length) return;
for (var i = 0; i < this._subscribeCallbacks.length; i++) {
this._subscribeCallbacks[i](err);
if (this.connection.canSend) {
this.inflightFetch.push(callback);
this.connection.sendFetch(this);
return;
}
this._subscribeCallbacks.length = 0;
if (callback) this.pendingFetch.push(callback);
};
// Operations
// Operations //
// Send the next pending op to the server, if we can.
//
// Only one operation can be in-flight at a time. If an operation is already on
// its way, or we're not currently connected, this method does nothing.
Doc.prototype.flush = function() {
// Ignore if we can't send or we are already sending an op
if (!this.connection.canSend || this.inflightOp) return;
// ************ Dealing with operations.
// Clear any no-ops from the front of the pending op list.
while (this.pendingOps.length && isNoOp(this.pendingOps[0])) {
var op = this.pendingOps.shift();
callEach(op.callbacks);
}
// Send first pending op unless paused
if (!this.paused && this.pendingOps.length) {
this._sendOp();
}
};
// Helper function to set op to contain a no-op.
var setNoOp = function(op) {
function setNoOp(op) {
delete op.op;
delete op.create;
delete op.del;
};
}
var isNoOp = function(op) {
function isNoOp(op) {
return !op.op && !op.create && !op.del;

@@ -514,15 +394,11 @@ }

// Try to compose data2 into data1. Returns truthy if it succeeds, otherwise falsy.
var tryCompose = function(type, data1, data2) {
if (data1.create && data2.del) {
setNoOp(data1);
} else if (data1.create && data2.op) {
// Compose the data into the create data.
var data = (data1.create.data === undefined) ? type.create() : data1.create.data;
data1.create.data = type.apply(data, data2.op);
} else if (isNoOp(data1)) {
function tryCompose(type, data1, data2) {
if (data1.create && data2.op) {
data1.create.data = type.apply(data1.create.data, data2.op);
} else if (data1.op && data2.op && type.compose) {
data1.op = type.compose(data1.op, data2.op);
} else if (data2.del || isNoOp(data1)) {
data1.create = data2.create;
data1.del = data2.del;
data1.op = data2.op;
} else if (data1.op && data2.op && type.compose) {
data1.op = type.compose(data1.op, data2.op);
} else {

@@ -532,6 +408,6 @@ return false;

return true;
};
}
// Transform server op data by a client op, and vice versa. Ops are edited in place.
var xf = function(client, server) {
function transformX(client, server) {
// In this case, we're in for some fun. There are some local operations

@@ -545,3 +421,3 @@ // which are totally invalid - either the client continued editing a

if (server.create || server.del) return setNoOp(client);
if (client.create) throw new Error('Invalid state. This is a bug. ' + this.collection + ' ' + this.name);
if (client.create) throw new Error('Invalid state. This is a bug. ' + this.collection + ' ' + this.id);

@@ -552,7 +428,5 @@ // The client has deleted the document while the server edited it. Kill the

var clientEdit = client.op;
var serverEdit = server.op;
// We only get here if either the server or client ops are no-op. Carry on,
// nothing to see here.
if (!serverEdit || !clientEdit) return;
if (!server.op || !client.op) return;

@@ -566,11 +440,6 @@ // They both edited the document. This is the normal case for this function -

// op data, we make sure the right type has its transform function called.
if (client.type.transformX) {
var result = client.type.transformX(clientEdit, serverEdit);
client.op = result[0];
server.op = result[1];
} else {
client.op = client.type.transform(clientEdit, serverEdit, 'left');
server.op = client.type.transform(serverEdit, clientEdit, 'right');
}
};
var result = client.type.transformX(client.op, server.op);
client.op = result[0];
server.op = result[1];
}

@@ -590,33 +459,10 @@ /**

Doc.prototype._otApply = function(op, context) {
this.locked = true;
if (op.create) {
// If the type is currently set, it means we tried creating the document
// and someone else won. client create x server create = server create.
var create = op.create;
this._setType(create.type);
this.snapshot = this.type.create(create.data);
// This is a bit heavyweight, but I want the created event to fire outside of the lock.
this.once('unlock', function() {
this.emit('create', context);
});
} else if (op.del) {
// The type should always exist in this case. del x _ = del
var oldSnapshot = this.snapshot;
this._setType(null);
this.once('unlock', function() {
this.emit('del', oldSnapshot, context);
});
} else if (op.op) {
if (!this.type) throw new Error('Document does not exist. ' + this.collection + ' ' + this.name);
if (op.op) {
if (!this.type) throw new Error('Cannot apply op to uncreated document ' + this.collection + '.' + this.id);
var type = this.type;
// This exists so clients can pull any necessary data out of the snapshot
// before it gets changed.
this.emit('before op', op.op, context);
// This exists so clients can pull any necessary data out of the snapshot
// before it gets changed. Previously we kept the old snapshot object and
// passed it to the op event handler. However, apply no longer guarantees
// the old object is still valid.
//
// Because this could be totally unnecessary work, its behind a flag. set

@@ -626,2 +472,3 @@ // doc.incremental to enable.

var doc = this;
this.locked = true;
type.incrementalApply(this.snapshot, op.op, function(component, snapshot) {

@@ -631,2 +478,3 @@ doc.snapshot = snapshot;

});
this.locked = false;
} else {

@@ -637,12 +485,22 @@ // This is the default case, simply applying the operation to the local snapshot.

}
this.emit('after op', op.op, context);
return;
}
// Its possible for none of the above cases to match, in which case the op is
// a no-op. This will happen when a document has been deleted locally and
// remote ops edit the document.
this.locked = false;
this.emit('unlock');
if (op.create) {
// If the type is currently set, it means we tried creating the document
// and someone else won. client create x server create = server create.
this._setType(op.create.type);
this.snapshot = this.type.create(op.create.data);
this.emit('create', context);
return;
}
if (op.op) {
this.emit('after op', op.op);
if (op.del) {
// The type should always exist in this case. del x _ = del
var oldSnapshot = this.snapshot;
this._setType(null);
this.emit('del', oldSnapshot, context);
return;
}

@@ -655,5 +513,5 @@ };

Doc.prototype.retry = function() {
if (!this.inflightData) return;
var threshold = 5000 * Math.pow(2, this.inflightData.retries);
if (this.inflightData.sentAt < Date.now() - threshold) {
if (!this.inflightOp) return;
var threshold = 5000 * Math.pow(2, this.inflightOp.retries);
if (this.inflightOp.sentAt < Date.now() - threshold) {
this.connection.emit('retry', this);

@@ -670,9 +528,9 @@ this._sendOp();

// When there is no inflightData, send the first item in pendingData. If
// there is inflightData, try sending it again
if (!this.inflightData) {
// When there is no inflightOp, send the first item in pendingOps. If
// there is inflightOp, try sending it again
if (!this.inflightOp) {
// Send first pending op
this.inflightData = this.pendingData.shift();
this.inflightOp = this.pendingOps.shift();
}
var data = this.inflightData;
var data = this.inflightOp;
if (!data) {

@@ -688,3 +546,3 @@ throw new Error('no data to send on call to _sendOp');

// is used on the server to detect when ops have been sent multiple times and
// on the client to match acknowledgement of an op back to the inflightData.
// on the client to match acknowledgement of an op back to the inflightOp.
// Note that the src could be different from this.connection.id after a

@@ -699,3 +557,3 @@ // reconnect, since an op may still be pending after the reconnection and

// src isn't needed on the first try, since the server session will have the
// same id, but it must be set on the inflightData in case it is sent again
// same id, but it must be set on the inflightOp in case it is sent again
// after a reconnect and the connection's id has changed by then

@@ -725,3 +583,3 @@ if (data.src == null) data.src = src;

if (this.locked) {
var err = new Error('Cannot call submitOp from inside an op event handler. ' + this.collection + ' ' + this.name);
var err = new Error('Cannot call submitOp from inside an op event handler. ' + this.collection + ' ' + this.id);
if (callback) return callback(err);

@@ -742,6 +600,2 @@ throw err;

if (!this.state) {
this.state = 'floating';
}
op.type = this.type;

@@ -753,3 +607,3 @@ op.callbacks = [];

var operation;
var previous = this.pendingData[this.pendingData.length - 1];
var previous = this.pendingOps[this.pendingOps.length - 1];

@@ -760,3 +614,3 @@ if (previous && tryCompose(this.type, previous, op)) {

operation = op;
this.pendingData.push(op);
this.pendingOps.push(op);
}

@@ -767,9 +621,8 @@ if (callback) operation.callbacks.push(callback);

// The call to flush is in a timeout so if submitOp() is called multiple
// times in a closure all the ops are combined before being sent to the
// server. It doesn't matter if flush is called a bunch of times.
// The call to flush is delayed so if submitOp() is called multiple times
// synchronously, all the ops are combined before being sent to the server.
var doc = this;
setTimeout(function() {
process.nextTick(function() {
doc.flush();
}, 0);
});
};

@@ -840,27 +693,34 @@

// This is called when the server acknowledges an operation from the client.
Doc.prototype._opAcknowledged = function(msg) {
if (this.inflightOp.create) {
this.version = msg.v;
// This will be called when the server rejects our operations for some reason.
// There's not much we can do here if the OT type is noninvertable, but that
// shouldn't happen too much in real life because readonly documents should be
// flagged as such. (I should probably figure out a flag for that).
//
// This does NOT get called if our op fails to reach the server for some reason
// - we optimistically assume it'll make it there eventually.
Doc.prototype._tryRollback = function(op) {
// This is probably horribly broken.
if (op.create) {
this._setType(null);
} else if (msg.v !== this.version) {
// We should already be at the same version, because the server should
// have sent all the ops that have happened before acknowledging our op
console.warn('Invalid version from server. Expected: ' + this.version + ' Received: ' + msg.v, msg);
// I don't think its possible to get here if we aren't in a floating state.
if (this.state === 'floating')
this.state = null;
else
console.warn('Rollback a create from state ' + this.state);
// Fetching should get us back to a working document state
return this.fetch();
}
} else if (op.op && op.type.invert) {
// The op was committed successfully. Increment the version number
this.version++;
this._clearInflightOp();
};
// This will be called when the server rejects our operation for some reason.
Doc.prototype._rollback = function(err) {
// The server has rejected an op from the client for an unexpected reason.
// We'll send the error message to the user and try to roll back the change.
var op = this.inflightOp;
if (op.op && op.type.invert) {
op.op = op.type.invert(op.op);
// Transform the undo operation by any pending ops.
for (var i = 0; i < this.pendingData.length; i++) {
xf(this.pendingData[i], op);
for (var i = 0; i < this.pendingOps.length; i++) {
transformX(this.pendingOps[i], op);
}

@@ -875,23 +735,27 @@

this._otApply(op, false);
} else if (op.op || op.del) {
// This is where an undo stack would come in handy.
this._setType(null);
this.version = null;
this.state = null;
this.subscribed = false;
this.emit('error', 'Op apply failed and the operation could not be reverted');
// Trigger a fetch. In our invalid state, we can't really do anything.
this.fetch();
this.flush();
this._clearInflightOp(err);
return;
}
// Cancel all pending ops and reset if we can't invert
var pending = this.pendingOps;
this._setType(null);
this.version = null;
this.inflightOp = null;
this.pendingOps = [];
// Fetch the latest from the server to get us back into a working state
this.fetch(function() {
callEach(op.callbacks, err);
for (var i = 0; i < pending.length; i++) {
callEach(pending[i].callbacks, err);
}
});
};
Doc.prototype._clearInflightOp = function(error) {
var callbacks = this.inflightData.callbacks;
for (var i = 0; i < callbacks.length; i++) {
callbacks[i](error || this.inflightData.error);
}
Doc.prototype._clearInflightOp = function(err) {
callEach(this.inflightOp.callbacks, err);
this.inflightData = null;
this.inflightOp = null;
this.flush();

@@ -901,34 +765,7 @@ this._emitNothingPending();

// This is called when the server acknowledges an operation from the client.
Doc.prototype._opAcknowledged = function(msg) {
// Our inflight op has been acknowledged, so we can throw away the inflight data.
// (We were only holding on to it incase we needed to resend the op.)
if (!this.state) {
throw new Error('opAcknowledged called from a null state. This should never happen. ' + this.collection + '.' + this.name);
} else if (this.state === 'floating') {
if (!this.inflightData.create) throw new Error('Cannot acknowledge an op. ' + this.collection + '.' + this.name);
// Our create has been acknowledged. This is the same as ingesting some data.
this.version = msg.v;
this.state = 'ready';
var doc = this;
setTimeout(function() {
doc.emit('ready');
}, 0);
} else {
// We already have a snapshot. The snapshot should be at the acknowledged
// version, because the server has sent us all the ops that have happened
// before acknowledging our op.
// This should never happen - something is out of order.
if (msg.v !== this.version) {
throw new Error('Invalid version from server. This can happen when you submit ops in a submitOp callback. ' +
'Expected: ' + this.version + ' Message version: ' + msg.v + ' ' + this.collection + '.' + this.name);
}
function callEach(callbacks, err) {
for (var i = 0; i < callbacks.length; i++) {
var callback = callbacks[i];
if (callback) callback(err);
}
// The op was committed successfully. Increment the version number
this.version++;
this._clearInflightOp();
};
}

@@ -8,7 +8,7 @@ var emitter = require('../emitter');

module.exports = Query;
function Query(type, connection, id, collection, query, options, callback) {
function Query(action, connection, id, collection, query, options, callback) {
emitter.EventEmitter.call(this);
// 'fetch' or 'sub'
this.type = type;
// 'qf' or 'qs'
this.action = action;

@@ -30,21 +30,19 @@ this.connection = connection;

// so that a query can be serialized and then re-established
this.results = options.results || [];
this.results = options.results;
this.extra = undefined;
// Do we have some initial data?
this.ready = false;
this.callback = callback;
this.callback = callback;
this.sent = false;
}
emitter.mixin(Query);
Query.prototype.action = 'qsub';
// Helper for subscribe & fetch, since they share the same message format.
//
// This function actually issues the query.
Query.prototype._execute = function() {
Query.prototype.send = function() {
if (!this.connection.canSend) return;
var msg = {
a: 'q' + this.type,
a: this.action,
id: this.id,

@@ -55,145 +53,135 @@ c: this.collection,

if (this.db != null) msg.db = this.db;
if (this.results.length) {
if (this.results) {
// Collect the version of all the documents in the current result set so we
// don't need to be sent their snapshots again.
var versions = {};
var results = [];
for (var i = 0; i < this.results.length; i++) {
var doc = this.results[i];
if (doc.version == null || doc.collection !== this.collection) continue;
versions[doc.name] = doc.version;
results.push([doc.id, doc.version]);
}
msg.vs = versions;
msg.r = results;
}
this.connection.send(msg);
this.sent = true;
};
// Make a list of documents from the list of server-returned data objects
Query.prototype._dataToDocs = function(data) {
var results = [];
var lastType;
for (var i = 0; i < data.length; i++) {
var docData = data[i];
// Types are only put in for the first result in the set and every time the type changes in the list.
if (docData.type) {
lastType = docData.type;
} else {
docData.type = lastType;
}
// This will ultimately call doc.ingestData(), which is what populates
// the doc snapshot and version with the data returned by the query
var doc = this.connection.get(docData.c || this.collection, docData.d, docData);
results.push(doc);
// Destroy the query object. Any subsequent messages for the query will be
// ignored by the connection.
Query.prototype.destroy = function(callback) {
if (this.connection.canSend && this.action === 'qs') {
this.connection.send({a: 'qu', id: this.id});
}
return results;
this.connection._destroyQuery(this);
};
// Destroy the query object. Any subsequent messages for the query will be
// ignored by the connection. You should unsubscribe from the query before
// destroying it.
Query.prototype.destroy = function() {
if (this.connection.canSend && this.type === 'sub') {
this.connection.send({a:'qunsub', id:this.id});
Query.prototype._onConnectionStateChanged = function() {
if (this.connection.canSend && !this.sent) {
this.send();
} else {
this.sent = false;
}
};
Query.prototype._handleFetch = function(err, data, extra) {
var callback = this.callback;
this.callback = null;
// Once a fetch query gets its data, it is destroyed.
this.connection._destroyQuery(this);
if (err) {
this.emit('ready');
if (callback) return callback(err);
return this.emit('error', err);
}
var results = this._dataToDocs(data);
if (callback) callback(null, results, extra);
this.emit('ready');
};
Query.prototype._onConnectionStateChanged = function(state, reason) {
if (this.connection.state === 'connecting') {
this._execute();
Query.prototype._handleSubscribe = function(err, data, extra) {
var callback = this.callback;
this.callback = null;
if (err) {
// Cleanup the query if the initial subscribe returns an error
this.connection._destroyQuery(this);
this.emit('ready');
if (callback) return callback(err);
return this.emit('error', err);
}
// Subscribe will only return results if issuing a new query without
// previous results. On a resubscribe, changes to the results or ops will
// have already been sent as individual diff events
if (data) {
this.results = this._dataToDocs(data);
this.extra = extra
}
if (callback) callback(null, this.results, this.extra);
this.emit('ready');
};
// Internal method called from connection to pass server messages to the query.
Query.prototype._onMessage = function(msg) {
if ((msg.a === 'qfetch') !== (this.type === 'fetch')) {
console.warn('Invalid message sent to query', msg, this);
return;
Query.prototype._handleDiff = function(err, diff, extra) {
if (err) {
return this.emit('error', err);
}
if (msg.error) this.emit('error', msg.error);
// Query diff data (inserts and removes)
if (diff) {
// We need to go through the list twice. First, we'll ingest all the
// new documents and set them as subscribed. After that we'll emit
// events and actually update our list. This avoids race conditions
// around setting documents to be subscribed & unsubscribing documents
// in event callbacks.
for (var i = 0; i < diff.length; i++) {
var d = diff[i];
if (d.type === 'insert') d.values = this._dataToDocs(d.values);
}
switch (msg.a) {
case 'qfetch':
var results = msg.data ? this._dataToDocs(msg.data) : undefined;
if (this.callback) this.callback(msg.error, results, msg.extra);
// Once a fetch query gets its data, it is destroyed.
this.connection._destroyQuery(this);
break;
case 'q':
// Query diff data (inserts and removes)
if (msg.diff) {
// We need to go through the list twice. First, we'll ingest all the
// new documents and set them as subscribed. After that we'll emit
// events and actually update our list. This avoids race conditions
// around setting documents to be subscribed & unsubscribing documents
// in event callbacks.
for (var i = 0; i < msg.diff.length; i++) {
var d = msg.diff[i];
if (d.type === 'insert') d.values = this._dataToDocs(d.values);
}
for (var i = 0; i < msg.diff.length; i++) {
var d = msg.diff[i];
switch (d.type) {
case 'insert':
var newDocs = d.values;
Array.prototype.splice.apply(this.results, [d.index, 0].concat(newDocs));
this.emit('insert', newDocs, d.index);
break;
case 'remove':
var howMany = d.howMany || 1;
var removed = this.results.splice(d.index, howMany);
this.emit('remove', removed, d.index);
break;
case 'move':
var howMany = d.howMany || 1;
var docs = this.results.splice(d.from, howMany);
Array.prototype.splice.apply(this.results, [d.to, 0].concat(docs));
this.emit('move', docs, d.from, d.to);
break;
}
}
for (var i = 0; i < diff.length; i++) {
var d = diff[i];
switch (d.type) {
case 'insert':
var newDocs = d.values;
Array.prototype.splice.apply(this.results, [d.index, 0].concat(newDocs));
this.emit('insert', newDocs, d.index);
break;
case 'remove':
var howMany = d.howMany || 1;
var removed = this.results.splice(d.index, howMany);
this.emit('remove', removed, d.index);
break;
case 'move':
var howMany = d.howMany || 1;
var docs = this.results.splice(d.from, howMany);
Array.prototype.splice.apply(this.results, [d.to, 0].concat(docs));
this.emit('move', docs, d.from, d.to);
break;
}
}
}
if (msg.extra !== void 0) {
this.emit('extra', msg.extra);
}
break;
case 'qsub':
// This message replaces the entire result set with the set passed.
if (!msg.error) {
var previous = this.results;
// Then add everything in the new result set.
this.results = this._dataToDocs(msg.data);
this.extra = msg.extra;
this.ready = true;
this.emit('change', this.results, previous);
}
if (this.callback) {
this.callback(msg.error, this.results, this.extra);
delete this.callback;
}
break;
if (extra !== undefined) {
this.emit('extra', extra);
}
};
// Change the thing we're searching for. This isn't fully supported on the
// backend (it destroys the old query and makes a new one) - but its
// programatically useful and I might add backend support at some point.
Query.prototype.setQuery = function(q) {
if (this.type !== 'sub') throw new Error('cannot change a fetch query');
// Make a list of documents from the list of server-returned data objects
Query.prototype._dataToDocs = function(data) {
var results = [];
var lastType;
for (var i = 0; i < data.length; i++) {
var docData = data[i];
this.query = q;
if (this.connection.canSend) {
// There's no 'change' message to send to the server. Just resubscribe.
this.connection.send({a:'qunsub', id:this.id});
this._execute();
// Types are only put in for the first result in the set and every time the type changes in the list.
if (docData.type) {
lastType = docData.type;
} else {
docData.type = lastType;
}
// This will ultimately call doc.ingestData(), which is what populates
// the doc snapshot and version with the data returned by the query
var doc = this.connection.get(docData.c || this.collection, docData.d, docData);
results.push(doc);
}
return results;
};
var async = require('async');
var UNIMPLEMENTED = {code: 5000, message: 'DB method unimplemented'};
function DB(options) {

@@ -17,7 +15,7 @@ // pollDebounce is the minimum time in ms between query polls

DB.prototype.commit = function(collection, id, op, snapshot, callback) {
callback(UNIMPLEMENTED);
callback(new Error('commit DB method unimplemented'));
};
DB.prototype.getSnapshot = function(collection, id, fields, callback) {
callback(UNIMPLEMENTED);
callback(new Error('getSnapshot DB method unimplemented'));
};

@@ -35,3 +33,4 @@

}, function(err) {
callback(err, err ? null : results);
if (err) return callback(err);
callback(null, results);
});

@@ -41,3 +40,3 @@ };

DB.prototype.getOps = function(collection, id, from, to, callback) {
callback(UNIMPLEMENTED);
callback(new Error('getOps DB method unimplemented'));
};

@@ -61,3 +60,4 @@

}, function(err) {
callback(err, err ? null : results);
if (err) return callback(err);
callback(null, results);
});

@@ -67,3 +67,3 @@ };

DB.prototype.query = function(collection, query, fields, options, callback) {
callback(UNIMPLEMENTED);
callback(new Error('query DB method unimplemented'));
};

@@ -73,3 +73,3 @@

var fields = {};
this.query(collection, query, fields, function(err, snapshots, extra) {
this.query(collection, query, fields, options, function(err, snapshots, extra) {
if (err) return callback(err);

@@ -85,3 +85,3 @@ var ids = [];

DB.prototype.queryPollDoc = function(collection, id, query, options, callback) {
callback(UNIMPLEMENTED);
callback(new Error('queryPollDoc DB method unimplemented'));
};

@@ -88,0 +88,0 @@

@@ -5,5 +5,3 @@ var inherits = require('util').inherits;

// Stream of operations. Subscribe returns one of these. Passing a version
// makes the stream not emit any operations that are earlier than the
// specified version. This will be updated as the stream emits ops.
// Stream of operations. Subscribe returns one of these
function OpStream() {

@@ -16,4 +14,2 @@ Readable.call(this, {objectMode: true});

this.projection = null;
// Version number of the next op we expect to see
this.v = null;

@@ -33,23 +29,12 @@ this.open = true;

OpStream.prototype.initDocSubscribe = function(backend, agent, projection, version) {
OpStream.prototype.initProjection = function(backend, agent, projection) {
this.backend = backend;
this.agent = agent;
this.projection = projection;
this.v = version;
};
OpStream.prototype.pushOp = function(op) {
// We shouldn't get messages after unsubscribe, but it's happened
// Ignore any messages after unsubscribe
if (!this.open) return;
if (this.v && op.v) {
// op.v will usually be == stream.v, except if we're subscribing &
// buffering ops
if (op.v >= this.v) {
this.v = op.v + 1;
} else {
return;
}
}
if (this.backend) {

@@ -65,2 +50,8 @@ var stream = this;

OpStream.prototype.pushOps = function(ops) {
for (var i = 0; i < ops.length; i++) {
this.pushOp(ops[i]);
}
};
OpStream.prototype.destroy = function() {

@@ -73,52 +64,1 @@ if (!this.open) return;

};
// Helper for subscribe & bulkSubscribe to repack the start of a stream given
// potential operations which happened while the listeners were getting
// established
OpStream.prototype.pack = function(v, ops) {
// If there's no ops to pack, we're good - just return the stream as-is.
if (!ops.length) return;
// Ok, so if there's anything in the stream right now, it might overlap with
// the historical operations. We'll pump the reader and (probably!) prefix
// it with the getOps result.
var op;
var queue = [];
while (op = this.read()) {
queue.push(op);
}
// First send all the operations between v and when we called getOps
for (var i = 0; i < ops.length; i++) {
op = ops[i];
var err = checkOpVersion(op, v);
if (err) return this.push({error: err});
v++;
// console.log("stream push from preloaded ops", op);
this.push(op);
}
// Then all the ops between then and now..
for (i = 0; i < queue.length; i++) {
op = queue[i];
if (op.v >= v) {
var err = checkOpVersion(op, v);
if (err) return this.push({error: err});
v++;
// console.log("stream push from early stream", op);
this.push(op);
}
}
// if (queue.length || ops.length) console.log("Queue " + queue.length + " ops " + ops.length);
this.v = v;
};
function checkOpVersion(op, v) {
if (op.v === v) return;
return {
code: 5000,
message: 'subscribe stream.pack op version inconsistent',
op: op,
v: v
};
}
var OpStream = require('../op-stream');
var util = require('../util');
var UNIMPLEMENTED_MESSAGE = 'Required PubSub method unimplemented';
function PubSub(options) {

@@ -30,12 +28,12 @@ this.prefix = options && options.prefix;

PubSub.prototype._subscribe = function() {
throw new Error(UNIMPLEMENTED_MESSAGE);
PubSub.prototype._subscribe = function(channel, callback) {
callback(new Error('_subscribe PubSub method unimplemented'));
};
PubSub.prototype._unsubscribe = function() {
throw new Error(UNIMPLEMENTED_MESSAGE);
PubSub.prototype._unsubscribe = function(channel, callback) {
callback(new Error('_unsubscribe PubSub method unimplemented'));
};
PubSub.prototype._publish = function() {
throw new Error(UNIMPLEMENTED_MESSAGE);
PubSub.prototype._publish = function(channels, data, callback) {
callback(new Error('_publish PubSub method unimplemented'));
};

@@ -113,3 +111,5 @@

this._unsubscribe(channel);
this._unsubscribe(channel, function(err) {
if (err) throw err;
});
};

@@ -5,3 +5,3 @@ var deepEquals = require('deep-is');

function QueryEmitter(request, stream, snapshots, extra) {
function QueryEmitter(request, stream, ids, extra) {
this.backend = request.backend;

@@ -17,3 +17,3 @@ this.agent = request.agent;

this.stream = stream;
this.ids = pluckIds(snapshots);
this.ids = ids;
this.extra = extra;

@@ -28,15 +28,14 @@

this._polling = false;
this._pollAgain = false;
this._pollTimeout = null;
this._pendingPoll = null;
this.startStream();
this.init();
}
module.exports = QueryEmitter;
QueryEmitter.prototype.destroy = function() {
this.stream.destroy();
};
QueryEmitter.prototype.startStream = function() {
QueryEmitter.prototype.init = function() {
var emitter = this;
this._defaultCallback = function(err) {
if (err) emitter.onError(err);
}
function readStream() {

@@ -46,7 +45,5 @@ var data;

if (data.error) {
console.error('Error in query op stream:', emitter.index, emitter.query);
this.emitError(data.error);
emitter.onError(data.error);
continue;
}
emitter.emitOp(data);
emitter.update(data);

@@ -59,2 +56,6 @@ }

QueryEmitter.prototype.destroy = function() {
this.stream.destroy();
};
QueryEmitter.prototype._emitTiming = function(action, start) {

@@ -66,9 +67,37 @@ this.backend.emit('timing', action, Date.now() - start, this.index, this.query);

var id = op.d;
// Ignore if the user or database say we don't need to poll
// Check if the op's id matches the query before updating the query results
// and send it through immediately if it does. The current snapshot
// (including the op) for a newly matched document will get sent in the
// insert diff, so we don't need to send the op that caused the doc to
// match. If the doc already exists in the client and isn't otherwise
// subscribed, the client will need to request the op when it receives the
// snapshot from the query to bring itself up to date.
//
// The client may see the result of the op get reflected before the query
// results update. This might prove janky in some cases, since a doc could
// get deleted before it is removed from the results, for example. However,
// it will mean that ops which don't end up changing the results are
// received sooner even if query polling takes a while.
//
// Alternatively, we could send the op message only after the query has
// updated, and it would perhaps be ideal to send in the same message to
// avoid the user seeing transitional states where the doc is updated but
// the results order is not.
//
// We should send the op even if it is the op that causes the document to no
// longer match the query. If client-side filters are applied to the model
// to figure out which documents to render in a list, we will want the op
// that removed the doc from the query to cause the client-side computed
// list to update.
if (this.ids.indexOf(id) !== -1) {
this.onOp(op);
}
// Ignore if the database or user function says we don't need to poll
try {
if (this.skipPoll(this.collection, id, op, this.query)) return;
if (this.db.skipPoll(this.collection, id, op, this.query)) return;
if (this.db.skipPoll(this.collection, id, op, this.query)) return this._defaultCallback();
if (this.skipPoll(this.collection, id, op, this.query)) return this._defaultCallback();
} catch (err) {
console.error('Error evaluating skipPoll:', this.collection, id, op, this.query);
return this.emitError(err);
return this._defaultCallback(err);
}

@@ -78,7 +107,7 @@ if (this.canPollDoc) {

// op has changed whether or not it matches the results
this.queryPollDoc(id);
this.queryPollDoc(id, this._defaultCallback);
} else {
// We need to do a full poll of the query, because the query uses limits,
// sorts, or something special
this.queryPoll();
this.queryPoll(this._defaultCallback);
}

@@ -89,12 +118,6 @@ };

if (this._polling || this._pollTimeout) return;
if (this._pollAgain) this.queryPoll();
if (this._pendingPoll) this.queryPoll();
};
QueryEmitter.prototype._finishPoll = function(err) {
this._polling = false;
if (err) this.emitError(err);
this._flushPoll();
};
QueryEmitter.prototype.queryPoll = function() {
QueryEmitter.prototype.queryPoll = function(callback) {
var emitter = this;

@@ -106,3 +129,3 @@

// could end up with results in a funky order and the wrong results being
// removed from the query. Second, only having one query executed
// mutated in the query. Second, only having one query executed
// simultaneously per emitter will act as a natural adaptive rate limiting

@@ -113,10 +136,15 @@ // in case the db is under load.

// on a given id and won't accidentally modify the wrong doc. Also, those
// queries should be faster and we have to run all of them eventually, so
// there is less benefit to load reduction.
// queries should be faster and are less likely to be the same, so there is
// less benefit to possible load reduction.
if (this._polling || this._pollTimeout) {
this._pollAgain = true;
if (this._pendingPoll) {
this._pendingPoll.push(callback);
} else {
this._pendingPoll = [callback];
}
return;
}
this._polling = true;
this._pollAgain = false;
var pending = this._pendingPoll;
this._pendingPoll = null;
if (this.pollDebounce) {

@@ -131,5 +159,11 @@ this._pollTimeout = setTimeout(function() {

this.db.queryPoll(this.collection, this.query, this.options, function(err, ids, extra) {
if (err) return emitter._finishPoll(err);
if (err) return emitter._finishPoll(err, callback, pending);
emitter._emitTiming('query.poll', start);
// Be nice to not have to do this in such a brute force way
if (!deepEquals(emitter.extra, extra)) {
emitter.extra = extra;
emitter.onExtra(extra);
}
var idsDiff = arraydiff(emitter.ids, ids);

@@ -141,29 +175,37 @@ if (idsDiff.length) {

emitter.db.getSnapshotBulk(emitter.collection, inserted, emitter.fields, function(err, snapshotMap) {
if (err) return emitter._finishPoll(err);
if (err) return emitter._finishPoll(err, callback, pending);
emitter.backend._sanitizeSnapshotBulk(emitter.agent, emitter.snapshotProjection, emitter.collection, snapshotMap, function(err) {
if (err) return emitter._finishPoll(err);
if (err) return emitter._finishPoll(err, callback, pending);
emitter._emitTiming('query.pollGetSnapshotBulk', start);
var diff = mapDiff(idsDiff, snapshotMap);
emitter.emitDiff(diff);
emitter._finishPoll();
emitter.onDiff(diff);
emitter._finishPoll(err, callback, pending);
});
});
} else {
emitter.emitDiff(idsDiff);
emitter._finishPoll();
emitter.onDiff(idsDiff);
emitter._finishPoll(err, callback, pending);
}
} else {
emitter._finishPoll(err, callback, pending);
}
// Be nice to not have to do this in such a brute force way
if (!deepEquals(emitter.extra, extra)) {
emitter.extra = extra;
emitter.emitExtra(extra);
}
});
};
QueryEmitter.prototype._finishPoll = function(err, callback, pending) {
this._polling = false;
if (callback) callback(err);
if (pending) {
for (var i = 0; i < pending.length; i++) {
callback = pending[i];
if (callback) callback(err);
}
}
this._flushPoll();
};
QueryEmitter.prototype.queryPollDoc = function(id) {
QueryEmitter.prototype.queryPollDoc = function(id, callback) {
var emitter = this;
var start = Date.now();
this.db.queryPollDoc(this.collection, id, this.query, this.options, function(err, matches) {
if (err) return emitter.emitError(err);
if (err) return callback(err);
emitter._emitTiming('query.pollDoc', start);

@@ -181,44 +223,35 @@

emitter.db.getSnapshot(emitter.collection, id, emitter.fields, function(err, snapshot) {
if (err) return emitter.emitError(err);
if (err) return callback(err);
emitter._emitTiming('query.pollDocGetSnapshot', start);
var values = [snapshot];
emitter.emitDiff([new arraydiff.InsertDiff(index, values)]);
emitter.onDiff([new arraydiff.InsertDiff(index, values)]);
callback();
});
return;
}
} else if (i !== -1 && !matches) {
if (i !== -1 && !matches) {
emitter.ids.splice(i, 1);
emitter.emitDiff([new arraydiff.RemoveDiff(i, 1)]);
emitter.onDiff([new arraydiff.RemoveDiff(i, 1)]);
return callback();
}
callback();
});
};
// Emit functions are called in response to operation events
QueryEmitter.prototype.emitError = function(err) {
this.onError(err);
// Clients must assign each of these functions syncronously after constructing
// an instance of QueryEmitter. The instance is subscribed to an op stream at
// construction time, and does not buffer emitted events. Diff events assume
// all messages are received and applied in order, so it is critical that none
// are dropped.
QueryEmitter.prototype.onError =
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
// Silently ignore if the op stream was destroyed already
if (!this.stream.open) return;
throw new Error('Required QueryEmitter listener not assigned');
};
QueryEmitter.prototype.emitDiff = function(diff) {
this.onDiff(diff);
};
QueryEmitter.prototype.emitExtra = function(extra) {
this.onExtra(extra);
};
QueryEmitter.prototype.emitOp = function(op) {
if (this.ids.indexOf(op.d) === -1) return;
this.onOp(op);
};
// Clients should define these functions
QueryEmitter.prototype.onError = util.doNothing;
QueryEmitter.prototype.onDiff = util.doNothing;
QueryEmitter.prototype.onExtra = util.doNothing;
QueryEmitter.prototype.onOp = util.doNothing;
function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
ids.push(snapshots[i].id);
}
return ids;
}
function getInserted(diff) {

@@ -225,0 +258,0 @@ var inserted = [];

{
"name": "sharedb",
"version": "0.8.5",
"version": "0.9.0",
"description": "JSON OT database backend",

@@ -15,3 +15,4 @@ "main": "lib/index.js",

"expect.js": "^0.3.1",
"mocha": "^2.3.3"
"mocha": "^2.3.3",
"sinon": "^1.17.2"
},

@@ -18,0 +19,0 @@ "scripts": {

@@ -20,2 +20,16 @@ var async = require('async');

// Simplified mock of how submit request applies operations. The
// noteworthy dependency is that it always calls getSnapshot with
// the 'submit' projection and applies the op to the returned
// snapshot before committing. Thus, commit may rely on the behavior
// of getSnapshot with this special projection
function submit(db, collection, id, op, callback) {
db.getSnapshot(collection, id, 'submit', function(err, snapshot) {
if (err) return callback(err);
var err = ot.apply(snapshot, op);
if (err) return callback(err);
db.commit(collection, id, op, snapshot, callback);
});
}
describe('commit', function() {

@@ -25,16 +39,6 @@ function commitConcurrent(db, ops, test, done) {

async.each(ops, function(op, eachCb) {
// Simplified mock of how submit request applies operations. The
// noteworthy dependency is that it always calls getSnapshot with
// the 'submit' projection and applies the op to the returned
// snapshot before committing. Thus, commit may rely on the behavior
// of getSnapshot with this special projection
db.getSnapshot('testcollection', 'foo', 'submit', function(err, snapshot) {
submit(db, 'testcollection', 'foo', op, function(err, succeeded) {
if (err) return eachCb(err);
var err = ot.apply(snapshot, op);
if (err) return eachCb(err);
db.commit('testcollection', 'foo', op, snapshot, function(err, succeeded) {
if (err) return eachCb(err);
if (succeeded) numSucceeded++;
eachCb();
});
if (succeeded) numSucceeded++;
eachCb();
});

@@ -181,2 +185,202 @@ }, function(err) {

describe('getSnapshot', function() {
it('getSnapshot returns v0 snapshot', function(done) {
this.db.getSnapshot('testcollection', 'test', null, function(err, result) {
if (err) throw err;
expect(result).eql({id: 'test', type: null, v: 0, data: null});
done();
});
});
it('getSnapshot returns committed data', function(done) {
var data = {x: 5, y: 6};
var op = {v: 0, create: {type: 'json0', data: data}};
var db = this.db;
submit(db, 'testcollection', 'test', op, function(err, succeeded) {
if (err) throw err;
db.getSnapshot('testcollection', 'test', null, function(err, result) {
if (err) throw err;
expect(result).eql({id: 'test', type: 'http://sharejs.org/types/JSONv0', v: 1, data: data});
done();
});
});
});
});
describe('getSnapshotBulk', function() {
it('getSnapshotBulk returns committed and v0 snapshots', function(done) {
var data = {x: 5, y: 6};
var op = {v: 0, create: {type: 'json0', data: data}};
var db = this.db;
submit(db, 'testcollection', 'test', op, function(err, succeeded) {
if (err) throw err;
db.getSnapshotBulk('testcollection', ['test2', 'test'], null, function(err, resultMap) {
if (err) throw err;
expect(resultMap).eql({
test: {id: 'test', type: 'http://sharejs.org/types/JSONv0', v: 1, data: data},
test2: {id: 'test2', type: null, v: 0, data: null}
});
done();
});
});
});
});
describe('getOps', function() {
it('getOps returns 1 committed op', function(done) {
var op = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}};
var db = this.db;
submit(db, 'testcollection', 'test', op, function(err, succeeded) {
if (err) throw err;
db.getOps('testcollection', 'test', 0, null, function(err, ops) {
if (err) throw err;
expect(ops).eql([op]);
done();
});
});
});
it('getOps returns 2 committed ops', function(done) {
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}};
var op1 = {v: 1, op: [{p: ['x'], na: 1}]};
var db = this.db;
submit(db, 'testcollection', 'test', op0, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test', op1, function(err, succeeded) {
if (err) throw err;
db.getOps('testcollection', 'test', 0, null, function(err, ops) {
if (err) throw err;
expect(ops).eql([op0, op1]);
done();
});
});
});
});
it('getOps returns from specific op number', function(done) {
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}};
var op1 = {v: 1, op: [{p: ['x'], na: 1}]};
var db = this.db;
submit(db, 'testcollection', 'test', op0, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test', op1, function(err, succeeded) {
if (err) throw err;
db.getOps('testcollection', 'test', 1, null, function(err, ops) {
if (err) throw err;
expect(ops).eql([op1]);
done();
});
});
});
});
it('getOps returns to specific op number', function(done) {
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}};
var op1 = {v: 1, op: [{p: ['x'], na: 1}]};
var db = this.db;
submit(db, 'testcollection', 'test', op0, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test', op1, function(err, succeeded) {
if (err) throw err;
db.getOps('testcollection', 'test', 0, 1, function(err, ops) {
if (err) throw err;
expect(ops).eql([op0]);
done();
});
});
});
});
});
describe('getOpsBulk', function() {
it('getOpsBulk returns committed ops', function(done) {
var op = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}};
var db = this.db;
submit(db, 'testcollection', 'test', op, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test2', op, function(err, succeeded) {
if (err) throw err;
db.getOpsBulk('testcollection', {test: 0, test2: 0}, null, function(err, opsMap) {
if (err) throw err;
expect(opsMap).eql({
test: [op],
test2: [op]
});
done();
});
});
});
});
it('getOpsBulk returns committed ops with specific froms', function(done) {
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}};
var op1 = {v: 1, op: [{p: ['x'], na: 1}]};
var db = this.db;
submit(db, 'testcollection', 'test', op0, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test2', op0, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test', op1, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test2', op1, function(err, succeeded) {
if (err) throw err;
db.getOpsBulk('testcollection', {test: 0, test2: 1}, null, function(err, opsMap) {
if (err) throw err;
expect(opsMap).eql({
test: [op0, op1],
test2: [op1]
});
done();
});
});
});
});
});
});
it('getOpsBulk returns committed ops with specific to', function(done) {
var op0 = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}};
var op1 = {v: 1, op: [{p: ['x'], na: 1}]};
var db = this.db;
submit(db, 'testcollection', 'test', op0, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test2', op0, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test', op1, function(err, succeeded) {
if (err) throw err;
submit(db, 'testcollection', 'test2', op1, function(err, succeeded) {
if (err) throw err;
db.getOpsBulk('testcollection', {test: 1, test2: 0}, {test2: 1}, function(err, opsMap) {
if (err) throw err;
expect(opsMap).eql({
test: [op1],
test2: [op0]
});
done();
});
});
});
});
});
});
});
describe('getOpsToSnapshot', function() {
it('getOpsToSnapshot returns committed op', function(done) {
var op = {v: 0, create: {type: 'json0', data: {x: 5, y: 6}}};
var db = this.db;
submit(db, 'testcollection', 'test', op, function(err, succeeded) {
if (err) throw err;
db.getSnapshot('testcollection', 'test', 'submit', function(err, snapshot) {
if (err) throw err;
db.getOpsToSnapshot('testcollection', 'test', 0, snapshot, function(err, ops) {
if (err) throw err;
expect(ops).eql([op]);
done();
});
});
});
});
});
describe('query', function() {

@@ -236,2 +440,25 @@ it('returns data in the collection', function(done) {

describe('queryPoll', function() {
it('returns data in the collection', function(done) {
var snapshot = {v: 1, type: 'json0', data: {x: 5, y: 6}};
var db = this.db;
db.commit('testcollection', 'test', {v: 0, create: {}}, snapshot, function(err, succeeded) {
if (err) throw err;
db.queryPoll('testcollection', {x: 5}, null, function(err, ids) {
if (err) throw err;
expect(ids).eql(['test']);
done();
});
});
});
it('returns nothing when there is no data', function(done) {
this.db.queryPoll('testcollection', {x: 5}, null, function(err, ids) {
if (err) throw err;
expect(ids).eql([]);
done();
});
});
});
describe('queryPollDoc', function() {

@@ -238,0 +465,0 @@ it('returns false when the document does not exist', function(done) {

var MemoryPubSub = require('../lib/pubsub/memory');
var PubSub = require('../lib/pubsub');
var expect = require('expect.js');

@@ -6,1 +8,33 @@ require('./pubsub')(function(callback) {

});
require('./pubsub')(function(callback) {
callback(null, MemoryPubSub({prefix: 'foo'}));
});
describe('PubSub base class', function() {
it('returns an error if _subscribe is unimplemented', function() {
var pubsub = new PubSub();
pubsub.subscribe('x', function(err) {
expect(err).an(Error);
});
});
it('throws an error if _unsubscribe is unimplemented', function() {
var pubsub = new PubSub();
pubsub._subscribe = function(channel, callback) {
callback();
};
pubsub.subscribe('x', function(err, stream) {
if (err) throw err;
expect(function() {
stream.destroy();
}).throwException();
});
});
it('returns an error if _publish is unimplemented', function() {
var pubsub = new PubSub();
pubsub.publish(['x', 'y'], {test: true}, function(err) {
expect(err).an(Error);
});
});
});

@@ -26,6 +26,39 @@ var expect = require('expect.js');

});
pubsub.publish(['x', 'y'], {test: true});
expect(pubsub.streamsCount).equal(1);
pubsub.publish(['x'], {test: true});
});
});
it('publish optional callback waits', function(done) {
var pubsub = this.pubsub;
pubsub.subscribe('x', function(err, stream) {
if (err) throw err;
var emitted;
stream.on('data', function(data) {
emitted = data;
});
pubsub.publish(['x'], {test: true}, function(err) {
if (err) throw err;
expect(emitted).eql({test: true});
done();
});
});
});
it('can subscribe to a channel twice', function(done) {
var pubsub = this.pubsub;
pubsub.subscribe('y', function(err, stream) {
pubsub.subscribe('y', function(err, stream) {
if (err) throw err;
var emitted;
stream.on('data', function(data) {
expect(data).eql({test: true});
done();
});
expect(pubsub.streamsCount).equal(2);
pubsub.publish(['x', 'y'], {test: true});
});
});
});
it('stream.destroy() unsubscribes from a channel', function(done) {

@@ -32,0 +65,0 @@ var pubsub = this.pubsub;

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc