Comparing version 0.3.2 to 0.4.0
50
index.js
@@ -15,2 +15,5 @@ var mysql = require('mysql'); | ||
this.ctrlConnection = mysql.createConnection(ctrlDsn); | ||
this.ctrlConnection.on('error', this._emitError); | ||
this.ctrlConnection.on('unhandledError', this._emitError); | ||
this.ctrlConnection.connect(); | ||
@@ -20,2 +23,4 @@ this.ctrlCallbacks = []; | ||
this.connection = mysql.createConnection(dsn); | ||
this.connection.on('error', this._emitError); | ||
this.connection.on('unhandledError', this._emitError); | ||
@@ -94,5 +99,6 @@ this.tableMap = {}; | ||
ZongJi.prototype._isChecksumEnabled = function(next) { | ||
var self = this; | ||
var sql = 'select @@GLOBAL.binlog_checksum as checksum'; | ||
var ctrlConnection = this.ctrlConnection; | ||
var connection = this.connection; | ||
var ctrlConnection = self.ctrlConnection; | ||
var connection = self.connection; | ||
@@ -104,4 +110,7 @@ ctrlConnection.query(sql, function(err, rows) { | ||
return next(false); | ||
} else { | ||
// Any other errors should be emitted | ||
self.emit('error', err); | ||
return; | ||
} | ||
throw err; | ||
} | ||
@@ -118,3 +127,5 @@ | ||
if (err) { | ||
throw err; | ||
// Errors should be emitted | ||
self.emit('error', err); | ||
return; | ||
} | ||
@@ -132,3 +143,7 @@ next(checksumEnabled); | ||
self.ctrlConnection.query('SHOW BINARY LOGS', function(err, rows) { | ||
if(err) throw err; | ||
if (err) { | ||
// Errors should be emitted | ||
self.emit('error', err); | ||
return; | ||
} | ||
next(rows.length > 0 ? rows[rows.length - 1] : null); | ||
@@ -157,4 +172,19 @@ }); | ||
this.ctrlConnection.query(sql, function(err, rows) { | ||
if (err) throw err; | ||
if (err) { | ||
// Errors should be emitted | ||
self.emit('error', err); | ||
// This is a fatal error, no additional binlog events will be | ||
// processed since next() will never be called | ||
return; | ||
} | ||
if (rows.length === 0) { | ||
self.emit('error', new Error( | ||
'Insufficient permissions to access: ' + | ||
tableMapEvent.schemaName + '.' + tableMapEvent.tableName)); | ||
// This is a fatal error, no additional binlog events will be | ||
// processed since next() will never be called | ||
return; | ||
} | ||
self.tableMap[tableMapEvent.tableId] = { | ||
@@ -242,4 +272,4 @@ columnSchemas: rows, | ||
(exclude === undefined || | ||
(database !== undefined && | ||
(!(database in exclude) || | ||
(database !== undefined && | ||
(!(database in exclude) || | ||
(exclude[database] !== true && | ||
@@ -250,2 +280,6 @@ (exclude[database] instanceof Array && | ||
ZongJi.prototype._emitError = function(error) { | ||
this.emit('error', error); | ||
}; | ||
module.exports = ZongJi; |
@@ -177,9 +177,8 @@ var util = require('util'); | ||
this.columnsMetadata = this.columnTypes.map(function(code) { | ||
var mysqlType = Common.convertToMysqlType(code); | ||
var result; | ||
//jshint indent: false | ||
switch (mysqlType) { | ||
case 'FLOAT': | ||
case 'DOUBLE': | ||
switch (code) { | ||
case Common.MysqlTypes.FLOAT: | ||
case Common.MysqlTypes.DOUBLE: | ||
result = { | ||
@@ -189,3 +188,3 @@ size: parser.parseUnsignedNumber(1) | ||
break; | ||
case 'VARCHAR': | ||
case Common.MysqlTypes.VARCHAR: | ||
result = { | ||
@@ -195,3 +194,3 @@ 'max_length': parser.parseUnsignedNumber(2) | ||
break; | ||
case 'BIT': | ||
case Common.MysqlTypes.BIT: | ||
var bits = parser.parseUnsignedNumber(1); | ||
@@ -203,3 +202,3 @@ var bytes = parser.parseUnsignedNumber(1); | ||
break; | ||
case 'NEWDECIMAL': | ||
case Common.MysqlTypes.NEWDECIMAL: | ||
result = { | ||
@@ -210,4 +209,5 @@ precision: parser.parseUnsignedNumber(1), | ||
break; | ||
case 'BLOB': | ||
case 'GEOMETRY': | ||
case Common.MysqlTypes.BLOB: | ||
case Common.MysqlTypes.GEOMETRY: | ||
case Common.MysqlTypes.JSON: | ||
result = { | ||
@@ -217,4 +217,4 @@ 'length_size': parser.parseUnsignedNumber(1) | ||
break; | ||
case 'STRING': | ||
case 'VAR_STRING': | ||
case Common.MysqlTypes.STRING: | ||
case Common.MysqlTypes.VAR_STRING: | ||
// The STRING type sets a 'real_type' field to indicate the | ||
@@ -227,4 +227,4 @@ // actual type which is fundamentally incompatible with STRING | ||
var realType = metadata >> 8; | ||
var typeName = Common.convertToMysqlType(realType); | ||
if (typeName === 'ENUM' || typeName === 'SET') { | ||
if (realType === Common.MysqlTypes.ENUM | ||
|| realType === Common.MysqlTypes.SET) { | ||
result = { | ||
@@ -241,5 +241,5 @@ type: realType, | ||
break; | ||
case 'TIMESTAMP2': | ||
case 'DATETIME2': | ||
case 'TIME2': | ||
case Common.MysqlTypes.TIMESTAMP2: | ||
case Common.MysqlTypes.DATETIME2: | ||
case Common.MysqlTypes.TIME2: | ||
result = { | ||
@@ -246,0 +246,0 @@ decimals: parser.parseUnsignedNumber(1) |
var events = require('./binlog_event'); | ||
var rowsEvents = require('./rows_event'); | ||
var EventCode = { | ||
UNKNOWN_EVENT: 0x00, | ||
START_EVENT_V3: 0x01, | ||
QUERY_EVENT: 0x02, | ||
STOP_EVENT: 0x03, | ||
ROTATE_EVENT: 0x04, | ||
INTVAR_EVENT: 0x05, | ||
LOAD_EVENT: 0x06, | ||
SLAVE_EVENT: 0x07, | ||
CREATE_FILE_EVENT: 0x08, | ||
APPEND_BLOCK_EVENT: 0x09, | ||
EXEC_LOAD_EVENT: 0x0a, | ||
DELETE_FILE_EVENT: 0x0b, | ||
NEW_LOAD_EVENT: 0x0c, | ||
RAND_EVENT: 0x0d, | ||
USER_VAR_EVENT: 0x0e, | ||
FORMAT_DESCRIPTION_EVENT: 0x0f, | ||
XID_EVENT: 0x10, | ||
BEGIN_LOAD_QUERY_EVENT: 0x11, | ||
EXECUTE_LOAD_QUERY_EVENT: 0x12, | ||
TABLE_MAP_EVENT: 0x13, | ||
PRE_GA_DELETE_ROWS_EVENT: 0x14, | ||
PRE_GA_UPDATE_ROWS_EVENT: 0x15, | ||
PRE_GA_WRITE_ROWS_EVENT: 0x16, | ||
DELETE_ROWS_EVENT_V1: 0x19, | ||
UPDATE_ROWS_EVENT_V1: 0x18, | ||
WRITE_ROWS_EVENT_V1: 0x17, | ||
INCIDENT_EVENT: 0x1a, | ||
HEARTBEAT_LOG_EVENT: 0x1b, | ||
IGNORABLE_LOG_EVENT: 0x1c, | ||
ROWS_QUERY_LOG_EVENT: 0x1d, | ||
WRITE_ROWS_EVENT_V2: 0x1e, | ||
UPDATE_ROWS_EVENT_V2: 0x1f, | ||
DELETE_ROWS_EVENT_V2: 0x20, | ||
GTID_LOG_EVENT: 0x21, | ||
ANONYMOUS_GTID_LOG_EVENT: 0x22, | ||
PREVIOUS_GTIDS_LOG_EVENT: 0x23 | ||
}; | ||
var CodeEvent = [ | ||
'UNKNOWN_EVENT', | ||
'START_EVENT_V3', | ||
'QUERY_EVENT', | ||
'STOP_EVENT', | ||
'ROTATE_EVENT', | ||
'INTVAR_EVENT', | ||
'LOAD_EVENT', | ||
'SLAVE_EVENT', | ||
'CREATE_FILE_EVENT', | ||
'APPEND_BLOCK_EVENT', | ||
'EXEC_LOAD_EVENT', | ||
'DELETE_FILE_EVENT', | ||
'NEW_LOAD_EVENT', | ||
'RAND_EVENT', | ||
'USER_VAR_EVENT', | ||
'FORMAT_DESCRIPTION_EVENT', | ||
'XID_EVENT', | ||
'BEGIN_LOAD_QUERY_EVENT', | ||
'EXECUTE_LOAD_QUERY_EVENT', | ||
'TABLE_MAP_EVENT', | ||
'PRE_GA_DELETE_ROWS_EVENT', | ||
'PRE_GA_UPDATE_ROWS_EVENT', | ||
'PRE_GA_WRITE_ROWS_EVENT', | ||
'WRITE_ROWS_EVENT_V1', | ||
'UPDATE_ROWS_EVENT_V1', | ||
'DELETE_ROWS_EVENT_V1', | ||
'INCIDENT_EVENT', | ||
'HEARTBEAT_LOG_EVENT', | ||
'IGNORABLE_LOG_EVENT', | ||
'ROWS_QUERY_LOG_EVENT', | ||
'WRITE_ROWS_EVENT_V2', | ||
'UPDATE_ROWS_EVENT_V2', | ||
'DELETE_ROWS_EVENT_V2', | ||
'GTID_LOG_EVENT', | ||
'ANONYMOUS_GTID_LOG_EVENT', | ||
'PREVIOUS_GTIDS_LOG_EVENT' | ||
]; | ||
@@ -59,24 +59,4 @@ var EventClass = { | ||
var getEventName = exports.getEventName = function(code) { | ||
var result = 'UNKNOWN_EVENT'; | ||
Object.keys(EventCode).forEach(function(name) { | ||
if (EventCode[name] === code) { | ||
result = name; | ||
return; | ||
} | ||
}); | ||
return result; | ||
}; | ||
exports.getEventClass = function(code) { | ||
var eventName = getEventName(code); | ||
var result = events.Unknown; | ||
Object.keys(EventClass).forEach(function(className) { | ||
if (eventName === className) { | ||
result = EventClass[className]; | ||
return; | ||
} | ||
}); | ||
return result; | ||
return EventClass[CodeEvent[code]] || events.Unknown; | ||
}; |
@@ -1,33 +0,38 @@ | ||
var MysqlTypes = { | ||
'DECIMAL': 0, | ||
'TINY': 1, | ||
'SHORT': 2, | ||
'LONG': 3, | ||
'FLOAT': 4, | ||
'DOUBLE': 5, | ||
'NULL': 6, | ||
'TIMESTAMP': 7, | ||
'LONGLONG': 8, | ||
'INT24': 9, | ||
'DATE': 10, | ||
'TIME': 11, | ||
'DATETIME': 12, | ||
'YEAR': 13, | ||
'NEWDATE': 14, | ||
'VARCHAR': 15, | ||
'BIT': 16, | ||
var iconv = require('iconv-lite'); | ||
var decodeJson = require('./json_decode'); | ||
var MysqlTypes = exports.MysqlTypes = { | ||
DECIMAL: 0, | ||
TINY: 1, | ||
SHORT: 2, | ||
LONG: 3, | ||
FLOAT: 4, | ||
DOUBLE: 5, | ||
NULL: 6, | ||
TIMESTAMP: 7, | ||
LONGLONG: 8, | ||
INT24: 9, | ||
DATE: 10, | ||
TIME: 11, | ||
DATETIME: 12, | ||
YEAR: 13, | ||
NEWDATE: 14, | ||
VARCHAR: 15, | ||
BIT: 16, | ||
// Fractional temporal types in MySQL >=5.6.4 | ||
'TIMESTAMP2': 17, | ||
'DATETIME2': 18, | ||
'TIME2': 19, | ||
'NEWDECIMAL': 246, | ||
'ENUM': 247, | ||
'SET': 248, | ||
'TINY_BLOB': 249, | ||
'MEDIUM_BLOB': 250, | ||
'LONG_BLOB': 251, | ||
'BLOB': 252, | ||
'VAR_STRING': 253, | ||
'STRING': 254, | ||
'GEOMETRY': 255, | ||
TIMESTAMP2: 17, | ||
DATETIME2: 18, | ||
TIME2: 19, | ||
// JSON data type added in MySQL 5.7.7 | ||
JSON: 245, | ||
NEWDECIMAL: 246, | ||
ENUM: 247, | ||
SET: 248, | ||
TINY_BLOB: 249, | ||
MEDIUM_BLOB: 250, | ||
LONG_BLOB: 251, | ||
BLOB: 252, | ||
VAR_STRING: 253, | ||
STRING: 254, | ||
GEOMETRY: 255, | ||
}; | ||
@@ -71,11 +76,7 @@ | ||
var parseSetEnumTypeDef = function(type){ | ||
var prefixLen; | ||
if(type.substr(0,4).toLowerCase() === 'set('){ | ||
prefixLen = 4; | ||
}else if(type.substr(0,5).toLowerCase() === 'enum('){ | ||
prefixLen = 5; | ||
}else{ | ||
throw 'not set or enum type'; | ||
} | ||
// Parse column definition list string for SET and ENUM data types | ||
// @param type String Definition of column 'set(...)' or 'enum(...)' | ||
// @param prefixLen Integer Number of characters before list starts | ||
// (e.g. 'set(': 4, 'enum(': 5) | ||
var parseSetEnumTypeDef = function(type, prefixLen){ | ||
// listed distinct elements should not include commas | ||
@@ -89,3 +90,3 @@ return type.substr(prefixLen, type.length - prefixLen - 1) | ||
var zeroPad = function(num, size) { | ||
var zeroPad = exports.zeroPad = function(num, size) { | ||
// Max 32 digits | ||
@@ -96,3 +97,3 @@ var s = "00000000000000000000000000000000" + num; | ||
var sliceBits = function(input, start, end){ | ||
var sliceBits = exports.sliceBits = function(input, start, end){ | ||
// ex: start: 10, end: 15 = "111110000000000" | ||
@@ -107,3 +108,3 @@ var match = (((1 << end) - 1) ^ ((1 << start) - 1)); | ||
// Pass only high for 32-bit float, pass high and low for 64-bit double | ||
var parseIEEE754Float = function(high, low){ | ||
var parseIEEE754Float = exports.parseIEEE754Float = function(high, low){ | ||
var lastSignificantBit, sigFigs, expLeading; | ||
@@ -145,3 +146,3 @@ if(low !== undefined){ | ||
var getUInt32Value = function(input){ | ||
var getUInt32Value = exports.getUInt32Value = function(input){ | ||
// Last bit is not sign, it is part of value! | ||
@@ -256,6 +257,7 @@ if(input & (1 << 31)) return Math.pow(2, 31) + (input & ((1 << 31) -1)); | ||
str += '.'; // Proceeding bytes are fractional digits | ||
// Build fractional digits | ||
var fractionDigits = ''; | ||
for(var i = 0; i < uncompFractional; i++){ | ||
str += (buffer.readInt32BE(pos) ^ mask).toString(10); | ||
fractionDigits += (buffer.readInt32BE(pos) ^ mask).toString(10); | ||
pos += 4; | ||
@@ -266,5 +268,9 @@ } | ||
if(compFractionalSize > 0){ | ||
str += (readIntBE(buffer, pos, compFractionalSize) ^ mask).toString(10); | ||
fractionDigits += | ||
(readIntBE(buffer, pos, compFractionalSize) ^ mask).toString(10); | ||
} | ||
// Fractional digits may have leading zeros | ||
str += '.' + zeroPad(fractionDigits, scale); | ||
return parseFloat(str); | ||
@@ -331,13 +337,2 @@ }; | ||
var convertToMysqlType = exports.convertToMysqlType = function(code) { | ||
var result; | ||
Object.keys(MysqlTypes).forEach(function(name) { | ||
if (MysqlTypes[name] === code) { | ||
result = name; | ||
return; | ||
} | ||
}); | ||
return result; | ||
}; | ||
var readTemporalFraction = function(parser, fractionPrecision) { | ||
@@ -367,3 +362,3 @@ if(!fractionPrecision) return false; | ||
exports.readMysqlValue = function(parser, column, columnSchema) { | ||
exports.readMysqlValue = function(parser, column, columnSchema, tableMap, emitter) { | ||
var result; | ||
@@ -402,3 +397,4 @@ // jshint indent: false | ||
var choices = parseSetEnumTypeDef(columnSchema.COLUMN_TYPE); | ||
// Second argument: prefixLen = 4 'set(' | ||
var choices = parseSetEnumTypeDef(columnSchema.COLUMN_TYPE, 4); | ||
result = ''; | ||
@@ -417,3 +413,4 @@ for(var i = 0; low >= Math.pow(2, i); i++){ | ||
var raw = parser.parseUnsignedNumber(column.metadata.size); | ||
var choices = parseSetEnumTypeDef(columnSchema.COLUMN_TYPE); | ||
// Second argument: prefixLen = 5 'enum(' | ||
var choices = parseSetEnumTypeDef(columnSchema.COLUMN_TYPE, 5); | ||
result = choices[raw - 1]; | ||
@@ -446,5 +443,16 @@ break; | ||
var lengthSize = column.metadata['length_size']; | ||
result = parser.parseString( | ||
parser.parseUnsignedNumber(lengthSize)); | ||
result = parser.parseBuffer(parser.parseUnsignedNumber(lengthSize)); | ||
// Blobs can be sometimes return as String instead of Buffer | ||
// e.g. TINYTEXT, MEDIUMTEXT, LONGTEXT, TEXT data types | ||
if(column.charset !== null) { | ||
result = iconv.decode(result, column.charset); | ||
} | ||
break; | ||
case MysqlTypes.JSON: | ||
var lengthSize = column.metadata['length_size']; | ||
var size = parser.parseUnsignedNumber(lengthSize); | ||
var buffer = parser.parseBuffer(size); | ||
result = decodeJson(buffer); | ||
break; | ||
case MysqlTypes.GEOMETRY: | ||
@@ -560,5 +568,10 @@ var lengthSize = column.metadata['length_size']; | ||
default: | ||
throw new Error('Unsupported type: ' + convertToMysqlType(column.type)); | ||
result = undefined; | ||
emitter.emit('error', | ||
new Error('Unsupported type "' + column.type + | ||
'" on column "' + column.name + | ||
'" of the table "' + tableMap.tableName + | ||
'" in the database "' + tableMap.parentSchema + '"')); | ||
} | ||
return result; | ||
}; |
@@ -8,7 +8,2 @@ // Constants for variable length encoded binary | ||
var UNSIGNED_CHAR_LENGTH = 1; | ||
var UNSIGNED_SHORT_LENGTH = 2; | ||
var UNSIGNED_INT24_LENGTH = 3; | ||
var UNSIGNED_INT64_LENGTH = 8; | ||
function BufferReader(buffer) { | ||
@@ -15,0 +10,0 @@ this.buffer = buffer; |
@@ -23,2 +23,3 @@ var util = require('util'); | ||
BinlogEvent.apply(this, arguments); | ||
this._zongji = zongji; | ||
this._readTableId(parser); | ||
@@ -95,6 +96,6 @@ this.flags = parser.parseUnsignedNumber(2); | ||
RowsEvent.prototype._fetchOneRow = function(parser) { | ||
return readRow(this.tableMap[this.tableId], parser); | ||
return readRow(this.tableMap[this.tableId], parser, this._zongji); | ||
}; | ||
var readRow = function(tableMap, parser) { | ||
var readRow = function(tableMap, parser, emitter) { | ||
var row = {}, column, columnSchema; | ||
@@ -113,3 +114,4 @@ var nullBitmapSize = Math.floor((tableMap.columns.length + 7) / 8); | ||
if((curNullByte & (1 << curBit)) === 0){ | ||
row[column.name] = Common.readMysqlValue(parser, column, columnSchema); | ||
row[column.name] = | ||
Common.readMysqlValue(parser, column, columnSchema, tableMap, emitter); | ||
}else{ | ||
@@ -145,4 +147,4 @@ row[column.name] = null; | ||
return { | ||
before: readRow(tableMap, parser), | ||
after: readRow(tableMap, parser) | ||
before: readRow(tableMap, parser, this._zongji), | ||
after: readRow(tableMap, parser, this._zongji) | ||
}; | ||
@@ -149,0 +151,0 @@ }; |
{ | ||
"name": "zongji", | ||
"version": "0.3.2", | ||
"version": "0.4.0", | ||
"description": "A mysql binlog listener running on Node.js", | ||
"main": "index.js", | ||
"engines": { | ||
"node": "0.10" | ||
}, | ||
"directories": { | ||
@@ -33,7 +30,8 @@ "test": "test" | ||
"devDependencies": { | ||
"nodeunit": "~0.9.0" | ||
"nodeunit": "~0.9.1" | ||
}, | ||
"dependencies": { | ||
"mysql": "~2.5.5" | ||
"iconv-lite": "^0.4.13", | ||
"mysql": "~2.10.2" | ||
} | ||
} |
@@ -52,3 +52,3 @@ # ZongJi [![Build Status](https://travis-ci.org/nevill/zongji.svg?branch=master)](https://travis-ci.org/nevill/zongji) | ||
```sql | ||
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'zongji'@'localhost' | ||
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'zongji'@'localhost' | ||
``` | ||
@@ -55,0 +55,0 @@ |
Sorry, the diff of this file is not supported yet
73245
19
1784
2
+ Addediconv-lite@^0.4.13
+ Addedbignumber.js@2.1.4(transitive)
+ Addediconv-lite@0.4.24(transitive)
+ Addedmysql@2.10.2(transitive)
+ Addedsafer-buffer@2.1.2(transitive)
- Removedbignumber.js@2.0.0(transitive)
- Removedmysql@2.5.5(transitive)
- Removedrequire-all@1.0.0(transitive)
Updatedmysql@~2.10.2