Socket
Socket
Sign inDemoInstall

dse-driver

Package Overview
Dependencies
1
Maintainers
2
Versions
21
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.6.0 to 1.7.0

10

lib/client-options.js

@@ -236,5 +236,10 @@ /**

if (userOptions.host && !(userOptions.host instanceof Host)) {
throw new TypeError('host must be an instance of Host');
}
if (!allowContinuousPaging && userOptions.continuousPaging) {
return new errors.NotSupportedError('Continuous paging not allowed, use stream() method instead');
}
// Using fixed property names is 2 order of magnitude faster than dynamically shallow clone objects

@@ -251,2 +256,3 @@ const result = {

hints: userOptions.hints,
host: userOptions.host,
isIdempotent: ifUndefined(userOptions.isIdempotent, defaultQueryOptions.isIdempotent),

@@ -369,2 +375,4 @@ keyspace: userOptions.keyspace,

exports.continuousPageDefaultSize = continuousPageDefaultSize;
exports.continuousPageDefaultHighWaterMark = continuousPageDefaultHighWaterMark;
exports.continuousPageDefaultHighWaterMark = continuousPageDefaultHighWaterMark;
const Host = require('./host').Host;

@@ -223,2 +223,23 @@ /**

* <p>For batch queries, an array of such arrays, ordered as with the queries in the batch.</p>
* @property {Host} [host] The host that should handle the query.
* <p>
* Use of this option is <em>heavily discouraged</em> and should only be used in the following cases:
* </p>
* <ol>
* <li>
* Querying node-local tables, such as tables in the <code>system</code> and <code>system_views</code>
* keyspaces.
* </li>
* <li>
* Applying a series of schema changes, where it may be advantageous to execute schema changes in sequence on the
* same node.
* </li>
* </ol>
* <p>
* Configuring a specific host causes the configured
* [LoadBalancingPolicy]{@link module:policies/loadBalancing~LoadBalancingPolicy} to be completely bypassed.
* However, if the load balancing policy dictates that the host is at a
* [distance of ignored]{@link module:types~distance} or there is no active connectivity to the host, the request will
* fail with a [NoHostAvailableError]{@link module:errors~NoHostAvailableError}.
* </p>
* @property {Boolean} [isIdempotent] Defines whether the query can be applied multiple times without changing the result

@@ -225,0 +246,0 @@ * beyond the initial application.

40

lib/control-connection.js

@@ -251,3 +251,4 @@ /**

ControlConnection.prototype.borrowHostConnection = function (host, callback) {
host.borrowConnection(callback);
// Borrow any open connection, regardless of the keyspace
host.borrowConnection(null, null, callback);
};

@@ -257,6 +258,7 @@

* Gets the info from local and peer metadata, reloads the keyspaces metadata and rebuilds tokens.
* @param {Boolean} newNodesUp
* @param {Boolean} initializing Determines whether this function was called in order to initialize the control
* connection the first time.
* @param {Function} [callback]
*/
ControlConnection.prototype.refreshHosts = function (newNodesUp, callback) {
ControlConnection.prototype.refreshHosts = function (initializing, callback) {
callback = callback || utils.noop;

@@ -289,3 +291,3 @@ // it's possible that this was called as a result of a topology change, but the connection was lost

c.sendStream(request, null, function (err, result) {
self.setPeersInfo(newNodesUp, err, result, next);
self.setPeersInfo(initializing, err, result, next);
});

@@ -626,3 +628,4 @@ },

/**
* @param {Boolean} newNodesUp
* @param {Boolean} initializing Determines whether this function was called in order to initialize the control
* connection the first time.
* @param {Error} err

@@ -632,3 +635,3 @@ * @param {ResultSet} result

*/
ControlConnection.prototype.setPeersInfo = function (newNodesUp, err, result, callback) {
ControlConnection.prototype.setPeersInfo = function (initializing, err, result, callback) {
if (!result || !result.rows || err) {

@@ -646,12 +649,13 @@ return callback(err);

}
peers[endPoint] = true;
let host = self.hosts.get(endPoint);
if (!host) {
let isNewHost = !host;
if (isNewHost) {
host = new Host(endPoint, self.protocolVersion, self.options, self.metadata);
if (!newNodesUp) {
host.setDown();
}
self.log('info', 'Adding host ' + endPoint);
self.hosts.set(endPoint, host);
isNewHost = true;
}
host.datacenter = row['data_center'];

@@ -663,2 +667,16 @@ host.rack = row['rack'];

if (isNewHost) {
// Add it to the map (and trigger events) after all the properties
// were set to avoid race conditions
self.hosts.set(endPoint, host);
if (!initializing) {
// Set the distance at Host level, that way the connection pool is created with the correct settings
self.profileManager.getDistance(host);
// When we are not initializing, we start with the node set as DOWN
host.setDown();
}
}
next();

@@ -665,0 +683,0 @@ });

@@ -71,2 +71,9 @@ /**

const unsetValueBuffer = utils.allocBufferFromArray([255, 255, 255, 254]);
const zeroLengthTypesSupported = new Set([
dataTypes.text,
dataTypes.ascii,
dataTypes.varchar,
dataTypes.custom,
dataTypes.blob
]);

@@ -217,3 +224,3 @@ /**

offset += self.collectionLengthSize;
if (valueLength < 0) {
if (valueLength < 0 || (valueLength === 0 && !zeroLengthTypesSupported.has(subtypes[1].code))) {
callback.call(thisArg, key, null);

@@ -220,0 +227,0 @@ continue;

@@ -110,8 +110,16 @@ /**

* {@link ClientOptions.socketOptions.readTimeout}.
* @param {String} message The error message.
* @param {String} [host] Address of the server host that caused the operation to time out.
* @constructor
*/
function OperationTimedOutError(message) {
function OperationTimedOutError(message, host) {
DriverError.call(this, message, this.constructor);
this.info = 'Represents a client-side error that is raised when the client did not hear back from the server ' +
'within socketOptions.readTimeout';
/**
* When defined, it gets the address of the host that caused the operation to time out.
* @type {String|undefined}
*/
this.host = host;
}

@@ -118,0 +126,0 @@

@@ -14,3 +14,3 @@ /**

const errors = require('./errors');
const defaultOptions = require('./client-options').defaultOptions();
const clientOptions = require('./client-options');

@@ -21,2 +21,4 @@ // Used to get the index of the connection with less in-flight requests

let defaultOptions;
/**

@@ -74,5 +76,6 @@ * Represents the possible states of the pool.

* Borrows a connection from the pool.
* @param {String} keyspace
* @param {Function} callback
*/
borrowConnection(callback) {
createAndBorrowConnection(keyspace, callback) {
this.create(false, err => {

@@ -82,14 +85,32 @@ if (err) {

}
if (this.connections.length === 0) {
// Normally it should have callback in error, but better to handle this possibility
return callback(new Error('No connection available'));
}
const maxRequests = this.options.pooling.maxRequestsPerConnection;
const c = HostConnectionPool.minInFlight(this.connections, maxRequests);
this.borrowConnection(keyspace, null, callback);
});
}
if (c.getInFlight() >= maxRequests) {
return callback(new errors.BusyConnectionError(this._address, maxRequests, this.connections.length));
}
callback(null, c);
/**
* Tries to borrow one of the existing connections from the pool.
* @param {Connection} previousConnection When provided, the pool should try to provide a different connection.
* @param {String} keyspace
* @param {Function} callback
*/
borrowConnection(keyspace, previousConnection, callback) {
if (this.connections.length === 0) {
return callback(new Error('No connection available'));
}
const maxRequests = this.options.pooling.maxRequestsPerConnection;
const c = HostConnectionPool.minInFlight(this.connections, maxRequests, previousConnection);
if (c.getInFlight() >= maxRequests) {
return callback(new errors.BusyConnectionError(this._address, maxRequests, this.connections.length));
}
if (!keyspace || keyspace === c.keyspace) {
// Connection is ready to be used
return callback(null, c);
}
c.changeKeyspace(keyspace, (err) => {
callback(err, c);
});

@@ -104,5 +125,6 @@ }

* @param {Number} maxRequests
* @param {Connection} previousConnection
* @returns {Connection}
*/
static minInFlight(connections, maxRequests) {
static minInFlight(connections, maxRequests, previousConnection) {
const length = connections.length;

@@ -122,6 +144,17 @@ if (length === 1) {

current = connections[index % length];
const next = connections[(index + 1) % length];
if (current === previousConnection) {
// Increment the index and skip
current = connections[(++index) % length];
}
let next = connections[(index + 1) % length];
if (next === previousConnection) {
// Skip
next = connections[(index + 2) % length];
}
if (next.getInFlight() < current.getInFlight()) {
current = next;
}
if (current.getInFlight() < maxRequests) {

@@ -376,3 +409,3 @@ // Check as few connections as possible, as long as the amount of in-flight

// Check that after sometime (readTimeout + 100ms) the connections have been drained
const delay = (this.options.socketOptions.readTimeout || defaultOptions.socketOptions.readTimeout) + 100;
const delay = (this.options.socketOptions.readTimeout || getDefaultOptions().socketOptions.readTimeout) + 100;
checkShutdownTimeout = setTimeout(function checkShutdown() {

@@ -454,2 +487,10 @@ wasClosed = true;

/** Lazily loads the default options */
function getDefaultOptions() {
if (defaultOptions === undefined) {
defaultOptions = clientOptions.defaultOptions();
}
return defaultOptions;
}
module.exports = HostConnectionPool;

@@ -88,2 +88,3 @@ /**

this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();
this.reconnectionDelay = 0;
}

@@ -94,3 +95,4 @@

/**
* Marks this host as not available for query coordination.
* Marks this host as not available for query coordination, when the host was previously marked as UP, otherwise its
* a no-op.
* @internal

@@ -100,3 +102,3 @@ * @ignore

Host.prototype.setDown = function() {
// multiple events signaling that a host is failing could cause multiple calls to this method
// Multiple events signaling that a host is failing could cause multiple calls to this method
if (this.setDownAt !== 0) {

@@ -106,2 +108,3 @@ // the host is already marked as Down

}
if (this.pool.isClosing()) {

@@ -111,10 +114,12 @@ // the pool is being closed/shutdown, don't mind

}
this.setDownAt = Date.now();
if (this._distance !== types.distance.ignored) {
this.log('warning',
util.format('Host %s considered as DOWN. Reconnection delay %dms.', this.address, this.reconnectionDelay));
if (this.pool.coreConnectionsLength > 0) {
// According to the distance, there should be connections open to it => issue a warning
this.log('warning', `Host ${this.address} considered as DOWN. Reconnection delay ${this.reconnectionDelay}ms.`);
} else {
this.log('info', `Host ${this.address} considered as DOWN.`);
}
else {
this.log('info', util.format('Host %s considered as DOWN.', this.address));
}
this.emit('down');

@@ -239,2 +244,6 @@ this._checkPoolState();

* If there isn't an available connections, it will open a new one according to the pooling options.
* @param {String} keyspace The keyspace that the connection must be using. When the keyspace provided is null, no
* keyspace check is performed.
* @param {Connection} previousConnection The previous connection. When provided, the pool should try to provide a
* different connection.
* @param {Function} callback

@@ -244,4 +253,9 @@ * @internal

*/
Host.prototype.borrowConnection = function (callback) {
this.pool.borrowConnection(callback);
Host.prototype.borrowConnection = function (keyspace, previousConnection, callback) {
if (previousConnection) {
// Obtain one of the existing connections
return this.pool.borrowConnection(keyspace, previousConnection, callback);
}
this.pool.createAndBorrowConnection(keyspace, callback);
};

@@ -305,4 +319,5 @@

}
if (this.pool.connections.length < this.pool.coreConnectionsLength) {
// the pool still needs to grow
// the pool needs to grow / reconnect
if (!this.pool.hasScheduledNewConnection()) {

@@ -313,5 +328,7 @@ this.reconnectionDelay = this.reconnectionSchedule.next().value;

}
if (this._distance !== types.distance.ignored &&
this.pool.connections.length === 0 &&
this.pool.coreConnectionsLength > 0) {
const shouldHaveConnections = this._distance !== types.distance.ignored && this.pool.coreConnectionsLength > 0;
if (shouldHaveConnections && this.pool.connections.length === 0) {
// Mark as DOWN, if its UP
this.setDown();

@@ -318,0 +335,0 @@ }

@@ -70,3 +70,4 @@ /**

/**
* Specifies the probability with which read repairs should be invoked on non-quorum reads. The value must be between 0 and 1.
* Specifies the probability with which read repairs should be invoked on non-quorum reads. The value must be
* between 0 and 1.
* @type {number}

@@ -78,3 +79,3 @@ */

* <p>
* For Cassandra versions prior to 3.0.0, this method always returns {@code null}.
* For Apache Cassandra versions prior to 3.0.0, this method always returns <code>null</code>.
* </p>

@@ -89,3 +90,3 @@ * @type {Object}

* <p>
* For Cassandra versions prior to 3.0.0, this method always returns {@code null}.
* For Apache Cassandra versions prior to 3.0.0, this method always returns <code>null</code>.
* </p>

@@ -102,5 +103,2 @@ * @type {Number|null}

* Returns the default TTL for this table.
* <p>
* Note: this option is not available in Cassandra 1.2 and will return 0 (no default TTL) when connected to 1.2 nodes.
* </p>
* @type {Number}

@@ -111,6 +109,2 @@ */

* * Returns the speculative retry option for this table.
* <p>
* Note: this option is not available in Cassandra 1.2 and will return "NONE" (no speculative retry) when connected
* to 1.2 nodes.
* </p>
* @type {String}

@@ -122,3 +116,4 @@ */

* <p>
* Note: this option is available in Cassandra 2.1 and above, and will return {@code null} for earlier versions.
* Note: this option is available in Apache Cassandra 2.1 and above, and will return <code>null</code> for
* earlier versions.
* </p>

@@ -131,3 +126,4 @@ * @type {Number|null}

* <p>
* Note: this option is available in Cassandra 2.1 and above, and will return {@code null} for earlier versions.
* Note: this option is available in Apache Cassandra 2.1 and above, and will return <code>null</code> for
* earlier versions.
* </p>

@@ -134,0 +130,0 @@ * @type {Number|null}

@@ -68,3 +68,3 @@ /**

this.initialized = false;
this._schemaParser = schemaParserFactory.getByVersion(controlConnection, this.getUdt.bind(this));
this._schemaParser = schemaParserFactory.getByVersion(options, controlConnection, this.getUdt.bind(this));
const self = this;

@@ -84,3 +84,3 @@ this._preparedQueries = new PreparedQueries(options.maxPrepared, function () {

this._schemaParser = schemaParserFactory.getByVersion(
this.controlConnection, this.getUdt.bind(this), version, this._schemaParser);
this.options, this.controlConnection, this.getUdt.bind(this), version, this._schemaParser);
};

@@ -512,2 +512,3 @@

let cache;
let virtual;
if (this.options.isMetadataSyncEnabled) {

@@ -519,4 +520,5 @@ const keyspace = this.keyspaces[keyspaceName];

cache = keyspace.tables;
virtual = keyspace.virtual;
}
this._schemaParser.getTable(keyspaceName, name, cache, callback);
this._schemaParser.getTable(keyspaceName, name, cache, virtual, callback);
};

@@ -523,0 +525,0 @@

@@ -23,3 +23,3 @@ /**

* @param {String} target
* @param {Number} kind
* @param {Number|String} kind
* @param {Object} options

@@ -44,3 +44,3 @@ * @alias module:metadata~Index

*/
this.kind = kind;
this.kind = typeof kind === 'string' ? getKindByName(kind) : kind;
/**

@@ -79,2 +79,3 @@ * An associative array containing the index options

* Parses Index information from rows in the 'system_schema.indexes' table
* @deprecated It will be removed in the next major version.
* @param {Array.<Row>} indexRows

@@ -94,3 +95,4 @@ * @returns {Array.<Index>}

/**
* Parses Index information from rows in the legacy 'system.schema_columns' table
* Parses Index information from rows in the legacy 'system.schema_columns' table.
* @deprecated It will be removed in the next major version.
* @param {Array.<Row>} columnRows

@@ -97,0 +99,0 @@ * @param {Object.<String, {name, type}>} columnsByName

@@ -40,4 +40,11 @@ /**

const _selectAllVirtualKeyspaces = "SELECT * FROM system_virtual_schema.keyspaces";
const _selectSingleVirtualKeyspace = "SELECT * FROM system_virtual_schema.keyspaces where keyspace_name = '%s'";
const _selectVirtualTable = "SELECT * FROM system_virtual_schema.tables where keyspace_name = '%s' and table_name='%s'";
const _selectVirtualColumns = "SELECT * FROM system_virtual_schema.columns where keyspace_name = '%s' and table_name='%s'";
/**
* @abstract
* @param {ClientOptions} options The client options
* @param {ControlConnection} cc

@@ -47,4 +54,5 @@ * @constructor

*/
function SchemaParser(cc) {
function SchemaParser(options, cc) {
this.cc = cc;
this.encodingOptions = options.encoding;
this.selectTable = null;

@@ -56,2 +64,3 @@ this.selectColumns = null;

this.selectFunctions = null;
this.supportsVirtual = false;
}

@@ -64,6 +73,7 @@

* @param strategyOptions
* @param virtual
* @returns {{name, durableWrites, strategy, strategyOptions, tokenToReplica, udts, tables, functions, aggregates}|null}
* @protected
*/
SchemaParser.prototype._createKeyspace = function (name, durableWrites, strategy, strategyOptions) {
SchemaParser.prototype._createKeyspace = function (name, durableWrites, strategy, strategyOptions, virtual) {
const ksInfo = {

@@ -75,2 +85,3 @@ name: name,

tokenToReplica: null,
virtual: virtual === true,
udts: {},

@@ -106,5 +117,6 @@ tables: {},

* @param {Object} cache
* @param {Boolean} virtual
* @param {Function} callback
*/
SchemaParser.prototype.getTable = function (keyspaceName, name, cache, callback) {
SchemaParser.prototype.getTable = function (keyspaceName, name, cache, virtual, callback) {
let tableInfo = cache && cache[name];

@@ -129,5 +141,7 @@ if (!tableInfo) {

const self = this;
let virtualTable = virtual;
utils.series([
function getTableRow(next) {
const query = util.format(self.selectTable, keyspaceName, name);
const selectTable = virtualTable ? _selectVirtualTable : self.selectTable;
const query = util.format(selectTable, keyspaceName, name);
self.cc.query(query, function (err, response) {

@@ -141,7 +155,30 @@ if (err) {

},
function getVirtualTableRow(next) {
// if we weren't sure if table was virtual or not, query virtual schema.
if (!tableRow && self.supportsVirtual && virtualTable === undefined) {
const query = util.format(_selectVirtualTable, keyspaceName, name);
self.cc.query(query, function (err, response) {
if (err) {
// we can't error here as we can't be sure if the node
// supports virtual tables, in this case it is adequate
// to act as if there was no matching table.
return next();
}
tableRow = response.rows[0];
// if we got a result, this is a virtual table
if (tableRow) {
virtualTable = true;
}
next();
});
} else {
return next();
}
},
function getColumnRows (next) {
if (!tableRow) {
return next(null, null, null);
return next();
}
const query = util.format(self.selectColumns, keyspaceName, name);
const selectColumns = virtualTable ? _selectVirtualColumns : self.selectColumns;
const query = util.format(selectColumns, keyspaceName, name);
self.cc.query(query, function (err, response) {

@@ -156,3 +193,3 @@ if (err) {

function getIndexes(next) {
if (!tableRow || !self.selectIndexes) {
if (!tableRow || !self.selectIndexes || virtualTable) {
//either the table does not exists or it does not support indexes schema table

@@ -175,3 +212,3 @@ return next();

}
self._parseTableOrView(tableInfo, tableRow, columnRows, indexRows, function (err) {
self._parseTableOrView(tableInfo, tableRow, columnRows, indexRows, virtualTable, function (err) {
tableInfo.loading = false;

@@ -250,6 +287,7 @@ tableInfo.loaded = !err;

* @param {Array.<Row>} indexRows
* @param {Boolean} virtual
* @param {Function} callback
* @throws {Error}
*/
SchemaParser.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, callback) {
SchemaParser.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, virtual, callback) {
};

@@ -347,4 +385,33 @@

/** @returns {Map} */
SchemaParser.prototype._asMap = function (obj) {
if (!obj) {
return new Map();
}
if (this.encodingOptions.map && obj instanceof this.encodingOptions.map) {
// Its already a Map or a polyfill of a Map
return obj;
}
return new Map(Object.keys(obj).map(k => [ k, obj[k]]));
};
SchemaParser.prototype._mapAsObject = function (map) {
if (!map) {
return map;
}
if (this.encodingOptions.map && map instanceof this.encodingOptions.map) {
const result = {};
map.forEach((value, key) => result[key] = value);
return result;
}
return map;
};
/**
* Used to parse schema information for Cassandra versions 1.2.x, and 2.x
* @param {ClientOptions} options The client options
* @param {ControlConnection} cc

@@ -354,4 +421,4 @@ * @constructor

*/
function SchemaParserV1(cc) {
SchemaParser.call(this, cc);
function SchemaParserV1(options, cc) {
SchemaParser.call(this, options, cc);
this.selectTable = _selectTableV1;

@@ -407,3 +474,3 @@ this.selectColumns = _selectColumnsV1;

/** @override */
SchemaParserV1.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, callback) {
SchemaParserV1.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, virtual, callback) {
let i, c, name, types;

@@ -629,2 +696,3 @@ const encoder = this.cc.getEncoder();

* Used to parse schema information for Cassandra versions 3.x and above
* @param {ClientOptions} options The client options
* @param {ControlConnection} cc The control connection to be used

@@ -635,4 +703,4 @@ * @param {Function} udtResolver The function to be used to retrieve the udts.

*/
function SchemaParserV2(cc, udtResolver) {
SchemaParser.call(this, cc);
function SchemaParserV2(options, cc, udtResolver) {
SchemaParser.call(this, options, cc);
this.udtResolver = udtResolver;

@@ -730,3 +798,3 @@ this.selectTable = _selectTableV2;

}
self._parseTableOrView(viewInfo, tableRow, columnRows, null, function (err) {
self._parseTableOrView(viewInfo, tableRow, columnRows, null, false, function (err) {
viewInfo.loading = false;

@@ -740,3 +808,3 @@ viewInfo.loaded = !err;

SchemaParserV2.prototype._parseKeyspace = function (row) {
SchemaParserV2.prototype._parseKeyspace = function (row, virtual) {
const replication = row['replication'];

@@ -759,7 +827,8 @@ let strategy;

strategy,
strategyOptions);
strategyOptions,
virtual);
};
/** @override */
SchemaParserV2.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, callback) {
SchemaParserV2.prototype._parseTableOrView = function (tableInfo, tableRow, columnRows, indexRows, virtual, callback) {
const encoder = this.cc.getEncoder();

@@ -769,2 +838,55 @@ const columnsKeyed = {};

const clusteringKeys = [];
const self = this;
// maps column rows to columnInfo and also populates columnsKeyed
const columnRowMapper = (row, next) => {
encoder.parseTypeName(tableRow['keyspace_name'], row['type'], 0, null, self.udtResolver, function (err, type) {
if (err) {
return next(err);
}
const c = {
name: row['column_name'],
type: type,
isStatic: false
};
columnsKeyed[c.name] = c;
switch (row['kind']) {
case 'partition_key':
partitionKeys.push({ c: c, index: (row['position'] || 0)});
break;
case 'clustering':
clusteringKeys.push({ c: c, index: (row['position'] || 0), order: row['clustering_order'] === 'desc' ? 'DESC' : 'ASC'});
break;
case 'static':
c.isStatic = true;
break;
}
next(null, c);
});
};
// is table is virtual, the only relevant information to parse is the columns as the table itself has no configuration.
if (virtual) {
tableInfo.virtual = true;
utils.map(columnRows, columnRowMapper, function (err, columns) {
if (err) {
return callback(err);
}
tableInfo.columns = columns;
tableInfo.columnsByName = columnsKeyed;
tableInfo.partitionKeys = partitionKeys.sort(utils.propCompare('index')).map(function (item) {
return item.c;
});
clusteringKeys.sort(utils.propCompare('index'));
tableInfo.clusteringKeys = clusteringKeys.map(function (item) {
return item.c;
});
tableInfo.clusteringOrder = clusteringKeys.map(function (item) {
return item.order;
});
callback();
});
return;
}
const isView = tableInfo instanceof MaterializedView;

@@ -774,18 +896,24 @@ tableInfo.bloomFilterFalsePositiveChance = tableRow['bloom_filter_fp_chance'];

tableInfo.comment = tableRow['comment'];
const compaction = tableRow['compaction'];
// Regardless of the encoding options, use always an Object to represent an associative Array
const compaction = this._asMap(tableRow['compaction']);
if (compaction) {
// compactionOptions as an Object<String, String>
tableInfo.compactionOptions = {};
tableInfo.compactionClass = compaction['class'];
for (const key in compaction) {
if (!compaction.hasOwnProperty(key) || key === 'class') {
continue;
tableInfo.compactionClass = compaction.get('class');
compaction.forEach((value, key) => {
if (key === 'class') {
return;
}
tableInfo.compactionOptions[key] = compaction[key];
}
tableInfo.compactionOptions[key] = compaction.get(key);
});
}
tableInfo.compression = tableRow['compression'];
// Convert compression to an Object<String, String>
tableInfo.compression = this._mapAsObject(tableRow['compression']);
tableInfo.gcGraceSeconds = tableRow['gc_grace_seconds'];
tableInfo.localReadRepairChance = tableRow['dclocal_read_repair_chance'];
tableInfo.readRepairChance = tableRow['read_repair_chance'];
tableInfo.extensions = tableRow['extensions'];
tableInfo.extensions = this._mapAsObject(tableRow['extensions']);
tableInfo.crcCheckChance = tableRow['crc_check_chance'];

@@ -804,28 +932,5 @@ tableInfo.memtableFlushPeriod = tableRow['memtable_flush_period_in_ms'] || tableInfo.memtableFlushPeriod;

}
const self = this;
utils.map(columnRows, function (row, next) {
encoder.parseTypeName(tableRow['keyspace_name'], row['type'], 0, null, self.udtResolver, function (err, type) {
if (err) {
return next(err);
}
const c = {
name: row['column_name'],
type: type,
isStatic: false
};
columnsKeyed[c.name] = c;
switch (row['kind']) {
case 'partition_key':
partitionKeys.push({ c: c, index: (row['position'] || 0)});
break;
case 'clustering':
clusteringKeys.push({ c: c, index: (row['position'] || 0), order: row['clustering_order'] === 'desc' ? 'DESC' : 'ASC'});
break;
case 'static':
c.isStatic = true;
break;
}
next(null, c);
});
}, function (err, columns) {
// Map all columns asynchronously
utils.map(columnRows, columnRowMapper, (err, columns) => {
if (err) {

@@ -846,2 +951,3 @@ return callback(err);

});
if (isView) {

@@ -853,7 +959,15 @@ tableInfo.tableName = tableRow['base_table_name'];

}
tableInfo.indexes = Index.fromRows(indexRows);
const flags = tableRow['flags'];
const isDense = flags.indexOf('dense') >= 0;
const isSuper = flags.indexOf('super') >= 0;
const isCompound = flags.indexOf('compound') >= 0;
tableInfo.indexes = this._getIndexes(indexRows);
// flags can be an instance of Array or Set (real or polyfill)
let flags = tableRow['flags'];
if (Array.isArray(flags)) {
flags = new Set(flags);
}
const isDense = flags.has('dense');
const isSuper = flags.has('super');
const isCompound = flags.has('compound');
tableInfo.isCompact = isSuper || isDense || !isCompound;

@@ -872,2 +986,13 @@ //remove the columns related to Thrift

SchemaParserV2.prototype._getIndexes = function (indexRows) {
if (!indexRows || indexRows.length === 0) {
return utils.emptyArray;
}
return indexRows.map((row) => {
const options = this._mapAsObject(row['options']);
return new Index(row['index_name'], options['target'], row['kind'], options);
});
};
/** @override */

@@ -981,2 +1106,84 @@ SchemaParserV2.prototype._parseAggregate = function (row, callback) {

/**
* Used to parse schema information for Cassandra versions 4.x and above.
*
* This parser similar to [SchemaParserV2] expect it also parses virtual
* keyspaces.
*
* @param {ClientOptions} options The client options
* @param {ControlConnection} cc The control connection to be used
* @param {Function} udtResolver The function to be used to retrieve the udts.
* @constructor
* @ignore
*/
function SchemaParserV3(options, cc, udtResolver) {
SchemaParserV2.call(this, options, cc, udtResolver);
this.supportsVirtual = true;
}
util.inherits(SchemaParserV3, SchemaParserV2);
/** @override */
SchemaParserV3.prototype.getKeyspaces = function (waitReconnect, callback) {
const self = this;
const keyspaces = {};
const queries = [
{ query: _selectAllKeyspacesV2, virtual: false },
{ query: _selectAllVirtualKeyspaces, virtual: true }
];
utils.each(queries, (q, cb) => {
self.cc.query(q.query, waitReconnect, (err, result) => {
if (err) {
// only callback in error for non-virtual query as
// server reporting C* 4.0 may not actually implement
// virtual tables.
if (!q.virtual) {
return cb(err);
}
return cb();
}
for (let i = 0; i < result.rows.length; i++) {
const ksInfo = self._parseKeyspace(result.rows[i], q.virtual);
keyspaces[ksInfo.name] = ksInfo;
}
cb();
});
}, (err) => {
callback(err, keyspaces);
});
};
/** @override */
SchemaParserV3.prototype.getKeyspace = function (name, callback) {
this._getKeyspace(_selectSingleKeyspaceV2, name, false, (err, ks) => {
if (err) {
return callback(err);
}
if (!ks) {
// if not found, attempt to retrieve as virtual keyspace.
return this._getKeyspace(_selectSingleVirtualKeyspace, name, true, callback);
}
return callback(null, ks);
});
};
SchemaParserV3.prototype._getKeyspace = function (query, name, virtual, callback) {
this.cc.query(util.format(query, name), (err, result) => {
if (err) {
// only callback in error for non-virtual query as
// server reporting C* 4.0 may not actually implement
// virtual tables.
if (!virtual) {
return callback(err);
}
return callback(null, null);
}
const row = result.rows[0];
if (!row) {
return callback(null, null);
}
callback(null, this._parseKeyspace(row, virtual));
});
};
/**
* Upon migration from thrift to CQL, we internally create a pair of surrogate clustering/regular columns

@@ -1194,2 +1401,3 @@ * for compact static tables. These columns shouldn't be exposed to the user but are currently returned by C*.

* provided Cassandra version
* @param {ClientOptions} options The client options
* @param {ControlConnection} cc The control connection to be used

@@ -1201,9 +1409,11 @@ * @param {Function} udtResolver The function to be used to retrieve the udts.

*/
function getByVersion(cc, udtResolver, version, currentInstance) {
function getByVersion(options, cc, udtResolver, version, currentInstance) {
let parserConstructor = SchemaParserV1;
if (version && version[0] >= 3) {
if (version && version[0] === 3) {
parserConstructor = SchemaParserV2;
} else if (version && version[0] >= 4) {
parserConstructor = SchemaParserV3;
}
if (!currentInstance || !(currentInstance instanceof parserConstructor)){
return new parserConstructor(cc, udtResolver);
return new parserConstructor(options, cc, udtResolver);
}

@@ -1210,0 +1420,0 @@ return currentInstance;

@@ -30,6 +30,2 @@ /**

* Returns the memtable flush period (in milliseconds) option for this table.
* <p>
* Note: this option is available only on Cassandra 2.x and will return 0 (no periodic
* flush) when connected to 1.2 nodes.
* </p>
* @type {Number}

@@ -41,4 +37,4 @@ */

* <p>
* Note: this option is only available in Cassandra 2.0. It is deprecated in Cassandra 2.1 and above, and will
* therefore return {@code null} for 2.1 nodes.
* Note: this option is only available in Apache Cassandra 2.0. It is deprecated in Apache Cassandra 2.1 and
* above, and will therefore return <code>null</code> for 2.1 nodes.
* </p>

@@ -64,2 +60,8 @@ * @type {Number|null}

this.cdc = null;
/**
* Determines whether the table is a virtual table or not.
* @type {Boolean}
*/
this.virtual = false;
}

@@ -66,0 +68,0 @@

@@ -99,3 +99,3 @@ /**

const message = util.format('The host %s did not reply before timeout %d ms', address, millis);
self._markAsTimedOut(new errors.OperationTimedOutError(message), onResponse);
self._markAsTimedOut(new errors.OperationTimedOutError(message, address), onResponse);
}, millis);

@@ -102,0 +102,0 @@ }

@@ -109,4 +109,8 @@ /**

* @param {?String} [localDc] local datacenter name.
* @param {Number} [usedHostsPerRemoteDc] the number of host per remote datacenter that the policy will yield \
* in a newQueryPlan after the local nodes.
* @param {Number} [usedHostsPerRemoteDc] Deprecated: the number of host per remote datacenter that the policy will
* yield in a newQueryPlan after the local nodes.
* <p>
* Note that this parameter is deprecated and will be removed in the next major version. Handling data center
* outages is better suited at a service level rather than within an application client.
* </p>
* @extends {LoadBalancingPolicy}

@@ -113,0 +117,0 @@ * @constructor

@@ -252,2 +252,6 @@ /**

* @property {useCurrentHost} [useCurrentHost] Determines if it should use the same host to retry the request.
* <p>
* In the case that the current host is not available anymore, it will be retried on the next host even when
* <code>useCurrentHost</code> is set to <code>true</code>.
* </p>
*/

@@ -254,0 +258,0 @@

@@ -196,3 +196,4 @@ /**

}
RequestHandler.borrowFromHost(host, keyspace, function borrowCallback(err, connection) {
host.borrowConnection(keyspace, null, function borrowCallback(err, connection) {
if (err) {

@@ -223,3 +224,4 @@ return callback(err);

}
RequestHandler.borrowFromHost(host, keyspace, function borrowCallback(err, connection) {
host.borrowConnection(keyspace, null, function borrowCallback(err, connection) {
if (err) {

@@ -226,0 +228,0 @@ // Don't mind about issues with the pool in this case

@@ -16,5 +16,5 @@ /**

const retryOnNextHostDecision = Object.freeze({
const retryOnCurrentHost = Object.freeze({
decision: retry.RetryPolicy.retryDecision.retry,
useCurrentHost: false,
useCurrentHost: true,
consistency: undefined

@@ -168,4 +168,6 @@ });

if (err.requestNotWritten) {
// The request was definitely not applied, it's safe to retry
return retryOnNextHostDecision;
// The request was definitely not applied, it's safe to retry.
// Retry on the current host as there might be other connections open, in case it fails to obtain a connection
// on the current host, the driver will immediately retry on the next host.
return retryOnCurrentHost;
}

@@ -209,4 +211,6 @@ return onRequestError();

}
this._parent.log('info', 'Retrying request');
this._retryCount++;
if (meta || (typeof consistency === 'number' && this._request.consistency !== consistency)) {

@@ -223,6 +227,24 @@ this._request = this._request.clone();

}
if (useCurrentHost !== false) {
// Use existing host (default)
return this._sendOnConnection();
// Use existing host (default).
const keyspace = this._parent.client.keyspace;
// Reusing the existing connection is suitable for the most common scenarios, like server read timeouts that
// will be fixed with a new request.
// To cover all scenarios (e.g., where a different connection to the same host might mean something different),
// we obtain a new connection from the host pool.
// When there was a socket error, the connection provided was already removed from the pool earlier.
return this._host.borrowConnection(keyspace, this._connection, (err, connection) => {
if (err) {
// All connections are busy (`BusyConnectionError`) or there isn't a ready connection in the pool (`Error`)
// The retry policy declared the intention to retry on the current host but its not available anymore.
// Use the next host
return this.start();
}
this._connection = connection;
this._sendOnConnection();
});
}
// Use the next host in the query plan to send the request

@@ -232,4 +254,2 @@ this.start();

/**

@@ -236,0 +256,0 @@ * Issues a PREPARE request on the current connection.

@@ -63,3 +63,3 @@ /**

RequestHandler.borrowFromHost(host, keyspace, function borrowFromHostCallback(err, connection) {
host.borrowConnection(keyspace, null, function borrowFromHostCallback(err, connection) {
if (err) {

@@ -80,26 +80,2 @@ triedHosts[host.address] = err;

/**
* Borrows a connection from the provided host, changing the current keyspace, if necessary.
* @param {Host} host
* @param {String} keyspace
* @param {Function} callback
*/
static borrowFromHost(host, keyspace, callback) {
host.borrowConnection(function (err, connection) {
if (err) {
return callback(err);
}
if (!keyspace || keyspace === connection.keyspace) {
// Connection is ready to be used
return callback(null, connection);
}
connection.changeKeyspace(keyspace, function (err) {
if (err) {
return callback(err, connection);
}
callback(null, connection);
});
});
}
/**
* Gets the next host from the query plan.

@@ -164,11 +140,18 @@ * @param {Iterator} iterator

}
this._callback = callback;
const self = this;
this.loadBalancingPolicy.newQueryPlan(this.client.keyspace, this.requestOptions, function newPlanCb(err, iterator) {
if (err) {
return callback(err);
}
self._hostIterator = iterator;
self._callback = callback;
if (this.requestOptions.host) {
// if host is configured bypass load balancing policy and use
// a single host plan.
self._hostIterator = utils.arrayIterator([self.requestOptions.host]);
self._startNewExecution();
});
} else {
this.loadBalancingPolicy.newQueryPlan(this.client.keyspace, this.requestOptions, function newPlanCb(err, iterator) {
if (err) {
return self._callback(err);
}
self._hostIterator = iterator;
self._startNewExecution();
});
}
}

@@ -175,0 +158,0 @@

{
"name": "dse-driver",
"version": "1.6.0",
"version": "1.7.0",
"description": "DataStax Enterprise Node.js Driver",

@@ -24,8 +24,5 @@ "author": "DataStax",

"devDependencies": {
"mocha": "~2.5.3",
"rewire": ">= 2.1.0",
"temp": ">= 0.8.3",
"mocha-jenkins-reporter": ">= 0.1.9",
"mocha-appveyor-reporter": ">= 0.2.1",
"mocha-multi": "~0.11.0"
"mocha": "~5.2.0",
"rewire": "^4.0.1",
"temp": ">= 0.8.3"
},

@@ -37,11 +34,11 @@ "homepage": "http://docs.datastax.com/en/developer/nodejs-driver-dse/latest/",

"license": "SEE LICENSE IN http://www.datastax.com/terms/datastax-dse-driver-license-terms",
"scripts": {
"scripts": {
"test": "./node_modules/.bin/mocha test/unit -R spec -t 5000 --recursive",
"ci": "./node_modules/.bin/mocha test/unit test/integration/short --recursive -R mocha-jenkins-reporter",
"ci_unit": "./node_modules/.bin/mocha test/unit -R mocha-multi",
"ci_jenkins": "./node_modules/.bin/mocha test/unit test/integration/short -R mocha-jenkins-reporter --recursive --exit",
"ci_unit_appveyor": "./node_modules/.bin/mocha test/unit -R mocha-appveyor-reporter --recursive --exit",
"eslint": "eslint lib test --ignore-pattern '/lib/types/integer.js'"
},
"engines": {
"node" : ">=4"
}
"node": ">=4"
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc