async-logging - npm Package Compare versions

Comparing version 0.1.5 to 0.1.6

lib/http-transport.js


lib/log-buffer.js

@@ -6,3 +6,3 @@ 'use strict';

* Buffer should enable a processing pipeline which is responsible for restructure the raw events in a format of tree
- * Buffer should also be an EventEmitter, which fires higher level events like "transaction completed", "event completed" etc. to allow downstream
+ * Buffer should also be an EventEmitter, which fires higher level events like 'transaction completed', 'event completed' etc. to allow downstream
* listeners (CAL publisher) to react to those events.

@@ -15,3 +15,3 @@ *

* transaction is the most common type, user request is mapped to a transaction, according to the schema below, transaction is an event with no parent
* begins with a marker of "begin", and ends with a marker of "end", itself has an event id, which all of the events enclosed by this transaction is
* begins with a marker of 'begin', and ends with a marker of 'end', it_this has an event id, which all of the events enclosed by this transaction is
* labeled with.
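To make the event model described in these comments concrete, here is a small, hypothetical sequence of raw log events for a single user request; the field names follow the DEFAULT_MAPPER further down in this diff, and every value is illustrative only:

```js
// Hypothetical raw events for one user request, in arrival order.
// parent '0' marks the transaction root; nested events carry the root's
// event id as their parent, which is how the family tree is rebuilt.
var rawEvents = [
    {uuid: 'req-1', event: 'e-1', parent: '0',   clazz: 'begin',       type: 'URL', name: 'GET /home'},
    {uuid: 'req-1', event: 'e-2', parent: 'e-1', clazz: 'atomicEvent', type: 'DB',  name: 'findUser', duration: 3},
    {uuid: 'req-1', event: 'e-1', parent: '0',   clazz: 'end',         type: 'URL', name: 'GET /home', duration: 12, msg: 'ok'}
];
```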

@@ -27,113 +27,177 @@ *

*/
var _ = require("underscore"),
crypto = require("crypto"),
assert = require("assert");
var _ = require('underscore'),
crypto = require('crypto'),
util = require('util');
//standardized the tid pseudo assignment
//pid must be used as a prefix, and the uuid will be hashed, and only the last char of hex encoded will be used, giving an equal oppotunity of 1/16
var assignThread = function(pid, uuid){
var md5 = crypto.createHash("md5");
md5.update(uuid || "");
var hex = md5.digest("hex");
//using the last char of a md5 hex encoding, giving it a fair chance of being one of the 16 chars.
return pid + '-' + hex[hex.length-1];
};
var VALID_CLAZZ = ['atomicEvent', 'heartbeat', 'begin', 'end'],
//standardized the tid pseudo assignment
//pid must be used as a prefix, and the uuid will be hashed, and only the last char of hex encoded will be used, giving an equal oppotunity of 1/16
assignThread = function assignThread(pid, uuid){
var md5 = crypto.createHash('md5');
md5.update(uuid || '');
var hex = md5.digest('hex');
//using the last char of a md5 hex encoding, giving it a fair chance of being one of the 16 chars.
return pid + '-' + hex[hex.length-1];
},
DEFAULT_MAPPER = function DEFAULT_MAPPER(log){
var DEFAULT_MAPPER = function(log){
+ var mapped = {
+ type : log.type || 'URL',
+ name: log.name,
+ request: log.uuid,
+ parent: log.parent || '0',
+ clazz: log.clazz || 'atomicEvent',
+ event : log.event,
+ duration: log.duration,
+ pid: log.pid,
+ tid: log.tid || assignThread(log.pid, log.uuid),
+ machine: log.machine,
+ ipAddress: log.ipAddress || log.ip,
+ pool: log.pool,
+ level: log.level,
+ msg: log.msg,
+ rlogid: log.rlogid,
+ timestamp: log.timestamp || Date.now()
+ };
+ //in case there's anything else not mapped.
+ _.extend(mapped, _.omit(log, 'type', 'uuid', 'parent', 'begin', 'end', 'event', 'duration', 'name', 'pid', 'tid', 'machine', 'ipAddress', 'pool', 'level', 'msg', 'rlogid', 'timestamp'));
- var mapped = {
- type : log.type || "a",
- request: log.uuid,
- parent: log.parent || "0",
- begin : log.begin,
- end : log.end,
- event : log.event,
- duration: log.duration,
- name: log.name,
- pid: log.pid,
- tid: log.tid || assignThread(log.pid, log.uuid),
- machine: log.machine,
- ipAddress: log.ipAddress || log.ip,
- pool: log.pool,
- level: log.level,
- msg: log.msg,
- rlogid: log.rlogid
- };
- //in case there's anything else not mapped.
- _.extend(mapped, _.omit(log, "type", "uuid", "parent", "begin", "end", "event", "duration", "name", "pid", "tid", "machine", "ipAddress", "pool", "level", "msg", "rlogid"));
+ return mapped;
+ },
+ makeTree = function makeTree(family, node){
- return mapped;
- };
+ if(!node){
+ return null;
+ }
+ else if(_.isEqual('atomicEvent', node.clazz)){
+ return node;
+ }
+ else if(_.isEqual('heartbeat', node.clazz)){
+ return node;
+ }
+ else if(_.isEqual('end', node.clazz)){
+ return node;
+ }
- var VALID_TYPES = ["atomicEvent", "heartbeat", "transaction"];
+ var children = family[node.event] || [];
- var makeTree = function(family, node){
+ node.children = _.values(_.reduce(children, function(memoize, c){
+ var child = makeTree(family, c),
+ begin = memoize[c.event];
+ if(!begin){//begin of nested transaction
+ memoize[c.event] = child;
+ }
+ else{//end of nested transaction, we'll need to update msg & duration;
+ _.extend(begin, {
+ 'type': c.type,
+ 'name': c.name,
+ 'msg': c.msg,
+ 'duration': c.duration
+ //not timestamp
+ });
+ }
+ return memoize;
+ }, {}));
- if(!node){
- return null;
- }
- else if(_.isEqual('atomicEvent', node.type)){
- return node;
- }
- else if(_.isEqual('transaction', node.type) && node.end){
- return null;
- }
+ },
+ validateTree = function validateTree(family, node){
- var children = family[node.event] || [];
+ if(!node){
+ return 'end';
+ }
+ else if(_.isEqual('atomicEvent', node.clazz) || _.isEqual('heartbeat', node.clazz)){
+ return 'end';
+ }
- node.children = _.compact(_.map(children, function(c){
- return makeTree(family, c);
- }));
+ var complete = _.reduce(family[node.event] || [], function(memoize, c){
+ if(!memoize || !validateTree(family, c)){
+ return null;
+ }
- delete node.family;//done with makeTree, remove family
+ if(_.isEqual(c.clazz, 'begin')){
+ memoize[c.event] = 'begin';
+ }
+ else{
+ memoize[c.event] = 'end';
+ }
+ return memoize;
+ },
+ {});
- return node;
- };
+ if(!complete){
+ return false;
+ }
+ var children = _.values(complete);
+ return _.isEmpty(children) || _.every(children, function(elem){
+ return _.isEqual(elem, 'end');
+ });
+ };
var LogBuffer = exports.LogBuffer = function(emitter, mapper){
- var self = this;
- self.transactions = {};
- self.atomicEvents = [];
- self.heartbeats = [];
- self.mapper = mapper || DEFAULT_MAPPER;
+ var _this = this;
+ _.extend(this, {
+ 'transactions': {},
+ 'atomicEvents': [],
+ 'heartbeats': [],
+ 'mapper': mapper || DEFAULT_MAPPER
+ });
- emitter.on("log", function(log){
+ emitter.on('log', function(log){
- var mapped = self.mapper(log),
+ var mapped = _this.mapper(log),
parent = mapped.parent,
- begin = mapped.begin,
- end = mapped.end,
+ clazz = mapped.clazz,
+ begin = _.isEqual('begin', clazz),
+ end = _.isEqual('end', clazz),
event = mapped.event,
- request= mapped.request,
- type = mapped.type;
+ request= mapped.request;
- assert.ok(_.contains(VALID_TYPES, type));//must be one of the valid event types
+ //console.log(util.format('[mapped] %j, [begin] %s, [end] %s', mapped, begin, end));
+ if(!_.contains(VALID_CLAZZ, clazz)){//must be one of the valid event types
+ console.log(util.format('clazz:%s of log:%j cannot be processed, as the clazz is invalid, should be one of %j', clazz, log, VALID_CLAZZ));
+ }
- if(parent === "0" && !begin && !end){
- if(_.isEqual("atomicEvent", type)){
- self.atomicEvents.push(mapped);
+ if(parent === '0' && !begin && !end){
+ if(_.isEqual('atomicEvent', clazz)){
+ _this.atomicEvents.push(mapped);
- emitter.emit("atomicEvent", mapped);
+ emitter.emit('atomicEvent', mapped);
}
else if(_.isEqual("heartbeat", type)){
self.heartbeats.push(mapped);
else if(_.isEqual('heartbeat', clazz)){
_this.heartbeats.push(mapped);
emitter.emit("heartbeat", mapped);
emitter.emit('heartbeat', mapped);
}
}
else if(parent === "0" && begin){
else if(parent === '0' && begin){
//create the root transaction with empty family tree
- var root = self.transactions[request] = mapped;
+ var root = _this.transactions[request] = mapped;
root.family = {};
}
else if(parent === "0" && end){
else if(parent === '0' && end){
var transaction = _this.transactions[request];
if(!transaction){
mapped.family = {};//tolerate case where 'begin' event is missing;
_this.transactions[request] = mapped;
}
var transaction = self.transactions[request];
transaction.duration = mapped.duration || new Date().getTime() - transaction.timestamp;
transaction.duration = mapped.duration || Date.now() - transaction.timestamp;
transaction.msg = mapped.msg;
- var tree = makeTree(transaction.family, transaction);
- emitter.emit("transaction", tree);
+ if(validateTree(transaction.family, transaction)){
+ emitter.emit('transaction', makeTree(transaction.family, transaction));
+ }
+ else{
+ transaction.age = 1;
+ }
}
else{
+ //tolerate case where 'begin' event is missing;
+ var transaction = _this.transactions[request];
+ if(!transaction){
+ _this.transactions[request] = {'family':{}};
+ }
//parent & event would form a family map naturally

@@ -144,3 +208,3 @@ //but must be scoped by request (otherwise, parent or event id could duplicate with others)

//to restore the tree structure, simply start with root, get all children, map each child to their children in depth first order using family map
- var family = self.transactions[request].family;
+ var family = _this.transactions[request].family;
family[parent] = family[parent] || [];

@@ -151,25 +215,38 @@ family[parent].push(mapped);

emitter.on("clean", function(till){
_.each(self.atomicEvents, function(a, i){
emitter.on('clean', function(till){
//console.log('[clean] transactions:%j vs. till:%d', _this.transactions, till);
_.each(_this.atomicEvents, function(a, i){
if(a.timestamp <= till){
- self.atomicEvents[i] = null;
+ _this.atomicEvents[i] = null;
}
});
- self.atomicEvents = _.compact(self.atomicEvents);
+ _this.atomicEvents = _.compact(_this.atomicEvents);
- _.each(self.heartbeats, function(h, i){
+ _.each(_this.heartbeats, function(h, i){
if(h.timestamp <= till){
- self.heartbeats[i] = null;
+ _this.heartbeats[i] = null;
}
});
- self.heartbeats = _.compact(self.heartbeats);
+ _this.heartbeats = _.compact(_this.heartbeats);
- _.each(self.transactions, function(tx, req){
+ _.each(_this.transactions, function(tx, req){
+ //console.log(util.format('[clean] %j vs. till:%d', tx, till));
if(tx.timestamp <= till){
- delete self.transactions[req];
+ var transaction = _this.transactions[req];
+ if(transaction.age){
+ transaction.age = validateTree(transaction.family, transaction) ? 3 : transaction.age + 1;
+ if(transaction.age >= 3){//forced to emit incomplete tree
+ emitter.emit('transaction', makeTree(transaction.family, transaction));
+ transaction.age = null;//clean next cycle
+ }
+ }
+ else{
+ delete _this.transactions[req];
+ }
}
});
emitter.emit("cleaned", {till: till});
emitter.emit('cleaned', {till: till});
});
};
};
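Taken together, the 0.1.6 buffer listens for 'log' and 'clean' on the emitter it is given and emits 'atomicEvent', 'heartbeat', 'transaction' and 'cleaned'; incomplete transactions are aged across 'clean' cycles and eventually force-emitted. A minimal usage sketch, assuming the in-package path lib/log-buffer.js and illustrative log fields:

```js
'use strict';
var EventEmitter = require('events').EventEmitter,
    // path assumed from this package's layout; LogBuffer is not re-exported elsewhere in this diff
    LogBuffer = require('async-logging/lib/log-buffer.js').LogBuffer;

var emitter = new EventEmitter();
new LogBuffer(emitter); // no mapper given, so DEFAULT_MAPPER applies

emitter.on('transaction', function(tree){
    console.log('completed transaction:', JSON.stringify(tree));
});

// a root transaction: 'begin' and 'end' share the same uuid and event id
emitter.emit('log', {uuid: 'req-1', event: 'e-1', parent: '0', clazz: 'begin', type: 'URL', name: 'GET /home'});
emitter.emit('log', {uuid: 'req-1', event: 'e-1', parent: '0', clazz: 'end',   type: 'URL', name: 'GET /home', duration: 12, msg: 'ok'});

// evict anything older than one minute; per the new 'clean' handler, transactions
// flagged with an age are revalidated and force-emitted after a few cycles
emitter.emit('clean', Date.now() - 60 * 1000);
```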
'use strict';
- var LogTransport = require("./log-transport.js").LogTransport,
+ var WebSocketTransport = require("./index.js").WebSocketTransport,
assert = require("assert");

@@ -11,9 +11,15 @@

var emitter = options.emitter || process,
- ActualTransport = options.LogTransport || LogTransport;
+ ActualTransport = options.LogTransport || WebSocketTransport;
- var transport = new ActualTransport(options);
+ var transport = new ActualTransport(options),
+ listener = function(log){
+ transport.log(log);
+ };
- emitter.on("log", function(log){
- transport.log(log);
- });
+ emitter.on('log', listener);
+ this.stop = function(){
+ //when there needs to be different client (url changed etc.)
+ emitter.removeListener('log', listener);
+ };
};
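For context, a hypothetical way to drive the client constructor changed above; the LogClient name and the './log-client.js' path are taken from the exports line added in log-cluster.js further down, and the stub transport relies only on the options.LogTransport override and transport.log(log) call visible in this diff:

```js
'use strict';
// path assumed from this package's layout
var LogClient = require('async-logging/lib/log-client.js').LogClient;

// options.LogTransport overrides the default WebSocketTransport, so a stub
// transport is enough to observe the forwarding behaviour
var RecordingTransport = function(options){
    this.log = function(log){
        console.log('forwarded:', log.name);
    };
};

var client = new LogClient({
    emitter: process,                // defaults to process when omitted
    LogTransport: RecordingTransport // hypothetical stub, for illustration only
});

// every 'log' event on the chosen emitter is handed to transport.log(log)
process.emit('log', {uuid: 'req-1', event: 'e-1', parent: '0', clazz: 'atomicEvent', type: 'URL', name: 'GET /home'});

// the new stop() detaches the 'log' listener, e.g. before pointing at a different url
client.stop();
```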

@@ -9,11 +9,11 @@ 'use strict';

var WebSocketServer = require('websocket').server,
http = require("http"),
os = require("os"),
_ = require("underscore"),
Cluster = require("cluster2"),
LogListener = require("./log-listener.js").LogListener,
LogBuffer = require("./log-buffer.js").LogBuffer,
WinstonPublisher = require("./winston-publisher.js").WinstonPublisher,
assert = require("assert"),
EventEmitter = require("events").EventEmitter;
http = require('http'),
os = require('os'),
_ = require('underscore'),
Cluster = require('cluster2'),
LogListener = require('./log-listener.js').LogListener,
LogBuffer = require('./log-buffer.js').LogBuffer,
WinstonPublisher = require('./winston-publisher.js').WinstonPublisher,
assert = require('assert'),
EventEmitter = require('events').EventEmitter;

@@ -24,6 +24,6 @@ var LogCluster = exports.LogCluster = function(options, emitter){

//try not closing the http connection.
- console.log('[log-cluster]' + request.url);
+ console.log('[log-cluster] received: ' + request.url);
response.send('', 404);
}),
- wss = new WebSocketServer({
+ wss = options.websocket ? new WebSocketServer({
httpServer: server,

@@ -36,3 +36,3 @@ // You should not use autoAcceptConnections for production

autoAcceptConnections: false
- }),
+ }) : null,
actualOptions = {

@@ -51,3 +51,3 @@ port: 3000,

LogPublisher: WinstonPublisher,
- cleanDuration : 1000 * 3600,//one hour
+ cleanDuration : 1000 * 60,//one min
};

@@ -60,3 +60,3 @@ _.extend(actualOptions, options);

console.log("listening:" + actualOptions.port);
console.log('listening:' + actualOptions.port);

@@ -85,13 +85,17 @@ var logCluster = new Cluster({

var cleanDuration = actualOptions.cleanDuration,
- till = Date.now() - cleanDuration,
- cleanUpInterval = setInterval(function(){
- emitter.emit('clean', till);
- till += cleanDuration;
- }, cleanDuration);
- logCluster.on('SIGINT', _.bind(clearInterval, null, cleanUpInterval));
- logCluster.on('SIGTERM', _.bind(clearInterval, null, cleanUpInterval));
+ cleanUpTimeout = null,
+ cleanUpCycle = function(){
+ emitter.emit('clean', Date.now() - cleanDuration);//this is a bugfix, we used to use till += cleanDuration, which could grow bigger gap with each cycle
+ cleanUpTimeout = setTimeout(cleanUpCycle, cleanDuration);//this guarantees that the next cycle is started only after the previous is finished.
+ };
+ //init timeout
+ cleanUpTimeout = setTimeout(cleanUpCycle, cleanDuration);
+ //unregister the timer to unblock the cluster shutdown
+ logCluster.once('SIGINT', _.bind(clearTimeout, null, cleanUpTimeout));
+ logCluster.once('SIGTERM', _.bind(clearTimeout, null, cleanUpTimeout));
});
return logCluster;
};
};
+ exports.LogClient = require('./log-client.js').LogClient;
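The last hunk above replaces a setInterval whose till cutoff could drift further behind wall-clock time each cycle with a self-rescheduling setTimeout that recomputes the cutoff per cycle. A standalone sketch of that pattern, using a plain EventEmitter in place of the cluster wiring:

```js
'use strict';
var EventEmitter = require('events').EventEmitter;

var emitter = new EventEmitter(),
    cleanDuration = 1000 * 60, // one minute, the new default in this diff
    cleanUpTimeout = null,
    cleanUpCycle = function(){
        // recompute the cutoff every cycle instead of accumulating till += cleanDuration,
        // so the cutoff cannot drift away from wall-clock time
        emitter.emit('clean', Date.now() - cleanDuration);
        // re-arm only after this cycle has run, so cycles never overlap
        cleanUpTimeout = setTimeout(cleanUpCycle, cleanDuration);
    };

cleanUpTimeout = setTimeout(cleanUpCycle, cleanDuration);

// clear the pending timer on shutdown so it cannot keep the process alive
// (the real code binds this to the cluster's SIGINT/SIGTERM events)
process.once('SIGINT', function(){
    clearTimeout(cleanUpTimeout);
});
```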

@@ -15,13 +15,14 @@ 'use strict';

//this isn't correct
return true;//origin.contains(".ebay.com");
return true;//origin.contains('.ebay.com');
},
msgpack = require("msgpack"),
_ = require("underscore");
msgpack = require('msgpack'),
util = require('util'),
_ = require('underscore');
var LogListener = exports.LogListener = function(wss, emitter){
- var self = this;
- self.connections = [];
+ var _this = this;
+ _this.connections = [];
- wss.on("request", function(request) {
+ wss.on('request', function(request) {

@@ -34,5 +35,4 @@ if (!originIsAllowed(request.origin)) {

console.log('[log-websocket]' + JSON.stringify(request));
var connection = request.accept("log-protocol", request.origin);
self.connections.push(connection);
var connection = request.accept('log-protocol', request.origin);
_this.connections.push(connection);

@@ -45,29 +45,40 @@ connection.idleMonitor = setInterval(function(){

connection.on("message", function(message) {
connection.on('message', function(message) {
console.log('[log-websocket] message received');
var bytes = message.binaryData,
buf = new Buffer(bytes.length);
bytes.copy(buf, 0, 0, bytes.length);
try{
var bytes = message.binaryData,
buf = new Buffer(bytes.length);
bytes.copy(buf, 0, 0, bytes.length);
var unpack = msgpack.unpack(buf),
logs = _.isArray(unpack) ? unpack : [unpack];
var unpack = msgpack.unpack(buf),
logs = _.isArray(unpack) ? unpack : [unpack];
_.each(logs, function(log){
emitter.emit("log", log);
});
//clear previous timeout and setup a new one.
connection.lastMessageReceivedAt = Date.now();
console.log(util.format('[log-websocket] message received:\n%j', logs));
_.each(logs, function(log){
emitter.emit('log', log);
});
}
catch(e){
console.log(util.format('[log-websocket] message handling error:%s\n%j', e, e.stack));
}
finally{
//clear previous timeout and setup a new one.
connection.lastMessageReceivedAt = Date.now();
}
});
connection.on("close", function(reasonCode, description){
connection.on('close', function(reasonCode, description){
console.log('[log-websocket] closed due to:' + reasonCode + ':' + description);
clearTimeout(connection.idleMonitor);
- self.connections = _.without(self.connections, connection);
+ _this.connections = _.without(_this.connections, connection);
});
+ setTimeout(function(){
+ connection.sendUTF('ready');
+ console.log('[log-websocket] ready message sent');
+ }, 100);//this is just a safe measure to allow client to have time to register its ready callback
});
wss.on("close", function(){
_.each(this.connections, function(connection){
wss.on('close', function(){
_.each(_this.connections, function(connection){
clearTimeout(connection.idleMonitor);

@@ -74,0 +85,0 @@ });
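The 'message' handler above now wraps msgpack decoding in try/catch/finally and normalizes the payload to an array. A small round-trip sketch of the framing it expects, with illustrative field values:

```js
'use strict';
var msgpack = require('msgpack'),
    _ = require('underscore');

// what a client would send over the 'log-protocol' connection: a msgpack-packed
// array of log objects (a single object is also accepted and wrapped into an array)
var packed = msgpack.pack([
    {uuid: 'req-1', event: 'e-1', parent: '0', clazz: 'atomicEvent', type: 'URL', name: 'GET /home'}
]);

// mirror of the listener's handling of message.binaryData
var unpack = msgpack.unpack(packed),
    logs = _.isArray(unpack) ? unpack : [unpack];

_.each(logs, function(log){
    // in the listener this becomes emitter.emit('log', log)
    console.log(log.name);
});
```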

'use strict';
var LogCluster = require("./log-cluster.js").LogCluster;
var LogCluster = require('./log-cluster.js').LogCluster;
new LogCluster();
{
"author": "cubejs",
"name": "async-logging",
"version": "0.1.5",
"version": "0.1.6",
"description": "0.1.6 is the same as 0.2.2 just to get around ebay-logging-client vs. async-logging-client change",
"repository": {

@@ -12,16 +13,16 @@ "type": "git",

},
"main":"./lib/log-cluster.js",
"main":"./lib/index.js",
"dependencies": {
"websocket": "~1.0.8",
"msgpack": "~0.1.8",
"msgpack": "0.1.8",
"underscore": "~1.4.4",
"cluster2": "~0.4.0",
"winston": "~0.7.1"
"cluster2": "git://github.com/cubejs/cluster2.git",
"winston": "~0.7.1",
"request": "~2.22.0"
},
"devDependencies": {
"express": "~3.2",
"mocha": "",
"redis": "",
"should": "",
"q": ""
"mocha": "~1.11.0",
"should": "~1.2.2",
"q": "~0.9.6"
},

@@ -28,0 +29,0 @@ "scripts":{

