ripple-hbase-client
Advanced tools
Comparing version 0.1.13-thrift to 0.1.14-rc1
{ | ||
"name": "ripple-hbase-client", | ||
"version": "0.1.13-thrift", | ||
"version": "0.1.14-rc1", | ||
"description": "ripple hbase client", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
103
src/index.js
@@ -1,17 +0,7 @@ | ||
const thrift = require('thrift') | ||
const Logger = require('./logger') | ||
const genericPool = require('generic-pool') | ||
const HBase = require('./gen/Hbase') | ||
const Logger = require('./logger'); | ||
const client = require('./client'); | ||
const HBaseTypes = require('./gen/Hbase_types') | ||
const TIMEOUT_MESSAGE = 'HBase client timeout' | ||
const CLOSE_MESSAGE = 'HBase client connection closed' | ||
const ACQUIRE_TIMEOUT = 5000; | ||
const IDLE_TIMEOUT = 30000; | ||
const EVICTION_TIMEOUT = 10000; | ||
const TIMEOUT_MESSAGE = 'thrift client scan timeout' | ||
const DEFAULT_TIMEOUT = 30000; | ||
const DEFAULT_PORT = 9090; | ||
const DEFAULT_MAX_SOCKETS = 100; | ||
const DEFAULT_MIN_SOCKETS = 5; | ||
@@ -153,4 +143,4 @@ function addFilters(filters) { | ||
const self = this | ||
this._timeout = options.timeout || DEFAULT_TIMEOUT; | ||
this._prefix = options.prefix || '' | ||
this._timeout = options.timeout || DEFAULT_TIMEOUT; | ||
this.logStats = (options.logLevel && options.logLevel > 3) ? true : false | ||
@@ -164,85 +154,14 @@ | ||
const servers = options.servers || [] | ||
this.client = new client(options); | ||
} | ||
if (!servers.length) { | ||
servers.push({ | ||
host: options.host, | ||
port: options.port || DEFAULT_PORT | ||
}) | ||
} | ||
const factory = { | ||
create: function() { | ||
return new Promise(function(resolve, reject) { | ||
const i = Math.floor(Math.random() * servers.length) | ||
const server = servers[i] | ||
HbaseClient.prototype.release = function(connection) { | ||
//this.client.release(connection); | ||
}; | ||
const connection = thrift.createConnection(server.host, server.port, { | ||
transport: thrift.TFramedTransport, | ||
protocol: thrift.TBinaryProtocol, | ||
timeout: self._timeout, | ||
connect_timeout: ACQUIRE_TIMEOUT | ||
}) | ||
connection.once('connect', () => { | ||
connection.connection.setKeepAlive(true) | ||
connection.client = thrift.createClient(HBase, connection) | ||
resolve(connection) | ||
}) | ||
connection.on('error', reject); | ||
connection.on('close', () => { | ||
connection.connected = false | ||
reject('connection closed') | ||
}) | ||
connection.on('timeout', () => { | ||
connection.connected = false | ||
reject('connection timeout') | ||
}) | ||
}) | ||
}, | ||
destroy: client => {}, | ||
validate: client => { | ||
return client.connected | ||
} | ||
} | ||
const params = { | ||
testOnBorrow: true, | ||
max: options.max_sockets || DEFAULT_MAX_SOCKETS, | ||
min: options.min_sockets || DEFAULT_MIN_SOCKETS, | ||
acquireTimeoutMillis: ACQUIRE_TIMEOUT, | ||
idleTimeoutMillis: IDLE_TIMEOUT, | ||
evictionRunIntervalMillis: EVICTION_TIMEOUT | ||
} | ||
this.pool = genericPool.createPool(factory, params); | ||
} | ||
HbaseClient.prototype.acquire = function(reject) { | ||
const self = this; | ||
return this.pool.acquire() | ||
.then(client => { | ||
const handleRejection = error => { | ||
self.release(client); | ||
reject(error); | ||
} | ||
return this.client.getConnection(); | ||
}; | ||
const onTimeout = handleRejection.bind(this, TIMEOUT_MESSAGE) | ||
const onClose = handleRejection.bind(this, CLOSE_MESSAGE) | ||
client.on('error', handleRejection) | ||
client.on('timeout', onTimeout) | ||
client.on('close', onClose) | ||
return client | ||
}) | ||
} | ||
HbaseClient.prototype.release = function(client) { | ||
client.removeAllListeners() | ||
this.pool.release(client) | ||
} | ||
/** | ||
@@ -249,0 +168,0 @@ * getRow |
@@ -6,7 +6,6 @@ const Hbase = require('./src/index.js') | ||
const hbase = new Hbase({ | ||
host: 'hadoop1-private.sjc03.infra.ripple.coma', | ||
port: 9095, | ||
host: 'hadoop-slave6.usw2.data.ripple.com', | ||
port: 9090, | ||
prefix: 'prod_', | ||
logLevel: 3, | ||
timeout: 10000, | ||
min_sockets: 1, | ||
@@ -13,0 +12,0 @@ max_sockets: 5 |
@@ -10,5 +10,3 @@ const mock = require('./mock.json') | ||
prefix: 'prefix', | ||
logLevel: 2, | ||
max_sockets: 400, | ||
min_sockets: 5 | ||
logLevel: 2 | ||
}) | ||
@@ -46,3 +44,5 @@ | ||
}) | ||
.then(assert) | ||
.then(() => { | ||
assert(); | ||
}) | ||
.catch(err => { | ||
@@ -58,10 +58,34 @@ assert.strictEqual(err.name, 'IOError') | ||
logLevel: 2, | ||
max_sockets: 5, | ||
min_sockets: 5, | ||
timeout: 100 | ||
}) | ||
let i = 30 | ||
let i = 5 | ||
const list = [] | ||
while (i--) { | ||
list.push(hb.getRow({ | ||
table: 'test', | ||
rowkey: 'A', | ||
})) | ||
} | ||
return Promise.all(list) | ||
.then(() => { | ||
assert(); | ||
}) | ||
.catch(err => { | ||
assert.strictEqual(err.toString(), 'thrift client connection timeout') | ||
}) | ||
}) | ||
it('should handle client timeout error (scan)', function() { | ||
const hb = new Hbase({ | ||
host: 'hbase', | ||
prefix: 'prefix', | ||
logLevel: 2, | ||
timeout: 100 | ||
}) | ||
let i = 5 | ||
const list = [] | ||
while (i--) { | ||
list.push(hb.getScan({ | ||
@@ -76,5 +100,7 @@ table: 'test', | ||
return Promise.all(list) | ||
.then(assert) | ||
.then(() => { | ||
assert(); | ||
}) | ||
.catch(err => { | ||
assert.strictEqual(err.toString(), 'HBase client timeout') | ||
assert.strictEqual(err.toString(), 'thrift client scan timeout') | ||
}) | ||
@@ -81,0 +107,0 @@ }) |
426016
15
14661