dse-driver
Advanced tools
Comparing version 1.6.0 to 1.7.0
@@ -236,5 +236,10 @@ /** | ||
if (userOptions.host && !(userOptions.host instanceof Host)) { | ||
throw new TypeError('host must be an instance of Host'); | ||
} | ||
if (!allowContinuousPaging && userOptions.continuousPaging) { | ||
return new errors.NotSupportedError('Continuous paging not allowed, use stream() method instead'); | ||
} | ||
// Using fixed property names is 2 order of magnitude faster than dynamically shallow clone objects | ||
@@ -251,2 +256,3 @@ const result = { | ||
hints: userOptions.hints, | ||
host: userOptions.host, | ||
isIdempotent: ifUndefined(userOptions.isIdempotent, defaultQueryOptions.isIdempotent), | ||
@@ -369,2 +375,4 @@ keyspace: userOptions.keyspace, | ||
exports.continuousPageDefaultSize = continuousPageDefaultSize; | ||
exports.continuousPageDefaultHighWaterMark = continuousPageDefaultHighWaterMark; | ||
exports.continuousPageDefaultHighWaterMark = continuousPageDefaultHighWaterMark; | ||
const Host = require('./host').Host; |
@@ -223,2 +223,23 @@ /** | ||
* <p>For batch queries, an array of such arrays, ordered as with the queries in the batch.</p> | ||
* @property {Host} [host] The host that should handle the query. | ||
* <p> | ||
* Use of this option is <em>heavily discouraged</em> and should only be used in the following cases: | ||
* </p> | ||
* <ol> | ||
* <li> | ||
* Querying node-local tables, such as tables in the <code>system</code> and <code>system_views</code> | ||
* keyspaces. | ||
* </li> | ||
* <li> | ||
* Applying a series of schema changes, where it may be advantageous to execute schema changes in sequence on the | ||
* same node. | ||
* </li> | ||
* </ol> | ||
* <p> | ||
* Configuring a specific host causes the configured | ||
* [LoadBalancingPolicy]{@link module:policies/loadBalancing~LoadBalancingPolicy} to be completely bypassed. | ||
* However, if the load balancing policy dictates that the host is at a | ||
* [distance of ignored]{@link module:types~distance} or there is no active connectivity to the host, the request will | ||
* fail with a [NoHostAvailableError]{@link module:errors~NoHostAvailableError}. | ||
* </p> | ||
* @property {Boolean} [isIdempotent] Defines whether the query can be applied multiple times without changing the result | ||
@@ -225,0 +246,0 @@ * beyond the initial application. |
@@ -251,3 +251,4 @@ /** | ||
ControlConnection.prototype.borrowHostConnection = function (host, callback) { | ||
host.borrowConnection(callback); | ||
// Borrow any open connection, regardless of the keyspace | ||
host.borrowConnection(null, null, callback); | ||
}; | ||
@@ -257,6 +258,7 @@ | ||
* Gets the info from local and peer metadata, reloads the keyspaces metadata and rebuilds tokens. | ||
* @param {Boolean} newNodesUp | ||
* @param {Boolean} initializing Determines whether this function was called in order to initialize the control | ||
* connection the first time. | ||
* @param {Function} [callback] | ||
*/ | ||
ControlConnection.prototype.refreshHosts = function (newNodesUp, callback) { | ||
ControlConnection.prototype.refreshHosts = function (initializing, callback) { | ||
callback = callback || utils.noop; | ||
@@ -289,3 +291,3 @@ // it's possible that this was called as a result of a topology change, but the connection was lost | ||
c.sendStream(request, null, function (err, result) { | ||
self.setPeersInfo(newNodesUp, err, result, next); | ||
self.setPeersInfo(initializing, err, result, next); | ||
}); | ||
@@ -626,3 +628,4 @@ }, | ||
/** | ||
* @param {Boolean} newNodesUp | ||
* @param {Boolean} initializing Determines whether this function was called in order to initialize the control | ||
* connection the first time. | ||
* @param {Error} err | ||
@@ -632,3 +635,3 @@ * @param {ResultSet} result | ||
*/ | ||
ControlConnection.prototype.setPeersInfo = function (newNodesUp, err, result, callback) { | ||
ControlConnection.prototype.setPeersInfo = function (initializing, err, result, callback) { | ||
if (!result || !result.rows || err) { | ||
@@ -646,12 +649,13 @@ return callback(err); | ||
} | ||
peers[endPoint] = true; | ||
let host = self.hosts.get(endPoint); | ||
if (!host) { | ||
let isNewHost = !host; | ||
if (isNewHost) { | ||
host = new Host(endPoint, self.protocolVersion, self.options, self.metadata); | ||
if (!newNodesUp) { | ||
host.setDown(); | ||
} | ||
self.log('info', 'Adding host ' + endPoint); | ||
self.hosts.set(endPoint, host); | ||
isNewHost = true; | ||
} | ||
host.datacenter = row['data_center']; | ||
@@ -663,2 +667,16 @@ host.rack = row['rack']; | ||
if (isNewHost) { | ||
// Add it to the map (and trigger events) after all the properties | ||
// were set to avoid race conditions | ||
self.hosts.set(endPoint, host); | ||
if (!initializing) { | ||
// Set the distance at Host level, that way the connection pool is created with the correct settings | ||
self.profileManager.getDistance(host); | ||
// When we are not initializing, we start with the node set as DOWN | ||
host.setDown(); | ||
} | ||
} | ||
next(); | ||
@@ -665,0 +683,0 @@ }); |
@@ -71,2 +71,9 @@ /** | ||
const unsetValueBuffer = utils.allocBufferFromArray([255, 255, 255, 254]); | ||
const zeroLengthTypesSupported = new Set([ | ||
dataTypes.text, | ||
dataTypes.ascii, | ||
dataTypes.varchar, | ||
dataTypes.custom, | ||
dataTypes.blob | ||
]); | ||
@@ -217,3 +224,3 @@ /** | ||
offset += self.collectionLengthSize; | ||
if (valueLength < 0) { | ||
if (valueLength < 0 || (valueLength === 0 && !zeroLengthTypesSupported.has(subtypes[1].code))) { | ||
callback.call(thisArg, key, null); | ||
@@ -220,0 +227,0 @@ continue; |
@@ -110,8 +110,16 @@ /** | ||
* {@link ClientOptions.socketOptions.readTimeout}. | ||
* @param {String} message The error message. | ||
* @param {String} [host] Address of the server host that caused the operation to time out. | ||
* @constructor | ||
*/ | ||
function OperationTimedOutError(message) { | ||
function OperationTimedOutError(message, host) { | ||
DriverError.call(this, message, this.constructor); | ||
this.info = 'Represents a client-side error that is raised when the client did not hear back from the server ' + | ||
'within socketOptions.readTimeout'; | ||
/** | ||
* When defined, it gets the address of the host that caused the operation to time out. | ||
* @type {String|undefined} | ||
*/ | ||
this.host = host; | ||
} | ||
@@ -118,0 +126,0 @@ |
@@ -14,3 +14,3 @@ /** | ||
const errors = require('./errors'); | ||
const defaultOptions = require('./client-options').defaultOptions(); | ||
const clientOptions = require('./client-options'); | ||
@@ -21,2 +21,4 @@ // Used to get the index of the connection with less in-flight requests | ||
let defaultOptions; | ||
/** | ||
@@ -74,5 +76,6 @@ * Represents the possible states of the pool. | ||
* Borrows a connection from the pool. | ||
* @param {String} keyspace | ||
* @param {Function} callback | ||
*/ | ||
borrowConnection(callback) { | ||
createAndBorrowConnection(keyspace, callback) { | ||
this.create(false, err => { | ||
@@ -82,14 +85,32 @@ if (err) { | ||
} | ||
if (this.connections.length === 0) { | ||
// Normally it should have callback in error, but better to handle this possibility | ||
return callback(new Error('No connection available')); | ||
} | ||
const maxRequests = this.options.pooling.maxRequestsPerConnection; | ||
const c = HostConnectionPool.minInFlight(this.connections, maxRequests); | ||
this.borrowConnection(keyspace, null, callback); | ||
}); | ||
} | ||
if (c.getInFlight() >= maxRequests) { | ||
return callback(new errors.BusyConnectionError(this._address, maxRequests, this.connections.length)); | ||
} | ||
callback(null, c); | ||
/** | ||
* Tries to borrow one of the existing connections from the pool. | ||
* @param {Connection} previousConnection When provided, the pool should try to provide a different connection. | ||
* @param {String} keyspace | ||
* @param {Function} callback | ||
*/ | ||
borrowConnection(keyspace, previousConnection, callback) { | ||
if (this.connections.length === 0) { | ||
return callback(new Error('No connection available')); | ||
} | ||
const maxRequests = this.options.pooling.maxRequestsPerConnection; | ||
const c = HostConnectionPool.minInFlight(this.connections, maxRequests, previousConnection); | ||
if (c.getInFlight() >= maxRequests) { | ||
return callback(new errors.BusyConnectionError(this._address, maxRequests, this.connections.length)); | ||
} | ||
if (!keyspace || keyspace === c.keyspace) { | ||
// Connection is ready to be used | ||
return callback(null, c); | ||
} | ||
c.changeKeyspace(keyspace, (err) => { | ||
callback(err, c); | ||
}); | ||
@@ -104,5 +125,6 @@ } | ||
* @param {Number} maxRequests | ||
* @param {Connection} previousConnection | ||
* @returns {Connection} | ||
*/ | ||
static minInFlight(connections, maxRequests) { | ||
static minInFlight(connections, maxRequests, previousConnection) { | ||
const length = connections.length; | ||
@@ -122,6 +144,17 @@ if (length === 1) { | ||
current = connections[index % length]; | ||
const next = connections[(index + 1) % length]; | ||
if (current === previousConnection) { | ||
// Increment the index and skip | ||
current = connections[(++index) % length]; | ||
} | ||
let next = connections[(index + 1) % length]; | ||
if (next === previousConnection) { | ||
// Skip | ||
next = connections[(index + 2) % length]; | ||
} | ||
if (next.getInFlight() < current.getInFlight()) { | ||
current = next; | ||
} | ||
if (current.getInFlight() < maxRequests) { | ||
@@ -376,3 +409,3 @@ // Check as few connections as possible, as long as the amount of in-flight | ||
// Check that after sometime (readTimeout + 100ms) the connections have been drained | ||
const delay = (this.options.socketOptions.readTimeout || defaultOptions.socketOptions.readTimeout) + 100; | ||
const delay = (this.options.socketOptions.readTimeout || getDefaultOptions().socketOptions.readTimeout) + 100; | ||
checkShutdownTimeout = setTimeout(function checkShutdown() { | ||
@@ -454,2 +487,10 @@ wasClosed = true; | ||
/** Lazily loads the default options */ | ||
function getDefaultOptions() { | ||
if (defaultOptions === undefined) { | ||
defaultOptions = clientOptions.defaultOptions(); | ||
} | ||
return defaultOptions; | ||
} | ||
module.exports = HostConnectionPool; |
@@ -88,2 +88,3 @@ /** | ||
this.reconnectionSchedule = this.options.policies.reconnection.newSchedule(); | ||
this.reconnectionDelay = 0; | ||
} | ||
@@ -94,3 +95,4 @@ | ||
/** | ||
* Marks this host as not available for query coordination. | ||
* Marks this host as not available for query coordination, when the host was previously marked as UP, otherwise its | ||
* a no-op. | ||
* @internal | ||
@@ -100,3 +102,3 @@ * @ignore | ||
Host.prototype.setDown = function() { | ||
// multiple events signaling that a host is failing could cause multiple calls to this method | ||
// Multiple events signaling that a host is failing could cause multiple calls to this method | ||
if (this.setDownAt !== 0) { | ||
@@ -106,2 +108,3 @@ // the host is already marked as Down | ||
} | ||
if (this.pool.isClosing()) { | ||
@@ -111,10 +114,12 @@ // the pool is being closed/shutdown, don't mind | ||
} | ||
this.setDownAt = Date.now(); | ||
if (this._distance !== types.distance.ignored) { | ||
this.log('warning', | ||
util.format('Host %s considered as DOWN. Reconnection delay %dms.', this.address, this.reconnectionDelay)); | ||
if (this.pool.coreConnectionsLength > 0) { | ||
// According to the distance, there should be connections open to it => issue a warning | ||
this.log('warning', `Host ${this.address} considered as DOWN. Reconnection delay ${this.reconnectionDelay}ms.`); | ||
} else { | ||
this.log('info', `Host ${this.address} considered as DOWN.`); | ||
} | ||
else { | ||
this.log('info', util.format('Host %s considered as DOWN.', this.address)); | ||
} | ||
this.emit('down'); | ||
@@ -239,2 +244,6 @@ this._checkPoolState(); | ||
* If there isn't an available connections, it will open a new one according to the pooling options. | ||
* @param {String} keyspace The keyspace that the connection must be using. When the keyspace provided is null, no | ||
* keyspace check is performed. | ||
* @param {Connection} previousConnection The previous connection. When provided, the pool should try to provide a | ||
* different connection. | ||
* @param {Function} callback | ||
@@ -244,4 +253,9 @@ * @internal | ||
*/ | ||
Host.prototype.borrowConnection = function (callback) { | ||
this.pool.borrowConnection(callback); | ||
Host.prototype.borrowConnection = function (keyspace, previousConnection, callback) { | ||
if (previousConnection) { | ||
// Obtain one of the existing connections | ||
return this.pool.borrowConnection(keyspace, previousConnection, callback); | ||
} | ||
this.pool.createAndBorrowConnection(keyspace, callback); | ||
}; | ||
@@ -305,4 +319,5 @@ | ||
} | ||
if (this.pool.connections.length < this.pool.coreConnectionsLength) { | ||
// the pool still needs to grow | ||
// the pool needs to grow / reconnect | ||
if (!this.pool.hasScheduledNewConnection()) { | ||
@@ -313,5 +328,7 @@ this.reconnectionDelay = this.reconnectionSchedule.next().value; | ||
} | ||
if (this._distance !== types.distance.ignored && | ||
this.pool.connections.length === 0 && | ||
this.pool.coreConnectionsLength > 0) { | ||
const shouldHaveConnections = this._distance !== types.distance.ignored && this.pool.coreConnectionsLength > 0; | ||
if (shouldHaveConnections && this.pool.connections.length === 0) { | ||
// Mark as DOWN, if its UP | ||
this.setDown(); | ||
@@ -318,0 +335,0 @@ } |
@@ -70,3 +70,4 @@ /** | ||
/** | ||
* Specifies the probability with which read repairs should be invoked on non-quorum reads. The value must be between 0 and 1. | ||
* Specifies the probability with which read repairs should be invoked on non-quorum reads. The value must be | ||
* between 0 and 1. | ||
* @type {number} | ||
@@ -78,3 +79,3 @@ */ | ||
* <p> | ||
* For Cassandra versions prior to 3.0.0, this method always returns {@code null}. | ||
* For Apache Cassandra versions prior to 3.0.0, this method always returns <code>null</code>. | ||
* </p> | ||
@@ -89,3 +90,3 @@ * @type {Object} | ||
* <p> | ||
* For Cassandra versions prior to 3.0.0, this method always returns {@code null}. | ||
* For Apache Cassandra versions prior to 3.0.0, this method always returns <code>null</code>. | ||
* </p> | ||
@@ -102,5 +103,2 @@ * @type {Number|null} | ||
* Returns the default TTL for this table. | ||
* <p> | ||
* Note: this option is not available in Cassandra 1.2 and will return 0 (no default TTL) when connected to 1.2 nodes. | ||
* </p> | ||
* @type {Number} | ||
@@ -111,6 +109,2 @@ */ | ||
* * Returns the speculative retry option for this table. | ||
* <p> | ||
* Note: this option is not available in Cassandra 1.2 and will return "NONE" (no speculative retry) when connected | ||
* to 1.2 nodes. | ||
* </p> | ||
* @type {String} | ||
@@ -122,3 +116,4 @@ */ | ||
* <p> | ||
* Note: this option is available in Cassandra 2.1 and above, and will return {@code null} for earlier versions. | ||
* Note: this option is available in Apache Cassandra 2.1 and above, and will return <code>null</code> for | ||
* earlier versions. | ||
* </p> | ||
@@ -131,3 +126,4 @@ * @type {Number|null} | ||
* <p> | ||
* Note: this option is available in Cassandra 2.1 and above, and will return {@code null} for earlier versions. | ||
* Note: this option is available in Apache Cassandra 2.1 and above, and will return <code>null</code> for | ||
* earlier versions. | ||
* </p> | ||
@@ -134,0 +130,0 @@ * @type {Number|null} |
@@ -68,3 +68,3 @@ /** | ||
this.initialized = false; | ||
this._schemaParser = schemaParserFactory.getByVersion(controlConnection, this.getUdt.bind(this)); | ||
this._schemaParser = schemaParserFactory.getByVersion(options, controlConnection, this.getUdt.bind(this)); | ||
const self = this; | ||
@@ -84,3 +84,3 @@ this._preparedQueries = new PreparedQueries(options.maxPrepared, function () { | ||
this._schemaParser = schemaParserFactory.getByVersion( | ||
this.controlConnection, this.getUdt.bind(this), version, this._schemaParser); | ||
this.options, this.controlConnection, this.getUdt.bind(this), version, this._schemaParser); | ||
}; | ||
@@ -512,2 +512,3 @@ | ||
let cache; | ||
let virtual; | ||
if (this.options.isMetadataSyncEnabled) { | ||
@@ -519,4 +520,5 @@ const keyspace = this.keyspaces[keyspaceName]; | ||
cache = keyspace.tables; | ||
virtual = keyspace.virtual; | ||
} | ||
this._schemaParser.getTable(keyspaceName, name, cache, callback); | ||
this._schemaParser.getTable(keyspaceName, name, cache, virtual, callback); | ||
}; | ||
@@ -523,0 +525,0 @@ |
@@ -23,3 +23,3 @@ /** | ||
* @param {String} target | ||
* @param {Number} kind | ||
* @param {Number|String} kind | ||
* @param {Object} options | ||
@@ -44,3 +44,3 @@ * @alias module:metadata~Index | ||
*/ | ||
this.kind = kind; | ||
this.kind = typeof kind === 'string' ? getKindByName(kind) : kind; | ||
/** | ||
@@ -79,2 +79,3 @@ * An associative array containing the index options | ||
* Parses Index information from rows in the 'system_schema.indexes' table | ||
* @deprecated It will be removed in the next major version. | ||
* @param {Array.<Row>} indexRows | ||
@@ -94,3 +95,4 @@ * @returns {Array.<Index>} | ||
/** | ||
* Parses Index information from rows in the legacy 'system.schema_columns' table | ||
* Parses Index information from rows in the legacy 'system.schema_columns' table. | ||
* @deprecated It will be removed in the next major version. | ||
* @param {Array.<Row>} columnRows | ||
@@ -97,0 +99,0 @@ * @param {Object.<String, {name, type}>} columnsByName |
@@ -40,4 +40,11 @@ /** | ||
const _selectAllVirtualKeyspaces = "SELECT * FROM system_virtual_schema.keyspaces"; | ||
const _selectSingleVirtualKeyspace = "SELECT * FROM system_virtual_schema.keyspaces where keyspace_name = '%s'"; | ||
const _selectVirtualTable = "SELECT * FROM system_virtual_schema.tables where keyspace_name = '%s' and table_name='%s'"; | ||
const _selectVirtualColumns = "SELECT * FROM system_virtual_schema.columns where keyspace_name = '%s' and table_name='%s'"; | ||
/** | ||
* @abstract | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc | ||
@@ -47,4 +54,5 @@ * @constructor | ||
*/ | ||
function SchemaParser(cc) { | ||
function SchemaParser(options, cc) { | ||
this.cc = cc; | ||
this.encodingOptions = options.encoding; | ||
this.selectTable = null; | ||
@@ -56,2 +64,3 @@ this.selectColumns = null; | ||
this.selectFunctions = null; | ||
this.supportsVirtual = false; | ||
} | ||
@@ -64,6 +73,7 @@ | ||
* @param strategyOptions | ||
* @param virtual | ||
* @returns {{name, durableWrites, strategy, strategyOptions, tokenToReplica, udts, tables, functions, aggregates}|null} | ||
* @protected | ||
*/ | ||
SchemaParser.prototype._createKeyspace = function (name, durableWrites, strategy, strategyOptions) { | ||
SchemaParser.prototype._createKeyspace = function (name, durableWrites, strategy, strategyOptions, virtual) { | ||
const ksInfo = { | ||
@@ -75,2 +85,3 @@ name: name, | ||
tokenToReplica: null, | ||
virtual: virtual === true, | ||
udts: {}, | ||
@@ -106,5 +117,6 @@ tables: {}, | ||
* @param {Object} cache | ||
* @param {Boolean} virtual | ||
* @param {Function} callback | ||
*/ | ||
SchemaParser.prototype.getTable = function (keyspaceName, name, cache, callback) { | ||
SchemaParser.prototype.getTable = function (keyspaceName, name, cache, virtual, callback) { | ||
let tableInfo = cache && cache[name]; | ||
@@ -129,5 +141,7 @@ if (!tableInfo) { | ||
const self = this; | ||
let virtualTable = virtual; | ||
utils.series([ | ||
function getTableRow(next) { | ||
const query = util.format(self.selectTable, keyspaceName, name); | ||
const selectTable = virtualTable ? _selectVirtualTable : self.selectTable; | ||
const query = util.format(selectTable, keyspaceName, name); | ||
self.cc.query(query, function (err, response) { | ||
@@ -141,7 +155,30 @@ if (err) { | ||
}, | ||
function getVirtualTableRow(next) { | ||
// if we weren't sure if table was virtual or not, query virtual schema. | ||
if (!tableRow && self.supportsVirtual && virtualTable === undefined) { | ||
const query = util.format(_selectVirtualTable, keyspaceName, name); | ||
self.cc.query(query, function (err, response) { | ||
if (err) { | ||
// we can't error here as we can't be sure if the node | ||
// supports virtual tables, in this case it is adequate | ||
// to act as if there was no matching table. | ||
return next(); | ||
} | ||
tableRow = response.rows[0]; | ||
// if we got a result, this is a virtual table | ||
if (tableRow) { | ||
virtualTable = true; | ||
} | ||
next(); | ||
}); | ||
} else { | ||
return next(); | ||
} | ||
}, | ||
function getColumnRows (next) { | ||
if (!tableRow) { | ||
return next(null, null, null); | ||
return next(); | ||
} | ||
const query = util.format(self.selectColumns, keyspaceName, name); | ||
const selectColumns = virtualTable ? _selectVirtualColumns : self.selectColumns; | ||
const query = util.format(selectColumns, keyspaceName, name); | ||
self.cc.query(query, function (err, response) { | ||
@@ -156,3 +193,3 @@ if (err) { | ||
function getIndexes(next) { | ||
if (!tableRow || !self.selectIndexes) { | ||
if (!tableRow || !self.selectIndexes || virtualTable) { | ||
//either the table does not exists or it does not support indexes schema table | ||
@@ -175,3 +212,3 @@ return next(); | ||
} | ||
self._parseTableOrView(tableInfo, tableRow, columnRows, indexRows, function (err) { | ||
self._parseTableOrView(tableInfo, tableRow, columnRows, indexRows, virtualTable, function (err) { | ||
tableInfo.loading = false; | ||
@@ -250,6 +287,7 @@ tableInfo.loaded = !err; | ||
* @param {Array.<Row>} indexRows | ||
* @param {Boolean} virtual | ||
* @param {Function} callback | ||
* @throws {Error} | ||
*/ | ||
SchemaParser.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, callback) { | ||
SchemaParser.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, virtual, callback) { | ||
}; | ||
@@ -347,4 +385,33 @@ | ||
/** @returns {Map} */ | ||
SchemaParser.prototype._asMap = function (obj) { | ||
if (!obj) { | ||
return new Map(); | ||
} | ||
if (this.encodingOptions.map && obj instanceof this.encodingOptions.map) { | ||
// Its already a Map or a polyfill of a Map | ||
return obj; | ||
} | ||
return new Map(Object.keys(obj).map(k => [ k, obj[k]])); | ||
}; | ||
SchemaParser.prototype._mapAsObject = function (map) { | ||
if (!map) { | ||
return map; | ||
} | ||
if (this.encodingOptions.map && map instanceof this.encodingOptions.map) { | ||
const result = {}; | ||
map.forEach((value, key) => result[key] = value); | ||
return result; | ||
} | ||
return map; | ||
}; | ||
/** | ||
* Used to parse schema information for Cassandra versions 1.2.x, and 2.x | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc | ||
@@ -354,4 +421,4 @@ * @constructor | ||
*/ | ||
function SchemaParserV1(cc) { | ||
SchemaParser.call(this, cc); | ||
function SchemaParserV1(options, cc) { | ||
SchemaParser.call(this, options, cc); | ||
this.selectTable = _selectTableV1; | ||
@@ -407,3 +474,3 @@ this.selectColumns = _selectColumnsV1; | ||
/** @override */ | ||
SchemaParserV1.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, callback) { | ||
SchemaParserV1.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, virtual, callback) { | ||
let i, c, name, types; | ||
@@ -629,2 +696,3 @@ const encoder = this.cc.getEncoder(); | ||
* Used to parse schema information for Cassandra versions 3.x and above | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc The control connection to be used | ||
@@ -635,4 +703,4 @@ * @param {Function} udtResolver The function to be used to retrieve the udts. | ||
*/ | ||
function SchemaParserV2(cc, udtResolver) { | ||
SchemaParser.call(this, cc); | ||
function SchemaParserV2(options, cc, udtResolver) { | ||
SchemaParser.call(this, options, cc); | ||
this.udtResolver = udtResolver; | ||
@@ -730,3 +798,3 @@ this.selectTable = _selectTableV2; | ||
} | ||
self._parseTableOrView(viewInfo, tableRow, columnRows, null, function (err) { | ||
self._parseTableOrView(viewInfo, tableRow, columnRows, null, false, function (err) { | ||
viewInfo.loading = false; | ||
@@ -740,3 +808,3 @@ viewInfo.loaded = !err; | ||
SchemaParserV2.prototype._parseKeyspace = function (row) { | ||
SchemaParserV2.prototype._parseKeyspace = function (row, virtual) { | ||
const replication = row['replication']; | ||
@@ -759,7 +827,8 @@ let strategy; | ||
strategy, | ||
strategyOptions); | ||
strategyOptions, | ||
virtual); | ||
}; | ||
/** @override */ | ||
SchemaParserV2.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, callback) { | ||
SchemaParserV2.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, virtual, callback) { | ||
const encoder = this.cc.getEncoder(); | ||
@@ -769,2 +838,55 @@ const columnsKeyed = {}; | ||
const clusteringKeys = []; | ||
const self = this; | ||
// maps column rows to columnInfo and also populates columnsKeyed | ||
const columnRowMapper = (row, next) => { | ||
encoder.parseTypeName(tableRow['keyspace_name'], row['type'], 0, null, self.udtResolver, function (err, type) { | ||
if (err) { | ||
return next(err); | ||
} | ||
const c = { | ||
name: row['column_name'], | ||
type: type, | ||
isStatic: false | ||
}; | ||
columnsKeyed[c.name] = c; | ||
switch (row['kind']) { | ||
case 'partition_key': | ||
partitionKeys.push({ c: c, index: (row['position'] || 0)}); | ||
break; | ||
case 'clustering': | ||
clusteringKeys.push({ c: c, index: (row['position'] || 0), order: row['clustering_order'] === 'desc' ? 'DESC' : 'ASC'}); | ||
break; | ||
case 'static': | ||
c.isStatic = true; | ||
break; | ||
} | ||
next(null, c); | ||
}); | ||
}; | ||
// is table is virtual, the only relevant information to parse is the columns as the table itself has no configuration. | ||
if (virtual) { | ||
tableInfo.virtual = true; | ||
utils.map(columnRows, columnRowMapper, function (err, columns) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
tableInfo.columns = columns; | ||
tableInfo.columnsByName = columnsKeyed; | ||
tableInfo.partitionKeys = partitionKeys.sort(utils.propCompare('index')).map(function (item) { | ||
return item.c; | ||
}); | ||
clusteringKeys.sort(utils.propCompare('index')); | ||
tableInfo.clusteringKeys = clusteringKeys.map(function (item) { | ||
return item.c; | ||
}); | ||
tableInfo.clusteringOrder = clusteringKeys.map(function (item) { | ||
return item.order; | ||
}); | ||
callback(); | ||
}); | ||
return; | ||
} | ||
const isView = tableInfo instanceof MaterializedView; | ||
@@ -774,18 +896,24 @@ tableInfo.bloomFilterFalsePositiveChance = tableRow['bloom_filter_fp_chance']; | ||
tableInfo.comment = tableRow['comment']; | ||
const compaction = tableRow['compaction']; | ||
// Regardless of the encoding options, use always an Object to represent an associative Array | ||
const compaction = this._asMap(tableRow['compaction']); | ||
if (compaction) { | ||
// compactionOptions as an Object<String, String> | ||
tableInfo.compactionOptions = {}; | ||
tableInfo.compactionClass = compaction['class']; | ||
for (const key in compaction) { | ||
if (!compaction.hasOwnProperty(key) || key === 'class') { | ||
continue; | ||
tableInfo.compactionClass = compaction.get('class'); | ||
compaction.forEach((value, key) => { | ||
if (key === 'class') { | ||
return; | ||
} | ||
tableInfo.compactionOptions[key] = compaction[key]; | ||
} | ||
tableInfo.compactionOptions[key] = compaction.get(key); | ||
}); | ||
} | ||
tableInfo.compression = tableRow['compression']; | ||
// Convert compression to an Object<String, String> | ||
tableInfo.compression = this._mapAsObject(tableRow['compression']); | ||
tableInfo.gcGraceSeconds = tableRow['gc_grace_seconds']; | ||
tableInfo.localReadRepairChance = tableRow['dclocal_read_repair_chance']; | ||
tableInfo.readRepairChance = tableRow['read_repair_chance']; | ||
tableInfo.extensions = tableRow['extensions']; | ||
tableInfo.extensions = this._mapAsObject(tableRow['extensions']); | ||
tableInfo.crcCheckChance = tableRow['crc_check_chance']; | ||
@@ -804,28 +932,5 @@ tableInfo.memtableFlushPeriod = tableRow['memtable_flush_period_in_ms'] || tableInfo.memtableFlushPeriod; | ||
} | ||
const self = this; | ||
utils.map(columnRows, function (row, next) { | ||
encoder.parseTypeName(tableRow['keyspace_name'], row['type'], 0, null, self.udtResolver, function (err, type) { | ||
if (err) { | ||
return next(err); | ||
} | ||
const c = { | ||
name: row['column_name'], | ||
type: type, | ||
isStatic: false | ||
}; | ||
columnsKeyed[c.name] = c; | ||
switch (row['kind']) { | ||
case 'partition_key': | ||
partitionKeys.push({ c: c, index: (row['position'] || 0)}); | ||
break; | ||
case 'clustering': | ||
clusteringKeys.push({ c: c, index: (row['position'] || 0), order: row['clustering_order'] === 'desc' ? 'DESC' : 'ASC'}); | ||
break; | ||
case 'static': | ||
c.isStatic = true; | ||
break; | ||
} | ||
next(null, c); | ||
}); | ||
}, function (err, columns) { | ||
// Map all columns asynchronously | ||
utils.map(columnRows, columnRowMapper, (err, columns) => { | ||
if (err) { | ||
@@ -846,2 +951,3 @@ return callback(err); | ||
}); | ||
if (isView) { | ||
@@ -853,7 +959,15 @@ tableInfo.tableName = tableRow['base_table_name']; | ||
} | ||
tableInfo.indexes = Index.fromRows(indexRows); | ||
const flags = tableRow['flags']; | ||
const isDense = flags.indexOf('dense') >= 0; | ||
const isSuper = flags.indexOf('super') >= 0; | ||
const isCompound = flags.indexOf('compound') >= 0; | ||
tableInfo.indexes = this._getIndexes(indexRows); | ||
// flags can be an instance of Array or Set (real or polyfill) | ||
let flags = tableRow['flags']; | ||
if (Array.isArray(flags)) { | ||
flags = new Set(flags); | ||
} | ||
const isDense = flags.has('dense'); | ||
const isSuper = flags.has('super'); | ||
const isCompound = flags.has('compound'); | ||
tableInfo.isCompact = isSuper || isDense || !isCompound; | ||
@@ -872,2 +986,13 @@ //remove the columns related to Thrift | ||
SchemaParserV2.prototype._getIndexes = function (indexRows) { | ||
if (!indexRows || indexRows.length === 0) { | ||
return utils.emptyArray; | ||
} | ||
return indexRows.map((row) => { | ||
const options = this._mapAsObject(row['options']); | ||
return new Index(row['index_name'], options['target'], row['kind'], options); | ||
}); | ||
}; | ||
/** @override */ | ||
@@ -981,2 +1106,84 @@ SchemaParserV2.prototype._parseAggregate = function (row, callback) { | ||
/** | ||
* Used to parse schema information for Cassandra versions 4.x and above. | ||
* | ||
* This parser similar to [SchemaParserV2] expect it also parses virtual | ||
* keyspaces. | ||
* | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc The control connection to be used | ||
* @param {Function} udtResolver The function to be used to retrieve the udts. | ||
* @constructor | ||
* @ignore | ||
*/ | ||
function SchemaParserV3(options, cc, udtResolver) { | ||
SchemaParserV2.call(this, options, cc, udtResolver); | ||
this.supportsVirtual = true; | ||
} | ||
util.inherits(SchemaParserV3, SchemaParserV2); | ||
/** @override */ | ||
SchemaParserV3.prototype.getKeyspaces = function (waitReconnect, callback) { | ||
const self = this; | ||
const keyspaces = {}; | ||
const queries = [ | ||
{ query: _selectAllKeyspacesV2, virtual: false }, | ||
{ query: _selectAllVirtualKeyspaces, virtual: true } | ||
]; | ||
utils.each(queries, (q, cb) => { | ||
self.cc.query(q.query, waitReconnect, (err, result) => { | ||
if (err) { | ||
// only callback in error for non-virtual query as | ||
// server reporting C* 4.0 may not actually implement | ||
// virtual tables. | ||
if (!q.virtual) { | ||
return cb(err); | ||
} | ||
return cb(); | ||
} | ||
for (let i = 0; i < result.rows.length; i++) { | ||
const ksInfo = self._parseKeyspace(result.rows[i], q.virtual); | ||
keyspaces[ksInfo.name] = ksInfo; | ||
} | ||
cb(); | ||
}); | ||
}, (err) => { | ||
callback(err, keyspaces); | ||
}); | ||
}; | ||
/** @override */ | ||
SchemaParserV3.prototype.getKeyspace = function (name, callback) { | ||
this._getKeyspace(_selectSingleKeyspaceV2, name, false, (err, ks) => { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (!ks) { | ||
// if not found, attempt to retrieve as virtual keyspace. | ||
return this._getKeyspace(_selectSingleVirtualKeyspace, name, true, callback); | ||
} | ||
return callback(null, ks); | ||
}); | ||
}; | ||
SchemaParserV3.prototype._getKeyspace = function (query, name, virtual, callback) { | ||
this.cc.query(util.format(query, name), (err, result) => { | ||
if (err) { | ||
// only callback in error for non-virtual query as | ||
// server reporting C* 4.0 may not actually implement | ||
// virtual tables. | ||
if (!virtual) { | ||
return callback(err); | ||
} | ||
return callback(null, null); | ||
} | ||
const row = result.rows[0]; | ||
if (!row) { | ||
return callback(null, null); | ||
} | ||
callback(null, this._parseKeyspace(row, virtual)); | ||
}); | ||
}; | ||
/** | ||
* Upon migration from thrift to CQL, we internally create a pair of surrogate clustering/regular columns | ||
@@ -1194,2 +1401,3 @@ * for compact static tables. These columns shouldn't be exposed to the user but are currently returned by C*. | ||
* provided Cassandra version | ||
* @param {ClientOptions} options The client options | ||
* @param {ControlConnection} cc The control connection to be used | ||
@@ -1201,9 +1409,11 @@ * @param {Function} udtResolver The function to be used to retrieve the udts. | ||
*/ | ||
function getByVersion(cc, udtResolver, version, currentInstance) { | ||
function getByVersion(options, cc, udtResolver, version, currentInstance) { | ||
let parserConstructor = SchemaParserV1; | ||
if (version && version[0] >= 3) { | ||
if (version && version[0] === 3) { | ||
parserConstructor = SchemaParserV2; | ||
} else if (version && version[0] >= 4) { | ||
parserConstructor = SchemaParserV3; | ||
} | ||
if (!currentInstance || !(currentInstance instanceof parserConstructor)){ | ||
return new parserConstructor(cc, udtResolver); | ||
return new parserConstructor(options, cc, udtResolver); | ||
} | ||
@@ -1210,0 +1420,0 @@ return currentInstance; |
@@ -30,6 +30,2 @@ /** | ||
* Returns the memtable flush period (in milliseconds) option for this table. | ||
* <p> | ||
* Note: this option is available only on Cassandra 2.x and will return 0 (no periodic | ||
* flush) when connected to 1.2 nodes. | ||
* </p> | ||
* @type {Number} | ||
@@ -41,4 +37,4 @@ */ | ||
* <p> | ||
* Note: this option is only available in Cassandra 2.0. It is deprecated in Cassandra 2.1 and above, and will | ||
* therefore return {@code null} for 2.1 nodes. | ||
* Note: this option is only available in Apache Cassandra 2.0. It is deprecated in Apache Cassandra 2.1 and | ||
* above, and will therefore return <code>null</code> for 2.1 nodes. | ||
* </p> | ||
@@ -64,2 +60,8 @@ * @type {Number|null} | ||
this.cdc = null; | ||
/** | ||
* Determines whether the table is a virtual table or not. | ||
* @type {Boolean} | ||
*/ | ||
this.virtual = false; | ||
} | ||
@@ -66,0 +68,0 @@ |
@@ -99,3 +99,3 @@ /** | ||
const message = util.format('The host %s did not reply before timeout %d ms', address, millis); | ||
self._markAsTimedOut(new errors.OperationTimedOutError(message), onResponse); | ||
self._markAsTimedOut(new errors.OperationTimedOutError(message, address), onResponse); | ||
}, millis); | ||
@@ -102,0 +102,0 @@ } |
@@ -109,4 +109,8 @@ /** | ||
* @param {?String} [localDc] local datacenter name. | ||
* @param {Number} [usedHostsPerRemoteDc] the number of host per remote datacenter that the policy will yield \ | ||
* in a newQueryPlan after the local nodes. | ||
* @param {Number} [usedHostsPerRemoteDc] Deprecated: the number of host per remote datacenter that the policy will | ||
* yield in a newQueryPlan after the local nodes. | ||
* <p> | ||
* Note that this parameter is deprecated and will be removed in the next major version. Handling data center | ||
* outages is better suited at a service level rather than within an application client. | ||
* </p> | ||
* @extends {LoadBalancingPolicy} | ||
@@ -113,0 +117,0 @@ * @constructor |
@@ -252,2 +252,6 @@ /** | ||
* @property {useCurrentHost} [useCurrentHost] Determines if it should use the same host to retry the request. | ||
* <p> | ||
* In the case that the current host is not available anymore, it will be retried on the next host even when | ||
* <code>useCurrentHost</code> is set to <code>true</code>. | ||
* </p> | ||
*/ | ||
@@ -254,0 +258,0 @@ |
@@ -196,3 +196,4 @@ /** | ||
} | ||
RequestHandler.borrowFromHost(host, keyspace, function borrowCallback(err, connection) { | ||
host.borrowConnection(keyspace, null, function borrowCallback(err, connection) { | ||
if (err) { | ||
@@ -223,3 +224,4 @@ return callback(err); | ||
} | ||
RequestHandler.borrowFromHost(host, keyspace, function borrowCallback(err, connection) { | ||
host.borrowConnection(keyspace, null, function borrowCallback(err, connection) { | ||
if (err) { | ||
@@ -226,0 +228,0 @@ // Don't mind about issues with the pool in this case |
@@ -16,5 +16,5 @@ /** | ||
const retryOnNextHostDecision = Object.freeze({ | ||
const retryOnCurrentHost = Object.freeze({ | ||
decision: retry.RetryPolicy.retryDecision.retry, | ||
useCurrentHost: false, | ||
useCurrentHost: true, | ||
consistency: undefined | ||
@@ -168,4 +168,6 @@ }); | ||
if (err.requestNotWritten) { | ||
// The request was definitely not applied, it's safe to retry | ||
return retryOnNextHostDecision; | ||
// The request was definitely not applied, it's safe to retry. | ||
// Retry on the current host as there might be other connections open, in case it fails to obtain a connection | ||
// on the current host, the driver will immediately retry on the next host. | ||
return retryOnCurrentHost; | ||
} | ||
@@ -209,4 +211,6 @@ return onRequestError(); | ||
} | ||
this._parent.log('info', 'Retrying request'); | ||
this._retryCount++; | ||
if (meta || (typeof consistency === 'number' && this._request.consistency !== consistency)) { | ||
@@ -223,6 +227,24 @@ this._request = this._request.clone(); | ||
} | ||
if (useCurrentHost !== false) { | ||
// Use existing host (default) | ||
return this._sendOnConnection(); | ||
// Use existing host (default). | ||
const keyspace = this._parent.client.keyspace; | ||
// Reusing the existing connection is suitable for the most common scenarios, like server read timeouts that | ||
// will be fixed with a new request. | ||
// To cover all scenarios (e.g., where a different connection to the same host might mean something different), | ||
// we obtain a new connection from the host pool. | ||
// When there was a socket error, the connection provided was already removed from the pool earlier. | ||
return this._host.borrowConnection(keyspace, this._connection, (err, connection) => { | ||
if (err) { | ||
// All connections are busy (`BusyConnectionError`) or there isn't a ready connection in the pool (`Error`) | ||
// The retry policy declared the intention to retry on the current host but its not available anymore. | ||
// Use the next host | ||
return this.start(); | ||
} | ||
this._connection = connection; | ||
this._sendOnConnection(); | ||
}); | ||
} | ||
// Use the next host in the query plan to send the request | ||
@@ -232,4 +254,2 @@ this.start(); | ||
/** | ||
@@ -236,0 +256,0 @@ * Issues a PREPARE request on the current connection. |
@@ -63,3 +63,3 @@ /** | ||
RequestHandler.borrowFromHost(host, keyspace, function borrowFromHostCallback(err, connection) { | ||
host.borrowConnection(keyspace, null, function borrowFromHostCallback(err, connection) { | ||
if (err) { | ||
@@ -80,26 +80,2 @@ triedHosts[host.address] = err; | ||
/** | ||
* Borrows a connection from the provided host, changing the current keyspace, if necessary. | ||
* @param {Host} host | ||
* @param {String} keyspace | ||
* @param {Function} callback | ||
*/ | ||
static borrowFromHost(host, keyspace, callback) { | ||
host.borrowConnection(function (err, connection) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (!keyspace || keyspace === connection.keyspace) { | ||
// Connection is ready to be used | ||
return callback(null, connection); | ||
} | ||
connection.changeKeyspace(keyspace, function (err) { | ||
if (err) { | ||
return callback(err, connection); | ||
} | ||
callback(null, connection); | ||
}); | ||
}); | ||
} | ||
/** | ||
* Gets the next host from the query plan. | ||
@@ -164,11 +140,18 @@ * @param {Iterator} iterator | ||
} | ||
this._callback = callback; | ||
const self = this; | ||
this.loadBalancingPolicy.newQueryPlan(this.client.keyspace, this.requestOptions, function newPlanCb(err, iterator) { | ||
if (err) { | ||
return callback(err); | ||
} | ||
self._hostIterator = iterator; | ||
self._callback = callback; | ||
if (this.requestOptions.host) { | ||
// if host is configured bypass load balancing policy and use | ||
// a single host plan. | ||
self._hostIterator = utils.arrayIterator([self.requestOptions.host]); | ||
self._startNewExecution(); | ||
}); | ||
} else { | ||
this.loadBalancingPolicy.newQueryPlan(this.client.keyspace, this.requestOptions, function newPlanCb(err, iterator) { | ||
if (err) { | ||
return self._callback(err); | ||
} | ||
self._hostIterator = iterator; | ||
self._startNewExecution(); | ||
}); | ||
} | ||
} | ||
@@ -175,0 +158,0 @@ |
{ | ||
"name": "dse-driver", | ||
"version": "1.6.0", | ||
"version": "1.7.0", | ||
"description": "DataStax Enterprise Node.js Driver", | ||
@@ -24,8 +24,5 @@ "author": "DataStax", | ||
"devDependencies": { | ||
"mocha": "~2.5.3", | ||
"rewire": ">= 2.1.0", | ||
"temp": ">= 0.8.3", | ||
"mocha-jenkins-reporter": ">= 0.1.9", | ||
"mocha-appveyor-reporter": ">= 0.2.1", | ||
"mocha-multi": "~0.11.0" | ||
"mocha": "~5.2.0", | ||
"rewire": "^4.0.1", | ||
"temp": ">= 0.8.3" | ||
}, | ||
@@ -37,11 +34,11 @@ "homepage": "http://docs.datastax.com/en/developer/nodejs-driver-dse/latest/", | ||
"license": "SEE LICENSE IN http://www.datastax.com/terms/datastax-dse-driver-license-terms", | ||
"scripts": { | ||
"scripts": { | ||
"test": "./node_modules/.bin/mocha test/unit -R spec -t 5000 --recursive", | ||
"ci": "./node_modules/.bin/mocha test/unit test/integration/short --recursive -R mocha-jenkins-reporter", | ||
"ci_unit": "./node_modules/.bin/mocha test/unit -R mocha-multi", | ||
"ci_jenkins": "./node_modules/.bin/mocha test/unit test/integration/short -R mocha-jenkins-reporter --recursive --exit", | ||
"ci_unit_appveyor": "./node_modules/.bin/mocha test/unit -R mocha-appveyor-reporter --recursive --exit", | ||
"eslint": "eslint lib test --ignore-pattern '/lib/types/integer.js'" | ||
}, | ||
"engines": { | ||
"node" : ">=4" | ||
} | ||
"node": ">=4" | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
759713
3
21908
6