dse-driver
Advanced tools
Comparing version 1.6.0 to 1.7.0
@@ -236,5 +236,10 @@ /** | ||
if (userOptions.host && !(userOptions.host instanceof Host)) { | ||
throw new TypeError('host must be an instance of Host'); | ||
} | ||
if (!allowContinuousPaging && userOptions.continuousPaging) { | ||
return new errors.NotSupportedError('Continuous paging not allowed, use stream() method instead'); | ||
} | ||
// Using fixed property names is 2 order of magnitude faster than dynamically shallow clone objects | ||
@@ -251,2 +256,3 @@ const result = { | ||
hints: userOptions.hints, | ||
host: userOptions.host, | ||
isIdempotent: ifUndefined(userOptions.isIdempotent, defaultQueryOptions.isIdempotent), | ||
@@ -369,2 +375,4 @@ keyspace: userOptions.keyspace, | ||
exports.continuousPageDefaultSize = continuousPageDefaultSize; | ||
exports.continuousPageDefaultHighWaterMark = continuousPageDefaultHighWaterMark; | ||
exports.continuousPageDefaultHighWaterMark = continuousPageDefaultHighWaterMark; | ||
const Host = require('./host').Host; |
@@ -223,2 +223,23 @@ /** | ||
* <p>For batch queries, an array of such arrays, ordered as with the queries in the batch.</p> | ||
* @property {Host} [host] The host that should handle the query. | ||
* <p> | ||
* Use of this option is <em>heavily discouraged</em> and should only be used in the following cases: | ||
* </p> | ||
* <ol> | ||
* <li> | ||
* Querying node-local tables, such as tables in the <code>system</code> and <code>system_views</code> | ||
* keyspaces. | ||
* </li> | ||
* <li> | ||
* Applying a series of schema changes, where it may be advantageous to execute schema changes in sequence on the | ||
* same node. | ||
* </li> | ||
* </ol> | ||
* <p> | ||
* Configuring a specific host causes the configured | ||
* [LoadBalancingPolicy]{@link module:policies/loadBalancing~LoadBalancingPolicy} to be completely bypassed. | ||
* However, if the load balancing policy dictates that the host is at a | ||
* [distance of ignored]{@link module:types~distance} or there is no active connectivity to the host, the request will | ||
* fail with a [NoHostAvailableError]{@link module:errors~NoHostAvailableError}. | ||
* </p> | ||
* @property {Boolean} [isIdempotent] Defines whether the query can be applied multiple times without changing the result | ||
@@ -225,0 +246,0 @@ * beyond the initial application. |
@@ -251,3 +251,4 @@ /** | ||
ControlConnection.prototype.borrowHostConnection = function (host, callback) { | ||
host.borrowConnection(callback); | ||
// Borrow any open connection, regardless of the keyspace | ||
host.borrowConnection(null, null, callback); | ||
}; | ||
@@ -257,6 +258,7 @@ | ||
* Gets the info from local and peer metadata, reloads the keyspaces metadata and rebuilds tokens. | ||
* @param {Boolean} newNodesUp | ||
* @param {Boolean} initializing Determines whether this function was called in order to initialize the control | ||
* connection the first time. | ||
* @param {Function} [callback] | ||
*/ | ||
ControlConnection.prototype.refreshHosts = function (newNodesUp, callback) { | ||
ControlConnection.prototype.refreshHosts = function (initializing, callback) { | ||
callback = callback || utils.noop; | ||
@@ -289,3 +291,3 @@ // it's possible that this was called as a result of a topology change, but the connection was lost | ||
c.sendStream(request, null, function (err, result) { | ||
self.setPeersInfo(newNodesUp, err, result, next); | ||
self.setPeersInfo(initializing, err, result, next); | ||
}); | ||
@@ -626,3 +628,4 @@ }, | ||
/** | ||
* @param {Boolean} newNodesUp | ||
* @param {Boolean} initializing Determines whether this function was called in order to initialize the control | ||
* connection the first time. | ||
* @param {Error} err | ||
@@ -632,3 +635,3 @@ * @param {ResultSet} result | ||
*/ | ||
ControlConnection.prototype.setPeersInfo = function (newNodesUp, err, result, callback) { | ||
ControlConnection.prototype.setPeersInfo = function (initializing, err, result, callback) { | ||
if (!result || !result.rows || err) { | ||
@@ -646,12 +649,13 @@ return callback(err); | ||
} | ||
peers[endPoint] = true; | ||
let host = self.hosts.get(endPoint); | ||
if (!host) { | ||
let isNewHost = !host; | ||
if (isNewHost) { | ||
host = new Host(endPoint, self.protocolVersion, self.options, self.metadata); | ||
if (!newNodesUp) { | ||
host.setDown(); | ||
} | ||
self.log('info', 'Adding host ' + endPoint); | ||
self.hosts.set(endPoint, host); | ||
isNewHost = true; | ||
} | ||
host.datacenter = row['data_center']; | ||
@@ -663,2 +667,16 @@ host.rack = row['rack']; | ||
if (isNewHost) { | ||
// Add it to the map (and trigger events) after all the properties | ||
// were set to avoid race conditions | ||
self.hosts.set(endPoint, host); | ||
if (!initializing) { | ||
// Set the distance at Host level, that way the connection pool is created with the correct settings | ||
self.profileManager.getDistance(host); | ||
// When we are not initializing, we start with the node set as DOWN | ||
host.setDown(); | ||
} | ||
} | ||
next(); | ||
@@ -665,0 +683,0 @@ }); |
@@ -71,2 +71,9 @@ /** | ||
const unsetValueBuffer = utils.allocBufferFromArray([255, 255, 255, 254]); | ||
const zeroLengthTypesSupported = new Set([ | ||
dataTypes.text, | ||
dataTypes.ascii, | ||
dataTypes.varchar, | ||
dataTypes.custom, | ||
dataTypes.blob | ||
]); | ||
@@ -217,3 +224,3 @@ /** | ||
offset += self.collectionLengthSize; | ||
if (valueLength < 0) { | ||
if (valueLength < 0 || (valueLength === 0 && !zeroLengthTypesSupported.has(subtypes[1].code))) { | ||
callback.call(thisArg, key, null); | ||
@@ -220,0 +227,0 @@ continue; |
@@ -110,8 +110,16 @@ /** | ||
* {@link ClientOptions.socketOptions.readTimeout}. | ||
* @param {String} message The error message. | ||
* @param {String} [host] Address of the server host that caused the operation to time out. | ||
* @constructor | ||
*/ | ||
function OperationTimedOutError(message) { | ||
function OperationTimedOutError(message, host) { | ||
DriverError.call(this, message, this.constructor); | ||
this.info = 'Represents a client-side error that is raised when the client did not hear back from the server ' + | ||
'within socketOptions.readTimeout'; | ||
/** | ||
* When defined, it gets the address of the host that caused the operation to time out. | ||
* @type {String|undefined} | ||
*/ | ||
this.host = host; | ||
} | ||
@@ -118,0 +126,0 @@ |
@@ -14,3 +14,3 @@ /** | ||
const errors = require('./errors'); | ||
const defaultOptions = require('./client-options').defaultOptions(); | ||
const clientOptions = require('./client-options'); | ||
@@ -21,2 +21,4 @@ // Used to get the index of the connection with less in-flight requests | ||
let defaultOptions; | ||
/** | ||
@@ -74,5 +76,6 @@ * Represents the possible states of the pool. | ||
* Borrows a connection from the pool. | ||
* @param {String} keyspace | ||
* @param {Function} callback | ||
*/ | ||
borrowConnection(callback) { | ||
createAndBorrowConnection(keyspace, callback) { | ||
this.create(false, err => { | ||
@@ -82,14 +85,32 @@ if (err) { | ||
} | ||
if (this.connections.length === 0) { | ||
// Normally it should have callback in error, but better to handle this possibility | ||
return callback(new Error('No connection available')); | ||
} | ||
const maxRequests = this.options.pooling.maxRequestsPerConnection; | ||
const c = HostConnectionPool.minInFlight(this.connections, maxRequests); | ||
this.borrowConnection(keyspace, null, callback); | ||
}); | ||
} | ||
if (c.getInFlight() >= maxRequests) { | ||
return callback(new errors.BusyConnectionError(this._address, maxRequests, this.connections.length)); | ||
} | ||
callback(null, c); | ||
/** | ||
* Tries to borrow one of the existing connections from the pool. | ||
* @param {Connection} previousConnection When provided, the pool should try to provide a different connection. | ||
* @param {String} keyspace | ||
* @param {Function} callback | ||
*/ | ||
borrowConnection(keyspace, previousConnection, callback) { | ||
if (this.connections.length === 0) { | ||
return callback(new Error('No connection available')); | ||
} | ||
const maxRequests = this.options.pooling.maxRequestsPerConnection; | ||
const c = HostConnectionPool.minInFlight(this.connections, maxRequests, previousConnection); | ||
if (c.getInFlight() >= maxRequests) { | ||
return callback(new errors.BusyConnectionError(this._address, maxRequests, this.connections.length)); | ||
} | ||
if (!keyspace || keyspace === c.keyspace) { | ||
// Connection is ready to be used | ||
return callback(null, c); | ||
} | ||
c.changeKeyspace(keyspace, (err) => { | ||
callback(err, c); | ||
}); | ||
@@ -104,5 +125,6 @@ } | ||
* @param {Number} maxRequests | ||
* @param {Connection} previousConnection | ||
* @returns {Connection} | ||
*/ | ||
static minInFlight(connections, maxRequests) { | ||
static minInFlight(connections, maxRequests, previousConnection) { | ||
const length = connections.length; | ||
@@ -122,6 +144,17 @@ if (length === 1) { | ||
current = connections[index % length]; | ||
const next = connections[(index + 1) % length]; | ||
if (current === previousConnection) { | ||
// Increment the index and skip | ||
current = connections[(++index) % length]; | ||
} | ||
let next = connections[(index + 1) % length]; | ||
if (next === previousConnection) { | ||
// Skip | ||
next = connections[(index + 2) % length]; | ||
} | ||
if (next.getInFlight() < current.getInFlight()) { | ||
current = next; | ||
} | ||
if (current.getInFlight() < maxRequests) { | ||
@@ -376,3 +409,3 @@ // Check as few connections as possible, as long as the amount of in-flight | ||
// Check that after sometime (readTimeout + 100ms) the connections have been drained | ||
const delay = (this.options.socketOptions.readTimeout || defaultOptions.socketOptions.readTimeout) + 100; | ||
const delay = (this.options.socketOptions.readTimeout || getDefaultOptions().socketOptions.readTimeout) + 100; | ||
checkShutdownTimeout = setTimeout(function checkShutdown() { | ||
@@ -454,2 +487,10 @@ wasClosed = true; | ||
/** Lazily loads the default options */ | ||
function getDefaultOptions() { | ||
if (defaultOptions === undefined) { | ||
defaultOptions = clientOptions.defaultOptions(); | ||
} | ||
return defaultOptions; | ||
} | ||
module.exports = HostConnectionPool; |
@@ -88,2 +88,3 @@ /** | ||
this.reconnectionSchedule = this.options.policies.reconnection.newSchedule(); | ||
this.reconnectionDelay = 0; | ||
} | ||
@@ -94,3 +95,4 @@ | ||
/** | ||
* Marks this host as not available for query coordination. | ||
* Marks this host as not available for query coordination, when the host was previously marked as UP, otherwise its | ||
* a no-op. | ||
* @internal | ||
@@ -100,3 +102,3 @@ * @ignore | ||
Host.prototype.setDown = function() { | ||
// multiple events signaling that a host is failing could cause multiple calls to this method | ||
// Multiple events signaling that a host is failing could cause multiple calls to this method | ||
if (this.setDownAt !== 0) { | ||
@@ -106,2 +108,3 @@ // the host is already marked as Down | ||
} | ||
if (this.pool.isClosing()) { | ||
@@ -111,10 +114,12 @@ // the pool is being closed/shutdown, don't mind | ||
} | ||
this.setDownAt = Date.now(); | ||
if (this._distance !== types.distance.ignored) { | ||
this.log('warning', | ||
util.format('Host %s considered as DOWN. Reconnection delay %dms.', this.address, this.reconnectionDelay)); | ||
if (this.pool.coreConnectionsLength > 0) { | ||
// According to the distance, there should be connections open to it => issue a warning | ||
this.log('warning', `Host ${this.address} considered as DOWN. Reconnection delay ${this.reconnectionDelay}ms.`); | ||
} else { | ||
this.log('info', `Host ${this.address} considered as DOWN.`); | ||
} | ||
else { | ||
this.log('info', util.format('Host %s considered as DOWN.', this.address)); | ||
} | ||
this.emit('down'); | ||
@@ -239,2 +244,6 @@ this._checkPoolState(); | ||
* If there isn't an available connections, it will open a new one according to the pooling options. | ||
* @param {String} keyspace The keyspace that the connection must be using. When the keyspace provided is null, no | ||
* keyspace check is performed. | ||
* @param {Connection} previousConnection The previous connection. When provided, the pool should try to provide a | ||
* different connection. | ||
* @param {Function} callback | ||
@@ -244,4 +253,9 @@ * @internal | ||
*/ | ||
Host.prototype.borrowConnection = function (callback) { | ||
this.pool.borrowConnection(callback); | ||
Host.prototype.borrowConnection = function (keyspace, previousConnection, callback) { | ||
if (previousConnection) { | ||
// Obtain one of the existing connections | ||
return this.pool.borrowConnection(keyspace, previousConnection, callback); | ||
} | ||
this.pool.createAndBorrowConnection(keyspace, callback); | ||
}; | ||
@@ -305,4 +319,5 @@ | ||
} | ||
if (this.pool.connections.length < this.pool.coreConnectionsLength) { | ||
// the pool still needs to grow | ||
// the pool needs to grow / reconnect | ||
if (!this.pool.hasScheduledNewConnection()) { | ||
@@ -313,5 +328,7 @@ this.reconnectionDelay = this.reconnectionSchedule.next().value; | ||
} | ||
if (this._distance !== types.distance.ignored && | ||
this.pool.connections.length === 0 && | ||
this.pool.coreConnectionsLength > 0) { | ||
const shouldHaveConnections = this._distance !== types.distance.ignored && this.pool.coreConnectionsLength > 0; | ||
if (shouldHaveConnections && this.pool.connections.length === 0) { | ||
// Mark as DOWN, if its UP | ||
this.setDown(); | ||
@@ -318,0 +335,0 @@ } |
@@ -70,3 +70,4 @@ /** | ||
/** | ||
* Specifies the probability with which read repairs should be invoked on non-quorum reads. The value must be between 0 and 1. | ||
* Specifies the probability with which read repairs should be invoked on non-quorum reads. The value must be | ||
* between 0 and 1. | ||
* @type {number} | ||
@@ -78,3 +79,3 @@ */ | ||
* <p> | ||
* For Cassandra versions prior to 3.0.0, this method always returns {@code null}. | ||
* For Apache Cassandra versions prior to 3.0.0, this method always returns <code>null</code>. | ||
* </p> | ||
@@ -89,3 +90,3 @@ * @type {Object} | ||
* <p> | ||
* For Cassandra versions prior to 3.0.0, this method always returns {@code null}. | ||
* For Apache Cassandra versions prior to 3.0.0, this method always returns <code>null</code>. | ||
* </p> | ||
@@ -102,5 +103,2 @@ * @type {Number|null} | ||
* Returns the default TTL for this table. | ||
* <p> | ||
* Note: this option is not available in Cassandra 1.2 and will return 0 (no default TTL) when connected to 1.2 nodes. | ||
* </p> | ||
* @type {Number} | ||
@@ -111,6 +109,2 @@ */ | ||
* * Returns the speculative retry option for this table. | ||
* <p> | ||
* Note: this option is not available in Cassandra 1.2 and will return "NONE" (no speculative retry) when connected | ||
* to 1.2 nodes. | ||
* </p> | ||
* @type {String} | ||
@@ -122,3 +116,4 @@ */ | ||
* <p> | ||
* Note: this option is available in Cassandra 2.1 and above, and will return {@code null} for earlier versions. | ||
* Note: this option is available in Apache Cassandra 2.1 and above, and will return <code>null</code> for | ||
* earlier versions. | ||
* </p> | ||
@@ -131,3 +126,4 @@ * @type {Number|null} | ||
* <p> | ||
* Note: this option is available in Cassandra 2.1 and above, and will return {@code null} for earlier versions. | ||
* Note: this option is available in Apache Cassandra 2.1 and above, and will return <code>null</code> for | ||
* earlier versions. | ||
* </p> | ||
@@ -134,0 +130,0 @@ * @type {Number|null} |
@@ -68,3 +68,3 @@ /** | ||
this.initialized = false; | ||
this._schemaParser = schemaParserFactory.getByVersion(controlConnection, this.getUdt.bind(this)); | ||
this._schemaParser = schemaParserFactory.getByVersion(options, controlConnection, this.getUdt.bind(this)); | ||
const self = this; | ||
@@ -84,3 +84,3 @@ this._preparedQueries = new PreparedQueries(options.maxPrepared, function () { | ||
this._schemaParser = schemaParserFactory.getByVersion( | ||
this.controlConnection, this.getUdt.bind(this), version, this._schemaParser); | ||
this.options, this.controlConnection, this.getUdt.bind(this), version, this._schemaParser); | ||
}; | ||
@@ -512,2 +512,3 @@ | ||
let cache; | ||
let virtual; | ||
if (this.options.isMetadataSyncEnabled) { | ||
@@ -519,4 +520,5 @@ const keyspace = this.keyspaces[keyspaceName]; | ||
cache = keyspace.tables; | ||
virtual = keyspace.virtual; | ||
} | ||
this._schemaParser.getTable(keyspaceName, name, cache, callback); | ||
this._schemaParser.getTable(keyspaceName, name, cache, virtual, callback); | ||
}; | ||
@@ -523,0 +525,0 @@ |
@@ -23,3 +23,3 @@ /** | ||
* @param {String} target | ||
* @param {Number} kind | ||
* @param {Number|String} kind | ||
* @param {Object} options | ||
@@ -44,3 +44,3 @@ * @alias module:metadata~Index | ||
*/ | ||
this.kind = kind; | ||
this.kind = typeof kind === 'string' ? getKindByName(kind) : kind; | ||
/** | ||
@@ -79,2 +79,3 @@ * An associative array containing the index options | ||
* Parses Index information from rows in the 'system_schema.indexes' table | ||
* @deprecated It will be removed in the next major version. | ||
* @param {Array.<Row>} indexRows | ||
@@ -94,3 +95,4 @@ * @returns {Array.<Index>} | ||
/** | ||
* Parses Index information from rows in the legacy 'system.schema_columns' table | ||
* Parses Index information from rows in the legacy 'system.schema_columns' table. | ||
* @deprecated It will be removed in the next major version. | ||
* @param {Array.<Row>} columnRows | ||
@@ -97,0 +99,0 @@ * @param {Object.<String, {name, type}>} columnsByName |
@@ -40,4 +40,11 @@ /** | ||
const _selectAllVirtualKeyspaces = "SELECT * FROM system_virtual_schema.keyspaces"; | ||
const _selectSingleVirtualKeyspace = "SELECT * FROM system_virtual_schema.keyspaces where keyspace_name = '%s'"; | ||
const _selectVirtualTable = "SELECT * FROM system_virtual_schema.tables where keyspace_name = '%s' and table_name='%s'"; | ||
const _selectVirtualColumns = "SELECT * FROM system_virtual_schema.columns where keyspace_name = '%s' and table_name='%s'"; | ||
/** | ||
* @abstract | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc | ||
@@ -47,4 +54,5 @@ * @constructor | ||
*/ | ||
function SchemaParser(cc) { | ||
function SchemaParser(options, cc) { | ||
this.cc = cc; | ||
this.encodingOptions = options.encoding; | ||
this.selectTable = null; | ||
@@ -56,2 +64,3 @@ this.selectColumns = null; | ||
this.selectFunctions = null; | ||
this.supportsVirtual = false; | ||
} | ||
@@ -64,6 +73,7 @@ | ||
* @param strategyOptions | ||
* @param virtual | ||
* @returns {{name, durableWrites, strategy, strategyOptions, tokenToReplica, udts, tables, functions, aggregates}|null} | ||
* @protected | ||
*/ | ||
SchemaParser.prototype._createKeyspace = function (name, durableWrites, strategy, strategyOptions) { | ||
SchemaParser.prototype._createKeyspace = function (name, durableWrites, strategy, strategyOptions, virtual) { | ||
const ksInfo = { | ||
@@ -75,2 +85,3 @@ name: name, | ||
tokenToReplica: null, | ||
virtual: virtual === true, | ||
udts: {}, | ||
@@ -106,5 +117,6 @@ tables: {}, | ||
* @param {Object} cache | ||
* @param {Boolean} virtual | ||
* @param {Function} callback | ||
*/ | ||
SchemaParser.prototype.getTable = function (keyspaceName, name, cache, callback) { | ||
SchemaParser.prototype.getTable = function (keyspaceName, name, cache, virtual, callback) { | ||
let tableInfo = cache && cache[name]; | ||
@@ -129,5 +141,7 @@ if (!tableInfo) { | ||
const self = this; | ||
let virtualTable = virtual; | ||
utils.series([ | ||
function getTableRow(next) { | ||
const query = util.format(self.selectTable, keyspaceName, name); | ||
const selectTable = virtualTable ? _selectVirtualTable : self.selectTable; | ||
const query = util.format(selectTable, keyspaceName, name); | ||
self.cc.query(query, function (err, response) { | ||
@@ -141,7 +155,30 @@ if (err) { | ||
}, | ||
function getVirtualTableRow(next) { | ||
// if we weren't sure if table was virtual or not, query virtual schema. | ||
if (!tableRow && self.supportsVirtual && virtualTable === undefined) { | ||
const query = util.format(_selectVirtualTable, keyspaceName, name); | ||
self.cc.query(query, function (err, response) { | ||
if (err) { | ||
// we can't error here as we can't be sure if the node | ||
// supports virtual tables, in this case it is adequate | ||
// to act as if there was no matching table. | ||
return next(); | ||
} | ||
tableRow = response.rows[0]; | ||
// if we got a result, this is a virtual table | ||
if (tableRow) { | ||
virtualTable = true; | ||
} | ||
next(); | ||
}); | ||
} else { | ||
return next(); | ||
} | ||
}, | ||
function getColumnRows (next) { | ||
if (!tableRow) { | ||
return next(null, null, null); | ||
return next(); | ||
} | ||
const query = util.format(self.selectColumns, keyspaceName, name); | ||
const selectColumns = virtualTable ? _selectVirtualColumns : self.selectColumns; | ||
const query = util.format(selectColumns, keyspaceName, name); | ||
self.cc.query(query, function (err, response) { | ||
@@ -156,3 +193,3 @@ if (err) { | ||
function getIndexes(next) { | ||
if (!tableRow || !self.selectIndexes) { | ||
if (!tableRow || !self.selectIndexes || virtualTable) { | ||
//either the table does not exists or it does not support indexes schema table | ||
@@ -175,3 +212,3 @@ return next(); | ||
} | ||
self._parseTableOrView(tableInfo, tableRow, columnRows, indexRows, function (err) { | ||
self._parseTableOrView(tableInfo, tableRow, columnRows, indexRows, virtualTable, function (err) { | ||
tableInfo.loading = false; | ||
@@ -250,6 +287,7 @@ tableInfo.loaded = !err; | ||
* @param {Array.<Row>} indexRows | ||
* @param {Boolean} virtual | ||
* @param {Function} callback | ||
* @throws {Error} | ||
*/ | ||
SchemaParser.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, callback) { | ||
SchemaParser.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, virtual, callback) { | ||
}; | ||
@@ -347,4 +385,33 @@ | ||
/** @returns {Map} */ | ||
SchemaParser.prototype._asMap = function (obj) { | ||
if (!obj) { | ||
return new Map(); | ||
} | ||
if (this.encodingOptions.map && obj instanceof this.encodingOptions.map) { | ||
// Its already a Map or a polyfill of a Map | ||
return obj; | ||
} | ||
return new Map(Object.keys(obj).map(k => [ k, obj[k]])); | ||
}; | ||
SchemaParser.prototype._mapAsObject = function (map) { | ||
if (!map) { | ||
return map; | ||
} | ||
if (this.encodingOptions.map && map instanceof this.encodingOptions.map) { | ||
const result = {}; | ||
map.forEach((value, key) => result[key] = value); | ||
return result; | ||
} | ||
return map; | ||
}; | ||
/** | ||
* Used to parse schema information for Cassandra versions 1.2.x, and 2.x | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc | ||
@@ -354,4 +421,4 @@ * @constructor | ||
*/ | ||
function SchemaParserV1(cc) { | ||
SchemaParser.call(this, cc); | ||
function SchemaParserV1(options, cc) { | ||
SchemaParser.call(this, options, cc); | ||
this.selectTable = _selectTableV1; | ||
@@ -407,3 +474,3 @@ this.selectColumns = _selectColumnsV1; | ||
/** @override */ | ||
SchemaParserV1.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, callback) { | ||
SchemaParserV1.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, virtual, callback) { | ||
let i, c, name, types; | ||
@@ -629,2 +696,3 @@ const encoder = this.cc.getEncoder(); | ||
* Used to parse schema information for Cassandra versions 3.x and above | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc The control connection to be used | ||
@@ -635,4 +703,4 @@ * @param {Function} udtResolver The function to be used to retrieve the udts. | ||
*/ | ||
function SchemaParserV2(cc, udtResolver) { | ||
SchemaParser.call(this, cc); | ||
function SchemaParserV2(options, cc, udtResolver) { | ||
SchemaParser.call(this, options, cc); | ||
this.udtResolver = udtResolver; | ||
@@ -730,3 +798,3 @@ this.selectTable = _selectTableV2; | ||
} | ||
self._parseTableOrView(viewInfo, tableRow, columnRows, null, function (err) { | ||
self._parseTableOrView(viewInfo, tableRow, columnRows, null, false, function (err) { | ||
viewInfo.loading = false; | ||
@@ -740,3 +808,3 @@ viewInfo.loaded = !err; | ||
SchemaParserV2.prototype._parseKeyspace = function (row) { | ||
SchemaParserV2.prototype._parseKeyspace = function (row, virtual) { | ||
const replication = row['replication']; | ||
@@ -759,7 +827,8 @@ let strategy; | ||
strategy, | ||
strategyOptions); | ||
strategyOptions, | ||
virtual); | ||
}; | ||
/** @override */ | ||
SchemaParserV2.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, callback) { | ||
SchemaParserV2.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, virtual, callback) { | ||
const encoder = this.cc.getEncoder(); | ||
@@ -769,2 +838,55 @@ const columnsKeyed = {}; | ||
const clusteringKeys = []; | ||
const self = this; | ||
// maps column rows to columnInfo and also populates columnsKeyed | ||
const columnRowMapper = (row, next) => { | ||
encoder.parseTypeName(tableRow['keyspace_name'], row['type'], 0, null, self.udtResolver, function (err, type) { | ||
if (err) { | ||
return next(err); | ||
} | ||
const c = { | ||
name: row['column_name'], | ||
type: type, | ||
isStatic: false | ||
}; | ||
columnsKeyed[c.name] = c; | ||
switch (row['kind']) { | ||
case 'partition_key': | ||
partitionKeys.push({ c: c, index: (row['position'] || 0)}); | ||
break; | ||
case 'clustering': | ||
clusteringKeys.push({ c: c, index: (row['position'] || 0), order: row['clustering_order'] === 'desc' ? 'DESC' : 'ASC'}); | ||
break; | ||
case 'static': | ||
c.isStatic = true; | ||
break; | ||
} | ||
next(null, c); | ||
}); | ||
}; | ||
// is table is virtual, the only relevant information to parse is the columns as the table itself has no configuration. | ||
if (virtual) { | ||
tableInfo.virtual = true; | ||
utils.map(columnRows, columnRowMapper, function (err, columns) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
tableInfo.columns = columns; | ||
tableInfo.columnsByName = columnsKeyed; | ||
tableInfo.partitionKeys = partitionKeys.sort(utils.propCompare('index')).map(function (item) { | ||
return item.c; | ||
}); | ||
clusteringKeys.sort(utils.propCompare('index')); | ||
tableInfo.clusteringKeys = clusteringKeys.map(function (item) { | ||
return item.c; | ||
}); | ||
tableInfo.clusteringOrder = clusteringKeys.map(function (item) { | ||
return item.order; | ||
}); | ||
callback(); | ||
}); | ||
return; | ||
} | ||
const isView = tableInfo instanceof MaterializedView; | ||
@@ -774,18 +896,24 @@ tableInfo.bloomFilterFalsePositiveChance = tableRow['bloom_filter_fp_chance']; | ||
tableInfo.comment = tableRow['comment']; | ||
const compaction = tableRow['compaction']; | ||
// Regardless of the encoding options, use always an Object to represent an associative Array | ||
const compaction = this._asMap(tableRow['compaction']); | ||
if (compaction) { | ||
// compactionOptions as an Object<String, String> | ||
tableInfo.compactionOptions = {}; | ||
tableInfo.compactionClass = compaction['class']; | ||
for (const key in compaction) { | ||
if (!compaction.hasOwnProperty(key) || key === 'class') { | ||
continue; | ||
tableInfo.compactionClass = compaction.get('class'); | ||
compaction.forEach((value, key) => { | ||
if (key === 'class') { | ||
return; | ||
} | ||
tableInfo.compactionOptions[key] = compaction[key]; | ||
} | ||
tableInfo.compactionOptions[key] = compaction.get(key); | ||
}); | ||
} | ||
tableInfo.compression = tableRow['compression']; | ||
// Convert compression to an Object<String, String> | ||
tableInfo.compression = this._mapAsObject(tableRow['compression']); | ||
tableInfo.gcGraceSeconds = tableRow['gc_grace_seconds']; | ||
tableInfo.localReadRepairChance = tableRow['dclocal_read_repair_chance']; | ||
tableInfo.readRepairChance = tableRow['read_repair_chance']; | ||
tableInfo.extensions = tableRow['extensions']; | ||
tableInfo.extensions = this._mapAsObject(tableRow['extensions']); | ||
tableInfo.crcCheckChance = tableRow['crc_check_chance']; | ||
@@ -804,28 +932,5 @@ tableInfo.memtableFlushPeriod = tableRow['memtable_flush_period_in_ms'] || tableInfo.memtableFlushPeriod; | ||
} | ||
const self = this; | ||
utils.map(columnRows, function (row, next) { | ||
encoder.parseTypeName(tableRow['keyspace_name'], row['type'], 0, null, self.udtResolver, function (err, type) { | ||
if (err) { | ||
return next(err); | ||
} | ||
const c = { | ||
name: row['column_name'], | ||
type: type, | ||
isStatic: false | ||
}; | ||
columnsKeyed[c.name] = c; | ||
switch (row['kind']) { | ||
case 'partition_key': | ||
partitionKeys.push({ c: c, index: (row['position'] || 0)}); | ||
break; | ||
case 'clustering': | ||
clusteringKeys.push({ c: c, index: (row['position'] || 0), order: row['clustering_order'] === 'desc' ? 'DESC' : 'ASC'}); | ||
break; | ||
case 'static': | ||
c.isStatic = true; | ||
break; | ||
} | ||
next(null, c); | ||
}); | ||
}, function (err, columns) { | ||
// Map all columns asynchronously | ||
utils.map(columnRows, columnRowMapper, (err, columns) => { | ||
if (err) { | ||
@@ -846,2 +951,3 @@ return callback(err); | ||
}); | ||
if (isView) { | ||
@@ -853,7 +959,15 @@ tableInfo.tableName = tableRow['base_table_name']; | ||
} | ||
tableInfo.indexes = Index.fromRows(indexRows); | ||
const flags = tableRow['flags']; | ||
const isDense = flags.indexOf('dense') >= 0; | ||
const isSuper = flags.indexOf('super') >= 0; | ||
const isCompound = flags.indexOf('compound') >= 0; | ||
tableInfo.indexes = this._getIndexes(indexRows); | ||
// flags can be an instance of Array or Set (real or polyfill) | ||
let flags = tableRow['flags']; | ||
if (Array.isArray(flags)) { | ||
flags = new Set(flags); | ||
} | ||
const isDense = flags.has('dense'); | ||
const isSuper = flags.has('super'); | ||
const isCompound = flags.has('compound'); | ||
tableInfo.isCompact = isSuper || isDense || !isCompound; | ||
@@ -872,2 +986,13 @@ //remove the columns related to Thrift | ||
SchemaParserV2.prototype._getIndexes = function (indexRows) { | ||
if (!indexRows || indexRows.length === 0) { | ||
return utils.emptyArray; | ||
} | ||
return indexRows.map((row) => { | ||
const options = this._mapAsObject(row['options']); | ||
return new Index(row['index_name'], options['target'], row['kind'], options); | ||
}); | ||
}; | ||
/** @override */ | ||
@@ -981,2 +1106,84 @@ SchemaParserV2.prototype._parseAggregate = function (row, callback) { | ||
/** | ||
* Used to parse schema information for Cassandra versions 4.x and above. | ||
* | ||
* This parser similar to [SchemaParserV2] expect it also parses virtual | ||
* keyspaces. | ||
* | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc The control connection to be used | ||
* @param {Function} udtResolver The function to be used to retrieve the udts. | ||
* @constructor | ||
* @ignore | ||
*/ | ||
function SchemaParserV3(options, cc, udtResolver) { | ||
SchemaParserV2.call(this, options, cc, udtResolver); | ||
this.supportsVirtual = true; | ||
} | ||
util.inherits(SchemaParserV3, SchemaParserV2); | ||
/** @override */ | ||
SchemaParserV3.prototype.getKeyspaces = function (waitReconnect, callback) { | ||
const self = this; | ||
const keyspaces = {}; | ||
const queries = [ | ||
{ query: _selectAllKeyspacesV2, virtual: false }, | ||
{ query: _selectAllVirtualKeyspaces, virtual: true } | ||
]; | ||
utils.each(queries, (q, cb) => { | ||
self.cc.query(q.query, waitReconnect, (err, result) => { | ||
if (err) { | ||
// only callback in error for non-virtual query as | ||
// server reporting C* 4.0 may not actually implement | ||
// virtual tables. | ||
if (!q.virtual) { | ||
return cb(err); | ||
} | ||
return cb(); | ||
} | ||
for (let i = 0; i < result.rows.length; i++) { | ||
const ksInfo = self._parseKeyspace(result.rows[i], q.virtual); | ||
keyspaces[ksInfo.name] = ksInfo; | ||
} | ||
cb(); | ||
}); | ||
}, (err) => { | ||
callback(err, keyspaces); | ||
}); | ||
}; | ||
/** @override */ | ||
SchemaParserV3.prototype.getKeyspace = function (name, callback) { | ||
this._getKeyspace(_selectSingleKeyspaceV2, name, false, (err, ks) => { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (!ks) { | ||
// if not found, attempt to retrieve as virtual keyspace. | ||
return this._getKeyspace(_selectSingleVirtualKeyspace, name, true, callback); | ||
} | ||
return callback(null, ks); | ||
}); | ||
}; | ||
SchemaParserV3.prototype._getKeyspace = function (query, name, virtual, callback) { | ||
this.cc.query(util.format(query, name), (err, result) => { | ||
if (err) { | ||
// only callback in error for non-virtual query as | ||
// server reporting C* 4.0 may not actually implement | ||
// virtual tables. | ||
if (!virtual) { | ||
return callback(err); | ||
} | ||
return callback(null, null); | ||
} | ||
const row = result.rows[0]; | ||
if (!row) { | ||
return callback(null, null); | ||
} | ||
callback(null, this._parseKeyspace(row, virtual)); | ||
}); | ||
}; | ||
/** | ||
* Upon migration from thrift to CQL, we internally create a pair of surrogate clustering/regular columns | ||
@@ -1194,2 +1401,3 @@ * for compact static tables. These columns shouldn't be exposed to the user but are currently returned by C*. | ||
* provided Cassandra version | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc The control connection to be used | ||
@@ -1201,9 +1409,11 @@ * @param {Function} udtResolver The function to be used to retrieve the udts. | ||
*/ | ||
function getByVersion(cc, udtResolver, version, currentInstance) { | ||
function getByVersion(options, cc, udtResolver, version, currentInstance) { | ||
let parserConstructor = SchemaParserV1; | ||
if (version && version[0] >= 3) { | ||
if (version && version[0] === 3) { | ||
parserConstructor = SchemaParserV2; | ||
} else if (version && version[0] >= 4) { | ||
parserConstructor = SchemaParserV3; | ||
} | ||
if (!currentInstance || !(currentInstance instanceof parserConstructor)){ | ||
return new parserConstructor(cc, udtResolver); | ||
return new parserConstructor(options, cc, udtResolver); | ||
} | ||
@@ -1210,0 +1420,0 @@ return currentInstance; |
@@ -30,6 +30,2 @@ /** | ||
* Returns the memtable flush period (in milliseconds) option for this table. | ||
* <p> | ||
* Note: this option is available only on Cassandra 2.x and will return 0 (no periodic | ||
* flush) when connected to 1.2 nodes. | ||
* </p> | ||
* @type {Number} | ||
@@ -41,4 +37,4 @@ */ | ||
* <p> | ||
* Note: this option is only available in Cassandra 2.0. It is deprecated in Cassandra 2.1 and above, and will | ||
* therefore return {@code null} for 2.1 nodes. | ||
* Note: this option is only available in Apache Cassandra 2.0. It is deprecated in Apache Cassandra 2.1 and | ||
* above, and will therefore return <code>null</code> for 2.1 nodes. | ||
* </p> | ||
@@ -64,2 +60,8 @@ * @type {Number|null} | ||
this.cdc = null; | ||
/** | ||
* Determines whether the table is a virtual table or not. | ||
* @type {Boolean} | ||
*/ | ||
this.virtual = false; | ||
} | ||
@@ -66,0 +68,0 @@ |
@@ -99,3 +99,3 @@ /** | ||
const message = util.format('The host %s did not reply before timeout %d ms', address, millis); | ||
self._markAsTimedOut(new errors.OperationTimedOutError(message), onResponse); | ||
self._markAsTimedOut(new errors.OperationTimedOutError(message, address), onResponse); | ||
}, millis); | ||
@@ -102,0 +102,0 @@ } |
@@ -109,4 +109,8 @@ /** | ||
* @param {?String} [localDc] local datacenter name. | ||
* @param {Number} [usedHostsPerRemoteDc] the number of host per remote datacenter that the policy will yield \ | ||
* in a newQueryPlan after the local nodes. | ||
* @param {Number} [usedHostsPerRemoteDc] Deprecated: the number of host per remote datacenter that the policy will | ||
* yield in a newQueryPlan after the local nodes. | ||
* <p> | ||
* Note that this parameter is deprecated and will be removed in the next major version. Handling data center | ||
* outages is better suited at a service level rather than within an application client. | ||
* </p> | ||
* @extends {LoadBalancingPolicy} | ||
@@ -113,0 +117,0 @@ * @constructor |
@@ -252,2 +252,6 @@ /** | ||
* @property {useCurrentHost} [useCurrentHost] Determines if it should use the same host to retry the request. | ||
* <p> | ||
* In the case that the current host is not available anymore, it will be retried on the next host even when | ||
* <code>useCurrentHost</code> is set to <code>true</code>. | ||
* </p> | ||
*/ | ||
@@ -254,0 +258,0 @@ |
@@ -196,3 +196,4 @@ /** | ||
} | ||
RequestHandler.borrowFromHost(host, keyspace, function borrowCallback(err, connection) { | ||
host.borrowConnection(keyspace, null, function borrowCallback(err, connection) { | ||
if (err) { | ||
@@ -223,3 +224,4 @@ return callback(err); | ||
} | ||
RequestHandler.borrowFromHost(host, keyspace, function borrowCallback(err, connection) { | ||
host.borrowConnection(keyspace, null, function borrowCallback(err, connection) { | ||
if (err) { | ||
@@ -226,0 +228,0 @@ // Don't mind about issues with the pool in this case |
@@ -16,5 +16,5 @@ /** | ||
const retryOnNextHostDecision = Object.freeze({ | ||
const retryOnCurrentHost = Object.freeze({ | ||
decision: retry.RetryPolicy.retryDecision.retry, | ||
useCurrentHost: false, | ||
useCurrentHost: true, | ||
consistency: undefined | ||
@@ -168,4 +168,6 @@ }); | ||
if (err.requestNotWritten) { | ||
// The request was definitely not applied, it's safe to retry | ||
return retryOnNextHostDecision; | ||
// The request was definitely not applied, it's safe to retry. | ||
// Retry on the current host as there might be other connections open, in case it fails to obtain a connection | ||
// on the current host, the driver will immediately retry on the next host. | ||
return retryOnCurrentHost; | ||
} | ||
@@ -209,4 +211,6 @@ return onRequestError(); | ||
} | ||
this._parent.log('info', 'Retrying request'); | ||
this._retryCount++; | ||
if (meta || (typeof consistency === 'number' && this._request.consistency !== consistency)) { | ||
@@ -223,6 +227,24 @@ this._request = this._request.clone(); | ||
} | ||
if (useCurrentHost !== false) { | ||
// Use existing host (default) | ||
return this._sendOnConnection(); | ||
// Use existing host (default). | ||
const keyspace = this._parent.client.keyspace; | ||
// Reusing the existing connection is suitable for the most common scenarios, like server read timeouts that | ||
// will be fixed with a new request. | ||
// To cover all scenarios (e.g., where a different connection to the same host might mean something different), | ||
// we obtain a new connection from the host pool. | ||
// When there was a socket error, the connection provided was already removed from the pool earlier. | ||
return this._host.borrowConnection(keyspace, this._connection, (err, connection) => { | ||
if (err) { | ||
// All connections are busy (`BusyConnectionError`) or there isn't a ready connection in the pool (`Error`) | ||
// The retry policy declared the intention to retry on the current host but its not available anymore. | ||
// Use the next host | ||
return this.start(); | ||
} | ||
this._connection = connection; | ||
this._sendOnConnection(); | ||
}); | ||
} | ||
// Use the next host in the query plan to send the request | ||
@@ -232,4 +254,2 @@ this.start(); | ||
/** | ||
@@ -236,0 +256,0 @@ * Issues a PREPARE request on the current connection. |
@@ -63,3 +63,3 @@ /** | ||
RequestHandler.borrowFromHost(host, keyspace, function borrowFromHostCallback(err, connection) { | ||
host.borrowConnection(keyspace, null, function borrowFromHostCallback(err, connection) { | ||
if (err) { | ||
@@ -80,26 +80,2 @@ triedHosts[host.address] = err; | ||
/** | ||
* Borrows a connection from the provided host, changing the current keyspace, if necessary. | ||
* @param {Host} host | ||
* @param {String} keyspace | ||
* @param {Function} callback | ||
*/ | ||
static borrowFromHost(host, keyspace, callback) { | ||
host.borrowConnection(function (err, connection) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (!keyspace || keyspace === connection.keyspace) { | ||
// Connection is ready to be used | ||
return callback(null, connection); | ||
} | ||
connection.changeKeyspace(keyspace, function (err) { | ||
if (err) { | ||
return callback(err, connection); | ||
} | ||
callback(null, connection); | ||
}); | ||
}); | ||
} | ||
/** | ||
* Gets the next host from the query plan. | ||
@@ -164,11 +140,18 @@ * @param {Iterator} iterator | ||
} | ||
this._callback = callback; | ||
const self = this; | ||
this.loadBalancingPolicy.newQueryPlan(this.client.keyspace, this.requestOptions, function newPlanCb(err, iterator) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
self._hostIterator = iterator; | ||
self._callback = callback; | ||
if (this.requestOptions.host) { | ||
// if host is configured bypass load balancing policy and use | ||
// a single host plan. | ||
self._hostIterator = utils.arrayIterator([self.requestOptions.host]); | ||
self._startNewExecution(); | ||
}); | ||
} else { | ||
this.loadBalancingPolicy.newQueryPlan(this.client.keyspace, this.requestOptions, function newPlanCb(err, iterator) { | ||
if (err) { | ||
return self._callback(err); | ||
} | ||
self._hostIterator = iterator; | ||
self._startNewExecution(); | ||
}); | ||
} | ||
} | ||
@@ -175,0 +158,0 @@ |
{ | ||
"name": "dse-driver", | ||
"version": "1.6.0", | ||
"version": "1.7.0", | ||
"description": "DataStax Enterprise Node.js Driver", | ||
@@ -24,8 +24,5 @@ "author": "DataStax", | ||
"devDependencies": { | ||
"mocha": "~2.5.3", | ||
"rewire": ">= 2.1.0", | ||
"temp": ">= 0.8.3", | ||
"mocha-jenkins-reporter": ">= 0.1.9", | ||
"mocha-appveyor-reporter": ">= 0.2.1", | ||
"mocha-multi": "~0.11.0" | ||
"mocha": "~5.2.0", | ||
"rewire": "^4.0.1", | ||
"temp": ">= 0.8.3" | ||
}, | ||
@@ -37,11 +34,11 @@ "homepage": "http://docs.datastax.com/en/developer/nodejs-driver-dse/latest/", | ||
"license": "SEE LICENSE IN http://www.datastax.com/terms/datastax-dse-driver-license-terms", | ||
"scripts": { | ||
"scripts": { | ||
"test": "./node_modules/.bin/mocha test/unit -R spec -t 5000 --recursive", | ||
"ci": "./node_modules/.bin/mocha test/unit test/integration/short --recursive -R mocha-jenkins-reporter", | ||
"ci_unit": "./node_modules/.bin/mocha test/unit -R mocha-multi", | ||
"ci_jenkins": "./node_modules/.bin/mocha test/unit test/integration/short -R mocha-jenkins-reporter --recursive --exit", | ||
"ci_unit_appveyor": "./node_modules/.bin/mocha test/unit -R mocha-appveyor-reporter --recursive --exit", | ||
"eslint": "eslint lib test --ignore-pattern '/lib/types/integer.js'" | ||
}, | ||
"engines": { | ||
"node" : ">=4" | ||
} | ||
"node": ">=4" | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
759713
3
21908
6