Comparing version 1.0.0 to 1.1.0
@@ -427,3 +427,3 @@ var hat = require('hat'); | ||
if (err) return finish(err); | ||
message = {data: getMapData(snapshotMap)}; | ||
message = getMapResult(snapshotMap); | ||
finish(); | ||
@@ -467,8 +467,16 @@ }); | ||
function getMapData(snapshotMap) { | ||
function getMapResult(snapshotMap) { | ||
var data = {}; | ||
for (var id in snapshotMap) { | ||
data[id] = getSnapshotData(snapshotMap[id]); | ||
var mapValue = snapshotMap[id]; | ||
// fetchBulk / subscribeBulk map data can have either a Snapshot or an object | ||
// `{error: Error | string}` as a value. | ||
if (mapValue.error) { | ||
// Transform errors to serialization-friendly objects. | ||
data[id] = {error: getReplyErrorObject(mapValue.error)}; | ||
} else { | ||
data[id] = getSnapshotData(mapValue); | ||
} | ||
} | ||
return data; | ||
return {data: data}; | ||
} | ||
@@ -522,4 +530,11 @@ | ||
this.backend.fetchBulk(this, collection, versions, function(err, snapshotMap) { | ||
if (err) return callback(err); | ||
callback(null, {data: getMapData(snapshotMap)}); | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (snapshotMap) { | ||
var result = getMapResult(snapshotMap); | ||
callback(null, result); | ||
} else { | ||
callback(); | ||
} | ||
}); | ||
@@ -571,3 +586,5 @@ } else { | ||
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap, opsMap) { | ||
if (err) return callback(err); | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (opsMap) { | ||
@@ -580,3 +597,4 @@ agent._sendOpsBulk(collection, opsMap); | ||
if (snapshotMap) { | ||
callback(null, {data: getMapData(snapshotMap)}); | ||
var result = getMapResult(snapshotMap); | ||
callback(null, result); | ||
} else { | ||
@@ -583,0 +601,0 @@ callback(); |
@@ -15,2 +15,3 @@ var async = require('async'); | ||
var SubmitRequest = require('./submit-request'); | ||
var ReadSnapshotsRequest = require('./read-snapshots-request'); | ||
@@ -256,9 +257,17 @@ var ERROR_CODE = ShareDBError.CODES; | ||
var request = { | ||
collection: collection, | ||
snapshots: snapshots, | ||
snapshotType: snapshotType | ||
}; | ||
var request = new ReadSnapshotsRequest(collection, snapshots, snapshotType); | ||
this.trigger(this.MIDDLEWARE_ACTIONS.readSnapshots, agent, request, callback); | ||
this.trigger(this.MIDDLEWARE_ACTIONS.readSnapshots, agent, request, function(err) { | ||
if (err) return callback(err); | ||
// Handle "partial rejection" - "readSnapshots" middleware functions can use | ||
// `request.rejectSnapshotRead(snapshot, error)` to reject the read of a specific snapshot. | ||
if (request.hasSnapshotRejection()) { | ||
err = request.getReadSnapshotsError(); | ||
} | ||
if (err) { | ||
callback(err); | ||
} else { | ||
callback(); | ||
} | ||
}); | ||
}; | ||
@@ -387,2 +396,14 @@ | ||
/** | ||
* Map of document id to Snapshot or error object. | ||
* @typedef {{ [id: string]: Snapshot | { error: Error | string } }} SnapshotMap | ||
*/ | ||
/** | ||
* @param {Agent} agent | ||
* @param {string} index | ||
* @param {string[]} ids | ||
* @param {*} options | ||
* @param {(err?: Error | string, snapshotMap?: SnapshotMap) => void} callback | ||
*/ | ||
Backend.prototype.fetchBulk = function(agent, index, ids, options, callback) { | ||
@@ -416,5 +437,14 @@ if (typeof options === 'function') { | ||
function(err) { | ||
if (err) return callback(err); | ||
if (err) { | ||
if (err.code === ERROR_CODE.ERR_SNAPSHOT_READS_REJECTED) { | ||
for (var docId in err.idToError) { | ||
snapshotMap[docId] = {error: err.idToError[docId]}; | ||
} | ||
err = undefined; | ||
} else { | ||
snapshotMap = undefined; | ||
} | ||
} | ||
backend.emit('timing', 'fetchBulk', Date.now() - start, request); | ||
callback(null, snapshotMap); | ||
callback(err, snapshotMap); | ||
}); | ||
@@ -478,2 +508,22 @@ }); | ||
/** | ||
* Map of document id to pubsub stream. | ||
* @typedef {{ [id: string]: Stream }} StreamMap | ||
*/ | ||
/** | ||
* Map of document id to array of ops for the doc. | ||
* @typedef {{ [id: string]: Op[] }} OpsMap | ||
*/ | ||
/** | ||
* @param {Agent} agent | ||
* @param {string} index | ||
* @param {string[]} versions | ||
* @param {( | ||
* err?: Error | string | null, | ||
* streams?: StreamMap, | ||
* snapshotMap?: SnapshotMap | null | ||
* opsMap?: OpsMap | ||
* ) => void} callback | ||
*/ | ||
Backend.prototype.subscribeBulk = function(agent, index, versions, callback) { | ||
@@ -509,7 +559,17 @@ var start = Date.now(); | ||
if (err) { | ||
// Full error, destroy all streams. | ||
destroyStreams(streams); | ||
return callback(err); | ||
streams = undefined; | ||
snapshotMap = undefined; | ||
} | ||
for (var docId in snapshotMap) { | ||
// The doc id could map to an object `{error: Error | string}`, which indicates that | ||
// particular snapshot's read was rejected. Destroy the streams fur such docs. | ||
if (snapshotMap[docId].error) { | ||
streams[docId].destroy(); | ||
delete streams[docId]; | ||
} | ||
} | ||
backend.emit('timing', 'subscribeBulk.snapshot', Date.now() - start, request); | ||
callback(null, streams, snapshotMap); | ||
callback(err, streams, snapshotMap); | ||
}); | ||
@@ -516,0 +576,0 @@ } else { |
@@ -178,7 +178,3 @@ var Doc = require('./doc'); | ||
if (message.error) { | ||
// wrap in Error object so can be passed through event emitters | ||
err = new Error(message.error.message); | ||
err.code = message.error.code; | ||
// Add the message data to the error object for more context | ||
err.data = message; | ||
err = wrapErrorData(message.error, message); | ||
delete message.error; | ||
@@ -237,7 +233,7 @@ } | ||
case 'bf': | ||
return this._handleBulkMessage(message, '_handleFetch'); | ||
return this._handleBulkMessage(err, message, '_handleFetch'); | ||
case 'bs': | ||
return this._handleBulkMessage(message, '_handleSubscribe'); | ||
return this._handleBulkMessage(err, message, '_handleSubscribe'); | ||
case 'bu': | ||
return this._handleBulkMessage(message, '_handleUnsubscribe'); | ||
return this._handleBulkMessage(err, message, '_handleUnsubscribe'); | ||
@@ -270,7 +266,28 @@ case 'nf': | ||
Connection.prototype._handleBulkMessage = function(message, method) { | ||
function wrapErrorData(errorData, fullMessage) { | ||
// wrap in Error object so can be passed through event emitters | ||
var err = new Error(errorData.message); | ||
err.code = errorData.code; | ||
if (fullMessage) { | ||
// Add the message data to the error object for more context | ||
err.data = fullMessage; | ||
} | ||
return err; | ||
} | ||
Connection.prototype._handleBulkMessage = function(err, message, method) { | ||
if (message.data) { | ||
for (var id in message.data) { | ||
var dataForId = message.data[id]; | ||
var doc = this.getExisting(message.c, id); | ||
if (doc) doc[method](message.error, message.data[id]); | ||
if (doc) { | ||
if (err) { | ||
doc[method](err); | ||
} else if (dataForId.error) { | ||
// Bulk reply snapshot-specific errorr - see agent.js getMapResult | ||
doc[method](wrapErrorData(dataForId.error)); | ||
} else { | ||
doc[method](null, dataForId); | ||
} | ||
} | ||
} | ||
@@ -281,3 +298,3 @@ } else if (Array.isArray(message.b)) { | ||
var doc = this.getExisting(message.c, id); | ||
if (doc) doc[method](message.error); | ||
if (doc) doc[method](err); | ||
} | ||
@@ -287,3 +304,3 @@ } else if (message.b) { | ||
var doc = this.getExisting(message.c, id); | ||
if (doc) doc[method](message.error); | ||
if (doc) doc[method](err); | ||
} | ||
@@ -290,0 +307,0 @@ } else { |
@@ -249,2 +249,10 @@ var emitter = require('../emitter'); | ||
Doc.prototype._emitResponseError = function(err, callback) { | ||
if (err && err.code === ERROR_CODE.ERR_SNAPSHOT_READ_SILENT_REJECTION) { | ||
this.wantSubscribe = false; | ||
if (callback) { | ||
callback(); | ||
} | ||
this._emitNothingPending(); | ||
return; | ||
} | ||
if (callback) { | ||
@@ -251,0 +259,0 @@ callback(err); |
@@ -44,2 +44,21 @@ function ShareDBError(code, message) { | ||
ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED: 'ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED', | ||
/** | ||
* A special error that a "readSnapshots" middleware implementation can use to indicate that it | ||
* wishes for the ShareDB client to treat it as a silent rejection, not passing the error back to | ||
* user code. | ||
* | ||
* For subscribes, the ShareDB client will still cancel the document subscription. | ||
*/ | ||
ERR_SNAPSHOT_READ_SILENT_REJECTION: 'ERR_SNAPSHOT_READ_SILENT_REJECTION', | ||
/** | ||
* A "readSnapshots" middleware rejected the reads of specific snapshots. | ||
* | ||
* This error code is mostly for server use and generally will not be encountered on the client. | ||
* Instead, each specific doc that encountered an error will receive its specific error. | ||
* | ||
* The one exception is for queries, where a "readSnapshots" rejection of specific snapshots will | ||
* cause the client to receive this error for the whole query, since queries don't support | ||
* doc-specific errors. | ||
*/ | ||
ERR_SNAPSHOT_READS_REJECTED: 'ERR_SNAPSHOT_READS_REJECTED', | ||
ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND', | ||
@@ -46,0 +65,0 @@ ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED', |
{ | ||
"name": "sharedb", | ||
"version": "1.0.0", | ||
"version": "1.1.0", | ||
"description": "JSON OT database backend", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -0,1 +1,3 @@ | ||
_This README is for `sharedb@1.x`. For `sharedb@1.x-beta`, see [the 1.x-beta branch](https://github.com/share/sharedb/tree/1.x-beta). To upgrade, see [the upgrade guide](https://github.com/share/sharedb/wiki/Upgrading-to-sharedb@1.0.0-from-1.0.0-beta)._ | ||
# ShareDB | ||
@@ -173,2 +175,4 @@ | ||
* `snapshots`: Array of retrieved snapshots (for 'readSnapshots') | ||
* `rejectSnapshotRead(snapshot, error)`: Reject a specific snapshot read (for 'readSnapshots') | ||
- `rejectSnapshotReadSilent(snapshot, errorMessage)`: As above, but causes the ShareDB client to treat it as a silent rejection, not passing the error back to user code. | ||
* `data`: Received client message (for 'receive') | ||
@@ -603,3 +607,3 @@ * `request`: Client message being replied to (for 'reply') | ||
```javascript | ||
backend.useMiddleware('connect', function (request, callback) { | ||
backend.useMiddleware('connect', (request, callback) => { | ||
// Best practice to clone to prevent mutating the object after connection. | ||
@@ -611,11 +615,18 @@ // You may also want to consider a deep clone, depending on the shape of request.req. | ||
backend.useMiddleware('readSnapshots', function (request, callback) { | ||
var connectionInfo = request.agent.custom; | ||
var snapshots = request.snapshots; | ||
backend.useMiddleware('readSnapshots', (request, callback) => { | ||
const connectionInfo = request.agent.custom; | ||
const snapshots = request.snapshots; | ||
// Use the information provided at connection to determine if a user can access snapshots. | ||
// Use the information provided at connection to determine if a user can access the snapshots. | ||
// This should also be checked when fetching and submitting ops. | ||
if (!userCanAccessSnapshots(connectionInfo, snapshots)) { | ||
return callback(new Error('Authentication error')); | ||
if (!userCanAccessCollection(connectionInfo, request.collection)) { | ||
return callback(new Error('Not allowed to access collection ' + request.collection)); | ||
} | ||
// Check each snapshot individually. | ||
for (const snapshot of snapshots) { | ||
if (!userCanAccessSnapshot(connectionInfo, request.collection, snapshot)) { | ||
request.rejectSnapshotRead(snapshot, | ||
new Error('Not allowed to access snapshot in ' request.collection)); | ||
} | ||
} | ||
@@ -629,6 +640,6 @@ callback(); | ||
// after the permissions have been fetched. | ||
var connectionInfo = getUserPermissions(); | ||
const connectionInfo = getUserPermissions(); | ||
// Pass info in as the second argument. This will be made available as request.req in the | ||
// 'connection' middleware. | ||
var connection = backend.connect(null, connectionInfo); | ||
const connection = backend.connect(null, connectionInfo); | ||
``` | ||
@@ -635,0 +646,0 @@ |
@@ -143,4 +143,36 @@ var expect = require('chai').expect; | ||
}); | ||
it(method + ' on collection with readSnapshots rejectSnapshotRead', function(done) { | ||
var backend = this.backend; | ||
var connection = backend.connect(); | ||
var connection2 = backend.connect(); | ||
var matchAllDbQuery = this.matchAllDbQuery; | ||
async.parallel([ | ||
function(cb) { | ||
connection.get('dogs', 'fido').create({age: 3}, cb); | ||
}, | ||
function(cb) { | ||
connection.get('dogs', 'spot').create({age: 5}, cb); | ||
}, | ||
function(cb) { | ||
connection.get('cats', 'finn').create({age: 2}, cb); | ||
} | ||
], function(err) { | ||
if (err) return done(err); | ||
backend.use('readSnapshots', function(context, cb) { | ||
expect(context.snapshots).to.be.an('array').of.length(2); | ||
context.rejectSnapshotRead(context.snapshots[0], new Error('Failed to fetch dog')); | ||
cb(); | ||
}); | ||
// Queries have no way of supporting partial readSnapshots rejections, so the entire query | ||
// fails if at least one of the snapshots has an error. | ||
connection2[method]('dogs', matchAllDbQuery, null, function(err, results) { | ||
expect(err).to.be.an('error').with.property('code', 'ERR_SNAPSHOT_READS_REJECTED'); | ||
expect(results).to.equal(undefined); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}; |
@@ -48,2 +48,66 @@ var expect = require('chai').expect; | ||
function testSingleSnapshotSpecificError(label, fns) { | ||
var rejectSnapshot = fns.rejectSnapshot; | ||
var verifyClientError = fns.verifyClientError; | ||
it(method + ' single with readSnapshots rejectSnapshotRead ' + label, function(done) { | ||
var backend = this.backend; | ||
var connection = backend.connect(); | ||
var connection2 = backend.connect(); | ||
connection.get('dogs', 'fido').create({age: 3}, function(err) { | ||
if (err) return done(err); | ||
backend.use('readSnapshots', function(context, cb) { | ||
expect(context.snapshots).to.be.an('array').of.length(1); | ||
expect(context.snapshots[0]).to.have.property('id', 'fido'); | ||
rejectSnapshot(context, context.snapshots[0]); | ||
cb(); | ||
}); | ||
var fido = connection2.get('dogs', 'fido'); | ||
fido[method](function(err) { | ||
verifyClientError(err); | ||
// An error for 'fido' means the data shouldn't get loaded. | ||
expect(fido.data).eql(undefined); | ||
// For subscribe, also test that further remote ops will not get sent for the doc. | ||
if (method !== 'subscribe') { | ||
return done(); | ||
} | ||
// Add listeners on connection2 for remote operations. | ||
fido.on('before op', function(op) { | ||
done(new Error('fido on connection2 should not have received any ops, got:' + | ||
JSON.stringify(op))); | ||
}); | ||
// Issue an operation on connection1. | ||
connection.get('dogs', 'fido').submitOp([{p: ['age'], na: 1}], function(err) { | ||
if (err) return done(err); | ||
// Do a manual fetch on connection2, which should be enough time for it to receive | ||
// the op, if the op were to be sent. | ||
fido.fetch(function(err) { | ||
verifyClientError(err); | ||
expect(fido.data).eql(undefined); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
} | ||
testSingleSnapshotSpecificError('normal error', { | ||
rejectSnapshot: function(context, snapshot) { | ||
context.rejectSnapshotRead(snapshot, new Error('Failed to fetch fido')); | ||
}, | ||
verifyClientError: function(err) { | ||
expect(err).to.be.an('error').with.property('message', 'Failed to fetch fido'); | ||
} | ||
}); | ||
testSingleSnapshotSpecificError('silent error', { | ||
rejectSnapshot: function(context, snapshot) { | ||
context.rejectSnapshotReadSilent(snapshot, 'Failed to fetch fido'); | ||
}, | ||
verifyClientError: function(err) { | ||
expect(err).to.equal(undefined); | ||
} | ||
}); | ||
it(method + ' twice in bulk simultaneously calls back', function(done) { | ||
@@ -112,2 +176,137 @@ var doc = this.backend.connect().get('dogs', 'fido').on('error', done); | ||
it(method + ' bulk with readSnapshots full error', function(done) { | ||
var backend = this.backend; | ||
var connection = backend.connect(); | ||
var connection2 = backend.connect(); | ||
async.parallel([ | ||
function(cb) { | ||
connection.get('dogs', 'fido').create({age: 3}, cb); | ||
}, | ||
function(cb) { | ||
connection.get('dogs', 'spot').create({age: 5}, cb); | ||
} | ||
], function(err) { | ||
if (err) return done(err); | ||
backend.use('readSnapshots', function(context, cb) { | ||
expect(context.snapshots).to.be.an('array').of.length(2); | ||
cb(new Error('Failed to fetch dogs')); | ||
}); | ||
var fido = connection2.get('dogs', 'fido'); | ||
var spot = connection2.get('dogs', 'spot'); | ||
connection2.startBulk(); | ||
async.parallel([ | ||
function(cb) { | ||
fido[method](function(err) { | ||
expect(err).to.be.an('error').with.property('message', 'Failed to fetch dogs'); | ||
cb(err); | ||
}); | ||
}, | ||
function(cb) { | ||
spot[method](function(err) { | ||
expect(err).to.be.an('error').with.property('message', 'Failed to fetch dogs'); | ||
cb(err); | ||
}); | ||
} | ||
], function(err) { | ||
expect(err).to.be.an('error').with.property('message', 'Failed to fetch dogs'); | ||
// Error should mean data doesn't get loaded. | ||
expect(fido.data).eql(undefined); | ||
expect(spot.data).eql(undefined); | ||
done(); | ||
}); | ||
connection2.endBulk(); | ||
}); | ||
}); | ||
function testBulkSnapshotSpecificError(label, fns) { | ||
var rejectSnapshot = fns.rejectSnapshot; | ||
var verifyClientError = fns.verifyClientError; | ||
it(method + ' bulk with readSnapshots rejectSnapshotRead ' + label, function(done) { | ||
var backend = this.backend; | ||
var connection = backend.connect(); | ||
var connection2 = backend.connect(); | ||
async.parallel([ | ||
function(cb) { | ||
connection.get('dogs', 'fido').create({age: 3}, cb); | ||
}, | ||
function(cb) { | ||
connection.get('dogs', 'spot').create({age: 5}, cb); | ||
} | ||
], function(err) { | ||
if (err) return done(err); | ||
backend.use('readSnapshots', function(context, cb) { | ||
expect(context.snapshots).to.be.an('array').of.length(2); | ||
expect(context.snapshots[0]).to.have.property('id', 'fido'); | ||
rejectSnapshot(context, context.snapshots[0]); | ||
cb(); | ||
}); | ||
var fido = connection2.get('dogs', 'fido'); | ||
var spot = connection2.get('dogs', 'spot'); | ||
connection2.startBulk(); | ||
async.parallel([ | ||
function(cb) { | ||
fido[method](function(err) { | ||
verifyClientError(err); | ||
cb(); | ||
}); | ||
}, | ||
function(cb) { | ||
spot[method](cb); | ||
} | ||
], function(err) { | ||
if (err) return done(err); | ||
// An error for 'fido' means the data shouldn't get loaded. | ||
expect(fido.data).eql(undefined); | ||
// Data for 'spot' should still be loaded. | ||
expect(spot.data).eql({age: 5}); | ||
// For subscribe, also test that further remote ops will only get sent for the doc | ||
// without the error. | ||
if (method !== 'subscribe') { | ||
return done(); | ||
} | ||
// Add listeners on connection2 for those operations. | ||
fido.on('before op', function(op) { | ||
done(new Error('fido on connection2 should not have received any ops, got:' + | ||
JSON.stringify(op))); | ||
}); | ||
// Issue some operations on connection1. | ||
connection.get('dogs', 'fido').submitOp([{p: ['age'], na: 1}]); | ||
connection.get('dogs', 'spot').submitOp([{p: ['age'], na: 1}]); | ||
// Check that connection2 receives the op for spot but not fido. | ||
connection2.once('receive', function() { | ||
// 'receive' happens before the client processes the message. Wait an extra tick so we | ||
// can check the effects of the message. | ||
process.nextTick(function() { | ||
expect(fido.data).eql(undefined); | ||
expect(spot.data).eql({age: 6}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
connection2.endBulk(); | ||
}); | ||
}); | ||
} | ||
testBulkSnapshotSpecificError('normal error', { | ||
rejectSnapshot: function(context, snapshot) { | ||
context.rejectSnapshotRead(snapshot, new Error('Failed to fetch fido')); | ||
}, | ||
verifyClientError: function(err) { | ||
expect(err).to.be.an('error').with.property('message', 'Failed to fetch fido'); | ||
} | ||
}); | ||
testBulkSnapshotSpecificError('special ignorable error', { | ||
rejectSnapshot: function(context, snapshot) { | ||
context.rejectSnapshotReadSilent(snapshot, 'Failed to fetch fido'); | ||
}, | ||
verifyClientError: function(err) { | ||
expect(err).to.equal(undefined); | ||
} | ||
}); | ||
it(method + ' bulk on same collection from known version', function(done) { | ||
@@ -114,0 +313,0 @@ var connection = this.backend.connect(); |
2031772
96
12934
747