New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

nomoque

Package Overview
Dependencies
Maintainers
1
Versions
7
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nomoque - npm Package Compare versions

Comparing version 0.1.5 to 0.1.6

18

index.js

@@ -37,3 +37,2 @@ /**

var db = server.db(options.dbName);
ensureIndexes(db);
return new Queue(db, options);

@@ -54,18 +53,1 @@ };

}
function ensureIndexes(db) {
db.collection('queues').ensureIndex([
['date', 1],
['topic', 1],
['state', 1]
]);
// index for `results`
db.collection('results').ensureIndex([
['date', 1],
['topic', 1],
['state', 1],
['created', -1],
['shifted', -1],
['finished', -1]
]);
}

113

lib/queue.js

@@ -27,5 +27,10 @@ /**

this._topics = [];
this._pausedTasks = [];
this.delay = 1;
this._state = 'stopped';
this._maxWorkers = 2;
if (options.maxWorkers > 0) {
this._maxWorkers = options.maxWorkers;
} else {
this._maxWorkers = 2;
}
this._currentWorkers = 0;

@@ -46,2 +51,4 @@ this.db = db;

this.configColl = db.collection(configColl);
this.ensureIndexes();
};

@@ -54,2 +61,19 @@

Queue.prototype.ensureIndexes = function () {
this.queColl.ensureIndex([
['date', 1],
['topic', 1],
['state', 1]
]);
// index for `results`
this.resultColl.ensureIndex([
['date', 1],
['topic', 1],
['state', 1],
['created', -1],
['shifted', -1],
['finished', -1]
]);
};
/**

@@ -66,3 +90,3 @@ * Push a message to a topic queue

var qid = nextIdDoc.currentId;
var date = new Date;
var date = new Date();
var msg = {_id: qid, topic: topic, message: message,

@@ -89,4 +113,4 @@ state: QueueState.STATE_NEW,

*
* @param {String} topic Topic of the queue
* @param {String} name Name of the task
* @param topic {String} Topic of the queue
* @param name {String} Name of the task
* @param {Function} fn

@@ -125,3 +149,3 @@ */

var id = message._id;
delete message['_id'];
delete message._id;
// upsert the message into `toColl`

@@ -134,2 +158,3 @@ return toColl.update({_id: id}, {$set: message}, {safe: true, upsert: true})

message._id = id;
debug('Message "%s" "%s" moved from "%s" to "%s".', message.topic, id, fromColl.name, toColl.name);
defer.next(message);

@@ -150,3 +175,3 @@ }).fail(defer.error);

// modify the message status
{$set: {shifted: new Date, state: QueueState.STATE_SHIFTED}},
{$set: {shifted: new Date(), state: QueueState.STATE_SHIFTED}},
// return updated message

@@ -158,2 +183,3 @@ {remove: false, 'new': true, upsert: false}).then(function (message) {

}
debug('Message "%s" shifted from queue.', message._id);
// handle shifted message

@@ -184,2 +210,3 @@ // move the message from `queues` to collection `results`

};
debug('Processing task "%s" "%s" "%s".', log.topic, log._id, log.name);
try {

@@ -191,5 +218,6 @@ task.fn.call(message, message.message, function (taskErr, result) {

log.state = QueueState.STATE_ERROR;
log.error = taskErr;
log.error = taskErr.stack || taskErr.message || taskErr;
errors[task.name] = taskErr;
self._log(log, message, done);
debug('Task "%s" "%s" "%s" error: %s.', log.topic, log._id, log.name, log.error);
} else {

@@ -203,2 +231,3 @@ log.state = QueueState.STATE_FINISHED;

self._log(log, message, done);
debug('Task "%s" "%s" "%s" finished: %s.', log.topic, log._id, log.name, log.result || '(no result)');
}

@@ -211,2 +240,3 @@ });

self._log(log, message, done);
debug('Task "%s" "%s" "%s" exception: %s.', log.topic, log._id, log.name, log.error);
}

@@ -216,3 +246,4 @@ };

Queue.prototype._process = function (message) {
var tasks = this._tasks[message.topic];
var topic = message.topic;
var tasks = this._tasks[topic];
var self = this;

@@ -222,3 +253,3 @@ var count = 0;

var errors = {};
debug('Processing message %s for topic %s.', message._id, topic);
function checkFinish() {

@@ -231,3 +262,3 @@ if (++count === tasks.length) {

.then(function () {
self._emitOrLog('finished', [message.topic, errors, results]);
self._emitOrLog('finished', [topic, errors, results]);
// shift next

@@ -238,3 +269,3 @@ self._currentWorkers--;

} else {
self._emitOrLog('finished', [message.topic, errors, results]);
self._emitOrLog('finished', [topic, errors, results]);
// shift next

@@ -247,2 +278,10 @@ self._currentWorkers--;

// filter paused tasks
if (this._pausedTasks.length > 0) {
tasks = tasks.filter(function (task) {
return self._pausedTasks.indexOf(topic + '-' + task.name) === -1;
});
}
// run each tasks

@@ -273,3 +312,3 @@ tasks.forEach(function (task) {

var deferred = this.resultColl
.update({_id: log._id}, {$set:setDoc, $push: pushDoc}, {upsert: true, safe: true})
.update({_id: log._id}, {$set: setDoc, $push: pushDoc}, {upsert: true, safe: true})
.fail(this._onFault(this));

@@ -303,3 +342,3 @@ if (log.error) {

} else {
debug("Event %s: %s", event, JSON.stringify(args));
debug('Event "%s": %s', event, JSON.stringify(args));
}

@@ -309,2 +348,3 @@ };

Queue.prototype.start = function (workers) {
debug('Start processing queue.');
this._state = 'running';

@@ -320,2 +360,3 @@ if (workers) {

Queue.prototype.stop = function () {
debug('Stop processing queue.');
this._state = 'stopped';

@@ -329,2 +370,46 @@ };

/**
* Pause processing a topic or task under that topic.
*
* @param topic {String} Topic that you want to pause processing.
* @param [task] {String} Specific task for the topic you want to pause.
* @public
*/
Queue.prototype.pause = function (topic, task) {
if (task) {
topic += '-' + task;
if (-1 === this._pausedTasks.indexOf(topic)) {
this._pausedTasks.push(topic);
}
} else {
this._topics = this._topics.filter(function (t) {
return topic !== t;
});
}
return this;
};
/**
* Resume processing a topic or task under that topic.
*
* @param topic {String} Topic that you want to resume processing.
* @param [task] {String} Specific task for the topic you want to resume.
* @public
*/
Queue.prototype.resume = function (topic, task) {
if (task) {
topic += '-' + task;
this._pausedTasks = this._pausedTasks.filter(function (task) {
return task !== topic;
});
} else {
if (-1 === this._topics.indexOf(topic)) {
this._topics.push(topic);
}
}
return this;
};
Queue.QueueState = QueueState;

@@ -334,2 +419,2 @@

return coll.findAndModify({_id: 'nextQueueId'}, [], {$inc: {currentId: 1}}, {'new': true, upsert: 'true'});
}
}
{
"name": "nomoque",
"version": "0.1.5",
"version": "0.1.6",
"description": "Database Centric Distributed Task Queue",

@@ -5,0 +5,0 @@ "keywords": ["task", "queue", "database", "distributed", "persistent", "mongodb"],

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc