pooling
Advanced tools
Comparing version 0.4.0 to 0.4.1
202
lib/pool.js
@@ -18,2 +18,3 @@ // Copyright (c) 2012, Mark Cavage. All rights reserved. | ||
var DEFAULT_EVENTS = ['close', 'end', 'error', 'timeout']; | ||
var DTPS = {}; | ||
var MAX_INT = Math.pow(2, 32) - 1; | ||
@@ -60,32 +61,36 @@ | ||
var dtp = dtrace.createDTraceProvider('pooling-' + name); | ||
if (DTPS[name]) { | ||
return (DTPS[name]); | ||
} | ||
// Most probes are: | ||
// | ||
// object_id, state | ||
// | ||
var dtp = dtrace.createDTraceProvider(name); | ||
var probes = { | ||
release: dtp.addProbe('release', 'int', 'char *'), | ||
remove: dtp.addProbe('remove', 'int', 'char *'), | ||
kill: dtp.addProbe('kill', 'int', 'char *'), | ||
createStart: dtp.addProbe('create-start', 'int', 'char *'), | ||
// these are all object_id, state | ||
release: dtp.addProbe('release', 'int', 'json'), | ||
remove: dtp.addProbe('remove', 'int', 'json'), | ||
kill: dtp.addProbe('kill', 'int', 'json'), | ||
create: dtp.addProbe('create', 'int', 'json'), | ||
// "Special" probes | ||
// "ok" tacked at end | ||
createDone: dtp.addProbe('create-done', 'int', 'char *', 'int'), | ||
// additionally create_time of object | ||
acquire: dtp.addProbe('acquire', 'int', 'char *', 'char *'), | ||
acquire: dtp.addProbe('acquire', 'int', 'json', 'char *'), | ||
// additionally "ok" | ||
assert: dtp.addProbe('assert', 'int', 'char *', 'int'), | ||
assert: dtp.addProbe('assert', 'int', 'json', 'int'), | ||
// just num_items to check and state | ||
check: dtp.addProbe('check', 'int', 'char *') | ||
check: dtp.addProbe('check', 'int', 'json'), | ||
// arg0 is the current number of checked out connections | ||
shutdown: dtp.addProbe('shutdown', 'int', 'json'), | ||
// just the current state (ignore arg0) | ||
queue: dtp.addProbe('queue', 'int', 'json'), | ||
_provider: dtp | ||
}; | ||
DTPS[name] = probes; | ||
dtp.enable(); | ||
return (probes); | ||
return (DTPS[name]); | ||
} | ||
@@ -216,14 +221,10 @@ | ||
if ((obj = dequeue(this.available))) { | ||
while ((obj = dequeue(this.available))) { | ||
if (this._ensure(obj)) { | ||
this.probes.acquire.fire(function () { | ||
return ([obj.id, | ||
JSON.stringify(self._state(true)), | ||
obj.ctime]); | ||
return ([obj.id, self._state(true), obj.ctime]); | ||
}); | ||
callback(null, obj.client); | ||
} else { | ||
acquire(callback); | ||
return; | ||
} | ||
return; | ||
} | ||
@@ -233,21 +234,10 @@ | ||
enqueue(this.queue, callback); | ||
this.probes.queue.fire(function () { | ||
return ([0, self._state(true)]); | ||
}); | ||
return; | ||
} | ||
this.pendingResources++; | ||
var nextId = nextSequence(); | ||
self.probes.createStart.fire(function () { | ||
return ([nextId, JSON.stringify(self._state(true))]); | ||
}); | ||
this.create(function createCallback(err, client) { | ||
self.pendingResources = Math.max(self.pendingResources - 1, 0); | ||
this._create(function onCreatedClient(err, object) { | ||
if (err) { | ||
self.probes.createDone.fire(function () { | ||
return ([nextId, | ||
JSON.stringify(self._state(true)), | ||
0]); | ||
}); | ||
callback(err); | ||
@@ -257,26 +247,6 @@ return; | ||
// Handle shutdown being called while a | ||
// new client was being made | ||
if (self.stopped || self.stopping) { | ||
self._kill(client); | ||
callback(new PoolShuttingDownError()); | ||
return; | ||
} | ||
obj = createPoolObject(client, nextId); | ||
self._watch(client); | ||
enqueue(self.resources, obj); | ||
self.probes.createDone.fire(function () { | ||
return ([nextId, | ||
JSON.stringify(self._state(true)), | ||
1]); | ||
}); | ||
self.probes.acquire.fire(function () { | ||
return ([obj.id, | ||
JSON.stringify(self._state(true)), | ||
obj.ctime]); | ||
return ([object.id, self._state(true), object.ctime]); | ||
}); | ||
callback(null, client); | ||
callback(null, object.client); | ||
}); | ||
@@ -304,2 +274,6 @@ }; | ||
this.probes.release.fire(function () { | ||
return ([obj.id, self._state(true)]); | ||
}); | ||
if (this.stopping || this.stopped) { | ||
@@ -311,6 +285,2 @@ this._kill(obj); | ||
this.probes.release.fire(function () { | ||
return ([obj.id, JSON.stringify(self._state(true))]); | ||
}); | ||
if (!this._ensure(obj)) | ||
@@ -343,3 +313,3 @@ return; | ||
this.probes.remove.fire(function () { | ||
return ([obj.id, JSON.stringify(self._state(true))]); | ||
return ([obj.id, self._state(true)]); | ||
}); | ||
@@ -354,7 +324,6 @@ this._kill(obj); | ||
var cb = once(callback || function () {}); | ||
var log = this.log; | ||
var self = this; | ||
var cb = once(callback || function () {}); | ||
function end() { | ||
@@ -372,2 +341,3 @@ while (self.resources.length !== 0) { | ||
self.stopping = false; | ||
self._deadbeef = true; | ||
@@ -383,2 +353,7 @@ cb(); | ||
self.probes.shutdown.fire(function () { | ||
var out = self.resources.length - self.available.length; | ||
return ([out, self._state(true)]); | ||
}); | ||
this.stopping = true; | ||
@@ -388,2 +363,25 @@ if (this.timer) | ||
if (DTPS[self.name]) { | ||
var d = DTPS[self.name]; | ||
Object.keys(d).forEach(function (k) { | ||
if (k === '_provider') | ||
return; | ||
if (d._provider.removeProbe) | ||
d._provider.removeProbe(d[k]); | ||
}); | ||
if (d._provider.disable) | ||
d._provider.disable(); | ||
delete DTPS[self.name]; | ||
} | ||
this.queue.forEach(function (w) { | ||
process.nextTick(function () { | ||
w(new PoolShuttingDownError()); | ||
}); | ||
}); | ||
this.queue.length = 0; | ||
if (this.available.length === this.resources.length) { | ||
@@ -400,3 +398,3 @@ process.nextTick(end); | ||
Pool.prototype.toString = function toString() { | ||
return (sprintf('[object Pool <%s>]', JSON.stringify(this._state()))); | ||
return (sprintf('[object Pool <%j>]', this._state())); | ||
}; | ||
@@ -408,2 +406,34 @@ | ||
Pool.prototype._create = function _create(cb) { | ||
var nextId = nextSequence(); | ||
var self = this; | ||
this.pendingResources++; | ||
this.create(function createCallback(err, client) { | ||
self.pendingResources = Math.max(self.pendingResources - 1, 0); | ||
// Handle shutdown being called while a | ||
// new client was being made | ||
if (err || self.stopped || self.stopping) { | ||
if (client) | ||
self._kill(client); | ||
cb(err || new PoolShuttingDownError()); | ||
return; | ||
} | ||
var obj = createPoolObject(client, nextId); | ||
self._watch(client); | ||
enqueue(self.resources, obj); | ||
self.probes.create.fire(function () { | ||
return ([nextId, self._state(true)]); | ||
}); | ||
cb(null, obj); | ||
}); | ||
}; | ||
Pool.prototype._emitDrain = function _emitDrain() { | ||
@@ -415,6 +445,4 @@ var self = this; | ||
process.nextTick(function onDrain() { | ||
if (self.available.length === self.resources.length) | ||
self.emit('drain'); | ||
}); | ||
if (self.available.length === self.resources.length) | ||
self.emit('drain'); | ||
}; | ||
@@ -438,5 +466,3 @@ | ||
this.probes.assert.fire(function () { | ||
return ([obj.id, | ||
JSON.stringify(self._state(true)), | ||
ok ? 1 : 0]); | ||
return ([obj.id, self._state(true), ok ? 1 : 0]); | ||
}); | ||
@@ -471,6 +497,2 @@ | ||
this.probes.check.fire(function () { | ||
return ([toCheck.length, JSON.stringify(self._state(true))]); | ||
}); | ||
if (toCheck.length === 0) { | ||
@@ -481,2 +503,6 @@ self._scheduleReaper(); | ||
this.probes.check.fire(function () { | ||
return ([toCheck.length, self._state(true)]); | ||
}); | ||
// Run over all the "locked" items in || | ||
@@ -510,2 +536,3 @@ var opts = { | ||
}); | ||
self._emitDrain(); | ||
self._scheduleReaper(); | ||
@@ -537,3 +564,3 @@ }); | ||
this.probes.kill.fire(function () { | ||
return ([obj.id, JSON.stringify(self._state(true))]); | ||
return ([obj.id, self._state(true)]); | ||
}); | ||
@@ -549,2 +576,4 @@ | ||
Pool.prototype._scheduleReaper = function _scheduleReaper() { | ||
var self = this; | ||
if (this.stopped || this.stopping) { | ||
@@ -555,3 +584,7 @@ clearTimeout(this.timer); | ||
this.timer = setTimeout(this._reap.bind(this), this.checkInterval); | ||
if (this.checkInterval > 0 && this.checkInterval !== false) { | ||
this.timer = setTimeout(function healthCheck() { | ||
self._reap(); | ||
}, this.checkInterval); | ||
} | ||
}; | ||
@@ -576,2 +609,5 @@ | ||
Pool.prototype._watch = function _watch(client) { | ||
if (this.stopped || this.stopping) | ||
return; | ||
var log = this.log; | ||
@@ -578,0 +614,0 @@ var obj; |
{ | ||
"name": "pooling", | ||
"description": "General purpose resource pool API", | ||
"version": "0.4.0", | ||
"version": "0.4.1", | ||
"author": "Mark Cavage <mcavage@gmail.com>", | ||
@@ -16,3 +16,3 @@ "main": "lib/index.js", | ||
"assert-plus": "0.1.2", | ||
"dtrace-provider": "0.2.7", | ||
"dtrace-provider": "0.2.8", | ||
"bunyan": "0.18.2", | ||
@@ -19,0 +19,0 @@ "once": "1.1.1", |
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
498
0
23997
6
+ Addeddtrace-provider@0.2.8(transitive)
Updateddtrace-provider@0.2.8