New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

sharedb

Package Overview
Dependencies
Maintainers
1
Versions
141
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

sharedb - npm Package Compare versions

Comparing version 0.10.2 to 0.11.0

test/client/doc.js

71

lib/agent.js

@@ -15,3 +15,2 @@ var hat = require('hat');

function Agent(backend, stream) {
// The stream passed in should be a nodejs 0.10-style stream.
this.backend = backend;

@@ -30,6 +29,2 @@ this.stream = stream;

// Subscriptions care about the stream being destroyed. We end up with a
// listener per subscribed document for the client, which can be a lot.
stream.setMaxListeners(0);
// We need to track this manually to make sure we don't reply to messages

@@ -40,7 +35,2 @@ // after the stream was closed. There's no built-in way to ask a stream

var agent = this;
stream.once('end', function() {
agent._cleanup();
});
// Initialize the remote client by sending it its agent Id.

@@ -59,7 +49,6 @@ this._send({

if (err) {
console.warn('Agent closed due to error', err);
this.stream.emit('error', err);
console.warn('Agent closed due to error', this.clientId, err.stack || err);
}
if (this.closed) return;
// This will emit 'end', which will call _cleanup
// This will end the writable stream and emit 'finish'
this.stream.end();

@@ -69,9 +58,5 @@ };

Agent.prototype._cleanup = function() {
if (this.closed) return;
this.closed = true;
// Remove the pump listener
this.stream.removeAllListeners('readable');
// Clean up all the subscriptions.
// Clean up doc subscription streams
for (var collection in this.subscribedDocs) {

@@ -84,5 +69,5 @@ var docs = this.subscribedDocs[collection];

}
// Cancel the subscribes
this.subscribedDocs = {};
// Clean up query subscription streams
for (var id in this.subscribedQueries) {

@@ -92,3 +77,2 @@ var emitter = this.subscribedQueries[id];

}
// Cancel the subscribes
this.subscribedQueries = {};

@@ -224,29 +208,25 @@ };

// start processing events from the stream. This calls itself recursively.
// Use .close() to drain the pump.
// Start processing events from the stream
Agent.prototype.pump = function() {
if (this.closed) return;
var req = this.stream.read();
var agent = this;
if (req == null) {
// Retry when there's a message waiting for us.
this.stream.once('readable', function() {
agent.pump();
});
return;
}
if (typeof req === 'string') {
this.stream.on('data', function(chunk) {
if (agent.closed) return;
if (typeof chunk !== 'string') {
var err = {message: 'Received non-string message'};
return agent.close(err);
}
try {
req = JSON.parse(req);
} catch(e) {
console.warn('Client sent invalid JSON', e.stack);
this.close(e);
var req = JSON.parse(chunk);
} catch (err) {
return agent.close(err);
}
}
this._handleMessage(req);
// Clean up the stack then read the next message
process.nextTick(function() {
agent.pump();
agent._handleMessage(req);
});
this.stream.on('end', function() {
agent._cleanup();
});
};

@@ -339,8 +319,7 @@

}
return {
ids: ids,
fetch: fetch,
fetchOps: fetchOps,
db: req.db
};
var options = req.o || {};
options.ids = ids;
options.fetch = fetch;
options.fetchOps = fetchOps;
return options;
}

@@ -347,0 +326,0 @@

@@ -1,2 +0,1 @@

var Duplex = require('stream').Duplex;
var async = require('async');

@@ -45,4 +44,3 @@ var Agent = require('./agent');

Backend.prototype.connect = function(connection) {
var stream = new Duplex({objectMode: true});
var socket = new StreamSocket(stream);
var socket = new StreamSocket();
if (connection) {

@@ -54,3 +52,3 @@ connection.bindToSocket(socket);

socket._open();
this.listen(stream);
this.listen(socket.stream);
return connection;

@@ -75,3 +73,3 @@ };

Backend.prototype.addProjection = function(name, collection, type, fields) {
Backend.prototype.addProjection = function(name, collection, fields) {
if (this.projections[name]) {

@@ -81,5 +79,5 @@ throw new Error('Projection ' + name + ' already exists');

for (var k in fields) {
if (fields[k] !== true) {
throw new Error('Invalid field ' + k + ' - fields must be {somekey:true}. Subfields not currently supported.');
for (var key in fields) {
if (fields[key] !== true) {
throw new Error('Invalid field ' + key + ' - fields must be {somekey: true}. Subfields not currently supported.');
}

@@ -90,3 +88,2 @@ }

target: collection,
type: ot.normalizeType(type),
fields: fields

@@ -93,0 +90,0 @@ };

@@ -31,4 +31,3 @@ var Doc = require('./doc');

// Each query is created with an id that the server uses when it sends us
// info about the query (updates, etc).
//this.nextQueryId = (Math.random() * 1000) |0;
// info about the query (updates, etc)
this.nextQueryId = 1;

@@ -39,14 +38,13 @@

// A unique message number for the given id
this.seq = 1;
// Equals agent.clientId on the server
this.id = null;
// Private variable to support clearing of op retry interval
this._retryInterval = null;
// Reset some more state variables.
this.reset();
this.debug = false;
// I'll store the most recent 100 messages so when errors occur we can see
// what happened.
this.messageBuffer = [];
this.bindToSocket(socket);

@@ -85,9 +83,11 @@ }

// State of the connection. The correspoding events are emmited when this
// changes. Available states are:
// - 'connecting' The connection has been established, but we don't have our
// client ID yet
// - 'connected' We have connected and recieved our client ID. Ready for data.
// - 'disconnected' The connection is closed, but it will reconnect automatically.
// - 'stopped' The connection is closed, and should not reconnect.
// State of the connection. The correspoding events are emmited when this changes
//
// - 'connecting' The connection is still being established, or we are still
// waiting on the server to send us the initialization message
// - 'connected' The connection is open and we have connected to a server
// and recieved the initialization message
// - 'disconnected' Connection is closed, but it will reconnect automatically
// - 'closed' The connection was closed by the client, and will not reconnect
// - 'stopped' The connection was closed by the server, and will not reconnect
this.state = (socket.readyState === 0 || socket.readyState === 1) ? 'connecting' : 'disconnected';

@@ -101,18 +101,14 @@

socket.onmessage = function(msg) {
var data = msg.data;
socket.onmessage = function(event) {
try {
var data = (typeof event.data === 'string') ?
JSON.parse(event.data) :
event.data;
} catch (err) {
// Silently ignore parsing errors
return;
}
// Some transports don't need parsing.
if (typeof data === 'string') data = JSON.parse(data);
if (connection.debug) console.log('RECV', JSON.stringify(data));
connection.messageBuffer.push({
t: (new Date()).toTimeString(),
recv: JSON.stringify(data)
});
while (connection.messageBuffer.length > 100) {
connection.messageBuffer.shift();
}
try {

@@ -129,3 +125,3 @@ connection.handleMessage(data);

socket.onerror = function(e) {
socket.onerror = function(err) {
// This isn't the same as a regular error, because it will happen normally

@@ -135,7 +131,7 @@ // from time to time. Your connection should probably automatically

// (onclose happens when onerror gets called anyway).
connection.emit('connection error', e);
connection.emit('connection error', err);
};
socket.onclose = function(reason) {
// reason values:
// node-browserchannel reason values:
// 'Closed' - The socket was manually closed by calling socket.close()

@@ -145,5 +141,11 @@ // 'Stopped by server' - The server sent the stop message to tell the client not to try connecting

// 'Unknown session ID' - Server session for client is missing (temporary, will immediately reestablish)
connection._setState('disconnected', reason);
if (reason === 'Closed' || reason === 'Stopped by server') {
if (reason === 'closed' || reason === 'Closed') {
connection._setState('closed', reason);
} else if (reason === 'stopped' || reason === 'Stopped by server') {
connection._setState('stopped', reason);
} else {
connection._setState('disconnected', reason);
}

@@ -250,13 +252,10 @@ };

Connection.prototype.reset = function() {
Connection.prototype._reset = function() {
this.seq = 1;
this.id = null;
clearInterval(this._retryInterval);
this._retryInterval = null;
};
Connection.prototype._setupRetry = function() {
if (!this.canSend) {
clearInterval(this._retryInterval);
this._retryInterval = null;
return;
}
if (this._retryInterval != null) return;

@@ -283,3 +282,3 @@

if (
(newState === 'connecting' && this.state !== 'disconnected' && this.state !== 'stopped') ||
(newState === 'connecting' && this.state !== 'disconnected' && this.state !== 'stopped' && this.state !== 'closed') ||
(newState === 'connected' && this.state !== 'connecting')

@@ -294,4 +293,4 @@ ) {

if (newState === 'disconnected' || newState === 'stopped') this.reset();
this._setupRetry();
if (newState === 'disconnected' || newState === 'stopped' || newState === 'closed') this._reset();
if (this.canSend) this._setupRetry();

@@ -417,7 +416,2 @@ // Group subscribes together to help server make more efficient calls

this.messageBuffer.push({t:Date.now(), send:JSON.stringify(msg)});
while (this.messageBuffer.length > 100) {
this.messageBuffer.shift();
}
this.socket.send(JSON.stringify(msg));

@@ -428,5 +422,5 @@ };

/**
* Closes the socket and emits 'disconnected'
* Closes the socket and emits 'closed'
*/
Connection.prototype.disconnect = function() {
Connection.prototype.close = function() {
this.socket.close();

@@ -492,4 +486,2 @@ };

Connection.prototype._createQuery = function(action, collection, q, options, callback) {
if (!options) options = {};
var id = this.nextQueryId++;

@@ -534,11 +526,8 @@ var query = new Query(action, this, id, collection, q, options, callback);

this._firstDoc(hasPending) ||
this._firstQuery(callbackPending)
this._firstQuery(hasPending)
);
};
function hasPending(doc) {
return doc.hasPending();
function hasPending(object) {
return object.hasPending();
}
function callbackPending(query) {
return query.callback;
}

@@ -548,4 +537,4 @@ Connection.prototype.hasWritePending = function() {

};
function hasWritePending(doc) {
return doc.hasWritePending();
function hasWritePending(object) {
return object.hasWritePending();
}

@@ -563,5 +552,6 @@

}
var query = this._firstQuery(callbackPending);
var query = this._firstQuery(hasPending);
if (query) {
query.once('ready', this._nothingPendingRetry(callback));
return;
}

@@ -568,0 +558,0 @@ // Call back when no pending operations

@@ -334,3 +334,3 @@ var emitter = require('../emitter');

}
if (callback) this.pendingFetch.push(callback);
this.pendingFetch.push(callback);
};

@@ -346,3 +346,3 @@

}
if (callback) this.pendingFetch.push(callback);
this.pendingFetch.push(callback);
};

@@ -389,8 +389,2 @@

// Clear any no-ops from the front of the pending op list.
while (this.pendingOps.length && isNoOp(this.pendingOps[0])) {
var op = this.pendingOps.shift();
callEach(op.callbacks);
}
// Send first pending op unless paused

@@ -409,6 +403,2 @@ if (!this.paused && this.pendingOps.length) {

function isNoOp(op) {
return !op.op && !op.create && !op.del;
}
// Try to compose op2 into op1. Returns truthy if it succeeds, otherwise falsy.

@@ -419,13 +409,9 @@ function tryCompose(type, op1, op2) {

return true;
}
} else if (op1.op && op2.op && type.compose) {
if (op1.op && op2.op && type.compose) {
op1.op = type.compose(op1.op, op2.op);
return true;
}
} else if (isNoOp(op1)) {
op1.create = op2.create;
op1.del = op2.del;
op1.op = op2.op;
return true;
}
return false;

@@ -432,0 +418,0 @@ }

@@ -21,15 +21,17 @@ var emitter = require('../emitter');

// The db we actually hit. If this isn't defined, it hits the snapshot
// database. Otherwise this can be used to hit another configured query
// index.
this.db = options.db;
// A list of resulting documents. These are actual documents, complete with
// data and all the rest. It is possible to pass in an initial results set,
// so that a query can be serialized and then re-established
this.results = options.results;
this.results = null;
if (options && options.results) {
this.results = options.results;
delete options.results;
}
this.extra = undefined;
// Options to pass through with the query
this.options = options;
this.callback = callback;
this.ready = false;
this.sent = false;

@@ -39,2 +41,6 @@ }

Query.prototype.hasPending = function() {
return !this.ready;
}
// Helper for subscribe & fetch, since they share the same message format.

@@ -52,3 +58,5 @@ //

};
if (this.db != null) msg.db = this.db;
if (this.options) {
msg.o = this.options;
}
if (this.results) {

@@ -76,2 +84,5 @@ // Collect the version of all the documents in the current result set so we

this.connection._destroyQuery(this);
// There is a callback for consistency, but we don't actually wait for the
// server's unsubscribe message currently
if (callback) process.nextTick(callback);
};

@@ -141,2 +152,3 @@

this.emit('ready');
this.ready = true;
if (err) {

@@ -143,0 +155,0 @@ this.connection._destroyQuery(this);

@@ -86,7 +86,4 @@ var DB = require('./index');

// The memory database has a really simple (probably too simple) query
// mechanism to get all documents in the collection. The query is just the
// collection name.
// Ignore the query - Returns all documents in the specified collection
// The memory database query function returns all documents in a collection
// regardless of query by default
MemoryDB.prototype.query = function(collection, query, fields, options, callback) {

@@ -101,7 +98,18 @@ var db = this;

}
callback(null, snapshots);
try {
var filtered = db._querySync(snapshots, query, options);
callback(null, filtered);
} catch (err) {
callback(err);
}
});
};
// For testing, it may be useful to implement the desired query language by
// defining this function
MemoryDB.prototype._querySync = function(snapshots, query, options) {
return snapshots;
};
MemoryDB.prototype._writeOpSync = function(collection, id, op) {

@@ -108,0 +116,0 @@ var opLog = this._getOpLogSync(collection, id);

@@ -38,3 +38,3 @@ // This contains the master OT functions for the database. They look like

exports.normalizeType = function(typeName) {
return types[typeName].uri;
return types[typeName] && types[typeName].uri;
};

@@ -101,10 +101,2 @@

// This is a helper function to catchup a document by a list of operations.
exports.applyAll = function(snapshot, ops) {
for (var i = 0; i < ops.length; i++) {
var err = exports.apply(snapshot, ops[i]);
if (err) return err;
}
};
exports.transform = function(type, op, appliedOp) {

@@ -111,0 +103,0 @@ // There are 16 cases this function needs to deal with - which are all the

@@ -11,7 +11,4 @@ var json0 = require('ot-json0').type;

function projectSnapshot(fields, snapshot) {
// Nothing to project if there isn't any data
if (snapshot.data == null) return;
// Only json0 supported right now
if (snapshot.type !== json0.uri) {
if (snapshot.type && snapshot.type !== json0.uri) {
throw new Error('Cannot project snapshots of type ' + snapshot.type);

@@ -24,3 +21,3 @@ }

if (op.create) {
op.create.data = projectData(fields, op.create.data);
projectSnapshot(fields, op.create);
}

@@ -65,9 +62,9 @@ if (op.op) {

return isSnapshotAllowed(fields, op.create);
} else if (op.op) {
if (knownType !== json0.uri) return false;
}
if (op.op) {
if (knownType && knownType !== json0.uri) return false;
return isEditAllowed(fields, op.op);
} else {
// Noop and del are both ok.
return true;
}
// Noop and del are both ok.
return true;
}

@@ -77,10 +74,10 @@

function isSnapshotAllowed(fields, snapshot) {
if (snapshot.type !== json0.uri) {
if (snapshot.type && snapshot.type !== json0.uri) {
return false;
}
if (snapshot.data === undefined) {
if (snapshot.data == null) {
return true;
}
// Data must be an object if not undefined
if (!snapshot.data || typeof snapshot.data !== 'object' || Array.isArray(snapshot.data)) {
// Data must be an object if not null
if (typeof snapshot.data !== 'object' || Array.isArray(snapshot.data)) {
return false;

@@ -107,4 +104,8 @@ }

function projectData(fields, data) {
// Return back null or undefined
if (data == null) {
return data;
}
// If data is not an object, the projected version just looks like null.
if (!data || typeof data !== 'object' || Array.isArray(data)) {
if (typeof data !== 'object' || Array.isArray(data)) {
return null;

@@ -111,0 +112,0 @@ }

@@ -212,6 +212,9 @@ var deepEquals = require('deep-is');

if (err) return callback(err);
emitter._emitTiming('query.pollDocGetSnapshot', start);
var values = [snapshot];
emitter.onDiff([new arraydiff.InsertDiff(index, values)]);
callback();
emitter.backend._sanitizeSnapshot(emitter.agent, emitter.snapshotProjection, emitter.collection, id, snapshot, function(err) {
if (err) return callback(err);
var values = [snapshot];
emitter.onDiff([new arraydiff.InsertDiff(index, values)]);
emitter._emitTiming('query.pollDocGetSnapshot', start);
callback();
});
});

@@ -218,0 +221,0 @@ return;

@@ -0,16 +1,22 @@

var Duplex = require('stream').Duplex;
var util = require('./util');
function StreamSocket(stream) {
this.stream = stream;
function StreamSocket() {
this.stream = new Duplex({objectMode: true});
this.readyState = 0;
var socket = this;
stream._read = util.doNothing;
stream._write = function(chunk, encoding, callback) {
socket.onmessage({
type: 'message',
data: chunk
this.stream._read = util.doNothing;
this.stream._write = function(chunk, encoding, callback) {
process.nextTick(function() {
if (socket.readyState !== 1) return;
socket.onmessage({data: chunk});
callback();
});
callback();
};
// The server ended the writable stream. Triggered by calling stream.end()
// in agent.close()
this.stream.once('finish', function() {
socket.close('stopped');
});
}

@@ -20,16 +26,16 @@ module.exports = StreamSocket;

StreamSocket.prototype._open = function() {
if (this.readyState !== 0) return;
this.readyState = 1;
this.onopen();
};
StreamSocket.prototype.close = function() {
StreamSocket.prototype.close = function(reason) {
if (this.readyState === 3) return;
this.readyState = 3;
this.stream.end();
this.stream.emit('close');
this.stream.emit('end');
this.onclose();
// Signal data writing is complete. Emits the 'end' event
this.stream.push(null);
this.onclose(reason || 'closed');
};
StreamSocket.prototype.send = function(data) {
var copy = JSON.parse(JSON.stringify(data));
this.stream.push(copy);
// Data is a string of JSON
this.stream.push(data);
};

@@ -40,2 +46,1 @@ StreamSocket.prototype.onmessage = util.doNothing;

StreamSocket.prototype.onopen = util.doNothing;
StreamSocket.prototype.onconnecting = util.doNothing;
var ot = require('./ot');
var projections = require('./projections');

@@ -44,3 +45,3 @@ function SubmitRequest(backend, agent, index, id, op) {

// With a null projection, it strips document metadata
var fields = 'submit';
var fields = {$submit: true};

@@ -47,0 +48,0 @@ backend.db.getSnapshot(collection, id, fields, function(err, snapshot) {

{
"name": "sharedb",
"version": "0.10.2",
"version": "0.11.0",
"description": "JSON OT database backend",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -1,88 +0,64 @@

var sinon = require('sinon');
var assert = require('assert');
var Connection = require('../../lib/client').Connection;
var Doc = require('../../lib/client').Doc;
var expect = require('expect.js');
var Backend = require('../../lib/backend');
describe('Connection', function() {
var socket = {
readyState: 0,
send: function() {},
close: function() {
this.readyState = 3;
this.onclose();
}
};
describe('client connection', function() {
beforeEach(function() {
socket.readyState = 0;
this.connection = new Connection(socket);
this.backend = new Backend();
});
describe('state and socket', function() {
it('is set to disconnected', function() {
socket.readyState = 3;
var connection = new Connection(socket);
assert.equal(connection.state, 'disconnected');
it('ends the agent stream when a connection is closed after connect', function(done) {
this.backend.use('connect', function(request, next) {
request.agent.stream.on('end', function() {
done();
});
next();
});
it('is set to connecting', function() {
socket.readyState = 1;
var connection = new Connection(socket);
assert.equal(connection.state, 'connecting');
var connection = this.backend.connect();
connection.on('connected', function() {
connection.close();
});
});
describe('socket onopen', function() {
beforeEach(function() {
socket.readyState = 3;
this.connection = new Connection(socket);
it('ends the agent stream when a connection is immediately closed', function(done) {
this.backend.use('connect', function(request, next) {
request.agent.stream.on('end', function() {
done();
});
next();
});
it('sets connecting state', function() {
assert.equal(this.connection.state, 'disconnected');
socket.onopen();
assert.equal(this.connection.state, 'connecting');
});
var connection = this.backend.connect();
connection.close();
});
describe('socket onclose', function() {
it('sets disconnected state', function() {
assert.equal(this.connection.state, 'connecting');
socket.close();
assert.equal(this.connection.state, 'disconnected');
it('emits closed event on call to connection.close()', function(done) {
var connection = this.backend.connect();
connection.on('closed', function() {
done();
});
connection.close();
});
describe('socket onmessage', function() {
it('calls handle message', function() {
var handleMessage = sinon.spy(this.connection, 'handleMessage');
socket.onmessage({data: {key: 'value'}});
sinon.assert.calledWith(handleMessage, {
key: 'value'
it('ends the agent steam on call to agent.close()', function(done) {
this.backend.use('connect', function(request, next) {
request.agent.stream.on('end', function() {
done();
});
request.agent.close();
next();
});
var connection = this.backend.connect();
})
it('pushes message buffer', function() {
assert(this.connection.messageBuffer.length === 0);
socket.onmessage({data: {key: 'value'}});
assert(this.connection.messageBuffer.length === 1);
it('emits stopped event on call to agent.close()', function(done) {
this.backend.use('connect', function(request, next) {
request.agent.close();
next();
});
var connection = this.backend.connect();
connection.on('stopped', function() {
done();
});
});
describe('#disconnect', function() {
it('calls socket.close()', function() {
var close;
close = sinon.spy(socket, 'close');
this.connection.disconnect();
sinon.assert.calledOnce(close);
close.reset();
});
it('emits disconnected', function() {
var emit = sinon.spy(this.connection, 'emit');
this.connection.disconnect();
sinon.assert.calledWith(emit, 'disconnected');
emit.reset();
});
});
});

@@ -1,4 +0,4 @@

var Backend = require('../../lib/backend');
var expect = require('expect.js');
var async = require('async');
var util = require('../util');

@@ -10,4 +10,3 @@ module.exports = function() {

it(method + ' on an empty collection', function(done) {
var backend = new Backend({db: this.db});
var connection = backend.connect();
var connection = this.backend.connect();
connection[method]('dogs', {}, null, function(err, results) {

@@ -21,4 +20,3 @@ if (err) return done(err);

it(method + ' on collection with fetched docs', function(done) {
var backend = new Backend({db: this.db});
var connection = backend.connect();
var connection = this.backend.connect();
async.parallel([

@@ -32,5 +30,5 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); },

if (err) return done(err);
sortById(results);
expect(pluck(results, 'id')).eql(['fido', 'spot']);
expect(pluck(results, 'data')).eql([{age: 3}, {age: 5}]);
var sorted = util.sortById(results);
expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']);
expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 5}]);
done();

@@ -42,4 +40,4 @@ });

it(method + ' on collection with unfetched docs', function(done) {
var backend = new Backend({db: this.db});
var connection = backend.connect();
var connection = this.backend.connect();
var connection2 = this.backend.connect();
async.parallel([

@@ -51,8 +49,7 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); },

if (err) return done(err);
var connection2 = backend.connect();
connection2[method]('dogs', {}, null, function(err, results) {
if (err) return done(err);
sortById(results);
expect(pluck(results, 'id')).eql(['fido', 'spot']);
expect(pluck(results, 'data')).eql([{age: 3}, {age: 5}]);
var sorted = util.sortById(results);
expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']);
expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 5}]);
done();

@@ -64,4 +61,4 @@ });

it(method + ' on collection with one fetched doc', function(done) {
var backend = new Backend({db: this.db});
var connection = backend.connect();
var connection = this.backend.connect();
var connection2 = this.backend.connect();
async.parallel([

@@ -73,3 +70,2 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); },

if (err) return done(err);
var connection2 = backend.connect();
connection2.get('dogs', 'fido').fetch(function(err) {

@@ -79,5 +75,5 @@ if (err) return done(err);

if (err) return done(err);
sortById(results);
expect(pluck(results, 'id')).eql(['fido', 'spot']);
expect(pluck(results, 'data')).eql([{age: 3}, {age: 5}]);
var sorted = util.sortById(results);
expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']);
expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 5}]);
done();

@@ -90,4 +86,4 @@ });

it(method + ' on collection with one fetched doc missing an op', function(done) {
var backend = new Backend({db: this.db});
var connection = backend.connect();
var connection = this.backend.connect();
var connection2 = this.backend.connect();
async.parallel([

@@ -99,3 +95,2 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); },

if (err) return done(err);
var connection2 = backend.connect();
connection2.get('dogs', 'fido').fetch(function(err) {

@@ -115,5 +110,5 @@ if (err) return done(err);

if (err) return done(err);
sortById(results);
expect(pluck(results, 'id')).eql(['fido', 'spot']);
expect(pluck(results, 'data')).eql([{age: 4}, {age: 5}]);
var sorted = util.sortById(results);
expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']);
expect(util.pluck(sorted, 'data')).eql([{age: 4}, {age: 5}]);
done();

@@ -130,17 +125,1 @@ });

};
function sortById(docs) {
docs.sort(function(a, b) {
if (a.id > b.id) return 1;
if (b.id > a.id) return -1;
return 0;
});
}
function pluck(docs, key) {
var values = [];
for (var i = 0; i < docs.length; i++) {
values.push(docs[i][key]);
}
return values;
}

@@ -1,4 +0,3 @@

var Backend = require('../../lib/backend');
var async = require('async');
var expect = require('expect.js');
var async = require('async');

@@ -8,23 +7,4 @@ module.exports = function() {

it('getting twice returns the same doc', function() {
var backend = new Backend({db: this.db});
var connection = backend.connect();
var doc = connection.get('dogs', 'fido');
var doc2 = connection.get('dogs', 'fido');
expect(doc).equal(doc2);
});
it('getting then destroying then getting returns a new doc object', function() {
var backend = new Backend({db: this.db});
var connection = backend.connect();
var doc = connection.get('dogs', 'fido');
doc.destroy();
var doc2 = connection.get('dogs', 'fido');
expect(doc).not.equal(doc2);
expect(doc).eql(doc2);
});
it('can fetch an uncreated doc', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
expect(doc.data).equal(undefined);

@@ -41,4 +21,3 @@ expect(doc.version).equal(null);

it('can fetch then create a new doc', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
doc.fetch(function(err) {

@@ -56,4 +35,3 @@ if (err) return done(err);

it('can create a new doc without fetching', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -68,4 +46,3 @@ if (err) return done(err);

it('can create then delete then create a doc', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -92,4 +69,3 @@ if (err) return done(err);

it('can create then submit an op', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -107,4 +83,3 @@ if (err) return done(err);

it('can create then submit an op sync', function() {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3});

@@ -118,5 +93,20 @@ expect(doc.data).eql({age: 3});

it('cannot submit op on an uncreated doc', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
doc.submitOp({p: ['age'], na: 2}, function(err) {
expect(err).ok();
done();
});
});
it('cannot delete an uncreated doc', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
doc.del(function(err) {
expect(err).ok();
done();
});
});
it('ops submitted sync get composed', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3});

@@ -148,4 +138,3 @@ doc.submitOp({p: ['age'], na: 2});

it('can create a new doc then fetch', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -163,4 +152,3 @@ if (err) return done(err);

it('calling create on the same doc twice fails', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -178,5 +166,4 @@ if (err) return done(err);

it('trying to create an already created doc without fetching fails and fetches', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -197,3 +184,3 @@ if (err) return done(err);

process.nextTick(function() {
connection.disconnect();
connection.close();
// Reconnect once the server has a chance to save the op data

@@ -207,3 +194,3 @@ setTimeout(function() {

it('resends create when disconnected before ack', function(done) {
var backend = new Backend({db: this.db});
var backend = this.backend;
var doc = backend.connect().get('dogs', 'fido');

@@ -220,3 +207,3 @@ doc.create({age: 3}, function(err) {

it('resent create on top of deleted doc gets proper starting version', function(done) {
var backend = new Backend({db: this.db});
var backend = this.backend;
var doc = backend.connect().get('dogs', 'fido');

@@ -241,3 +228,3 @@ doc.create({age: 4}, function(err) {

it('resends delete when disconnected before ack', function(done) {
var backend = new Backend({db: this.db});
var backend = this.backend;
var doc = backend.connect().get('dogs', 'fido');

@@ -257,4 +244,3 @@ doc.create({age: 3}, function(err) {

it('op submitted during inflight create does not compose and gets flushed', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3});

@@ -273,5 +259,4 @@ // Submit an op after message is sent but before server has a chance to reply

it('can commit then fetch in a new connection to get the same data', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -292,5 +277,4 @@ if (err) return done(err);

it('an op submitted concurrently is transformed by the first', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -330,5 +314,4 @@ if (err) return done(err);

it('second of two concurrent creates is rejected', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
var count = 0;

@@ -364,5 +347,4 @@ doc.create({age: 3}, function(err) {

it('concurrent delete operations transform', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -402,5 +384,4 @@ if (err) return done(err);

it('second client can create following delete', function(done) {
var backend = new Backend({db: this.db});
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -420,3 +401,75 @@ if (err) return done(err);

it('doc.pause() prevents ops from being sent', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
doc.pause();
doc.create({age: 3}, done);
done();
});
it('can call doc.resume() without pausing', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
doc.resume();
doc.create({age: 3}, done);
});
it('doc.resume() resumes sending ops after pause', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
doc.pause();
doc.create({age: 3}, done);
doc.resume();
});
it('pending ops are transformed by ops from other clients', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {
if (err) return done(err);
doc2.fetch(function(err) {
if (err) return done(err);
doc.pause();
doc.submitOp({p: ['age'], na: 1});
doc.submitOp({p: ['color'], oi: 'gold'});
expect(doc.version).equal(1);
doc2.submitOp({p: ['age'], na: 5});
process.nextTick(function() {
doc2.submitOp({p: ['sex'], oi: 'female'}, function(err) {
if (err) return done(err);
expect(doc2.version).equal(3);
async.parallel([
function(cb) { doc.fetch(cb) },
function(cb) { doc2.fetch(cb) }
], function(err) {
if (err) return done(err);
expect(doc.data).eql({age: 9, color: 'gold', sex: 'female'});
expect(doc.version).equal(3);
expect(doc.hasPending()).equal(true);
expect(doc2.data).eql({age: 8, sex: 'female'});
expect(doc2.version).equal(3);
expect(doc2.hasPending()).equal(false);
doc.resume();
doc.whenNothingPending(function() {
doc2.fetch(function(err) {
if (err) return done(err);
expect(doc.data).eql({age: 9, color: 'gold', sex: 'female'});
expect(doc.version).equal(4);
expect(doc.hasPending()).equal(false);
expect(doc2.data).eql({age: 9, color: 'gold', sex: 'female'});
expect(doc2.version).equal(4);
expect(doc2.hasPending()).equal(false);
done();
});
});
});
});
});
});
});
});
});
};

@@ -1,10 +0,9 @@

var Backend = require('../../lib/backend');
var expect = require('expect.js');
var async = require('async');
module.exports = function() {
describe('client subscribe', function() {
it('can call bulk without doing any actions', function() {
var backend = new Backend();
var connection = backend.connect();
var connection = this.backend.connect();
connection.startBulk();

@@ -16,5 +15,4 @@ connection.endBulk();

it(method + ' gets initial data', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -32,5 +30,4 @@ if (err) return done(err);

it(method + ' twice simultaneously calls back', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -51,5 +48,4 @@ if (err) return done(err);

it(method + ' twice in bulk simultaneously calls back', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -72,4 +68,4 @@ if (err) return done(err);

it(method + ' bulk on same collection', function(done) {
var backend = new Backend();
var connection = backend.connect();
var connection = this.backend.connect();
var connection2 = this.backend.connect();
async.parallel([

@@ -81,3 +77,2 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); },

if (err) return done(err);
var connection2 = backend.connect();
var fido = connection2.get('dogs', 'fido');

@@ -103,4 +98,4 @@ var spot = connection2.get('dogs', 'spot');

it(method + ' bulk on same collection from known version', function(done) {
var backend = new Backend();
var connection2 = backend.connect();
var connection = this.backend.connect();
var connection2 = this.backend.connect();
var fido = connection2.get('dogs', 'fido');

@@ -123,3 +118,2 @@ var spot = connection2.get('dogs', 'spot');

var connection = backend.connect();
async.parallel([

@@ -182,5 +176,4 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); },

it(method + ' gets new ops', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -200,8 +193,36 @@ if (err) return done(err);

});
it(method + ' calls back after reconnect', function(done) {
var backend = this.backend;
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {
if (err) return done(err);
doc2[method](function(err) {
if (err) return done(err);
expect(doc2.version).eql(1);
expect(doc2.data).eql({age: 3})
done();
});
doc2.connection.close();
process.nextTick(function() {
backend.connect(doc2.connection);
});
});
});
});
it('subscribed client gets create from first client', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
it('unsubscribe calls back immediately on disconnect', function(done) {
var backend = this.backend;
var doc = this.backend.connect().get('dogs', 'fido');
doc.subscribe(function(err) {
if (err) return done(err);
doc.unsubscribe(done);
doc.connection.close();
});
});
it('subscribed client gets create from other client', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc2.subscribe(function(err) {

@@ -219,6 +240,5 @@ if (err) return done(err);

it('subscribed client gets op from first client', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
it('subscribed client gets op from other client', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -238,6 +258,22 @@ if (err) return done(err);

it('disconnecting stops op updates', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {
if (err) return done(err);
doc2.subscribe(function(err) {
if (err) return done(err);
doc2.on('op', function(op, context) {
done();
});
doc2.connection.close();
doc.submitOp({p: ['age'], na: 1});
done();
});
});
});
it('unsubscribe stops op updates', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -247,8 +283,8 @@ if (err) return done(err);

if (err) return done(err);
doc2.on('op', function(op, context) {
done();
});
doc2.unsubscribe(function(err) {
if (err) return done(err);
done();
doc2.on('op', function(op, context) {
done();
});
doc.submitOp({p: ['age'], na: 1});

@@ -260,6 +296,25 @@ });

it('doc destroy stops op updates', function(done) {
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {
if (err) return done(err);
doc2.subscribe(function(err) {
if (err) return done(err);
doc2.on('op', function(op, context) {
done();
});
doc2.destroy(function(err) {
if (err) return done(err);
done();
doc.submitOp({p: ['age'], na: 1});
});
});
});
});
it('bulk unsubscribe stops op updates', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var connection2 = backend.connect();
var connection = this.backend.connect();
var connection2 = this.backend.connect();
var doc = connection.get('dogs', 'fido');
var fido = connection2.get('dogs', 'fido');

@@ -291,6 +346,28 @@ var spot = connection2.get('dogs', 'spot');

it('a subscribed doc is re-subscribed after reconnect and gets any missing ops', function(done) {
var backend = this.backend;
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {
if (err) return done(err);
doc2.subscribe(function(err) {
if (err) return done(err);
doc2.on('op', function(op, context) {
expect(doc2.version).eql(2);
expect(doc2.data).eql({age: 4});
done();
});
doc2.connection.close();
doc.submitOp({p: ['age'], na: 1}, function(err) {
if (err) return done(err);
backend.connect(doc2.connection);
});
});
});
});
it('calling subscribe, unsubscribe, subscribe sync leaves a doc subscribed', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -311,5 +388,4 @@ if (err) return done(err);

it('calling subscribe, unsubscribe, subscribe sync leaves a doc subscribed', function(done) {
var backend = new Backend();
var doc = backend.connect().get('dogs', 'fido');
var doc2 = backend.connect().get('dogs', 'fido');
var doc = this.backend.connect().get('dogs', 'fido');
var doc2 = this.backend.connect().get('dogs', 'fido');
doc.create({age: 3}, function(err) {

@@ -330,1 +406,2 @@ if (err) return done(err);

});
};
var MemoryDB = require('../lib/db/memory');
require('./db')(function(callback) {
callback(null, MemoryDB());
var db = new MemoryDB();
// Implement extremely simple subset of Mongo queries for unit tests
db._querySync = function(snapshots, query) {
var filtered = filter(snapshots, query.$query || query);
sort(filtered, query.$orderby);
return filtered;
};
db.queryPollDoc = function(collection, id, query, options, callback) {
this.getSnapshot(collection, id, null, function(err, snapshot) {
if (err) return callback(err);
var result = filterSnapshot(snapshot, query);
callback(null, result);
});
};
db.canPollDoc = function(collection, query) {
return !query.$orderby;
};
callback(null, db);
});
// Support exact key match filters only
function filter(snapshots, query) {
return snapshots.filter(function(snapshot) {
return filterSnapshot(snapshot, query);
});
}
function filterSnapshot(snapshot, query) {
if (!snapshot.data) return false;
for (var key in query) {
if (key.charAt(0) === '$') continue;
if (snapshot.data[key] !== query[key]) return false;
}
return true;
}
// Support sorting with the Mongo $orderby syntax
function sort(snapshots, orderby) {
if (!orderby) return;
snapshots.sort(function(snapshotA, snapshotB) {
for (var key in orderby) {
var value = orderby[key];
if (value !== 1 && value !== -1) {
throw new Error('Invalid $orderby value');
}
var a = snapshotA.data && snapshotA.data[key];
var b = snapshotB.data && snapshotB.data[key];
if (a > b) return value;
if (b > a) return -value;
}
return 0;
});
}
var async = require('async');
var expect = require('expect.js');
var Backend = require('../lib/backend');
var ot = require('../lib/ot');

@@ -12,2 +13,3 @@

self.db = db;
self.backend = new Backend({db: db});
done();

@@ -17,16 +19,19 @@ });

afterEach(function(done) {
this.db.close(done);
afterEach(function() {
this.backend.close();
});
require('./client/projections')();
require('./client/query-subscribe')();
require('./client/query')();
require('./client/submit')();
require('./client/query')();
require('./client/subscribe')();
// Simplified mock of how submit request applies operations. The
// noteworthy dependency is that it always calls getSnapshot with
// the 'submit' projection and applies the op to the returned
// the {$submit: true} projection and applies the op to the returned
// snapshot before committing. Thus, commit may rely on the behavior
// of getSnapshot with this special projection
function submit(db, collection, id, op, callback) {
db.getSnapshot(collection, id, 'submit', function(err, snapshot) {
db.getSnapshot(collection, id, {$submit: true}, function(err, snapshot) {
if (err) return callback(err);

@@ -388,3 +393,3 @@ if (snapshot.v !== op.v) {

if (err) return done(err);
db.getSnapshot('testcollection', 'test', 'submit', function(err, snapshot) {
db.getSnapshot('testcollection', 'test', {$submit: true}, function(err, snapshot) {
if (err) return done(err);

@@ -391,0 +396,0 @@ db.getOpsToSnapshot('testcollection', 'test', 0, snapshot, function(err, ops) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc