@lbu/store
Advanced tools
Comparing version 0.0.88 to 0.0.89
@@ -434,2 +434,6 @@ import * as minioVendor from "minio"; | ||
* Does not throw when a job is already pending with the same name. | ||
* If already exists will update the priority and interval. | ||
* | ||
* The recurring job handler will reschedule the job based on it's own scheduledAt. However if | ||
* the newly scheduled job is not in the future, the interval is added on to the current time. | ||
*/ | ||
@@ -519,3 +523,3 @@ export function addRecurringJobToQueue( | ||
*/ | ||
export function isQueryPart(value: any): value is QueryPart; | ||
export function isQueryObject(value: any): value is QueryPart; | ||
@@ -522,0 +526,0 @@ /** |
{ | ||
"name": "@lbu/store", | ||
"version": "0.0.88", | ||
"version": "0.0.89", | ||
"description": "Postgres & S3-compatible wrappers for common things", | ||
@@ -18,4 +18,4 @@ "main": "./index.js", | ||
"dependencies": { | ||
"@lbu/insight": "0.0.88", | ||
"@lbu/stdlib": "0.0.88", | ||
"@lbu/insight": "0.0.89", | ||
"@lbu/stdlib": "0.0.89", | ||
"@types/minio": "7.0.6", | ||
@@ -47,3 +47,3 @@ "mime-types": "2.1.27", | ||
}, | ||
"gitHead": "f95775597389435d11e6ef713651a2d1d4816067" | ||
"gitHead": "96683eacc77885317739cc1dc3a30d58c74324c7" | ||
} |
@@ -14,2 +14,3 @@ # @lbu/store | ||
- Script runner, can watch & reload almost anything | ||
- Test and benchmark runner | ||
- Flexible code generators supporting routers, validators, api clients, CRUD | ||
@@ -39,4 +40,7 @@ queries and more in the future. | ||
- Run user scripts (in watch mode) | ||
- Run the linter or tests | ||
- Run the linter | ||
- A LBU based boilerplate | ||
- Test runner | ||
- Benchmark runner | ||
- Necessary Docker container management | ||
@@ -54,2 +58,3 @@ **@lbu/lint-config**: | ||
- Various utilities to get insight in the running process | ||
- A manual event system | ||
@@ -60,3 +65,2 @@ **@lbu/stdlib**: | ||
- Wrappers for child_process execution and spawning | ||
- Basic templating system | ||
- A `mainFn` wrapper that reads `.env` and calls the provided function if the | ||
@@ -72,3 +76,3 @@ file is the process entrypoint | ||
- Send file helper | ||
- Re-exports koa-session and koa-compose | ||
- Session support with safe browser readable cookies | ||
@@ -83,4 +87,4 @@ **@lbu/store**: | ||
- Caching files from S3 in memory or on local disk | ||
- Postgres powered JobQueue implementation | ||
- Supports priority, scheduling, multiple async workers | ||
- Postgres powered queue implementation | ||
- Supports priority, scheduling, multiple async workers and recurring jobs | ||
- koa-session compatible SessionStore backed by Postgres | ||
@@ -92,6 +96,7 @@ | ||
- router, with wildcard and path parameter support | ||
- validators, with pre- and postValidate hooks | ||
- queries, CRUD postgres queries | ||
- validators, pure JavaScript implementation | ||
- sql, CRUD postgres queries, graph traversal | ||
- Axios based api client | ||
- Typescript or JSDoc types | ||
- TypeScript or JSDoc types | ||
- react-query hooks | ||
- An extendable set of types: | ||
@@ -123,3 +128,4 @@ - boolean, number, string; | ||
Although some parts heavily rely on conventions set by the packages, we | ||
currently aim not to be a framework. However, the idea of being a bit more of | ||
framework is not completely out of the door yet. | ||
currently aim not to be a framework. We aim to provide a good developer | ||
experience, useful abstractions around the basics, and a stable backend <-> | ||
client interface. |
180
src/queue.js
@@ -10,11 +10,18 @@ import { log } from "@lbu/insight"; | ||
UPDATE "job" | ||
SET "isComplete" = TRUE, | ||
"updatedAt" = now() | ||
WHERE id = (SELECT "id" | ||
FROM "job" | ||
WHERE NOT "isComplete" | ||
AND "scheduledAt" < now() | ||
ORDER BY "scheduledAt", "priority" | ||
FOR UPDATE SKIP LOCKED | ||
LIMIT 1) | ||
SET | ||
"isComplete" = TRUE, | ||
"updatedAt" = now() | ||
WHERE | ||
id = ( | ||
SELECT | ||
"id" | ||
FROM | ||
"job" | ||
WHERE | ||
NOT "isComplete" | ||
AND "scheduledAt" < now() | ||
ORDER BY | ||
"scheduledAt", "priority" FOR UPDATE SKIP LOCKED | ||
LIMIT 1 | ||
) | ||
RETURNING id | ||
@@ -26,12 +33,19 @@ `, | ||
UPDATE "job" | ||
SET "isComplete" = TRUE, | ||
"updatedAt" = now() | ||
WHERE id = (SELECT "id" | ||
FROM "job" | ||
WHERE NOT "isComplete" | ||
AND "scheduledAt" < now() | ||
AND "name" = ${name} | ||
ORDER BY "scheduledAt", "priority" | ||
FOR UPDATE SKIP LOCKED | ||
LIMIT 1) | ||
SET | ||
"isComplete" = TRUE, | ||
"updatedAt" = now() | ||
WHERE | ||
id = ( | ||
SELECT | ||
"id" | ||
FROM | ||
"job" | ||
WHERE | ||
NOT "isComplete" | ||
AND "scheduledAt" < now() | ||
AND "name" = ${name} | ||
ORDER BY | ||
"scheduledAt", "priority" FOR UPDATE SKIP LOCKED | ||
LIMIT 1 | ||
) | ||
RETURNING "id" | ||
@@ -42,6 +56,9 @@ `, | ||
getPendingQueueSize: (sql) => sql` | ||
SELECT sum(CASE WHEN "scheduledAt" < now() THEN 1 ELSE 0 END) AS "pendingCount", | ||
sum(CASE WHEN "scheduledAt" >= now() THEN 1 ELSE 0 END) AS "scheduledCount" | ||
FROM "job" | ||
WHERE NOT "isComplete" | ||
SELECT | ||
sum(CASE WHEN "scheduledAt" < now() THEN 1 ELSE 0 END) AS "pendingCount", | ||
sum(CASE WHEN "scheduledAt" >= now() THEN 1 ELSE 0 END) AS "scheduledCount" | ||
FROM | ||
"job" | ||
WHERE | ||
NOT "isComplete" | ||
`, | ||
@@ -51,6 +68,9 @@ | ||
getPendingQueueSizeForName: (sql, name) => sql` | ||
SELECT sum(CASE WHEN "scheduledAt" < now() THEN 1 ELSE 0 END) AS "pendingCount", | ||
sum(CASE WHEN "scheduledAt" >= now() THEN 1 ELSE 0 END) AS "scheduledCount" | ||
FROM "job" | ||
WHERE NOT "isComplete" | ||
SELECT | ||
sum(CASE WHEN "scheduledAt" < now() THEN 1 ELSE 0 END) AS "pendingCount", | ||
sum(CASE WHEN "scheduledAt" >= now() THEN 1 ELSE 0 END) AS "scheduledCount" | ||
FROM | ||
"job" | ||
WHERE | ||
NOT "isComplete" | ||
AND "name" = ${name} | ||
@@ -60,7 +80,11 @@ `, | ||
// Returns time in milliseconds | ||
getAverageJobTime: (sql, dateStart, dateEnd) => sql` | ||
SELECT avg((EXTRACT(EPOCH FROM "updatedAt" AT TIME ZONE 'UTC') * 1000) - | ||
(EXTRACT(EPOCH FROM "scheduledAt" AT TIME ZONE 'UTC') * 1000)) AS "completionTime" | ||
FROM "job" | ||
WHERE "isComplete" | ||
getAverageJobTime: (sql, name, dateStart, dateEnd) => sql` | ||
SELECT | ||
avg((EXTRACT(EPOCH FROM "updatedAt" AT TIME ZONE 'UTC') * 1000) - | ||
(EXTRACT(EPOCH FROM "scheduledAt" AT TIME ZONE 'UTC') * 1000)) AS "completionTime" | ||
FROM | ||
"job" | ||
WHERE | ||
"isComplete" IS TRUE | ||
AND (COALESCE(${name ?? null}) IS NULL OR "name" = ${name ?? null}) | ||
AND "updatedAt" > ${dateStart} | ||
@@ -70,13 +94,2 @@ AND "updatedAt" <= ${dateEnd}; | ||
// Returns time in milliseconds | ||
getAverageJobTimeForName: (sql, name, dateStart, dateEnd) => sql` | ||
SELECT avg((EXTRACT(EPOCH FROM "updatedAt" AT TIME ZONE 'UTC') * 1000) - | ||
(EXTRACT(EPOCH FROM "scheduledAt" AT TIME ZONE 'UTC') * 1000)) AS "completionTime" | ||
FROM "job" | ||
WHERE "isComplete" | ||
AND name = ${name} | ||
AND "updatedAt" > ${dateStart} | ||
AND "updatedAt" <= ${dateEnd}; | ||
`, | ||
/** | ||
@@ -88,8 +101,28 @@ * @param {Postgres} sql | ||
getRecurringJobForName: (sql, name) => sql` | ||
SELECT id | ||
FROM "job" | ||
WHERE name = ${LBU_RECURRING_JOB} | ||
SELECT | ||
id | ||
FROM | ||
"job" | ||
WHERE | ||
name = ${LBU_RECURRING_JOB} | ||
AND "isComplete" IS FALSE | ||
AND data ->> 'name' = ${name} | ||
`, | ||
/** | ||
* @param {Postgres} sql | ||
* @param {number} id | ||
* @param {number} priority | ||
* @param {StoreJobInterval} interval | ||
*/ | ||
updateRecurringJob: (sql, id, priority, interval) => sql` | ||
UPDATE "job" | ||
SET | ||
"priority" = ${priority}, | ||
"data" = jsonb_set("data", ${sql.array(["interval"])}, ${sql.json( | ||
interval, | ||
)}) | ||
WHERE | ||
id = ${id} | ||
`, | ||
}; | ||
@@ -174,3 +207,3 @@ | ||
if (this.name) { | ||
return getAverageTimeToJobCompletionForName( | ||
return getAverageTimeToJobCompletion( | ||
this.sql, | ||
@@ -183,3 +216,8 @@ this.name, | ||
return getAverageTimeToJobCompletion(this.sql, startDate, endDate); | ||
return getAverageTimeToJobCompletion( | ||
this.sql, | ||
undefined, | ||
startDate, | ||
endDate, | ||
); | ||
} | ||
@@ -297,2 +335,3 @@ | ||
* Does not throw when a job is already pending with the same name. | ||
* If exists will update the interval | ||
* | ||
@@ -314,2 +353,8 @@ * @param {Postgres} sql | ||
if (existingJobs.length > 0) { | ||
await queueQueries.updateRecurringJob( | ||
sql, | ||
existingJobs[0].id, | ||
priority, | ||
interval, | ||
); | ||
return; | ||
@@ -366,2 +411,3 @@ } | ||
* @param {Postgres} sql | ||
* @param {string|undefined} name | ||
* @param {Date} startDate | ||
@@ -371,5 +417,6 @@ * @param {Date} endDate | ||
*/ | ||
async function getAverageTimeToJobCompletion(sql, startDate, endDate) { | ||
async function getAverageTimeToJobCompletion(sql, name, startDate, endDate) { | ||
const [result] = await queueQueries.getAverageJobTime( | ||
sql, | ||
name, | ||
startDate, | ||
@@ -379,34 +426,10 @@ endDate, | ||
return parseFloat(result?.completionTime ?? 0); | ||
return Math.floor(parseFloat(result?.completionTime ?? "0")); | ||
} | ||
/** | ||
* Return the average time between scheduled and completed for jobs completed in the | ||
* provided time range | ||
* Handles recurring jobs, by scheduling the 'child' and the current job again. | ||
* If the next scheduled item is not in the future, the interval is added to the current Date. | ||
* | ||
* @param {Postgres} sql | ||
* @param {string} name | ||
* @param {Date} startDate | ||
* @param {Date} endDate | ||
* @returns {Promise<number>} | ||
*/ | ||
async function getAverageTimeToJobCompletionForName( | ||
sql, | ||
name, | ||
startDate, | ||
endDate, | ||
) { | ||
const [result] = await queueQueries.getAverageJobTimeForName( | ||
sql, | ||
name, | ||
startDate, | ||
endDate, | ||
); | ||
return result?.completionTime ?? 0; | ||
} | ||
/** | ||
* Handles recurring jobs, by scheduling the 'child' and the current job again | ||
* | ||
* @param {Postgres} sql | ||
* @param {StoreJob} job | ||
@@ -421,3 +444,6 @@ */ | ||
const nextSchedule = getNextScheduledAt(scheduledAt, interval); | ||
let nextSchedule = getNextScheduledAt(scheduledAt, interval); | ||
if (nextSchedule.getTime() < Date.now()) { | ||
nextSchedule = getNextScheduledAt(new Date(), interval); | ||
} | ||
@@ -424,0 +450,0 @@ // Dispatch 'job' with higher priority |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
169107
4959
124
+ Added@lbu/insight@0.0.89(transitive)
+ Added@lbu/stdlib@0.0.89(transitive)
+ Added@types/node@14.14.5(transitive)
- Removed@lbu/insight@0.0.88(transitive)
- Removed@lbu/stdlib@0.0.88(transitive)
- Removed@types/node@14.14.2(transitive)
Updated@lbu/insight@0.0.89
Updated@lbu/stdlib@0.0.89