mongodb-core
Advanced tools
Comparing version 3.1.9 to 3.1.10
@@ -5,2 +5,13 @@ # Change Log | ||
<a name="3.1.10"></a> | ||
## [3.1.10](https://github.com/mongodb-js/mongodb-core/compare/v3.1.9...v3.1.10) (2019-01-15) | ||
### Bug Fixes | ||
* **mongos-replset:** pass connect options to child server instances ([7ffb4bb](https://github.com/mongodb-js/mongodb-core/commit/7ffb4bb)) | ||
* **prettier:** fix prettier file paths for Windows ([00c631e](https://github.com/mongodb-js/mongodb-core/commit/00c631e)) | ||
<a name="3.1.9"></a> | ||
@@ -7,0 +18,0 @@ ## [3.1.9](https://github.com/mongodb-js/mongodb-core/compare/v3.1.8...v3.1.9) (2018-11-16) |
@@ -93,4 +93,3 @@ 'use strict'; | ||
Object.keys(LEGACY_FIND_OPTIONS_MAP).forEach(key => { | ||
if (typeof command.options[key] !== 'undefined') | ||
result[LEGACY_FIND_OPTIONS_MAP[key]] = command.options[key]; | ||
if (typeof command[key] !== 'undefined') result[LEGACY_FIND_OPTIONS_MAP[key]] = command[key]; | ||
}); | ||
@@ -97,0 +96,0 @@ |
@@ -25,3 +25,3 @@ 'use strict'; | ||
// Response flags | ||
var CURSOR_NOT_FOUND = 0; | ||
var CURSOR_NOT_FOUND = 1; | ||
var QUERY_FAILURE = 2; | ||
@@ -50,5 +50,2 @@ var SHARD_CONFIG_STALE = 4; | ||
// Ensure empty options | ||
this.options = options || {}; | ||
// Additional options | ||
@@ -55,0 +52,0 @@ this.numberToSkip = options.numberToSkip || 0; |
@@ -11,4 +11,4 @@ 'use strict'; | ||
var BSON = retrieveBSON(), | ||
Long = BSON.Long; | ||
const BSON = retrieveBSON(); | ||
const Long = BSON.Long; | ||
@@ -109,3 +109,4 @@ /** | ||
// Result field name if not a cursor (contains the array of results) | ||
transforms: options.transforms | ||
transforms: options.transforms, | ||
raw: options.raw || (cmd && cmd.raw) | ||
}; | ||
@@ -214,125 +215,5 @@ | ||
// Internal methods | ||
Cursor.prototype._find = function(callback) { | ||
var self = this; | ||
if (self.logger.isDebug()) { | ||
self.logger.debug( | ||
f( | ||
'issue initial query [%s] with flags [%s]', | ||
JSON.stringify(self.cmd), | ||
JSON.stringify(self.query) | ||
) | ||
); | ||
} | ||
var queryCallback = function(err, r) { | ||
if (err) return callback(err); | ||
// Get the raw message | ||
var result = r.message; | ||
// Query failure bit set | ||
if (result.queryFailure) { | ||
return callback(new MongoError(result.documents[0]), null); | ||
} | ||
// Check if we have a command cursor | ||
if ( | ||
Array.isArray(result.documents) && | ||
result.documents.length === 1 && | ||
(!self.cmd.find || (self.cmd.find && self.cmd.virtual === false)) && | ||
(typeof result.documents[0].cursor !== 'string' || | ||
result.documents[0]['$err'] || | ||
result.documents[0]['errmsg'] || | ||
Array.isArray(result.documents[0].result)) | ||
) { | ||
// We have a an error document return the error | ||
if (result.documents[0]['$err'] || result.documents[0]['errmsg']) { | ||
return callback(new MongoError(result.documents[0]), null); | ||
} | ||
// We have a cursor document | ||
if (result.documents[0].cursor != null && typeof result.documents[0].cursor !== 'string') { | ||
var id = result.documents[0].cursor.id; | ||
// If we have a namespace change set the new namespace for getmores | ||
if (result.documents[0].cursor.ns) { | ||
self.ns = result.documents[0].cursor.ns; | ||
} | ||
// Promote id to long if needed | ||
self.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id; | ||
self.cursorState.lastCursorId = self.cursorState.cursorId; | ||
self.cursorState.operationTime = result.documents[0].operationTime; | ||
// If we have a firstBatch set it | ||
if (Array.isArray(result.documents[0].cursor.firstBatch)) { | ||
self.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse(); | ||
} | ||
// Return after processing command cursor | ||
return callback(null, result); | ||
} | ||
if (Array.isArray(result.documents[0].result)) { | ||
self.cursorState.documents = result.documents[0].result; | ||
self.cursorState.cursorId = Long.ZERO; | ||
return callback(null, result); | ||
} | ||
} | ||
// Otherwise fall back to regular find path | ||
self.cursorState.cursorId = result.cursorId; | ||
self.cursorState.documents = result.documents; | ||
self.cursorState.lastCursorId = result.cursorId; | ||
// Transform the results with passed in transformation method if provided | ||
if (self.cursorState.transforms && typeof self.cursorState.transforms.query === 'function') { | ||
self.cursorState.documents = self.cursorState.transforms.query(result); | ||
} | ||
// Return callback | ||
callback(null, result); | ||
}; | ||
// Options passed to the pool | ||
var queryOptions = {}; | ||
// If we have a raw query decorate the function | ||
if (self.options.raw || self.cmd.raw) { | ||
// queryCallback.raw = self.options.raw || self.cmd.raw; | ||
queryOptions.raw = self.options.raw || self.cmd.raw; | ||
} | ||
// Do we have documentsReturnedIn set on the query | ||
if (typeof self.query.documentsReturnedIn === 'string') { | ||
// queryCallback.documentsReturnedIn = self.query.documentsReturnedIn; | ||
queryOptions.documentsReturnedIn = self.query.documentsReturnedIn; | ||
} | ||
// Add promote Long value if defined | ||
if (typeof self.cursorState.promoteLongs === 'boolean') { | ||
queryOptions.promoteLongs = self.cursorState.promoteLongs; | ||
} | ||
// Add promote values if defined | ||
if (typeof self.cursorState.promoteValues === 'boolean') { | ||
queryOptions.promoteValues = self.cursorState.promoteValues; | ||
} | ||
// Add promote values if defined | ||
if (typeof self.cursorState.promoteBuffers === 'boolean') { | ||
queryOptions.promoteBuffers = self.cursorState.promoteBuffers; | ||
} | ||
if (typeof self.cursorState.session === 'object') { | ||
queryOptions.session = self.cursorState.session; | ||
} | ||
// Write the initial command out | ||
self.server.s.pool.write(self.query, queryOptions, queryCallback); | ||
}; | ||
Cursor.prototype._getmore = function(callback) { | ||
if (this.logger.isDebug()) | ||
this.logger.debug(f('schedule getMore call for query [%s]', JSON.stringify(this.query))); | ||
// Determine if it's a raw query | ||
var raw = this.options.raw || this.cmd.raw; | ||
@@ -348,13 +229,7 @@ // Set the current batchSize | ||
// Default pool | ||
var pool = this.server.s.pool; | ||
// We have a wire protocol handler | ||
this.server.wireProtocolHandler.getMore( | ||
this.bson, | ||
this.server, | ||
this.ns, | ||
this.cursorState, | ||
batchSize, | ||
raw, | ||
pool, | ||
this.options, | ||
@@ -471,6 +346,3 @@ callback | ||
// Default pool | ||
var pool = this.server.s.pool; | ||
// Execute command | ||
this.server.wireProtocolHandler.killCursor(this.bson, this.ns, this.cursorState, pool, callback); | ||
this.server.wireProtocolHandler.killCursor(this.server, this.ns, this.cursorState, callback); | ||
}; | ||
@@ -599,38 +471,3 @@ | ||
// If we don't have a cursorId execute the first query | ||
if (self.cursorState.cursorId == null) { | ||
// Check if pool is dead and return if not possible to | ||
// execute the query against the db | ||
if (isConnectionDead(self, callback)) return; | ||
// Check if topology is destroyed | ||
if (self.topology.isDestroyed()) | ||
return callback( | ||
new MongoNetworkError('connection destroyed, not possible to instantiate cursor') | ||
); | ||
// query, cmd, options, cursorState, callback | ||
self._find(function(err) { | ||
if (err) return handleCallback(callback, err, null); | ||
if (self.cursorState.cursorId && self.cursorState.cursorId.isZero() && self._endSession) { | ||
self._endSession(); | ||
} | ||
if ( | ||
self.cursorState.documents.length === 0 && | ||
self.cursorState.cursorId && | ||
self.cursorState.cursorId.isZero() && | ||
!self.cmd.tailable && | ||
!self.cmd.awaitData | ||
) { | ||
return setCursorNotified(self, callback); | ||
} | ||
nextFunction(self, callback); | ||
}); | ||
} else if ( | ||
self.cursorState.limit > 0 && | ||
self.cursorState.currentLimit >= self.cursorState.limit | ||
) { | ||
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) { | ||
// Ensure we kill the cursor on the server | ||
@@ -787,11 +624,5 @@ self.kill(); | ||
if (err) { | ||
// Handle the error and add object to next method call | ||
if (cursor.disconnectHandler != null) { | ||
return cursor.disconnectHandler.addObjectAndMethod( | ||
'cursor', | ||
cursor, | ||
'next', | ||
[callback], | ||
callback | ||
); | ||
const disconnectHandler = cursor.disconnectHandler; | ||
if (disconnectHandler != null) { | ||
return disconnectHandler.addObjectAndMethod('cursor', cursor, 'next', [callback], callback); | ||
} | ||
@@ -803,7 +634,3 @@ | ||
cursor.server = server; | ||
// Set as init | ||
cursor.cursorState.init = true; | ||
// error if collation not supported | ||
if (collationNotSupported(cursor.server, cursor.cmd)) { | ||
@@ -813,21 +640,124 @@ return callback(new MongoError(`server ${cursor.server.name} does not support collation`)); | ||
try { | ||
cursor.query = cursor.server.wireProtocolHandler.command( | ||
cursor.bson, | ||
function done() { | ||
if ( | ||
cursor.cursorState.cursorId && | ||
cursor.cursorState.cursorId.isZero() && | ||
cursor._endSession | ||
) { | ||
cursor._endSession(); | ||
} | ||
if ( | ||
cursor.cursorState.documents.length === 0 && | ||
cursor.cursorState.cursorId && | ||
cursor.cursorState.cursorId.isZero() && | ||
!cursor.cmd.tailable && | ||
!cursor.cmd.awaitData | ||
) { | ||
return setCursorNotified(cursor, callback); | ||
} | ||
nextFunction(cursor, callback); | ||
} | ||
// NOTE: this is a special internal method for cloning a cursor, consider removing | ||
if (cursor.cursorState.cursorId != null) { | ||
return done(); | ||
} | ||
const queryCallback = (err, r) => { | ||
if (err) return callback(err); | ||
const result = r.message; | ||
if (result.queryFailure) { | ||
return callback(new MongoError(result.documents[0]), null); | ||
} | ||
// Check if we have a command cursor | ||
if ( | ||
Array.isArray(result.documents) && | ||
result.documents.length === 1 && | ||
(!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) && | ||
(typeof result.documents[0].cursor !== 'string' || | ||
result.documents[0]['$err'] || | ||
result.documents[0]['errmsg'] || | ||
Array.isArray(result.documents[0].result)) | ||
) { | ||
// We have an error document, return the error | ||
if (result.documents[0]['$err'] || result.documents[0]['errmsg']) { | ||
return callback(new MongoError(result.documents[0]), null); | ||
} | ||
// We have a cursor document | ||
if (result.documents[0].cursor != null && typeof result.documents[0].cursor !== 'string') { | ||
var id = result.documents[0].cursor.id; | ||
// If we have a namespace change set the new namespace for getmores | ||
if (result.documents[0].cursor.ns) { | ||
cursor.ns = result.documents[0].cursor.ns; | ||
} | ||
// Promote id to long if needed | ||
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id; | ||
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId; | ||
cursor.cursorState.operationTime = result.documents[0].operationTime; | ||
// If we have a firstBatch set it | ||
if (Array.isArray(result.documents[0].cursor.firstBatch)) { | ||
cursor.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse(); | ||
} | ||
// Return after processing command cursor | ||
return done(result); | ||
} | ||
if (Array.isArray(result.documents[0].result)) { | ||
cursor.cursorState.documents = result.documents[0].result; | ||
cursor.cursorState.cursorId = Long.ZERO; | ||
return done(result); | ||
} | ||
} | ||
// Otherwise fall back to regular find path | ||
cursor.cursorState.cursorId = result.cursorId; | ||
cursor.cursorState.documents = result.documents; | ||
cursor.cursorState.lastCursorId = result.cursorId; | ||
// Transform the results with passed in transformation method if provided | ||
if ( | ||
cursor.cursorState.transforms && | ||
typeof cursor.cursorState.transforms.query === 'function' | ||
) { | ||
cursor.cursorState.documents = cursor.cursorState.transforms.query(result); | ||
} | ||
// Return callback | ||
done(result); | ||
}; | ||
if (cursor.logger.isDebug()) { | ||
cursor.logger.debug( | ||
`issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify( | ||
cursor.query | ||
)}]` | ||
); | ||
} | ||
if (cursor.cmd.find != null) { | ||
cursor.server.wireProtocolHandler.query( | ||
cursor.server, | ||
cursor.ns, | ||
cursor.cmd, | ||
cursor.cursorState, | ||
cursor.topology, | ||
cursor.options | ||
cursor.options, | ||
queryCallback | ||
); | ||
if (cursor.query instanceof MongoError) { | ||
return callback(cursor.query); | ||
} | ||
return; | ||
} | ||
// call `nextFunction` again now that we are initialized | ||
nextFunction(cursor, callback); | ||
} catch (err) { | ||
return callback(err); | ||
} | ||
cursor.query = cursor.server.wireProtocolHandler.command( | ||
cursor.server, | ||
cursor.ns, | ||
cursor.cmd, | ||
cursor.options, | ||
queryCallback | ||
); | ||
}); | ||
@@ -834,0 +764,0 @@ } |
@@ -159,6 +159,4 @@ 'use strict'; | ||
// Are we executing against a specific topology | ||
const topology = options.topology || {}; | ||
// Create the query object | ||
const query = this.s.wireProtocolHandler.command(this.s.bson, ns, cmd, {}, topology, options); | ||
const query = this.s.wireProtocolHandler.command(this, ns, cmd, {}, options); | ||
// Set slave OK of the query | ||
@@ -165,0 +163,0 @@ query.slaveOk = options.readPreference ? options.readPreference.slaveOk() : false; |
@@ -289,3 +289,3 @@ 'use strict'; | ||
const server = new Server( | ||
Object.assign({}, self.s.options, x, { | ||
Object.assign({}, self.s.options, x, options, { | ||
authProviders: self.authProviders, | ||
@@ -292,0 +292,0 @@ reconnect: false, |
@@ -963,8 +963,10 @@ 'use strict'; | ||
this.s.connectOptions = options || {}; | ||
// Set connecting state | ||
stateTransition(this, CONNECTING); | ||
// Create server instances | ||
var servers = this.s.seedlist.map(function(x) { | ||
return new Server( | ||
Object.assign({}, self.s.options, x, { | ||
Object.assign({}, self.s.options, x, options, { | ||
authProviders: self.authProviders, | ||
@@ -971,0 +973,0 @@ reconnect: false, |
@@ -758,29 +758,3 @@ 'use strict'; | ||
// Are we executing against a specific topology | ||
var topology = options.topology || {}; | ||
// Create the query object | ||
var query = self.wireProtocolHandler.command(self.s.bson, ns, cmd, {}, topology, options); | ||
if (query instanceof MongoError) { | ||
return callback(query, null); | ||
} | ||
// Set slave OK of the query | ||
query.slaveOk = options.readPreference ? options.readPreference.slaveOk() : false; | ||
// Write options | ||
var writeOptions = { | ||
raw: typeof options.raw === 'boolean' ? options.raw : false, | ||
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, | ||
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, | ||
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false, | ||
command: true, | ||
monitoring: typeof options.monitoring === 'boolean' ? options.monitoring : false, | ||
fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false, | ||
requestId: query.requestId, | ||
socketTimeout: typeof options.socketTimeout === 'number' ? options.socketTimeout : null, | ||
session: options.session || null | ||
}; | ||
// Write the operation to the pool | ||
self.s.pool.write(query, writeOptions, callback); | ||
self.wireProtocolHandler.command(self, ns, cmd, options, callback); | ||
}; | ||
@@ -816,3 +790,3 @@ | ||
// Execute write | ||
return self.wireProtocolHandler.insert(self.s.pool, ns, self.s.bson, ops, options, callback); | ||
return self.wireProtocolHandler.insert(self, ns, ops, options, callback); | ||
}; | ||
@@ -852,3 +826,3 @@ | ||
// Execute write | ||
return self.wireProtocolHandler.update(self.s.pool, ns, self.s.bson, ops, options, callback); | ||
return self.wireProtocolHandler.update(self, ns, ops, options, callback); | ||
}; | ||
@@ -888,3 +862,3 @@ | ||
// Execute write | ||
return self.wireProtocolHandler.remove(self.s.pool, ns, self.s.bson, ops, options, callback); | ||
return self.wireProtocolHandler.remove(self, ns, ops, options, callback); | ||
}; | ||
@@ -891,0 +865,0 @@ |
'use strict'; | ||
var copy = require('../connection/utils').copy, | ||
retrieveBSON = require('../connection/utils').retrieveBSON, | ||
KillCursor = require('../connection/commands').KillCursor, | ||
GetMore = require('../connection/commands').GetMore, | ||
Query = require('../connection/commands').Query, | ||
f = require('util').format, | ||
MongoError = require('../error').MongoError, | ||
getReadPreference = require('./shared').getReadPreference; | ||
const retrieveBSON = require('../connection/utils').retrieveBSON; | ||
const KillCursor = require('../connection/commands').KillCursor; | ||
const GetMore = require('../connection/commands').GetMore; | ||
const Query = require('../connection/commands').Query; | ||
const MongoError = require('../error').MongoError; | ||
const getReadPreference = require('./shared').getReadPreference; | ||
const applyCommonQueryOptions = require('./shared').applyCommonQueryOptions; | ||
const isMongos = require('./shared').isMongos; | ||
const databaseNamespace = require('./shared').databaseNamespace; | ||
const collectionNamespace = require('./shared').collectionNamespace; | ||
var BSON = retrieveBSON(), | ||
Long = BSON.Long; | ||
const BSON = retrieveBSON(); | ||
const Long = BSON.Long; | ||
var WireProtocol = function() {}; | ||
// | ||
// Execute a write operation | ||
var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callback) { | ||
if (ops.length === 0) throw new MongoError('insert must contain at least one document'); | ||
if (typeof options === 'function') { | ||
callback = options; | ||
options = {}; | ||
options = options || {}; | ||
class WireProtocol { | ||
insert(server, ns, ops, options, callback) { | ||
executeWrite(this, server, 'insert', 'documents', ns, ops, options, callback); | ||
} | ||
// Split the ns up to get db and collection | ||
var p = ns.split('.'); | ||
var d = p.shift(); | ||
// Options | ||
var ordered = typeof options.ordered === 'boolean' ? options.ordered : true; | ||
var writeConcern = options.writeConcern; | ||
// return skeleton | ||
var writeCommand = {}; | ||
writeCommand[type] = p.join('.'); | ||
writeCommand[opsField] = ops; | ||
writeCommand.ordered = ordered; | ||
// Did we specify a write concern | ||
if (writeConcern && Object.keys(writeConcern).length > 0) { | ||
writeCommand.writeConcern = writeConcern; | ||
update(server, ns, ops, options, callback) { | ||
executeWrite(this, server, 'update', 'updates', ns, ops, options, callback); | ||
} | ||
// Do we have bypassDocumentValidation set, then enable it on the write command | ||
if (options.bypassDocumentValidation === true) { | ||
writeCommand.bypassDocumentValidation = options.bypassDocumentValidation; | ||
remove(server, ns, ops, options, callback) { | ||
executeWrite(this, server, 'delete', 'deletes', ns, ops, options, callback); | ||
} | ||
// Options object | ||
var opts = { command: true }; | ||
if (typeof options.session !== 'undefined') opts.session = options.session; | ||
var queryOptions = { checkKeys: false, numberToSkip: 0, numberToReturn: 1 }; | ||
if (type === 'insert') queryOptions.checkKeys = true; | ||
if (typeof options.checkKeys === 'boolean') queryOptions.checkKeys = options.checkKeys; | ||
// Ensure we support serialization of functions | ||
if (options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions; | ||
// Do not serialize the undefined fields | ||
if (options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined; | ||
killCursor(server, ns, cursorState, callback) { | ||
const bson = server.s.bson; | ||
const pool = server.s.pool; | ||
const cursorId = cursorState.cursorId; | ||
const killCursor = new KillCursor(bson, ns, [cursorId]); | ||
const options = { | ||
immediateRelease: true, | ||
noResponse: true | ||
}; | ||
try { | ||
// Create write command | ||
var cmd = new Query(bson, f('%s.$cmd', d), writeCommand, queryOptions); | ||
// Execute command | ||
pool.write(cmd, opts, callback); | ||
} catch (err) { | ||
callback(err); | ||
if (typeof cursorState.session === 'object') { | ||
options.session = cursorState.session; | ||
} | ||
if (pool && pool.isConnected()) { | ||
try { | ||
pool.write(killCursor, options, callback); | ||
} catch (err) { | ||
if (typeof callback === 'function') { | ||
callback(err, null); | ||
} else { | ||
console.warn(err); | ||
} | ||
} | ||
} | ||
} | ||
}; | ||
// | ||
// Needs to support legacy mass insert as well as ordered/unordered legacy | ||
// emulation | ||
// | ||
WireProtocol.prototype.insert = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback); | ||
}; | ||
getMore(server, ns, cursorState, batchSize, options, callback) { | ||
const bson = server.s.bson; | ||
const getMore = new GetMore(bson, ns, cursorState.cursorId, { numberToReturn: batchSize }); | ||
function queryCallback(err, result) { | ||
if (err) return callback(err); | ||
const response = result.message; | ||
WireProtocol.prototype.update = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback); | ||
}; | ||
// If we have a timed out query or a cursor that was killed | ||
if (response.cursorNotFound) { | ||
return callback(new MongoError('Cursor does not exist, was killed, or timed out'), null); | ||
} | ||
WireProtocol.prototype.remove = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback); | ||
}; | ||
const cursorId = | ||
typeof response.cursorId === 'number' | ||
? Long.fromNumber(response.cursorId) | ||
: response.cursorId; | ||
WireProtocol.prototype.killCursor = function(bson, ns, cursorState, pool, callback) { | ||
var cursorId = cursorState.cursorId; | ||
// Create a kill cursor command | ||
var killCursor = new KillCursor(bson, ns, [cursorId]); | ||
cursorState.documents = response.documents; | ||
cursorState.cursorId = cursorId; | ||
// Build killCursor options | ||
const options = { | ||
immediateRelease: true, | ||
noResponse: true | ||
}; | ||
callback(null, null, response.connection); | ||
} | ||
if (typeof cursorState.session === 'object') { | ||
options.session = cursorState.session; | ||
const queryOptions = applyCommonQueryOptions({}, cursorState); | ||
server.s.pool.write(getMore, queryOptions, queryCallback); | ||
} | ||
// Execute the kill cursor command | ||
if (pool && pool.isConnected()) { | ||
try { | ||
pool.write(killCursor, options, callback); | ||
} catch (err) { | ||
if (typeof callback === 'function') { | ||
callback(err, null); | ||
} else { | ||
console.warn(err); | ||
} | ||
query(server, ns, cmd, cursorState, options, callback) { | ||
if (cursorState.cursorId != null) { | ||
return; | ||
} | ||
} | ||
}; | ||
WireProtocol.prototype.getMore = function( | ||
bson, | ||
ns, | ||
cursorState, | ||
batchSize, | ||
raw, | ||
connection, | ||
options, | ||
callback | ||
) { | ||
// Create getMore command | ||
var getMore = new GetMore(bson, ns, cursorState.cursorId, { numberToReturn: batchSize }); | ||
const query = setupClassicFind(server, ns, cmd, cursorState, options); | ||
const queryOptions = applyCommonQueryOptions({}, cursorState); | ||
if (typeof query.documentsReturnedIn === 'string') { | ||
queryOptions.documentsReturnedIn = query.documentsReturnedIn; | ||
} | ||
// Query callback | ||
var queryCallback = function(err, result) { | ||
if (err) return callback(err); | ||
// Get the raw message | ||
var r = result.message; | ||
server.s.pool.write(query, queryOptions, callback); | ||
} | ||
// If we have a timed out query or a cursor that was killed | ||
if ((r.responseFlags & (1 << 0)) !== 0) { | ||
return callback(new MongoError('cursor does not exist, was killed or timed out'), null); | ||
command(server, ns, cmd, options, callback) { | ||
if (cmd == null) { | ||
return callback(new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`)); | ||
} | ||
// Ensure we have a Long valie cursor id | ||
var cursorId = typeof r.cursorId === 'number' ? Long.fromNumber(r.cursorId) : r.cursorId; | ||
options = options || {}; | ||
const bson = server.s.bson; | ||
const pool = server.s.pool; | ||
const readPreference = getReadPreference(cmd, options); | ||
// Set all the values | ||
cursorState.documents = r.documents; | ||
cursorState.cursorId = cursorId; | ||
let finalCmd = Object.assign({}, cmd); | ||
if (finalCmd.readConcern) { | ||
if (finalCmd.readConcern.level !== 'local') { | ||
return callback( | ||
new MongoError( | ||
`server ${JSON.stringify(finalCmd)} command does not support a readConcern level of ${ | ||
finalCmd.readConcern.level | ||
}` | ||
) | ||
); | ||
} | ||
// Return | ||
callback(null, null, r.connection); | ||
}; | ||
delete finalCmd['readConcern']; | ||
} | ||
// Contains any query options | ||
var queryOptions = {}; | ||
if (isMongos(server) && readPreference && readPreference.preference !== 'primary') { | ||
finalCmd = { | ||
$query: finalCmd, | ||
$readPreference: readPreference.toJSON() | ||
}; | ||
} | ||
// If we have a raw query decorate the function | ||
if (raw) { | ||
queryOptions.raw = raw; | ||
} | ||
const commandOptions = Object.assign( | ||
{ | ||
command: true, | ||
slaveOk: readPreference.slaveOk(), | ||
numberToSkip: 0, | ||
numberToReturn: -1, | ||
checkKeys: false | ||
}, | ||
options | ||
); | ||
// Check if we need to promote longs | ||
if (typeof cursorState.promoteLongs === 'boolean') { | ||
queryOptions.promoteLongs = cursorState.promoteLongs; | ||
try { | ||
const query = new Query(bson, `${databaseNamespace(ns)}.$cmd`, finalCmd, commandOptions); | ||
pool.write(query, commandOptions, callback); | ||
} catch (err) { | ||
callback(err); | ||
} | ||
} | ||
} | ||
if (typeof cursorState.promoteValues === 'boolean') { | ||
queryOptions.promoteValues = cursorState.promoteValues; | ||
function executeWrite(handler, server, type, opsField, ns, ops, options, callback) { | ||
if (ops.length === 0) throw new MongoError('insert must contain at least one document'); | ||
if (typeof options === 'function') { | ||
callback = options; | ||
options = {}; | ||
options = options || {}; | ||
} | ||
if (typeof cursorState.promoteBuffers === 'boolean') { | ||
queryOptions.promoteBuffers = cursorState.promoteBuffers; | ||
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; | ||
const writeConcern = options.writeConcern; | ||
const writeCommand = {}; | ||
writeCommand[type] = collectionNamespace(ns); | ||
writeCommand[opsField] = ops; | ||
writeCommand.ordered = ordered; | ||
if (writeConcern && Object.keys(writeConcern).length > 0) { | ||
writeCommand.writeConcern = writeConcern; | ||
} | ||
if (typeof cursorState.session === 'object') { | ||
queryOptions.session = cursorState.session; | ||
if (options.bypassDocumentValidation === true) { | ||
writeCommand.bypassDocumentValidation = options.bypassDocumentValidation; | ||
} | ||
// Write out the getMore command | ||
connection.write(getMore, queryOptions, queryCallback); | ||
}; | ||
const commandOptions = Object.assign( | ||
{ | ||
checkKeys: type === 'insert', | ||
numberToReturn: 1 | ||
}, | ||
options | ||
); | ||
WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) { | ||
// Establish type of command | ||
if (cmd.find) { | ||
return setupClassicFind(bson, ns, cmd, cursorState, topology, options); | ||
} else if (cursorState.cursorId != null) { | ||
return; | ||
} else if (cmd) { | ||
return setupCommand(bson, ns, cmd, cursorState, topology, options); | ||
} else { | ||
throw new MongoError(f('command %s does not return a cursor', JSON.stringify(cmd))); | ||
} | ||
}; | ||
handler.command(server, ns, writeCommand, commandOptions, callback); | ||
} | ||
// | ||
// Execute a find command | ||
var setupClassicFind = function(bson, ns, cmd, cursorState, topology, options) { | ||
// Ensure we have at least some options | ||
function setupClassicFind(server, ns, cmd, cursorState, options) { | ||
options = options || {}; | ||
// Get the readPreference | ||
var readPreference = getReadPreference(cmd, options); | ||
// Set the optional batchSize | ||
const bson = server.s.bson; | ||
const readPreference = getReadPreference(cmd, options); | ||
cursorState.batchSize = cmd.batchSize || cursorState.batchSize; | ||
var numberToReturn = 0; | ||
// Unpack the limit and batchSize values | ||
let numberToReturn = 0; | ||
if (cursorState.limit === 0) { | ||
@@ -217,12 +204,9 @@ numberToReturn = cursorState.batchSize; | ||
var numberToSkip = cursorState.skip || 0; | ||
// Build actual find command | ||
var findCmd = {}; | ||
const numberToSkip = cursorState.skip || 0; | ||
// We have a Mongos topology, check if we need to add a readPreference | ||
if (topology.type === 'mongos' && readPreference) { | ||
const findCmd = {}; | ||
if (isMongos(server) && readPreference) { | ||
findCmd['$readPreference'] = readPreference.toJSON(); | ||
} | ||
// Add special modifiers to the query | ||
if (cmd.sort) findCmd['$orderby'] = cmd.sort; | ||
@@ -238,3 +222,2 @@ if (cmd.hint) findCmd['$hint'] = cmd.hint; | ||
if (cmd.maxTimeMS) findCmd['$maxTimeMS'] = cmd.maxTimeMS; | ||
if (cmd.explain) { | ||
@@ -247,26 +230,20 @@ // nToReturn must be 0 (match all) or negative (match N and close cursor) | ||
// Add the query | ||
findCmd['$query'] = cmd.query; | ||
// Throw on majority readConcern passed in | ||
if (cmd.readConcern && cmd.readConcern.level !== 'local') { | ||
throw new MongoError( | ||
f('server find command does not support a readConcern level of %s', cmd.readConcern.level) | ||
`server find command does not support a readConcern level of ${cmd.readConcern.level}` | ||
); | ||
} | ||
// Remove readConcern, ensure no failing commands | ||
if (cmd.readConcern) { | ||
cmd = copy(cmd); | ||
cmd = Object.assign({}, cmd); | ||
delete cmd['readConcern']; | ||
} | ||
// Serialize functions | ||
var serializeFunctions = | ||
const serializeFunctions = | ||
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false; | ||
var ignoreUndefined = | ||
const ignoreUndefined = | ||
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false; | ||
// Build Query object | ||
var query = new Query(bson, ns, findCmd, { | ||
const query = new Query(bson, ns, findCmd, { | ||
numberToSkip: numberToSkip, | ||
@@ -281,92 +258,12 @@ numberToReturn: numberToReturn, | ||
// Set query flags | ||
query.slaveOk = readPreference.slaveOk(); | ||
if (typeof cmd.tailable === 'boolean') query.tailable = cmd.tailable; | ||
if (typeof cmd.oplogReplay === 'boolean') query.oplogReplay = cmd.oplogReplay; | ||
if (typeof cmd.noCursorTimeout === 'boolean') query.noCursorTimeout = cmd.noCursorTimeout; | ||
if (typeof cmd.awaitData === 'boolean') query.awaitData = cmd.awaitData; | ||
if (typeof cmd.partial === 'boolean') query.partial = cmd.partial; | ||
// Set up the option bits for wire protocol | ||
if (typeof cmd.tailable === 'boolean') { | ||
query.tailable = cmd.tailable; | ||
} | ||
if (typeof cmd.oplogReplay === 'boolean') { | ||
query.oplogReplay = cmd.oplogReplay; | ||
} | ||
if (typeof cmd.noCursorTimeout === 'boolean') { | ||
query.noCursorTimeout = cmd.noCursorTimeout; | ||
} | ||
if (typeof cmd.awaitData === 'boolean') { | ||
query.awaitData = cmd.awaitData; | ||
} | ||
if (typeof cmd.partial === 'boolean') { | ||
query.partial = cmd.partial; | ||
} | ||
// Return the query | ||
return query; | ||
}; | ||
// | ||
// Set up a command cursor | ||
var setupCommand = function(bson, ns, cmd, cursorState, topology, options) { | ||
// Set empty options object | ||
options = options || {}; | ||
// Get the readPreference | ||
var readPreference = getReadPreference(cmd, options); | ||
// Final query | ||
var finalCmd = {}; | ||
for (var name in cmd) { | ||
finalCmd[name] = cmd[name]; | ||
} | ||
// Build command namespace | ||
var parts = ns.split(/\./); | ||
// Serialize functions | ||
var serializeFunctions = | ||
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false; | ||
var ignoreUndefined = | ||
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false; | ||
// Throw on majority readConcern passed in | ||
if (cmd.readConcern && cmd.readConcern.level !== 'local') { | ||
throw new MongoError( | ||
f( | ||
'server %s command does not support a readConcern level of %s', | ||
JSON.stringify(cmd), | ||
cmd.readConcern.level | ||
) | ||
); | ||
} | ||
// Remove readConcern, ensure no failing commands | ||
if (cmd.readConcern) delete cmd['readConcern']; | ||
// We have a Mongos topology, check if we need to add a readPreference | ||
if (topology.type === 'mongos' && readPreference && readPreference.preference !== 'primary') { | ||
finalCmd = { | ||
$query: finalCmd, | ||
$readPreference: readPreference.toJSON() | ||
}; | ||
} | ||
// Build Query object | ||
var query = new Query(bson, f('%s.$cmd', parts.shift()), finalCmd, { | ||
numberToSkip: 0, | ||
numberToReturn: -1, | ||
checkKeys: false, | ||
serializeFunctions: serializeFunctions, | ||
ignoreUndefined: ignoreUndefined | ||
}); | ||
// Set query flags | ||
query.slaveOk = readPreference.slaveOk(); | ||
// Return the query | ||
return query; | ||
}; | ||
} | ||
module.exports = WireProtocol; |
@@ -12,5 +12,181 @@ 'use strict'; | ||
const TxnState = require('../transactions').TxnState; | ||
const isMongos = require('./shared').isMongos; | ||
const databaseNamespace = require('./shared').databaseNamespace; | ||
const collectionNamespace = require('./shared').collectionNamespace; | ||
const WireProtocol = function() {}; | ||
class WireProtocol { | ||
insert(server, ns, ops, options, callback) { | ||
executeWrite(this, server, 'insert', 'documents', ns, ops, options, callback); | ||
} | ||
update(server, ns, ops, options, callback) { | ||
executeWrite(this, server, 'update', 'updates', ns, ops, options, callback); | ||
} | ||
remove(server, ns, ops, options, callback) { | ||
executeWrite(this, server, 'delete', 'deletes', ns, ops, options, callback); | ||
} | ||
killCursor(server, ns, cursorState, callback) { | ||
callback = typeof callback === 'function' ? callback : () => {}; | ||
const cursorId = cursorState.cursorId; | ||
const killCursorCmd = { | ||
killCursors: collectionNamespace(ns), | ||
cursors: [cursorId] | ||
}; | ||
const options = {}; | ||
if (typeof cursorState.session === 'object') options.session = cursorState.session; | ||
this.command(server, ns, killCursorCmd, options, (err, result) => { | ||
if (err) { | ||
return callback(err); | ||
} | ||
const response = result.message; | ||
if (response.cursorNotFound) { | ||
return callback(new MongoNetworkError('cursor killed or timed out'), null); | ||
} | ||
if (!Array.isArray(response.documents) || response.documents.length === 0) { | ||
return callback( | ||
new MongoError(`invalid killCursors result returned for cursor id ${cursorId}`) | ||
); | ||
} | ||
callback(null, response.documents[0]); | ||
}); | ||
} | ||
getMore(server, ns, cursorState, batchSize, options, callback) { | ||
options = options || {}; | ||
const getMoreCmd = { | ||
getMore: cursorState.cursorId, | ||
collection: collectionNamespace(ns), | ||
batchSize: Math.abs(batchSize) | ||
}; | ||
if (cursorState.cmd.tailable && typeof cursorState.cmd.maxAwaitTimeMS === 'number') { | ||
getMoreCmd.maxTimeMS = cursorState.cmd.maxAwaitTimeMS; | ||
} | ||
function queryCallback(err, result) { | ||
if (err) return callback(err); | ||
const response = result.message; | ||
// If we have a timed out query or a cursor that was killed | ||
if (response.cursorNotFound) { | ||
return callback(new MongoNetworkError('cursor killed or timed out'), null); | ||
} | ||
// Raw, return all the extracted documents | ||
if (cursorState.raw) { | ||
cursorState.documents = response.documents; | ||
cursorState.cursorId = response.cursorId; | ||
return callback(null, response.documents); | ||
} | ||
// We have an error detected | ||
if (response.documents[0].ok === 0) { | ||
return callback(new MongoError(response.documents[0])); | ||
} | ||
// Ensure we have a Long valid cursor id | ||
const cursorId = | ||
typeof response.documents[0].cursor.id === 'number' | ||
? Long.fromNumber(response.documents[0].cursor.id) | ||
: response.documents[0].cursor.id; | ||
cursorState.documents = response.documents[0].cursor.nextBatch; | ||
cursorState.cursorId = cursorId; | ||
callback(null, response.documents[0], response.connection); | ||
} | ||
const commandOptions = Object.assign( | ||
{ | ||
returnFieldSelector: null, | ||
documentsReturnedIn: 'nextBatch' | ||
}, | ||
options | ||
); | ||
this.command(server, ns, getMoreCmd, commandOptions, queryCallback); | ||
} | ||
query(server, ns, cmd, cursorState, options, callback) { | ||
options = options || {}; | ||
if (cursorState.cursorId != null) { | ||
return callback(); | ||
} | ||
if (cmd == null) { | ||
return callback(new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`)); | ||
} | ||
const readPreference = getReadPreference(cmd, options); | ||
const findCmd = prepareFindCommand(server, ns, cmd, cursorState, options); | ||
// NOTE: This actually modifies the passed in cmd, and our code _depends_ on this | ||
// side-effect. Change this ASAP | ||
cmd.virtual = false; | ||
const commandOptions = Object.assign( | ||
{ | ||
documentsReturnedIn: 'firstBatch', | ||
numberToReturn: 1, | ||
slaveOk: readPreference.slaveOk() | ||
}, | ||
options | ||
); | ||
if (cmd.readPreference) commandOptions.readPreference = readPreference; | ||
this.command(server, ns, findCmd, commandOptions, callback); | ||
} | ||
command(server, ns, cmd, options, callback) { | ||
if (typeof options === 'function') (callback = options), (options = {}); | ||
options = options || {}; | ||
if (cmd == null) { | ||
return callback(new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`)); | ||
} | ||
const bson = server.s.bson; | ||
const pool = server.s.pool; | ||
const readPreference = getReadPreference(cmd, options); | ||
let finalCmd = Object.assign({}, cmd); | ||
if (isMongos(server) && readPreference && readPreference.preference !== 'primary') { | ||
finalCmd = { | ||
$query: finalCmd, | ||
$readPreference: readPreference.toJSON() | ||
}; | ||
} | ||
const err = decorateWithSessionsData(finalCmd, options.session, options); | ||
if (err) { | ||
return callback(err); | ||
} | ||
const commandOptions = Object.assign( | ||
{ | ||
command: true, | ||
slaveOk: readPreference.slaveOk(), | ||
numberToSkip: 0, | ||
numberToReturn: -1, | ||
checkKeys: false | ||
}, | ||
options | ||
); | ||
try { | ||
const query = new Query(bson, `${databaseNamespace(ns)}.$cmd`, finalCmd, commandOptions); | ||
pool.write(query, commandOptions, callback); | ||
} catch (err) { | ||
callback(err); | ||
} | ||
} | ||
} | ||
function isTransactionCommand(command) { | ||
@@ -84,5 +260,3 @@ return !!(command.commitTransaction || command.abortTransaction); | ||
// | ||
// Execute a write operation | ||
function executeWrite(pool, bson, type, opsField, ns, ops, options, callback) { | ||
function executeWrite(handler, server, type, opsField, ns, ops, options, callback) { | ||
if (ops.length === 0) throw new MongoError('insert must contain at least one document'); | ||
@@ -95,16 +269,10 @@ if (typeof options === 'function') { | ||
// Split the ns up to get db and collection | ||
const p = ns.split('.'); | ||
const d = p.shift(); | ||
// Options | ||
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true; | ||
const writeConcern = options.writeConcern; | ||
// return skeleton | ||
const writeCommand = {}; | ||
writeCommand[type] = p.join('.'); | ||
writeCommand[type] = collectionNamespace(ns); | ||
writeCommand[opsField] = ops; | ||
writeCommand.ordered = ordered; | ||
// Did we specify a write concern | ||
if (writeConcern && Object.keys(writeConcern).length > 0) { | ||
@@ -114,3 +282,2 @@ writeCommand.writeConcern = writeConcern; | ||
// If we have collation passed in | ||
if (options.collation) { | ||
@@ -124,3 +291,2 @@ for (let i = 0; i < writeCommand[opsField].length; i++) { | ||
// Do we have bypassDocumentValidation set, then enable it on the write command | ||
if (options.bypassDocumentValidation === true) { | ||
@@ -130,340 +296,20 @@ writeCommand.bypassDocumentValidation = options.bypassDocumentValidation; | ||
// optionally decorate command with transactions data | ||
const err = decorateWithSessionsData(writeCommand, options.session, options, callback); | ||
if (err) { | ||
return callback(err, null); | ||
} | ||
const commandOptions = Object.assign( | ||
{ | ||
checkKeys: type === 'insert', | ||
numberToReturn: 1 | ||
}, | ||
options | ||
); | ||
// Options object | ||
const opts = { command: true }; | ||
if (typeof options.session !== 'undefined') opts.session = options.session; | ||
const queryOptions = { checkKeys: false, numberToSkip: 0, numberToReturn: 1 }; | ||
if (type === 'insert') queryOptions.checkKeys = true; | ||
if (typeof options.checkKeys === 'boolean') queryOptions.checkKeys = options.checkKeys; | ||
// Ensure we support serialization of functions | ||
if (options.serializeFunctions) queryOptions.serializeFunctions = options.serializeFunctions; | ||
// Do not serialize the undefined fields | ||
if (options.ignoreUndefined) queryOptions.ignoreUndefined = options.ignoreUndefined; | ||
try { | ||
// Create write command | ||
const cmd = new Query(bson, `${d}.$cmd`, writeCommand, queryOptions); | ||
// Execute command | ||
pool.write(cmd, opts, callback); | ||
} catch (err) { | ||
callback(err); | ||
} | ||
handler.command(server, ns, writeCommand, commandOptions, callback); | ||
} | ||
// | ||
// Needs to support legacy mass insert as well as ordered/unordered legacy | ||
// emulation | ||
// | ||
WireProtocol.prototype.insert = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'insert', 'documents', ns, ops, options, callback); | ||
}; | ||
WireProtocol.prototype.update = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'update', 'updates', ns, ops, options, callback); | ||
}; | ||
WireProtocol.prototype.remove = function(pool, ns, bson, ops, options, callback) { | ||
executeWrite(pool, bson, 'delete', 'deletes', ns, ops, options, callback); | ||
}; | ||
WireProtocol.prototype.killCursor = function(bson, ns, cursorState, pool, callback) { | ||
// Build command namespace | ||
const parts = ns.split(/\./); | ||
// Command namespace | ||
const commandns = `${parts.shift()}.$cmd`; | ||
const cursorId = cursorState.cursorId; | ||
// Create killCursor command | ||
const killcursorCmd = { | ||
killCursors: parts.join('.'), | ||
cursors: [cursorId] | ||
}; | ||
// Build Query object | ||
const query = new Query(bson, commandns, killcursorCmd, { | ||
numberToSkip: 0, | ||
numberToReturn: -1, | ||
checkKeys: false, | ||
returnFieldSelector: null | ||
}); | ||
// Kill cursor callback | ||
function killCursorCallback(err, result) { | ||
if (err) { | ||
if (typeof callback !== 'function') return; | ||
return callback(err); | ||
} | ||
// Result | ||
const r = result.message; | ||
// If we have a timed out query or a cursor that was killed | ||
if ((r.responseFlags & (1 << 0)) !== 0) { | ||
if (typeof callback !== 'function') return; | ||
return callback(new MongoNetworkError('cursor killed or timed out'), null); | ||
} | ||
if (!Array.isArray(r.documents) || r.documents.length === 0) { | ||
if (typeof callback !== 'function') return; | ||
return callback( | ||
new MongoError(`invalid killCursors result returned for cursor id ${cursorId}`) | ||
); | ||
} | ||
// Return the result | ||
if (typeof callback === 'function') { | ||
callback(null, r.documents[0]); | ||
} | ||
} | ||
const options = { command: true }; | ||
if (typeof cursorState.session === 'object') { | ||
options.session = cursorState.session; | ||
} | ||
// Execute the kill cursor command | ||
if (pool && pool.isConnected()) { | ||
try { | ||
pool.write(query, options, killCursorCallback); | ||
} catch (err) { | ||
killCursorCallback(err, null); | ||
} | ||
return; | ||
} | ||
// Callback | ||
if (typeof callback === 'function') callback(null, null); | ||
}; | ||
WireProtocol.prototype.getMore = function( | ||
bson, | ||
ns, | ||
cursorState, | ||
batchSize, | ||
raw, | ||
connection, | ||
options, | ||
callback | ||
) { | ||
options = options || {}; | ||
// Build command namespace | ||
const parts = ns.split(/\./); | ||
// Command namespace | ||
const commandns = `${parts.shift()}.$cmd`; | ||
// Create getMore command | ||
const getMoreCmd = { | ||
getMore: cursorState.cursorId, | ||
collection: parts.join('.'), | ||
batchSize: Math.abs(batchSize) | ||
}; | ||
// optionally decorate command with transactions data | ||
const err = decorateWithSessionsData(getMoreCmd, options.session, options, callback); | ||
if (err) { | ||
return callback(err, null); | ||
} | ||
if (cursorState.cmd.tailable && typeof cursorState.cmd.maxAwaitTimeMS === 'number') { | ||
getMoreCmd.maxTimeMS = cursorState.cmd.maxAwaitTimeMS; | ||
} | ||
// Build Query object | ||
const query = new Query(bson, commandns, getMoreCmd, { | ||
numberToSkip: 0, | ||
numberToReturn: -1, | ||
checkKeys: false, | ||
returnFieldSelector: null | ||
}); | ||
// Query callback | ||
function queryCallback(err, result) { | ||
if (err) return callback(err); | ||
// Get the raw message | ||
const r = result.message; | ||
// If we have a timed out query or a cursor that was killed | ||
if ((r.responseFlags & (1 << 0)) !== 0) { | ||
return callback(new MongoNetworkError('cursor killed or timed out'), null); | ||
} | ||
// Raw, return all the extracted documents | ||
if (raw) { | ||
cursorState.documents = r.documents; | ||
cursorState.cursorId = r.cursorId; | ||
return callback(null, r.documents); | ||
} | ||
// We have an error detected | ||
if (r.documents[0].ok === 0) { | ||
return callback(new MongoError(r.documents[0])); | ||
} | ||
// Ensure we have a Long valid cursor id | ||
const cursorId = | ||
typeof r.documents[0].cursor.id === 'number' | ||
? Long.fromNumber(r.documents[0].cursor.id) | ||
: r.documents[0].cursor.id; | ||
// Set all the values | ||
cursorState.documents = r.documents[0].cursor.nextBatch; | ||
cursorState.cursorId = cursorId; | ||
// Return the result | ||
callback(null, r.documents[0], r.connection); | ||
} | ||
// Query options | ||
const queryOptions = { command: true }; | ||
// If we have a raw query decorate the function | ||
if (raw) { | ||
queryOptions.raw = raw; | ||
} | ||
// Add the result field needed | ||
queryOptions.documentsReturnedIn = 'nextBatch'; | ||
// Check if we need to promote longs | ||
if (typeof cursorState.promoteLongs === 'boolean') { | ||
queryOptions.promoteLongs = cursorState.promoteLongs; | ||
} | ||
if (typeof cursorState.promoteValues === 'boolean') { | ||
queryOptions.promoteValues = cursorState.promoteValues; | ||
} | ||
if (typeof cursorState.promoteBuffers === 'boolean') { | ||
queryOptions.promoteBuffers = cursorState.promoteBuffers; | ||
} | ||
if (typeof cursorState.session === 'object') { | ||
queryOptions.session = cursorState.session; | ||
} | ||
// Write out the getMore command | ||
connection.write(query, queryOptions, queryCallback); | ||
}; | ||
WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology, options) { | ||
options = options || {}; | ||
// Check if this is a wire protocol command or not | ||
const wireProtocolCommand = | ||
typeof options.wireProtocolCommand === 'boolean' ? options.wireProtocolCommand : true; | ||
// Establish type of command | ||
let query; | ||
if (cmd.find && wireProtocolCommand) { | ||
// Create the find command | ||
query = executeFindCommand(bson, ns, cmd, cursorState, topology, options); | ||
// Mark the cmd as virtual | ||
cmd.virtual = false; | ||
// Signal the documents are in the firstBatch value | ||
query.documentsReturnedIn = 'firstBatch'; | ||
} else if (cursorState.cursorId != null) { | ||
return; | ||
} else if (cmd) { | ||
query = setupCommand(bson, ns, cmd, cursorState, topology, options); | ||
} else { | ||
return new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`); | ||
} | ||
if (query instanceof MongoError) { | ||
return query; | ||
} | ||
// optionally decorate query with transaction data | ||
const err = decorateWithSessionsData(query.query, options.session, options); | ||
if (err) { | ||
return err; | ||
} | ||
return query; | ||
}; | ||
// // Command | ||
// { | ||
// find: ns | ||
// , query: <object> | ||
// , limit: <n> | ||
// , fields: <object> | ||
// , skip: <n> | ||
// , hint: <string> | ||
// , explain: <boolean> | ||
// , snapshot: <boolean> | ||
// , batchSize: <n> | ||
// , returnKey: <boolean> | ||
// , maxScan: <n> | ||
// , min: <n> | ||
// , max: <n> | ||
// , showDiskLoc: <boolean> | ||
// , comment: <string> | ||
// , maxTimeMS: <n> | ||
// , raw: <boolean> | ||
// , readPreference: <ReadPreference> | ||
// , tailable: <boolean> | ||
// , oplogReplay: <boolean> | ||
// , noCursorTimeout: <boolean> | ||
// , awaitdata: <boolean> | ||
// , exhaust: <boolean> | ||
// , partial: <boolean> | ||
// } | ||
// FIND/GETMORE SPEC | ||
// { | ||
// “find”: <string>, | ||
// “filter”: { ... }, | ||
// “sort”: { ... }, | ||
// “projection”: { ... }, | ||
// “hint”: { ... }, | ||
// “skip”: <int>, | ||
// “limit”: <int>, | ||
// “batchSize”: <int>, | ||
// “singleBatch”: <bool>, | ||
// “comment”: <string>, | ||
// “maxScan”: <int>, | ||
// “maxTimeMS”: <int>, | ||
// “max”: { ... }, | ||
// “min”: { ... }, | ||
// “returnKey”: <bool>, | ||
// “showRecordId”: <bool>, | ||
// “snapshot”: <bool>, | ||
// “tailable”: <bool>, | ||
// “oplogReplay”: <bool>, | ||
// “noCursorTimeout”: <bool>, | ||
// “awaitData”: <bool>, | ||
// “partial”: <bool>, | ||
// “$readPreference”: { ... } | ||
// } | ||
// | ||
// Execute a find command | ||
function executeFindCommand(bson, ns, cmd, cursorState, topology, options) { | ||
// Ensure we have at least some options | ||
options = options || {}; | ||
// Get the readPreference | ||
const readPreference = getReadPreference(cmd, options); | ||
// Set the optional batchSize | ||
function prepareFindCommand(server, ns, cmd, cursorState) { | ||
cursorState.batchSize = cmd.batchSize || cursorState.batchSize; | ||
// Build command namespace | ||
const parts = ns.split(/\./); | ||
// Command namespace | ||
const commandns = `${parts.shift()}.$cmd`; | ||
// Build actual find command | ||
let findCmd = { | ||
find: parts.join('.') | ||
find: collectionNamespace(ns) | ||
}; | ||
// I we provided a filter | ||
if (cmd.query) { | ||
// Check if the user is passing in the $query parameter | ||
if (cmd.query['$query']) { | ||
@@ -476,6 +322,3 @@ findCmd.filter = cmd.query['$query']; | ||
// Sort value | ||
let sortValue = cmd.sort; | ||
// Handle issue of sort being an Array | ||
if (Array.isArray(sortValue)) { | ||
@@ -486,3 +329,2 @@ const sortObject = {}; | ||
let sortDirection = sortValue[1]; | ||
// Translate the sort order text | ||
if (sortDirection === 'asc') { | ||
@@ -494,8 +336,6 @@ sortDirection = 1; | ||
// Set the sort order | ||
sortObject[sortValue[0]] = sortDirection; | ||
} else { | ||
for (var i = 0; i < sortValue.length; i++) { | ||
for (let i = 0; i < sortValue.length; i++) { | ||
let sortDirection = sortValue[i][1]; | ||
// Translate the sort order text | ||
if (sortDirection === 'asc') { | ||
@@ -507,3 +347,2 @@ sortDirection = 1; | ||
// Set the sort order | ||
sortObject[sortValue[i][0]] = sortDirection; | ||
@@ -516,14 +355,7 @@ } | ||
// Add sort to command | ||
if (cmd.sort) findCmd.sort = sortValue; | ||
// Add a projection to the command | ||
if (cmd.fields) findCmd.projection = cmd.fields; | ||
// Add a hint to the command | ||
if (cmd.hint) findCmd.hint = cmd.hint; | ||
// Add a skip | ||
if (cmd.skip) findCmd.skip = cmd.skip; | ||
// Add a limit | ||
if (cmd.limit) findCmd.limit = cmd.limit; | ||
// Check if we wish to have a singleBatch | ||
if (cmd.limit < 0) { | ||
@@ -534,3 +366,2 @@ findCmd.limit = Math.abs(cmd.limit); | ||
// Add a batchSize | ||
if (typeof cmd.batchSize === 'number') { | ||
@@ -548,44 +379,18 @@ if (cmd.batchSize < 0) { | ||
// If we have comment set | ||
if (cmd.comment) findCmd.comment = cmd.comment; | ||
// If we have maxScan | ||
if (cmd.maxScan) findCmd.maxScan = cmd.maxScan; | ||
// If we have maxTimeMS set | ||
if (cmd.maxTimeMS) findCmd.maxTimeMS = cmd.maxTimeMS; | ||
// If we have min | ||
if (cmd.min) findCmd.min = cmd.min; | ||
// If we have max | ||
if (cmd.max) findCmd.max = cmd.max; | ||
// If we have returnKey set | ||
findCmd.returnKey = cmd.returnKey ? cmd.returnKey : false; | ||
// If we have showDiskLoc set | ||
findCmd.showRecordId = cmd.showDiskLoc ? cmd.showDiskLoc : false; | ||
// If we have snapshot set | ||
if (cmd.snapshot) findCmd.snapshot = cmd.snapshot; | ||
// If we have tailable set | ||
if (cmd.tailable) findCmd.tailable = cmd.tailable; | ||
// If we have oplogReplay set | ||
if (cmd.oplogReplay) findCmd.oplogReplay = cmd.oplogReplay; | ||
// If we have noCursorTimeout set | ||
if (cmd.noCursorTimeout) findCmd.noCursorTimeout = cmd.noCursorTimeout; | ||
// If we have awaitData set | ||
if (cmd.awaitData) findCmd.awaitData = cmd.awaitData; | ||
if (cmd.awaitdata) findCmd.awaitData = cmd.awaitdata; | ||
// If we have partial set | ||
if (cmd.partial) findCmd.partial = cmd.partial; | ||
// If we have collation passed in | ||
if (cmd.collation) findCmd.collation = cmd.collation; | ||
if (cmd.readConcern) findCmd.readConcern = cmd.readConcern; | ||
@@ -600,97 +405,5 @@ // If we have explain, we need to rewrite the find command | ||
// Did we provide a readConcern | ||
if (cmd.readConcern) findCmd.readConcern = cmd.readConcern; | ||
// Set up the serialize and ignoreUndefined fields | ||
const serializeFunctions = | ||
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false; | ||
const ignoreUndefined = | ||
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false; | ||
// We have a Mongos topology, check if we need to add a readPreference | ||
if (topology.type === 'mongos' && readPreference && readPreference.preference !== 'primary') { | ||
findCmd = { | ||
$query: findCmd, | ||
$readPreference: readPreference.toJSON() | ||
}; | ||
} | ||
// optionally decorate query with transaction data | ||
const err = decorateWithSessionsData(findCmd, options.session, options); | ||
if (err) { | ||
return err; | ||
} | ||
// Build Query object | ||
const query = new Query(bson, commandns, findCmd, { | ||
numberToSkip: 0, | ||
numberToReturn: 1, | ||
checkKeys: false, | ||
returnFieldSelector: null, | ||
serializeFunctions: serializeFunctions, | ||
ignoreUndefined: ignoreUndefined | ||
}); | ||
// Set query flags | ||
query.slaveOk = readPreference.slaveOk(); | ||
// Return the query | ||
return query; | ||
return findCmd; | ||
} | ||
// | ||
// Set up a command cursor | ||
function setupCommand(bson, ns, cmd, cursorState, topology, options) { | ||
// Set empty options object | ||
options = options || {}; | ||
// Get the readPreference | ||
const readPreference = getReadPreference(cmd, options); | ||
// Final query | ||
let finalCmd = {}; | ||
for (let name in cmd) { | ||
finalCmd[name] = cmd[name]; | ||
} | ||
// Build command namespace | ||
const parts = ns.split(/\./); | ||
// Serialize functions | ||
const serializeFunctions = | ||
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false; | ||
// Set up the serialize and ignoreUndefined fields | ||
const ignoreUndefined = | ||
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false; | ||
// We have a Mongos topology, check if we need to add a readPreference | ||
if (topology.type === 'mongos' && readPreference && readPreference.preference !== 'primary') { | ||
finalCmd = { | ||
$query: finalCmd, | ||
$readPreference: readPreference.toJSON() | ||
}; | ||
} | ||
// optionally decorate query with transaction data | ||
const err = decorateWithSessionsData(finalCmd, options.session, options); | ||
if (err) { | ||
return err; | ||
} | ||
// Build Query object | ||
const query = new Query(bson, `${parts.shift()}.$cmd`, finalCmd, { | ||
numberToSkip: 0, | ||
numberToReturn: -1, | ||
checkKeys: false, | ||
serializeFunctions: serializeFunctions, | ||
ignoreUndefined: ignoreUndefined | ||
}); | ||
// Set query flags | ||
query.slaveOk = readPreference.slaveOk(); | ||
// Return the query | ||
return query; | ||
} | ||
module.exports = WireProtocol; |
@@ -50,7 +50,53 @@ 'use strict'; | ||
function applyCommonQueryOptions(queryOptions, options) { | ||
Object.assign(queryOptions, { | ||
raw: typeof options.raw === 'boolean' ? options.raw : false, | ||
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, | ||
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, | ||
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false, | ||
monitoring: typeof options.monitoring === 'boolean' ? options.monitoring : false, | ||
fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false | ||
}); | ||
if (typeof options.socketTimeout === 'number') { | ||
queryOptions.socketTimeout = options.socketTimeout; | ||
} | ||
if (options.session) { | ||
queryOptions.session = options.session; | ||
} | ||
if (typeof options.documentsReturnedIn === 'string') { | ||
queryOptions.documentsReturnedIn = options.documentsReturnedIn; | ||
} | ||
return queryOptions; | ||
} | ||
function isMongos(server) { | ||
if (server.type === 'mongos') return true; | ||
if (server.parent && server.parent.type === 'mongos') return true; | ||
// NOTE: handle unified topology | ||
return false; | ||
} | ||
function databaseNamespace(ns) { | ||
return ns.split('.')[0]; | ||
} | ||
function collectionNamespace(ns) { | ||
return ns | ||
.split('.') | ||
.slice(1) | ||
.join('.'); | ||
} | ||
module.exports = { | ||
getReadPreference: getReadPreference, | ||
MESSAGE_HEADER_SIZE: MESSAGE_HEADER_SIZE, | ||
opcodes: opcodes, | ||
parseHeader: parseHeader | ||
getReadPreference, | ||
MESSAGE_HEADER_SIZE, | ||
opcodes, | ||
parseHeader, | ||
applyCommonQueryOptions, | ||
isMongos, | ||
databaseNamespace, | ||
collectionNamespace | ||
}; |
{ | ||
"name": "mongodb-core", | ||
"version": "3.1.9", | ||
"version": "3.1.10", | ||
"description": "Core MongoDB driver functionality, no bells and whistles and meant for integration not end applications", | ||
@@ -14,3 +14,3 @@ "main": "index.js", | ||
"lint": "eslint index.js lib test", | ||
"format": "prettier --print-width 100 --tab-width 2 --single-quote --write index.js 'test/**/*.js' 'lib/**/*.js'", | ||
"format": "prettier --print-width 100 --tab-width 2 --single-quote --write index.js test/**/*.js lib/**/*.js", | ||
"changelog": "conventional-changelog -p angular -i HISTORY.md -s", | ||
@@ -17,0 +17,0 @@ "atlas": "node ./test/atlas.js", |
603554
14981