openhim-mediator-file-queue
Advanced tools
Comparing version 2.0.2 to 3.0.0
{ | ||
"port": 4003, | ||
"heartbeat": true, | ||
"host": "", | ||
"endpoints": [], | ||
"log_level": "info", | ||
"statsd": { | ||
"host": "localhost", | ||
"port": 8125 | ||
}, | ||
"statsd": {}, | ||
"api": { | ||
"apiURL": "https://localhost:8080", | ||
"username": "root@openhim.org", | ||
"password": "password" | ||
"apiURL": "https://localhost:8080" | ||
}, | ||
"mediatorConf": { | ||
"urn": "urn:uuid:a15c3d48-0686-4c9b-b375-f68d2f244a33", | ||
"version": "2.0.2", | ||
"version": "3.0.0", | ||
"name": "file-queue", | ||
@@ -24,3 +21,3 @@ "description": "Async file queue mediator for the OpenHIM", | ||
"host": "localhost", | ||
"path": "/test", | ||
"path": "/workers/test", | ||
"port": "4002", | ||
@@ -31,4 +28,76 @@ "primary": true, | ||
], | ||
"configDefs": [] | ||
"configDefs": [ | ||
{ | ||
"param": "endpoints", | ||
"displayName": "Endpoints", | ||
"description": "Configure File Queue endpoints", | ||
"type": "struct", | ||
"array": true, | ||
"template": [ | ||
{ | ||
"param": "name", | ||
"displayName": "Name", | ||
"description": "The name of the endpoint which is used for setting up the RESTlike routes for the worker", | ||
"type": "string" | ||
}, | ||
{ | ||
"param": "path", | ||
"displayName": "Path", | ||
"description": "The path to use for handling incoming requests. A '*' may be used to match any number of characters in a path.", | ||
"type": "string" | ||
}, | ||
{ | ||
"param": "url", | ||
"displayName": "Upstream URL", | ||
"description": "The URL to send the files to when processing them from the queue.", | ||
"type": "string" | ||
}, | ||
{ | ||
"param": "paused", | ||
"displayName": "Paused", | ||
"description": "Whether or not the endpoint's worker should be paused by default. This must be a boolean value.", | ||
"type": "bool" | ||
}, | ||
{ | ||
"param": "parallel", | ||
"displayName": "Parallel", | ||
"description": "The number of files that the worker should process in parallel.", | ||
"type": "number" | ||
}, | ||
{ | ||
"param": "updateTx", | ||
"displayName": "UpdateTx", | ||
"description": "Whether or not to attempt to update the OpenHIM transaction once the message has been processed.", | ||
"type": "bool" | ||
}, | ||
{ | ||
"param": "forwardMetadata", | ||
"displayName": "ForwardMetadata", | ||
"description": "Whether or not to use the url, headers and HTTP method of the original request when it is forwarded. The path of the original request will be appended to the url property.", | ||
"type": "bool" | ||
}, | ||
{ | ||
"param": "disableAutoChannelManagement", | ||
"displayName": "Disable auto channel managment", | ||
"description": "If disabled, the mediator won't automatically create and update channels as endpoint config is updated.", | ||
"type": "bool" | ||
} | ||
] | ||
} | ||
], | ||
"config": { | ||
"endpoints": [ | ||
{ | ||
"name": "echoServer", | ||
"path": "/test", | ||
"url": "http://localhost:8000", | ||
"paused": false, | ||
"parallel": 2, | ||
"updateTx": true, | ||
"forwardMetadata": true, | ||
"disableAutoChannelManagement": false | ||
} | ||
] | ||
} | ||
} | ||
} |
{ | ||
"port": 4002, | ||
"heartbeat": true, | ||
"record_stats": true, | ||
"log_level": "debug", | ||
"endpoints": [ | ||
{ | ||
"name": "test", | ||
"path": "/test/*", | ||
"url": "http://localhost:9999", | ||
"name": "File Queue", | ||
"path": "/test", | ||
"url": "http://localhost:8000", | ||
"paused": false, | ||
"parallel": 2, | ||
"updateTx": true, | ||
"forwardMetadata": true | ||
} | ||
] | ||
], | ||
"api": { | ||
"apiURL": "https://localhost:8080", | ||
"username": "root@openhim.org", | ||
"password": "password" | ||
}, | ||
"statsd": { | ||
"host": "localhost", | ||
"port": 8125 | ||
} | ||
} |
229
lib/index.js
#!/usr/bin/env node | ||
'use strict'; | ||
var BodyParser = require('body-parser'); | ||
var enableDestroy = require('server-graceful-shutdown'); | ||
var Confit = require('confit'); | ||
var Crypto = require('crypto'); | ||
var Express = require('express'); | ||
var FS = require('graceful-fs'); | ||
var MUtils = require('openhim-mediator-utils'); | ||
@@ -13,139 +11,9 @@ var OnFinished = require('on-finished'); | ||
var Stats = require('./stats'); | ||
var Type = require('type-is'); | ||
var Utils = require('./utils'); | ||
var Worker = require('./worker'); | ||
var ConfigHandler = require('./configHandler'); | ||
var Winston = require('winston'); | ||
var server; | ||
var app = Express(); | ||
// Adds an extension to the filename based on the content-type request header | ||
function addExt(filename, req) { | ||
switch (Type(req, ['json', 'xml'])) { | ||
case 'json': | ||
return filename + '.json'; | ||
case 'xml': | ||
return filename + '.xml'; | ||
default: | ||
return filename + '.txt'; | ||
} | ||
} | ||
// Write metadata to file | ||
function writeMetadata(filename, path, req, callback) { | ||
var metadataFile = Utils.getMetadataFilename(filename); | ||
var metadata = { | ||
method: req.method, | ||
url: req.url, | ||
headers: req.headers | ||
}; | ||
var metadataPath = Path.join(path, metadataFile); | ||
FS.writeFile(metadataPath, JSON.stringify(metadata), function(err) { | ||
callback(err); | ||
}); | ||
} | ||
// Set up an endpoint based on the config | ||
function setUpEndpoint(endpoint, apiOpts) { | ||
var updateTx; | ||
if (endpoint.updateTx && endpoint.updateTx === true) { | ||
updateTx = true; | ||
} else { | ||
updateTx = false; | ||
} | ||
var forwardMetadata = endpoint.forwardMetadata === true; | ||
var worker = new Worker({ | ||
name: endpoint.name, | ||
url: endpoint.url, | ||
paused: endpoint.paused, | ||
parallel: endpoint.parallel, | ||
updateTx: updateTx, | ||
forwardMetadata: forwardMetadata, | ||
apiOpts: apiOpts | ||
}); | ||
// Clear the worker's queue and repopulate it | ||
app.post('/workers/' + worker.name + '/repopulate', function(req, res) { | ||
worker.repopulate(); | ||
res.status(200).send('Worker repopulated'); | ||
}); | ||
// Register an endpoint for pausing/resuming the worker | ||
app.put('/workers/' + worker.name, BodyParser.json(), function(req, res) { | ||
if (typeof req.body.paused !== 'boolean') { | ||
return res.status(400).send('Missing or invalid property: paused'); | ||
} | ||
if (req.body.paused) { | ||
worker.pause(); | ||
res.status(200).send('Worker paused'); | ||
} else { | ||
worker.resume(); | ||
res.status(200).send('Worker resumed'); | ||
} | ||
}); | ||
Winston.info('Worker for endpoint %s available at /workers/%s', endpoint.path, worker.name); | ||
function handleError(err, path) { | ||
Stats.increment('errors'); | ||
Winston.error('Handling request for %s failed', path, err); | ||
} | ||
function writeBodyAndRespond(req, res, filename, next) { | ||
var filePath = Path.join(worker.queuePath, filename); | ||
var stream = req.pipe(FS.createWriteStream(filePath)); | ||
stream.on('error', function(err) { | ||
handleError(err, endpoint.path); | ||
return next(err); | ||
}); | ||
stream.on('finish', function() { | ||
Winston.info('File saved to ./%s', Path.relative(process.cwd(), filePath)); | ||
worker.addToQueue(filename, function(err) { | ||
if (err) { | ||
Stats.increment('errors'); | ||
Winston.error(err, {path: filename}); | ||
} | ||
}); | ||
var mediatorResponse = { | ||
'x-mediator-urn': apiOpts.urn, | ||
status: 'Processing', | ||
response: { | ||
status: 202, | ||
body: 'Request added to queue\n', | ||
timestamp: new Date().toISOString() | ||
} | ||
}; | ||
res.status(202).type('application/json+openhim').send(mediatorResponse); | ||
next(); | ||
}); | ||
} | ||
// Register an endpoint for handling requests | ||
app.all(endpoint.path, function(req, res, next) { | ||
Winston.info('Handling request for %s', endpoint.path); | ||
var filename; | ||
if (req.headers['x-openhim-transactionid']) { | ||
// set file name to transaction ID | ||
filename = req.headers['x-openhim-transactionid']; | ||
} else { | ||
// Generate an invalid transaction ID | ||
filename = Crypto.randomBytes(12).toString('hex').replace(/./, 'x'); | ||
} | ||
filename = addExt(filename, req); | ||
if (forwardMetadata) { | ||
writeMetadata(filename, worker.queuePath, req, function(err) { | ||
if (err) { | ||
return handleError(err, endpoint.path); | ||
} else { | ||
return writeBodyAndRespond(req, res, filename, next); | ||
} | ||
}); | ||
} else { | ||
return writeBodyAndRespond(req, res, filename, next); | ||
} | ||
}); | ||
} | ||
app.get('/heartbeat', function(req, res) { | ||
@@ -167,4 +35,9 @@ res.send({ | ||
// Create the config and start up the app | ||
if (!module.parent) { | ||
function handleConfigError(err) { | ||
Stats.increment('errors'); | ||
Winston.error(err); | ||
process.exit(1); | ||
} | ||
function start(callback) { | ||
Confit(Path.join(__dirname, '..', 'config')).create(function(err, config) { | ||
@@ -180,3 +53,5 @@ if (err) { | ||
Stats.init(config.get('statsd')); | ||
if(config.get('record_stats')) { | ||
Stats.init(config.get('statsd')); | ||
} | ||
@@ -189,2 +64,3 @@ var apiOpts = config.get('api'); | ||
if (err) { | ||
Winston.error(err); | ||
Winston.error('Could not register mediator'); | ||
@@ -194,14 +70,53 @@ process.exit(1); | ||
var heartbeat = MUtils.activateHeartbeat(apiOpts); | ||
heartbeat.on('error', function(err) { | ||
Winston.error('Sending heartbeat failed', err); | ||
}); | ||
MUtils.fetchConfig(apiOpts, (err, initialConfig) => { | ||
if (err) { | ||
Winston.error('Failed to fetch initial config'); | ||
Winston.error(err.stack); | ||
process.exit(1); | ||
} else { | ||
Winston.info('Received initial config:'); | ||
Winston.debug(JSON.stringify(initialConfig)); | ||
mediatorConf.config = initialConfig; | ||
ConfigHandler.updateEndpointConfig(app, config, apiOpts, initialConfig, function(err){ | ||
if(err){ | ||
handleConfigError(err); | ||
} | ||
}); | ||
config.get('endpoints').forEach(function(endpoint) { | ||
setUpEndpoint(endpoint, apiOpts); | ||
}); | ||
Winston.info('Successfully registered mediator'); | ||
var port = process.env.PORT || config.get('port'); | ||
app.listen(port, function() { | ||
Winston.info('App started on port %s', port); | ||
var port = process.env.PORT || config.get('port'); | ||
server = app.listen(port, function() { | ||
if(config.get('heartbeat')){ | ||
var heartbeat = MUtils.activateHeartbeat(apiOpts); | ||
heartbeat.on('error', function(err) { | ||
Winston.error('Sending heartbeat failed', err); | ||
}); | ||
heartbeat.on('config', function(newConfig){ | ||
Winston.info('Received updated config:'); | ||
Winston.debug(JSON.stringify(newConfig)); | ||
mediatorConf.config = newConfig; | ||
ConfigHandler.updateEndpointConfig(app, config, apiOpts, newConfig, function(err){ | ||
if(err){ | ||
handleConfigError(err); | ||
} | ||
}); | ||
Utils.displayEndpoints(app, (msg, routes) => { | ||
Winston.info(msg, routes.toString()); | ||
}); | ||
}); | ||
} | ||
Utils.displayEndpoints(app, (msg, routes) => { | ||
Winston.info(msg, routes.toString()); | ||
}); | ||
Winston.info('App started on port %s', port); | ||
callback(true); | ||
}); | ||
enableDestroy(server); | ||
} | ||
}); | ||
@@ -211,1 +126,19 @@ }); | ||
} | ||
exports.start = start; | ||
function stop (callback) { | ||
Winston.info('closing FQ server...'); | ||
server.shutdown(() => { | ||
Winston.info('FQ server closed.'); | ||
callback(); | ||
}); | ||
} | ||
exports.stop = stop; | ||
// Create the config and start up the app | ||
if (!module.parent) { | ||
start(() => { | ||
Winston.info('Finished'); | ||
}); | ||
} |
@@ -10,7 +10,9 @@ 'use strict'; | ||
init: function(options) { | ||
stats = new StatsD({ | ||
host: options.host, | ||
port: options.port, | ||
prefix: OS.hostname() + '.momconnect_queue.' | ||
}); | ||
if(options) { | ||
stats = new StatsD({ | ||
host: options.host, | ||
port: options.port, | ||
prefix: OS.hostname() + '.momconnect_queue.' | ||
}); | ||
} | ||
}, | ||
@@ -17,0 +19,0 @@ |
@@ -13,1 +13,17 @@ 'use strict'; | ||
}; | ||
exports.displayEndpoints = function(app, callback) { | ||
var routes = app._router.stack; | ||
var routePaths = []; | ||
for (var key in routes) { | ||
if (routes.hasOwnProperty(key)) { | ||
var val = routes[key]; | ||
if(val.route) | ||
{ | ||
val = val.route; | ||
routePaths.push(val.path); | ||
} | ||
} | ||
} | ||
callback('File Queue Endpoints:', routePaths); | ||
}; |
@@ -31,2 +31,3 @@ 'use strict'; | ||
this._url = options.url; | ||
this.path = options.path; | ||
this._queue = Async.queue(this._processFile.bind(this), options.parallel || 2); | ||
@@ -50,2 +51,43 @@ if (options.paused === true) { | ||
Worker.prototype.updateWorker = function(newOptions, callback){ | ||
if(newOptions){ | ||
if(newOptions.url){ | ||
this._url = newOptions.url; | ||
this._queue.concurrency = newOptions.parallel || 2; | ||
if (newOptions.paused === true) { | ||
this.pause(); | ||
} else { | ||
this.resume(); | ||
} | ||
this.updateTx = newOptions.updateTx === true; | ||
this.forwardMetadata = newOptions.forwardMetadata === true; | ||
Winston.info('Worker updated'); | ||
Winston.debug({ | ||
url:this._url, | ||
parallel: this._queue.concurrency, | ||
updateTx: this.updateTx, | ||
forwardMetadata:this.forwardMetadata | ||
}); | ||
} else { | ||
callback(new Error('Failed to update worker: url is required')); | ||
} | ||
} else { | ||
callback(new Error('Failed to update worker: no options supplied')); | ||
} | ||
}; | ||
Worker.prototype.getOptions = function(){ | ||
var options = { | ||
name: this.name, | ||
url: this._url, | ||
path: this.path, | ||
paused: this._queue.paused, | ||
parallel: this._queue.concurrency, | ||
updateTx: this.updateTx, | ||
forwardMetadata: this.forwardMetadata | ||
}; | ||
return options; | ||
}; | ||
Worker.prototype.repopulate = function() { | ||
@@ -52,0 +94,0 @@ Winston.info('Repopulating', {worker: this.name}); |
{ | ||
"name": "openhim-mediator-file-queue", | ||
"version": "2.0.2", | ||
"version": "3.0.0", | ||
"description": "An async file queue for OpenHIM mediators", | ||
@@ -16,4 +16,4 @@ "author": "Jembi Health Systems (https://github.com/jembi)", | ||
"start": "node lib", | ||
"test": "tap --cov test/tests.js", | ||
"coverage": "tap test/tests.js --cov --coverage-report=lcov" | ||
"test": "tap --cov test/tests.js test/test_worker.js test/test_openhim.js && jshint **/*.js", | ||
"coverage": "tap test/tests.js test/test_worker.js test/test_openhim.js --cov --coverage-report=lcov" | ||
}, | ||
@@ -31,3 +31,5 @@ "bin": { | ||
"express": "^4.11.1", | ||
"express-remove-route": "^0.1.1", | ||
"graceful-fs": "^4.1.2", | ||
"latest-version": "^2.0.0", | ||
"mkdirp": "^0.5.0", | ||
@@ -38,2 +40,3 @@ "needle": "^0.10.0", | ||
"openhim-mediator-utils": "^0.2.2", | ||
"server-graceful-shutdown": "^0.1.2", | ||
"type-is": "^1.5.5", | ||
@@ -55,5 +58,6 @@ "winston": "^1.0.1" | ||
"devDependencies": { | ||
"jshint": "^2.9.2", | ||
"rewire": "^2.5.1", | ||
"tap": "^2.3.2" | ||
"tap": "^2.3.4" | ||
} | ||
} |
@@ -12,2 +12,8 @@ # File queue | ||
For development purposes, in order to communicate to the OpenHIM core, since it uses self-signed certificates, execute: | ||
```sh | ||
NODE_TLS_REJECT_UNAUTHORIZED=0 npm start | ||
``` | ||
See this visual example of how this work as an OpenHIM mediator. Note the curl commands return instantly but the mediator knows to tell the OpenHIM that these are still processing and updates them after they complete asynchronously. | ||
@@ -19,6 +25,48 @@ | ||
The file queue simply handles incoming requests, writing the files to a directory on the filesystem, and then processes the queue, sending the files to a configured endpoint. If the file is successfully sent then the queued file is deleted from the filesystem, otherwise it is moved to an error directory. | ||
The file queue simply handles incoming requests, writing the files to a directory on the filesystem, and then processes the queue, sending the files to a configured endpoint. If the file is successfully sent, then the queued file is deleted from the filesystem, otherwise it is moved to an error directory. | ||
Multiple "endpoints" can be configured. Each endpoint handles incoming requests for a specific URL, queues them, and then sends them to another configured URL. An endpoint has a "worker" which is responsible for reading the files from the queue and processing them. Workers can process multiple files in parallel as configured (by default 2 at a time). Workers can be paused/unpaused or repopulated via a RESTlike endpoint. Pausing a worker will stop it from processing files from the queue, but the endpoint will continue accepting requests and writing the files to the queue. Repopulating a worker will cause it to refresh its queue from the files on the filesystem. This is useful when manually adding files to or removing files from the queue. | ||
To pause a queue: | ||
`./scripts/pause.sh <worker-name>` | ||
To resume a queue: | ||
`./scripts/resume.sh <worker-name>` | ||
To repopulate a queue: | ||
`mv ./error/<worker-name>/* ./queue/<worker-name>/` | ||
`./scripts/repopulate.sh <worker-name>` | ||
Each "endpoint" must have a matching channel and route registered in OpenHIM, for receiving requests and forwarding them to the file queue mediator. When the mediator starts up a new channel will automatically be created/updated based off the settings for the upstream server (where the files will be forwarded to). Note the the channel will be updated each time the endpoint settings are updated. | ||
Please note that the following manual steps might be required to get the File Queue to function correctly: | ||
* If the mediator is running on the same server as the OpenHIM server, then you may need to update the route host to the IP address of that server for OpenHIM, instead of using `localhost`. | ||
* The user role `file-queue` has been created by default for all channels. This role needs to be added to the user account which will be used to push new files onto the queue. | ||
Here is an example config for a channel: | ||
```js | ||
{ | ||
"name": "File Queue", | ||
"urlPattern": "^/test$", | ||
"status": "enabled", | ||
"routes": [ | ||
{ | ||
"name": "File queue", | ||
"host": "localhost", | ||
"path": "/test", | ||
"port": "4002", | ||
"secured": false, | ||
"primary": true, | ||
"type": "http" | ||
} | ||
], | ||
"authType": "private", | ||
"allow": [ | ||
"file-queue" | ||
] | ||
} | ||
``` | ||
## Endpoint config | ||
@@ -28,3 +76,3 @@ | ||
* `name` (required) - The name of the endpoint which is used for setting up the RESTlike routes for the worker. | ||
* `path` (required) - The path to use for handling incoming requests. A '\*' may be used to match any number of characters in a path. E.g. 'this/is/a/test/\*'. | ||
* `path` (required) - The path to use for handling incoming requests. A '\*' may be used to match any number of characters in a path. E.g. 'this/is/a/test/\*'. Note that this path must match the path for the route accepting these requests. | ||
* `url` (required) - The URL to send the files to when processing them from the queue. | ||
@@ -31,0 +79,0 @@ * `paused` - Whether or not the endpoint's worker should be paused by default. This must be a boolean value. Defaults to false. |
'use strict'; | ||
var fs = require('graceful-fs'); | ||
var tap = require('tap'); | ||
var rewire = require('rewire'); | ||
const fs = require('graceful-fs'); | ||
const tap = require('tap'); | ||
const rewire = require('rewire'); | ||
const testUtils = require('./utils'); | ||
const testServer = require('./test-openhim-server'); | ||
const testUpstreamServer = require('./test-upstream-server'); | ||
const Winston = require('winston'); | ||
const request = require('request'); | ||
var index = rewire('../lib/index'); | ||
var worker = rewire('../lib/worker'); | ||
var index = null; | ||
var setupEndpoint = rewire('../lib/setupEndpoint'); | ||
// this forces the use of the test config file | ||
process.env.NODE_ENV = 'test'; | ||
function beforeEach(callback) { | ||
index = rewire('../lib/index'); | ||
testServer.start(() => { | ||
testUpstreamServer.start(() => { | ||
Winston.info('Test servers started...'); | ||
callback(); | ||
}); | ||
}); | ||
} | ||
function cleanUp(callback){ | ||
Winston.info('teardown'); | ||
setupEndpoint.__set__('WorkerInstances', []); | ||
setupEndpoint.destroyWorkers(() => { | ||
// Shutdown servers | ||
testUpstreamServer.stop(() => { | ||
testServer.stop(() => { | ||
Winston.info('Test servers stopped'); | ||
callback(); | ||
}); | ||
}); | ||
}); | ||
index = null; | ||
} | ||
// ************************************************ | ||
// tests for index.js | ||
// ************************************************ | ||
tap.test('should write metadata to file', function(t) { | ||
@@ -19,3 +55,3 @@ var req = { | ||
}; | ||
var writeMetadata = index.__get__('writeMetadata'); | ||
var writeMetadata = setupEndpoint.__get__('writeMetadata'); | ||
writeMetadata('test123.json', 'test', req, function(err) { | ||
@@ -31,120 +67,73 @@ t.error(err); | ||
function setupTestFiles(bodyFile, metaFile) { | ||
fs.mkdirSync('test/from'); | ||
fs.mkdirSync('test/to'); | ||
if (bodyFile) { | ||
fs.writeFileSync('test/from/xb58d4327b141ebffe6e990c.txt', 'Test123'); | ||
} | ||
if (metaFile) { | ||
fs.writeFileSync('test/from/xb58d4327b141ebffe6e990c-metadata.json', JSON.stringify({'test': 'obj'})); | ||
} | ||
} | ||
function cleanupTestFiles() { | ||
try { fs.unlinkSync('test/from/xb58d4327b141ebffe6e990c.txt'); | ||
} catch(e) { /* delete if exist */ } | ||
try { fs.unlinkSync('test/from/xb58d4327b141ebffe6e990c-metadata.json'); | ||
} catch(e) { /* delete if exist */ } | ||
try { fs.unlinkSync('test/to/xb58d4327b141ebffe6e990c.txt'); | ||
} catch(e) { /* delete if exist */ } | ||
try { fs.unlinkSync('test/to/xb58d4327b141ebffe6e990c-metadata.json'); | ||
} catch(e) { /* delete if exist */ } | ||
try { fs.rmdirSync('test/from'); | ||
} catch(e) { /* delete if exist */ } | ||
try { fs.rmdirSync('test/to'); | ||
} catch(e) { /* delete if exist */ } | ||
} | ||
// In case there are leftovers from previously failed tests | ||
cleanupTestFiles(); | ||
tap.test('moveTx - should move both body and metadata files', function(t) { | ||
setupTestFiles(true, true); | ||
var moveTx = worker.__get__('moveTx'); | ||
moveTx('xb58d4327b141ebffe6e990c.txt', 'test/from', 'test/to', true, function(err) { | ||
t.notOk(err); | ||
t.ok(fs.statSync('test/to/xb58d4327b141ebffe6e990c.txt')); | ||
t.ok(fs.statSync('test/to/xb58d4327b141ebffe6e990c-metadata.json')); | ||
tap.test('should find worker', function(t){ | ||
testUtils.findWorker(function(findWorker){ | ||
t.ok(findWorker(testUtils.validConf)); | ||
t.end(); | ||
}); | ||
t.tearDown(cleanupTestFiles); | ||
}); | ||
}); | ||
tap.test('moveTx - should move just body if forward metadata is false', function(t) { | ||
setupTestFiles(true, false); | ||
var moveTx = worker.__get__('moveTx'); | ||
moveTx('xb58d4327b141ebffe6e990c.txt', 'test/from', 'test/to', false, function(err) { | ||
t.notOk(err); | ||
t.ok(fs.statSync('test/to/xb58d4327b141ebffe6e990c.txt')); | ||
tap.test('should fail to find worker', function(t){ | ||
testUtils.findWorker(function(findWorker){ | ||
t.notOk(findWorker(testUtils.invalidConf)); | ||
t.end(); | ||
}); | ||
t.tearDown(cleanupTestFiles); | ||
}); | ||
tap.test('moveTx - should throw an error if metadata file doesnt exist', function(t) { | ||
setupTestFiles(true, false); | ||
var moveTx = worker.__get__('moveTx'); | ||
moveTx('xb58d4327b141ebffe6e990c.txt', 'test/from', 'test/to', true, function(err) { | ||
t.ok(err); | ||
t.end(); | ||
}); | ||
t.tearDown(cleanupTestFiles); | ||
}); | ||
tap.test('should send file upstream', function(t){ | ||
beforeEach(() => { | ||
t.plan(3); | ||
index.start((res) => { | ||
Winston.info(res); | ||
const options = { | ||
url: 'http://root:password@localhost:4002/test', | ||
body: 'This is a test' | ||
}; | ||
t.ok(res); | ||
setTimeout(function() { | ||
request.post(options, (err, res) => { | ||
Winston.info(res.body); | ||
Winston.info(res.statusCode); | ||
t.equal(res.statusCode, 202); | ||
setTimeout(function() { | ||
index.stop(() => { | ||
tap.test('moveTx - should throw an error if body file doesnt exist', function(t) { | ||
var moveTx = worker.__get__('moveTx'); | ||
moveTx('xb58d4327b141ebffe6e990c.txt', 'test/from', 'test/to', false, function(err) { | ||
t.ok(err); | ||
t.end(); | ||
}); | ||
}); | ||
tap.test('delTx - should delete both files', function(t) { | ||
t.plan(3); | ||
setupTestFiles(true, true); | ||
var delTx = worker.__get__('delTx'); | ||
delTx('xb58d4327b141ebffe6e990c.txt', 'test/from', true, function(err) { | ||
t.notOk(err); | ||
fs.stat('test/from/xb58d4327b141ebffe6e990c.txt', function(err) { | ||
t.ok(err); | ||
cleanUp(() => { | ||
t.pass(); | ||
t.end(); | ||
}); | ||
}); | ||
}, 2000); | ||
}); | ||
}, 2000); | ||
}); | ||
fs.stat('test/from/xb58d4327b141ebffe6e990c-metadata.json', function(err) { | ||
t.ok(err); | ||
}); | ||
}); | ||
t.tearDown(cleanupTestFiles); | ||
}); | ||
tap.test('delTx - should just body file when not forwarding metadata', function(t) { | ||
t.plan(3); | ||
setupTestFiles(true, true); | ||
var delTx = worker.__get__('delTx'); | ||
delTx('xb58d4327b141ebffe6e990c.txt', 'test/from', false, function(err) { | ||
t.notOk(err); | ||
fs.stat('test/from/xb58d4327b141ebffe6e990c.txt', function(err) { | ||
t.ok(err); | ||
tap.test('should fail to send file upstream mediator config', function(t){ | ||
beforeEach(() => { | ||
t.plan(3); | ||
index.start((res) => { | ||
Winston.info(res); | ||
const options = { | ||
url: 'http://root:password@localhost:4002/invalidPath', | ||
body: 'This is a test' | ||
}; | ||
t.ok(res); | ||
setTimeout(function() { | ||
request.post(options, (err, res) => { | ||
Winston.info(res.body); | ||
Winston.info(res.statusCode); | ||
t.equal(res.statusCode, 404); | ||
setTimeout(function() { | ||
index.stop(() => { | ||
cleanUp(() => { | ||
t.pass(); | ||
t.end(); | ||
}); | ||
}); | ||
}, 2000); | ||
}); | ||
}, 2000); | ||
}); | ||
fs.stat('test/from/xb58d4327b141ebffe6e990c-metadata.json', function(err) { | ||
t.notOk(err); | ||
}); | ||
}); | ||
t.tearDown(cleanupTestFiles); | ||
}); | ||
tap.test('delTx - should throw an error if metadata file doesnt exist', function(t) { | ||
setupTestFiles(true, false); | ||
var delTx = worker.__get__('delTx'); | ||
delTx('xb58d4327b141ebffe6e990c.txt', 'test/from', true, function(err) { | ||
t.ok(err); | ||
t.end(); | ||
}); | ||
t.tearDown(cleanupTestFiles); | ||
}); | ||
tap.test('delTx - should throw an error if body file doesnt exist', function(t) { | ||
var delTx = worker.__get__('delTx'); | ||
delTx('xb58d4327b141ebffe6e990c.txt', 'test/from', false, function(err) { | ||
t.ok(err); | ||
t.end(); | ||
}); | ||
}); |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
819056
31
3849
98
15
3
2
5
+ Addedexpress-remove-route@^0.1.1
+ Addedlatest-version@^2.0.0
+ Addedcapture-stack-trace@1.0.2(transitive)
+ Addedcreate-error-class@3.0.2(transitive)
+ Addeddeep-extend@0.6.0(transitive)
+ Addedduplexer2@0.1.4(transitive)
+ Addederror-ex@1.3.2(transitive)
+ Addedexpress-remove-route@0.1.1(transitive)
+ Addedgot@5.7.1(transitive)
+ Addedini@1.3.8(transitive)
+ Addedis-arrayish@0.2.1(transitive)
+ Addedis-redirect@1.0.0(transitive)
+ Addedis-retry-allowed@1.2.0(transitive)
+ Addedis-stream@1.1.0(transitive)
+ Addedisarray@1.0.0(transitive)
+ Addedlatest-version@2.0.0(transitive)
+ Addedlowercase-keys@1.0.1(transitive)
+ Addednode-status-codes@1.0.0(transitive)
+ Addedobject-assign@4.1.1(transitive)
+ Addedpackage-json@2.4.0(transitive)
+ Addedparse-json@2.2.0(transitive)
+ Addedpinkie@2.0.4(transitive)
+ Addedpinkie-promise@2.0.1(transitive)
+ Addedprepend-http@1.0.4(transitive)
+ Addedprocess-nextick-args@2.0.1(transitive)
+ Addedrc@1.2.8(transitive)
+ Addedread-all-stream@3.1.0(transitive)
+ Addedreadable-stream@2.3.8(transitive)
+ Addedregistry-auth-token@3.4.0(transitive)
+ Addedregistry-url@3.1.0(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedsemver@5.7.2(transitive)
+ Addedserver-graceful-shutdown@0.1.2(transitive)
+ Addedstring_decoder@1.1.1(transitive)
+ Addedstrip-json-comments@2.0.1(transitive)
+ Addedtimed-out@3.1.3(transitive)
+ Addedunzip-response@1.0.2(transitive)
+ Addedurl-parse-lax@1.0.0(transitive)
+ Addedutil-deprecate@1.0.2(transitive)