@forrestjs/service-fetchq-task
Advanced tools
Comparing version 5.0.0-alpha.6 to 5.0.0-alpha.7
{ | ||
"name": "@forrestjs/service-fetchq-task", | ||
"description": "Simple API to run singleton tasks over a Fetchq queue.", | ||
"version": "5.0.0-alpha.6", | ||
"version": "5.0.0-alpha.7", | ||
"author": "Marco Pegoraro <marco.pegoraro@gmail.com", | ||
@@ -11,3 +11,3 @@ "main": "src/index", | ||
"dependencies": { | ||
"@forrestjs/core": "^5.0.0-alpha.6" | ||
"@forrestjs/core": "^5.0.0-alpha.7" | ||
}, | ||
@@ -30,3 +30,3 @@ "homepage": "https://github.com/forrestjs/forrestjs/tree/master/packages/service-fetchq-task#readme", | ||
}, | ||
"gitHead": "c1f48938eb0fe96df29eb184733a4410c411ce8f" | ||
"gitHead": "aba9a3ffeec394f202b01b5d18a26cadd8ddb9e3" | ||
} |
106
README.md
@@ -77,2 +77,4 @@ # Fetchq Task | ||
```js | ||
// Declarative form: | ||
// you can return one single task, or an array of tasks | ||
const myFeature = () => [ | ||
@@ -92,3 +94,27 @@ { | ||
} | ||
] | ||
]; | ||
// Functional form: | ||
// you can return one single task, or an array of tasks | ||
const myFeature = () => [ | ||
{ | ||
target: "$FETCHQ_REGISTER_TASK", | ||
handler: [ | ||
{ | ||
// Document in the tasks' queue: | ||
subject: "cqrs-todos", | ||
payload: { target: "todos" }, | ||
// Worker for this specific task: | ||
handler: (doc, ctx) => { | ||
console.log("cqrs-todos", doc.payload); | ||
return doc.reschedule("+1s"); | ||
} | ||
}, | ||
{ | ||
subject: 'foobar', | ||
handler: d => d.complete() | ||
} | ||
] | ||
} | ||
]; | ||
``` | ||
@@ -98,4 +124,82 @@ | ||
> `subject` and `handler` are mandatory. | ||
### subject | ||
type: `String` | ||
### payload | ||
type: `Object` | ||
### firstIteration | ||
type: `Time (absolute or relative)` | ||
Delay the first execution of the task. | ||
```js | ||
{ | ||
firstIteration: '+1h', | ||
firstIteration: '1970-01-01 10:22', | ||
} | ||
``` | ||
### nextIteration | ||
type: `Time (absolute or relative)` | ||
If provided, it schedules the task for a next execution when the handler completes returning `undefined`. | ||
```js | ||
{ | ||
firstIteration: '+1h', | ||
firstIteration: '1970-01-01 10:22', | ||
} | ||
``` | ||
### handler | ||
type: `Function` | ||
args: `doc`, `ctx` | ||
Provide the logic to perform for the task. | ||
👉 [Refer to the Fetchq documentation for details on the arguments and returning value](https://github.com/fetchq/node-client#the-handler-function). | ||
The hander can return a valid [Fetchq Action](https://github.com/fetchq/node-client#returning-actions), or simply skip returning. | ||
In case of returning `undefined`, the task will be rescheduled according to the `nextIteration` setting. | ||
In case `nextIteration` was not provided, the task will be marked as completed (single execution mode). | ||
### resetOnBoot | ||
type: `Boolean` | ||
Set it to `true` and the task will be completely reset at boot time. | ||
## APIs | ||
### Run a Task | ||
You can programmatically run any task immediately: | ||
```js | ||
const run = getContext('fetchq.task.run'); | ||
await run('taskSubject', 'log info message') | ||
``` | ||
> The log message is optional. | ||
### Reset a Task | ||
You can programmatically reset any task to its original state: | ||
```js | ||
const reset = getContext('fetchq.task.reset'); | ||
await reset('taskSubject', 'log info message') | ||
``` | ||
> The log message is optional. |
@@ -1,9 +0,9 @@ | ||
const onInitService = require('./on-init-service'); | ||
const onFetchqReady = require('./on-fetchq-ready'); | ||
const onFetchqRegisterQueue = require('./on-fetchq-register-queue'); | ||
const onFetchqRegisterWorker = require('./on-fetchq-register-worker'); | ||
const onInitService = require("./on-init-service"); | ||
const onFetchqReady = require("./on-fetchq-ready"); | ||
const onFetchqRegisterQueue = require("./on-fetchq-register-queue"); | ||
const onFetchqRegisterWorker = require("./on-fetchq-register-worker"); | ||
const service = { | ||
name: 'fetchq-task', | ||
trace: __filename, | ||
name: "fetchq-task", | ||
trace: __filename | ||
}; | ||
@@ -13,3 +13,3 @@ | ||
registerTargets({ | ||
FETCHQ_REGISTER_TASK: 'fetchq/task/register', | ||
FETCHQ_REGISTER_TASK: "fetchq/task/register" | ||
}); | ||
@@ -20,21 +20,21 @@ | ||
...service, | ||
target: '$INIT_SERVICE', | ||
handler: onInitService, | ||
target: "$INIT_SERVICE", | ||
handler: onInitService | ||
}, | ||
{ | ||
...service, | ||
target: '$FETCHQ_REGISTER_QUEUE', | ||
handler: onFetchqRegisterQueue, | ||
target: "$FETCHQ_REGISTER_QUEUE", | ||
handler: onFetchqRegisterQueue | ||
}, | ||
{ | ||
...service, | ||
target: '$FETCHQ_READY', | ||
handler: onFetchqReady, | ||
target: "$FETCHQ_READY", | ||
handler: onFetchqReady | ||
}, | ||
{ | ||
...service, | ||
target: '$FETCHQ_REGISTER_WORKER', | ||
handler: onFetchqRegisterWorker, | ||
}, | ||
target: "$FETCHQ_REGISTER_WORKER", | ||
handler: onFetchqRegisterWorker | ||
} | ||
]; | ||
}; |
/** | ||
* Push into the tasks queue all the registered tasks | ||
*/ | ||
module.exports = async ({ fetchq }, { getContext }) => { | ||
const queueName = getContext('fetchq.task.queueName'); | ||
for (const { subject, payload, resetOnBoot } of getContext( | ||
'fetchq.task.register', | ||
)) { | ||
module.exports = async ({ fetchq }, { getContext, log }) => { | ||
const queueName = getContext("fetchq.task.queueName"); | ||
const resetTask = getContext("fetchq.task.reset"); | ||
for (const { | ||
subject, | ||
payload = {}, | ||
firstIteration = "now", | ||
resetOnBoot | ||
} of getContext("fetchq.task.register")) { | ||
// Schedule the task: | ||
log.info(`[fetchq-task] Schedule "${subject}" at "${firstIteration}"`); | ||
await fetchq.doc.push(queueName, { | ||
subject, | ||
payload, | ||
...(firstIteration === "now" ? {} : { nextIteration: firstIteration }) | ||
}); | ||
if (resetOnBoot) { | ||
const encodedPayload = JSON.stringify(payload || {}).replace( | ||
/'/g, | ||
"''''", | ||
// ResetOnBoot the task: | ||
if (resetOnBoot) | ||
await resetTask( | ||
subject, | ||
`Reset on Boot "${subject}" at "${firstIteration}"` | ||
); | ||
const sql = ` | ||
UPDATE fetchq_data.${queueName}__docs SET | ||
"status" = 1, | ||
"attempts" = 0, | ||
"iterations" = 0, | ||
"created_at" = now(), | ||
"last_iteration" = NULL, | ||
"next_iteration" = now(), | ||
"payload" = $1 | ||
WHERE "subject" = $2; | ||
`; | ||
await fetchq.pool.query(sql, [encodedPayload, subject]); | ||
} | ||
} | ||
}; |
@@ -0,1 +1,4 @@ | ||
const taskReset = require("./task-reset"); | ||
const taskRun = require("./task-run"); | ||
/** | ||
@@ -8,9 +11,10 @@ * Collect tasks definition from: | ||
*/ | ||
module.exports = ({ createExtension, getConfig, setContext }) => { | ||
const queueName = getConfig('fetchq.task.queue.name', 'task'); | ||
const queueSettings = getConfig('fetchq.task.queue.settings', {}); | ||
const workerSettings = getConfig('fetchq.task.worker.settings', {}); | ||
module.exports = (ctx) => { | ||
const { log, createExtension, getConfig, setContext, getContext } = ctx; | ||
const queueName = getConfig("fetchq.task.queue.name", "task"); | ||
const queueSettings = getConfig("fetchq.task.queue.settings", {}); | ||
const workerSettings = getConfig("fetchq.task.worker.settings", {}); | ||
// Ensure a list of tasks is provided | ||
let configuredTasks = getConfig('fetchq.task.register', []); | ||
let configuredTasks = getConfig("fetchq.task.register", []); | ||
if (!Array.isArray(configuredTasks)) { | ||
@@ -26,3 +30,9 @@ configuredTasks = [configuredTasks]; | ||
// Collect from other extensions | ||
...createExtension.sync('$FETCHQ_REGISTER_TASK').map(($) => $[0]), | ||
...createExtension | ||
.sync("$FETCHQ_REGISTER_TASK") | ||
.map(($) => $[0]) | ||
// Support array form | ||
.reduce((acc, curr) => { | ||
return [...acc, ...(Array.isArray(curr) ? curr : [curr])]; | ||
}, []) | ||
]; | ||
@@ -37,3 +47,3 @@ | ||
setContext('fetchq.task', { | ||
setContext("fetchq.task", { | ||
queueName, | ||
@@ -43,3 +53,5 @@ queueSettings, | ||
register: registerTasks, | ||
reset: (subject, msg) => taskReset(subject, msg, ctx), | ||
run: (subject, msg) => taskRun(subject, msg, ctx) | ||
}); | ||
}; |
/** | ||
* Matches the task definition from memory and executes it. | ||
*/ | ||
module.exports = (doc, ctx) => { | ||
module.exports = async (doc, ctx) => { | ||
const task = ctx | ||
@@ -13,3 +13,12 @@ .getContext("fetchq.task.register") | ||
return task.handler(doc, ctx); | ||
// Execute and honor the returning action: | ||
const result = await task.handler(doc, ctx); | ||
if (result) return result; | ||
// Use the configuration for setting up next execution: | ||
if (task.nextIteration) { | ||
return doc.reschedule(task.nextIteration); | ||
} | ||
return doc.complete(); | ||
}; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
12440
12
204
203