New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

rethinkdb-job-queue

Package Overview
Dependencies
Maintainers
1
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rethinkdb-job-queue - npm Package Compare versions

Comparing version 0.2.1 to 0.3.0

dist/queue-state.js

5

CHANGELOG.md
# `rethinkdb-job-queue` Change log
## v0.3.0 / 2016-09-28
* Added global queue pause.
* Fixed tests for global queue pause.
## v0.2.1 / 2016-09-27

@@ -4,0 +9,0 @@

10

dist/enums.js

@@ -13,2 +13,7 @@ 'use strict';

state: {
docId: '86f6ff5b-0c4e-46ad-9a5f-e90eb19c9b00',
global: 'global',
local: 'local'
},
priority: {

@@ -96,4 +101,7 @@ lowest: 60,

concurrencyInvalid: 'Invalid concurrency value',
cancelCallbackInvalid: 'The onCancel callback is not a function'
cancelCallbackInvalid: 'The onCancel callback is not a function',
globalStateError: 'The global state document change feed is invalid',
noErrorStack: 'The error has no stack detail',
noErrorMessage: 'The error has no messag'
}
};

5

dist/job-failed.js

@@ -27,6 +27,7 @@ 'use strict';

var errMessage = err && err.message ? err.message : err;
var log = job.createLog(errMessage, logType);
var log = job.createLog(enums.message.failed, logType);
log.duration = duration;
log.retryCount = job.retryCount;
log.errorMessage = err && err.message ? err.message : enums.message.noErrorMessage;
log.errorStack = err && err.stack ? err.stack : enums.message.noErrorStack;

@@ -33,0 +34,0 @@ return Promise.resolve().then(function () {

@@ -10,3 +10,2 @@ 'use strict';

// skipStatusCheck is for ease of adding jobs during tests
module.exports = function queueAddJob(q, job, skipStatusCheck) {

@@ -13,0 +12,0 @@ logger('addJob', job);

@@ -8,6 +8,8 @@ 'use strict';

var queueProcess = require('./queue-process');
var queueInterruption = require('./queue-interruption');
// Following is the list of supported change feed events;
// paused - Global event
// resumed - Global event
// added
// log
// active

@@ -20,2 +22,3 @@ // progress

// removed
// log

@@ -25,11 +28,12 @@ module.exports = function queueChange(q, err) {

logger('queueChange');
logger('queueChange', change);
var newVal = change.new_val;
var oldVal = change.old_val;
var queueId = false;
if (is.job(newVal)) {
if (newVal && newVal.queueId) {
queueId = newVal.queueId;
}
if (!is.job(newVal) && is.job(oldVal)) {
if (!newVal && oldVal && oldVal.queueId) {
queueId = oldVal.queueId;

@@ -43,3 +47,3 @@ }

// Prevent any change processing if change is caused by this queue
if (queueId === q.id && !q.testing) {
if (queueId === q.id) {
logger('Change feed by self, skipping events');

@@ -53,7 +57,27 @@ return;

if (q.testing) {
logger('------------- QUEUE CHANGE -------------');
logger(util.inspect(change, { colors: true }));
logger('----------------------------------------');
logger('------------- QUEUE CHANGE -------------');
logger(util.inspect(change, { colors: true }));
logger(queueId);
logger('----------------------------------------');
// Queue global state change
if (!newVal && oldVal && oldVal.id && oldVal.id === enums.state.docId) {
// Ignoring state document deletion.
return enums.status.active;
}
if (newVal && newVal.id && newVal.id === enums.state.docId) {
logger('State document changed');
if (newVal && newVal.state) {
if (newVal.state === enums.status.paused) {
logger('Global queue state paused');
return queueInterruption.pause(q, enums.state.global);
}
if (newVal.state === enums.status.active) {
logger('Global queue state active');
return queueInterruption.resume(q, enums.state.global);
}
}
q.emit(enums.status.error, new Error(enums.message.globalStateError));
return enums.status.error;
}

@@ -77,2 +101,9 @@ // Job added

// Job progress
if (is.job(newVal) && is.job(oldVal) && newVal.progress !== oldVal.progress) {
logger('Event: progress [' + newVal.progress + ']');
q.emit(enums.status.progress, newVal.id, newVal.progress);
return enums.status.progress;
}
// Job completed

@@ -85,9 +116,2 @@ if (is.completed(newVal) && !is.completed(oldVal)) {

// Job removed
if (!is.job(newVal) && is.job(oldVal)) {
logger('Event: removed [' + oldVal.id + ']');
q.emit(enums.status.removed, oldVal.id);
return enums.status.removed;
}
// Job cancelled

@@ -114,7 +138,7 @@ if (is.cancelled(newVal) && !is.cancelled(oldVal)) {

// Job progress
if (is.job(newVal) && is.job(oldVal) && newVal.progress !== oldVal.progress) {
logger('Event: progress [' + newVal.progress + ']');
q.emit(enums.status.progress, newVal.id, newVal.progress);
return enums.status.progress;
// Job removed
if (!is.job(newVal) && is.job(oldVal)) {
logger('Event: removed [' + oldVal.id + ']');
q.emit(enums.status.removed, oldVal.id);
return enums.status.removed;
}

@@ -121,0 +145,0 @@

@@ -6,23 +6,34 @@ 'use strict';

var enums = require('./enums');
var is = require('./is');
var queueProcess = require('./queue-process');
var queueState = require('./queue-state');
module.exports.pause = function interruptionPause(q) {
logger('pause');
return new Promise(function (resolve, reject) {
q._paused = true;
logger('Event: pausing [' + q.id + ']');
q.emit(enums.status.pausing, q.id);
if (q.running < 1) {
return resolve();
module.exports.pause = function interruptionPause(q, source) {
logger('pause', source);
q._paused = true;
var makeGlobal = is.true(source);
var eventGlobal = makeGlobal || source === enums.state.global;
return q.ready().then(function () {
if (makeGlobal) {
return queueState(q, enums.status.paused);
}
var intId = setInterval(function pausing() {
logger('Pausing, waiting on running jobs: [' + q.running + ']');
return;
}).then(function () {
return new Promise(function (resolve, reject) {
logger('Event: pausing [' + q.id + ']');
q.emit(enums.status.pausing, eventGlobal, q.id);
if (q.running < 1) {
clearInterval(intId);
resolve();
return resolve();
}
}, 400);
var intId = setInterval(function pausing() {
logger('Pausing, waiting on running jobs: [' + q.running + ']');
if (q.running < 1) {
clearInterval(intId);
resolve();
}
}, 400);
});
}).then(function () {
logger('Event: paused [' + q.id + ']');
q.emit(enums.status.paused, q.id);
logger('Event: paused [global:' + eventGlobal + '] [' + q.id + ']');
q.emit(enums.status.paused, eventGlobal, q.id);
return true;

@@ -32,11 +43,18 @@ });

module.exports.resume = function interruptionResume(q) {
logger('resume');
return Promise.resolve().then(function () {
q._paused = false;
module.exports.resume = function interruptionResume(q, source) {
logger('resume', source);
q._paused = false;
var makeGlobal = is.true(source);
var eventGlobal = makeGlobal || source === enums.state.global;
return q.ready().then(function () {
if (makeGlobal) {
return queueState(q, enums.status.active);
}
return;
}).then(function () {
queueProcess.restart(q);
logger('Event: resumed [' + q.id + ']');
q.emit(enums.status.resumed, q.id);
logger('Event: resumed [global:' + eventGlobal + '] [' + q.id + ']');
q.emit(enums.status.resumed, eventGlobal, q.id);
return true;
});
};

@@ -203,3 +203,3 @@ 'use strict';

key: 'pause',
value: function pause() {
value: function pause(global) {
var _this10 = this;

@@ -209,3 +209,3 @@

return this.ready().then(function () {
return queueInterruption.pause(_this10);
return queueInterruption.pause(_this10, global);
}).catch(function (err) {

@@ -219,3 +219,3 @@ logger('pause Error:', err);

key: 'resume',
value: function resume() {
value: function resume(global) {
var _this11 = this;

@@ -225,3 +225,3 @@

return this.ready().then(function () {
return queueInterruption.resume(_this11);
return queueInterruption.resume(_this11, global);
}).catch(function (err) {

@@ -228,0 +228,0 @@ logger('resume Error:', err);

{
"name": "rethinkdb-job-queue",
"version": "0.2.1",
"version": "0.3.0",
"description": "A persistent job or task queue backed by RethinkDB.",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -37,3 +37,3 @@ # Introduction

* Rich job [history log][job-log-url]
* Over 1200 [integration tests][testing-url]
* Over 1300 [integration tests][testing-url]

@@ -65,3 +65,3 @@

* This `rethinkdb-job-queue` module is fully functional.
* There are over 1200 integration tests.
* There are over 1300 integration tests.
* This project is complete and needs to be taken out for a spin.

@@ -68,0 +68,0 @@ * In a few months I will bump the version up to 1.0.0 to support [SemVer](http://semver.org/).

@@ -7,3 +7,3 @@ const test = require('tape')

test('enums', (t) => {
t.plan(12)
t.plan(13)

@@ -17,8 +17,9 @@ try {

t.equal(enums.priorityFromValue(10), 'highest', 'Priority from value 10 returns highest')
t.equal(Object.keys(enums.priority).length, 6, 'Enums priority has has correct number of keys')
t.equal(Object.keys(enums.state).length, 3, 'Enums state has the correct number of keys')
t.equal(Object.keys(enums.priority).length, 6, 'Enums priority has correct number of keys')
t.equal(Object.keys(enums.status).length, 25, 'Enums status has correct number of keys')
t.equal(Object.keys(enums.options).length, 11, 'Enums options has correct number of keys')
t.equal(Object.keys(enums.index).length, 3, 'Enums index has has correct number of keys')
t.equal(Object.keys(enums.log).length, 3, 'Enums log has has correct number of keys')
t.equal(Object.keys(enums.message).length, 13, 'Enums message has has correct number of keys')
t.equal(Object.keys(enums.index).length, 3, 'Enums index has correct number of keys')
t.equal(Object.keys(enums.log).length, 3, 'Enums log has correct number of keys')
t.equal(Object.keys(enums.message).length, 16, 'Enums message has correct number of keys')
} catch (err) {

@@ -25,0 +26,0 @@ tError(err, module, t)

@@ -14,3 +14,3 @@ const test = require('tape')

test('job-failed', (t) => {
t.plan(76)
t.plan(84)

@@ -83,3 +83,5 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

t.ok(retry1[0].log[1].message, 'Log message exists')
t.equal(retry1[0].log[1].message, err.message, 'Log message is valid')
t.equal(retry1[0].log[1].message, enums.message.failed, 'Log message is valid')
t.equal(retry1[0].log[1].errorMessage, err.message, 'Log error message is valid')
t.equal(retry1[0].log[1].errorStack, err.stack, 'Log stack is valid')
t.ok(retry1[0].log[1].duration >= 0, 'Log duration is >= 0')

@@ -106,3 +108,5 @@

t.ok(retry2[0].log[2].message, 'Log message exists')
t.equal(retry2[0].log[2].message, err.message, 'Log message is valid')
t.equal(retry2[0].log[2].message, enums.message.failed, 'Log message is valid')
t.equal(retry2[0].log[2].errorMessage, err.message, 'Log error message is valid')
t.equal(retry2[0].log[2].errorStack, err.stack, 'Log stack is valid')
t.ok(retry2[0].log[2].duration >= 0, 'Log duration is >= 0')

@@ -130,3 +134,5 @@

t.ok(retry3[0].log[3].message, 'Log message exists')
t.equal(retry3[0].log[3].message, err.message, 'Log message is valid')
t.equal(retry3[0].log[3].message, enums.message.failed, 'Log message is valid')
t.equal(retry3[0].log[3].errorMessage, err.message, 'Log error message is valid')
t.equal(retry3[0].log[3].errorStack, err.stack, 'Log stack is valid')
t.ok(retry3[0].log[3].duration >= 0, 'Log duration is >= 0')

@@ -154,3 +160,5 @@

t.ok(failed[0].log[4].message, 'Log message exists')
t.equal(failed[0].log[4].message, err.message, 'Log message is valid')
t.equal(failed[0].log[4].message, enums.message.failed, 'Log message is valid')
t.equal(failed[0].log[4].errorMessage, err.message, 'Log error message is valid')
t.equal(failed[0].log[4].errorStack, err.stack, 'Log stack is valid')
t.ok(failed[0].log[4].duration >= 0, 'Log duration is >= 0')

@@ -157,0 +165,0 @@

@@ -12,18 +12,46 @@ const test = require('tape')

test('queue-change', (t) => {
t.plan(48)
t.plan(77)
const q = new Queue(tOpts.cxn(), tOpts.default())
// ---------- Event Handler Setup ----------
let testEvents = false
let readyEventCount = 0
let readyEventTotal = 0
function readyEventHandler (queueId) {
readyEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: ready [${readyEventCount} of ${readyEventTotal}] [${queueId}]`)
}
}
let addedEventCount = 0
let addedEventTotal = 3
function addedEventHandler (jobId) {
addedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: added [${jobId}]`)
t.ok(is.uuid(jobId), `Event: added [${addedEventCount} of ${addedEventTotal}] [${jobId}]`)
}
}
let activeEventCount = 0
let activeEventTotal = 3
function activeEventHandler (jobId) {
activeEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: active [${jobId}]`)
t.ok(is.uuid(jobId), `Event: active [${activeEventCount} of ${activeEventTotal}] [${jobId}]`)
}
}
let processingEventCount = 0
let processingEventTotal = 3
function processingEventHandler (jobId) {
processingEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: processing [${processingEventCount} of ${processingEventTotal}] [${jobId}]`)
}
}
let progressEventCount = 0
let progressEventTotal = 3
function progressEventHandler (jobId, percent) {
progressEventCount++
if (testEvents) {
t.pass(`Event: progress [${progressEventCount} of ${progressEventTotal}]`)
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`)

@@ -33,39 +61,159 @@ t.ok(is.number(percent), `Event: progress [${percent}%]`)

}
let completedEventCount = 0
let completedEventTotal = 1
function completedEventHandler (jobId) {
completedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: completed [${jobId}]`)
t.ok(is.uuid(jobId), `Event: completed [${completedEventCount} of ${completedEventTotal}] [${jobId}]`)
}
}
let cancelledEventCount = 0
let cancelledEventTotal = 1
function cancelledEventHandler (jobId) {
cancelledEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: cancelled [${jobId}]`)
t.ok(is.uuid(jobId), `Event: cancelled [${cancelledEventCount} of ${cancelledEventTotal}] [${jobId}]`)
}
}
let failedEventCount = 0
let failedEventTotal = 1
function failedEventHandler (jobId) {
failedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: failed [${jobId}]`)
t.ok(is.uuid(jobId), `Event: failed [${failedEventCount} of ${failedEventTotal}] [${jobId}]`)
}
}
let terminatedEventCount = 0
let terminatedEventTotal = 1
function terminatedEventHandler (jobId) {
terminatedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: terminated [${jobId}]`)
t.ok(is.uuid(jobId), `Event: terminated [${terminatedEventCount} of ${terminatedEventTotal}] [${jobId}]`)
}
}
let logEventCount = 0
let logEventTotal = 1
function logEventHandler (jobId) {
logEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: log [${logEventCount} of ${logEventTotal}] [${jobId}]`)
}
}
let updatedEventCount = 0
let updatedEventTotal = 0
function updatedEventHandler (jobId) {
updatedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: updated [${updatedEventCount} of ${updatedEventTotal}] [${jobId}]`)
}
}
let pausingEventCount = 0
let pausingEventTotal = 2
function pausingEventHandler (global, queueId) {
pausingEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: pausing [${pausingEventCount} of ${pausingEventTotal}]`)
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`)
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`)
}
}
let pausedEventCount = 0
let pausedEventTotal = 2
function pausedEventHandler (global, queueId) {
pausedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: paused [${pausedEventCount} of ${pausedEventTotal}]`)
t.ok(is.boolean(global), `Event: paused [global: ${global}]`)
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`)
}
}
let resumedEventCount = 0
let resumedEventTotal = 2
function resumedEventHandler (global, queueId) {
resumedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: resumed [${resumedEventCount} of ${resumedEventTotal}]`)
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`)
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`)
}
}
let removedEventCount = 0
let removedEventTotal = 1
function removedEventHandler (jobId) {
removedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: removed [${jobId}]`)
t.ok(is.uuid(jobId), `Event: removed [${removedEventCount} of ${removedEventTotal}] [${jobId}]`)
}
}
function logEventHandler (jobId) {
let idleEventCount = 0
let idleEventTotal = 1
function idleEventHandler (queueId) {
idleEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: log [${jobId}]`)
t.ok(is.string(queueId), `Event: idle [${idleEventCount} of ${idleEventTotal}] [${queueId}]`)
}
}
let testEvents = false
let reviewedEventCount = 0
let reviewedEventTotal = 0
function reviewedEventHandler (queueId) {
reviewedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: reviewed [${reviewedEventCount} of ${reviewedEventTotal}] [${queueId}]`)
}
}
let resetEventCount = 0
let resetEventTotal = 1
function resetEventHandler (total) {
resetEventCount++
if (testEvents) {
t.ok(is.integer(total), `Event: reset [${resetEventCount} of ${resetEventTotal}] [${total}]`)
}
}
let errorEventCount = 0
let errorEventTotal = 0
function errorEventHandler (err) {
errorEventCount++
if (testEvents) {
t.ok(is.string(err.message), `Event: error [${errorEventCount} of ${errorEventTotal}] [${err.message}]`)
}
}
let detachedEventCount = 0
let detachedEventTotal = 0
function detachedEventHandler (queueId) {
detachedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: detached [${detachedEventCount} of ${detachedEventTotal}] [${queueId}]`)
}
}
let stoppingEventCount = 0
let stoppingEventTotal = 0
function stoppingEventHandler (queueId) {
stoppingEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: stopping [${stoppingEventCount} of ${stoppingEventTotal}] [${queueId}]`)
}
}
let stoppedEventCount = 0
let stoppedEventTotal = 0
function stoppedEventHandler (queueId) {
stoppedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: stopped [${stoppedEventCount} of ${stoppedEventTotal}] [${queueId}]`)
}
}
let droppedEventCount = 0
let droppedEventTotal = 0
function droppedEventHandler (queueId) {
droppedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: dropped [${droppedEventCount} of ${droppedEventTotal}] [${queueId}]`)
}
}
function addEventHandlers () {
testEvents = true
q.testing = true
q.on(enums.status.ready, readyEventHandler)
q.on(enums.status.added, addedEventHandler)
q.on(enums.status.log, logEventHandler)
q.on(enums.status.active, activeEventHandler)
q.on(enums.status.processing, processingEventHandler)
q.on(enums.status.progress, progressEventHandler)

@@ -76,10 +224,23 @@ q.on(enums.status.completed, completedEventHandler)

q.on(enums.status.terminated, terminatedEventHandler)
q.on(enums.status.paused, pausedEventHandler)
q.on(enums.status.pausing, pausingEventHandler)
q.on(enums.status.resumed, resumedEventHandler)
q.on(enums.status.removed, removedEventHandler)
q.on(enums.status.log, logEventHandler)
q.on(enums.status.updated, updatedEventHandler)
q.on(enums.status.idle, idleEventHandler)
q.on(enums.status.reviewed, reviewedEventHandler)
q.on(enums.status.reset, resetEventHandler)
q.on(enums.status.error, errorEventHandler)
q.on(enums.status.detached, detachedEventHandler)
q.on(enums.status.stopping, stoppingEventHandler)
q.on(enums.status.stopped, stoppedEventHandler)
q.on(enums.status.dropped, droppedEventHandler)
}
function removeEventHandlers () {
testEvents = false
q.testing = false
q.removeListener(enums.status.ready, readyEventHandler)
q.removeListener(enums.status.added, addedEventHandler)
q.removeListener(enums.status.log, logEventHandler)
q.removeListener(enums.status.active, activeEventHandler)
q.removeListener(enums.status.processing, processingEventHandler)
q.removeListener(enums.status.progress, progressEventHandler)

@@ -90,3 +251,16 @@ q.removeListener(enums.status.completed, completedEventHandler)

q.removeListener(enums.status.terminated, terminatedEventHandler)
q.removeListener(enums.status.paused, pausedEventHandler)
q.removeListener(enums.status.pausing, pausingEventHandler)
q.removeListener(enums.status.resumed, resumedEventHandler)
q.removeListener(enums.status.removed, removedEventHandler)
q.removeListener(enums.status.log, logEventHandler)
q.removeListener(enums.status.updated, updatedEventHandler)
q.removeListener(enums.status.idle, idleEventHandler)
q.removeListener(enums.status.reviewed, reviewedEventHandler)
q.removeListener(enums.status.reset, resetEventHandler)
q.removeListener(enums.status.error, errorEventHandler)
q.removeListener(enums.status.detached, detachedEventHandler)
q.removeListener(enums.status.stopping, stoppingEventHandler)
q.removeListener(enums.status.stopped, stoppedEventHandler)
q.removeListener(enums.status.dropped, droppedEventHandler)
return q.resume()

@@ -156,4 +330,30 @@ }

t.ok(resetResult >= 0, 'Queue reset')
return removeEventHandlers()
}).then(() => {
removeEventHandlers()
// ---------- Event summary Test ----------
t.comment('queue: Event Summary')
t.equal(readyEventCount, readyEventTotal, 'Ready event count valid')
t.equal(addedEventCount, addedEventTotal, 'Added event count valid')
t.equal(activeEventCount, activeEventTotal, 'Active event count valid')
t.equal(processingEventCount, processingEventTotal, 'Processing event count valid')
t.equal(progressEventCount, progressEventTotal, 'Progress event count valid')
t.equal(completedEventCount, completedEventTotal, 'Completed event count valid')
t.equal(cancelledEventCount, cancelledEventTotal, 'Cancelled event count valid')
t.equal(failedEventCount, failedEventTotal, 'Failed event count valid')
t.equal(terminatedEventCount, terminatedEventTotal, 'Terminated event count valid')
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid')
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid')
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid')
t.equal(removedEventCount, removedEventTotal, 'Removed event count valid')
t.equal(logEventCount, logEventTotal, 'Log event count valid')
t.equal(updatedEventCount, updatedEventTotal, 'Updated event count valid')
t.equal(idleEventCount, idleEventTotal, 'Idle event count valid')
t.equal(reviewedEventCount, reviewedEventTotal, 'Reviewed event count valid')
t.equal(resetEventCount, resetEventTotal, 'Reset event count valid')
t.equal(errorEventCount, errorEventTotal, 'Error event count valid')
t.equal(detachedEventCount, detachedEventTotal, 'Detached event count valid')
t.equal(stoppingEventCount, stoppingEventTotal, 'Stopping event count valid')
t.equal(stoppedEventCount, stoppedEventTotal, 'Stopped event count valid')
t.equal(droppedEventCount, droppedEventTotal, 'Dropped event count valid')
q.stop()

@@ -160,0 +360,0 @@ return resolve(t.end())

@@ -48,2 +48,3 @@ const test = require('tape')

jobFailed.data = 'Failed'
jobFailed.dateEnable = datetime.add.sec(new Date(), -100)
jobFailed.dateCreated = datetime.add.sec(new Date(), -100)

@@ -50,0 +51,0 @@ const jobActive = q.createJob({priority: 'normal'})

const test = require('tape')
const Promise = require('bluebird')
const enums = require('../src/enums')
const is = require('../src/is')
const tError = require('./test-error')

@@ -15,3 +16,3 @@ const Queue = require('../src/queue')

test('queue-interruption', (t) => {
t.plan(10)
t.plan(19)

@@ -25,15 +26,30 @@ const q = new Queue(tOpts.cxn(), tOpts.default())

let testEvents = false
function pausingEventHandler (qId) {
let pausingEventCount = 0
let pausingEventTotal = 1
function pausingEventHandler (global, queueId) {
pausingEventCount++
if (testEvents) {
t.equal(qId, q.id, `Event: pausing [${qId}]`)
t.pass(`Event: pausing [${pausingEventCount} of ${pausingEventTotal}] `)
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`)
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`)
}
}
function pausedEventHandler (qId) {
let pausedEventCount = 0
let pausedEventTotal = 1
function pausedEventHandler (global, queueId) {
pausedEventCount++
if (testEvents) {
t.equal(qId, q.id, `Event: paused [${qId}]`)
t.pass(`Event: paused [${pausedEventCount} of ${pausedEventTotal}] `)
t.ok(is.boolean(global), `Event: paused [global: ${global}]`)
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`)
}
}
function resumedEventHandler (qId) {
let resumedEventCount = 0
let resumedEventTotal = 1
function resumedEventHandler (global, queueId) {
resumedEventCount++
if (testEvents) {
t.equal(qId, q.id, `Event: resumed [${qId}]`)
t.pass(`Event: resumed [${resumedEventCount} of ${resumedEventTotal}] `)
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`)
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`)
}

@@ -78,2 +94,9 @@ }

removeEventHandlers()
// ---------- Event summary Test ----------
t.comment('queue-interruption: Event Summary')
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid')
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid')
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid')
q.stop()

@@ -80,0 +103,0 @@ return resolve(t.end())

@@ -12,3 +12,3 @@ const test = require('tape')

test('queue', (t) => {
t.plan(110)
t.plan(154)

@@ -33,66 +33,192 @@ let q = new Queue(tOpts.cxn(), tOpts.queueNameOnly())

let testEvents = false
function readyEventHandler (qId) {
let readyEventCount = 0
let readyEventTotal = 0
function readyEventHandler (queueId) {
readyEventCount++
if (testEvents) {
t.ok(is.string(qId), `Event: ready [${qId}]`)
t.ok(is.string(queueId), `Event: ready [${readyEventCount} of ${readyEventTotal}] [${queueId}]`)
}
}
let addedEventCount = 0
let addedEventTotal = 4
function addedEventHandler (jobId) {
addedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: added [${jobId}]`)
t.ok(is.uuid(jobId), `Event: added [${addedEventCount} of ${addedEventTotal}] [${jobId}]`)
}
}
let activeEventCount = 0
let activeEventTotal = 3
function activeEventHandler (jobId) {
activeEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: active [${activeEventCount} of ${activeEventTotal}] [${jobId}]`)
}
}
let processingEventCount = 0
let processingEventTotal = 3
function processingEventHandler (jobId) {
processingEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: processing [${jobId}]`)
t.ok(is.uuid(jobId), `Event: processing [${processingEventCount} of ${processingEventTotal}] [${jobId}]`)
}
}
function pausedEventHandler (qId) {
let progressEventCount = 0
let progressEventTotal = 0
function progressEventHandler (jobId, percent) {
progressEventCount++
if (testEvents) {
t.ok(is.string(qId), `Event: paused [${qId}]`)
t.pass(`Event: progress [${progressEventCount} of ${progressEventTotal}]`)
t.ok(is.uuid(jobId), `Event: progress [${jobId}]`)
t.ok(is.number(percent), `Event: progress [${percent}%]`)
}
}
function resumedEventHandler (qId) {
let completedEventCount = 0
let completedEventTotal = 3
function completedEventHandler (jobId) {
completedEventCount++
if (testEvents) {
t.ok(is.string(qId), `Event: resumed [${qId}]`)
t.ok(is.uuid(jobId), `Event: completed [${completedEventCount} of ${completedEventTotal}] [${jobId}]`)
}
}
let cancelledEventCount = 0
let cancelledEventTotal = 1
function cancelledEventHandler (jobId) {
cancelledEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: cancelled [${cancelledEventCount} of ${cancelledEventTotal}] [${jobId}]`)
}
}
let failedEventCount = 0
let failedEventTotal = 0
function failedEventHandler (jobId) {
failedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: failed [${failedEventCount} of ${failedEventTotal}] [${jobId}]`)
}
}
let terminatedEventCount = 0
let terminatedEventTotal = 0
function terminatedEventHandler (jobId) {
terminatedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: terminated [${terminatedEventCount} of ${terminatedEventTotal}] [${jobId}]`)
}
}
let logEventCount = 0
let logEventTotal = 0
function logEventHandler (jobId) {
logEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: log [${logEventCount} of ${logEventTotal}] [${jobId}]`)
}
}
let updatedEventCount = 0
let updatedEventTotal = 0
function updatedEventHandler (jobId) {
updatedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: updated [${updatedEventCount} of ${updatedEventTotal}] [${jobId}]`)
}
}
let pausingEventCount = 0
let pausingEventTotal = 2
function pausingEventHandler (global, queueId) {
pausingEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: pausing [${pausingEventCount} of ${pausingEventTotal}]`)
t.ok(is.boolean(global), `Event: pausing [global: ${global}]`)
t.ok(is.string(queueId), `Event: pausing [queueId: ${queueId}]`)
}
}
let pausedEventCount = 0
let pausedEventTotal = 2
function pausedEventHandler (global, queueId) {
pausedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: paused [${pausedEventCount} of ${pausedEventTotal}]`)
t.ok(is.boolean(global), `Event: paused [global: ${global}]`)
t.ok(is.string(queueId), `Event: paused [queueId: ${queueId}]`)
}
}
let resumedEventCount = 0
let resumedEventTotal = 1
function resumedEventHandler (global, queueId) {
resumedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: resumed [${resumedEventCount} of ${resumedEventTotal}]`)
t.ok(is.boolean(global), `Event: resumed [global: ${global}]`)
t.ok(is.string(queueId), `Event: resumed [queueId: ${queueId}]`)
}
}
let removedEventCount = 0
let removedEventTotal = 1
function removedEventHandler (jobId) {
removedEventCount++
if (testEvents) {
t.ok(is.uuid(jobId), `Event: removed [${jobId}]`)
t.ok(is.uuid(jobId), `Event: removed [${removedEventCount} of ${removedEventTotal}] [${jobId}]`)
}
}
function idleEventHandler (qId) {
let idleEventCount = 0
let idleEventTotal = 4
function idleEventHandler (queueId) {
idleEventCount++
if (testEvents) {
t.ok(is.string(qId), `Event: idle [${qId}]`)
q.removeListener(enums.status.idle, idleEventHandler)
t.ok(is.string(queueId), `Event: idle [${idleEventCount} of ${idleEventTotal}] [${queueId}]`)
}
}
let reviewedEventCount = 0
let reviewedEventTotal = 0
function reviewedEventHandler (queueId) {
reviewedEventCount++
if (testEvents) {
t.ok(is.string(queueId), `Event: reviewed [${reviewedEventCount} of ${reviewedEventTotal}] [${queueId}]`)
}
}
let resetEventCount = 0
let resetEventTotal = 1
function resetEventHandler (total) {
resetEventCount++
if (testEvents) {
t.ok(is.integer(total), `Event: reset [${total}]`)
t.ok(is.integer(total), `Event: reset [${resetEventCount} of ${resetEventTotal}] [${total}]`)
}
}
let errorEventCount = 0
let errorEventTotal = 3
function errorEventHandler (err) {
errorEventCount++
if (testEvents) {
t.ok(is.string(err.message), `Event: error [${err.message}]`)
t.ok(is.string(err.message), `Event: error [${errorEventCount} of ${errorEventTotal}] [${err.message}]`)
}
}
function detachedEventHandler (qId) {
let detachedEventCount = 0
let detachedEventTotal = 1
function detachedEventHandler (queueId) {
detachedEventCount++
if (testEvents) {
t.ok(is.string(qId), `Event: detached [${qId}]`)
t.ok(is.string(queueId), `Event: detached [${detachedEventCount} of ${detachedEventTotal}] [${queueId}]`)
}
}
function stoppingEventHandler (qId) {
let stoppingEventCount = 0
let stoppingEventTotal = 1
function stoppingEventHandler (queueId) {
stoppingEventCount++
if (testEvents) {
t.ok(is.string(qId), `Event: stopping [${qId}]`)
t.ok(is.string(queueId), `Event: stopping [${stoppingEventCount} of ${stoppingEventTotal}] [${queueId}]`)
}
}
function stoppedEventHandler (qId) {
let stoppedEventCount = 0
let stoppedEventTotal = 1
function stoppedEventHandler (queueId) {
stoppedEventCount++
if (testEvents) {
t.ok(is.string(qId), `Event: stopped [${qId}]`)
t.ok(is.string(queueId), `Event: stopped [${stoppedEventCount} of ${stoppedEventTotal}] [${queueId}]`)
}
}
function droppedEventHandler (qId) {
let droppedEventCount = 0
let droppedEventTotal = 1
function droppedEventHandler (queueId) {
droppedEventCount++
if (testEvents) {
t.ok(is.string(qId), `Event: dropped[${qId}]`)
t.ok(is.string(queueId), `Event: dropped [${droppedEventCount} of ${droppedEventTotal}] [${queueId}]`)
}

@@ -105,7 +231,17 @@ }

q.on(enums.status.added, addedEventHandler)
q.on(enums.status.active, activeEventHandler)
q.on(enums.status.processing, processingEventHandler)
q.on(enums.status.progress, progressEventHandler)
q.on(enums.status.completed, completedEventHandler)
q.on(enums.status.cancelled, cancelledEventHandler)
q.on(enums.status.failed, failedEventHandler)
q.on(enums.status.terminated, terminatedEventHandler)
q.on(enums.status.paused, pausedEventHandler)
q.on(enums.status.pausing, pausingEventHandler)
q.on(enums.status.resumed, resumedEventHandler)
q.on(enums.status.removed, removedEventHandler)
q.on(enums.status.log, logEventHandler)
q.on(enums.status.updated, updatedEventHandler)
q.on(enums.status.idle, idleEventHandler)
q.on(enums.status.reviewed, reviewedEventHandler)
q.on(enums.status.reset, resetEventHandler)

@@ -122,7 +258,17 @@ q.on(enums.status.error, errorEventHandler)

q.removeListener(enums.status.added, addedEventHandler)
q.removeListener(enums.status.active, activeEventHandler)
q.removeListener(enums.status.processing, processingEventHandler)
q.removeListener(enums.status.progress, progressEventHandler)
q.removeListener(enums.status.completed, completedEventHandler)
q.removeListener(enums.status.cancelled, cancelledEventHandler)
q.removeListener(enums.status.failed, failedEventHandler)
q.removeListener(enums.status.terminated, terminatedEventHandler)
q.removeListener(enums.status.paused, pausedEventHandler)
q.removeListener(enums.status.pausing, pausingEventHandler)
q.removeListener(enums.status.resumed, resumedEventHandler)
q.removeListener(enums.status.removed, removedEventHandler)
q.removeListener(enums.status.log, logEventHandler)
q.removeListener(enums.status.updated, updatedEventHandler)
q.removeListener(enums.status.idle, idleEventHandler)
q.removeListener(enums.status.reviewed, reviewedEventHandler)
q.removeListener(enums.status.reset, resetEventHandler)

@@ -362,2 +508,29 @@ q.removeListener(enums.status.error, errorEventHandler)

removeEventHandlers()
// ---------- Event summary Test ----------
t.comment('queue: Event Summary')
t.equal(readyEventCount, readyEventTotal, 'Ready event count valid')
t.equal(addedEventCount, addedEventTotal, 'Added event count valid')
t.equal(activeEventCount, activeEventTotal, 'Active event count valid')
t.equal(processingEventCount, processingEventTotal, 'Processing event count valid')
t.equal(progressEventCount, progressEventTotal, 'Progress event count valid')
t.equal(completedEventCount, completedEventTotal, 'Completed event count valid')
t.equal(cancelledEventCount, cancelledEventTotal, 'Cancelled event count valid')
t.equal(failedEventCount, failedEventTotal, 'Failed event count valid')
t.equal(terminatedEventCount, terminatedEventTotal, 'Terminated event count valid')
t.equal(pausingEventCount, pausingEventTotal, 'Pausing event count valid')
t.equal(pausedEventCount, pausedEventTotal, 'Paused event count valid')
t.equal(resumedEventCount, resumedEventTotal, 'Resumed event count valid')
t.equal(removedEventCount, removedEventTotal, 'Removed event count valid')
t.equal(logEventCount, logEventTotal, 'Log event count valid')
t.equal(updatedEventCount, updatedEventTotal, 'Updated event count valid')
t.equal(idleEventCount, idleEventTotal, 'Idle event count valid')
t.equal(reviewedEventCount, reviewedEventTotal, 'Reviewed event count valid')
t.equal(resetEventCount, resetEventTotal, 'Reset event count valid')
t.equal(errorEventCount, errorEventTotal, 'Error event count valid')
t.equal(detachedEventCount, detachedEventTotal, 'Detached event count valid')
t.equal(stoppingEventCount, stoppingEventTotal, 'Stopping event count valid')
t.equal(stoppedEventCount, stoppedEventTotal, 'Stopped event count valid')
t.equal(droppedEventCount, droppedEventTotal, 'Dropped event count valid')
q.stop()

@@ -364,0 +537,0 @@ q2.stop()

@@ -18,6 +18,7 @@ // const Promise = require('bluebird')

// const jobUpdate = require('./job-update.spec')
// const jobFailed = require('./job-failed.spec')
const jobFailed = require('./job-failed.spec')
// const jobAddLog = require('./job-add-log.spec')
const queue = require('./queue.spec')
// const queue = require('./queue.spec')
// const queueDb = require('./queue-db.spec')
// const queueState = require('./queue-state.spec')
// const queueAddJob = require('./queue-add-job.spec')

@@ -39,3 +40,3 @@ // const queueGetJob = require('./queue-get-job.spec')

}).then(() => {
return queue()
return jobFailed()
})

@@ -22,2 +22,3 @@ const Promise = require('bluebird')

const queueDb = require('./queue-db.spec')
const queueState = require('./queue-state.spec')
const queueAddJob = require('./queue-add-job.spec')

@@ -79,2 +80,4 @@ const queueGetJob = require('./queue-get-job.spec')

}).then(() => {
return queueState()
}).then(() => {
return queueProcess()

@@ -81,0 +84,0 @@ }).then(() => {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc