memsql-statsd
Advanced tools
Comparing version 0.1.1 to 0.2.0
@@ -7,2 +7,3 @@ { | ||
memsql: { | ||
prefix: "stats", | ||
host: "MASTER_AGGREGATOR_HOSTNAME", | ||
@@ -9,0 +10,0 @@ port: 3306, |
@@ -35,2 +35,16 @@ /* | ||
AggregatorsPool.prototype.connect_master = function() { | ||
var conn = this._connect_aggregator(this._master); | ||
if (!conn) { | ||
throw new Error('Failed to connect to master@' + this._master.host + ":" + this._master.port); | ||
} | ||
return conn; | ||
}; | ||
AggregatorsPool.prototype.connection_destroy = function(connection) { | ||
if (connection && connection.client && connection.client.connectedSync()) { | ||
connection.disconnect(); | ||
} | ||
}; | ||
AggregatorsPool.prototype._reset = function() { | ||
@@ -46,3 +60,3 @@ // resets the pool to an initial state | ||
create: this._connection_create.bind(this), | ||
destroy: this._connection_destroy.bind(this), | ||
destroy: this.connection_destroy.bind(this), | ||
max: 5, | ||
@@ -91,8 +105,2 @@ idleTimeoutMillis: 10 * 1000, | ||
AggregatorsPool.prototype._connection_destroy = function(connection) { | ||
if (connection && connection.client && connection.client.connectedSync()) { | ||
connection.disconnect(); | ||
} | ||
}; | ||
AggregatorsPool.prototype._refresh_aggregators = function(callback) { | ||
@@ -135,3 +143,3 @@ // Try to get an existing aggregator connection, if it fails fall | ||
if (conn) { update(conn, true); } | ||
else { callback(new Error('Failed to connect to master @ ' + this._master.host + ":" + this._master.port)); } | ||
else { callback(new Error('Failed to connect to master@' + this._master.host + ":" + this._master.port)); } | ||
} else { | ||
@@ -143,3 +151,3 @@ this.acquire(function(err, conn) { | ||
if (conn) { update(conn, true); } | ||
else { callback(new Error('Failed to connect to master @ ' + this._master.host + ":" + this._master.port)); } | ||
else { callback(new Error('Failed to connect to master@' + this._master.host + ":" + this._master.port)); } | ||
} else { | ||
@@ -146,0 +154,0 @@ update(conn); |
@@ -8,21 +8,34 @@ /* | ||
var util = require('util'); | ||
var crypto = require('crypto'); | ||
var CLASSIFIERS = ['alpha', 'beta', 'gamma', 'delta', 'epsilon', 'zeta', 'eta', 'theta', 'iota', 'kappa'] | ||
var ANALYTICS_COLUMNS = ['classifier', 'value', 'created'].concat(CLASSIFIERS); | ||
var CLASSIFIERS = ['alpha', 'beta', 'gamma', 'delta', 'epsilon', 'zeta', 'eta', 'theta', 'iota', 'kappa', 'lambda', 'mu', 'nu', 'xi', 'omicron'] | ||
var CLASSIFIERS_COLUMNS = ['id', 'classifier'].concat(CLASSIFIERS); | ||
var STATEMENT_TEMPLATE = util.format("INSERT INTO `%%s`.analytics (%s) VALUES ", ANALYTICS_COLUMNS.join(',')); | ||
var ANALYTICS_INSERT = "INSERT INTO `%s`.analytics (classifier_id, value, created) VALUES "; | ||
var CLASSIFIERS_INSERT = util.format("INSERT INTO `%%s`.classifiers (%s) VALUES %%%%s ON DUPLICATE KEY UPDATE id=id", CLASSIFIERS_COLUMNS.join(',')); | ||
var ANALYTICS_VALUES_PLACEHOLDER = '(?,?,?)'; | ||
// classifier, value, timestamp, alpha, beta, gamma, delta... rest of classifiers | ||
var VALUES_PLACEHOLDER = '(?,?,?,?,?,?,?,?,?,?,?,?,?)'; | ||
var CLASSIFIER_VALUES_PLACEHOLDER = '(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'; | ||
var AnalyticsCache = function(logger, database_name, connection_pool) { | ||
this._logger = logger; | ||
this._database_name = database_name; | ||
this._pool = connection_pool; | ||
this._pending = []; | ||
var AnalyticsRow = function(joined_classifier, value, timestamp) { | ||
this.joined_classifier = joined_classifier; | ||
this.classifiers = this._get_classifiers(joined_classifier); | ||
this.classifier_id = this._compute_id(this.classifiers); | ||
this.value = value; | ||
this.timestamp = timestamp; | ||
}; | ||
AnalyticsCache.prototype.record = function(key, value, timestamp) { | ||
var classifiers = key.split('.'); | ||
AnalyticsRow.prototype.classifier_values = function() { | ||
return [this.classifier_id, this.joined_classifier].concat(this.classifiers); | ||
}; | ||
AnalyticsRow.prototype.analytics_values = function() { | ||
return [this.classifier_id, this.value, this.timestamp]; | ||
}; | ||
AnalyticsRow.prototype._get_classifiers = function(joined_classifier) { | ||
var classifiers = joined_classifier.split('.'); | ||
// if there are more classifiers than classifier columns, | ||
@@ -32,4 +45,4 @@ // lets just throw the remainder in the last classifier column | ||
var extra = classifiers.splice(CLASSIFIERS.length - 1); | ||
classifiers[classifiers.length - 1] = extra.join('.'); | ||
} else if (classifiers.length < CLASSIFIERS.length - 1) { | ||
classifiers[classifiers.length] = extra.join('.'); | ||
} else if (classifiers.length < CLASSIFIERS.length) { | ||
var diff = CLASSIFIERS.length - classifiers.length; | ||
@@ -39,7 +52,27 @@ classifiers = classifiers.concat(new Array(diff + 1).join(' ').split('').map(function() { return ''; })); | ||
this._pending.push([key, value, new Date(timestamp * 1000).toISOString()].concat(classifiers)); | ||
return classifiers; | ||
}; | ||
AnalyticsRow.prototype._compute_id = function(classifiers) { | ||
var shasum = crypto.createHash('sha1'); | ||
shasum.update(classifiers.join('.')); | ||
return parseInt(shasum.digest('hex').slice(0, 16), 16); | ||
}; | ||
var AnalyticsCache = function(logger, database_name, connection_pool, prefix) { | ||
this._logger = logger; | ||
this._database_name = database_name; | ||
this._pool = connection_pool; | ||
this._prefix = _.isUndefined(prefix) ? prefix : prefix + '.'; | ||
this._pending = []; | ||
this._seen_classifiers = []; | ||
}; | ||
AnalyticsCache.prototype.record = function(key, value, timestamp) { | ||
if (!_.isUndefined(this._prefix)) { key = this._prefix + key; } | ||
this._pending.push(new AnalyticsRow(key, value, new Date(timestamp * 1000).toISOString())); | ||
}; | ||
AnalyticsCache.prototype.flush = _.throttle(function(callback) { | ||
var statement_template = util.format(STATEMENT_TEMPLATE, this._database_name); | ||
var statement_template = util.format(ANALYTICS_INSERT, this._database_name); | ||
@@ -49,5 +82,8 @@ var queries = []; | ||
this._throttled_classifiers_reset(); | ||
this._record_classifiers(this._pending); | ||
// build queries | ||
while (this._pending.length > 0) { | ||
var batch = this._pending.splice(0, 50); | ||
var batch = this._pending.splice(0, 128); | ||
@@ -58,4 +94,5 @@ // build values array | ||
for (var i = 0, l = batch.length; i < l; ++i) { | ||
values = values.concat(batch[i]); | ||
statement_values.push(VALUES_PLACEHOLDER); | ||
var row = batch[i]; | ||
values.push.apply(values, row.analytics_values()); | ||
statement_values.push(ANALYTICS_VALUES_PLACEHOLDER); | ||
} | ||
@@ -73,3 +110,2 @@ | ||
var start = Date.now(); | ||
queries.push(function(err, result) { | ||
@@ -95,2 +131,54 @@ if (err) { | ||
AnalyticsCache.prototype._throttled_classifiers_reset = _.throttle(function() { | ||
this._seen_classifiers = []; | ||
}, 1000 * 5 * 60); | ||
AnalyticsCache.prototype._record_classifiers = function(rows) { | ||
var pending = []; | ||
for (var i = 0, l = rows.length; i < l; ++i) { | ||
var row = rows[i]; | ||
var index = _.sortedIndex(this._seen_classifiers, row.classifier_id); | ||
if (this._seen_classifiers[index] !== row.classifier_id) { | ||
// new classifier | ||
this._seen_classifiers.splice(index, 0, row.classifier_id); | ||
pending.push(row); | ||
} | ||
} | ||
if (pending.length > 0) { | ||
var statement_template = util.format(CLASSIFIERS_INSERT, this._database_name); | ||
var queries = []; | ||
while (pending.length > 0) { | ||
var batch = pending.splice(0, 64); | ||
// build values array | ||
var statement_values = []; | ||
var values = []; | ||
for (var i = 0, l = batch.length; i < l; ++i) { | ||
var row = batch[i]; | ||
values.push.apply(values, row.classifier_values()); | ||
statement_values.push(CLASSIFIER_VALUES_PLACEHOLDER); | ||
} | ||
var statement = util.format(statement_template, statement_values.join(',')); | ||
queries = queries.concat([statement, values]); | ||
} | ||
var conn = this._pool.connect_master(); | ||
queries.push(function(err, result) { | ||
this._pool.connection_destroy(conn); | ||
}.bind(this)); | ||
try { | ||
conn.execSeries.apply(conn, queries); | ||
} catch (e) { | ||
// connection error occured | ||
this._logger.debug('Error while inserting classifiers.'); | ||
this._pool.connection_destroy(conn); | ||
} | ||
} | ||
}; | ||
module.exports = AnalyticsCache; |
@@ -38,3 +38,3 @@ /* | ||
this.pool = new AggregatorsPool(this.logger, this.config.host, this.config.port, this.config.user, this.config.password); | ||
this.cache = new AnalyticsCache(this.logger, this.config.database, this.pool); | ||
this.cache = new AnalyticsCache(this.logger, this.config.database, this.pool, this.config.prefix); | ||
this.stats = { | ||
@@ -73,2 +73,4 @@ exception_count: 0, | ||
else if (!_.isString(this.config.database)) { throw new Error('MemSQL Ops database name must be a string.'); } | ||
if (_.has(this.config, 'prefix') && !_.isString(this.config.prefix)) { throw new Error('MemSQL prefix must be a string.'); } | ||
}; | ||
@@ -90,6 +92,8 @@ | ||
value = metrics.counters[key]; | ||
var value_per_second = metrics.counter_rates[key]; | ||
this.cache.record([key, 'count'].join('.'), value, timestamp); | ||
this.cache.record([key, 'count'].join('.'), value, timestamp); | ||
this.cache.record([key, 'rate'].join('.'), value, timestamp); | ||
if (_.has(metrics, 'counter_rates')) { | ||
var value_per_second = metrics.counter_rates[key]; | ||
this.cache.record([key, 'rate'].join('.'), value_per_second, timestamp); | ||
} | ||
} | ||
@@ -96,0 +100,0 @@ |
@@ -5,3 +5,3 @@ { | ||
"description": "A StatsD backend for MemSQL Ops", | ||
"version": "0.1.1", | ||
"version": "0.2.0", | ||
"homepage": "https://github.com/memsql/memsql-statsd", | ||
@@ -8,0 +8,0 @@ "repository": { |
@@ -51,2 +51,3 @@ # MemSQL StatsD Backend | ||
* generic-pool >= 2.0.4 | ||
* mapper >= 0.2.5 | ||
@@ -53,0 +54,0 @@ ## Development |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
21199
10
415
64