node-webhdfs
Advanced tools
Comparing version 0.4.0 to 1.0.0
{ | ||
"name": "node-webhdfs", | ||
"version": "0.4.0", | ||
"version": "1.0.0", | ||
"description": "A WebHDFS module for Node.js.", | ||
@@ -23,2 +23,3 @@ "author": "Ryan Cole <ryan@rycole.com> (http://rycole.com)", | ||
"mocha": "^5.2.0", | ||
"nock": "^10.0.6", | ||
"should": "^4.6.5" | ||
@@ -25,0 +26,0 @@ }, |
A WebHDFS module for Node.js | ||
# About | ||
## Notice | ||
I no longer personally use HDFS and have no plans to continue development on this module myself. With that said, many of the more recent contributions to this module have not been from myself, and instead have been from open source contributors - thanks to you folks. If you would like me to add you as a maintainer of this repository then just open a pull request or somehow message me. | ||
# WebHDFS | ||
I am currently following and testing against the [WebHDFS REST API documentation](http://hadoop.apache.org/docs/r1.2.1/webhdfs.html) for the `1.2.1` release, by Apache. Make sure you enable WebHDFS in the hdfs site configuration file. | ||
@@ -12,5 +16,6 @@ | ||
The following environment variables are used to configure the tests: | ||
- `HDFS_USERNAME` -- your username on the HDFS cluster (default: `ryan`) | ||
- `HDFS_NAMENODE_1` -- hostname of your primary namenode (default: `localhost`) | ||
- `HDFS_NAMENODE_2` -- hostname of your secondary namenode (default: `localhost`) | ||
- `HDFS_BASE_PATH` -- directory in which to conduct tests (default: `/user/$HDFS_USERNAME`) | ||
- `HDFS_USERNAME` -- your username on the HDFS cluster (default: `ryan`) | ||
- `HDFS_NAMENODE_1` -- hostname of your primary namenode (default: `localhost`) | ||
- `HDFS_NAMENODE_2` -- hostname of your secondary namenode (default: `localhost`) | ||
- `HDFS_BASE_PATH` -- directory in which to conduct tests (default: `/user/$HDFS_USERNAME`) |
@@ -7,6 +7,6 @@ var _ = require('lodash'); | ||
var WebHDFSClient = exports.WebHDFSClient = function (options) { | ||
// save specified options | ||
this.options = _.defaults(options || {}, { | ||
user: 'webuser', | ||
@@ -16,3 +16,4 @@ namenode_port: 50070, | ||
path_prefix: '/webhdfs/v1', | ||
high_availability: false | ||
high_availability: false, | ||
default_backoff_period_ms: 500 | ||
}); | ||
@@ -23,5 +24,9 @@ | ||
} | ||
// Set up the toggles for backoff and namenode switching | ||
this._backoff_period_ms = this.options.default_backoff_period_ms; | ||
this._switchedNameNodeClient = false; | ||
// save formatted base api url | ||
this.base_url = 'http://' + this.options.namenode_host + ':' + this.options.namenode_port + this.options.path_prefix; | ||
}; | ||
@@ -33,33 +38,79 @@ | ||
WebHDFSClient.prototype._changeNameNodeHost = function () { | ||
var host = this.options.namenode_host; | ||
var list = this.options.namenode_list; | ||
var index = list.indexOf(host) + 1; | ||
//if empty start from the beginning of the | ||
this.options.namenode_host = list[index] ? list[index] : list[0]; | ||
this._makeBaseUrl(); | ||
// Set up a toggle to determine if the backoff needs to be increased | ||
WebHDFSClient.prototype._switchedNameNodeClient = false; | ||
WebHDFSClient.prototype._changeNameNodeHost = function (callback) { | ||
// If the last operation resulted in a changed NameNode, double the current backoff | ||
if (this._switchedNameNodeClient) { | ||
this._backoff_period_ms = this._backoff_period_ms * 2; | ||
} else { | ||
this._switchedNameNodeClient = true; | ||
} | ||
// Wait the backoff period | ||
setTimeout(() => { | ||
var host = this.options.namenode_host; | ||
var list = this.options.namenode_list; | ||
var index = list.indexOf(host) + 1; | ||
//if empty start from the beginning of the | ||
this.options.namenode_host = list[index] ? list[index] : list[0]; | ||
this._makeBaseUrl(); | ||
return callback(); | ||
}, this.backoff_period_ms) | ||
}; | ||
function _parseResponse(self, fnName, args, bodyArgs, callback, justCheckErrors){ | ||
var _parseResponse = exports._parseResponse = function (self, fnName, args, bodyArgs, callback, justCheckErrors){ | ||
// forward request error | ||
return function(error, response, body) { | ||
if (error) return callback(error); | ||
// If a namenode process dies the connection will be refused, and if the namenode's server | ||
// completely dies, it will be inaccessible from this client, even if there is a successful | ||
// failover. Assumes namenodes provided in config are actual namenodes. | ||
if (error) { | ||
if (error.code === 'ECONNREFUSED' || error.code === 'ENOTFOUND') { | ||
if (self.options.high_availability){ | ||
//change client | ||
self._changeNameNodeHost(function () { | ||
return self[fnName].apply(self, args); | ||
}); | ||
return true; | ||
} | ||
else { | ||
callback(error); | ||
return true; | ||
} | ||
} | ||
} | ||
if (error) { | ||
callback(error); | ||
return true; | ||
} | ||
// exception handling | ||
if (typeof body === 'object' && 'RemoteException' in body) { | ||
if(self.options.high_availability && body.RemoteException.exception === 'StandbyException'){ | ||
if (self.options.high_availability && body.RemoteException.exception === 'StandbyException'){ | ||
//change client | ||
self._changeNameNodeHost(); | ||
return self[fnName].apply(self, args) | ||
self._changeNameNodeHost(function () { | ||
return self[fnName].apply(self, args); | ||
}); | ||
return true; | ||
//change client | ||
} | ||
else { | ||
return callback(new RemoteException(body)); | ||
callback(new RemoteException(body)); | ||
return true; | ||
} | ||
} | ||
// Reset Namenode switch toggle and backoff period if the operation is successful and the | ||
// toggle had been on | ||
if (self._switchedNameNodeClient) { | ||
self._switchedNameNodeClient = false; | ||
self._backoff_period_ms = self.options.default_backoff_period_ms; | ||
} | ||
if (justCheckErrors) { | ||
return | ||
return false; | ||
} | ||
// execute callback | ||
return callback(null, _.get(body, bodyArgs, body)); | ||
callback(null, _.get(body, bodyArgs, body)); | ||
return true; | ||
} | ||
@@ -71,3 +122,3 @@ } | ||
WebHDFSClient.prototype.del = function (path, hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -84,7 +135,7 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
} | ||
var self = this; | ||
var originalArgs = [path, hdfsoptions, requestoptions, callback]; | ||
var parseResponse = _parseResponse(self, 'del', originalArgs, 'boolean', callback); | ||
// format request args | ||
@@ -99,6 +150,4 @@ var args = _.defaults({ | ||
}, requestoptions || {}); | ||
// send http request | ||
request.del(args, parseResponse); | ||
}; | ||
@@ -109,3 +158,3 @@ | ||
WebHDFSClient.prototype.listStatus = function (path, hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -135,6 +184,4 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
}, requestoptions || {}); | ||
// send http request | ||
request.get(args, parseResponse) | ||
}; | ||
@@ -145,3 +192,3 @@ | ||
WebHDFSClient.prototype.getFileStatus = function (path, hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -158,3 +205,3 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
} | ||
var self = this; | ||
@@ -172,6 +219,4 @@ var originalArgs = [path, hdfsoptions, requestoptions, callback]; | ||
}, requestoptions || {}); | ||
// send http request | ||
request.get(args, parseResponse); | ||
}; | ||
@@ -182,3 +227,3 @@ | ||
WebHDFSClient.prototype.getContentSummary = function (path, hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -195,3 +240,3 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
} | ||
var self = this; | ||
@@ -209,6 +254,4 @@ var originalArgs = [path, hdfsoptions, requestoptions, callback]; | ||
}, requestoptions || {}); | ||
// send http request | ||
request.get(args, parseResponse); | ||
}; | ||
@@ -219,3 +262,3 @@ | ||
WebHDFSClient.prototype.getFileChecksum = function (path, hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -236,3 +279,3 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
var parseResponse = _parseResponse(self, 'getFileChecksum', originalArgs, 'FileChecksum', callback); | ||
// format request args | ||
@@ -246,3 +289,2 @@ var args = _.defaults({ | ||
}, requestoptions || {}); | ||
// send http request | ||
@@ -255,3 +297,3 @@ request.get(args, parseResponse); | ||
WebHDFSClient.prototype.getHomeDirectory = function (hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -268,7 +310,7 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
} | ||
var self = this; | ||
var originalArgs = [hdfsoptions, requestoptions, callback]; | ||
var parseResponse = _parseResponse(self, 'getHomeDirectory', originalArgs, 'Path', callback); | ||
// format request args | ||
@@ -283,6 +325,4 @@ var args = _.defaults({ | ||
}, requestoptions || {}); | ||
// send http request | ||
request.get(args, parseResponse); | ||
}; | ||
@@ -292,3 +332,3 @@ | ||
WebHDFSClient.prototype.open = function (path, hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -317,6 +357,4 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
}, requestoptions || {}); | ||
// send http request | ||
return request.get(args, parseResponse); | ||
}; | ||
@@ -327,3 +365,3 @@ | ||
WebHDFSClient.prototype.rename = function (path, destination, hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -344,3 +382,3 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
var parseResponse = _parseResponse(self, 'rename', originalArgs, 'boolean', callback); | ||
// format request args | ||
@@ -356,6 +394,4 @@ var args = _.defaults({ | ||
}, requestoptions || {}); | ||
// send http request | ||
request.put(args, parseResponse); | ||
}; | ||
@@ -382,3 +418,3 @@ | ||
var parseResponse = _parseResponse(self, 'mkdirs', originalArgs, 'boolean', callback); | ||
// generate query string | ||
@@ -393,6 +429,4 @@ var args = _.defaults({ | ||
}, requestoptions || {}); | ||
// send http request | ||
request.put(args, parseResponse); | ||
}; | ||
@@ -403,3 +437,3 @@ | ||
WebHDFSClient.prototype.append = function (path, data, hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -420,6 +454,5 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
var parseResponse = _parseResponse(self, 'append', originalArgs, null, callback, true); | ||
// format request args | ||
var args = _.defaults({ | ||
json: true, | ||
@@ -432,10 +465,8 @@ followRedirect: false, | ||
}, hdfsoptions || {}) | ||
}, requestoptions || {}); | ||
// send http request | ||
request.post(args, function (error, response, body) { | ||
parseResponse(error, response, body); | ||
if (error) { | ||
var handled = parseResponse(error, response, body); | ||
if (handled) { | ||
// callback already called | ||
@@ -455,9 +486,8 @@ return; | ||
}, requestoptions || {}); | ||
// send http request | ||
request.post(args, function (error, response, body) { | ||
// forward request error | ||
parseResponse(error, response, body); | ||
if (error) { | ||
var handled = parseResponse(error, response, body); | ||
if (handled) { | ||
return; | ||
@@ -472,3 +502,2 @@ } | ||
}); | ||
} else { | ||
@@ -483,3 +512,3 @@ return callback(new Error('expected redirect')); | ||
WebHDFSClient.prototype.create = function (path, data, hdfsoptions, requestoptions, callback) { | ||
// requestoptions may be omitted | ||
@@ -500,6 +529,5 @@ if (callback === undefined && typeof(requestoptions) === 'function') { | ||
var parseResponse = _parseResponse(self, 'create', originalArgs, null, callback, true); | ||
// generate query string | ||
var args = _.defaults({ | ||
json: true, | ||
@@ -509,12 +537,9 @@ followRedirect: false, | ||
qs: _.defaults({ | ||
op: 'create', | ||
'user.name': this.options.user | ||
}, hdfsoptions || {}) | ||
}, requestoptions || {}); | ||
// send http request | ||
request.put(args, function (error, response, body) { | ||
// forward request error | ||
@@ -532,3 +557,2 @@ parseResponse(error, response, body); | ||
else if (response.statusCode == 307) { | ||
// generate query string | ||
@@ -539,6 +563,5 @@ args = _.defaults({ | ||
}, requestoptions || {}); | ||
// send http request | ||
request.put(args, function (error, response, body) { | ||
// forward request error | ||
@@ -557,9 +580,6 @@ parseResponse(error, response, body); | ||
}); | ||
} else { | ||
return callback(new Error('expected redirect')); | ||
} | ||
}); | ||
}; |
const username = process.env.HDFS_USERNAME || 'ryan'; | ||
const endpoint1 = process.env.HDFS_NAMENODE_1 || 'localhost'; | ||
const endpoint2 = process.env.HDFS_NAMENODE_2 || '127.0.0.1'; | ||
const endpoint1 = process.env.HDFS_NAMENODE_1 || 'namenode1.lan'; | ||
const endpoint2 = process.env.HDFS_NAMENODE_2 || 'namenode2.lan'; | ||
const homeDir = `/user/${username}`; | ||
const basePath = process.env.HDFS_BASE_PATH || homeDir; | ||
const nodeOneBase = `http://${endpoint1}:50070`; | ||
const nodeTwoBase = `http://${endpoint2}:50070`; | ||
// endpoint defaults are written differently to verify switching | ||
var should = require('should'); | ||
// This is only needed for testing the response parser | ||
const webhdfs = require('..'); | ||
const should = require('should'); | ||
const nock = require('nock'); | ||
describe('WebHDFSClient', function () { | ||
// never used for actual connection | ||
var oneNodeClient = new (require('..')).WebHDFSClient({ | ||
const oneNodeClient = new (require('..')).WebHDFSClient({ | ||
namenode_host: endpoint1 | ||
}); | ||
var twoNodeClient = new (require('..')).WebHDFSClient({ | ||
const twoNodeClient = new (require('..')).WebHDFSClient({ | ||
user: username, | ||
@@ -24,226 +29,305 @@ namenode_host: endpoint1, | ||
describe('change endpoint', function () { | ||
// Set up a noop for testing puporses | ||
twoNodeClient._noop = function (callback) { | ||
if (callback) return callback(); | ||
// Adding this to test the namenode switch inside the response parser | ||
return twoNodeClient._checkParsedResponse(); | ||
}; | ||
it('should set high_availability to false if a list is not provided', function (done) { | ||
oneNodeClient.should.have.property('base_url', 'http://' + endpoint1 + ':50070/webhdfs/v1'); | ||
oneNodeClient.options.should.have.property('high_availability', false); | ||
it('should set high_availability to false if a list is not provided', function (done) { | ||
oneNodeClient.should.have.property('base_url', nodeOneBase + '/webhdfs/v1'); | ||
oneNodeClient.options.should.have.property('high_availability', false); | ||
return done() | ||
}); | ||
return done() | ||
}); | ||
it('should change endpoint if a list is provided', function (done) { | ||
twoNodeClient.should.have.property('base_url', 'http://' + endpoint1 + ':50070/webhdfs/v1'); | ||
describe('high-availability capability', function () { | ||
it('should detect the first failover', function (done) { | ||
function checkBackOffSettings() { | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.should.have.property('base_url', nodeTwoBase + '/webhdfs/v1'); | ||
return done() | ||
} | ||
twoNodeClient.should.have.property('base_url', nodeOneBase + '/webhdfs/v1'); | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', false); | ||
twoNodeClient.options.should.have.property('high_availability', true); | ||
twoNodeClient._changeNameNodeHost(); | ||
twoNodeClient.should.have.property('base_url', 'http://' + endpoint2 + ':50070/webhdfs/v1'); | ||
return done() | ||
twoNodeClient.options.should.have.property('default_backoff_period_ms', 500); | ||
twoNodeClient._changeNameNodeHost(checkBackOffSettings); | ||
}); | ||
}); | ||
describe('#getHomeDirectory()', function () { | ||
it('should get the home directory', function (done) { | ||
twoNodeClient.getHomeDirectory(function (err, status) { | ||
should.not.exist(err); | ||
should.exist(status); | ||
status.should.eql(homeDir); | ||
return done(); | ||
}); | ||
it('should increase backoff exponentially for repeated failovers', function (done) { | ||
function checkBackOffSettings() { | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 1000); | ||
twoNodeClient.should.have.property('base_url', nodeOneBase + '/webhdfs/v1'); | ||
return done() | ||
} | ||
twoNodeClient._switchedNameNodeClient = true; | ||
// twoNodeClient.base_url = nodeOneBase + ':50070/webhdfs/v1'; | ||
twoNodeClient.should.have.property('base_url', nodeTwoBase + '/webhdfs/v1'); | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.options.should.have.property('high_availability', true); | ||
// return done() | ||
twoNodeClient._changeNameNodeHost(checkBackOffSettings); | ||
}); | ||
}); | ||
describe('#mkdirs', function () { | ||
it('should return `true` if the directory was created', function (done) { | ||
twoNodeClient.mkdirs(basePath + '/test', function (err, success) { | ||
should.not.exist(err); | ||
should.exist(success); | ||
success.should.be.true; | ||
describe('_parseResponse', function () { | ||
it('executes a provided callback and resets backoff settings when there are no errors', function (done) { | ||
function checkParsedResponse (nullVal, bodyVal) { | ||
should(nullVal).be.exactly(null); | ||
should(bodyVal).be.exactly('ayo'); | ||
twoNodeClient.should.have.property('base_url', nodeOneBase + '/webhdfs/v1'); | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', false); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 500); | ||
return done(); | ||
}); | ||
} | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.should.have.property('base_url', nodeOneBase + '/webhdfs/v1'); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 1000); | ||
const parseResponse = webhdfs._parseResponse(twoNodeClient, '_noop', undefined, 'val1', checkParsedResponse, false) | ||
parseResponse(undefined, undefined, {val1: 'ayo'}); | ||
}); | ||
}); | ||
describe('#getFileStatus()', function () { | ||
it('should return information about the directory', function (done) { | ||
twoNodeClient.getFileStatus(basePath + '/test', function (err, status) { | ||
should.not.exist(err); | ||
should.exist(status); | ||
status.should.have.property('type', 'DIRECTORY'); | ||
return done(); | ||
}); | ||
it('just checks for errors', function (done) { | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', false); | ||
twoNodeClient.should.have.property('base_url', nodeOneBase + '/webhdfs/v1'); | ||
const parseResponse = webhdfs._parseResponse(twoNodeClient, undefined, undefined, undefined, undefined, true) | ||
should(parseResponse(undefined, undefined, undefined)).be.exactly(false); | ||
return done() | ||
}); | ||
}); | ||
describe('#create()', function () { | ||
it('should return the path to the new file', function (done) { | ||
twoNodeClient.create(basePath + '/test/foo.txt', 'foo bar', function (err, path) { | ||
should.not.exist(err); | ||
should.exist(path); | ||
it('handles an hdfs StandbyException properly', function (done) { | ||
twoNodeClient._checkParsedResponse = function () { | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 500); | ||
twoNodeClient.should.have.property('base_url', nodeTwoBase + '/webhdfs/v1'); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
describe('#append()', function () { | ||
it('should add to the file', function (done) { | ||
twoNodeClient.append(basePath + '/test/foo.txt', ' baz', function (err, path) { | ||
should.not.exist(err); | ||
should.exist(path); | ||
} | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', false); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 500); | ||
const parseResponse = webhdfs._parseResponse(twoNodeClient, '_noop', undefined, undefined, undefined, false) | ||
const result = parseResponse(undefined, undefined, {RemoteException: {exception: 'StandbyException'}}); | ||
should(result).be.exactly(true); | ||
}) | ||
it('handles a generic error properly', function (done) { | ||
function checkParsedResponse(error) { | ||
should(error).be.exactly(42); | ||
return done() | ||
} | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.should.have.property('base_url', nodeTwoBase + '/webhdfs/v1'); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 500); | ||
const parseResponse = webhdfs._parseResponse(twoNodeClient, undefined, undefined, undefined, checkParsedResponse, false) | ||
parseResponse(42, undefined, undefined); | ||
}) | ||
it('handles a refused connection properly (symptom of dead active namenode process)', function (done) { | ||
twoNodeClient._checkParsedResponse = function () { | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 1000); | ||
twoNodeClient.should.have.property('base_url', nodeOneBase + '/webhdfs/v1'); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
describe('#rename()', function () { | ||
it('should return `true` if the file was renamed', function (done) { | ||
twoNodeClient.rename(basePath + '/test/foo.txt', basePath + '/test/bar.txt', function (err, success) { | ||
should.not.exist(err); | ||
should.exist(success); | ||
success.should.be.true; | ||
} | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 500); | ||
const parseResponse = webhdfs._parseResponse(twoNodeClient, '_noop', undefined, undefined, undefined, false) | ||
const result = parseResponse({code: 'ECONNREFUSED'}, undefined, undefined); | ||
should(result).be.exactly(true); | ||
}) | ||
it('handles a not found connection error properly (symptom of dead active namenode server)', function (done) { | ||
twoNodeClient._checkParsedResponse = function () { | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 2000); | ||
twoNodeClient.should.have.property('base_url', nodeTwoBase + '/webhdfs/v1'); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
describe('#getContentSummary()', function () { | ||
it('should return summary of directory content', function (done) { | ||
twoNodeClient.getContentSummary(basePath + '/test', function (err, summary) { | ||
should.not.exist(err); | ||
should.exist(summary); | ||
summary.should.have.property('fileCount', 1); | ||
} | ||
twoNodeClient.should.have.property('_switchedNameNodeClient', true); | ||
twoNodeClient.should.have.property('_backoff_period_ms', 1000); | ||
const parseResponse = webhdfs._parseResponse(twoNodeClient, '_noop', undefined, undefined, undefined, false) | ||
const result = parseResponse({code: 'ENOTFOUND'}, undefined, undefined); | ||
should(result).be.exactly(true); | ||
}) | ||
it('handles a not found connection error properly (when HA is not configured)', function (done) { | ||
function checkParsedResponse (error) { | ||
should(error.code).be.exactly('ENOTFOUND'); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
describe('#listStatus()', function () { | ||
it('should list files in a directory', function (done) { | ||
twoNodeClient.listStatus(basePath + '/test', function (err, status) { | ||
should.not.exist(err); | ||
should.exist(status); | ||
status.map(f => f.pathSuffix).should.containEql('bar.txt'); | ||
} | ||
const parseResponse = webhdfs._parseResponse(oneNodeClient, '_noop', undefined, undefined, checkParsedResponse, false) | ||
const result = parseResponse({code: 'ENOTFOUND'}, undefined, undefined); | ||
should(result).be.exactly(true); | ||
}) | ||
it('handles a generic "RemmoteException" properly', function (done) { | ||
function checkParsedResponse (error) { | ||
should(typeof(error)).be.exactly('object') | ||
return done(); | ||
}); | ||
}); | ||
} | ||
const parseResponse = webhdfs._parseResponse(oneNodeClient, undefined, undefined, undefined, checkParsedResponse, false) | ||
const result = parseResponse(undefined, undefined, {RemoteException: 'Error!'}); | ||
should(result).be.exactly(true); | ||
}) | ||
}); | ||
describe('#getFileChecksum()', function () { | ||
it('should return a file checksum', function (done) { | ||
twoNodeClient.getFileChecksum(basePath + '/test/bar.txt', function (err, checksum) { | ||
describe('hdfs "delete"', function () { | ||
it('works properly', function (done) { | ||
const filePath = '/test/file'; | ||
const qs = '?op=delete&user.name=webuser'; | ||
const namenodeRequest= nock(nodeOneBase) | ||
.delete(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(200, {boolean: true}) | ||
oneNodeClient.del(filePath, function (err, response) { | ||
should.not.exist(err); | ||
should.exist(checksum); | ||
checksum.should.have.property('algorithm', 'MD5-of-0MD5-of-512CRC32C'); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
describe('#open()', function () { | ||
it('should return the files content', function (done) { | ||
twoNodeClient.open(basePath + '/test/bar.txt', function (err, data) { | ||
should(response).be.exactly(true) | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "listStatus"', function () { | ||
it('works properly', function (done) { | ||
const fileStatus = '{"FileStatuses": {"FileStatus": [{"some": "file details"},{"some": "other file details"}]}}' | ||
const filePath = '/test/file'; | ||
const qs = '?op=liststatus'; | ||
const namenodeRequest= nock(nodeOneBase) | ||
.get(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(200, fileStatus) | ||
oneNodeClient.listStatus(filePath, function (err, response) { | ||
should.not.exist(err); | ||
should.exist(data); | ||
data.should.eql('foo bar baz'); | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
describe('#del()', function () { | ||
it('should return `true` if the directory was deleted', function (done) { | ||
twoNodeClient.del(basePath + '/test', { recursive: true }, function (err, success) { | ||
should(response.length).be.exactly(2) | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "getFileStatus"', function () { | ||
it('works properly', function (done) { | ||
const fileStatus = '{"FileStatus": {"accessTime": 42}}' | ||
const filePath = '/test/file'; | ||
const qs = '?op=getfilestatus'; | ||
const namenodeRequest = nock(nodeOneBase) | ||
.get(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(200, fileStatus) | ||
oneNodeClient.getFileStatus(filePath, function (err, response) { | ||
should.not.exist(err); | ||
should.exist(success); | ||
success.should.be.true; | ||
return done(); | ||
}); | ||
}); | ||
}); | ||
should(response.accessTime).be.exactly(42) | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "getContentSummary"', function () { | ||
it('works properly', function (done) { | ||
const contentSummary = '{"ContentSummary": {"directoryCount": 7}}' | ||
const filePath = '/test/file'; | ||
const qs = '?op=getcontentsummary'; | ||
const namenodeRequest = nock(nodeOneBase) | ||
.get(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(200, contentSummary) | ||
oneNodeClient.getContentSummary(filePath, function (err, response) { | ||
should.not.exist(err); | ||
should(response.directoryCount).be.exactly(7) | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "getFileChecksum"', function () { | ||
it('works properly', function (done) { | ||
const fileChecksum = '{"FileChecksum": {"length": 42}}' | ||
const filePath = '/test/file'; | ||
const qs = '?op=getfilechecksum'; | ||
const namenodeRequest = nock(nodeOneBase) | ||
.get(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(200, fileChecksum) | ||
oneNodeClient.getFileChecksum(filePath, function (err, response) { | ||
should.not.exist(err); | ||
should(response.length).be.exactly(42) | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "getHomeDirectory"', function () { | ||
it('works properly', function (done) { | ||
const homeDirectory = '{"Path": "/user/username"}' | ||
const urlPath = '/webhdfs/v1?op=gethomedirectory&user.name=webuser'; | ||
const namenodeRequest = nock(nodeOneBase) | ||
.get(urlPath) | ||
.reply(200, homeDirectory) | ||
oneNodeClient.getHomeDirectory(function (err, response) { | ||
should.not.exist(err); | ||
should(response).be.exactly('/user/username') | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "open"', function () { | ||
it('works properly', function (done) { | ||
const filePath = '/test/file' | ||
const qs = '?op=open'; | ||
const namenodeRequest = nock(nodeOneBase) | ||
.get(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(200, 'file contents!!') | ||
oneNodeClient.open(filePath, function (err, status) { | ||
should.not.exist(err); | ||
should(status).be.exactly('file contents!!') | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "rename"', function () { | ||
it('works properly', function (done) { | ||
const filePath = '/test/file'; | ||
const newFilePath = '/new/test/file'; | ||
const namenodeRequest = nock(nodeOneBase) | ||
.put('/webhdfs/v1/test/file?op=rename&destination=%2Fnew%2Ftest%2Ffile&user.name=webuser') | ||
.reply(200, {boolean: true}) | ||
oneNodeClient.rename(filePath, newFilePath, function (err, status) { | ||
should.not.exist(err); | ||
should(status).be.exactly(true) | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "mkdirs"', function () { | ||
it('works properly', function (done) { | ||
const newDir = '/new/directory' | ||
const qs = '?op=mkdirs&user.name=webuser'; | ||
const namenodeRequest = nock(nodeOneBase) | ||
.put(`/webhdfs/v1${newDir}${qs}`) | ||
.reply(200, {boolean: true}) | ||
oneNodeClient.mkdirs(newDir, function (err, response) { | ||
should.not.exist(err); | ||
should(response).be.exactly(true) | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "append"', function () { | ||
const datanode = 'http://datanode1.lan:50070' | ||
const filePath = '/test/file' | ||
const appendData = 'some append data' | ||
const qs = '?op=append&user.name=webuser'; | ||
it('follows redirect properly', function (done) { | ||
const namenodeRequest = nock(nodeOneBase) | ||
.post(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(307, '', {location: `${datanode}/webhdfs/v1${filePath}${qs}`}) | ||
const datanodeRequest = nock(datanode) | ||
.post(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(200, true) | ||
oneNodeClient.append(filePath, appendData, function (err, response) { | ||
should.not.exist(err); | ||
should(response).be.exactly(true) | ||
return done() | ||
}) | ||
}) | ||
}) | ||
describe('hdfs "create"', function () { | ||
const datanode = 'http://datanode1.lan:50070' | ||
const filePath = '/test/file' | ||
const appendData = 'some append data' | ||
const qs = '?op=create&user.name=webuser'; | ||
it('follows redirect properly', function (done) { | ||
const namenodeRequest = nock(nodeOneBase) | ||
.put(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(307, '', {location: `${datanode}/webhdfs/v1${filePath}${qs}`}) | ||
const datanodeRequest = nock(datanode) | ||
.put(`/webhdfs/v1${filePath}${qs}`) | ||
.reply(201, '') | ||
oneNodeClient.create(filePath, appendData, function (err, response) { | ||
should.not.exist(err); | ||
should(response).be.exactly(undefined) | ||
return done() | ||
}) | ||
}) | ||
}) | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
38380
801
0
21
3