Socket
Socket
Sign inDemoInstall

pooling

Package Overview
Dependencies
10
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.4.0 to 0.4.1

202

lib/pool.js

@@ -18,2 +18,3 @@ // Copyright (c) 2012, Mark Cavage. All rights reserved.

var DEFAULT_EVENTS = ['close', 'end', 'error', 'timeout'];
var DTPS = {};
var MAX_INT = Math.pow(2, 32) - 1;

@@ -60,32 +61,36 @@

var dtp = dtrace.createDTraceProvider('pooling-' + name);
if (DTPS[name]) {
return (DTPS[name]);
}
// Most probes are:
//
// object_id, state
//
var dtp = dtrace.createDTraceProvider(name);
var probes = {
release: dtp.addProbe('release', 'int', 'char *'),
remove: dtp.addProbe('remove', 'int', 'char *'),
kill: dtp.addProbe('kill', 'int', 'char *'),
createStart: dtp.addProbe('create-start', 'int', 'char *'),
// these are all object_id, state
release: dtp.addProbe('release', 'int', 'json'),
remove: dtp.addProbe('remove', 'int', 'json'),
kill: dtp.addProbe('kill', 'int', 'json'),
create: dtp.addProbe('create', 'int', 'json'),
// "Special" probes
// "ok" tacked at end
createDone: dtp.addProbe('create-done', 'int', 'char *', 'int'),
// additionally create_time of object
acquire: dtp.addProbe('acquire', 'int', 'char *', 'char *'),
acquire: dtp.addProbe('acquire', 'int', 'json', 'char *'),
// additionally "ok"
assert: dtp.addProbe('assert', 'int', 'char *', 'int'),
assert: dtp.addProbe('assert', 'int', 'json', 'int'),
// just num_items to check and state
check: dtp.addProbe('check', 'int', 'char *')
check: dtp.addProbe('check', 'int', 'json'),
// arg0 is the current number of checked out connections
shutdown: dtp.addProbe('shutdown', 'int', 'json'),
// just the current state (ignore arg0)
queue: dtp.addProbe('queue', 'int', 'json'),
_provider: dtp
};
DTPS[name] = probes;
dtp.enable();
return (probes);
return (DTPS[name]);
}

@@ -216,14 +221,10 @@

if ((obj = dequeue(this.available))) {
while ((obj = dequeue(this.available))) {
if (this._ensure(obj)) {
this.probes.acquire.fire(function () {
return ([obj.id,
JSON.stringify(self._state(true)),
obj.ctime]);
return ([obj.id, self._state(true), obj.ctime]);
});
callback(null, obj.client);
} else {
acquire(callback);
return;
}
return;
}

@@ -233,21 +234,10 @@

enqueue(this.queue, callback);
this.probes.queue.fire(function () {
return ([0, self._state(true)]);
});
return;
}
this.pendingResources++;
var nextId = nextSequence();
self.probes.createStart.fire(function () {
return ([nextId, JSON.stringify(self._state(true))]);
});
this.create(function createCallback(err, client) {
self.pendingResources = Math.max(self.pendingResources - 1, 0);
this._create(function onCreatedClient(err, object) {
if (err) {
self.probes.createDone.fire(function () {
return ([nextId,
JSON.stringify(self._state(true)),
0]);
});
callback(err);

@@ -257,26 +247,6 @@ return;

// Handle shutdown being called while a
// new client was being made
if (self.stopped || self.stopping) {
self._kill(client);
callback(new PoolShuttingDownError());
return;
}
obj = createPoolObject(client, nextId);
self._watch(client);
enqueue(self.resources, obj);
self.probes.createDone.fire(function () {
return ([nextId,
JSON.stringify(self._state(true)),
1]);
});
self.probes.acquire.fire(function () {
return ([obj.id,
JSON.stringify(self._state(true)),
obj.ctime]);
return ([object.id, self._state(true), object.ctime]);
});
callback(null, client);
callback(null, object.client);
});

@@ -304,2 +274,6 @@ };

this.probes.release.fire(function () {
return ([obj.id, self._state(true)]);
});
if (this.stopping || this.stopped) {

@@ -311,6 +285,2 @@ this._kill(obj);

this.probes.release.fire(function () {
return ([obj.id, JSON.stringify(self._state(true))]);
});
if (!this._ensure(obj))

@@ -343,3 +313,3 @@ return;

this.probes.remove.fire(function () {
return ([obj.id, JSON.stringify(self._state(true))]);
return ([obj.id, self._state(true)]);
});

@@ -354,7 +324,6 @@ this._kill(obj);

var cb = once(callback || function () {});
var log = this.log;
var self = this;
var cb = once(callback || function () {});
function end() {

@@ -372,2 +341,3 @@ while (self.resources.length !== 0) {

self.stopping = false;
self._deadbeef = true;

@@ -383,2 +353,7 @@ cb();

self.probes.shutdown.fire(function () {
var out = self.resources.length - self.available.length;
return ([out, self._state(true)]);
});
this.stopping = true;

@@ -388,2 +363,25 @@ if (this.timer)

if (DTPS[self.name]) {
var d = DTPS[self.name];
Object.keys(d).forEach(function (k) {
if (k === '_provider')
return;
if (d._provider.removeProbe)
d._provider.removeProbe(d[k]);
});
if (d._provider.disable)
d._provider.disable();
delete DTPS[self.name];
}
this.queue.forEach(function (w) {
process.nextTick(function () {
w(new PoolShuttingDownError());
});
});
this.queue.length = 0;
if (this.available.length === this.resources.length) {

@@ -400,3 +398,3 @@ process.nextTick(end);

Pool.prototype.toString = function toString() {
return (sprintf('[object Pool <%s>]', JSON.stringify(this._state())));
return (sprintf('[object Pool <%j>]', this._state()));
};

@@ -408,2 +406,34 @@

Pool.prototype._create = function _create(cb) {
var nextId = nextSequence();
var self = this;
this.pendingResources++;
this.create(function createCallback(err, client) {
self.pendingResources = Math.max(self.pendingResources - 1, 0);
// Handle shutdown being called while a
// new client was being made
if (err || self.stopped || self.stopping) {
if (client)
self._kill(client);
cb(err || new PoolShuttingDownError());
return;
}
var obj = createPoolObject(client, nextId);
self._watch(client);
enqueue(self.resources, obj);
self.probes.create.fire(function () {
return ([nextId, self._state(true)]);
});
cb(null, obj);
});
};
Pool.prototype._emitDrain = function _emitDrain() {

@@ -415,6 +445,4 @@ var self = this;

process.nextTick(function onDrain() {
if (self.available.length === self.resources.length)
self.emit('drain');
});
if (self.available.length === self.resources.length)
self.emit('drain');
};

@@ -438,5 +466,3 @@

this.probes.assert.fire(function () {
return ([obj.id,
JSON.stringify(self._state(true)),
ok ? 1 : 0]);
return ([obj.id, self._state(true), ok ? 1 : 0]);
});

@@ -471,6 +497,2 @@

this.probes.check.fire(function () {
return ([toCheck.length, JSON.stringify(self._state(true))]);
});
if (toCheck.length === 0) {

@@ -481,2 +503,6 @@ self._scheduleReaper();

this.probes.check.fire(function () {
return ([toCheck.length, self._state(true)]);
});
// Run over all the "locked" items in ||

@@ -510,2 +536,3 @@ var opts = {

});
self._emitDrain();
self._scheduleReaper();

@@ -537,3 +564,3 @@ });

this.probes.kill.fire(function () {
return ([obj.id, JSON.stringify(self._state(true))]);
return ([obj.id, self._state(true)]);
});

@@ -549,2 +576,4 @@

Pool.prototype._scheduleReaper = function _scheduleReaper() {
var self = this;
if (this.stopped || this.stopping) {

@@ -555,3 +584,7 @@ clearTimeout(this.timer);

this.timer = setTimeout(this._reap.bind(this), this.checkInterval);
if (this.checkInterval > 0 && this.checkInterval !== false) {
this.timer = setTimeout(function healthCheck() {
self._reap();
}, this.checkInterval);
}
};

@@ -576,2 +609,5 @@

Pool.prototype._watch = function _watch(client) {
if (this.stopped || this.stopping)
return;
var log = this.log;

@@ -578,0 +614,0 @@ var obj;

{
"name": "pooling",
"description": "General purpose resource pool API",
"version": "0.4.0",
"version": "0.4.1",
"author": "Mark Cavage <mcavage@gmail.com>",

@@ -16,3 +16,3 @@ "main": "lib/index.js",

"assert-plus": "0.1.2",
"dtrace-provider": "0.2.7",
"dtrace-provider": "0.2.8",
"bunyan": "0.18.2",

@@ -19,0 +19,0 @@ "once": "1.1.1",

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc