Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

ironium

Package Overview
Dependencies
Maintainers
1
Versions
123
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ironium - npm Package Compare versions

Comparing version 0.1.8 to 0.2.0

22

Gruntfile.js
module.exports = function(grunt) {
grunt.loadNpmTasks('grunt-contrib-clean');
grunt.loadNpmTasks('grunt-contrib-watch');
grunt.loadNpmTasks('grunt-notify');
grunt.loadNpmTasks('grunt-traceur');

@@ -22,8 +24,24 @@

grunt.config('watch', {
files: [ 'src/**/*.js' ],
tasks: [ 'build' ],
options: { interrupt: true }
});
grunt.config('clean', [ 'lib' ]);
grunt.config.set('notify.notify_hooks', {
options: { enabled: true }
});
grunt.registerTask('build', [ 'clean', 'traceur' ]);
grunt.registerTask('default', [ 'build' ]);
grunt.config('notify.build', {
options: { message: "Build complete!" }
});
grunt.registerTask('build', "Compile source files from src/ into lib/ directory",
[ 'clean', 'traceur', 'notify:build' ]);
grunt.registerTask('default', "Continously compile source files (build and watch)",
[ 'build', 'watch' ]);
}

9

lib/index.js

@@ -56,9 +56,4 @@ var $__getProtoParent = function(superClass) {

if (!queues) {
try {
throw undefined;
} catch (config) {
config = this._config || {};
queues = new Queues(this, config);
this._queues = queues;
}
queues = new Queues(this);
this._queues = queues;
}

@@ -65,0 +60,0 @@ return queues;

@@ -39,14 +39,56 @@ var $__Object = Object, $__getOwnPropertyNames = $__Object.getOwnPropertyNames, $__getOwnPropertyDescriptor = $__Object.getOwnPropertyDescriptor, $__getDescriptors = function(object) {

var ERROR_BACKOFF = ms('30s');
var Configuration = function() {
'use strict';
var $Configuration = ($__createClassNoExtends)({
constructor: function(workers) {
this._workers = workers;
},
get config() {
var config = this._config;
if (!config) {
try {
throw undefined;
} catch (source) {
source = this._workers._config && this._workers._config.queues;
if (!source) source = (process.env.NODE_ENV == 'test') ? {prefix: 'test-'}: {};
this._config = config = {
hostname: source.hostname || 'localhost',
port: source.port || 11300,
prefix: source.prefix
};
if (source.token) {
config.authenticate = 'oauth ' + source.token + ' ' + source.projectID;
config.webhookURL = 'https://' + source.hostname + '/1/projects/' + source.projectID + '/queues/{queueName}/messages/webhook?oauth=' + source.token;
} else {
config.authenticate = null;
config.webhookURL = 'https://<host>/1/projects/<project>/queues/{queueName}/messages/webhook?oauth=<token>';
}
}
}
return config;
},
get hostname() {
return this.config.hostname;
},
get port() {
return this.config.port;
},
prefixedName: function(queueName) {
return (this.config.prefix || '') + queueName;
},
get authenticate() {
return this.config.authenticate;
},
webhookURL: function(queueName) {
return this.config.webhookURL.replace('{queueName}', queueName);
}
}, {});
return $Configuration;
}();
module.exports = (function() {
'use strict';
var Queues = ($__createClassNoExtends)({
constructor: function() {
var logger = arguments[0] !== (void 0) ? arguments[0]: console;
var $__3 = arguments[1], host = "host"in $__3 ? $__3.host: 'localhost', port = "port"in $__3 ? $__3.port: 11300, projectID = $__3.projectID, token = $__3.token, prefix = $__3.prefix;
this._logger = logger;
if (token) this._authenticate = 'oauth ' + token + ' ' + projectID;
this._host = host;
this._port = port;
this._prefix = prefix || '';
this._webhookURL = 'https://' + host + '/1/projects/' + projectID + '/queues/{queueName}/messages/webhook?oauth=' + token;
var Server = ($__createClassNoExtends)({
constructor: function(workers) {
this.notify = workers;
this.config = new Configuration(workers);
this._queues = Object.create({});

@@ -58,11 +100,3 @@ },

if (!queue) {
queue = new Queue({
name: name,
prefixedName: this._prefix + name,
host: this._host,
port: this._port,
authenticate: this._authenticate,
webhookURL: this._webhookURL.replace('{queueName}', name),
logger: this._logger
});
queue = new Queue(name, this);
this._queues[name] = queue;

@@ -73,3 +107,3 @@ }

start: function() {
this._logger.debug("Start all queues");
this.notify.debug("Start all queues");
this._foreachQueue((function(queue) {

@@ -80,3 +114,3 @@ return queue.start();

stop: function(callback) {
this._logger.debug("Stop all queues");
this.notify.debug("Stop all queues");
this._foreachQueue((function(queue) {

@@ -87,3 +121,3 @@ return queue.stop();

reset: function(callback) {
this._logger.debug("Clear all queues");
this.notify.debug("Clear all queues");
this._foreachQueue((function(queue, done) {

@@ -94,3 +128,3 @@ return queue.reset(done);

once: function(callback) {
this._logger.debug("Process all queued jobs");
this.notify.debug("Process all queued jobs");
var queues = _.values(this._queues);

@@ -113,3 +147,3 @@ function iterate() {

}, {});
return Queues;
return Server;
}());

@@ -119,11 +153,8 @@ var Session = function() {

var $Session = ($__createClassNoExtends)({
constructor: function($__3, setup) {
var name = $__3.name, host = $__3.host, port = $__3.port, authenticate = $__3.authenticate, logger = $__3.logger;
constructor: function(name, server, setup) {
this.name = name;
this.host = host;
this.port = port;
this.authenticate = authenticate;
this.config = server.config;
this.notify = server.notify;
this.setup = setup;
this.pending = [];
this._logger = logger;
},

@@ -146,3 +177,3 @@ request: function(command) {

}), TIMEOUT_REQUEST);
this._withClient(function(client) {
this.withClient(function(client) {
var $__4;

@@ -154,6 +185,6 @@ ($__4 = client[command]).call.apply($__4, $__spread([client], args, [oncomplete]));

var message = error.toString();
if (message != "Error: Connection closed") this._logger.error("Client error in queue %s: %s", this.name, message);
if (this._deferred) {
this._deferred.reject(error);
this._deferred = null;
if (message != "Error: Connection closed") this.notify.error("Client error in queue %s: %s", this.name, message);
if (this.deferred) {
this.deferred.reject(error);
this.deferred = null;
}

@@ -167,5 +198,5 @@ var pending = this.pending.slice();

},
_withClient: function(fn) {
if (!this._deferred) this._connect();
this._deferred.promise.then(fn, (function(error) {
withClient: function(fn) {
if (!this.deferred) this.connect();
this.deferred.promise.then(fn, (function(error) {
this.fail(error);

@@ -175,8 +206,8 @@ this.use(fn);

},
_connect: function() {
var client = new fivebeans.client(this.host, this.port);
connect: function() {
var client = new fivebeans.client(this.config.hostname, this.config.port);
var deferred = Q.defer();
var authenticateAndSetup = (function() {
if (this.authenticate) {
client.put(0, 0, 0, this.authenticate, function(error) {
if (this.config.authenticate) {
client.put(0, 0, 0, this.config.authenticate, function(error) {
if (error) {

@@ -205,3 +236,3 @@ deferred.reject(error);

client.connect();
this._deferred = deferred;
this.deferred = deferred;
}

@@ -214,31 +245,34 @@ }, {});

var $Queue = ($__createClassNoExtends)({
constructor: function($__3) {
var name = $__3.name, prefixedName = $__3.prefixedName, host = $__3.host, port = $__3.port, authenticate = $__3.authenticate, webhookURL = $__3.webhookURL, logger = $__3.logger;
constructor: function(name, server) {
this.name = name;
this.webhookURL = webhookURL;
this._logger = logger;
this.notify = server.notify;
this.webhookURL = server.config.webhookURL(name);
this._server = server;
this._prefixedName = server.config.prefixedName(name);
this._processing = false;
this._handler = null;
this._putSession = new Session({
name: name,
host: host,
port: port,
authenticate: authenticate,
logger: logger
}, function(client, callback) {
client.use(prefixedName, callback);
});
this._getSession = new Session({
name: name,
host: host,
port: port,
authenticate: authenticate,
logger: logger
}, function(client, callback) {
client.ignore('default', function() {
client.watch(prefixedName, callback);
});
});
},
put: function(job, options, callback) {
get _put() {
var session = this._putSession;
if (!session) {
session = new Session(this.name, this._server, (function(client, callback) {
client.use(this._prefixedName, callback);
}).bind(this));
this._putSession = session;
}
return session;
},
get _reserve() {
var session = this._reserveSession;
if (!session) {
session = new Session(this.name, this._server, (function(client, callback) {
client.ignore('default', (function() {
client.watch(this._prefixedName, callback);
}).bind(this));
}).bind(this));
this._reserveSession = session;
}
return session;
},
push: function(job, options, callback) {
assert(job, "Missing job to queue");

@@ -251,4 +285,4 @@ if (typeof (options) == 'function') {

var delay = (options && options.delay) || 0;
this._putSession.request('put', 0, delay, PROCESSING_TIMEOUT / 1000, payload, (function(error, jobID) {
if (callback) callback(error); else if (error) this._logger.error("Error talking to Beanstalkd, queue %s: %s", this.name, error);
this._put.request('put', 0, delay, PROCESSING_TIMEOUT / 1000, payload, (function(error, jobID) {
if (callback) callback(error); else if (error) this.notify.error("Error talking to Beanstalkd, queue %s: %s", this.name, error);
}).bind(this));

@@ -277,4 +311,4 @@ },

}
this._logger.debug("Waiting for jobs on queue %s", this.name);
this._getSession.request('reserve_with_timeout', 0, (function(error, jobID, payload) {
this.notify.debug("Waiting for jobs on queue %s", this.name);
this._reserve.request('reserve_with_timeout', 0, (function(error, jobID, payload) {
if (error == 'DEADLINE_SOON' || error == 'TIMED_OUT' || (error && error.message == 'TIMED_OUT')) callback(null, false); else if (error) callback(error); else if (payload) this._processJob(jobID, payload, function(error) {

@@ -288,9 +322,9 @@ callback(error, !error);

if (!this._processing) return;
this._getSession.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000, (function(error, jobID, payload) {
this._reserve.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000, (function(error, jobID, payload) {
if (error == 'DEADLINE_SOON' || error == 'TIMED_OUT' || (error && error.message == 'TIMED_OUT')) setImmediate(pickNextJob); else if (error) {
this._logger.error(error);
this.notify.error(error);
setTimeout(pickNextJob, ERROR_BACKOFF);
} else {
this._processJob(jobID, payload, (function(error) {
if (error) this._logger.error(error);
if (error) this.notify.error(error);
setImmediate(pickNextJob);

@@ -301,3 +335,3 @@ }).bind(this));

}).bind(this);
this._logger.debug("Waiting for jobs on queue %s", this.name);
this.notify.debug("Waiting for jobs on queue %s", this.name);
pickNextJob();

@@ -316,6 +350,6 @@ },

outcomeDeferred.promise.then((function() {
this._getSession.request('destroy', jobID, (function(error) {
if (error) this._logger.error(error.stack);
this._reserve.request('destroy', jobID, (function(error) {
if (error) this.notify.error(error.stack);
}).bind(this));
this._logger.info("Completed job %s from queue %s", jobID, this.name);
this.notify.info("Completed job %s from queue %s", jobID, this.name);
clearTimeout(errorOnTimeout);

@@ -325,6 +359,6 @@ callback();

var delay = (process.env.NODE_ENV == 'test' ? 0: RELEASE_DELAY);
this._getSession.request('release', jobID, 0, delay / 1000, (function(error) {
if (error) this._logger.error(error.stack);
this._reserve.request('release', jobID, 0, delay / 1000, (function(error) {
if (error) this.notify.error(error.stack);
}).bind(this));
this._logger.error("Error processing job %s from queue %s: %s", jobID, this.name, error.stack);
this.notify.error("Error processing job %s from queue %s: %s", jobID, this.name, error.stack);
clearTimeout(errorOnTimeout);

@@ -334,4 +368,4 @@ callback(error);

domain.run((function() {
this._logger.info("Picked up job %s from queue %s", jobID, this.name);
this._logger.debug("Processing %s: %s", jobID, payload.toString());
this.notify.info("Picked up job %s from queue %s", jobID, this.name);
this.notify.debug("Processing %s: %s", jobID, payload.toString());
var job = payload;

@@ -347,3 +381,3 @@ try {

reset: function(callback) {
this._putSession._withClient(function(client) {
this._put.withClient(function(client) {
function deleteNextJob() {

@@ -350,0 +384,0 @@ client.reserve_with_timeout(0, function(error, jobID, payload) {

{
"name": "ironium",
"version": "0.1.8",
"version": "0.2.0",
"scripts": {
"test": "./node_modules/.bin/mocha",
"build": "grunt build",
"prepublish": "grunt build"

@@ -27,3 +26,5 @@ },

"grunt-traceur": "aaronfrost/grunt-traceur",
"grunt-contrib-clean": "~0.5.0"
"grunt-contrib-clean": "~0.5.0",
"grunt-contrib-watch": "~0.5.3",
"grunt-notify": "~0.2.16"
},

@@ -30,0 +31,0 @@ "repository": {

@@ -1,131 +0,223 @@

# Job Queues
**[Ironium](https://github.com/assaf/ironium)** A simple API for working with
job queues and scheduled jobs, using
[Beanstalkd](http://kr.github.io/beanstalkd/) and/or
[Iron.io](http://www.iron.io/).
A simple API for working with job queues. Can use
[Iron.io](http://www.iron.io/) or Beanstalkd.
## The Why
## Accessing Queues
Building a modern Web application involves a lot of workload that runs outside
the application request/response cycle.
The API consists of a single function that returns the named queue:
Some tasks take a long time to complete (e.g. updating 3rd party APIs), and you
want to run after you've sent the response back to the user. But since you send
back a response, you better get the task to complete, by retrying if necessary.
This is where job queues become useful:
- They can distribute the workload over multiple servers, and in time
- They can smooth out transient errors by retrying every joy
- They can be used for jobs queued by the application and 3rd party
([Webhooks](http://www.webhooks.org/))
Since high quality job queues exists, might as well use them. One option is
[Beanstalkd](http://kr.github.io/beanstalkd/) which is easy to install and use,
and battle tested.
Another option is [Iron.io](http://www.iron.io/), a paid service that supports
the same API and can be used interchangeable with Beanstalkd. This allows you
to use Beanstalkd for development/testing, and Iron.io for production instances.
Besides managed up-time and a management GUI, Iron.io can also handle Webhooks
(sending and receiving updates) for you.
### The How
Ironium has a simple API that exposes three primary methods:
- `push` a job into a queue
- `each` to process each job from a queue
- `schedule` a job to run at given schedule
There's more than that, so let's explore it. The main object provides access to
all the workers, and in particular has the following methods:
#### queue(name)
Returns the named queue. Calling this method with the same name will always
return the same queue. Queues are created on-the-fly, you don't need to setup
anything before accessing a queue.
You can immediately push new jobs into the queue. To process all jobs, you need
to first start the workers. This distinction allows you to push jobs from any
Node.js servers, but only process jobs from specific nodes.
For example, your code may have:
```
var queues = require("./lib/queues");
var myQueue = queues("my-queue");
```
var workers = require('ironium');
var sendWelcomeEmail = workers.queue('send-welcome-email');
Pushing and processing messages is done via the returned queue object (see
below).
// If this is a new customer, queue sending welcome email.
customer.on('save', function(next) {
if (this.isNew)
sendWelcomeEmail.push(this.id, next);
else
next();
});
An additional method is available in test environment only, to discard of all
queues before/after running test:
sendWelcomeEmail.each(function(id, callback) {
// Do something to render and send email
callback();
});
```
before(queues.clearAll);
```
As you can see from this example, each queue has two interesting methods, `push`
and `each`.
## Using Queues
#### queue.push(job, callback)
### put(job, options, callback)
Pushes a new job into the queue. The job can be any JavaScript value that
serializes into JSON, so object, arrays, string are all usable.
Use the `put` method to put a job in the queue.
If you call `push` with a callback, the callback will be notified after the job
has been successfully added to the queue. Use this to make sure the job is
queued before proceeding.
- job - The object to place on the queue. This object is serialized into
a JSON string, so must only contain data values, no circular references.
- options - Control how the job is processed (see below)
- callback - Optional
#### queue.each(fn)
Currently supported options:
Processes each job from the queue. The function is called with two arguments,
the job (see `push` above) and a callback. You must call that callback when
done processing the job.
- delay - How long before the job is available, in seconds, defaults to 0
If you call the callback with an error, the job is considered to have failed, is
put back in the queue, and will be picked up again after a 1 minute delay.
If you're queuing requests initiated by the end-user, you likely care whether
the job was queued successfully. Use a callback to determine that.
If you don't call the callback within a minute, the job will be considered to
have failed, and will be put back in the queue as well.
If one minute of processing doesn't sound like enough, consider breaking large
jobs into smaller chunks. Also consider that time-outs are a necessary evil,
given the likelihood of a bug resulting in jobs that never complete, and the
halting problem being NP hard.
### get(handler, concurrency)
#### queue.name
Use the `get` method to process jobs from the queue.
The queue name (property, not a method). This name does not include the prefix.
- handler - The function that will be called for each job
#### queue.webhookURL
The handler is called for each job with the following arguments:
The URL for recieving Webhook requests and queuing them, using Iron.io. This
URL is only valid if your configured the workers with a project ID and token.
- job - The job to process
- done - Called when done processing the job
The job to process is typically an object de-serialized from the JSON
representation (see `put` method). However, it may also be a string, e.g.
FullContact pushes URL-encoded name/value pairs to the queue.
#### schedule(name, time, job)
Once the job has been processed successfully, the handler must call then `done`
method.
TBD Schedules the named job to run at the given schedule.
If the job cannot be processed successfully, the handler calls `done` with an
error: the error will be logged, and the job will be put back in the queue, from
where it will be processed again after a short delay.
If the handler does not complete in time (60 seconds), the job will be returned
to the queue and the next job will be processed.
#### configure(object)
Handlers are invoked sequentially, except for the case where a handler is
considered to have timed-out.
Configure the workers (see below).
### name
#### start()
This property returns the queue name.
You must call this method to start the workers. Jobs can be queued, but will
not be processed until the workers are started.
This extra space allows you to load the same code in every environment, and
queue jobs on any server, but only process jobs on dedicated worker servers.
### webhookURL
This property returns the URL of a Webhook end-point associated with this queue.
Messages posted to this URL will show up as jobs in the queue.
#### stop()
You can call this method to stop the workers.
### onDrain(callback)
Called when there are no more messages to process. This is only availabe in
test environment.
#### once(callback)
Use this when testing. It will run all scheduled jobs exactly once (regardless
of schedule), process all queued jobs, and finally call the callback.
### clear(callback)
This method exists since you cannot reliably pair `start` and `stop`.
Remove all jobs from the queue. This is only availabe in test environment.
#### reset(callback)
## Iron.io
Use this when testing (setup and/or teardown). It will delete all queued jobs,
then call the callback.
To use [Iron.io](http://hud.iron.io/), the queue configuration must specify the
following properties:
* `host` - The Iron.io host name
* `projectID` - The project identifier
* `token` - The authentication token
These can be specified in the configuration file, or by setting the environment
variables `IRON_PROJECT_ID` and `IRONIO_TOKEN`.
## Configurations
Please use the testing project ID when testing the Iron.io integration.
For development and testing you can typically get by with the default
configuration. For production, you may want to set the server in use, as simple
as passing a configuration object to `workers.configure`:
```
var workers = require('ironium');
## Testing/Development
if (process.env.NODE_ENV == 'production')
workers.configure({
queues: {
hostname: 'my.beanstalkd.server'
}
});
```
For regular testing and development, we use Beanstalkd. The queue configuration
need only specify:
Or load it form a JSON configuration file:
* `host` - The hostname (defaults to "localhost")
* `port` - The port number (defaults to 11300)
```
var workers = require('ironium');
var config = require('./workers.json');
When testing, the `queues` function has two properties you can call on it that
affect all queues.
if (process.env.NODE_ENV == 'production')
workers.configure(config);
```
### onDrainAll(callback)
The configuration options are:
Called when there are no more messages to process on any queue.
* `queues.hostname` - Hostname of the queue server (defaults to `localhost`)
* `queues.port` - Port number for the queue server (defaults to 11300)
* `queues.prefix` - Prefix all queue names (when `NODE_ENV == test`,
defaults to `test-`)
* `queues.token` - When using Iron.io, the API token (get it from the
project's credentials page)
* `queues.projectID` - When using Iron.io, the API project ID (get it from the
project's credentials page)
### clearAll(callback)
If you're running in development or test environment with a local Beanstalkd
server, you can use the default configuration, which points to `localhost` port
`11300` and uses the prefix `test-` in test envrionment.
Remove all jobs from all queues.
If you're running in production against a Beanstalkd, you will likely need to
set the hostname and port number.
If you're running in production against an [Iron.io](https://hud.iron.io/)
server, you will need to set the hostname to `"mq-aws-us-east-1.iron.io"`, and
set the `token` and `projectID` based on the Iron.io project's credentials.
## Development
Ironium is written in ECMAScript 6, because better syntax. Specifically you'll
notice that `let` and `const` replaced all usage of `var`, class definitions are
easier to read in the new syntax, and fat arrows (`=>`) replace `that = this`.
However, the code doesn't use any ES6 library improvements (`Map`, `startsWith`,
etc), since these can't be added without polluting the global namespace.
The source files live in the `src` directory, and compiled into the `lib`
directory with Grunt. Specifically:
```
grunt build # Compile source files from src/ into lib/ directory
grunt watch # Continously compile source files on every change
grunt clean # Clean compiled files in lib/ directory
grunt # Shortcut for grunt build watch
```
The test suite is non-existent at the moment, but if it were to exist, you would
run it with `npm test` or `mocha`.

@@ -16,4 +16,4 @@ const assert = require('assert');

// Returns the named queue. Returned objects has the methods `put` and
// `process`.
// Returns the named queue. Returned objects has the methods `push` and
// `each`.
queue(name) {

@@ -26,4 +26,3 @@ return this.queues.getQueue(name);

if (!queues) {
let config = this._config || {};
queues = new Queues(this, config);
queues = new Queues(this);
this._queues = queues;

@@ -30,0 +29,0 @@ }

@@ -10,6 +10,6 @@ const _ = require('lodash');

// How long to wait when reserving a job (in seconds).
// How long to wait when reserving a job.
const RESERVE_TIMEOUT = ms('5m');
// How long before we consider a request failed due to timeout (in seconds).
// How long before we consider a request failed due to timeout.
// Should be longer than RESERVE_TIMEOUT.

@@ -19,3 +19,3 @@ const TIMEOUT_REQUEST = RESERVE_TIMEOUT + ms('1s');

// Timeout for processing job before we consider it failed and release it back
// to the queue (in seconds).
// to the queue.
const PROCESSING_TIMEOUT = ms('2m');

@@ -33,31 +33,63 @@

class Configuration {
constructor(workers) {
this._workers = workers;
}
// Abstracts an Iron.io project / Beanstalkd configuration.
//
// Configured with:
// host - Host name (defaults to localhost)
// port - Port number (defaults to 11300)
// projectID - Iron.io project ID
// token - Iron.io authentication token
// prefix - Queue name prefix (e.g. test-)
module.exports = class Queues {
get config() {
let config = this._config;
if (!config) {
let source = this._workers._config && this._workers._config.queues;
if (!source)
source = (process.env.NODE_ENV == 'test') ? { prefix: 'test-' } : {};
this._config = config = {
hostname: source.hostname || 'localhost',
port: source.port || 11300,
prefix: source.prefix
};
if (source.token) {
config.authenticate = 'oauth ' + source.token + ' ' + source.projectID;
config.webhookURL = 'https://' + source.hostname + '/1/projects/' + source.projectID +
'/queues/{queueName}/messages/webhook?oauth=' + source.token;
} else {
config.authenticate = null;
config.webhookURL = 'https://<host>/1/projects/<project>/queues/{queueName}/messages/webhook?oauth=<token>';
}
}
return config;
}
constructor(logger = console, { host = 'localhost', port = 11300, projectID, token, prefix }) {
this._logger = logger;
// When using Iron.io, we send an authentication string based on the token.
// Not applicable with standalone Beanstalkd.
if (token)
this._authenticate = 'oauth ' + token + ' ' + projectID;
// These are used to connect the client.
this._host = host;
this._port = port;
// Prefix used in certain environment, e.g. "test-"
this._prefix = prefix || '';
// Base URL for all Webhooks, queues set their name via interoplation.
this._webhookURL = 'https://' + host + '/1/projects/' + projectID +
'/queues/{queueName}/messages/webhook?oauth=' + token;
// Map (un-prefixed) queue name to queue.
this._queues = Object.create({});
get hostname() {
return this.config.hostname;
}
get port() {
return this.config.port;
}
prefixedName(queueName) {
return (this.config.prefix || '') + queueName;
}
// When using Iron.io, we send an authentication string based on the token.
// Not applicable with standalone Beanstalkd.
get authenticate() {
return this.config.authenticate;
}
webhookURL(queueName) {
return this.config.webhookURL.replace('{queueName}', queueName);
}
}
// Abstracts a queue server, Beanstalkd or compatible, specifically Iron.io.
module.exports = class Server {
constructor(workers) {
this.notify = workers;
this.config = new Configuration(workers);
this._queues = Object.create({});
}
// Returns a new queue.

@@ -68,11 +100,3 @@ getQueue(name) {

if (!queue) {
queue = new Queue({
name,
prefixedName: this._prefix + name,
host: this._host,
port: this._port,
authenticate: this._authenticate,
webhookURL: this._webhookURL.replace('{queueName}', name),
logger: this._logger
});
queue = new Queue(name, this);
this._queues[name] = queue;

@@ -84,3 +108,3 @@ }

start() {
this._logger.debug("Start all queues");
this.notify.debug("Start all queues");
this._foreachQueue((queue)=> queue.start());

@@ -90,3 +114,3 @@ }

stop(callback) {
this._logger.debug("Stop all queues");
this.notify.debug("Stop all queues");
this._foreachQueue((queue)=> queue.stop());

@@ -97,3 +121,3 @@ }

reset(callback) {
this._logger.debug("Clear all queues");
this.notify.debug("Clear all queues");
this._foreachQueue((queue, done)=> queue.reset(done), callback);

@@ -104,3 +128,3 @@ }

once(callback) {
this._logger.debug("Process all queued jobs");
this.notify.debug("Process all queued jobs");
let queues = _.values(this._queues);

@@ -159,10 +183,8 @@ function iterate() {

// callback - Call with error or nothing
constructor({ name, host, port, authenticate, logger }, setup) {
constructor(name, server, setup) {
this.name = name;
this.host = host;
this.port = port;
this.authenticate = authenticate;
this.config = server.config;
this.notify = server.notify;
this.setup = setup;
this.pending = [];
this._logger = logger;
}

@@ -199,3 +221,3 @@

// Get Fivebeans to execute this command.
this._withClient(function(client) {
this.withClient(function(client) {
client[command].call(client, ...args, oncomplete);

@@ -210,8 +232,8 @@ });

if (message != "Error: Connection closed")
this._logger.error("Client error in queue %s: %s", this.name, message);
this.notify.error("Client error in queue %s: %s", this.name, message);
// If we're in the process of setting up a connection, reject the promise.
// Discard the promise, next/recursive call to use(fn) will re-connect.
if (this._deferred) {
this._deferred.reject(error);
this._deferred = null;
if (this.deferred) {
this.deferred.reject(error);
this.deferred = null;
}

@@ -231,6 +253,6 @@ // Fail all pending requests.

// function.
_withClient(fn) {
if (!this._deferred)
this._connect();
this._deferred.promise.then(fn, (error)=> {
withClient(fn) {
if (!this.deferred)
this.connect();
this.deferred.promise.then(fn, (error)=> {
this.fail(error);

@@ -243,5 +265,5 @@ this.use(fn);

// to a fully setup Fivebeans client.
_connect() {
connect() {
// This is the Fivebeans client is essentially a session.
let client = new fivebeans.client(this.host, this.port);
let client = new fivebeans.client(this.config.hostname, this.config.port);
let deferred = Q.defer();

@@ -252,4 +274,4 @@

let authenticateAndSetup = ()=> {
if (this.authenticate) {
client.put(0, 0, 0, this.authenticate, function(error) {
if (this.config.authenticate) {
client.put(0, 0, 0, this.config.authenticate, function(error) {
if (error) {

@@ -293,3 +315,3 @@ deferred.reject(error);

// Make sure use/fail methods have access to the promise.
this._deferred = deferred;
this.deferred = deferred;
}

@@ -305,26 +327,43 @@

// webhookURL - URL for receiving Webhook posts (Iron.io only)
// put - Method for putting message in the queue
// each - Method for processing messages from the queue
// push - Method for pushing job to the queue
// each - Method for processing jobs from the queue
class Queue {
constructor({ name, prefixedName, host, port, authenticate, webhookURL, logger }) {
this.name = name;
this.webhookURL = webhookURL;
this._logger = logger;
constructor(name, server) {
this.name = name;
this.notify = server.notify;
this.webhookURL = server.config.webhookURL(name);
this._server = server;
this._prefixedName = server.config.prefixedName(name);
this._processing = false;
this._handler = null;
}
// Session for storing messages and other manipulations.
// Setup: tell Beanstalkd which tube to use (persistent to session).
this._putSession = new Session({ name, host, port, authenticate, logger }, function(client, callback) {
client.use(prefixedName, callback);
});
// Session for storing messages and other manipulations.
get _put() {
let session = this._putSession;
if (!session) {
// Setup: tell Beanstalkd which tube to use (persistent to session).
session = new Session(this.name, this._server, (client, callback)=> {
client.use(this._prefixedName, callback);
});
this._putSession = session;
}
return session;
}
// Session for processing messages, continously blocks so don't use elsewhere.
// Setup: tell Beanstalkd which tube we're watching (and ignore default tube).
this._getSession = new Session({ name, host, port, authenticate, logger }, function(client, callback) {
client.ignore('default', function() {
client.watch(prefixedName, callback);
// Session for processing messages, continously blocks so don't use elsewhere.
get _reserve() {
let session = this._reserveSession;
if (!session) {
// Setup: tell Beanstalkd which tube we're watching (and ignore default tube).
session = new Session(this.name, this._server, (client, callback)=> {
client.ignore('default', ()=> {
client.watch(this._prefixedName, callback);
});
});
});
this._reserveSession = session;
}
return session;
}

@@ -334,3 +373,3 @@

// Push job to queue.
put(job, options, callback) {
push(job, options, callback) {
assert(job, "Missing job to queue");

@@ -345,3 +384,3 @@ if (typeof(options) == 'function') {

this._putSession.request('put', 0, delay, PROCESSING_TIMEOUT / 1000, payload, (error, jobID)=> {
this._put.request('put', 0, delay, PROCESSING_TIMEOUT / 1000, payload, (error, jobID)=> {
// Don't pass jobID to callback, easy to use in test before hook, like

@@ -353,3 +392,3 @@ // this:

else if (error)
this._logger.error("Error talking to Beanstalkd, queue %s: %s", this.name, error);
this.notify.error("Error talking to Beanstalkd, queue %s: %s", this.name, error);
});

@@ -390,4 +429,4 @@ }

this._logger.debug("Waiting for jobs on queue %s", this.name);
this._getSession.request('reserve_with_timeout', 0, (error, jobID, payload)=> {
this.notify.debug("Waiting for jobs on queue %s", this.name);
this._reserve.request('reserve_with_timeout', 0, (error, jobID, payload)=> {
if (error == 'DEADLINE_SOON' || error == 'TIMED_OUT' || (error && error.message == 'TIMED_OUT'))

@@ -414,7 +453,7 @@ callback(null, false);

this._getSession.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000, (error, jobID, payload)=> {
this._reserve.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000, (error, jobID, payload)=> {
if (error == 'DEADLINE_SOON' || error == 'TIMED_OUT' || (error && error.message == 'TIMED_OUT'))
setImmediate(pickNextJob);
else if (error) {
this._logger.error(error);
this.notify.error(error);
setTimeout(pickNextJob, ERROR_BACKOFF);

@@ -424,3 +463,3 @@ } else {

if (error)
this._logger.error(error);
this.notify.error(error);
setImmediate(pickNextJob);

@@ -432,3 +471,3 @@ });

this._logger.debug("Waiting for jobs on queue %s", this.name);
this.notify.debug("Waiting for jobs on queue %s", this.name);
pickNextJob();

@@ -466,7 +505,7 @@ }

// Promise resolved on successful completion; we destroy the job.
this._getSession.request('destroy', jobID, (error)=> {
this._reserve.request('destroy', jobID, (error)=> {
if (error)
this._logger.error(error.stack);
this.notify.error(error.stack);
});
this._logger.info("Completed job %s from queue %s", jobID, this.name);
this.notify.info("Completed job %s from queue %s", jobID, this.name);
// Move on to process next job.

@@ -481,7 +520,7 @@ clearTimeout(errorOnTimeout);

let delay = (process.env.NODE_ENV == 'test' ? 0 : RELEASE_DELAY);
this._getSession.request('release', jobID, 0, delay / 1000, (error)=> {
this._reserve.request('release', jobID, 0, delay / 1000, (error)=> {
if (error)
this._logger.error(error.stack);
this.notify.error(error.stack);
});
this._logger.error("Error processing job %s from queue %s: %s", jobID, this.name, error.stack);
this.notify.error("Error processing job %s from queue %s: %s", jobID, this.name, error.stack);
// Move on to process next job.

@@ -496,4 +535,4 @@ clearTimeout(errorOnTimeout);

domain.run(()=> {
this._logger.info("Picked up job %s from queue %s", jobID, this.name);
this._logger.debug("Processing %s: %s", jobID, payload.toString());
this.notify.info("Picked up job %s from queue %s", jobID, this.name);
this.notify.debug("Processing %s: %s", jobID, payload.toString());
// Typically we queue JSON objects, but the payload may be just a

@@ -515,3 +554,3 @@ // string, e.g. some services send URL encoded name/value pairs, or MIME

reset(callback) {
this._putSession._withClient(function(client) {
this._put.withClient(function(client) {
function deleteNextJob() {

@@ -518,0 +557,0 @@ client.reserve_with_timeout(0, function(error, jobID, payload) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc