Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@forrestjs/service-fetchq-task

Package Overview
Dependencies
Maintainers
1
Versions
57
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@forrestjs/service-fetchq-task - npm Package Compare versions

Comparing version 5.0.0-alpha.6 to 5.0.0-alpha.7

src/task-reset.js

6

package.json
{
"name": "@forrestjs/service-fetchq-task",
"description": "Simple API to run singleton tasks over a Fetchq queue.",
"version": "5.0.0-alpha.6",
"version": "5.0.0-alpha.7",
"author": "Marco Pegoraro <marco.pegoraro@gmail.com",

@@ -11,3 +11,3 @@ "main": "src/index",

"dependencies": {
"@forrestjs/core": "^5.0.0-alpha.6"
"@forrestjs/core": "^5.0.0-alpha.7"
},

@@ -30,3 +30,3 @@ "homepage": "https://github.com/forrestjs/forrestjs/tree/master/packages/service-fetchq-task#readme",

},
"gitHead": "c1f48938eb0fe96df29eb184733a4410c411ce8f"
"gitHead": "aba9a3ffeec394f202b01b5d18a26cadd8ddb9e3"
}

@@ -77,2 +77,4 @@ # Fetchq Task

```js
// Declarative form:
// you can return one single task, or an array of tasks
const myFeature = () => [

@@ -92,3 +94,27 @@ {

}
]
];
// Functional form:
// you can return one single task, or an array of tasks
const myFeature = () => [
{
target: "$FETCHQ_REGISTER_TASK",
handler: [
{
// Document in the tasks' queue:
subject: "cqrs-todos",
payload: { target: "todos" },
// Worker for this specific task:
handler: (doc, ctx) => {
console.log("cqrs-todos", doc.payload);
return doc.reschedule("+1s");
}
},
{
subject: 'foobar',
handler: d => d.complete()
}
]
}
];
```

@@ -98,4 +124,82 @@

