concurix-traceaggregator
Advanced tools
Comparing version 1.0.1 to 1.1.0
{ | ||
"name": "concurix-traceaggregator", | ||
"version": "1.0.1", | ||
"version": "1.1.0", | ||
"description": "Aggregation of trace data for the concurix tracer", | ||
@@ -30,7 +30,7 @@ "main": "traceaggregator.js", | ||
"dependencies": { | ||
"concurix-waterfall": "^1.0.0", | ||
"concurix-waterfall": "^1.0.1", | ||
"stats-incremental": "^1.0.0" | ||
}, | ||
"devDependencies": { | ||
"tape": "^2.13.3" | ||
"tape": "^2.14.0" | ||
}, | ||
@@ -37,0 +37,0 @@ "repository": { |
@@ -13,23 +13,17 @@ "use strict"; | ||
this.tracer = tracer | ||
// TODO maybe just call this.reset() here? | ||
this.cache = [] | ||
this.starters = [] | ||
this.schedulers = {} // not used? | ||
this.resumers = {} | ||
this.waterfalls = {} | ||
this.reset() | ||
// Stats | ||
this.stackStats = Stats() | ||
this.waterfallStats = Stats() | ||
this.stacksCollapsed = 0 | ||
this.orphanedLinks = 0 | ||
} | ||
TraceAggregator.prototype.reset = function reset() { | ||
// TODO does this reset too much? | ||
this.cache = [] | ||
this.starters = [] | ||
this.schedulers = {} | ||
this.resumers = {} | ||
this.seen = [] | ||
this.leaves = [] | ||
this.targets = {} | ||
this.waterfalls = {} | ||
this.origins = {} | ||
} | ||
@@ -47,5 +41,10 @@ | ||
var outgoing = {} | ||
var outgoingTxs = {} | ||
var incomingTxs = {} | ||
var incoming = {} | ||
var tags = [] | ||
var tZero | ||
var tZero = [] | ||
var outgoingCount = 0 | ||
var incomingCount = 0 | ||
var isLeaf = false | ||
@@ -56,4 +55,9 @@ var blockName = "" | ||
var lastIdVisits = 1 | ||
var lastAction | ||
var lastAction = "" | ||
var index = this.cache.length | ||
var enterCount = 0 | ||
var exitCount = 0 | ||
var bf = new BlockFactory() | ||
@@ -71,3 +75,3 @@ | ||
if (record.type === "enter") { | ||
if (tZero === undefined) { | ||
if (tZero.length === 0) { | ||
tZero = record.ts | ||
@@ -89,2 +93,3 @@ bf.setZero(record.ts) | ||
blockName += "<" + lastEnterId | ||
enterCount++ | ||
} | ||
@@ -94,2 +99,3 @@ blockName += "<" + record.id | ||
isRepeat = false | ||
enterCount++ | ||
} | ||
@@ -104,3 +110,5 @@ lastEnterId = record.id | ||
if (enter.id !== record.id) { | ||
// TODO how are we going to track errors like this? | ||
// TBD how are we going to track errors like this? Send to cx? | ||
console.log("Concurix stack id mismatch") | ||
return | ||
} | ||
@@ -117,2 +125,3 @@ | ||
blockName += ">" | ||
exitCount++ | ||
} | ||
@@ -125,12 +134,41 @@ | ||
else if (record.type === "schedule") { | ||
outgoing[record.id] = bf.wait(record.id, record.name, record.ts, null, record.fnId) | ||
if (record.isTx) { | ||
outgoingTxs[record.id] = bf.wait(record.id, record.name, record.ts, null, record.fnId) | ||
this.origins[record.id] = index | ||
} | ||
else { | ||
outgoing[record.id] = bf.wait(record.id, record.name, record.ts, null, record.fnId) | ||
} | ||
outgoingCount++ | ||
if (incoming[record.id]) { | ||
// links to self | ||
incomingCount-- | ||
outgoingCount-- | ||
} | ||
} | ||
else if (record.type === "resume") { | ||
incoming[record.id] = bf.wait(record.id, record.name, null, record.ts, record.fnId) | ||
if (record.isTx) { | ||
incomingTxs[record.id] = index | ||
} | ||
incomingCount++ | ||
if (outgoing[record.id]) { | ||
// links to self | ||
incomingCount-- | ||
outgoingCount-- | ||
} | ||
this.targets[record.id] = index | ||
} | ||
else if (record.type === "tag") { | ||
tags.push(record.tag) | ||
tags.push(record.name) | ||
} | ||
} | ||
if (enterCount !== exitCount) { | ||
console.log("Concurix Stack Accounting Error: %s !=== %s", enterCount, exitCount) | ||
// The accounting got messed up somehow. We'll need to track down this issue, | ||
// but for now, just log it and throw data away. | ||
return | ||
} | ||
miniwf = miniwf.sort(function (a, b) { | ||
@@ -140,12 +178,13 @@ return a.order - b.order | ||
// outgoing link count safety valve | ||
var outgoingLinks = Object.keys(outgoing) | ||
if (outgoingLinks.length > this.tracer.config.max_links_per_stack) { | ||
// Orphan all the outgoing links, otherwise this waterfall is tough | ||
// to aggregate and not feasible to visualize | ||
outgoing = {} | ||
this.orphanedLinks += outgoingLinks.length | ||
// No links in/out = easy isolated "leaf" | ||
if (outgoingCount <= 0 && incomingCount <= 0) { | ||
isLeaf = true | ||
} | ||
// Incoming links, but they aren't transactions and no outgoing links = "leaf" | ||
if (incomingCount > 0 && outgoingCount <= 0 && Object.keys(incomingTxs).length === 0) { | ||
isLeaf = true | ||
} | ||
var workBlock = { | ||
idx: index, | ||
tZero: tZero, | ||
@@ -155,2 +194,4 @@ name: blockName, | ||
outgoing: outgoing, | ||
outgoingTxs: outgoingTxs, | ||
incomingTxs: incomingTxs, | ||
incoming: incoming, | ||
@@ -160,80 +201,108 @@ tags: tags | ||
this.cache.push(workBlock) | ||
var index = this.cache.length - 1 | ||
var outgoingIds = Object.keys(outgoing) | ||
for (i = 0; i < outgoingIds.length; i++) { | ||
// TODO Make sure this doesn't clobber? | ||
this.schedulers[outgoingIds[i]] = index | ||
this.seen.push(0) | ||
// if a transaction end, finish the transaction | ||
var txIds = Object.keys(incomingTxs) | ||
for (i = 0; i < txIds.length; i++) { | ||
this.navigateTransaction(txIds[i], index) | ||
} | ||
var incomingIds = Object.keys(incoming) | ||
var incomingCount = incomingIds.length | ||
for (i = 0; i < incomingIds.length; i++) { | ||
if (outgoing[incomingIds[i]]) { | ||
// we link to ourself! | ||
incomingCount-- | ||
} | ||
else { | ||
// TODO make sure this doesn't clobber? | ||
this.resumers[incomingIds[i]] = index | ||
} | ||
if (isLeaf) { | ||
// if a leaf, go ahead and throw it to aggregation | ||
this.finishTransactionThread([index]) | ||
} | ||
if (incomingCount <= 0) { | ||
this.starters.push(index) | ||
} | ||
} | ||
/* Aggregation Logic */ | ||
// These are called only periodically | ||
// TODO possibility to do waterfall aggregation as waterfalls become complete as opposed to send-time? | ||
function nextTransaction() { | ||
// get a transaction to process | ||
// no more? | ||
cleanupExtras() | ||
} | ||
// Fetch all aggregated waterfall data & reset state. | ||
TraceAggregator.prototype.reap = function reap() { | ||
this.cacheTouched = [] | ||
for (var j = 0; j < this.cache.length; j++) { | ||
this.cacheTouched[j] = 0 | ||
TraceAggregator.prototype.navigateTransaction = function (id, destIndex) { | ||
var origin = this.origins[id] | ||
if (origin == null) { | ||
// oops this transaction got truncated. | ||
// Toss the pieces of this transaction to the garbage aggregator. | ||
// Sucks, but it's the best we can do for now. | ||
// TBD keep transactions for X aggregation windows? | ||
return | ||
} | ||
var startSeg = this.cache[origin] | ||
var complete = false | ||
var links = Object.keys(startSeg.outgoing) | ||
for (var i = 0; i < links.length; i++) { | ||
// follow non-transaction links first | ||
var target = this.targets[links[i]] | ||
if (target == null) { | ||
continue | ||
} | ||
var path = this.followThread([origin], target, links[i], id, {}) | ||
if (path.length > 0) { | ||
// complete all the wait link segments | ||
for (var i = 0; i < this.starters.length; i++) { | ||
this.stitchWaterfall(this.starters[i]) | ||
// TBD track share count on segments? | ||
// difficulty: how to update already agg'd ones? | ||
return this.finishTransactionThread(path, id) | ||
} | ||
} | ||
this.finishTransactionThread([origin, id, destIndex], id) | ||
} | ||
// Make sure no blocks are left behind. | ||
this.cleanupPartials() | ||
TraceAggregator.prototype.followThread = function (thread, segId, linkId, transactionId, seenCache) { | ||
seenCache[segId] = true | ||
var segment = this.cache[segId] | ||
// Is this segment the end of transactionId? | ||
if (segment.incomingTxs[transactionId] != null) { | ||
// this finishes the transaction! | ||
// stop recursing and retun it | ||
return thread.concat(linkId, segId) | ||
} | ||
var outgoing = Object.keys(segment.outgoing) | ||
for (var i = 0; i < outgoing.length; i++) { | ||
var target = this.targets[outgoing[i]] | ||
if (seenCache[target]) { | ||
// Don't get stuck in a loop | ||
continue | ||
} | ||
var path = this.followThread([], target, outgoing[i], transactionId, seenCache) | ||
if (path.length) { | ||
// unwind | ||
return thread.concat(linkId, segId, path) | ||
} | ||
} | ||
// We didn't make it to transactionId; too bad so sad | ||
return [] | ||
} | ||
var self = this | ||
var wfArray = Object.keys(this.waterfalls).map(function (k) { | ||
return self.waterfalls[k] | ||
}) | ||
this.reset() | ||
return wfArray | ||
function cleanupExtras() { | ||
// throw all untouched segments into their own simple wfs | ||
} | ||
TraceAggregator.prototype.stitchWaterfall = function stitchWaterfall(idx, isPartial) { | ||
var firstBlock = this.cache[idx] | ||
TraceAggregator.prototype.finishTransactionThread = function (path, transactionId) { | ||
// finalize the stack | ||
var firstBlock = this.cache[path[0]] | ||
var waterfall = new Waterfall(firstBlock.tZero) | ||
var continuationQueue = new UniqueQueue([idx]) | ||
while (continuationQueue.hasNext()) { | ||
var nextIdx = continuationQueue.next() | ||
var block = this.cache[nextIdx] | ||
var seen = waterfall.addSegment(block) | ||
if (!seen) { | ||
this.cacheTouched[nextIdx]++ | ||
var outgoing = Object.keys(block.outgoing) | ||
for (var i = 0; i < outgoing.length; i++) { | ||
continuationQueue.add(this.resumers[outgoing[i]]) | ||
} | ||
} | ||
for (var i = 0; i < path.length; i += 2) { | ||
waterfall.addSegment(this.cache[path[i]]) | ||
this.seen[path[i]]++ | ||
} | ||
if (transactionId != null) { | ||
// manually finish transaction link segment | ||
var src = this.cache[path[0]].outgoingTxs[transactionId] | ||
var tgt = this.cache[path[path.length - 1]].incoming[transactionId] | ||
waterfall.addLink(src, tgt) | ||
} | ||
waterfall.finishLinks() | ||
// send to aggregation | ||
this.aggregate(waterfall) | ||
} | ||
TraceAggregator.prototype.aggregate = function (waterfall) { | ||
var name = waterfall.blockNames.join("~") | ||
if (this.waterfalls[name] == null) { | ||
this.waterfalls[name] = new AggregateWaterfall(waterfall) | ||
if (isPartial) { | ||
this.waterfalls[name].partial = true | ||
} | ||
this.waterfallStats.update(this.waterfalls[name].segments.length) | ||
@@ -246,25 +315,17 @@ } | ||
TraceAggregator.prototype.cleanupPartials = function cleanupPartials() { | ||
var leftBehind = [] | ||
for (var i = 0; i < this.cacheTouched.length; i++) { | ||
if (this.cacheTouched[i] === 0) { | ||
// This one was missed. | ||
leftBehind.push(i) | ||
TraceAggregator.prototype.reap = function () { | ||
// Go through all the unseen segments and finish them up | ||
for (var i = 0; i < this.seen.length; i++) { | ||
if (this.seen[i] === 0) { | ||
this.finishTransactionThread([i]) | ||
} | ||
} | ||
var self = this | ||
// TODO cache outgoing keys? and/or length? | ||
leftBehind = leftBehind.sort(function (a, b) { | ||
var bLinks = Object.keys(self.cache[b].outgoing).length | ||
var aLinks = Object.keys(self.cache[a].outgoing).length | ||
return bLinks - aLinks | ||
}) | ||
var queue = new UniqueQueue(leftBehind) | ||
while (queue.hasNext()) { | ||
var idx = queue.next() | ||
if (this.cacheTouched[idx] === 0) { | ||
this.stitchWaterfall(idx, true) | ||
} | ||
var wfArray = [] | ||
var wfids = Object.keys(this.waterfalls) | ||
for (var j = 0; j < wfids.length; j++) { | ||
wfArray.push(this.waterfalls[wfids[j]]) | ||
} | ||
this.reset() | ||
return wfArray | ||
} |
11923
352
Updatedconcurix-waterfall@^1.0.1