qonvoy
Advanced tools
Comparing version 0.0.4 to 0.0.5
@@ -14,8 +14,113 @@ export interface Config { | ||
} | ||
/** | ||
* Initialize Qonvoy | ||
* @param c | ||
*/ | ||
export declare function init(c: Config): void; | ||
/** | ||
* Get Client | ||
* returns redis client that powers Qonvoy | ||
*/ | ||
export declare function getClient(): any; | ||
/** | ||
* Get Timestamp | ||
* @param seconds | ||
*/ | ||
export declare function getTimestamp(seconds?: number): number; | ||
/** | ||
* Modify Timestamp | ||
* modifies a timestamp value by x seconds | ||
* @param timestamp | ||
* @param seconds | ||
*/ | ||
export declare function modifyTimestamp(timestamp: number, seconds?: number): number; | ||
/** | ||
* Add | ||
* adds a task to named queue | ||
* @param queue | ||
* @param meta | ||
*/ | ||
export declare function add(queue: string, meta: any): Promise<any>; | ||
/** | ||
* Status | ||
* returns status of job by id | ||
* @param queue | ||
* @param id | ||
*/ | ||
export declare function status(queue: string, id: string): Promise<Item>; | ||
/** | ||
* Process Next | ||
* processes next task from named queue | ||
* @param queue | ||
* @param func | ||
*/ | ||
export declare function processNext(queue: string, func: (item: Item) => Promise<boolean>): Promise<boolean>; | ||
/** | ||
* Process One | ||
* process task by id from named queue | ||
* @param queue | ||
* @param id | ||
* @param func | ||
*/ | ||
export declare function processOne(queue: string, id: string, func: (item: any) => Promise<boolean>): Promise<boolean>; | ||
export declare function reQueue(queue: string, minAge: number): Promise<boolean>; | ||
/** | ||
* Re-Queue | ||
* moves any stuck jobs from the processing queue back to pending | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export declare function reQueue(queue: string, minAge?: number): Promise<boolean>; | ||
/** | ||
* Count Successes | ||
* returns number of current successes from named queue | ||
* @param queue | ||
*/ | ||
export declare function countSuccesses(queue: string): Promise<any>; | ||
/** | ||
* Count Errors | ||
* returns number of current errors from named queue | ||
* @param queue | ||
*/ | ||
export declare function countErrors(queue: string): Promise<any>; | ||
/** | ||
* Report Successes | ||
* returns success log from named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export declare function reportSuccesses(queue: string, minAge?: number): Promise<any>; | ||
/** | ||
* Report Errors | ||
* returns error log from named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export declare function reportErrors(queue: string, minAge?: number): Promise<any>; | ||
/** | ||
* Clear Successes | ||
* clears success log from named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export declare function clearSuccesses(queue: string, minAge?: number): Promise<any>; | ||
/** | ||
* Clear Errors | ||
* clears error log from the named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export declare function clearErrors(queue: string, minAge?: number): Promise<any>; | ||
/** | ||
* Destroy | ||
* manually destroy a task by id | ||
* @param queue | ||
* @param id | ||
*/ | ||
export declare function destroy(queue: string, id: string): Promise<boolean>; | ||
/** | ||
* Retry | ||
* moves a task from the error log to the queue | ||
* @param queue | ||
* @param id | ||
*/ | ||
export declare function retry(queue: string, id: string): Promise<boolean>; |
@@ -15,2 +15,6 @@ "use strict"; | ||
var config = null; | ||
/** | ||
* Initialize Qonvoy | ||
* @param c | ||
*/ | ||
function init(c) { | ||
@@ -21,5 +25,37 @@ config = c; | ||
exports.init = init; | ||
/** | ||
* Get Client | ||
* returns redis client that powers Qonvoy | ||
*/ | ||
function getClient() { | ||
return Connection.client; | ||
} | ||
exports.getClient = getClient; | ||
/** | ||
* Get Timestamp | ||
* @param seconds | ||
*/ | ||
function getTimestamp(seconds = 0) { | ||
return (new Date().getTime()) + (seconds * 1000); | ||
} | ||
exports.getTimestamp = getTimestamp; | ||
/** | ||
* Modify Timestamp | ||
* modifies a timestamp value by x seconds | ||
* @param timestamp | ||
* @param seconds | ||
*/ | ||
function modifyTimestamp(timestamp, seconds = 0) { | ||
return timestamp + (seconds * 1000); | ||
} | ||
exports.modifyTimestamp = modifyTimestamp; | ||
/** | ||
* Add | ||
* adds a task to named queue | ||
* @param queue | ||
* @param meta | ||
*/ | ||
function add(queue, meta) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let timestamp = new Date().getTime(); | ||
let timestamp = getTimestamp(); | ||
let id = uuid(); | ||
@@ -43,2 +79,8 @@ let item = {}; | ||
exports.add = add; | ||
/** | ||
* Status | ||
* returns status of job by id | ||
* @param queue | ||
* @param id | ||
*/ | ||
function status(queue, id) { | ||
@@ -54,6 +96,12 @@ return __awaiter(this, void 0, void 0, function* () { | ||
exports.status = status; | ||
/** | ||
* Process Next | ||
* processes next task from named queue | ||
* @param queue | ||
* @param func | ||
*/ | ||
function processNext(queue, func) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let success = false; | ||
let timestamp = new Date().getTime(); | ||
let timestamp = getTimestamp(); | ||
let result = yield Connection.client | ||
@@ -68,6 +116,13 @@ .eval(LuaCommands.getNext(), 3, `queue:${queue}`, `processing:${queue}`, timestamp + ''); | ||
exports.processNext = processNext; | ||
/** | ||
* Process One | ||
* process task by id from named queue | ||
* @param queue | ||
* @param id | ||
* @param func | ||
*/ | ||
function processOne(queue, id, func) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let success = false; | ||
let timestamp = new Date().getTime(); | ||
let timestamp = getTimestamp(); | ||
let result = yield Connection.client | ||
@@ -82,8 +137,19 @@ .eval(LuaCommands.getOne(), 4, `queue:${queue}`, `processing:${queue}`, timestamp + '', id); | ||
exports.processOne = processOne; | ||
/** | ||
* Re-Queue | ||
* moves any stuck jobs from the processing queue back to pending | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
function reQueue(queue, minAge) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let timestamp = new Date().getTime(); | ||
let cutoffTimestamp = timestamp - (minAge * 1000); | ||
let timestamp; | ||
if (minAge) { | ||
timestamp = minAge; | ||
} | ||
else { | ||
timestamp = getTimestamp(); | ||
} | ||
let result = yield Connection.client | ||
.zrangebyscore(`processing:${queue}`, '-inf', cutoffTimestamp); | ||
.zrangebyscore(`processing:${queue}`, '-inf', timestamp); | ||
if (result) { | ||
@@ -102,7 +168,144 @@ for (let id of result) { | ||
exports.reQueue = reQueue; | ||
/** | ||
* Count Successes | ||
* returns number of current successes from named queue | ||
* @param queue | ||
*/ | ||
function countSuccesses(queue) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let result = yield Connection.client | ||
.zcard(`success:${queue}`); | ||
return result; | ||
}); | ||
} | ||
exports.countSuccesses = countSuccesses; | ||
/** | ||
* Count Errors | ||
* returns number of current errors from named queue | ||
* @param queue | ||
*/ | ||
function countErrors(queue) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let result = yield Connection.client | ||
.zcard(`error:${queue}`); | ||
return result; | ||
}); | ||
} | ||
exports.countErrors = countErrors; | ||
/** | ||
* Report Successes | ||
* returns success log from named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
function reportSuccesses(queue, minAge) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let timestamp; | ||
if (minAge) { | ||
timestamp = minAge; | ||
} | ||
else { | ||
timestamp = getTimestamp(); | ||
} | ||
let result = yield Connection.client | ||
.zrangebyscore(`success:${queue}`, '-inf', timestamp); | ||
return result; | ||
}); | ||
} | ||
exports.reportSuccesses = reportSuccesses; | ||
/** | ||
* Report Errors | ||
* returns error log from named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
function reportErrors(queue, minAge) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let timestamp; | ||
if (minAge) { | ||
timestamp = minAge; | ||
} | ||
else { | ||
timestamp = getTimestamp(); | ||
} | ||
let result = yield Connection.client | ||
.zrangebyscore(`error:${queue}`, '-inf', timestamp); | ||
return result; | ||
}); | ||
} | ||
exports.reportErrors = reportErrors; | ||
/** | ||
* Clear Successes | ||
* clears success log from named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
function clearSuccesses(queue, minAge) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let timestamp; | ||
if (minAge) { | ||
timestamp = minAge; | ||
} | ||
else { | ||
timestamp = getTimestamp(); | ||
} | ||
let result = yield Connection.client | ||
.zremrangebyscore(`success:${queue}`, '-inf', timestamp); | ||
return result; | ||
}); | ||
} | ||
exports.clearSuccesses = clearSuccesses; | ||
/** | ||
* Clear Errors | ||
* clears error log from the named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
function clearErrors(queue, minAge) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let timestamp; | ||
if (minAge) { | ||
timestamp = minAge; | ||
} | ||
else { | ||
timestamp = getTimestamp(); | ||
} | ||
let result = yield Connection.client | ||
.zremrangebyscore(`error:${queue}`, '-inf', timestamp); | ||
return result; | ||
}); | ||
} | ||
exports.clearErrors = clearErrors; | ||
/** | ||
* Destroy | ||
* manually destroy a task by id | ||
* @param queue | ||
* @param id | ||
*/ | ||
function destroy(queue, id) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
yield Connection.client | ||
.multi() | ||
.zrem(`queue:${queue}`, id) | ||
.zrem(`processing:${queue}`, id) | ||
.zrem(`success:${queue}`, id) | ||
.zrem(`error:${queue}`, id) | ||
.hrem(`items:${queue}:${id}`) | ||
.exec(); | ||
return true; | ||
}); | ||
} | ||
exports.destroy = destroy; | ||
/** | ||
* Retry | ||
* moves a task from the error log to the queue | ||
* @param queue | ||
* @param id | ||
*/ | ||
function retry(queue, id) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
let timestamp = new Date().getTime(); | ||
let timestamp = getTimestamp(); | ||
yield Connection.client | ||
.multi() | ||
.persist(`items:${queue}:${id}`) | ||
.zrem(`error:${queue}`, id) | ||
@@ -115,2 +318,8 @@ .zadd(`queue:${queue}`, timestamp, id) | ||
exports.retry = retry; | ||
/** | ||
* Load | ||
* de-serializes from redis hash to Item by id | ||
* @param queue | ||
* @param id | ||
*/ | ||
function load(queue, id) { | ||
@@ -134,2 +343,9 @@ return __awaiter(this, void 0, void 0, function* () { | ||
} | ||
/** | ||
* Run Task | ||
* task running/logging logic | ||
* @param queue | ||
* @param id | ||
* @param func | ||
*/ | ||
function runTask(queue, id, func) { | ||
@@ -144,3 +360,3 @@ return __awaiter(this, void 0, void 0, function* () { | ||
let success = yield func(item); | ||
let completedTimestamp = new Date().getTime(); | ||
let completedTimestamp = getTimestamp(); | ||
if (success) { | ||
@@ -161,3 +377,3 @@ yield Connection.client | ||
catch (err) { | ||
let completedTimestamp = new Date().getTime(); | ||
let completedTimestamp = getTimestamp(); | ||
yield Connection.client | ||
@@ -167,3 +383,4 @@ .multi() | ||
.zadd(`error:${item.queue}`, completedTimestamp, item.id) | ||
.hset(`data:${item.queue}:${item.id}`, 'error', err.message) | ||
.hset(`items:${item.queue}:${item.id}`, 'error', err.message) | ||
.expire(`items:${item.queue}:${item.id}`, config.retention || 60 * 60 * 24) | ||
.exec(); | ||
@@ -170,0 +387,0 @@ } |
{ | ||
"name": "qonvoy", | ||
"version": "0.0.4", | ||
"version": "0.0.5", | ||
"description": "An extremely minimal task runner", | ||
@@ -5,0 +5,0 @@ "main": "./build/qonvoy.js", |
@@ -23,2 +23,6 @@ const uuid = require('uuid/v4') | ||
/** | ||
* Initialize Qonvoy | ||
* @param c | ||
*/ | ||
export function init(c: Config) { | ||
@@ -29,4 +33,36 @@ config = c | ||
/** | ||
* Get Client | ||
* returns redis client that powers Qonvoy | ||
*/ | ||
export function getClient() { | ||
return Connection.client | ||
} | ||
/** | ||
* Get Timestamp | ||
* @param seconds | ||
*/ | ||
export function getTimestamp(seconds: number = 0) { | ||
return (new Date().getTime()) + (seconds * 1000) | ||
} | ||
/** | ||
* Modify Timestamp | ||
* modifies a timestamp value by x seconds | ||
* @param timestamp | ||
* @param seconds | ||
*/ | ||
export function modifyTimestamp(timestamp: number, seconds: number = 0) { | ||
return timestamp + (seconds * 1000) | ||
} | ||
/** | ||
* Add | ||
* adds a task to named queue | ||
* @param queue | ||
* @param meta | ||
*/ | ||
export async function add(queue: string, meta: any) { | ||
let timestamp = new Date().getTime() | ||
let timestamp = getTimestamp() | ||
let id = uuid() | ||
@@ -61,2 +97,8 @@ let item = <Item>{} | ||
/** | ||
* Status | ||
* returns status of job by id | ||
* @param queue | ||
* @param id | ||
*/ | ||
export async function status(queue: string, id: string) { | ||
@@ -72,2 +114,8 @@ let item = await load(queue, id) | ||
/** | ||
* Process Next | ||
* processes next task from named queue | ||
* @param queue | ||
* @param func | ||
*/ | ||
export async function processNext( | ||
@@ -78,3 +126,3 @@ queue: string, | ||
let success: boolean = false | ||
let timestamp = new Date().getTime() | ||
let timestamp = getTimestamp() | ||
let result = await Connection.client | ||
@@ -96,2 +144,9 @@ .eval( | ||
/** | ||
* Process One | ||
* process task by id from named queue | ||
* @param queue | ||
* @param id | ||
* @param func | ||
*/ | ||
export async function processOne( | ||
@@ -103,3 +158,3 @@ queue: string, | ||
let success: boolean = false | ||
let timestamp = new Date().getTime() | ||
let timestamp = getTimestamp() | ||
let result = await Connection.client | ||
@@ -122,7 +177,20 @@ .eval( | ||
export async function reQueue(queue: string, minAge: number) { | ||
let timestamp = new Date().getTime() | ||
let cutoffTimestamp = timestamp - (minAge * 1000) | ||
/** | ||
* Re-Queue | ||
* moves any stuck jobs from the processing queue back to pending | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export async function reQueue(queue: string, minAge?: number) { | ||
let timestamp | ||
if (minAge) { | ||
timestamp = minAge | ||
} | ||
else { | ||
timestamp = getTimestamp() | ||
} | ||
let result = await Connection.client | ||
.zrangebyscore(`processing:${queue}`, '-inf', cutoffTimestamp) | ||
.zrangebyscore(`processing:${queue}`, '-inf', timestamp) | ||
@@ -142,7 +210,145 @@ if (result) { | ||
/** | ||
* Count Successes | ||
* returns number of current successes from named queue | ||
* @param queue | ||
*/ | ||
export async function countSuccesses(queue: string) { | ||
let result = await Connection.client | ||
.zcard(`success:${queue}`) | ||
return result | ||
} | ||
/** | ||
* Count Errors | ||
* returns number of current errors from named queue | ||
* @param queue | ||
*/ | ||
export async function countErrors(queue: string) { | ||
let result = await Connection.client | ||
.zcard(`error:${queue}`) | ||
return result | ||
} | ||
/** | ||
* Report Successes | ||
* returns success log from named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export async function reportSuccesses(queue: string, minAge?: number) { | ||
let timestamp | ||
if (minAge) { | ||
timestamp = minAge | ||
} | ||
else { | ||
timestamp = getTimestamp() | ||
} | ||
let result = await Connection.client | ||
.zrangebyscore(`success:${queue}`, '-inf', timestamp) | ||
return result | ||
} | ||
/** | ||
* Report Errors | ||
* returns error log from named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export async function reportErrors(queue: string, minAge?: number) { | ||
let timestamp | ||
if (minAge) { | ||
timestamp = minAge | ||
} | ||
else { | ||
timestamp = getTimestamp() | ||
} | ||
let result = await Connection.client | ||
.zrangebyscore(`error:${queue}`, '-inf', timestamp) | ||
return result | ||
} | ||
/** | ||
* Clear Successes | ||
* clears success log from named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export async function clearSuccesses(queue: string, minAge?: number) { | ||
let timestamp | ||
if (minAge) { | ||
timestamp = minAge | ||
} | ||
else { | ||
timestamp = getTimestamp() | ||
} | ||
let result = await Connection.client | ||
.zremrangebyscore(`success:${queue}`, '-inf', timestamp) | ||
return result | ||
} | ||
/** | ||
* Clear Errors | ||
* clears error log from the named queue | ||
* @param queue | ||
* @param minAge | ||
*/ | ||
export async function clearErrors(queue: string, minAge?: number) { | ||
let timestamp | ||
if (minAge) { | ||
timestamp = minAge | ||
} | ||
else { | ||
timestamp = getTimestamp() | ||
} | ||
let result = await Connection.client | ||
.zremrangebyscore(`error:${queue}`, '-inf', timestamp) | ||
return result | ||
} | ||
/** | ||
* Destroy | ||
* manually destroy a task by id | ||
* @param queue | ||
* @param id | ||
*/ | ||
export async function destroy(queue: string, id: string) { | ||
await Connection.client | ||
.multi() | ||
.zrem(`queue:${queue}`, id) | ||
.zrem(`processing:${queue}`, id) | ||
.zrem(`success:${queue}`, id) | ||
.zrem(`error:${queue}`, id) | ||
.hrem(`items:${queue}:${id}`) | ||
.exec() | ||
return true | ||
} | ||
/** | ||
* Retry | ||
* moves a task from the error log to the queue | ||
* @param queue | ||
* @param id | ||
*/ | ||
export async function retry(queue: string, id: string) { | ||
let timestamp = new Date().getTime() | ||
let timestamp = getTimestamp() | ||
await Connection.client | ||
.multi() | ||
.persist(`items:${queue}:${id}`) | ||
.zrem(`error:${queue}`, id) | ||
@@ -155,2 +361,8 @@ .zadd(`queue:${queue}`, timestamp, id) | ||
/** | ||
* Load | ||
* de-serializes from redis hash to Item by id | ||
* @param queue | ||
* @param id | ||
*/ | ||
async function load(queue: string, id: string) { | ||
@@ -176,2 +388,9 @@ let data = await Connection.client.hgetall(`items:${queue}:${id}`) | ||
/** | ||
* Run Task | ||
* task running/logging logic | ||
* @param queue | ||
* @param id | ||
* @param func | ||
*/ | ||
async function runTask( | ||
@@ -196,3 +415,3 @@ queue: string, | ||
let success: boolean = await func(item) | ||
let completedTimestamp = new Date().getTime() | ||
let completedTimestamp = getTimestamp() | ||
@@ -222,3 +441,3 @@ if (success) { | ||
catch (err) { | ||
let completedTimestamp = new Date().getTime() | ||
let completedTimestamp = getTimestamp() | ||
@@ -230,6 +449,10 @@ await Connection.client | ||
.hset( | ||
`data:${item.queue}:${item.id}`, | ||
`items:${item.queue}:${item.id}`, | ||
'error', | ||
err.message | ||
) | ||
.expire( | ||
`items:${item.queue}:${item.id}`, | ||
config.retention || 60*60*24 | ||
) | ||
.exec() | ||
@@ -236,0 +459,0 @@ } |
30628
1055