> `subject` and `handler` are mandatory.
### subject
type: `String`
### payload
type: `Object`
### firstIteration
type: `Time (absolute or relative)`
Delay the first execution of the task.
```js
{
firstIteration: '+1h',
firstIteration: '1970-01-01 10:22',
}
```
### nextIteration
type: `Time (absolute or relative)`
If provided, it schedules the task for a next execution when the handler completes returning `undefined`.
```js
{
firstIteration: '+1h',
firstIteration: '1970-01-01 10:22',
}
```
### handler
type: `Function`
args: `doc`, `ctx`
Provide the logic to perform for the task.
👉 [Refer to the Fetchq documentation for details on the arguments and returning value](https://github.com/fetchq/node-client#the-handler-function).
The hander can return a valid [Fetchq Action](https://github.com/fetchq/node-client#returning-actions), or simply skip returning.
In case of returning `undefined`, the task will be rescheduled according to the `nextIteration` setting.
In case `nextIteration` was not provided, the task will be marked as completed (single execution mode).
### resetOnBoot
type: `Boolean`
Set it to `true` and the task will be completely reset at boot time.
## APIs
### Run a Task
You can programmatically run any task immediately:
```js
const run = getContext('fetchq.task.run');
await run('taskSubject', 'log info message')
```
> The log message is optional.
### Reset a Task
You can programmatically reset any task to its original state:
```js
const reset = getContext('fetchq.task.reset');
await reset('taskSubject', 'log info message')
```
> The log message is optional.

@@ -1,9 +0,9 @@

const onInitService = require('./on-init-service');
const onFetchqReady = require('./on-fetchq-ready');
const onFetchqRegisterQueue = require('./on-fetchq-register-queue');
const onFetchqRegisterWorker = require('./on-fetchq-register-worker');
const onInitService = require("./on-init-service");
const onFetchqReady = require("./on-fetchq-ready");
const onFetchqRegisterQueue = require("./on-fetchq-register-queue");
const onFetchqRegisterWorker = require("./on-fetchq-register-worker");
const service = {
name: 'fetchq-task',
trace: __filename,
name: "fetchq-task",
trace: __filename
};

@@ -13,3 +13,3 @@

registerTargets({
FETCHQ_REGISTER_TASK: 'fetchq/task/register',
FETCHQ_REGISTER_TASK: "fetchq/task/register"
});

@@ -20,21 +20,21 @@

...service,
target: '$INIT_SERVICE',
handler: onInitService,
target: "$INIT_SERVICE",
handler: onInitService
},
{
...service,
target: '$FETCHQ_REGISTER_QUEUE',
handler: onFetchqRegisterQueue,
target: "$FETCHQ_REGISTER_QUEUE",
handler: onFetchqRegisterQueue
},
{
...service,
target: '$FETCHQ_READY',
handler: onFetchqReady,
target: "$FETCHQ_READY",
handler: onFetchqReady
},
{
...service,
target: '$FETCHQ_REGISTER_WORKER',
handler: onFetchqRegisterWorker,
},
target: "$FETCHQ_REGISTER_WORKER",
handler: onFetchqRegisterWorker
}
];
};
/**
* Push into the tasks queue all the registered tasks
*/
module.exports = async ({ fetchq }, { getContext }) => {
const queueName = getContext('fetchq.task.queueName');
for (const { subject, payload, resetOnBoot } of getContext(
'fetchq.task.register',
)) {
module.exports = async ({ fetchq }, { getContext, log }) => {
const queueName = getContext("fetchq.task.queueName");
const resetTask = getContext("fetchq.task.reset");
for (const {
subject,
payload = {},
firstIteration = "now",
resetOnBoot
} of getContext("fetchq.task.register")) {
// Schedule the task:
log.info(`[fetchq-task] Schedule "${subject}" at "${firstIteration}"`);
await fetchq.doc.push(queueName, {
subject,
payload,
...(firstIteration === "now" ? {} : { nextIteration: firstIteration })
});
if (resetOnBoot) {
const encodedPayload = JSON.stringify(payload || {}).replace(
/'/g,
"''''",
// ResetOnBoot the task:
if (resetOnBoot)
await resetTask(
subject,
`Reset on Boot "${subject}" at "${firstIteration}"`
);
const sql = `
UPDATE fetchq_data.${queueName}__docs SET
"status" = 1,
"attempts" = 0,
"iterations" = 0,
"created_at" = now(),
"last_iteration" = NULL,
"next_iteration" = now(),
"payload" = $1
WHERE "subject" = $2;
`;
await fetchq.pool.query(sql, [encodedPayload, subject]);
}
}
};

@@ -0,1 +1,4 @@

const taskReset = require("./task-reset");
const taskRun = require("./task-run");
/**

@@ -8,9 +11,10 @@ * Collect tasks definition from:

*/
module.exports = ({ createExtension, getConfig, setContext }) => {
const queueName = getConfig('fetchq.task.queue.name', 'task');
const queueSettings = getConfig('fetchq.task.queue.settings', {});
const workerSettings = getConfig('fetchq.task.worker.settings', {});
module.exports = (ctx) => {
const { log, createExtension, getConfig, setContext, getContext } = ctx;
const queueName = getConfig("fetchq.task.queue.name", "task");
const queueSettings = getConfig("fetchq.task.queue.settings", {});
const workerSettings = getConfig("fetchq.task.worker.settings", {});
// Ensure a list of tasks is provided
let configuredTasks = getConfig('fetchq.task.register', []);
let configuredTasks = getConfig("fetchq.task.register", []);
if (!Array.isArray(configuredTasks)) {

@@ -26,3 +30,9 @@ configuredTasks = [configuredTasks];

// Collect from other extensions
...createExtension.sync('$FETCHQ_REGISTER_TASK').map(($) => $[0]),
...createExtension
.sync("$FETCHQ_REGISTER_TASK")
.map(($) => $[0])
// Support array form
.reduce((acc, curr) => {
return [...acc, ...(Array.isArray(curr) ? curr : [curr])];
}, [])
];

@@ -37,3 +47,3 @@

setContext('fetchq.task', {
setContext("fetchq.task", {
queueName,

@@ -43,3 +53,5 @@ queueSettings,

register: registerTasks,
reset: (subject, msg) => taskReset(subject, msg, ctx),
run: (subject, msg) => taskRun(subject, msg, ctx)
});
};
/**
* Matches the task definition from memory and executes it.
*/
module.exports = (doc, ctx) => {
module.exports = async (doc, ctx) => {
const task = ctx

@@ -13,3 +13,12 @@ .getContext("fetchq.task.register")

return task.handler(doc, ctx);
// Execute and honor the returning action:
const result = await task.handler(doc, ctx);
if (result) return result;
// Use the configuration for setting up next execution:
if (task.nextIteration) {
return doc.reschedule(task.nextIteration);
}
return doc.complete();
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc