async-logging
Advanced tools
Comparing version 0.1.5 to 0.1.6
@@ -6,3 +6,3 @@ 'use strict'; | ||
* Buffer should enable a processing pipeline which is responsible for restructure the raw events in a format of tree | ||
* Buffer should also be an EventEmitter, which fires higher level events like "transaction completed", "event completed" etc. to allow downstream | ||
* Buffer should also be an EventEmitter, which fires higher level events like 'transaction completed', 'event completed' etc. to allow downstream | ||
* listeners (CAL publisher) to react to those events. | ||
@@ -15,3 +15,3 @@ * | ||
* transaction is the most common type, user request is mapped to a transaction, according to the schema below, transaction is an event with no parent | ||
* begins with a marker of "begin", and ends with a marker of "end", itself has an event id, which all of the events enclosed by this transaction is | ||
* begins with a marker of 'begin', and ends with a marker of 'end', it_this has an event id, which all of the events enclosed by this transaction is | ||
* labeled with. | ||
@@ -27,113 +27,177 @@ * | ||
*/ | ||
var _ = require("underscore"), | ||
crypto = require("crypto"), | ||
assert = require("assert"); | ||
var _ = require('underscore'), | ||
crypto = require('crypto'), | ||
util = require('util'); | ||
//standardized the tid pseudo assignment | ||
//pid must be used as a prefix, and the uuid will be hashed, and only the last char of hex encoded will be used, giving an equal oppotunity of 1/16 | ||
var assignThread = function(pid, uuid){ | ||
var md5 = crypto.createHash("md5"); | ||
md5.update(uuid || ""); | ||
var hex = md5.digest("hex"); | ||
//using the last char of a md5 hex encoding, giving it a fair chance of being one of the 16 chars. | ||
return pid + '-' + hex[hex.length-1]; | ||
}; | ||
var VALID_CLAZZ = ['atomicEvent', 'heartbeat', 'begin', 'end'], | ||
//standardized the tid pseudo assignment | ||
//pid must be used as a prefix, and the uuid will be hashed, and only the last char of hex encoded will be used, giving an equal oppotunity of 1/16 | ||
assignThread = function assignThread(pid, uuid){ | ||
var md5 = crypto.createHash('md5'); | ||
md5.update(uuid || ''); | ||
var hex = md5.digest('hex'); | ||
//using the last char of a md5 hex encoding, giving it a fair chance of being one of the 16 chars. | ||
return pid + '-' + hex[hex.length-1]; | ||
}, | ||
DEFAULT_MAPPER = function DEFAULT_MAPPER(log){ | ||
var DEFAULT_MAPPER = function(log){ | ||
var mapped = { | ||
type : log.type || 'URL', | ||
name: log.name, | ||
request: log.uuid, | ||
parent: log.parent || '0', | ||
clazz: log.clazz || 'atomicEvent', | ||
event : log.event, | ||
duration: log.duration, | ||
pid: log.pid, | ||
tid: log.tid || assignThread(log.pid, log.uuid), | ||
machine: log.machine, | ||
ipAddress: log.ipAddress || log.ip, | ||
pool: log.pool, | ||
level: log.level, | ||
msg: log.msg, | ||
rlogid: log.rlogid, | ||
timestamp: log.timestamp || Date.now() | ||
}; | ||
//in case there's anything else not mapped. | ||
_.extend(mapped, _.omit(log, 'type', 'uuid', 'parent', 'begin', 'end', 'event', 'duration', 'name', 'pid', 'tid', 'machine', 'ipAddress', 'pool', 'level', 'msg', 'rlogid', 'timestamp')); | ||
var mapped = { | ||
type : log.type || "a", | ||
request: log.uuid, | ||
parent: log.parent || "0", | ||
begin : log.begin, | ||
end : log.end, | ||
event : log.event, | ||
duration: log.duration, | ||
name: log.name, | ||
pid: log.pid, | ||
tid: log.tid || assignThread(log.pid, log.uuid), | ||
machine: log.machine, | ||
ipAddress: log.ipAddress || log.ip, | ||
pool: log.pool, | ||
level: log.level, | ||
msg: log.msg, | ||
rlogid: log.rlogid | ||
}; | ||
//in case there's anything else not mapped. | ||
_.extend(mapped, _.omit(log, "type", "uuid", "parent", "begin", "end", "event", "duration", "name", "pid", "tid", "machine", "ipAddress", "pool", "level", "msg", "rlogid")); | ||
return mapped; | ||
}, | ||
makeTree = function makeTree(family, node){ | ||
return mapped; | ||
}; | ||
if(!node){ | ||
return null; | ||
} | ||
else if(_.isEqual('atomicEvent', node.clazz)){ | ||
return node; | ||
} | ||
else if(_.isEqual('heartbeat', node.clazz)){ | ||
return node; | ||
} | ||
else if(_.isEqual('end', node.clazz)){ | ||
return node; | ||
} | ||
var VALID_TYPES = ["atomicEvent", "heartbeat", "transaction"]; | ||
var children = family[node.event] || []; | ||
var makeTree = function(family, node){ | ||
node.children = _.values(_.reduce(children, function(memoize, c){ | ||
var child = makeTree(family, c), | ||
begin = memoize[c.event]; | ||
if(!begin){//begin of nested transaction | ||
memoize[c.event] = child; | ||
} | ||
else{//end of nested transaction, we'll need to update msg & duration; | ||
_.extend(begin, { | ||
'type': c.type, | ||
'name': c.name, | ||
'msg': c.msg, | ||
'duration': c.duration | ||
//not timestamp | ||
}); | ||
} | ||
return memoize; | ||
}, {})); | ||
if(!node){ | ||
return null; | ||
} | ||
else if(_.isEqual('atomicEvent', node.type)){ | ||
return node; | ||
} | ||
else if(_.isEqual('transaction', node.type) && node.end){ | ||
return null; | ||
} | ||
}, | ||
validateTree = function validateTree(family, node){ | ||
var children = family[node.event] || []; | ||
if(!node){ | ||
return 'end'; | ||
} | ||
else if(_.isEqual('atomicEvent', node.clazz) || _.isEqual('heartbeat', node.clazz)){ | ||
return 'end'; | ||
} | ||
node.children = _.compact(_.map(children, function(c){ | ||
return makeTree(family, c); | ||
})); | ||
var complete = _.reduce(family[node.event] || [], function(memoize, c){ | ||
if(!memoize || !validateTree(family, c)){ | ||
return null; | ||
} | ||
delete node.family;//done with makeTree, remove family | ||
if(_.isEqual(c.clazz, 'begin')){ | ||
memoize[c.event] = 'begin'; | ||
} | ||
else{ | ||
memoize[c.event] = 'end'; | ||
} | ||
return memoize; | ||
}, | ||
{}); | ||
return node; | ||
}; | ||
if(!complete){ | ||
return false; | ||
} | ||
var children = _.values(complete); | ||
return _.isEmpty(children) || _.every(children, function(elem){ | ||
return _.isEqual(elem, 'end'); | ||
}); | ||
}; | ||
var LogBuffer = exports.LogBuffer = function(emitter, mapper){ | ||
var self = this; | ||
self.transactions = {}; | ||
self.atomicEvents = []; | ||
self.heartbeats = []; | ||
self.mapper = mapper || DEFAULT_MAPPER; | ||
var _this = this; | ||
_.extend(this, { | ||
'transactions': {}, | ||
'atomicEvents': [], | ||
'heartbeats': [], | ||
'mapper': mapper || DEFAULT_MAPPER | ||
}); | ||
emitter.on("log", function(log){ | ||
emitter.on('log', function(log){ | ||
var mapped = self.mapper(log), | ||
var mapped = _this.mapper(log), | ||
parent = mapped.parent, | ||
begin = mapped.begin, | ||
end = mapped.end, | ||
clazz = mapped.clazz, | ||
begin = _.isEqual('begin', clazz), | ||
end = _.isEqual('end', clazz), | ||
event = mapped.event, | ||
request= mapped.request, | ||
type = mapped.type; | ||
request= mapped.request; | ||
assert.ok(_.contains(VALID_TYPES, type));//must be one of the valid event types | ||
//console.log(util.format('[mapped] %j, [begin] %s, [end] %s', mapped, begin, end)); | ||
if(!_.contains(VALID_CLAZZ, clazz)){//must be one of the valid event types | ||
console.log(util.format('clazz:%s of log:%j cannot be processed, as the clazz is invalid, should be one of %j', clazz, log, VALID_CLAZZ)); | ||
} | ||
if(parent === "0" && !begin && !end){ | ||
if(_.isEqual("atomicEvent", type)){ | ||
self.atomicEvents.push(mapped); | ||
if(parent === '0' && !begin && !end){ | ||
if(_.isEqual('atomicEvent', clazz)){ | ||
_this.atomicEvents.push(mapped); | ||
emitter.emit("atomicEvent", mapped); | ||
emitter.emit('atomicEvent', mapped); | ||
} | ||
else if(_.isEqual("heartbeat", type)){ | ||
self.heartbeats.push(mapped); | ||
else if(_.isEqual('heartbeat', clazz)){ | ||
_this.heartbeats.push(mapped); | ||
emitter.emit("heartbeat", mapped); | ||
emitter.emit('heartbeat', mapped); | ||
} | ||
} | ||
else if(parent === "0" && begin){ | ||
else if(parent === '0' && begin){ | ||
//create the root transaction with empty family tree | ||
var root = self.transactions[request] = mapped; | ||
var root = _this.transactions[request] = mapped; | ||
root.family = {}; | ||
} | ||
else if(parent === "0" && end){ | ||
else if(parent === '0' && end){ | ||
var transaction = _this.transactions[request]; | ||
if(!transaction){ | ||
mapped.family = {};//tolerate case where 'begin' event is missing; | ||
_this.transactions[request] = mapped; | ||
} | ||
var transaction = self.transactions[request]; | ||
transaction.duration = mapped.duration || new Date().getTime() - transaction.timestamp; | ||
transaction.duration = mapped.duration || Date.now() - transaction.timestamp; | ||
transaction.msg = mapped.msg; | ||
var tree = makeTree(transaction.family, transaction); | ||
emitter.emit("transaction", tree); | ||
if(validateTree(transaction.family, transaction)){ | ||
emitter.emit('transaction', makeTree(transaction.family, transaction)); | ||
} | ||
else{ | ||
transaction.age = 1; | ||
} | ||
} | ||
else{ | ||
//tolerate case where 'begin' event is missing; | ||
var transaction = _this.transactions[request]; | ||
if(!transaction){ | ||
_this.transactions[request] = {'family':{}}; | ||
} | ||
//parent & event would form a family map naturally | ||
@@ -144,3 +208,3 @@ //but must be scoped by request (otherwise, parent or event id could duplicate with others) | ||
//to restore the tree structure, simply start with root, get all children, map each child to their children in depth first order using family map | ||
var family = self.transactions[request].family; | ||
var family = _this.transactions[request].family; | ||
family[parent] = family[parent] || []; | ||
@@ -151,25 +215,38 @@ family[parent].push(mapped); | ||
emitter.on("clean", function(till){ | ||
_.each(self.atomicEvents, function(a, i){ | ||
emitter.on('clean', function(till){ | ||
//console.log('[clean] transactions:%j vs. till:%d', _this.transactions, till); | ||
_.each(_this.atomicEvents, function(a, i){ | ||
if(a.timestamp <= till){ | ||
self.atomicEvents[i] = null; | ||
_this.atomicEvents[i] = null; | ||
} | ||
}); | ||
self.atomicEvents = _.compact(self.atomicEvents); | ||
_this.atomicEvents = _.compact(_this.atomicEvents); | ||
_.each(self.heartbeats, function(h, i){ | ||
_.each(_this.heartbeats, function(h, i){ | ||
if(h.timestamp <= till){ | ||
self.heartbeats[i] = null; | ||
_this.heartbeats[i] = null; | ||
} | ||
}); | ||
self.heartbeats = _.compact(self.heartbeats); | ||
_this.heartbeats = _.compact(_this.heartbeats); | ||
_.each(self.transactions, function(tx, req){ | ||
_.each(_this.transactions, function(tx, req){ | ||
//console.log(util.format('[clean] %j vs. till:%d', tx, till)); | ||
if(tx.timestamp <= till){ | ||
delete self.transactions[req]; | ||
var transaction = _this.transactions[req]; | ||
if(transaction.age){ | ||
transaction.age = validateTree(transaction.family, transaction) ? 3 : transaction.age + 1; | ||
if(transaction.age >= 3){//forced to emit incomplete tree | ||
emitter.emit('transaction', makeTree(transaction.family, transaction)); | ||
transaction.age = null;//clean next cycle | ||
} | ||
} | ||
else{ | ||
delete _this.transactions[req]; | ||
} | ||
} | ||
}); | ||
emitter.emit("cleaned", {till: till}); | ||
emitter.emit('cleaned', {till: till}); | ||
}); | ||
}; | ||
}; |
'use strict'; | ||
var LogTransport = require("./log-transport.js").LogTransport, | ||
var WebSocketTransport = require("./index.js").WebSocketTransport, | ||
assert = require("assert"); | ||
@@ -11,9 +11,15 @@ | ||
var emitter = options.emitter || process, | ||
ActualTransport = options.LogTransport || LogTransport; | ||
ActualTransport = options.LogTransport || WebSocketTransport; | ||
var transport = new ActualTransport(options); | ||
var transport = new ActualTransport(options), | ||
listener = function(log){ | ||
transport.log(log); | ||
}; | ||
emitter.on("log", function(log){ | ||
transport.log(log); | ||
}); | ||
emitter.on('log', listener); | ||
this.stop = function(){ | ||
//when there needs to be different client (url changed etc.) | ||
emitter.removeListener('log', listener); | ||
}; | ||
}; |
@@ -9,11 +9,11 @@ 'use strict'; | ||
var WebSocketServer = require('websocket').server, | ||
http = require("http"), | ||
os = require("os"), | ||
_ = require("underscore"), | ||
Cluster = require("cluster2"), | ||
LogListener = require("./log-listener.js").LogListener, | ||
LogBuffer = require("./log-buffer.js").LogBuffer, | ||
WinstonPublisher = require("./winston-publisher.js").WinstonPublisher, | ||
assert = require("assert"), | ||
EventEmitter = require("events").EventEmitter; | ||
http = require('http'), | ||
os = require('os'), | ||
_ = require('underscore'), | ||
Cluster = require('cluster2'), | ||
LogListener = require('./log-listener.js').LogListener, | ||
LogBuffer = require('./log-buffer.js').LogBuffer, | ||
WinstonPublisher = require('./winston-publisher.js').WinstonPublisher, | ||
assert = require('assert'), | ||
EventEmitter = require('events').EventEmitter; | ||
@@ -24,6 +24,6 @@ var LogCluster = exports.LogCluster = function(options, emitter){ | ||
//try not closing the http connection. | ||
console.log('[log-cluster]' + request.url); | ||
console.log('[log-cluster] received: ' + request.url); | ||
response.send('', 404); | ||
}), | ||
wss = new WebSocketServer({ | ||
wss = options.websocket ? new WebSocketServer({ | ||
httpServer: server, | ||
@@ -36,3 +36,3 @@ // You should not use autoAcceptConnections for production | ||
autoAcceptConnections: false | ||
}), | ||
}) : null, | ||
actualOptions = { | ||
@@ -51,3 +51,3 @@ port: 3000, | ||
LogPublisher: WinstonPublisher, | ||
cleanDuration : 1000 * 3600,//one hour | ||
cleanDuration : 1000 * 60,//one min | ||
}; | ||
@@ -60,3 +60,3 @@ _.extend(actualOptions, options); | ||
console.log("listening:" + actualOptions.port); | ||
console.log('listening:' + actualOptions.port); | ||
@@ -85,13 +85,17 @@ var logCluster = new Cluster({ | ||
var cleanDuration = actualOptions.cleanDuration, | ||
till = Date.now() - cleanDuration, | ||
cleanUpInterval = setInterval(function(){ | ||
emitter.emit('clean', till); | ||
till += cleanDuration; | ||
}, cleanDuration); | ||
logCluster.on('SIGINT', _.bind(clearInterval, null, cleanUpInterval)); | ||
logCluster.on('SIGTERM', _.bind(clearInterval, null, cleanUpInterval)); | ||
cleanUpTimeout = null, | ||
cleanUpCycle = function(){ | ||
emitter.emit('clean', Date.now() - cleanDuration);//this is a bugfix, we used to use till += cleanDuration, which could grow bigger gap with each cycle | ||
cleanUpTimeout = setTimeout(cleanUpCycle, cleanDuration);//this guarantees that the next cycle is started only after the previous is finished. | ||
}; | ||
//init timeout | ||
cleanUpTimeout = setTimeout(cleanUpCycle, cleanDuration); | ||
//unregister the timer to unblock the cluster shutdown | ||
logCluster.once('SIGINT', _.bind(clearTimeout, null, cleanUpTimeout)); | ||
logCluster.once('SIGTERM', _.bind(clearTimeout, null, cleanUpTimeout)); | ||
}); | ||
return logCluster; | ||
}; | ||
}; | ||
exports.LogClient = require('./log-client.js').LogClient; |
@@ -15,13 +15,14 @@ 'use strict'; | ||
//this isn't correct | ||
return true;//origin.contains(".ebay.com"); | ||
return true;//origin.contains('.ebay.com'); | ||
}, | ||
msgpack = require("msgpack"), | ||
_ = require("underscore"); | ||
msgpack = require('msgpack'), | ||
util = require('util'), | ||
_ = require('underscore'); | ||
var LogListener = exports.LogListener = function(wss, emitter){ | ||
var self = this; | ||
self.connections = []; | ||
var _this = this; | ||
_this.connections = []; | ||
wss.on("request", function(request) { | ||
wss.on('request', function(request) { | ||
@@ -34,5 +35,4 @@ if (!originIsAllowed(request.origin)) { | ||
console.log('[log-websocket]' + JSON.stringify(request)); | ||
var connection = request.accept("log-protocol", request.origin); | ||
self.connections.push(connection); | ||
var connection = request.accept('log-protocol', request.origin); | ||
_this.connections.push(connection); | ||
@@ -45,29 +45,40 @@ connection.idleMonitor = setInterval(function(){ | ||
connection.on("message", function(message) { | ||
connection.on('message', function(message) { | ||
console.log('[log-websocket] message received'); | ||
var bytes = message.binaryData, | ||
buf = new Buffer(bytes.length); | ||
bytes.copy(buf, 0, 0, bytes.length); | ||
try{ | ||
var bytes = message.binaryData, | ||
buf = new Buffer(bytes.length); | ||
bytes.copy(buf, 0, 0, bytes.length); | ||
var unpack = msgpack.unpack(buf), | ||
logs = _.isArray(unpack) ? unpack : [unpack]; | ||
var unpack = msgpack.unpack(buf), | ||
logs = _.isArray(unpack) ? unpack : [unpack]; | ||
_.each(logs, function(log){ | ||
emitter.emit("log", log); | ||
}); | ||
//clear previous timeout and setup a new one. | ||
connection.lastMessageReceivedAt = Date.now(); | ||
console.log(util.format('[log-websocket] message received:\n%j', logs)); | ||
_.each(logs, function(log){ | ||
emitter.emit('log', log); | ||
}); | ||
} | ||
catch(e){ | ||
console.log(util.format('[log-websocket] message handling error:%s\n%j', e, e.stack)); | ||
} | ||
finally{ | ||
//clear previous timeout and setup a new one. | ||
connection.lastMessageReceivedAt = Date.now(); | ||
} | ||
}); | ||
connection.on("close", function(reasonCode, description){ | ||
connection.on('close', function(reasonCode, description){ | ||
console.log('[log-websocket] closed due to:' + reasonCode + ':' + description); | ||
clearTimeout(connection.idleMonitor); | ||
self.connections = _.without(self.connections, connection); | ||
_this.connections = _.without(_this.connections, connection); | ||
}); | ||
setTimeout(function(){ | ||
connection.sendUTF('ready'); | ||
console.log('[log-websocket] ready message sent'); | ||
}, 100);//this is just a safe measure to allow client to have time to register its ready callback | ||
}); | ||
wss.on("close", function(){ | ||
_.each(this.connections, function(connection){ | ||
wss.on('close', function(){ | ||
_.each(_this.connections, function(connection){ | ||
clearTimeout(connection.idleMonitor); | ||
@@ -74,0 +85,0 @@ }); |
'use strict'; | ||
var LogCluster = require("./log-cluster.js").LogCluster; | ||
var LogCluster = require('./log-cluster.js').LogCluster; | ||
new LogCluster(); |
{ | ||
"author": "cubejs", | ||
"name": "async-logging", | ||
"version": "0.1.5", | ||
"version": "0.1.6", | ||
"description": "0.1.6 is the same as 0.2.2 just to get around ebay-logging-client vs. async-logging-client change", | ||
"repository": { | ||
@@ -12,16 +13,16 @@ "type": "git", | ||
}, | ||
"main":"./lib/log-cluster.js", | ||
"main":"./lib/index.js", | ||
"dependencies": { | ||
"websocket": "~1.0.8", | ||
"msgpack": "~0.1.8", | ||
"msgpack": "0.1.8", | ||
"underscore": "~1.4.4", | ||
"cluster2": "~0.4.0", | ||
"winston": "~0.7.1" | ||
"cluster2": "git://github.com/cubejs/cluster2.git", | ||
"winston": "~0.7.1", | ||
"request": "~2.22.0" | ||
}, | ||
"devDependencies": { | ||
"express": "~3.2", | ||
"mocha": "", | ||
"redis": "", | ||
"should": "", | ||
"q": "" | ||
"mocha": "~1.11.0", | ||
"should": "~1.2.2", | ||
"q": "~0.9.6" | ||
}, | ||
@@ -28,0 +29,0 @@ "scripts":{ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Git dependency
Supply chain riskContains a dependency which resolves to a remote git URL. Dependencies fetched from git URLs are not immutable can be used to inject untrusted code or reduce the likelihood of a reproducible install.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
27339
4
18
559
6
1
2
+ Addedrequest@~2.22.0
+ Addedasn1@0.1.11(transitive)
+ Addedassert-plus@0.1.5(transitive)
+ Addedaws-sign@0.3.0(transitive)
+ Addedboom@0.4.2(transitive)
+ Addedcookie-jar@0.3.0(transitive)
+ Addedcryptiles@0.2.2(transitive)
+ Addedctype@0.5.3(transitive)
+ Addedforever-agent@0.5.2(transitive)
+ Addedform-data@0.0.8(transitive)
+ Addedhawk@0.13.1(transitive)
+ Addedhoek@0.8.50.9.1(transitive)
+ Addedhttp-signature@0.10.1(transitive)
+ Addedjson-stringify-safe@4.0.0(transitive)
+ Addedmsgpack@0.1.8(transitive)
+ Addedoauth-sign@0.3.0(transitive)
+ Addedqs@0.6.6(transitive)
+ Addedrequest@2.22.0(transitive)
+ Addedsntp@0.2.4(transitive)
+ Addedtunnel-agent@0.3.0(transitive)
- Removedbignumber.js@1.1.1(transitive)
- Removedbindings@1.5.0(transitive)
- Removedcluster2@0.4.26(transitive)
- Removedconnect@1.9.2(transitive)
- Removedejs@0.8.8(transitive)
- Removedexpress@2.5.11(transitive)
- Removedfile-uri-to-path@1.0.0(transitive)
- Removedform-data@0.0.10(transitive)
- Removedformidable@1.0.17(transitive)
- Removedgc-stats@0.0.6(transitive)
- Removedmime@1.2.4(transitive)
- Removedmkdirp@0.3.0(transitive)
- Removedmsgpack@0.1.11(transitive)
- Removednan@2.22.0(transitive)
- Removednpm@1.3.26(transitive)
- Removedqs@0.4.2(transitive)
- Removedusage@0.3.10(transitive)
- Removedwhen@2.4.0(transitive)
Updatedmsgpack@0.1.8