larvitamsync
Advanced tools
Comparing version 0.1.5 to 0.1.7
@@ -5,3 +5,3 @@ 'use strict'; | ||
uuidLib = require('uuid'), | ||
mysql = require('mysql2'), | ||
spawn = require('child_process').spawn, | ||
async = require('async'), | ||
@@ -11,4 +11,3 @@ log = require('winston'), | ||
os = require('os'), | ||
fs = require('fs'), | ||
_ = require('lodash'); | ||
fs = require('fs'); | ||
@@ -38,23 +37,23 @@ function sync(options, cb) { | ||
tasks.push(function(cb) { | ||
const localDbConf = _.cloneDeep(db.conf); | ||
const mysqlOptions = [], | ||
f = fs.openSync(tmpFileName, 'r'); | ||
let dbCon; | ||
mysqlOptions.push('-h'); | ||
mysqlOptions.push(db.conf.host); | ||
mysqlOptions.push('-u'); | ||
mysqlOptions.push(db.conf.user); | ||
localDbConf.multipleStatements = true; | ||
dbCon = mysql.createConnection(localDbConf); | ||
if (db.conf.password) { | ||
mysqlOptions.push('-p' + db.conf.password); | ||
} | ||
dbCon.query(fs.readFileSync(tmpFileName).toString(), function(err) { | ||
if (err) { | ||
log.error('larvitamsync: ./mariadb.js - sync() - SQL error: ' + err.message); | ||
} else { | ||
log.info('larvitamsync: ./mariadb.js - sync() - Database synced!'); | ||
} | ||
mysqlOptions.push(db.conf.database); | ||
dbCon.end(function(err) { | ||
if (err) { | ||
log.warn('larvitamsync: ./mariadb.js - sync() - Could not end() database connection, err: ' + err.message); | ||
} | ||
let shMysql; | ||
cb(err); | ||
}); | ||
shMysql = spawn('mysql', mysqlOptions, {'stdio': [f, 'pipe', process.stderr]}); | ||
shMysql.on('close', function() { | ||
log.info('larvitamsync: ./mariadb.js - sync() - Database synced!'); | ||
cb(); | ||
}); | ||
@@ -61,0 +60,0 @@ }); |
{ | ||
"name": "larvitamsync", | ||
"version": "0.1.5", | ||
"version": "0.1.7", | ||
"description": "Sync data between minions", | ||
@@ -23,2 +23,3 @@ "main": "index.js", | ||
"devDependencies": { | ||
"lodash": "^4.17.3", | ||
"mocha": "^3.2.0", | ||
@@ -33,4 +34,2 @@ "mocha-eslint": "^3.0.1", | ||
"larvitutils": "^1.1.0", | ||
"lodash": "^4.17.2", | ||
"mysql2": "^1.1.2", | ||
"uuid": "^3.0.1", | ||
@@ -37,0 +36,0 @@ "winston": "^2.3.0" |
@@ -13,2 +13,4 @@ [![Build Status](https://travis-ci.org/larvit/larvitamsync.svg?branch=master)](https://travis-ci.org/larvit/larvitamsync) [![Dependencies](https://david-dm.org/larvit/larvitamsync.svg)](https://david-dm.org/larvit/larvitamsync.svg) | ||
#### Simple command | ||
```javascript | ||
@@ -26,3 +28,3 @@ const options = {'exchange': 'test_dataDump'}, // RabbitMQ exchange, must be unique on the queue | ||
}; | ||
// or | ||
// or pipe directly from mysqldump: | ||
options.dataDumpCmd = { | ||
@@ -33,8 +35,3 @@ 'command': 'mysqldump', | ||
}; | ||
// or something else | ||
// Optional Content-Type header can be set like this: | ||
options['Content-Type'] = 'application/sql'; | ||
// Returns https://nodejs.org/api/http.html#http_class_http_server | ||
new amsync.SyncServer(options, function(err) { | ||
@@ -47,2 +44,37 @@ if (err) throw err; | ||
#### Custom http request handler | ||
On each data dump request there is a http request and this can be handled manually | ||
```javascript | ||
const options = {'exchange': 'test_dataDump'}, // RabbitMQ exchange, must be unique on the queue | ||
amsync = require('larvitamsync'); | ||
let syncServer; | ||
syncServer = new amsync.SyncServer(options, function(err) { | ||
if (err) throw err; | ||
console.log('Server active'); | ||
}); | ||
syncServer.handleHttpReq_original = syncServer.handleHttpReq; | ||
syncServer.handleHttpReq = function(req, res) { | ||
// Set custom content type | ||
res.setHeader('Content-Type', 'text/plain'); | ||
// Run different commands depending on request url | ||
if (req.url === '/') { | ||
syncServer.options.dataDumpCmd = {'command': 'echo', 'args': ['blergh']}; | ||
} else { | ||
syncServer.options.dataDumpCmd = {'command': 'echo', 'args': [req.url]}; | ||
} | ||
// Run the original request handler | ||
syncServer.handleHttpReq_original(req, res); | ||
} | ||
``` | ||
### Client (data slave) | ||
@@ -53,5 +85,10 @@ | ||
```javascript | ||
const options = {'exchange': 'test_dataDump'}, // RabbitMQ exchange, must be unique on the queue | ||
const options = {}, | ||
amsync = require('larvitamsync'); | ||
options.exchange = 'test_dataDump'; // RabbitMQ exchange, must be unique on the queue | ||
options.requestOptions = {'path': '/foobar'}; // Optional extra options to | ||
// https://www.npmjs.com/package/request that | ||
// is used to request stuff from the server | ||
new amsync.SyncClient(options, function(err, res) { | ||
@@ -58,0 +95,0 @@ let syncData = Buffer.from(''); |
@@ -88,2 +88,8 @@ 'use strict'; | ||
if (that.options.requestOptions !== undefined) { | ||
for (const key of Object.keys(that.options.requestOptions)) { | ||
reqOptions[key] = that.options.requestOptions[key]; | ||
} | ||
} | ||
log.verbose('larvitamsync: syncClient.js - SyncClient.handleMsg() - Sending request: "' + JSON.stringify(reqOptions) + '"'); | ||
@@ -90,0 +96,0 @@ |
'use strict'; | ||
const lUtils = require('larvitutils'), | ||
uuidLib = require('uuid'), | ||
const uuidLib = require('uuid'), | ||
lUtils = require('larvitutils'), | ||
spawn = require('child_process').spawn, | ||
@@ -31,13 +31,56 @@ http = require('http'), | ||
// Subscribe to dump requests | ||
that.listenForRequests(cb); | ||
} | ||
SyncServer.prototype.handleHttpReq = function handleHttpReq(req, res) { | ||
const that = this; | ||
let dumpProcess; | ||
if (req.headers.token !== req.token) { | ||
log.info('larvitamsync: syncServer.js - SyncServer.handleHttpReq() - Token: "' + req.token + '". Incoming message. Invalid token detected: "' + req.headers.token + '"'); | ||
res.writeHead(401, {'Content-Type': 'text/plain; charset=utf-8'}); | ||
res.end('Unauthorized'); | ||
return; | ||
} | ||
if ( ! that.options.dataDumpCmd || ! that.options.dataDumpCmd.command) { | ||
const err = new Error('options.dataDumpCmd.command is a required option!'); | ||
log.warn('larvitamsync: syncServer.js - SyncServer() - Invalid options: ' + err.message); | ||
cb(err); | ||
log.error('larvitamsync: syncServer.js - handleHttpReq() - Invalid options: ' + err.message); | ||
res.writeHead(500, { 'Content-Type': 'text/plain' }); | ||
res.end('Internal server error'); | ||
return; | ||
} | ||
// Subscribe to dump requests | ||
that.listenForRequests(cb); | ||
} | ||
log.verbose('larvitamsync: syncServer.js - SyncServer.handleHttpReq() - Token: "' + req.token + '". Incoming message with valid token.'); | ||
dumpProcess = spawn(that.options.dataDumpCmd.command, that.options.dataDumpCmd.args, that.options.dataDumpCmd.options); | ||
if ( ! res.getHeader('Content-Type')) { res.setHeader('Content-Type', 'Application/Octet-stream'); } | ||
if ( ! res.getHeader('Connection')) { res.setHeader('Connection', 'Transfer-Encoding'); } | ||
if ( ! res.getHeader('Transfer-Encoding')) { res.setHeader('Connection', 'chunked'); } | ||
dumpProcess.stdout.on('data', function(data) { | ||
res.write(data); | ||
}); | ||
dumpProcess.stderr.on('data', function(data) { | ||
log.error('larvitamsync: syncServer.js - SyncServer.handleHttpReq() - Token: "' + req.token + '". Error from dump command: ' + data.toString()); | ||
}); | ||
dumpProcess.on('close', function() { | ||
log.debug('larvitamsync: syncServer.js - SyncServer.handleHttpReq() - Token: "' + req.token + '". Dump command closed.'); | ||
res.end(); | ||
clearTimeout(req.serverTimeout); | ||
req.server.close(); | ||
}); | ||
dumpProcess.on('error', function(err) { | ||
log.warn('larvitamsync: syncServer.js - SyncServer.handleHttpReq() - Token: "' + req.token + '". Non-0 exit code from dumpProcess. err: ' + err.message); | ||
res.writeHead(500, { 'Content-Type': 'text/plain' }); | ||
res.end('Process error: ' + err.message); | ||
}); | ||
}; | ||
SyncServer.prototype.handleIncMsg = function handleIncMsg(message, ack) { | ||
@@ -60,61 +103,10 @@ const token = uuidLib.v4(), | ||
function handleReq(req, res) { | ||
let headersWritten = false, | ||
dumpProcess; | ||
server = http.createServer(function(req, res) { | ||
req.server = server; | ||
req.serverTimeout = serverTimeout; | ||
req.that = that; | ||
req.token = token; | ||
if (req.headers.token !== token) { | ||
log.info('larvitamsync: syncServer.js - SyncServer.handleIncMsg() - handleReq() - Token: "' + token + '". Incoming message. Invalid token detected: "' + req.headers.token + '"'); | ||
res.writeHead(401, {'Content-Type': 'text/plain; charset=utf-8'}); | ||
res.end('Unauthorized'); | ||
return; | ||
} | ||
log.verbose('larvitamsync: syncServer.js - SyncServer.handleIncMsg() - handleReq() - Token: "' + token + '". Incoming message with valid token.'); | ||
dumpProcess = spawn(that.options.dataDumpCmd.command, that.options.dataDumpCmd.args, that.options.dataDumpCmd.options); | ||
function writeHeaders() { | ||
let contentType = that.options['Content-Type']; | ||
if ( ! contentType) { | ||
contentType = 'Application/Octet-stream'; | ||
} | ||
if (headersWritten === false) { | ||
log.debug('larvitamsync: syncServer.js - SyncServer.handleIncMsg() - handelReq() - writeHeaderes() - Writing headers'); | ||
res.writeHead(200, { | ||
'Connection': 'Transfer-Encoding', | ||
'Content-Type': contentType, | ||
'Transfer-Encoding': 'chunked' | ||
}); | ||
headersWritten = true; | ||
} | ||
} | ||
dumpProcess.stdout.on('data', function(data) { | ||
writeHeaders(); | ||
res.write(data); | ||
}); | ||
dumpProcess.stderr.on('data', function(data) { | ||
log.error('larvitamsync: syncServer.js - SyncServer.handleIncMsg() - handleReq() - Token: "' + token + '". Error from dump command: ' + data.toString()); | ||
}); | ||
dumpProcess.on('close', function() { | ||
log.debug('larvitamsync: syncServer.js - SyncServer.handleIncMsg() - handleReq() - Token: "' + token + '". Dump command closed.'); | ||
writeHeaders(); | ||
res.end(); | ||
clearTimeout(serverTimeout); | ||
server.close(); | ||
}); | ||
dumpProcess.on('error', function(err) { | ||
log.warn('larvitamsync: syncServer.js - SyncServer.handleIncMsg() - handleReq() - Token: "' + token + '". Non-0 exit code from dumpProcess. err: ' + err.message); | ||
res.writeHead(500, { 'Content-Type': 'text/plain' }); | ||
res.end('Process error: ' + err.message); | ||
}); | ||
} | ||
server = http.createServer(handleReq); | ||
that.handleHttpReq(req, res); | ||
}); | ||
server.listen(0); | ||
@@ -121,0 +113,0 @@ |
@@ -383,4 +383,94 @@ 'use strict'; | ||
describe('Custom http receiver', function() { | ||
it('should read path', function(done) { | ||
const exchangeName = 'test_dataDump_custom', | ||
intercom1 = new Intercom(require(intercomConfigFile)), | ||
intercom2 = new Intercom(require(intercomConfigFile)), | ||
tasks = []; | ||
this.slow(500); | ||
intercom1.on('ready', function(err) { | ||
if (err) throw err; | ||
intercom1.ready = true; | ||
}); | ||
intercom2.on('ready', function(err) { | ||
if (err) throw err; | ||
intercom2.ready = true; | ||
}); | ||
// Wait for the intercoms to come online | ||
tasks.push(function(cb) { | ||
function checkIfReady() { | ||
if (intercom1.ready === true && intercom2.ready === true) { | ||
cb(); | ||
} else { | ||
setTimeout(checkIfReady, 10); | ||
} | ||
} | ||
checkIfReady(); | ||
}); | ||
// Start server | ||
tasks.push(function(cb) { | ||
const options = {'exchange': exchangeName}; | ||
let syncServer; | ||
syncServer = new amsync.SyncServer(options, cb); | ||
syncServer.handleHttpReq_original = syncServer.handleHttpReq; | ||
syncServer.handleHttpReq = function(req, res) { | ||
// Set custom content type | ||
res.setHeader('Content-Type', 'text/plain'); | ||
//console.log(req); | ||
// Run different commands depending on request url | ||
if (req.url === '/') { | ||
syncServer.options.dataDumpCmd = {'command': 'echo', 'args': ['-n', 'blergh']}; | ||
} else { | ||
syncServer.options.dataDumpCmd = {'command': 'echo', 'args': ['-n', req.url]}; | ||
} | ||
// Run the original request handler | ||
syncServer.handleHttpReq_original(req, res); | ||
}; | ||
}); | ||
// Start the client | ||
tasks.push(function(cb) { | ||
const options = {'exchange': exchangeName}; | ||
options.requestOptions = {'path': '/foobar'}; | ||
new amsync.SyncClient(options, function(err, res) { | ||
let syncData = Buffer.from(''); | ||
if (err) throw err; | ||
res.on('data', function(chunk) { | ||
syncData = Buffer.concat([syncData, chunk], syncData.length + chunk.length); | ||
}); | ||
res.on('end', function() { | ||
assert.deepEqual(syncData.toString(), options.requestOptions.path); | ||
cb(); | ||
}); | ||
res.on('error', function(err) { | ||
throw err; | ||
}); | ||
}); | ||
}); | ||
async.series(tasks, function(err) { | ||
if (err) throw err; | ||
done(); | ||
}); | ||
}); | ||
}); | ||
after(function(done) { | ||
db.removeAllTables(done); | ||
}); |
28686
6
663
125
4
27
5
- Removedlodash@^4.17.2
- Removedmysql2@^1.1.2