New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

sharedb

Package Overview
Dependencies
Maintainers
1
Versions
141
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sharedb - npm Package Compare versions

Comparing version 0.8.0 to 0.8.1

lib/backend.js

254

lib/agent.js

@@ -24,12 +24,7 @@ // This implements the network API for ShareJS.

var async = require('async');
var hat = require('hat');
var util = require('./util');
// stream is a nodejs 0.10 stream object.
/**
* @param {Shareshare} share
* @param {Duplex} stream
* @param {Http.Request} req
*/
/**
* Agent deserializes the wire protocol messages received from the stream and

@@ -40,8 +35,8 @@ * calls the corresponding functions on its Agent. It uses the return values

*
* @param {Shareshare} share
* @param {Backend} backend
* @param {Duplex} stream connection to a client
*/
function Agent(share, stream) {
function Agent(backend, stream) {
// The stream passed in should be a nodejs 0.10-style stream.
this.share = share;
this.backend = backend;
this.stream = stream;

@@ -53,6 +48,6 @@

// We need to track which documents are subscribed by the client. This is a
// map of collection name -> {id: stream || true || false}
// map of collection -> id -> stream
this.subscribedDocs = {};
// Map from query ID -> emitter.
// Map from queryId -> emitter
this.subscribedQueries = {};

@@ -99,4 +94,5 @@

for (var collection in this.subscribedDocs) {
for (var id in this.subscribedDocs[collection]) {
var stream = this.subscribedDocs[collection][id];
var docs = this.subscribedDocs[collection];
for (var id in docs) {
var stream = docs[id];
stream.destroy();

@@ -121,21 +117,29 @@ }

Agent.prototype._subscribeToStream = function(collection, id, stream) {
var docs = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = {});
if (this.closed) return stream.destroy();
var streams = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = {});
// If already subscribed to this document, destroy the previously subscribed stream
var previous = docs[id];
var previous = streams[id];
if (previous) previous.destroy();
if (this.closed) return stream.destroy();
docs[id] = stream;
streams[id] = stream;
var agent = this;
stream.on('data', onData);
function onData(data) {
stream.on('data', function(data) {
if (data.error) {
// Log then silently ignore errors in a subscription stream, since these
// may not be the client's fault, and they were not the result of a
// direct request by the client
console.error('Doc subscription stream error', collection, id, data.error);
return;
}
if (agent._isOwnOp(collection, data)) return;
agent._sendOp(collection, id, data);
}
});
stream.on('end', function() {
// Livedb has closed the op stream, so release its reference
var docs = agent.subscribedDocs[collection];
if (!docs) return;
delete docs[id];
if (util.hasKeys(docs)) return;
var streams = agent.subscribedDocs[collection];
if (!streams) return;
delete streams[id];
if (util.hasKeys(streams)) return;
delete agent.subscribedDocs[collection];

@@ -146,5 +150,6 @@ });

Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) {
if (this.closed) return emitter.destroy();
var previous = this.subscribedQueries[queryId];
if (previous) previous.destroy();
if (this.closed) return emitter.destroy();
this.subscribedQueries[queryId] = emitter;

@@ -170,11 +175,11 @@

emitter.onError = function(err) {
// Should we destroy the emitter here?
agent._send({a: 'q', id: queryId, error: err});
console.warn('Query ' + collection + '.' + JSON.stringify(query) + ' emitted an error:', err);
emitter.destroy();
delete agent.subscribedQueries[queryId];
// Log then silently ignore errors in a subscription stream, since these
// may not be the client's fault, and they were not the result of a
// direct request by the client
console.error('Query subscription stream error', collection, query, err);
};
emitter.onOp = function(op) {
var id = op.id;
var id = op.d;
if (agent._isOwnOp(collection, op)) return;
agent._sendOp(collection, id, op);

@@ -184,3 +189,9 @@ };

// Send a message to the remote client.
Agent.prototype._isOwnOp = function(collection, op) {
// Detect ops from this client on the same projection. Since the client sent
// these in, the submit reply will be sufficient and we can silently ignore
// them in the streams for subscribed documents or queries
return this.clientId === op.src && collection === (op.i || op.c);
};
Agent.prototype._send = function(msg) {

@@ -193,3 +204,3 @@ // Quietly drop replies if the stream was closed

Agent.prototype._sendOp = function(collection, id, data) {
Agent.prototype._sendOp = function(collection, id, op) {
var msg = {

@@ -199,18 +210,10 @@ a: 'op',

d: id,
v: data.v,
src: data.src,
seq: data.seq
v: op.v,
src: op.src,
seq: op.seq
};
if (op.op) msg.op = op.op;
if (op.create) msg.create = op.create;
if (op.del) msg.del = true;
// In theory, we only need to send the operation data if data.src !==
// this.clientId. However, this doesn't work with projections because
// the client needs to see their own operations in the projected collection.
//
// I'd like to reinstate this optimization, but I can't think of a good way to
// do it while making projections work. For now, you get your own operations
// back.
if (data.op) msg.op = data.op;
if (data.create) msg.create = data.create;
if (data.del) msg.del = true;
this._send(msg);

@@ -220,10 +223,7 @@ };

Agent.prototype._reply = function(req, err, msg) {
if (err) {
msg = {a:req.a, error:err};
} else {
if (!msg.a) msg.a = req.a;
}
var msg = (err) ? {error: err} : msg || {};
if (req.c) msg.c = req.c; // collection
if (req.d) msg.d = req.d; // id
msg.a = req.a;
if (req.c) msg.c = req.c;
if (req.d) msg.d = req.d;
if (req.id) msg.id = req.id;

@@ -241,22 +241,3 @@

var agent = this;
if (req != null) {
if (typeof req === 'string') {
try {
req = JSON.parse(req);
} catch(e) {
console.warn('Client sent invalid JSON', e.stack);
agent.close(e);
}
}
this._handleMessage(req, function(err, msg) {
if (err || msg) agent._reply(req, err, msg);
// This is in a process.nextTick to avoid stack smashing attacks (since
// sometimes this callback function is called synchronously).
process.nextTick(function() {
agent.pump();
});
});
} else {
if (req == null) {
// Retry when there's a message waiting for us.

@@ -266,3 +247,17 @@ this.stream.once('readable', function() {

});
return;
}
if (typeof req === 'string') {
try {
req = JSON.parse(req);
} catch(e) {
console.warn('Client sent invalid JSON', e.stack);
this.close(e);
}
}
this._handleMessage(req);
// Clean up the stack then read the next message
process.nextTick(function() {
agent.pump();
});
};

@@ -292,9 +287,11 @@

// Handle an incoming message from the client
Agent.prototype._handleMessage = function(req, callback) {
var err = this._checkRequest(req);
if (err) {
console.warn('Warning: Invalid request from ', this.clientId, req, 'Error: ', err);
return callback(err);
}
Agent.prototype._handleMessage = function(req) {
var agent = this;
var callback = function(err, message) {
agent._reply(req, err, message);
};
var errMessage = this._checkRequest(req);
if (errMessage) return callback({code: 4000, message: errMessage});
switch (req.a) {

@@ -318,4 +315,6 @@ case 'qsub':

default:
console.warn('invalid message', req);
callback('invalid or unknown message');
callback({
code: 4000,
message: 'Invalid or unknown message'
});
}

@@ -325,10 +324,6 @@ };

function getQueryOptions(req) {
var options = {};
if (req.o) {
// The client tells us what versions it already has
options.versions = req.o.vs;
// Set the DB name for the request (useful if you have alternate databases)
options.db = req.o.b;
}
return options;
return {
versions: req.vs,
db: req.db
};
}

@@ -338,9 +333,9 @@

// Subscribe to a query. The client is sent the query results and its
// notified whenever there's a change.
// notified whenever there's a change
var queryId = req.id;
var collection = req.collection;
var collection = req.c;
var query = req.q;
var options = getQueryOptions(req);
var agent = this;
this.share.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) {
this.backend.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) {
if (err) return callback(err);

@@ -354,6 +349,6 @@ agent._subscribeToQuery(emitter, queryId, collection, query);

var queryId = req.id;
var emitter = agent.subscribedQueries[queryId];
var emitter = this.subscribedQueries[queryId];
if (emitter) {
emitter.destroy();
delete agent.subscribedQueries[queryId];
delete this.subscribedQueries[queryId];
}

@@ -364,9 +359,9 @@ process.nextTick(callback);

Agent.prototype._queryFetch = function(req, callback) {
// Fetch the results of a query. This does not subscribe to the query or
// anything, its just a once-off query fetch.
// Fetch the results of a query once
var queryId = req.id;
var collection = req.collection;
var collection = req.c;
var query = req.q;
var options = getQueryOptions(req);
this.share.queryFetch(this, collection, query, options, function(err, results, extra) {
var agent = this;
this.backend.queryFetch(this, collection, query, options, function(err, results, extra) {
if (err) return callback(err);

@@ -378,9 +373,9 @@ agent._sendQueryResults(queryId, collection, options, results, extra, callback);

Agent.prototype._sendQueryResults = function(queryId, collection, options, results, extra, callback) {
var versions = options.versions && options.versions[collection];
var versions = options.versions;
var data = getResultsData(results, versions);
var res = {id: queryId, data: data, extra: extra};
var opsRequest = getResultsOpsRequest(results, versions);
if (!opsRequest) callback(null, res);
if (!opsRequest) return callback(null, res);
var agent = this;
this.share.getOpsBulk(this, collection, opsRequest, null, function(err, results) {
this.backend.getOpsBulk(this, collection, opsRequest, null, function(err, results) {
if (err) return callback(err);

@@ -405,3 +400,3 @@ for (var id in results) {

}
if (!versions || versions[item.id] == null) {
if (!versions || versions[result.id] == null) {
item.data = result.data;

@@ -414,3 +409,4 @@ }

function getResultsOpsRequest(results, versions) {
var request = null;
if (!versions) return;
var request;
for (var i = 0; i < results.length; i++) {

@@ -431,7 +427,7 @@ var result = results[i];

var request = req.s;
var agent = this;
var response = {};
var agent = this;
async.forEachOf(request, function(versions, collection, eachCb) {
agent.share.subscribeBulk(agent, collection, versions, function(err, streams, snapshotMap) {
agent.backend.subscribeBulk(agent, collection, versions, function(err, streams, snapshotMap) {
if (err) return eachCb(err);

@@ -449,6 +445,6 @@ for (var id in streams) {

// Close any streams we may have already subscribed before erroring to
// avoid leaking memory if earlier calls to share.subscribeBulk succeed
// avoid leaking memory if earlier calls to backend.subscribeBulk succeed
// and others fail
for (var collection in request) {
var docs = this.subscribedDocs[collection];
var docs = agent.subscribedDocs[collection];
if (!docs) continue;

@@ -468,3 +464,2 @@ for (var id in request[collection]) {

// Subscribe to a document
var agent = this;
var collection = req.c;

@@ -476,3 +471,4 @@ var id = req.d;

// since the specified version
this.share.subscribe(this, collection, id, version, function(err, stream, data) {
var agent = this;
this.backend.subscribe(this, collection, id, version, function(err, stream, data) {
if (err) return callback(err);

@@ -502,3 +498,2 @@ agent._subscribeToStream(collection, id, stream);

Agent.prototype._fetch = function(req, callback) {
var agent = this;
var collection = req.c;

@@ -509,3 +504,3 @@ var id = req.d;

// Fetch a snapshot
this.share.fetch(this, collection, id, function(err, data) {
this.backend.fetch(this, collection, id, function(err, data) {
if (err) return callback(err);

@@ -517,3 +512,4 @@ callback(null, {data: data});

// actually wants me to fetch some ops
this.share.getOps(this, collection, id, version, null, function(err, results) {
var agent = this;
this.backend.getOps(this, collection, id, version, null, function(err, results) {
if (err) return callback(err);

@@ -529,17 +525,15 @@ for (var i = 0; i < results.length; i++) {

Agent.prototype._submit = function(req, callback) {
var agent = this;
var collection = req.c;
var id = req.d;
var op = this._createOp(req);
this.share.submit(this, collection, id, op, function(err, ops) {
// Occassional 'Op already submitted' errors are expected to happen
// as part of normal operation, since inflight ops need to be resent
// after disconnect
var agent = this;
this.backend.submit(this, collection, id, op, function(err, ops) {
// Message to acknowledge the op was successfully submitted
var ack = {src: op.src, seq: op.seq, v: op.v};
if (err) {
if (err === 'Op already submitted') {
agent._sendOp(collection, id, op);
}
console.error('Op error:', err, collection, id, op);
callback(null, {a: 'ack', error: err});
return;
// Occassional 'Op already submitted' errors are expected to happen as
// part of normal operation, since inflight ops need to be resent after
// disconnect. In this case, ack the op so the client can proceed
if (err.code === 4001) return callback(null, ack);
return callback(err);
}

@@ -551,9 +545,7 @@

}
// Luckily, the op is transformed & etc in place.
agent._sendOp(collection, id, op);
callback();
callback(null, ack);
});
};
function CreateOp(src, seq, v, m, create) {
function CreateOp(src, seq, v, create) {
this.src = src;

@@ -565,3 +557,3 @@ this.seq = seq;

}
function EditOp(src, seq, v, m, op) {
function EditOp(src, seq, v, op) {
this.src = src;

@@ -573,3 +565,3 @@ this.seq = seq;

}
function DeleteOp(src, seq, v, m, del) {
function DeleteOp(src, seq, v, del) {
this.src = src;

@@ -576,0 +568,0 @@ this.seq = seq;

@@ -462,3 +462,3 @@ var Doc = require('./doc');

//
// The index is specific to the source, but if you're using mongodb it'll be
// The index is specific to the db, but if you're using mongodb it'll be
// the collection to which the query is made.

@@ -465,0 +465,0 @@ // The callback should have the signature function(error, results, extraData)

@@ -11,3 +11,2 @@ var types = require('../types').map;

*
*
* Subscriptions

@@ -32,8 +31,3 @@ * -------------

*
* TODO What happens when the document does not exist yet.
*
*
*
*
*
* Events

@@ -43,3 +37,3 @@ * ------

* You can use doc.on(eventName, callback) to subscribe to the following events:
* - `before op (op, localContext)` Fired before an operation is applied to the
* - `before op (op)` Fired before an operation is applied to the
* snapshot. The document is already in locked state, so it is not allowed to

@@ -49,5 +43,5 @@ * submit further operations. It may be used to read the old snapshot just

* operation originated locally and `false` otherwise
* - `after op (op, localContext)` Fired after an operation has been applied to
* - `after op (op)` Fired after an operation has been applied to
* the snapshot. The arguments are the same as for `before op`
* - `op (op, localContext)` The same as `after op` unless incremental updates
* - `op (op)` The same as `after op` unless incremental updates
* are enabled. In this case it is fired after every partial operation with

@@ -57,5 +51,5 @@ * this operation as the first argument. When fired the document is in a

* - `subscribed (error)` The document was subscribed
* - `created (localContext)` The document was created. That means its type was
* - `created ()` The document was created. That means its type was
* set and it has some initial data.
* - `del (localContext, snapshot)` Fired after the document is deleted, that is
* - `del (snapshot)` Fired after the document is deleted, that is
* the snapshot is null. It is passed the snapshot before delteion as an

@@ -66,2 +60,3 @@ * arguments

*/
module.exports = Doc;

@@ -169,3 +164,2 @@ function Doc(connection, collection, name) {

}
this.removeContexts();

@@ -263,7 +257,5 @@ // Set the new type

Doc.prototype._onMessage = function(msg) {
if (!(msg.c === this.collection && msg.d === this.name)) {
if (msg.c !== this.collection || msg.d !== this.name) {
// This should never happen - its a sanity check for bugs in the connection code.
var err = 'Got message for wrong document.';
console.error(err, this.collection, this.name, msg);
throw new Error(err);
throw new Error('Got message for wrong document');
}

@@ -279,3 +271,3 @@

this._finishSub(msg.error);
break;
return;

@@ -285,3 +277,3 @@ case 'sub':

this._handleSubscribe(msg.error, msg.data);
break;
return;

@@ -295,11 +287,6 @@ case 'unsub':

this._finishSub(msg.error);
break;
return;
case 'ack':
// Acknowledge a locally submitted operation.
//
// Usually we do nothing here - all the interesting logic happens when we
// get sent our op back in the op stream (which happens even if we aren't
// subscribed)
if (msg.error && msg.error !== 'Op already submitted') {
case 'op':
if (msg.error) {
// The server has rejected an op from the client for an unexpected reason.

@@ -316,6 +303,5 @@ // We'll send the error message to the user and try to roll back the change.

}
return;
}
break;
case 'op':
if (this.inflightData &&

@@ -327,3 +313,3 @@ msg.src === this.inflightData.src &&

this._opAcknowledged(msg);
break;
return;
}

@@ -339,3 +325,3 @@

this._getLatestOps();
break;
return;
}

@@ -358,29 +344,20 @@

// In this case, we can safely ignore the old (duplicate) operation.
break;
return;
}
this._transformPendingOps(msg);
if (this.inflightData) xf(this.inflightData, msg);
for (var i = 0; i < this.pendingData.length; i++) {
xf(this.pendingData[i], msg);
}
this.version++;
this._otApply(msg);
break;
this._otApply(msg, false);
return;
case 'meta':
console.warn('Unhandled meta op:', msg);
break;
default:
console.warn('Unhandled document message:', msg);
break;
}
};
Doc.prototype._transformPendingOps = function(op) {
if (this.inflightData) {
xf(this.inflightData, op);
}
for (var i = 0; i < this.pendingData.length; i++) {
xf(this.pendingData[i], op);
}
};
Doc.prototype._getLatestOps = function() {

@@ -606,3 +583,3 @@ var doc = this;

*/
Doc.prototype._otApply = function(op) {
Doc.prototype._otApply = function(op, context) {
this.locked = true;

@@ -619,3 +596,3 @@

this.once('unlock', function() {
this.emit('create');
this.emit('create', context);
});

@@ -627,3 +604,3 @@ } else if (op.del) {

this.once('unlock', function() {
this.emit('del', oldSnapshot);
this.emit('del', oldSnapshot, context);
});

@@ -634,3 +611,3 @@ } else if (op.op) {

this.emit('before op', op.op);
this.emit('before op', op.op, context);

@@ -648,3 +625,3 @@ // This exists so clients can pull any necessary data out of the snapshot

doc.snapshot = snapshot;
doc.emit('op', component);
doc.emit('op', component, context);
});

@@ -654,3 +631,3 @@ } else {

this.snapshot = type.apply(this.snapshot, op.op);
this.emit('op', op.op);
this.emit('op', op.op, context);
}

@@ -666,3 +643,3 @@ }

if (op.op) {
return this.emit('after op', op.op);
this.emit('after op', op.op);
}

@@ -732,7 +709,14 @@ };

// @param [callback] called when operation is submitted
Doc.prototype._submitOp = function(op, callback) {
Doc.prototype._submitOp = function(op, context, callback) {
if (typeof context === 'function') {
callback = context;
context = true; // The default context is true
} else if (context == null) {
context = true;
}
if (this.locked) {
var err = 'Cannot call submitOp from inside an op event handler. ' + this.collection + ' ' + this.name;
var err = new Error('Cannot call submitOp from inside an op event handler. ' + this.collection + ' ' + this.name);
if (callback) return callback(err);
throw new Error(err);
throw err;
}

@@ -743,5 +727,5 @@

if (!this.type) {
var err = 'Document has not been created';
var err = new Error('Document has not been created');
if (callback) return callback(err);
throw new Error(err);
throw err;
}

@@ -772,3 +756,3 @@ // Try to normalize the op. This removes trailing skip:0's and things like that.

this._otApply(op);
this._otApply(op, context);

@@ -793,4 +777,4 @@ // The call to flush is in a timeout so if submitOp() is called multiple

// @fires before op, op, after op
Doc.prototype.submitOp = function(op, callback) {
this._submitOp({op: op}, callback);
Doc.prototype.submitOp = function(op, context, callback) {
this._submitOp({op: op}, context, callback);
};

@@ -806,11 +790,11 @@

// @param callback called when operation submitted
Doc.prototype.create = function(type, data, callback) {
Doc.prototype.create = function(type, data, context, callback) {
if (this.type) {
var err = 'Document already exists';
var err = new Error('Document already exists');
if (callback) return callback(err);
throw new Error(err);
throw err;
}
var op = {create: {type:type, data:data}};
this._submitOp(op, callback);
this._submitOp(op, context, callback);
};

@@ -824,10 +808,10 @@

// @param callback called when operation submitted
Doc.prototype.del = function(callback) {
Doc.prototype.del = function(context, callback) {
if (!this.type) {
var err = 'Document does not exist';
var err = new Error('Document does not exist');
if (callback) return callback(err);
throw new Error(err);
throw err;
}
this._submitOp({del: true}, callback);
this._submitOp({del: true}, context, callback);
};

@@ -883,3 +867,3 @@

// by the server, the editor window should update to reflect the undo.
this._otApply(op);
this._otApply(op, false);
} else if (op.op || op.del) {

@@ -915,5 +899,5 @@ // This is where an undo stack would come in handy.

if (!this.state) {
throw new Error('opAcknowledged called from a null state. This should never happen. ' + this.collection + ' ' + this.name);
throw new Error('opAcknowledged called from a null state. This should never happen. ' + this.collection + '.' + this.name);
} else if (this.state === 'floating') {
if (!this.inflightData.create) throw new Error('Cannot acknowledge an op. ' + this.collection + ' ' + this.name);
if (!this.inflightData.create) throw new Error('Cannot acknowledge an op. ' + this.collection + '.' + this.name);

@@ -934,3 +918,4 @@ // Our create has been acknowledged. This is the same as ingesting some data.

if (msg.v !== this.version) {
throw new Error('Invalid version from server. This can happen when you submit ops in a submitOp callback. Expected: ' + this.version + ' Message version: ' + msg.v + ' ' + this.collection + ' ' + this.name);
throw new Error('Invalid version from server. This can happen when you submit ops in a submitOp callback. ' +
'Expected: ' + this.version + ' Message version: ' + msg.v + ' ' + this.collection + '.' + this.name);
}

@@ -937,0 +922,0 @@ }

@@ -21,8 +21,2 @@ var emitter = require('../emitter');

// Do we repoll the entire query whenever anything changes? (As opposed to
// just polling the changed item). This needs to be enabled to be able to use
// ordered queries (sortby:) and paginated queries. Set to undefined, it will
// be enabled / disabled automatically based on the query's properties.
this.poll = options.poll;
// The db we actually hit. If this isn't defined, it hits the snapshot

@@ -34,9 +28,5 @@ // database. Otherwise this can be used to hit another configured query

// A list of resulting documents. These are actual documents, complete with
// data and all the rest. If fetch is false, these documents will not
// have any data. You should manually call fetch() or subscribe() on them.
//
// Calling subscribe() might be a good idea anyway, as you won't be
// subscribed to the documents by default.
this.knownDocs = options.knownDocs || [];
this.results = [];
// data and all the rest. It is possible to pass in an initial results set,
// so that a query can be serialized and then re-established
this.results = options.results || [];

@@ -58,12 +48,2 @@ // Do we have some initial data?

var versions = {};
// Collect the version of all the documents in the current result set so we
// don't need to be sent their snapshots again.
for (var i = 0; i < this.knownDocs.length; i++) {
var doc = this.knownDocs[i];
if (doc.version == null) continue;
var collectionVersions = versions[doc.collection] = (versions[doc.collection] || {});
collectionVersions[doc.name] = doc.version;
}
var msg = {

@@ -73,9 +53,17 @@ a: 'q' + this.type,

c: this.collection,
o: {vs: versions},
q: this.query,
q: this.query
};
if (this.db != null) msg.db = this.db;
if (this.results.length) {
// Collect the version of all the documents in the current result set so we
// don't need to be sent their snapshots again.
var versions = {};
for (var i = 0; i < this.results.length; i++) {
var doc = this.results[i];
if (doc.version == null || doc.collection !== this.collection) continue;
versions[doc.name] = doc.version;
}
msg.vs = versions;
}
if (this.backend != null) msg.o.b = this.backend;
if (this.poll !== undefined) msg.o.p = this.poll;
this.connection.send(msg);

@@ -186,3 +174,3 @@ };

// Then add everything in the new result set.
this.results = this.knownDocs = this._dataToDocs(msg.data);
this.results = this._dataToDocs(msg.data);
this.extra = msg.extra;

@@ -189,0 +177,0 @@

@@ -6,2 +6,3 @@ var async = require('async');

function DB(options) {
// pollDebounce is the minimum time in ms between query polls
this.pollDebounce = options && options.pollDebounce;

@@ -8,0 +9,0 @@ }

@@ -13,3 +13,3 @@ var DB = require('./index');

function MemoryDB(options) {
if (!(this instanceof MemoryDB)) return new MemoryDB();
if (!(this instanceof MemoryDB)) return new MemoryDB(options);
DB.call(this, options);

@@ -16,0 +16,0 @@

@@ -1,448 +0,14 @@

var async = require('async');
var emitter = require('./emitter');
var ot = require('./ot');
var projections = require('./projections');
var Agent = require('./agent');
var QueryEmitter = require('./query-emitter');
var SubmitRequest = require('./submit-request');
var Backend = require('./backend');
module.exports = Backend;
function ShareDB(options) {
if (!(this instanceof ShareDB)) return new ShareDB(options);
emitter.EventEmitter.call(this);
if (!options) options = {};
this.pubsub = options.pubsub || ShareDB.MemoryDB();
this.db = options.db || ShareDB.MemoryPubSub();
// This contains any extra databases that can be queried
this.extraDbs = options.extraDbs || {};
// Map from projected collection -> {type, fields}
this.projections = {};
this.suppressPublish = !!options.suppressPublish;
this.maxSubmitRetries = options.maxSubmitRetries || null;
// Map from event name to a list of middleware
this.middleware = {};
}
module.exports = ShareDB;
emitter.mixin(ShareDB);
ShareDB.ot = ot;
ShareDB.projections = projections;
ShareDB.Agent = Agent;
ShareDB.QueryEmitter = QueryEmitter;
ShareDB.SubmitRequest = SubmitRequest;
ShareDB.types = require('./types');
ShareDB.DB = require('./db');
ShareDB.MemoryDB = require('./db/memory');
ShareDB.PubSub = require('./pubsub');
ShareDB.MemoryPubSub = require('./pubsub/memory');
ShareDB.prototype.close = function() {
this.pubsub.close();
this.db.close();
for (var name in this.extraDbs) {
this.extraDbs[name].close();
}
};
/** A client has connected through the specified stream. Listen for messages.
* Returns the useragent associated with the connected session.
*
* The optional second argument (req) is an initial request which is passed
* through to any connect() middleware. This is useful for inspecting cookies
* or an express session or whatever on the request object in your middleware.
*
* (The useragent is available through all middleware)
*/
ShareDB.prototype.listen = function(stream, req) {
var agent = new Agent(this, stream);
this.trigger('connect', agent, {stream: stream, req: req}, function(err) {
if (err) return agent.close(err);
agent.pump();
});
return agent;
};
ShareDB.prototype.addProjection = function(name, collection, type, fields) {
if (this.projections[name]) {
throw new Error('Projection ' + name + ' already exists');
}
for (var k in fields) {
if (fields[k] !== true) {
throw new Error('Invalid field ' + k + ' - fields must be {somekey:true}. Subfields not currently supported.');
}
}
this.projections[name] = {
target: collection,
type: ot.normalizeType(type),
fields: fields
};
};
/**
* Add middleware to an action or array of actions
*/
ShareDB.prototype.use = function(action, fn) {
if (Array.isArray(action)) {
for (var i = 0; i < action.length; i++) {
this.use(action[i], fn);
}
return;
}
var fns = this.middleware[action] || (this.middleware[action] = []);
fns.push(fn);
};
/**
* Passes request through the middleware stack
*
* Middleware may modify the request object. After all middleware have been
* invoked we call `callback` with `null` and the modified request. If one of
* the middleware resturns an error the callback is called with that error.
*/
ShareDB.prototype.trigger = function(action, agent, request, callback) {
request.action = action;
request.agent = agent;
request.share = this;
var fns = this.middleware[action];
if (!fns) return callback();
// Copying the triggers we'll fire so they don't get edited while we iterate.
fns = fns.slice();
var next = function(err) {
if (err) return callback(err);
var fn = fns.shift();
if (!fn) return callback(null, request);
fn(request, next);
};
next();
};
ShareDB.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) {
if (projection) {
try {
op = projections.projectOp(projection.fields, op);
} catch (err) {
return callback(err);
}
}
this.trigger('op', agent, {collection: collection, id: id, op: op}, callback);
};
ShareDB.prototype._sanitizeOps = function(agent, projection, collection, id, ops, callback) {
var share = this;
async.each(ops, function(op, eachCb) {
share._sanitizeOp(agent, projection, collection, id, op, eachCb);
}, callback);
};
ShareDB.prototype._sanitizeOpsBulk = function(agent, projection, collection, opsMap, callback) {
var share = this;
async.forEachOf(opsMap, function(ops, id, eachCb) {
share._sanitizeOps(agent, projection, collection, id, ops, eachCb);
}, callback);
};
ShareDB.prototype._sanitizeSnapshot = function(agent, projection, collection, id, snapshot, callback) {
if (projection) {
try {
snapshot = projections.projectSnapshot(projection.fields, snapshot);
} catch (err) {
return callback(err);
}
}
this.trigger('doc', agent, {collection: collection, id: id, snapshot: snapshot}, callback);
};
ShareDB.prototype._sanitizeSnapshots = function(agent, projection, collection, snapshots, callback) {
var share = this;
async.each(snapshots, function(snapshot, eachCb) {
share._sanitizeSnapshot(agent, projection, collection, snapshot.id, snapshot, eachCb);
}, callback);
};
ShareDB.prototype._sanitizeSnapshotBulk = function(agent, projection, collection, snapshotMap, callback) {
var share = this;
async.forEachOf(snapshotMap, function(ops, id, eachCb) {
share._sanitizeSnapshot(agent, projection, collection, id, snapshot, eachCb);
}, callback);
};
ShareDB.prototype._getSnapshotProjection = function(db, projection) {
return (db.projectsSnapshot) ? null : projection;
};
// Non inclusive - gets ops from [from, to). Ie, all relevant ops. If to is
// not defined (null or undefined) then it returns all ops.
ShareDB.prototype.getOps = function(agent, index, id, from, to, callback) {
var start = Date.now();
var projection = this.projections[index];
var collection = (projection) ? projection.target : index;
var share = this;
share.db.getOps(collection, id, from, to, function(err, ops) {
if (err) return callback(err);
share._sanitizeOps(agent, projection, collection, id, ops, function(err) {
if (err) return callback(err);
share.emit('timing', 'getOps', Date.now() - start);
callback(err, ops);
});
});
};
ShareDB.prototype.getOpsBulk = function(agent, index, fromMap, toMap, callback) {
var start = Date.now();
var projection = this.projections[index];
var collection = (projection) ? projection.target : index;
var share = this;
share.db.getOpsBulk(collection, fromMap, toMap, function(err, opsMap) {
if (err) return callback(err);
share._sanitizeOpsBulk(agent, projection, collection, opsMap, function(err) {
if (err) return callback(err);
share.emit('timing', 'getOpsBulk', Date.now() - start);
callback(err, opsMap);
});
});
};
// Submit an operation on the named collection/docname. op should contain a
// {op:}, {create:} or {del:} field. It should probably contain a v: field (if
// it doesn't, it defaults to the current version).
//
// callback called with (err, snapshot, ops)
ShareDB.prototype.submit = function(agent, index, id, op, callback) {
var err = ot.checkOp(op);
if (err) return callback(err);
var request = new SubmitRequest(this, agent, index, id, op);
var share = this;
share.trigger('submit', agent, request, function(err) {
if (err) return callback(err);
request.run(function(err, snapshot, ops) {
if (err) return callback(err);
share.trigger('after submit', agent, request, function(err) {
if (err) return callback(err);
share._sanitizeOps(agent, request.projection, request.collection, id, ops, function(err) {
if (err) return callback(err);
share.emit('timing', 'submit.total', Date.now() - request.start);
callback(err, ops);
});
});
});
});
};
ShareDB.prototype.fetch = function(agent, index, id, callback) {
var projection = this.projections[index];
var collection = (projection) ? projection.target : index;
var fields = projection && projection.fields;
var start = Date.now();
var share = this;
share.db.getSnapshot(collection, id, fields, function(err, snapshot) {
if (err) return callback(err);
var snapshotProjection = share._getSnapshotProjection(share.db, projection);
share._sanitizeSnapshot(agent, snapshotProjection, collection, id, snapshot, function(err) {
if (err) return callback(err);
share.emit('timing', 'fetch', Date.now() - start);
callback(null, snapshot);
});
});
};
ShareDB.prototype.fetchBulk = function(agent, index, ids, callback) {
var projection = this.projections[index];
var collection = (projection) ? projection.target : index;
var fields = projection && projection.fields;
var start = Date.now();
var share = this;
share.db.getSnapshotBulk(collection, ids, fields, function(err, snapshotMap) {
if (err) return done(err);
var snapshotProjection = share._getSnapshotProjection(share.db, projection);
share._sanitizeSnapshotBulk(agent, snapshotProjection, collection, snapshotMap, function(err) {
if (err) return callback(err);
share.emit('timing', 'fetchBulk', Date.now() - start);
callback(null, snapshotMap);
});
});
};
// Subscribe to the document from the specified version or null version
ShareDB.prototype.subscribe = function(agent, index, id, version, callback) {
var projection = this.projections[index];
var collection = (projection) ? projection.target : index;
var channel = this.getDocChannel(collection, id);
var start = Date.now();
var share = this;
share.pubsub.subscribe(channel, function(err, stream) {
if (err) return callback(err);
stream.initDocSubscribe(share, agent, projection, version);
if (version == null) {
// Subscribing from null means that the agent doesn't have a document
// and needs to fetch it as well as subscribing
share.fetch(index, id, function(err, snapshot) {
if (err) return callback(err);
share.emit('timing', 'subscribe', Date.now() - start);
callback(null, stream, snapshot);
});
} else {
share.db.getOps(collection, id, version, null, function(err, ops) {
if (err) return callback(err);
stream.pack(version, ops);
share.emit('timing', 'subscribe', Date.now() - start);
callback(null, stream);
});
}
});
};
ShareDB.prototype.subscribeBulk = function(agent, index, versions, callback) {
var projection = this.projections[index];
var collection = (projection) ? projection.target : index;
var start = Date.now();
var share = this;
var streams = {};
var fetchIds = [];
var opsVersions = null;
async.forEachOf(versions, function(version, id, eachCb) {
if (version == null) {
fetchIds.push(version);
} else {
if (!opsVersions) opsVersions = {};
opsVersions[id] = version;
}
var channel = share.getDocChannel(collection, id);
share.pubsub.subscribe(channel, function(err, stream) {
if (err) return eachCb(err);
stream.initDocSubscribe(share, agent, projection, version);
streams[id] = stream;
eachCb();
});
}, function(err) {
if (err) {
closeStreams(streams);
return callback(err);
}
async.parallel({
snapshotMap: function(parallelCb) {
if (!fetchIds.length) return parallelCb(null, {});
share.fetchBulk(agent, index, fetchIds, parallelCb);
},
ops: function(parallelCb) {
if (!opsVersions) return parallelCb();
this.db.getOpsBulk(collection, opsVersions, null, function(err, opsMap) {
if (err) return parallelCb(err);
for (var id in opsVersions) {
var version = opsVersions[id];
var ops = opsMap[id];
streams[index][id].pack(version, ops);
}
parallelCb();
});
}
}, function(err, results) {
if (err) {
closeStreams(streams);
return callback(err);
}
share.emit('timing', 'subscribeBulk', Date.now() - start);
callback(null, streams, results.snapshotMap);
});
});
};
ShareDB.prototype.queryFetch = function(agent, index, query, options, callback) {
var start = Date.now();
var share = this;
share._triggerQuery(agent, index, query, options, function(err, request) {
if (err) return callback(err);
share._query(agent, request, function(err, snapshots, extra) {
if (err) return callback(err);
share.emit('timing', 'queryFetch', Date.now() - start, request);
callback(null, snapshots, extra);
});
});
};
// Options can contain:
// db: The name of the DB (if the DB is specified in the otherDbs when the share instance is created)
// skipPoll: function(collection, id, op, query) {return true or false; }
// this is a syncronous function which can be used as an early filter for
// operations going through the system to reduce the load on the DB.
// pollDebounce: Minimum delay between subsequent database polls. This is
// used to batch updates to reduce load on the database at the expense of
// liveness. Defaults to 1000 (1 second)
ShareDB.prototype.querySubscribe = function(agent, index, query, options, callback) {
var start = Date.now();
var share = this;
share._triggerQuery(agent, index, query, options, function(err, request) {
if (err) return callback(err);
if (request.db.disableSubscribe) return callback({message: 'DB does not support subscribe'});
share.pubsub.subscribe(request.channel, function(err, stream) {
if (err) return callback(err);
// Issue query on db to get our initial results
stream.projection = request.projection;
stream.share = share;
stream.agent = agent;
share._query(agent, request, function(err, snapshots, extra) {
if (err) {
stream.destroy();
return callback(err);
}
var queryEmitter = new QueryEmitter(request, stream, snapshots, extra);
share.emit('timing', 'querySubscribe', Date.now() - start, request);
callback(null, queryEmitter, snapshots, extra);
});
});
});
};
ShareDB.prototype._triggerQuery = function(agent, index, query, options, callback) {
var projection = this.projections[index];
var collection = (projection) ? projection.target : index;
var fields = projection && projection.fields;
var request = {
index: index,
collection: collection,
projection: projection,
fields: fields,
channel: this.getCollectionChannel(collection),
query: query,
options: options,
db: null,
snapshotProjection: null,
};
var share = this;
share.trigger('query', agent, request, function(err) {
if (err) return callback(err);
// Set the DB reference for the request after the middleware trigger so
// that the db option can be changed in middleware
request.db = (options.db) ? this.extraDbs[options.db] : share.db;
if (!request.db) return callback({message: 'DB not found'});
request.snapshotProjection = share._getSnapshotProjection(request.db, projection);
callback(null, request);
});
};
ShareDB.prototype._query = function(agent, request, callback) {
var share = this;
request.db.query(request.collection, request.query, request.fields, request.options, function(err, snapshots, extra) {
if (err) return callback(err);
share._sanitizeSnapshots(agent, request.snapshotProjection, request.collection, snapshots, function(err) {
callback(err, snapshots, extra);
});
});
};
ShareDB.prototype.getCollectionChannel = function(collection) {
return collection;
};
ShareDB.prototype.getDocChannel = function(collection, id) {
return collection + '.' + id;
};
ShareDB.prototype.getChannels = function(collection, id) {
return [
this.getCollectionChannel(collection),
this.getDocChannel(collection, id)
];
};
Backend.Agent = require('./agent');
Backend.Backend = Backend;
Backend.DB = require('./db');
Backend.MemoryDB = require('./db/memory');
Backend.MemoryPubSub = require('./pubsub/memory');
Backend.ot = require('./ot');
Backend.projections = require('./projections');
Backend.PubSub = require('./pubsub');
Backend.QueryEmitter = require('./query-emitter');
Backend.SubmitRequest = require('./submit-request');
Backend.types = require('./types');

@@ -13,3 +13,3 @@ var assert = require('assert');

this.id = null;
this.share = null;
this.backend = null;
this.agent = null;

@@ -33,4 +33,4 @@ this.projection = null;

OpStream.prototype.initDocSubscribe = function(share, agent, projection, version) {
this.share = share;
OpStream.prototype.initDocSubscribe = function(backend, agent, projection, version) {
this.backend = backend;
this.agent = agent;

@@ -55,5 +55,5 @@ this.projection = projection;

if (this.share) {
if (this.backend) {
var stream = this;
this.share._sanitizeOp(this.agent, this.projection, op.collection, op.id, op, function(err, op) {
this.backend._sanitizeOp(this.agent, this.projection, op.c, op.d, op, function(err) {
stream.push(err ? {error: err} : op);

@@ -60,0 +60,0 @@ });

@@ -6,3 +6,3 @@ var deepEquals = require('deep-is');

function QueryEmitter(request, stream, snapshots, extra) {
this.share = request.share;
this.backend = request.backend;
this.agent = request.agent;

@@ -21,3 +21,3 @@ this.db = request.db;

this.skipPoll = this.options.skipPoll || util.doNothing;
this.canPollDoc = this.db.canPollDoc(collection, query);
this.canPollDoc = this.db.canPollDoc(this.collection, this.query);
this.pollDebounce =

@@ -45,6 +45,4 @@ (this.options.pollDebounce != null) ? this.options.pollDebounce :

if (data.error) {
// Ignore errors in op stream instead of passing them through to the
// client, since they likely aren't this client's fault
console.error('Error in query op stream:', emitter.index, emitter.query);
console.error(data.error);
this.emitError(data.error);
continue;

@@ -61,20 +59,14 @@ }

QueryEmitter.prototype._emitTiming = function(action, start) {
this.share.emit('timing', action, Date.now() - start, this.index, this.query);
this.backend.emit('timing', action, Date.now() - start, this.index, this.query);
};
QueryEmitter.prototype.update = function(op) {
// Ignore if the user or database say we don't need to poll.
//
// Try/catch this, since we want to make sure polling from a malformed op
// doesn't crash the server. Note that we are just quietly logging the error
// instead of emitting it, because any errors thrown here are likely due to
// malformed ops coming from a different client. We don't want to send errors
// to clients that aren't responsible for causing them.
var id = op.d;
// Ignore if the user or database say we don't need to poll
try {
if (this.skipPoll(this.collection, op.id, op, this.query)) return;
if (this.db.skipPoll(this.collection, op.id, op, this.query)) return;
if (this.skipPoll(this.collection, id, op, this.query)) return;
if (this.db.skipPoll(this.collection, id, op, this.query)) return;
} catch (err) {
console.error('Error evaluating skipPoll:', this.collection, op.id, op, this.query);
console.error(err.stack || err);
return;
console.error('Error evaluating skipPoll:', this.collection, id, op, this.query);
return this.emitError(err);
}

@@ -84,3 +76,3 @@ if (this.canPollDoc) {

// op has changed whether or not it matches the results
this.queryPollDoc(op.id);
this.queryPollDoc(id);
} else {

@@ -144,3 +136,3 @@ // We need to do a full poll of the query, because the query uses limits,

if (err) return emitter._finishPoll(err);
emitter.share._sanitizeSnapshotBulk(emitter.agent, emitter.snapshotProjection, emitter.collection, snapshotMap, function(err) {
emitter.backend._sanitizeSnapshotBulk(emitter.agent, emitter.snapshotProjection, emitter.collection, snapshotMap, function(err) {
if (err) return emitter._finishPoll(err);

@@ -207,3 +199,3 @@ emitter._emitTiming('query.pollGetSnapshotBulk', start);

QueryEmitter.prototype.emitOp = function(op) {
if (this.ids.indexOf(op.id) === -1) return;
if (this.ids.indexOf(op.d) === -1) return;
this.onOp(op);

@@ -210,0 +202,0 @@ };

var ot = require('./ot');
function SubmitRequest(share, agent, index, id, op) {
this.share = share;
function SubmitRequest(backend, agent, index, id, op) {
this.backend = backend;
this.agent = agent;
// If a projection, rewrite the call into a call against the collection
var projection = share.projections[index];
var projection = backend.projections[index];
this.index = index;

@@ -13,2 +13,4 @@ this.projection = projection;

this.op = op;
this.start = Date.now();
this._addOpMeta();

@@ -21,5 +23,4 @@

this.start = Date.now();
this.suppressPublish = share.suppressPublish;
this.maxRetries = share.maxSubmitRetries;
this.suppressPublish = backend.suppressPublish;
this.maxRetries = backend.maxSubmitRetries;
this.retries = 0;

@@ -40,3 +41,3 @@

var request = this;
var share = this.share;
var backend = this.backend;
var collection = this.collection;

@@ -48,3 +49,3 @@ var id = this.id;

share.db.getSnapshot(collection, id, fields, function(err, snapshot) {
backend.db.getSnapshot(collection, id, fields, function(err, snapshot) {
if (err) return callback(err);

@@ -66,5 +67,13 @@

if (op.v === snapshot.v || op.v == null) {
// The snapshot hasn't changed since the op's base version, or the op
// was submitted with a null version. Apply without transforming the op
if (op.v == null) {
// Submitting an op with a null version means that it should get the
// version from the latest snapshot. Generally this will mean the op
// won't be transformed, though transform could be called on it in the
// case of a retry from a simultaneous submit
op.v = snapshot.v;
}
if (op.v === snapshot.v) {
// The snapshot hasn't changed since the op's base version. Apply
// without transforming the op
return request.apply(callback);

@@ -85,3 +94,3 @@ }

var from = op.v;
share.db.getOpsToSnapshot(collection, id, from, snapshot, function(err, ops) {
backend.db.getOpsToSnapshot(collection, id, from, snapshot, function(err, ops) {
if (err) return callback(err);

@@ -147,6 +156,6 @@

// modified in a middleware and we retry, we want to reset to a new array
this.channels = this.share.getChannels(this.collection, this.id);
this.channels = this.backend.getChannels(this.collection, this.id);
var request = this;
this.share.trigger('apply', this.agent, this, function(err) {
this.backend.trigger('apply', this.agent, this, function(err) {
if (err) return callback(err);

@@ -165,9 +174,10 @@ if (!request.op) return callback();

var request = this;
var share = this.share;
share.trigger('commit', this.agent, this, function(err) {
var backend = this.backend;
backend.trigger('commit', this.agent, this, function(err) {
if (err) return callback(err);
if (!request.op) return callback();
var op = request.op;
if (!op) return callback();
// Try committing the operation and snapshot to the database atomically
share.db.commit(request.collection, request.id, request.op, request.snapshot, function(err, succeeded) {
backend.db.commit(request.collection, request.id, op, request.snapshot, function(err, succeeded) {
if (err) return callback(err);

@@ -180,3 +190,9 @@ if (!succeeded) {

if (!request.suppressPublish) {
share.pubsub.publish(request.channels, request.op);
op.c = request.collection;
op.d = request.id;
op.m = undefined;
// Needed for agent to detect if it can ignore sending the op back to
// the client that submitted it in subscriptions
if (request.collection !== request.index) op.i = request.index;
backend.pubsub.publish(request.channels, op);
}

@@ -197,3 +213,3 @@ callback(err, request.snapshot, request.ops);

}
this.share.emit('timing', 'submit.retry', Date.now() - this.start, this.retries);
this.backend.emit('timing', 'submit.retry', Date.now() - this.start, this.retries);
this.run(callback);

@@ -213,3 +229,3 @@ };

var err = {
code: 4000,
code: 4001,
message: 'Op already submitted'

@@ -216,0 +232,0 @@ };

{
"name": "sharedb",
"version": "0.8.0",
"version": "0.8.1",
"description": "JSON OT database backend",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc