New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

nomoque

Package Overview
Dependencies
Maintainers
1
Versions
7
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nomoque - npm Package Compare versions

Comparing version 0.1.6 to 0.1.7

106

lib/queue.js

@@ -13,7 +13,7 @@ /**

var QueueState = {
STATE_ERROR: -1,
STATE_NEW: 0,
STATE_SHIFTED: 1,
STATE_PARTIAL: 2,
STATE_FINISHED: 3
STATE_FINISHED: 3,
STATE_ERROR: 4
};

@@ -26,6 +26,9 @@

EventEmitter.call(this);
options = options || {};
this._tasks = {};
this._topics = [];
this._pausedTasks = [];
this.delay = 1;
this.delay = options.delay || 1;
this.serial = options.serial || false;
this.resultExpire = options.resultExpire || 14;
this._state = 'stopped';

@@ -63,3 +66,2 @@ if (options.maxWorkers > 0) {

this.queColl.ensureIndex([
['date', 1],
['topic', 1],

@@ -70,18 +72,19 @@ ['state', 1]

this.resultColl.ensureIndex([
['date', 1],
['topic', 1],
['state', 1],
['created', -1],
['shifted', -1],
['finished', -1]
['shifted', -1]
]);
this.resultColl.ensureIndex({finished: 1}, {expireAfterSeconds: 24 * 3600 * this.resultExpire});
};
/**
* Push a message to a topic queue
* Push a message to a topic queue with optional options.
*
* @param {String} topic
* @param {Object} message
* @param topic {String}
* @param message {Object}
* @param [options] {Object}
*/
Queue.prototype.push = function (topic, message) {
Queue.prototype.push = function (topic, message, options) {
var self = this;

@@ -97,2 +100,7 @@

};
if (options) {
if (!isNaN(options.priority)) {
msg.state = options.priority > 0 ? options.priority * -1 : options.priority;
}
}
self.queColl.insert(msg, {safe: true}).then(function (doc) {

@@ -163,2 +171,7 @@ if (doc) {

Queue.prototype._getTopic = function () {
var idx = getRandomInt(0, this._topics.length - 1);
return this._topics[idx];
};
Queue.prototype._shift = function () {

@@ -170,5 +183,5 @@ if (this._state === 'stopped' || this._currentWorkers++ >= this._maxWorkers) {

// get messages we are interested
this.queColl.findAndModify({topic: {$in: self._topics}, state: QueueState.STATE_NEW},
// use `nature` order
[],
this.queColl.findAndModify({topic: self._getTopic(), state: {$lte: QueueState.STATE_NEW}},
// order by priority
[['state', 1]],
// modify the message status

@@ -218,3 +231,3 @@ {$set: {shifted: new Date(), state: QueueState.STATE_SHIFTED}},

self._log(log, message, done);
debug('Task "%s" "%s" "%s" error: %s.', log.topic, log._id, log.name, log.error);
debug('Task "%s" "%s" "%s" error: %s.', log.topic, log._id, log.name, JSON.stringify(log.error));
} else {

@@ -250,18 +263,19 @@ log.state = QueueState.STATE_FINISHED;

if (++count === tasks.length) {
if (Object.keys(errors).length > 0) {
// errors found
self.resultColl
.update({_id: message._id}, {$set: {state: QueueState.STATE_PARTIAL}}, {safe: true})
.then(function () {
self._emitOrLog('finished', [topic, errors, results]);
// shift next
self._currentWorkers--;
self._shift();
});
var errorCount = Object.keys(errors).length;
var setDoc = {
state: QueueState.STATE_FINISHED
};
if (errorCount > 0) {
setDoc.state = errorCount === count ? QueueState.STATE_ERROR : QueueState.STATE_PARTIAL;
} else {
self._emitOrLog('finished', [topic, errors, results]);
// shift next
self._currentWorkers--;
self._shift();
setDoc.finished = new Date();
}
self.resultColl
.update({_id: message._id}, {$set: setDoc}, {safe: true})
.then(function () {
self._emitOrLog('finished', [topic, errors, results]);
// shift next
self._currentWorkers--;
self._shift();
});
}

@@ -277,7 +291,21 @@ }

// run each tasks
tasks.forEach(function (task) {
self._runTask(task, message, errors, results, checkFinish);
});
if (this.serial) {
var len = tasks.length;
var _runTask = function () {
if (count < len) {
self._runTask(tasks[count], message, errors, results, function () {
checkFinish();
_runTask();
});
} else {
checkFinish();
}
};
_runTask();
} else {
// run each tasks in parallel
tasks.forEach(function (task) {
self._runTask(task, message, errors, results, checkFinish);
});
}
};

@@ -287,6 +315,2 @@

var logName = 'logs.' + log.name;
var setDoc = {
topic: log.topic,
state: log.state
};
var pushDoc = {};

@@ -305,3 +329,3 @@ pushDoc[logName] = {

var deferred = this.resultColl
.update({_id: log._id}, {$set: setDoc, $push: pushDoc}, {upsert: true, safe: true})
.update({_id: log._id}, {$push: pushDoc}, {upsert: true, safe: true})
.fail(this._onFault(this));

@@ -409,1 +433,5 @@ if (log.error) {

}
function getRandomInt(min, max) {
return Math.floor(Math.random() * (max - min + 1) + min);
}
{
"name": "nomoque",
"version": "0.1.6",
"version": "0.1.7",
"description": "Database Centric Distributed Task Queue",
"keywords": ["task", "queue", "database", "distributed", "persistent", "mongodb"],
"keywords": ["task", "queue", "persistent", "priority", "database", "distributed", "mongodb"],
"homepage": "https://github.com/lsm/nomoque",

@@ -23,2 +23,2 @@ "author": "Senmiao Liu <senmiao.liu@gmail.com>",

}
}
}

@@ -9,2 +9,3 @@ var assert = require('assert');

describe('Queue', function () {
this.timeout(10000);

@@ -68,2 +69,3 @@ it('should create an instance of Queue', function () {

worker.process('topicA', 'add', function (message, callback) {
worker.resume('topicB');
assert.equal(1, message.a);

@@ -74,2 +76,4 @@ assert.equal(2, message.b);

});
worker.pause('topicB');
worker.start();

@@ -76,0 +80,0 @@ });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc