trooba-hystrix-handler
Advanced tools
Comparing version 1.0.5 to 1.0.6
@@ -0,2 +1,5 @@ | ||
## v1.0.6 | ||
* Fixed: should handling streams without swallowing data chunks | ||
## v1.0.5 | ||
* Fixed: should preserve order when handling streaming data |
46
index.js
@@ -30,3 +30,3 @@ 'use strict'; | ||
// trooba pipeline request flow | ||
pipe.once('request', (request, next) => { | ||
pipe.on('request', (request, next) => { | ||
// pass pipe reference to the command run function | ||
@@ -39,11 +39,12 @@ serviceCommand.execute({ | ||
.then(response => { | ||
if (typeof response === 'function') { | ||
// continue existing response flow with next | ||
response(); | ||
return; | ||
if (response.next) { | ||
return response.next(); | ||
} | ||
// in case of fallback, we need to form the response again | ||
// start only fallback response here | ||
pipe.respond(response); | ||
}) | ||
.catch(err => pipe.throw(err)); | ||
.catch(err => { | ||
// start only open circuit errors here | ||
!err.skip && pipe.throw(err); | ||
}); | ||
}); | ||
@@ -64,4 +65,31 @@ }; | ||
return new Promise((resolve, reject) => { | ||
ctx.pipe.once('response', (response, next) => resolve(next)); | ||
ctx.pipe.once('error', reject); | ||
ctx.pipe.removeListener('response'); | ||
ctx.pipe.removeListener('response:data'); | ||
ctx.pipe.removeListener('error'); | ||
ctx.pipe.once('response', (response, next) => { | ||
// record hystrix success | ||
resolve({ | ||
next: next | ||
}); | ||
}); | ||
// use it to decide if we can still do fallback when deal with stream data; | ||
ctx.pipe.once('response:data', (data, next) => { | ||
ctx.pipe.context.fallback = undefined; | ||
next(); | ||
}); | ||
ctx.pipe.once('error', (err, next) => { | ||
// record rejection in hystrix | ||
err.skip = true; | ||
reject(err); | ||
// allow err to get recorded in hystrix so we can react on it in the | ||
// handler down the response pipe | ||
setImmediate(() => { | ||
// if fallback is not specified or deleted after data flush | ||
if (!ctx.pipe.context.fallback) { | ||
// continue pipe flow if needed | ||
next(); | ||
} | ||
}); | ||
}); | ||
ctx.next(); | ||
@@ -68,0 +96,0 @@ }); |
{ | ||
"name": "trooba-hystrix-handler", | ||
"version": "1.0.5", | ||
"version": "1.0.6", | ||
"description": "The handler provides hystrix functionality to trooba request/response pipeline", | ||
@@ -14,2 +14,5 @@ "main": "index.js", | ||
}, | ||
"publishConfig": { | ||
"registry": "http://registry.npmjs.org" | ||
}, | ||
"keywords": [ | ||
@@ -39,4 +42,4 @@ "hystrix", | ||
"supertest": "^3.0.0", | ||
"trooba": "^2.0.2" | ||
"trooba": "^2.1.2" | ||
} | ||
} |
trooba-hystrix-handler | ||
====================== | ||
[![Greenkeeper badge](https://badges.greenkeeper.io/trooba/trooba-hystrix-handler.svg)](https://greenkeeper.io/) | ||
Trooba handler that provides Hystrix functionality to [trooba](https://github.com/trooba/trooba) based service pipelines. For more details on this topic, please read these blog posts [Part 1](https://medium.com/@dimichmm/building-resilient-platform-part-1-51b852588fb3) and [Part 2](https://medium.com/@dimichmm/building-resilient-platform-part-2-509c9550617d). | ||
@@ -5,0 +7,0 @@ |
@@ -93,2 +93,46 @@ 'use strict'; | ||
describe('retry', () => { | ||
it('should handle retry logic', next => { | ||
var requestCounter = 0; | ||
const pipe = Trooba | ||
.use(pipe => { | ||
var count = 0; | ||
var _request; | ||
pipe.on('request', (request, next) => { | ||
_request = request; | ||
next(); | ||
}); | ||
pipe.on('response', (response, next) => { | ||
if (count++ < 1) { | ||
pipe.request(_request); | ||
return; | ||
} | ||
next(); | ||
}); | ||
}) | ||
.use(handler, { | ||
command: 'foo' | ||
}) | ||
.use(pipe => { | ||
pipe.on('request', request => { | ||
requestCounter++; | ||
pipe.respond(request); | ||
}); | ||
}) | ||
.build(); | ||
pipe.create().request('hello', (err, response) => { | ||
if (err) { | ||
next(err); | ||
return; | ||
} | ||
Assert.ok(!err, err && err.stack); | ||
Assert.equal('hello', response); | ||
Assert.equal(2, requestCounter); | ||
next(); | ||
}); | ||
}); | ||
}); | ||
describe('fallback', () => { | ||
@@ -556,3 +600,3 @@ it('should return error when no fallback available', next => { | ||
metrics.forEach(metric => { | ||
Assert.ok(metric.errorPercentage <= 30 && metric.errorPercentage >= 5, `Actual value ${metric.errorPercentage}`); | ||
Assert.ok(metric.errorPercentage <= 35 && metric.errorPercentage >= 5, `Actual value ${metric.errorPercentage}`); | ||
Assert.equal(100, metric.requestCount, `Actual ${metric}`); | ||
@@ -567,2 +611,37 @@ }); | ||
describe('streaming', () => { | ||
it('should handle response stream', next => { | ||
const pipe = Trooba.use(handler, { | ||
command: 'foo' | ||
}) | ||
.use(pipe => { | ||
pipe.on('request', request => { | ||
pipe.streamResponse(request) | ||
.write('data1') | ||
.write('data2') | ||
.end(); | ||
}); | ||
}) | ||
.build(); | ||
let _response; | ||
const _data = []; | ||
pipe.create().request('hello') | ||
.on('error', next) | ||
.on('response', (response, next) => { | ||
_response = response; | ||
next(); | ||
}) | ||
.on('response:data', (data, next) => { | ||
_data.push(data); | ||
next(); | ||
}) | ||
.on('response:end', () => { | ||
Assert.equal('hello', _response); | ||
Assert.deepEqual(['data1', 'data2', undefined], _data); | ||
next(); | ||
}); | ||
}); | ||
it('should handle stream data and preserve data order', next => { | ||
@@ -613,2 +692,252 @@ const pipe = Trooba | ||
}); | ||
it('should catch error', done => { | ||
const pipe = Trooba | ||
.use(handler, { | ||
command: 'foo' | ||
}) | ||
.use(pipe => { | ||
pipe.on('response:data', (data, next) => { | ||
if (data === 'data2') { | ||
pipe.throw(new Error('Boom')); | ||
return; | ||
} | ||
next(); | ||
}); | ||
}) | ||
.use(pipe => { | ||
pipe.on('request', request => { | ||
pipe.streamResponse(request) | ||
.write('data1') | ||
.write('data2') | ||
.end(); | ||
}); | ||
}) | ||
.build(); | ||
let _response; | ||
const _data = []; | ||
let _err; | ||
pipe.create().request('hello') | ||
.on('error', err => { | ||
_err = err; | ||
Assert.ok(_err); | ||
Assert.equal('Boom', _err.message); | ||
Assert.equal('hello', _response); | ||
Assert.deepEqual(['data1'], _data); | ||
done(); | ||
}) | ||
.on('response', (response, next) => { | ||
_response = response; | ||
next(); | ||
}) | ||
.on('response:data', (data, next) => { | ||
_data.push(data); | ||
next(); | ||
}) | ||
.on('response:end', () => done(new Error('Should never happen'))); | ||
}); | ||
it('should catch timeout error', done => { | ||
const pipe = Trooba.use(handler, { | ||
command: 'foo1', | ||
timeout: 1 | ||
}) | ||
.use(pipe => { | ||
pipe.on('request', request => { | ||
}); | ||
}) | ||
.build(); | ||
pipe.create().request('hello') | ||
.on('error', err => { | ||
Assert.ok(err); | ||
Assert.equal('CommandTimeOut', err.message); | ||
done(); | ||
}); | ||
}); | ||
it('should catch error and stop the pipe execution at this point', done => { | ||
const pipe = Trooba | ||
.use(handler, { | ||
command: 'foo' | ||
}) | ||
.use(pipe => { | ||
pipe.on('response:data', (data, next) => { | ||
if (data === 'data1') { | ||
return next(); | ||
} | ||
if (data === 'data2') { | ||
pipe.throw(new Error('Boom')); | ||
} | ||
}); | ||
}) | ||
.use(pipe => { | ||
pipe.on('request', request => { | ||
pipe.streamResponse(request) | ||
.write('data1') | ||
.write('data2') | ||
.end(); | ||
}); | ||
}) | ||
.build(); | ||
let _response; | ||
const _data = []; | ||
let _err; | ||
pipe.create().request('hello') | ||
.on('error', err => { | ||
_err = err; | ||
Assert.ok(_err); | ||
Assert.equal('Boom', _err.message); | ||
Assert.equal('hello', _response); | ||
Assert.deepEqual(['data1'], _data); | ||
done(); | ||
}) | ||
.on('response', (response, next) => { | ||
_response = response; | ||
next(); | ||
}) | ||
.on('response:data', (data, next) => { | ||
_data.push(data); | ||
next(); | ||
}) | ||
.on('response:end', () => done(new Error('Should never happen'))); | ||
}); | ||
it('should catch error and stop the pipe execution at this point and ignore fallback as stream has been flushed already', done => { | ||
const pipe = Trooba | ||
.use(handler, { | ||
command: 'foo' | ||
}) | ||
.use(pipe => { | ||
pipe.on('response:data', (data, next) => { | ||
if (data === 'data1') { | ||
return next(); | ||
} | ||
if (data === 'data2') { | ||
pipe.throw(new Error('Boom')); | ||
} | ||
}); | ||
}) | ||
.use(pipe => { | ||
pipe.on('request', request => { | ||
pipe.streamResponse(request) | ||
.write('data1') | ||
.write('data2') | ||
.end(); | ||
}); | ||
}) | ||
.build({ | ||
fallback: (err, request) => { | ||
done(new Error('Should not happen')); | ||
} | ||
}); | ||
let _response; | ||
const _data = []; | ||
let _err; | ||
pipe.create().request('hello') | ||
.on('error', err => { | ||
_err = err; | ||
Assert.ok(_err); | ||
Assert.equal('Boom', _err.message); | ||
Assert.equal('hello', _response); | ||
Assert.deepEqual(['data1'], _data); | ||
done(); | ||
}) | ||
.on('response', (response, next) => { | ||
_response = response; | ||
next(); | ||
}) | ||
.on('response:data', (data, next) => { | ||
_data.push(data); | ||
next(); | ||
}) | ||
.on('response:end', () => done(new Error('Should never happen'))); | ||
}); | ||
it('should catch error and do a fallback', done => { | ||
const pipe = Trooba | ||
.use(handler, { | ||
command: 'foo' | ||
}) | ||
.use(pipe => { | ||
pipe.on('response', (data, next) => { | ||
pipe.throw(new Error('Boom')); | ||
}); | ||
}) | ||
.use(pipe => { | ||
pipe.on('request', request => { | ||
pipe.streamResponse(request) | ||
.write('data1') | ||
.write('data2') | ||
.end(); | ||
}); | ||
}) | ||
.build({ | ||
fallback: (err, request) => { | ||
Assert.ok(err); | ||
Assert.equal('Boom', err.message); | ||
return Promise.resolve('fallback'); | ||
} | ||
}); | ||
pipe.create().request('hello') | ||
.on('error', err => { | ||
done(new Error('Should not happen')); | ||
}) | ||
.on('response', (response, next) => { | ||
Assert.equal('fallback', response); | ||
done(); | ||
next(); | ||
}) | ||
.on('response:data', (data, next) => { | ||
done(new Error('Should not happen')); | ||
}) | ||
.on('response:end', () => { | ||
done(new Error('Should not happen')); | ||
}); | ||
}); | ||
it('should handle request stream', next => { | ||
const pipe = Trooba.use(handler, { | ||
command: 'foo2' | ||
}) | ||
.use(pipe => { | ||
pipe.on('request', request => { | ||
setImmediate(() => { | ||
pipe.streamResponse(request) | ||
.write('data1') | ||
.write('data2') | ||
.end(); | ||
}); | ||
}); | ||
}) | ||
.build(); | ||
let _response; | ||
const _data = []; | ||
pipe.create().streamRequest('hello') | ||
.write('data1') | ||
.end() | ||
.on('error', next) | ||
.on('response', (response, next) => { | ||
_response = response; | ||
next(); | ||
}) | ||
.on('response:data', (data, next) => { | ||
_data.push(data); | ||
next(); | ||
}) | ||
.on('response:end', () => { | ||
Assert.equal('hello', _response); | ||
Assert.deepEqual(['data1', 'data2', undefined], _data); | ||
next(); | ||
}); | ||
}); | ||
}); | ||
@@ -615,0 +944,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
47018
1036
81