Comparing version 0.10.2 to 0.11.0
@@ -15,3 +15,2 @@ var hat = require('hat'); | ||
function Agent(backend, stream) { | ||
// The stream passed in should be a nodejs 0.10-style stream. | ||
this.backend = backend; | ||
@@ -30,6 +29,2 @@ this.stream = stream; | ||
// Subscriptions care about the stream being destroyed. We end up with a | ||
// listener per subscribed document for the client, which can be a lot. | ||
stream.setMaxListeners(0); | ||
// We need to track this manually to make sure we don't reply to messages | ||
@@ -40,7 +35,2 @@ // after the stream was closed. There's no built-in way to ask a stream | ||
var agent = this; | ||
stream.once('end', function() { | ||
agent._cleanup(); | ||
}); | ||
// Initialize the remote client by sending it its agent Id. | ||
@@ -59,7 +49,6 @@ this._send({ | ||
if (err) { | ||
console.warn('Agent closed due to error', err); | ||
this.stream.emit('error', err); | ||
console.warn('Agent closed due to error', this.clientId, err.stack || err); | ||
} | ||
if (this.closed) return; | ||
// This will emit 'end', which will call _cleanup | ||
// This will end the writable stream and emit 'finish' | ||
this.stream.end(); | ||
@@ -69,9 +58,5 @@ }; | ||
Agent.prototype._cleanup = function() { | ||
if (this.closed) return; | ||
this.closed = true; | ||
// Remove the pump listener | ||
this.stream.removeAllListeners('readable'); | ||
// Clean up all the subscriptions. | ||
// Clean up doc subscription streams | ||
for (var collection in this.subscribedDocs) { | ||
@@ -84,5 +69,5 @@ var docs = this.subscribedDocs[collection]; | ||
} | ||
// Cancel the subscribes | ||
this.subscribedDocs = {}; | ||
// Clean up query subscription streams | ||
for (var id in this.subscribedQueries) { | ||
@@ -92,3 +77,2 @@ var emitter = this.subscribedQueries[id]; | ||
} | ||
// Cancel the subscribes | ||
this.subscribedQueries = {}; | ||
@@ -224,29 +208,25 @@ }; | ||
// start processing events from the stream. This calls itself recursively. | ||
// Use .close() to drain the pump. | ||
// Start processing events from the stream | ||
Agent.prototype.pump = function() { | ||
if (this.closed) return; | ||
var req = this.stream.read(); | ||
var agent = this; | ||
if (req == null) { | ||
// Retry when there's a message waiting for us. | ||
this.stream.once('readable', function() { | ||
agent.pump(); | ||
}); | ||
return; | ||
} | ||
if (typeof req === 'string') { | ||
this.stream.on('data', function(chunk) { | ||
if (agent.closed) return; | ||
if (typeof chunk !== 'string') { | ||
var err = {message: 'Received non-string message'}; | ||
return agent.close(err); | ||
} | ||
try { | ||
req = JSON.parse(req); | ||
} catch(e) { | ||
console.warn('Client sent invalid JSON', e.stack); | ||
this.close(e); | ||
var req = JSON.parse(chunk); | ||
} catch (err) { | ||
return agent.close(err); | ||
} | ||
} | ||
this._handleMessage(req); | ||
// Clean up the stack then read the next message | ||
process.nextTick(function() { | ||
agent.pump(); | ||
agent._handleMessage(req); | ||
}); | ||
this.stream.on('end', function() { | ||
agent._cleanup(); | ||
}); | ||
}; | ||
@@ -339,8 +319,7 @@ | ||
} | ||
return { | ||
ids: ids, | ||
fetch: fetch, | ||
fetchOps: fetchOps, | ||
db: req.db | ||
}; | ||
var options = req.o || {}; | ||
options.ids = ids; | ||
options.fetch = fetch; | ||
options.fetchOps = fetchOps; | ||
return options; | ||
} | ||
@@ -347,0 +326,0 @@ |
@@ -1,2 +0,1 @@ | ||
var Duplex = require('stream').Duplex; | ||
var async = require('async'); | ||
@@ -45,4 +44,3 @@ var Agent = require('./agent'); | ||
Backend.prototype.connect = function(connection) { | ||
var stream = new Duplex({objectMode: true}); | ||
var socket = new StreamSocket(stream); | ||
var socket = new StreamSocket(); | ||
if (connection) { | ||
@@ -54,3 +52,3 @@ connection.bindToSocket(socket); | ||
socket._open(); | ||
this.listen(stream); | ||
this.listen(socket.stream); | ||
return connection; | ||
@@ -75,3 +73,3 @@ }; | ||
Backend.prototype.addProjection = function(name, collection, type, fields) { | ||
Backend.prototype.addProjection = function(name, collection, fields) { | ||
if (this.projections[name]) { | ||
@@ -81,5 +79,5 @@ throw new Error('Projection ' + name + ' already exists'); | ||
for (var k in fields) { | ||
if (fields[k] !== true) { | ||
throw new Error('Invalid field ' + k + ' - fields must be {somekey:true}. Subfields not currently supported.'); | ||
for (var key in fields) { | ||
if (fields[key] !== true) { | ||
throw new Error('Invalid field ' + key + ' - fields must be {somekey: true}. Subfields not currently supported.'); | ||
} | ||
@@ -90,3 +88,2 @@ } | ||
target: collection, | ||
type: ot.normalizeType(type), | ||
fields: fields | ||
@@ -93,0 +90,0 @@ }; |
@@ -31,4 +31,3 @@ var Doc = require('./doc'); | ||
// Each query is created with an id that the server uses when it sends us | ||
// info about the query (updates, etc). | ||
//this.nextQueryId = (Math.random() * 1000) |0; | ||
// info about the query (updates, etc) | ||
this.nextQueryId = 1; | ||
@@ -39,14 +38,13 @@ | ||
// A unique message number for the given id | ||
this.seq = 1; | ||
// Equals agent.clientId on the server | ||
this.id = null; | ||
// Private variable to support clearing of op retry interval | ||
this._retryInterval = null; | ||
// Reset some more state variables. | ||
this.reset(); | ||
this.debug = false; | ||
// I'll store the most recent 100 messages so when errors occur we can see | ||
// what happened. | ||
this.messageBuffer = []; | ||
this.bindToSocket(socket); | ||
@@ -85,9 +83,11 @@ } | ||
// State of the connection. The correspoding events are emmited when this | ||
// changes. Available states are: | ||
// - 'connecting' The connection has been established, but we don't have our | ||
// client ID yet | ||
// - 'connected' We have connected and recieved our client ID. Ready for data. | ||
// - 'disconnected' The connection is closed, but it will reconnect automatically. | ||
// - 'stopped' The connection is closed, and should not reconnect. | ||
// State of the connection. The correspoding events are emmited when this changes | ||
// | ||
// - 'connecting' The connection is still being established, or we are still | ||
// waiting on the server to send us the initialization message | ||
// - 'connected' The connection is open and we have connected to a server | ||
// and recieved the initialization message | ||
// - 'disconnected' Connection is closed, but it will reconnect automatically | ||
// - 'closed' The connection was closed by the client, and will not reconnect | ||
// - 'stopped' The connection was closed by the server, and will not reconnect | ||
this.state = (socket.readyState === 0 || socket.readyState === 1) ? 'connecting' : 'disconnected'; | ||
@@ -101,18 +101,14 @@ | ||
socket.onmessage = function(msg) { | ||
var data = msg.data; | ||
socket.onmessage = function(event) { | ||
try { | ||
var data = (typeof event.data === 'string') ? | ||
JSON.parse(event.data) : | ||
event.data; | ||
} catch (err) { | ||
// Silently ignore parsing errors | ||
return; | ||
} | ||
// Some transports don't need parsing. | ||
if (typeof data === 'string') data = JSON.parse(data); | ||
if (connection.debug) console.log('RECV', JSON.stringify(data)); | ||
connection.messageBuffer.push({ | ||
t: (new Date()).toTimeString(), | ||
recv: JSON.stringify(data) | ||
}); | ||
while (connection.messageBuffer.length > 100) { | ||
connection.messageBuffer.shift(); | ||
} | ||
try { | ||
@@ -129,3 +125,3 @@ connection.handleMessage(data); | ||
socket.onerror = function(e) { | ||
socket.onerror = function(err) { | ||
// This isn't the same as a regular error, because it will happen normally | ||
@@ -135,7 +131,7 @@ // from time to time. Your connection should probably automatically | ||
// (onclose happens when onerror gets called anyway). | ||
connection.emit('connection error', e); | ||
connection.emit('connection error', err); | ||
}; | ||
socket.onclose = function(reason) { | ||
// reason values: | ||
// node-browserchannel reason values: | ||
// 'Closed' - The socket was manually closed by calling socket.close() | ||
@@ -145,5 +141,11 @@ // 'Stopped by server' - The server sent the stop message to tell the client not to try connecting | ||
// 'Unknown session ID' - Server session for client is missing (temporary, will immediately reestablish) | ||
connection._setState('disconnected', reason); | ||
if (reason === 'Closed' || reason === 'Stopped by server') { | ||
if (reason === 'closed' || reason === 'Closed') { | ||
connection._setState('closed', reason); | ||
} else if (reason === 'stopped' || reason === 'Stopped by server') { | ||
connection._setState('stopped', reason); | ||
} else { | ||
connection._setState('disconnected', reason); | ||
} | ||
@@ -250,13 +252,10 @@ }; | ||
Connection.prototype.reset = function() { | ||
Connection.prototype._reset = function() { | ||
this.seq = 1; | ||
this.id = null; | ||
clearInterval(this._retryInterval); | ||
this._retryInterval = null; | ||
}; | ||
Connection.prototype._setupRetry = function() { | ||
if (!this.canSend) { | ||
clearInterval(this._retryInterval); | ||
this._retryInterval = null; | ||
return; | ||
} | ||
if (this._retryInterval != null) return; | ||
@@ -283,3 +282,3 @@ | ||
if ( | ||
(newState === 'connecting' && this.state !== 'disconnected' && this.state !== 'stopped') || | ||
(newState === 'connecting' && this.state !== 'disconnected' && this.state !== 'stopped' && this.state !== 'closed') || | ||
(newState === 'connected' && this.state !== 'connecting') | ||
@@ -294,4 +293,4 @@ ) { | ||
if (newState === 'disconnected' || newState === 'stopped') this.reset(); | ||
this._setupRetry(); | ||
if (newState === 'disconnected' || newState === 'stopped' || newState === 'closed') this._reset(); | ||
if (this.canSend) this._setupRetry(); | ||
@@ -417,7 +416,2 @@ // Group subscribes together to help server make more efficient calls | ||
this.messageBuffer.push({t:Date.now(), send:JSON.stringify(msg)}); | ||
while (this.messageBuffer.length > 100) { | ||
this.messageBuffer.shift(); | ||
} | ||
this.socket.send(JSON.stringify(msg)); | ||
@@ -428,5 +422,5 @@ }; | ||
/** | ||
* Closes the socket and emits 'disconnected' | ||
* Closes the socket and emits 'closed' | ||
*/ | ||
Connection.prototype.disconnect = function() { | ||
Connection.prototype.close = function() { | ||
this.socket.close(); | ||
@@ -492,4 +486,2 @@ }; | ||
Connection.prototype._createQuery = function(action, collection, q, options, callback) { | ||
if (!options) options = {}; | ||
var id = this.nextQueryId++; | ||
@@ -534,11 +526,8 @@ var query = new Query(action, this, id, collection, q, options, callback); | ||
this._firstDoc(hasPending) || | ||
this._firstQuery(callbackPending) | ||
this._firstQuery(hasPending) | ||
); | ||
}; | ||
function hasPending(doc) { | ||
return doc.hasPending(); | ||
function hasPending(object) { | ||
return object.hasPending(); | ||
} | ||
function callbackPending(query) { | ||
return query.callback; | ||
} | ||
@@ -548,4 +537,4 @@ Connection.prototype.hasWritePending = function() { | ||
}; | ||
function hasWritePending(doc) { | ||
return doc.hasWritePending(); | ||
function hasWritePending(object) { | ||
return object.hasWritePending(); | ||
} | ||
@@ -563,5 +552,6 @@ | ||
} | ||
var query = this._firstQuery(callbackPending); | ||
var query = this._firstQuery(hasPending); | ||
if (query) { | ||
query.once('ready', this._nothingPendingRetry(callback)); | ||
return; | ||
} | ||
@@ -568,0 +558,0 @@ // Call back when no pending operations |
@@ -334,3 +334,3 @@ var emitter = require('../emitter'); | ||
} | ||
if (callback) this.pendingFetch.push(callback); | ||
this.pendingFetch.push(callback); | ||
}; | ||
@@ -346,3 +346,3 @@ | ||
} | ||
if (callback) this.pendingFetch.push(callback); | ||
this.pendingFetch.push(callback); | ||
}; | ||
@@ -389,8 +389,2 @@ | ||
// Clear any no-ops from the front of the pending op list. | ||
while (this.pendingOps.length && isNoOp(this.pendingOps[0])) { | ||
var op = this.pendingOps.shift(); | ||
callEach(op.callbacks); | ||
} | ||
// Send first pending op unless paused | ||
@@ -409,6 +403,2 @@ if (!this.paused && this.pendingOps.length) { | ||
function isNoOp(op) { | ||
return !op.op && !op.create && !op.del; | ||
} | ||
// Try to compose op2 into op1. Returns truthy if it succeeds, otherwise falsy. | ||
@@ -419,13 +409,9 @@ function tryCompose(type, op1, op2) { | ||
return true; | ||
} | ||
} else if (op1.op && op2.op && type.compose) { | ||
if (op1.op && op2.op && type.compose) { | ||
op1.op = type.compose(op1.op, op2.op); | ||
return true; | ||
} | ||
} else if (isNoOp(op1)) { | ||
op1.create = op2.create; | ||
op1.del = op2.del; | ||
op1.op = op2.op; | ||
return true; | ||
} | ||
return false; | ||
@@ -432,0 +418,0 @@ } |
@@ -21,15 +21,17 @@ var emitter = require('../emitter'); | ||
// The db we actually hit. If this isn't defined, it hits the snapshot | ||
// database. Otherwise this can be used to hit another configured query | ||
// index. | ||
this.db = options.db; | ||
// A list of resulting documents. These are actual documents, complete with | ||
// data and all the rest. It is possible to pass in an initial results set, | ||
// so that a query can be serialized and then re-established | ||
this.results = options.results; | ||
this.results = null; | ||
if (options && options.results) { | ||
this.results = options.results; | ||
delete options.results; | ||
} | ||
this.extra = undefined; | ||
// Options to pass through with the query | ||
this.options = options; | ||
this.callback = callback; | ||
this.ready = false; | ||
this.sent = false; | ||
@@ -39,2 +41,6 @@ } | ||
Query.prototype.hasPending = function() { | ||
return !this.ready; | ||
} | ||
// Helper for subscribe & fetch, since they share the same message format. | ||
@@ -52,3 +58,5 @@ // | ||
}; | ||
if (this.db != null) msg.db = this.db; | ||
if (this.options) { | ||
msg.o = this.options; | ||
} | ||
if (this.results) { | ||
@@ -76,2 +84,5 @@ // Collect the version of all the documents in the current result set so we | ||
this.connection._destroyQuery(this); | ||
// There is a callback for consistency, but we don't actually wait for the | ||
// server's unsubscribe message currently | ||
if (callback) process.nextTick(callback); | ||
}; | ||
@@ -141,2 +152,3 @@ | ||
this.emit('ready'); | ||
this.ready = true; | ||
if (err) { | ||
@@ -143,0 +155,0 @@ this.connection._destroyQuery(this); |
@@ -86,7 +86,4 @@ var DB = require('./index'); | ||
// The memory database has a really simple (probably too simple) query | ||
// mechanism to get all documents in the collection. The query is just the | ||
// collection name. | ||
// Ignore the query - Returns all documents in the specified collection | ||
// The memory database query function returns all documents in a collection | ||
// regardless of query by default | ||
MemoryDB.prototype.query = function(collection, query, fields, options, callback) { | ||
@@ -101,7 +98,18 @@ var db = this; | ||
} | ||
callback(null, snapshots); | ||
try { | ||
var filtered = db._querySync(snapshots, query, options); | ||
callback(null, filtered); | ||
} catch (err) { | ||
callback(err); | ||
} | ||
}); | ||
}; | ||
// For testing, it may be useful to implement the desired query language by | ||
// defining this function | ||
MemoryDB.prototype._querySync = function(snapshots, query, options) { | ||
return snapshots; | ||
}; | ||
MemoryDB.prototype._writeOpSync = function(collection, id, op) { | ||
@@ -108,0 +116,0 @@ var opLog = this._getOpLogSync(collection, id); |
@@ -38,3 +38,3 @@ // This contains the master OT functions for the database. They look like | ||
exports.normalizeType = function(typeName) { | ||
return types[typeName].uri; | ||
return types[typeName] && types[typeName].uri; | ||
}; | ||
@@ -101,10 +101,2 @@ | ||
// This is a helper function to catchup a document by a list of operations. | ||
exports.applyAll = function(snapshot, ops) { | ||
for (var i = 0; i < ops.length; i++) { | ||
var err = exports.apply(snapshot, ops[i]); | ||
if (err) return err; | ||
} | ||
}; | ||
exports.transform = function(type, op, appliedOp) { | ||
@@ -111,0 +103,0 @@ // There are 16 cases this function needs to deal with - which are all the |
@@ -11,7 +11,4 @@ var json0 = require('ot-json0').type; | ||
function projectSnapshot(fields, snapshot) { | ||
// Nothing to project if there isn't any data | ||
if (snapshot.data == null) return; | ||
// Only json0 supported right now | ||
if (snapshot.type !== json0.uri) { | ||
if (snapshot.type && snapshot.type !== json0.uri) { | ||
throw new Error('Cannot project snapshots of type ' + snapshot.type); | ||
@@ -24,3 +21,3 @@ } | ||
if (op.create) { | ||
op.create.data = projectData(fields, op.create.data); | ||
projectSnapshot(fields, op.create); | ||
} | ||
@@ -65,9 +62,9 @@ if (op.op) { | ||
return isSnapshotAllowed(fields, op.create); | ||
} else if (op.op) { | ||
if (knownType !== json0.uri) return false; | ||
} | ||
if (op.op) { | ||
if (knownType && knownType !== json0.uri) return false; | ||
return isEditAllowed(fields, op.op); | ||
} else { | ||
// Noop and del are both ok. | ||
return true; | ||
} | ||
// Noop and del are both ok. | ||
return true; | ||
} | ||
@@ -77,10 +74,10 @@ | ||
function isSnapshotAllowed(fields, snapshot) { | ||
if (snapshot.type !== json0.uri) { | ||
if (snapshot.type && snapshot.type !== json0.uri) { | ||
return false; | ||
} | ||
if (snapshot.data === undefined) { | ||
if (snapshot.data == null) { | ||
return true; | ||
} | ||
// Data must be an object if not undefined | ||
if (!snapshot.data || typeof snapshot.data !== 'object' || Array.isArray(snapshot.data)) { | ||
// Data must be an object if not null | ||
if (typeof snapshot.data !== 'object' || Array.isArray(snapshot.data)) { | ||
return false; | ||
@@ -107,4 +104,8 @@ } | ||
function projectData(fields, data) { | ||
// Return back null or undefined | ||
if (data == null) { | ||
return data; | ||
} | ||
// If data is not an object, the projected version just looks like null. | ||
if (!data || typeof data !== 'object' || Array.isArray(data)) { | ||
if (typeof data !== 'object' || Array.isArray(data)) { | ||
return null; | ||
@@ -111,0 +112,0 @@ } |
@@ -212,6 +212,9 @@ var deepEquals = require('deep-is'); | ||
if (err) return callback(err); | ||
emitter._emitTiming('query.pollDocGetSnapshot', start); | ||
var values = [snapshot]; | ||
emitter.onDiff([new arraydiff.InsertDiff(index, values)]); | ||
callback(); | ||
emitter.backend._sanitizeSnapshot(emitter.agent, emitter.snapshotProjection, emitter.collection, id, snapshot, function(err) { | ||
if (err) return callback(err); | ||
var values = [snapshot]; | ||
emitter.onDiff([new arraydiff.InsertDiff(index, values)]); | ||
emitter._emitTiming('query.pollDocGetSnapshot', start); | ||
callback(); | ||
}); | ||
}); | ||
@@ -218,0 +221,0 @@ return; |
@@ -0,16 +1,22 @@ | ||
var Duplex = require('stream').Duplex; | ||
var util = require('./util'); | ||
function StreamSocket(stream) { | ||
this.stream = stream; | ||
function StreamSocket() { | ||
this.stream = new Duplex({objectMode: true}); | ||
this.readyState = 0; | ||
var socket = this; | ||
stream._read = util.doNothing; | ||
stream._write = function(chunk, encoding, callback) { | ||
socket.onmessage({ | ||
type: 'message', | ||
data: chunk | ||
this.stream._read = util.doNothing; | ||
this.stream._write = function(chunk, encoding, callback) { | ||
process.nextTick(function() { | ||
if (socket.readyState !== 1) return; | ||
socket.onmessage({data: chunk}); | ||
callback(); | ||
}); | ||
callback(); | ||
}; | ||
// The server ended the writable stream. Triggered by calling stream.end() | ||
// in agent.close() | ||
this.stream.once('finish', function() { | ||
socket.close('stopped'); | ||
}); | ||
} | ||
@@ -20,16 +26,16 @@ module.exports = StreamSocket; | ||
StreamSocket.prototype._open = function() { | ||
if (this.readyState !== 0) return; | ||
this.readyState = 1; | ||
this.onopen(); | ||
}; | ||
StreamSocket.prototype.close = function() { | ||
StreamSocket.prototype.close = function(reason) { | ||
if (this.readyState === 3) return; | ||
this.readyState = 3; | ||
this.stream.end(); | ||
this.stream.emit('close'); | ||
this.stream.emit('end'); | ||
this.onclose(); | ||
// Signal data writing is complete. Emits the 'end' event | ||
this.stream.push(null); | ||
this.onclose(reason || 'closed'); | ||
}; | ||
StreamSocket.prototype.send = function(data) { | ||
var copy = JSON.parse(JSON.stringify(data)); | ||
this.stream.push(copy); | ||
// Data is a string of JSON | ||
this.stream.push(data); | ||
}; | ||
@@ -40,2 +46,1 @@ StreamSocket.prototype.onmessage = util.doNothing; | ||
StreamSocket.prototype.onopen = util.doNothing; | ||
StreamSocket.prototype.onconnecting = util.doNothing; |
var ot = require('./ot'); | ||
var projections = require('./projections'); | ||
@@ -44,3 +45,3 @@ function SubmitRequest(backend, agent, index, id, op) { | ||
// With a null projection, it strips document metadata | ||
var fields = 'submit'; | ||
var fields = {$submit: true}; | ||
@@ -47,0 +48,0 @@ backend.db.getSnapshot(collection, id, fields, function(err, snapshot) { |
{ | ||
"name": "sharedb", | ||
"version": "0.10.2", | ||
"version": "0.11.0", | ||
"description": "JSON OT database backend", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -1,88 +0,64 @@ | ||
var sinon = require('sinon'); | ||
var assert = require('assert'); | ||
var Connection = require('../../lib/client').Connection; | ||
var Doc = require('../../lib/client').Doc; | ||
var expect = require('expect.js'); | ||
var Backend = require('../../lib/backend'); | ||
describe('Connection', function() { | ||
var socket = { | ||
readyState: 0, | ||
send: function() {}, | ||
close: function() { | ||
this.readyState = 3; | ||
this.onclose(); | ||
} | ||
}; | ||
describe('client connection', function() { | ||
beforeEach(function() { | ||
socket.readyState = 0; | ||
this.connection = new Connection(socket); | ||
this.backend = new Backend(); | ||
}); | ||
describe('state and socket', function() { | ||
it('is set to disconnected', function() { | ||
socket.readyState = 3; | ||
var connection = new Connection(socket); | ||
assert.equal(connection.state, 'disconnected'); | ||
it('ends the agent stream when a connection is closed after connect', function(done) { | ||
this.backend.use('connect', function(request, next) { | ||
request.agent.stream.on('end', function() { | ||
done(); | ||
}); | ||
next(); | ||
}); | ||
it('is set to connecting', function() { | ||
socket.readyState = 1; | ||
var connection = new Connection(socket); | ||
assert.equal(connection.state, 'connecting'); | ||
var connection = this.backend.connect(); | ||
connection.on('connected', function() { | ||
connection.close(); | ||
}); | ||
}); | ||
describe('socket onopen', function() { | ||
beforeEach(function() { | ||
socket.readyState = 3; | ||
this.connection = new Connection(socket); | ||
it('ends the agent stream when a connection is immediately closed', function(done) { | ||
this.backend.use('connect', function(request, next) { | ||
request.agent.stream.on('end', function() { | ||
done(); | ||
}); | ||
next(); | ||
}); | ||
it('sets connecting state', function() { | ||
assert.equal(this.connection.state, 'disconnected'); | ||
socket.onopen(); | ||
assert.equal(this.connection.state, 'connecting'); | ||
}); | ||
var connection = this.backend.connect(); | ||
connection.close(); | ||
}); | ||
describe('socket onclose', function() { | ||
it('sets disconnected state', function() { | ||
assert.equal(this.connection.state, 'connecting'); | ||
socket.close(); | ||
assert.equal(this.connection.state, 'disconnected'); | ||
it('emits closed event on call to connection.close()', function(done) { | ||
var connection = this.backend.connect(); | ||
connection.on('closed', function() { | ||
done(); | ||
}); | ||
connection.close(); | ||
}); | ||
describe('socket onmessage', function() { | ||
it('calls handle message', function() { | ||
var handleMessage = sinon.spy(this.connection, 'handleMessage'); | ||
socket.onmessage({data: {key: 'value'}}); | ||
sinon.assert.calledWith(handleMessage, { | ||
key: 'value' | ||
it('ends the agent steam on call to agent.close()', function(done) { | ||
this.backend.use('connect', function(request, next) { | ||
request.agent.stream.on('end', function() { | ||
done(); | ||
}); | ||
request.agent.close(); | ||
next(); | ||
}); | ||
var connection = this.backend.connect(); | ||
}) | ||
it('pushes message buffer', function() { | ||
assert(this.connection.messageBuffer.length === 0); | ||
socket.onmessage({data: {key: 'value'}}); | ||
assert(this.connection.messageBuffer.length === 1); | ||
it('emits stopped event on call to agent.close()', function(done) { | ||
this.backend.use('connect', function(request, next) { | ||
request.agent.close(); | ||
next(); | ||
}); | ||
var connection = this.backend.connect(); | ||
connection.on('stopped', function() { | ||
done(); | ||
}); | ||
}); | ||
describe('#disconnect', function() { | ||
it('calls socket.close()', function() { | ||
var close; | ||
close = sinon.spy(socket, 'close'); | ||
this.connection.disconnect(); | ||
sinon.assert.calledOnce(close); | ||
close.reset(); | ||
}); | ||
it('emits disconnected', function() { | ||
var emit = sinon.spy(this.connection, 'emit'); | ||
this.connection.disconnect(); | ||
sinon.assert.calledWith(emit, 'disconnected'); | ||
emit.reset(); | ||
}); | ||
}); | ||
}); |
@@ -1,4 +0,4 @@ | ||
var Backend = require('../../lib/backend'); | ||
var expect = require('expect.js'); | ||
var async = require('async'); | ||
var util = require('../util'); | ||
@@ -10,4 +10,3 @@ module.exports = function() { | ||
it(method + ' on an empty collection', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var connection = backend.connect(); | ||
var connection = this.backend.connect(); | ||
connection[method]('dogs', {}, null, function(err, results) { | ||
@@ -21,4 +20,3 @@ if (err) return done(err); | ||
it(method + ' on collection with fetched docs', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var connection = backend.connect(); | ||
var connection = this.backend.connect(); | ||
async.parallel([ | ||
@@ -32,5 +30,5 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); }, | ||
if (err) return done(err); | ||
sortById(results); | ||
expect(pluck(results, 'id')).eql(['fido', 'spot']); | ||
expect(pluck(results, 'data')).eql([{age: 3}, {age: 5}]); | ||
var sorted = util.sortById(results); | ||
expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']); | ||
expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 5}]); | ||
done(); | ||
@@ -42,4 +40,4 @@ }); | ||
it(method + ' on collection with unfetched docs', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var connection = backend.connect(); | ||
var connection = this.backend.connect(); | ||
var connection2 = this.backend.connect(); | ||
async.parallel([ | ||
@@ -51,8 +49,7 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); }, | ||
if (err) return done(err); | ||
var connection2 = backend.connect(); | ||
connection2[method]('dogs', {}, null, function(err, results) { | ||
if (err) return done(err); | ||
sortById(results); | ||
expect(pluck(results, 'id')).eql(['fido', 'spot']); | ||
expect(pluck(results, 'data')).eql([{age: 3}, {age: 5}]); | ||
var sorted = util.sortById(results); | ||
expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']); | ||
expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 5}]); | ||
done(); | ||
@@ -64,4 +61,4 @@ }); | ||
it(method + ' on collection with one fetched doc', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var connection = backend.connect(); | ||
var connection = this.backend.connect(); | ||
var connection2 = this.backend.connect(); | ||
async.parallel([ | ||
@@ -73,3 +70,2 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); }, | ||
if (err) return done(err); | ||
var connection2 = backend.connect(); | ||
connection2.get('dogs', 'fido').fetch(function(err) { | ||
@@ -79,5 +75,5 @@ if (err) return done(err); | ||
if (err) return done(err); | ||
sortById(results); | ||
expect(pluck(results, 'id')).eql(['fido', 'spot']); | ||
expect(pluck(results, 'data')).eql([{age: 3}, {age: 5}]); | ||
var sorted = util.sortById(results); | ||
expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']); | ||
expect(util.pluck(sorted, 'data')).eql([{age: 3}, {age: 5}]); | ||
done(); | ||
@@ -90,4 +86,4 @@ }); | ||
it(method + ' on collection with one fetched doc missing an op', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var connection = backend.connect(); | ||
var connection = this.backend.connect(); | ||
var connection2 = this.backend.connect(); | ||
async.parallel([ | ||
@@ -99,3 +95,2 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); }, | ||
if (err) return done(err); | ||
var connection2 = backend.connect(); | ||
connection2.get('dogs', 'fido').fetch(function(err) { | ||
@@ -115,5 +110,5 @@ if (err) return done(err); | ||
if (err) return done(err); | ||
sortById(results); | ||
expect(pluck(results, 'id')).eql(['fido', 'spot']); | ||
expect(pluck(results, 'data')).eql([{age: 4}, {age: 5}]); | ||
var sorted = util.sortById(results); | ||
expect(util.pluck(sorted, 'id')).eql(['fido', 'spot']); | ||
expect(util.pluck(sorted, 'data')).eql([{age: 4}, {age: 5}]); | ||
done(); | ||
@@ -130,17 +125,1 @@ }); | ||
}; | ||
function sortById(docs) { | ||
docs.sort(function(a, b) { | ||
if (a.id > b.id) return 1; | ||
if (b.id > a.id) return -1; | ||
return 0; | ||
}); | ||
} | ||
function pluck(docs, key) { | ||
var values = []; | ||
for (var i = 0; i < docs.length; i++) { | ||
values.push(docs[i][key]); | ||
} | ||
return values; | ||
} |
@@ -1,4 +0,3 @@ | ||
var Backend = require('../../lib/backend'); | ||
var async = require('async'); | ||
var expect = require('expect.js'); | ||
var async = require('async'); | ||
@@ -8,23 +7,4 @@ module.exports = function() { | ||
it('getting twice returns the same doc', function() { | ||
var backend = new Backend({db: this.db}); | ||
var connection = backend.connect(); | ||
var doc = connection.get('dogs', 'fido'); | ||
var doc2 = connection.get('dogs', 'fido'); | ||
expect(doc).equal(doc2); | ||
}); | ||
it('getting then destroying then getting returns a new doc object', function() { | ||
var backend = new Backend({db: this.db}); | ||
var connection = backend.connect(); | ||
var doc = connection.get('dogs', 'fido'); | ||
doc.destroy(); | ||
var doc2 = connection.get('dogs', 'fido'); | ||
expect(doc).not.equal(doc2); | ||
expect(doc).eql(doc2); | ||
}); | ||
it('can fetch an uncreated doc', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
expect(doc.data).equal(undefined); | ||
@@ -41,4 +21,3 @@ expect(doc.version).equal(null); | ||
it('can fetch then create a new doc', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.fetch(function(err) { | ||
@@ -56,4 +35,3 @@ if (err) return done(err); | ||
it('can create a new doc without fetching', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -68,4 +46,3 @@ if (err) return done(err); | ||
it('can create then delete then create a doc', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -92,4 +69,3 @@ if (err) return done(err); | ||
it('can create then submit an op', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -107,4 +83,3 @@ if (err) return done(err); | ||
it('can create then submit an op sync', function() { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}); | ||
@@ -118,5 +93,20 @@ expect(doc.data).eql({age: 3}); | ||
it('cannot submit op on an uncreated doc', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.submitOp({p: ['age'], na: 2}, function(err) { | ||
expect(err).ok(); | ||
done(); | ||
}); | ||
}); | ||
it('cannot delete an uncreated doc', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.del(function(err) { | ||
expect(err).ok(); | ||
done(); | ||
}); | ||
}); | ||
it('ops submitted sync get composed', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}); | ||
@@ -148,4 +138,3 @@ doc.submitOp({p: ['age'], na: 2}); | ||
it('can create a new doc then fetch', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -163,4 +152,3 @@ if (err) return done(err); | ||
it('calling create on the same doc twice fails', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -178,5 +166,4 @@ if (err) return done(err); | ||
it('trying to create an already created doc without fetching fails and fetches', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -197,3 +184,3 @@ if (err) return done(err); | ||
process.nextTick(function() { | ||
connection.disconnect(); | ||
connection.close(); | ||
// Reconnect once the server has a chance to save the op data | ||
@@ -207,3 +194,3 @@ setTimeout(function() { | ||
it('resends create when disconnected before ack', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var backend = this.backend; | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
@@ -220,3 +207,3 @@ doc.create({age: 3}, function(err) { | ||
it('resent create on top of deleted doc gets proper starting version', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var backend = this.backend; | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
@@ -241,3 +228,3 @@ doc.create({age: 4}, function(err) { | ||
it('resends delete when disconnected before ack', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var backend = this.backend; | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
@@ -257,4 +244,3 @@ doc.create({age: 3}, function(err) { | ||
it('op submitted during inflight create does not compose and gets flushed', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}); | ||
@@ -273,5 +259,4 @@ // Submit an op after message is sent but before server has a chance to reply | ||
it('can commit then fetch in a new connection to get the same data', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -292,5 +277,4 @@ if (err) return done(err); | ||
it('an op submitted concurrently is transformed by the first', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -330,5 +314,4 @@ if (err) return done(err); | ||
it('second of two concurrent creates is rejected', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
var count = 0; | ||
@@ -364,5 +347,4 @@ doc.create({age: 3}, function(err) { | ||
it('concurrent delete operations transform', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -402,5 +384,4 @@ if (err) return done(err); | ||
it('second client can create following delete', function(done) { | ||
var backend = new Backend({db: this.db}); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -420,3 +401,75 @@ if (err) return done(err); | ||
it('doc.pause() prevents ops from being sent', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.pause(); | ||
doc.create({age: 3}, done); | ||
done(); | ||
}); | ||
it('can call doc.resume() without pausing', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.resume(); | ||
doc.create({age: 3}, done); | ||
}); | ||
it('doc.resume() resumes sending ops after pause', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.pause(); | ||
doc.create({age: 3}, done); | ||
doc.resume(); | ||
}); | ||
it('pending ops are transformed by ops from other clients', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
if (err) return done(err); | ||
doc2.fetch(function(err) { | ||
if (err) return done(err); | ||
doc.pause(); | ||
doc.submitOp({p: ['age'], na: 1}); | ||
doc.submitOp({p: ['color'], oi: 'gold'}); | ||
expect(doc.version).equal(1); | ||
doc2.submitOp({p: ['age'], na: 5}); | ||
process.nextTick(function() { | ||
doc2.submitOp({p: ['sex'], oi: 'female'}, function(err) { | ||
if (err) return done(err); | ||
expect(doc2.version).equal(3); | ||
async.parallel([ | ||
function(cb) { doc.fetch(cb) }, | ||
function(cb) { doc2.fetch(cb) } | ||
], function(err) { | ||
if (err) return done(err); | ||
expect(doc.data).eql({age: 9, color: 'gold', sex: 'female'}); | ||
expect(doc.version).equal(3); | ||
expect(doc.hasPending()).equal(true); | ||
expect(doc2.data).eql({age: 8, sex: 'female'}); | ||
expect(doc2.version).equal(3); | ||
expect(doc2.hasPending()).equal(false); | ||
doc.resume(); | ||
doc.whenNothingPending(function() { | ||
doc2.fetch(function(err) { | ||
if (err) return done(err); | ||
expect(doc.data).eql({age: 9, color: 'gold', sex: 'female'}); | ||
expect(doc.version).equal(4); | ||
expect(doc.hasPending()).equal(false); | ||
expect(doc2.data).eql({age: 9, color: 'gold', sex: 'female'}); | ||
expect(doc2.version).equal(4); | ||
expect(doc2.hasPending()).equal(false); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}; |
@@ -1,10 +0,9 @@ | ||
var Backend = require('../../lib/backend'); | ||
var expect = require('expect.js'); | ||
var async = require('async'); | ||
module.exports = function() { | ||
describe('client subscribe', function() { | ||
it('can call bulk without doing any actions', function() { | ||
var backend = new Backend(); | ||
var connection = backend.connect(); | ||
var connection = this.backend.connect(); | ||
connection.startBulk(); | ||
@@ -16,5 +15,4 @@ connection.endBulk(); | ||
it(method + ' gets initial data', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -32,5 +30,4 @@ if (err) return done(err); | ||
it(method + ' twice simultaneously calls back', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -51,5 +48,4 @@ if (err) return done(err); | ||
it(method + ' twice in bulk simultaneously calls back', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -72,4 +68,4 @@ if (err) return done(err); | ||
it(method + ' bulk on same collection', function(done) { | ||
var backend = new Backend(); | ||
var connection = backend.connect(); | ||
var connection = this.backend.connect(); | ||
var connection2 = this.backend.connect(); | ||
async.parallel([ | ||
@@ -81,3 +77,2 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); }, | ||
if (err) return done(err); | ||
var connection2 = backend.connect(); | ||
var fido = connection2.get('dogs', 'fido'); | ||
@@ -103,4 +98,4 @@ var spot = connection2.get('dogs', 'spot'); | ||
it(method + ' bulk on same collection from known version', function(done) { | ||
var backend = new Backend(); | ||
var connection2 = backend.connect(); | ||
var connection = this.backend.connect(); | ||
var connection2 = this.backend.connect(); | ||
var fido = connection2.get('dogs', 'fido'); | ||
@@ -123,3 +118,2 @@ var spot = connection2.get('dogs', 'spot'); | ||
var connection = backend.connect(); | ||
async.parallel([ | ||
@@ -182,5 +176,4 @@ function(cb) { connection.get('dogs', 'fido').create({age: 3}, cb); }, | ||
it(method + ' gets new ops', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -200,8 +193,36 @@ if (err) return done(err); | ||
}); | ||
it(method + ' calls back after reconnect', function(done) { | ||
var backend = this.backend; | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
if (err) return done(err); | ||
doc2[method](function(err) { | ||
if (err) return done(err); | ||
expect(doc2.version).eql(1); | ||
expect(doc2.data).eql({age: 3}) | ||
done(); | ||
}); | ||
doc2.connection.close(); | ||
process.nextTick(function() { | ||
backend.connect(doc2.connection); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('subscribed client gets create from first client', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
it('unsubscribe calls back immediately on disconnect', function(done) { | ||
var backend = this.backend; | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
doc.subscribe(function(err) { | ||
if (err) return done(err); | ||
doc.unsubscribe(done); | ||
doc.connection.close(); | ||
}); | ||
}); | ||
it('subscribed client gets create from other client', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc2.subscribe(function(err) { | ||
@@ -219,6 +240,5 @@ if (err) return done(err); | ||
it('subscribed client gets op from first client', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
it('subscribed client gets op from other client', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -238,6 +258,22 @@ if (err) return done(err); | ||
it('disconnecting stops op updates', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
if (err) return done(err); | ||
doc2.subscribe(function(err) { | ||
if (err) return done(err); | ||
doc2.on('op', function(op, context) { | ||
done(); | ||
}); | ||
doc2.connection.close(); | ||
doc.submitOp({p: ['age'], na: 1}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('unsubscribe stops op updates', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -247,8 +283,8 @@ if (err) return done(err); | ||
if (err) return done(err); | ||
doc2.on('op', function(op, context) { | ||
done(); | ||
}); | ||
doc2.unsubscribe(function(err) { | ||
if (err) return done(err); | ||
done(); | ||
doc2.on('op', function(op, context) { | ||
done(); | ||
}); | ||
doc.submitOp({p: ['age'], na: 1}); | ||
@@ -260,6 +296,25 @@ }); | ||
it('doc destroy stops op updates', function(done) { | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
if (err) return done(err); | ||
doc2.subscribe(function(err) { | ||
if (err) return done(err); | ||
doc2.on('op', function(op, context) { | ||
done(); | ||
}); | ||
doc2.destroy(function(err) { | ||
if (err) return done(err); | ||
done(); | ||
doc.submitOp({p: ['age'], na: 1}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('bulk unsubscribe stops op updates', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var connection2 = backend.connect(); | ||
var connection = this.backend.connect(); | ||
var connection2 = this.backend.connect(); | ||
var doc = connection.get('dogs', 'fido'); | ||
var fido = connection2.get('dogs', 'fido'); | ||
@@ -291,6 +346,28 @@ var spot = connection2.get('dogs', 'spot'); | ||
it('a subscribed doc is re-subscribed after reconnect and gets any missing ops', function(done) { | ||
var backend = this.backend; | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
if (err) return done(err); | ||
doc2.subscribe(function(err) { | ||
if (err) return done(err); | ||
doc2.on('op', function(op, context) { | ||
expect(doc2.version).eql(2); | ||
expect(doc2.data).eql({age: 4}); | ||
done(); | ||
}); | ||
doc2.connection.close(); | ||
doc.submitOp({p: ['age'], na: 1}, function(err) { | ||
if (err) return done(err); | ||
backend.connect(doc2.connection); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('calling subscribe, unsubscribe, subscribe sync leaves a doc subscribed', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -311,5 +388,4 @@ if (err) return done(err); | ||
it('calling subscribe, unsubscribe, subscribe sync leaves a doc subscribed', function(done) { | ||
var backend = new Backend(); | ||
var doc = backend.connect().get('dogs', 'fido'); | ||
var doc2 = backend.connect().get('dogs', 'fido'); | ||
var doc = this.backend.connect().get('dogs', 'fido'); | ||
var doc2 = this.backend.connect().get('dogs', 'fido'); | ||
doc.create({age: 3}, function(err) { | ||
@@ -330,1 +406,2 @@ if (err) return done(err); | ||
}); | ||
}; |
var MemoryDB = require('../lib/db/memory'); | ||
require('./db')(function(callback) { | ||
callback(null, MemoryDB()); | ||
var db = new MemoryDB(); | ||
// Implement extremely simple subset of Mongo queries for unit tests | ||
db._querySync = function(snapshots, query) { | ||
var filtered = filter(snapshots, query.$query || query); | ||
sort(filtered, query.$orderby); | ||
return filtered; | ||
}; | ||
db.queryPollDoc = function(collection, id, query, options, callback) { | ||
this.getSnapshot(collection, id, null, function(err, snapshot) { | ||
if (err) return callback(err); | ||
var result = filterSnapshot(snapshot, query); | ||
callback(null, result); | ||
}); | ||
}; | ||
db.canPollDoc = function(collection, query) { | ||
return !query.$orderby; | ||
}; | ||
callback(null, db); | ||
}); | ||
// Support exact key match filters only | ||
function filter(snapshots, query) { | ||
return snapshots.filter(function(snapshot) { | ||
return filterSnapshot(snapshot, query); | ||
}); | ||
} | ||
function filterSnapshot(snapshot, query) { | ||
if (!snapshot.data) return false; | ||
for (var key in query) { | ||
if (key.charAt(0) === '$') continue; | ||
if (snapshot.data[key] !== query[key]) return false; | ||
} | ||
return true; | ||
} | ||
// Support sorting with the Mongo $orderby syntax | ||
function sort(snapshots, orderby) { | ||
if (!orderby) return; | ||
snapshots.sort(function(snapshotA, snapshotB) { | ||
for (var key in orderby) { | ||
var value = orderby[key]; | ||
if (value !== 1 && value !== -1) { | ||
throw new Error('Invalid $orderby value'); | ||
} | ||
var a = snapshotA.data && snapshotA.data[key]; | ||
var b = snapshotB.data && snapshotB.data[key]; | ||
if (a > b) return value; | ||
if (b > a) return -value; | ||
} | ||
return 0; | ||
}); | ||
} |
var async = require('async'); | ||
var expect = require('expect.js'); | ||
var Backend = require('../lib/backend'); | ||
var ot = require('../lib/ot'); | ||
@@ -12,2 +13,3 @@ | ||
self.db = db; | ||
self.backend = new Backend({db: db}); | ||
done(); | ||
@@ -17,16 +19,19 @@ }); | ||
afterEach(function(done) { | ||
this.db.close(done); | ||
afterEach(function() { | ||
this.backend.close(); | ||
}); | ||
require('./client/projections')(); | ||
require('./client/query-subscribe')(); | ||
require('./client/query')(); | ||
require('./client/submit')(); | ||
require('./client/query')(); | ||
require('./client/subscribe')(); | ||
// Simplified mock of how submit request applies operations. The | ||
// noteworthy dependency is that it always calls getSnapshot with | ||
// the 'submit' projection and applies the op to the returned | ||
// the {$submit: true} projection and applies the op to the returned | ||
// snapshot before committing. Thus, commit may rely on the behavior | ||
// of getSnapshot with this special projection | ||
function submit(db, collection, id, op, callback) { | ||
db.getSnapshot(collection, id, 'submit', function(err, snapshot) { | ||
db.getSnapshot(collection, id, {$submit: true}, function(err, snapshot) { | ||
if (err) return callback(err); | ||
@@ -388,3 +393,3 @@ if (snapshot.v !== op.v) { | ||
if (err) return done(err); | ||
db.getSnapshot('testcollection', 'test', 'submit', function(err, snapshot) { | ||
db.getSnapshot('testcollection', 'test', {$submit: true}, function(err, snapshot) { | ||
if (err) return done(err); | ||
@@ -391,0 +396,0 @@ db.getOpsToSnapshot('testcollection', 'test', 0, snapshot, function(err, ops) { |
319913
55
6146