celery-client
Advanced tools
Comparing version 0.0.2 to 0.1.0
@@ -5,2 +5,4 @@ 'use strict'; | ||
var DEFAULT_QUEUE = 'celery'; | ||
var nope = function() {}; | ||
@@ -30,3 +32,3 @@ | ||
taskOptions.priority = taskOptions.priority || 0; | ||
taskOptions.queue = taskOptions.queue || 'celery'; | ||
taskOptions.queue = taskOptions.queue || DEFAULT_QUEUE; | ||
@@ -48,3 +50,2 @@ taskOptions.deliveryMode = 2; | ||
* @param {Object} [defaultTaskOptions.retries=0] - Task retry times | ||
* @param {Object} [defaultTaskOptions.expires=null] - Task expires | ||
* @param {Object} [defaultTaskOptions.timeLimit=null] - Task time limit (in seconds) | ||
@@ -62,7 +63,12 @@ * @param {Object} [defaultTaskOptions.softTimeLimit=null] - Task time limit (soft, in seconds) | ||
self._defaultTaskOptions = canonicalizeTaskOptions(defaultTaskOptions) || {}; | ||
if (self._brokerHandler) self.broker = self._brokerHandler._handler; | ||
if (self._backendHandler) self.backend = self._backendHandler._handler; | ||
}; | ||
Client.prototype.putTask = function(task, args, kwargs, taskOptions, callback) { | ||
Client.prototype.putTask = function(task, args, kwargs, taskOptions, callback, onResultCallback) { | ||
var self = this; | ||
args = args || []; | ||
kwargs = kwargs || {}; | ||
taskOptions = canonicalizeTaskOptions(taskOptions) || {}; | ||
@@ -73,4 +79,3 @@ | ||
mergedTaskOptions = mergedTaskOptions; | ||
self._backendHandler.onResult(mergedTaskOptions.id, onResultCallback || nope); | ||
self._brokerHandler.putTask(task, args, kwargs, mergedTaskOptions, callback || nope); | ||
@@ -91,4 +96,23 @@ }; | ||
Client.prototype.listQueued = function(queue, callback) { | ||
var self = this; | ||
queue = queue || DEFAULT_QUEUE; | ||
self._brokerHandler.listQueued(queue, callback || nope); | ||
}; | ||
Client.prototype.listScheduled = function(callback) { | ||
var self = this; | ||
self._brokerHandler.listScheduled(callback || nope); | ||
}; | ||
Client.prototype.listRecent = function(callback) { | ||
var self = this; | ||
self._backendHandler.listRecent(callback || nope); | ||
}; | ||
exports.Client = Client; | ||
exports.RedisHandler = require('./handlers/redisHandler'); |
@@ -20,6 +20,6 @@ 'use strict'; | ||
var embed = { | ||
chord : null, | ||
callbacks: null, | ||
errbacks : null, | ||
chain : null, | ||
'chord' : null, | ||
'callbacks': null, | ||
'errbacks' : null, | ||
'chain' : null, | ||
}; | ||
@@ -29,33 +29,33 @@ var body = JSON.stringify([args, kwargs, embed]); | ||
var message = { | ||
"body": body, | ||
"headers": { | ||
"lang" : "py", // [Fixed value] ?? | ||
"task" : task, // Task name in Celery | ||
"id" : taskOptions.id, // Task ID | ||
"root_id" : taskOptions.id, // Same to `headers.id`. For chained task tracing | ||
"parent_id": null, // [Fixed value] For chained task tracing | ||
"group" : null, // [Fixed value] For paellel tasks | ||
'body': body, | ||
'headers': { | ||
'lang' : 'py', // [Fixed value] ?? | ||
'task' : task, // Task name in Celery | ||
'id' : taskOptions.id, // Task ID | ||
'root_id' : taskOptions.id, // Same to `headers.id`. For chained task tracing | ||
'parent_id': null, // [Fixed value] For chained task tracing | ||
'group' : null, // [Fixed value] For paellel tasks | ||
"eta" : taskOptions.eta, // ETA (ISO8601, e.g. 2017-08-29T12:47:00.000Z) | ||
"expires" : taskOptions.expires, // Expire time (ISO8601, e.g. 2017-08-29T12:47:00.000Z) | ||
"retries" : taskOptions.retries, // Retry times | ||
"timelimit": [ | ||
'eta' : taskOptions.eta, // ETA (ISO8601, e.g. 2017-08-29T12:47:00.000Z) | ||
'expires' : taskOptions.expires, // Expire time (ISO8601, e.g. 2017-08-29T12:47:00.000Z) | ||
'retries' : taskOptions.retries, // Retry times | ||
'timelimit': [ | ||
taskOptions.timeLimit, // Time limit (in seconds) | ||
taskOptions.softTimeLimit, // Soft time limit (raise Exception, in seconds) | ||
], | ||
"origin" : taskOptions.origin, // Senders name | ||
'origin' : taskOptions.origin, // Senders name | ||
}, | ||
"properties": { | ||
"priority" : taskOptions.priority, // Task priority | ||
"correlation_id": taskOptions.id, // Same to `headers.id` | ||
"reply_to" : null, | ||
"delivery_info" : { | ||
"routing_key": taskOptions.queue, // Queue name | ||
"exchange" : null | ||
'properties': { | ||
'priority' : taskOptions.priority, // Task priority | ||
'correlation_id': taskOptions.id, // Same to `headers.id` | ||
'reply_to' : null, | ||
'delivery_info' : { | ||
'routing_key': taskOptions.queue, // Queue name | ||
'exchange' : null | ||
}, | ||
"delivery_mode" : taskOptions.deliveryMode, // Fixed value (1: Non-persistent, 2: Persistent) | ||
"delivery_tag" : taskOptions.deliveryTag, // ?? | ||
'delivery_mode' : taskOptions.deliveryMode, // Fixed value (1: Non-persistent, 2: Persistent) | ||
'delivery_tag' : taskOptions.deliveryTag, // ?? | ||
}, | ||
"content-type" : "application/json", // [Fixed value] Content type | ||
"content-encoding": "utf-8" // [Fixed value] Content encoding | ||
'content-type' : 'application/json', // [Fixed value] Content type | ||
'content-encoding': 'utf-8' // [Fixed value] Content encoding | ||
}; | ||
@@ -95,3 +95,3 @@ | ||
var pushFunc = taskOptions.priority > 0 'rpush' : 'lpush'; | ||
var pushFunc = taskOptions.priority > 0 ? 'rpush' : 'lpush'; | ||
self._handler[pushFunc](targetQueue, taskToSend, function(err) { | ||
@@ -138,2 +138,59 @@ if (err) return callback(err); | ||
RedisHandler.prototype.listQueued = function(queue, callback) { | ||
var self = this; | ||
self._handler.lrange(queue, 0, -1, function(err, result) { | ||
if (err) return callback(err); | ||
for (var i = 0; i < result.length; i++) { | ||
result[i] = JSON.parse(result[i]); | ||
result[i].body = result[i].body ? JSON.parse(result[i].body) : null; | ||
} | ||
return callback(null, result); | ||
}); | ||
}; | ||
RedisHandler.prototype.listScheduled = function(callback) { | ||
var self = this; | ||
self._handler.hgetall('unacked', function(err, taskMap) { | ||
if (err) return callback(err); | ||
self._handler.zrange('unacked_index', 0, -1, 'withscores', function(err, tasks) { | ||
if (err) return callback(err); | ||
var result = []; | ||
for (var i = 0; i < tasks.length; i += 2) { | ||
var taskId = tasks[i]; | ||
var t = JSON.parse(taskMap[taskId])[0]; | ||
t.body = t.body ? JSON.parse(t.body) : null; | ||
result.push(t); | ||
} | ||
return callback(null, result); | ||
}); | ||
}); | ||
}; | ||
RedisHandler.prototype.listRecent = function(callback) { | ||
var self = this; | ||
self._handler.keys(self.taskKeyPrefix + '*', function(err, metaTaskIds) { | ||
if (err) return callback(err); | ||
self._handler.mget(metaTaskIds, function(err, result) { | ||
if (err) return callback(err); | ||
for (var i = 0; i < result.length; i++) { | ||
result[i] = JSON.parse(result[i]); | ||
} | ||
return callback(null, result); | ||
}); | ||
}); | ||
}; | ||
module.exports = RedisHandler; |
{ | ||
"name": "celery-client", | ||
"version": "0.0.2", | ||
"version": "0.1.0", | ||
"description": "A client for Celery", | ||
@@ -5,0 +5,0 @@ "main": "celery-client.js", |
12908
222