Empujar. Empujarlo Bueno.
When you need to push data around, you push it. Push it real good.
An ETL and Operations tool.
What
Empujar is a tool that moves stuff around. It's built in Node.js, so you can do lots of stuff asynchronously. You can move data around (an ETL tool), files (a backup tool), and more!
Empujar's top-level object is a "book", which contains "chapters", which in turn contain "pages". Chapters are executed one by one, in order, and the pages within a chapter can run in parallel (up to a thread limit you specify).
See an example project here.
For example, a chapter that extracts all data from a MySQL database could look like this:
var dateformat = require('dateformat');
exports.chapterLoader = function(book){
var chapter = book.addChapter(1, 'EXTRACT & LOAD', {threads: 5});
var source = book.connections.source.connection;
var destination = book.connections.destination.connection;
var queryLimit = 1000;
var tableMaxes = {};
var extractTable = function(table, callback){
destination.getMax(table, 'updatedAt', function(error, max){
if(error){ return callback(error); }
var query = 'SELECT * FROM `' + table + '` ';
if(max){
query += ' WHERE `updatedAt` >= "' + dateformat(max, 'yyyy-mm-dd HH:MM:ss') + '"';
}
source.getAll(query, queryLimit, function(error, rows, done){
destination.insertData(table, rows, function(error){
if(error){ return next(error); }
done();
});
}, callback);
});
};
chapter.addLoader('determine extract queries', function(done){
source.tables.forEach(function(table){
chapter.addPage('extract table: ' + table, function(next){
extractTable(table, next);
});
});
done();
});
};
Empujar runs operations in series or in parallel. These operations are defined by `books`, `chapters`, and `pages`.
#!/usr/bin/env node

// The default chapter, config, log and pid paths are derived from
// process.cwd(), so run from this file's directory regardless of where the
// script was launched from.
process.chdir(__dirname);

var Empujar = require('empujar');
var optimist = require('optimist');

// Parsed CLI flags (e.g. --logLevel debug, --chapters 1,4) become book options.
var options = optimist.argv;
var book = new Empujar.book(options);

// Log the error, then exit with a non-zero status after a 5 second delay so
// callers (cron, CI, shell scripts) can tell that the run failed.
var errorHandler = function(error, context){
  console.log("OH NO! (but I handled the error) | " + error);
  setTimeout(function(){ process.exit(1); }, 5000);
};

book.on('error', errorHandler);

book.connect(function(){
  book.logger.log('I am a debug message', 'debug');
  book.data.stuff = 'something cool';

  var chapter1 = book.addChapter(1, 'Do the first thing in parallel', {threads: 10});
  var chapter2 = book.addChapter(2, 'Do that next thing in serial', {threads: 1});

  // 100 pages that each wait 100ms; with 10 threads, up to 10 run at once.
  for(var i = 0; i < 100; i++){
    chapter1.addPage('sleepy thing: ' + i, function(next){
      setTimeout(next, 100);
    });
  }

  chapter2.addLoader('do something before', function(next){
    book.logger.log('I am the preloader');
    next();
  });

  chapter2.addPage('the final step', function(next){
    next();
  });

  // Exit after a 5 second delay once the book has finished running.
  book.run(function(){
    setTimeout(process.exit, 5000);
  });
});
There is also a more formal example you can explore within this project. Check out /books/etl to learn more.
Empujar will connect to the connections you define in `book/config/connections/NAME.js`, and there should be a matching transport in `/lib/connections/TYPE.js`.
When `book.run()` is complete, you probably want to call `process.exit()` or shut down more gracefully.
You can subscribe to `book.on('error')` and `book.on('state')` events. A cool thing to do is to record these state events in your data warehouse if you are using Empujar as an ETL tool:
// Record every state event in the warehouse; insertData takes a callback, so
// pass one and surface failed inserts instead of dropping them silently.
book.on('state', function(data){
  datawarehouse.insertData('empujar', [data], function(error){
    if(error){ console.log('could not record empujar state | ' + error); }
  });
});
Project Layout
Create your project so that it looks like this:
| -\books
| ---\myBook
| -----\book.js
| -----\pids\
| -----\logs\
| -----\config\
| -----\config\connections\
| -----\config\connections\myDatabase.js
| -----\chapters\
| -----\chapters\chapter1.js
| -----\chapters\chapter2.js
Launch Flags
The defaults for all launch flags are:
// All path defaults are resolved against process.cwd() at launch.
{
// glob of chapter modules to load
chapterFiles: path.normalize( process.cwd() + '/chapters/**/*.js' ),
// directory containing connections/NAME.js connection configs
configPath: path.normalize( process.cwd() + '/config' ),
logPath: path.normalize( process.cwd() + '/log' ),
pidsPath: path.normalize( process.cwd() + '/pids' ),
logFile: 'empujar.log',
tmpPath: path.normalize( process.cwd() + '/tmp' ),
// presumably echoes log lines to stdout as well as the log file -- confirm
logStdout: true,
// use 'debug' for verbose output (--logLevel debug)
logLevel: 'info',
// chapters to run, e.g. --chapters 1,4 or --chapters 100-300; presumably empty means all chapters -- confirm
chapters: [],
// rows after which every connection.getAll() finishes successfully, e.g. --getAllLimit 1000
getAllLimit: Infinity,
}
Examples:
- Run your book:
node yourBook.js
- Run your book in verbose mode:
node yourBook.js --logLevel debug
- Run only certain chapters in your book:
node yourBook.js --chapters 1,4
or a range: node yourBook.js --chapters 100-300
- Extract only a small subset of your data (great for testing)
node yourBook.js --getAllLimit 1000
- This makes every call to `connection.getAll()` exit successfully after retrieving 1000 rows.
Connections
While you can create your own connections, Empujar ships with the tools to work with a number of the most common ones:
MySQL
var connection = book.connections.mysql.connection;
connection.connect = function(callback)
connection.showTables = function(callback)
connection.showColumns = function(table, callback)
connection.query = function(query, data, callback)
connection.getAll = function(queryBase, chunkSize, dataCallback, doneCallback)
connection.getMax = function(table, column, callback)
connection.queryStream = function(query, callback)
connection.insertData = function(table, data, callback, mergeOnDuplicates)
connection.addColumn = function(table, column, rowData, callback)
connection.alterColumn = function(table, column, definition, callback)
connection.mergeTables = function(sourceTable, destinationTable, callback)
connection.copyTableSchema = function(sourceTable, destinationTable, callback)
connection.dump = function(file, options, callback)
Elasticsearch
var connection = book.connections.elasticsearch.connection;
connection.connect = function(callback)
connection.showIndices = function(callback)
connection.insertData = function(index, data, callback)
connection.getAll = function(index, query, fields, chunkSize, dataCallback, doneCallback)
S3
var connection = book.connections.s3.connection;
connection.connect = function(callback)
connection.listFolders = function(prefix, callback)
connection.listObjects = function(prefix, callback)
connection.deleteFolder = function(prefix, callback)
connection.objectExists = function(filename, callback)
connection.delete = function(filename, callback)
connection.streamingUpload = function(inputStream, filename, callback)
FTP
var connection = book.connections.ftp.connection;
connection.connect = function(callback)
connection.get = function(file, callback)
connection.listFiles = function(dir, callback)
Amazon Redshift
var connection = book.connections.redshift.connection;
connection.connect = function(callback)
connection.showTables = function(callback)
connection.showColumns = function(table, callback)
connection.query = function(query, callback)
connection.getAll = function(queryBase, chunkSize, dataCallback, doneCallback)
connection.insertData = function(table, data, callback)
connection.mergeTables = function(sourceTable, destinationTable, callback)
connection.addColumn = function(table, column, rowData, callback)
connection.alterColumn = function(table, column, definition, callback)
connection.copyTableSchema = function(sourceTable, destinationTable, callback)
connection.getMax = function(table, column, callback)
Creating your own connections.
It's easy to add your own connections to Empujar. All you need is a `/connections` folder in your project, and to follow some conventions. The basic building block of a connection looks like this:
/**
 * Minimal Empujar connection skeleton.
 *
 * @param {string} name - the connection's name.
 * @param {string} type - the connection's type.
 * @param {Object} options - connection settings (presumably loaded from
 *   config/connections/NAME.js -- confirm).
 * @param {Object} book - the owning Empujar book (e.g. `book.options`).
 */
var connection = function(name, type, options, book){
  this.name = name;
  this.type = type;
  this.options = options;
  this.book = book;
  // The underlying client; populated by connect().
  this.connection = null;
};

/**
 * Open the underlying client, then call `callback()`.
 * This skeleton has nothing to open; a real connection would assign its
 * client to `this.connection` here before calling back.
 *
 * @param {Function} callback - invoked once the connection is ready.
 */
connection.prototype.connect = function(callback){
  callback();
};

exports.connection = connection;
... and then extend your connection model with more prototypes.
For example, here's a connection, `delighted.js`, which TaskRabbit uses to import NPS survey data from our partner Delighted. We extend their library to match the `getAll` method of the built-in connections above.
var dateformat = require('dateformat');
var Delighted = require('delighted');

// Format used for every timestamp column in the emitted rows.
var DATE_FORMAT = 'yyyy-mm-dd HH:MM:ss';

/**
 * Empujar connection for the Delighted NPS API.
 *
 * @param {string} name - the connection's name.
 * @param {string} type - the connection's type.
 * @param {Object} options - settings; `options.apiKey` is passed to the Delighted client.
 * @param {Object} book - the owning book; `book.options.getAllLimit` caps getAll().
 */
var connection = function(name, type, options, book){
  this.name = name;
  this.type = type;
  this.options = options;
  this.book = book;
  this.connection = null;
};

/**
 * Create the Delighted client, then call `callback()`.
 *
 * @param {Function} callback - invoked once the client is stored on `this.connection`.
 */
connection.prototype.connect = function(callback){
  var self = this;
  self.connection = Delighted(self.options.apiKey);
  callback();
};

// Flatten one Delighted survey response (with `person` expanded) into a warehouse row.
// Delighted timestamps are multiplied by 1000 before formatting (seconds -> ms).
var formatResponse = function(resp){
  return {
    id: parseInt(resp.id, 10),
    person: parseInt(resp.person.id, 10),
    score: parseInt(resp.score, 10),
    comment: resp.comment,
    permalink: resp.permalink,
    created_at: dateformat(resp.created_at * 1000, DATE_FORMAT),
    updated_at: dateformat(resp.updated_at * 1000, DATE_FORMAT),
    customer_type: resp.customer_type,
    email: resp.person.email,
    name: resp.person.name,
  };
};

/**
 * Page through Delighted survey responses, 100 per request.
 *
 * Each non-empty page is passed to `dataCallback(null, rows, next)`. Calling
 * `next()` fetches the following page, unless `book.options.getAllLimit` rows
 * have already been read. An empty page or a reached limit ends the run with
 * `doneCallback(null, rowsFound)`. A failed API request ends it with
 * `doneCallback(error)`.
 *
 * @param {*} since - forwarded as Delighted's `since` filter (presumably a Unix
 *   timestamp in seconds -- confirm).
 * @param {Function} dataCallback - receives (error, rows, next).
 * @param {Function} doneCallback - receives (error, rowsFound).
 * @param {number} [page=1] - internal: the page to request.
 * @param {number} [rowsFound=0] - internal: rows read so far.
 */
connection.prototype.getAll = function(since, dataCallback, doneCallback, page, rowsFound){
  var self = this;
  if(page === undefined || page === null){ page = 1; }
  if(!rowsFound){ rowsFound = 0; }

  var options = {
    per_page : 100,
    since : since,
    page : page,
    expand : 'person',
  };

  self.connection.surveyResponse.all(options).then(function(responses){
    if(responses.length === 0){
      doneCallback(null, rowsFound);
    }else{
      rowsFound = rowsFound + responses.length;
      var data = responses.map(formatResponse);
      dataCallback(null, data, function(){
        if(self.book.options.getAllLimit > rowsFound){
          self.getAll(since, dataCallback, doneCallback, (page + 1), rowsFound);
        }else{
          doneCallback(null, rowsFound);
        }
      });
    }
  }, function(error){
    // Without a rejection handler, an API failure (bad key, network, rate
    // limit) would be an unobserved rejection and doneCallback would never fire.
    doneCallback(error);
  });
};

exports.connection = connection;