@lbu/store
Advanced tools
Comparing version 0.0.43 to 0.0.44
{ | ||
"name": "@lbu/store", | ||
"version": "0.0.43", | ||
"version": "0.0.44", | ||
"description": "Postgres & S3-compatible wrappers for common things", | ||
@@ -17,7 +17,7 @@ "main": "./index.js", | ||
"dependencies": { | ||
"@lbu/insight": "0.0.43", | ||
"@lbu/stdlib": "0.0.43", | ||
"@lbu/insight": "0.0.44", | ||
"@lbu/stdlib": "0.0.44", | ||
"mime-types": "2.1.27", | ||
"minio": "7.0.16", | ||
"postgres": "1.0.2" | ||
"postgres": "2.0.0-beta.0" | ||
}, | ||
@@ -39,3 +39,3 @@ "author": { | ||
}, | ||
"gitHead": "f3b1d86fb7237f6bdc30f3499e0a383a0e0724da" | ||
"gitHead": "7b7f5bb4365346c8e3098c0eb8a8b43ad306635f" | ||
} |
@@ -52,3 +52,2 @@ # @lbu/store | ||
- Various utilities to get insight in the running process | ||
- Parser to process production logs in an external process | ||
@@ -55,0 +54,0 @@ **@lbu/stdlib**: |
@@ -1,2 +0,1 @@ | ||
import { isNil, pathJoin } from "@lbu/stdlib"; | ||
import { once } from "events"; | ||
@@ -6,2 +5,3 @@ import { createReadStream, createWriteStream } from "fs"; | ||
import { promisify } from "util"; | ||
import { isNil, pathJoin } from "@lbu/stdlib"; | ||
import { getFileStream } from "./files.js"; | ||
@@ -8,0 +8,0 @@ |
@@ -0,3 +1,3 @@ | ||
import { createReadStream } from "fs"; | ||
import { uuid } from "@lbu/stdlib"; | ||
import { createReadStream } from "fs"; | ||
import mime from "mime-types"; | ||
@@ -9,3 +9,11 @@ import { storeQueries } from "./generated/queries.js"; | ||
copyFile: (sql, targetId, targetBucket, sourceId, sourceBucket) => | ||
sql`INSERT INTO file_store (id, bucket_name, content_type, content_length, filename) SELECT ${targetId}, ${targetBucket}, content_type, content_length, filename FROM file_store WHERE id = ${sourceId} AND bucket_name = ${sourceBucket} RETURNING id`, | ||
sql`INSERT INTO "fileStore" ("id", "bucketName", "contentType", "contentLength", "filename") | ||
SELECT | ||
${targetId}, | ||
${targetBucket}, | ||
"contentType", | ||
"contentLength", | ||
"filename" | ||
FROM "fileStore" | ||
WHERE id = ${sourceId} AND "bucketName" = ${sourceBucket} RETURNING id`, | ||
}; | ||
@@ -12,0 +20,0 @@ |
@@ -15,12 +15,16 @@ // Generated by @lbu/code-gen | ||
where, | ||
) => sql`SELECT fs.id as "id", fs.bucket_name as "bucketName", fs.content_length as "contentLength", fs.content_type as "contentType", fs.filename as "filename", fs.created_at as "createdAt", fs.updated_at as "updatedAt" FROM file_store fs | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR fs.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR fs.id = ANY (${sql.array( | ||
) => sql`SELECT fs."id", fs."bucketName", fs."contentLength", fs."contentType", fs."filename", fs."createdAt", fs."updatedAt" FROM "fileStore" fs | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR fs."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${ | ||
where.idIn ?? null | ||
}, NULL) IS NULL OR fs."id" = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.bucketName | ||
}, NULL) IS NULL OR fs.bucket_name = ${where.bucketName}) AND (COALESCE(${ | ||
where.bucketNameLike | ||
}, NULL) IS NULL OR fs.bucket_name LIKE ${"%" + where.bucketNameLike + "%"}) | ||
where.bucketName ?? null | ||
}, NULL) IS NULL OR fs."bucketName" = ${ | ||
where.bucketName ?? null | ||
}) AND (COALESCE(${ | ||
where.bucketNameLike ?? null | ||
}, NULL) IS NULL OR fs."bucketName" LIKE ${"%" + where.bucketNameLike + "%"}) | ||
`, | ||
@@ -34,14 +38,20 @@ | ||
fileStoreCount: async (sql, where) => { | ||
const result = await sql`SELECT count(*) as gen_count FROM file_store fs | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR fs.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR fs.id = ANY (${sql.array( | ||
const result = await sql`SELECT count(*) as "genCount" FROM "fileStore" fs | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR fs."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${ | ||
where.idIn ?? null | ||
}, NULL) IS NULL OR fs."id" = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.bucketName | ||
}, NULL) IS NULL OR fs.bucket_name = ${where.bucketName}) AND (COALESCE(${ | ||
where.bucketNameLike | ||
}, NULL) IS NULL OR fs.bucket_name LIKE ${"%" + where.bucketNameLike + "%"}) | ||
where.bucketName ?? null | ||
}, NULL) IS NULL OR fs."bucketName" = ${ | ||
where.bucketName ?? null | ||
}) AND (COALESCE(${ | ||
where.bucketNameLike ?? null | ||
}, NULL) IS NULL OR fs."bucketName" LIKE ${ | ||
"%" + where.bucketNameLike + "%" | ||
}) | ||
`; | ||
return result?.[0]?.gen_count ?? 0; | ||
return result?.[0]?.genCount ?? 0; | ||
}, | ||
@@ -54,12 +64,16 @@ | ||
*/ | ||
fileStoreDelete: (sql, where) => sql`DELETE FROM file_store fs | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR fs.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR fs.id = ANY (${sql.array( | ||
fileStoreDelete: (sql, where) => sql`DELETE FROM "fileStore" fs | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR fs."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${ | ||
where.idIn ?? null | ||
}, NULL) IS NULL OR fs."id" = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.bucketName | ||
}, NULL) IS NULL OR fs.bucket_name = ${where.bucketName}) AND (COALESCE(${ | ||
where.bucketNameLike | ||
}, NULL) IS NULL OR fs.bucket_name LIKE ${"%" + where.bucketNameLike + "%"}) | ||
where.bucketName ?? null | ||
}, NULL) IS NULL OR fs."bucketName" = ${ | ||
where.bucketName ?? null | ||
}) AND (COALESCE(${ | ||
where.bucketNameLike ?? null | ||
}, NULL) IS NULL OR fs."bucketName" LIKE ${"%" + where.bucketNameLike + "%"}) | ||
`, | ||
@@ -74,22 +88,23 @@ | ||
const data = Array.isArray(insert) ? insert : [insert]; | ||
const input = []; | ||
if (data.length === 0) { | ||
return []; | ||
} | ||
let query = `INSERT INTO "fileStore" ("bucketName", "contentLength", "contentType", "filename", "createdAt", "updatedAt") VALUES `; | ||
const argList = []; | ||
let idx = 1; | ||
for (const it of data) { | ||
input.push({ | ||
bucket_name: it.bucketName ?? undefined, | ||
content_length: it.contentLength ?? undefined, | ||
content_type: it.contentType ?? undefined, | ||
filename: it.filename ?? undefined, | ||
created_at: it.createdAt ?? new Date(), | ||
updated_at: it.updatedAt ?? new Date(), | ||
}); | ||
argList.push( | ||
it.bucketName ?? undefined, | ||
it.contentLength ?? undefined, | ||
it.contentType ?? undefined, | ||
it.filename ?? undefined, | ||
it.createdAt ?? new Date(), | ||
it.updatedAt ?? new Date(), | ||
); | ||
query += `($${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}),`; | ||
} | ||
return sql`INSERT INTO file_store ${sql( | ||
input, | ||
"bucket_name", | ||
"content_length", | ||
"content_type", | ||
"filename", | ||
"created_at", | ||
"updated_at", | ||
)} RETURNING id as "id", bucket_name as "bucketName", content_length as "contentLength", content_type as "contentType", filename as "filename", created_at as "createdAt", updated_at as "updatedAt"`; | ||
// Remove trailing comma | ||
query = query.substring(0, query.length - 1); | ||
query += ` RETURNING "id", "bucketName", "contentLength", "contentType", "filename", "createdAt", "updatedAt"`; | ||
return sql.unsafe(query, argList); | ||
}, | ||
@@ -104,48 +119,74 @@ | ||
fileStoreUpdate: async (sql, value, where) => { | ||
const updateValue = {}; | ||
await sql`INSERT INTO "fileStoreHistory" ("fileStoreId", "bucketName", "contentLength", "contentType", "filename", "createdAt") | ||
SELECT id, "bucketName", "contentLength", "contentType", "filename", COALESCE("updatedAt", "createdAt", now()) FROM "fileStore" fs | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR fs."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${ | ||
where.idIn ?? null | ||
}, NULL) IS NULL OR fs."id" = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.bucketName ?? null | ||
}, NULL) IS NULL OR fs."bucketName" = ${ | ||
where.bucketName ?? null | ||
}) AND (COALESCE(${ | ||
where.bucketNameLike ?? null | ||
}, NULL) IS NULL OR fs."bucketName" LIKE ${ | ||
"%" + where.bucketNameLike + "%" | ||
}) | ||
`; | ||
let query = `UPDATE "fileStore" fs SET `; | ||
const argList = []; | ||
let idx = 1; | ||
if (value["bucketName"] !== undefined) { | ||
updateValue["bucket_name"] = value["bucketName"]; | ||
query += `"bucketName" = $${idx++}, `; | ||
argList.push(value["bucketName"]); | ||
} | ||
if (value["contentLength"] !== undefined) { | ||
updateValue["content_length"] = value["contentLength"]; | ||
query += `"contentLength" = $${idx++}, `; | ||
argList.push(value["contentLength"]); | ||
} | ||
if (value["contentType"] !== undefined) { | ||
updateValue["content_type"] = value["contentType"]; | ||
query += `"contentType" = $${idx++}, `; | ||
argList.push(value["contentType"]); | ||
} | ||
if (value["filename"] !== undefined) { | ||
updateValue["filename"] = value["filename"]; | ||
query += `"filename" = $${idx++}, `; | ||
argList.push(value["filename"]); | ||
} | ||
if (value["createdAt"] !== undefined) { | ||
updateValue["created_at"] = value["createdAt"]; | ||
query += `"createdAt" = $${idx++}, `; | ||
argList.push(value["createdAt"]); | ||
} | ||
if (value["updatedAt"] !== undefined) { | ||
updateValue["updated_at"] = value["updatedAt"]; | ||
query += `"updatedAt" = $${idx++}, `; | ||
argList.push(new Date()); | ||
query = query.substring(0, query.length - 2); | ||
query += ` WHERE `; | ||
if (where.id !== undefined) { | ||
query += `fs."id" `; | ||
query += `= $${idx++}`; | ||
argList.push(where.id); | ||
query += " AND "; | ||
} | ||
updateValue.updated_at = new Date(); | ||
await sql`INSERT INTO file_store_history (file_store_id, bucket_name, content_length, content_type, filename, created_at) | ||
SELECT id, bucket_name, content_length, content_type, filename, COALESCE(updated_at, created_at, now()) FROM file_store fs | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR fs.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR fs.id = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.bucketName | ||
}, NULL) IS NULL OR fs.bucket_name = ${where.bucketName}) AND (COALESCE(${ | ||
where.bucketNameLike | ||
}, NULL) IS NULL OR fs.bucket_name LIKE ${"%" + where.bucketNameLike + "%"}) | ||
`; | ||
return sql`UPDATE file_store fs set ${sql( | ||
updateValue, | ||
...Object.keys(updateValue), | ||
)} | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR fs.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR fs.id = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.bucketName | ||
}, NULL) IS NULL OR fs.bucket_name = ${where.bucketName}) AND (COALESCE(${ | ||
where.bucketNameLike | ||
}, NULL) IS NULL OR fs.bucket_name LIKE ${"%" + where.bucketNameLike + "%"}) | ||
RETURNING fs.id as "id", fs.bucket_name as "bucketName", fs.content_length as "contentLength", fs.content_type as "contentType", fs.filename as "filename", fs.created_at as "createdAt", fs.updated_at as "updatedAt"`; | ||
if (where.idIn !== undefined) { | ||
query += `fs."id" `; | ||
query += `= ANY $${idx++}::uuid[]`; | ||
argList.push(where.idIn); | ||
query += " AND "; | ||
} | ||
if (where.bucketName !== undefined) { | ||
query += `fs."bucketName" `; | ||
query += `= $${idx++}`; | ||
argList.push(where.bucketName); | ||
query += " AND "; | ||
} | ||
if (where.bucketNameLike !== undefined) { | ||
query += `fs."bucketName" `; | ||
query += `LIKE $${idx++}`; | ||
argList.push(`%${where.bucketNameLike}%`); | ||
query += " AND "; | ||
} | ||
query = query.substring(0, query.length - 4); | ||
query += ` RETURNING fs."id", fs."bucketName", fs."contentLength", fs."contentType", fs."filename", fs."createdAt", fs."updatedAt"`; | ||
return sql.unsafe(query, argList); | ||
}, | ||
@@ -161,12 +202,16 @@ | ||
where, | ||
) => sql`SELECT fs.id as "id", fs.bucket_name as "bucketName", fs.content_length as "contentLength", fs.content_type as "contentType", fs.filename as "filename", fs.created_at as "createdAt", fs.updated_at as "updatedAt", array_agg(fsh.*) as history FROM file_store fs LEFT JOIN file_store_history fsh ON fs.id = fsh.file_store_id | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR fs.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR fs.id = ANY (${sql.array( | ||
) => sql`SELECT fs."id", fs."bucketName", fs."contentLength", fs."contentType", fs."filename", fs."createdAt", fs."updatedAt", array_agg(to_jsonb(fsh.*)) as history FROM "fileStore" fs LEFT JOIN "fileStoreHistory" fsh ON fs.id = fsh."fileStoreId" | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR fs."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${ | ||
where.idIn ?? null | ||
}, NULL) IS NULL OR fs."id" = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.bucketName | ||
}, NULL) IS NULL OR fs.bucket_name = ${where.bucketName}) AND (COALESCE(${ | ||
where.bucketNameLike | ||
}, NULL) IS NULL OR fs.bucket_name LIKE ${"%" + where.bucketNameLike + "%"}) | ||
where.bucketName ?? null | ||
}, NULL) IS NULL OR fs."bucketName" = ${ | ||
where.bucketName ?? null | ||
}) AND (COALESCE(${ | ||
where.bucketNameLike ?? null | ||
}, NULL) IS NULL OR fs."bucketName" LIKE ${"%" + where.bucketNameLike + "%"}) | ||
GROUP BY fs.id`, | ||
@@ -182,14 +227,18 @@ | ||
where, | ||
) => sql`SELECT ss.id as "id", ss.expires as "expires", ss.data as "data", ss.created_at as "createdAt", ss.updated_at as "updatedAt" FROM session_store ss | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR ss.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR ss.id = ANY (${sql.array( | ||
) => sql`SELECT ss."id", ss."expires", ss."data", ss."createdAt", ss."updatedAt" FROM "sessionStore" ss | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR ss."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${ | ||
where.idIn ?? null | ||
}, NULL) IS NULL OR ss."id" = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${where.expires}, NULL) IS NULL OR ss.expires = ${ | ||
where.expires | ||
}) AND (COALESCE(${where.expiresGreaterThan}, NULL) IS NULL OR ss.expires > ${ | ||
where.expiresGreaterThan | ||
}) AND (COALESCE(${where.expiresLowerThan}, NULL) IS NULL OR ss.expires < ${ | ||
where.expiresLowerThan | ||
}) | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.expires ?? null | ||
}, NULL) IS NULL OR ss."expires" = ${where.expires ?? null}) AND (COALESCE(${ | ||
where.expiresGreaterThan ?? null | ||
}, NULL) IS NULL OR ss."expires" > ${ | ||
where.expiresGreaterThan ?? null | ||
}) AND (COALESCE(${ | ||
where.expiresLowerThan ?? null | ||
}, NULL) IS NULL OR ss."expires" < ${where.expiresLowerThan ?? null}) | ||
`, | ||
@@ -203,18 +252,22 @@ | ||
sessionStoreCount: async (sql, where) => { | ||
const result = await sql`SELECT count(*) as gen_count FROM session_store ss | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR ss.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR ss.id = ANY (${sql.array( | ||
const result = await sql`SELECT count(*) as "genCount" FROM "sessionStore" ss | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR ss."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${ | ||
where.idIn ?? null | ||
}, NULL) IS NULL OR ss."id" = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.expires | ||
}, NULL) IS NULL OR ss.expires = ${where.expires}) AND (COALESCE(${ | ||
where.expiresGreaterThan | ||
}, NULL) IS NULL OR ss.expires > ${ | ||
where.expiresGreaterThan | ||
}) AND (COALESCE(${where.expiresLowerThan}, NULL) IS NULL OR ss.expires < ${ | ||
where.expiresLowerThan | ||
}) | ||
where.expires ?? null | ||
}, NULL) IS NULL OR ss."expires" = ${ | ||
where.expires ?? null | ||
}) AND (COALESCE(${ | ||
where.expiresGreaterThan ?? null | ||
}, NULL) IS NULL OR ss."expires" > ${ | ||
where.expiresGreaterThan ?? null | ||
}) AND (COALESCE(${ | ||
where.expiresLowerThan ?? null | ||
}, NULL) IS NULL OR ss."expires" < ${where.expiresLowerThan ?? null}) | ||
`; | ||
return result?.[0]?.gen_count ?? 0; | ||
return result?.[0]?.genCount ?? 0; | ||
}, | ||
@@ -227,14 +280,18 @@ | ||
*/ | ||
sessionStoreDelete: (sql, where) => sql`DELETE FROM session_store ss | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR ss.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR ss.id = ANY (${sql.array( | ||
sessionStoreDelete: (sql, where) => sql`DELETE FROM "sessionStore" ss | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR ss."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${ | ||
where.idIn ?? null | ||
}, NULL) IS NULL OR ss."id" = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${where.expires}, NULL) IS NULL OR ss.expires = ${ | ||
where.expires | ||
}) AND (COALESCE(${where.expiresGreaterThan}, NULL) IS NULL OR ss.expires > ${ | ||
where.expiresGreaterThan | ||
}) AND (COALESCE(${where.expiresLowerThan}, NULL) IS NULL OR ss.expires < ${ | ||
where.expiresLowerThan | ||
}) | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.expires ?? null | ||
}, NULL) IS NULL OR ss."expires" = ${where.expires ?? null}) AND (COALESCE(${ | ||
where.expiresGreaterThan ?? null | ||
}, NULL) IS NULL OR ss."expires" > ${ | ||
where.expiresGreaterThan ?? null | ||
}) AND (COALESCE(${ | ||
where.expiresLowerThan ?? null | ||
}, NULL) IS NULL OR ss."expires" < ${where.expiresLowerThan ?? null}) | ||
`, | ||
@@ -249,18 +306,21 @@ | ||
const data = Array.isArray(insert) ? insert : [insert]; | ||
const input = []; | ||
if (data.length === 0) { | ||
return []; | ||
} | ||
let query = `INSERT INTO "sessionStore" ("expires", "data", "createdAt", "updatedAt") VALUES `; | ||
const argList = []; | ||
let idx = 1; | ||
for (const it of data) { | ||
input.push({ | ||
expires: it.expires ?? undefined, | ||
data: JSON.stringify(it.data ?? {}), | ||
created_at: it.createdAt ?? new Date(), | ||
updated_at: it.updatedAt ?? new Date(), | ||
}); | ||
argList.push( | ||
it.expires ?? undefined, | ||
JSON.stringify(it.data ?? {}), | ||
it.createdAt ?? new Date(), | ||
it.updatedAt ?? new Date(), | ||
); | ||
query += `($${idx++}, $${idx++}, $${idx++}, $${idx++}),`; | ||
} | ||
return sql`INSERT INTO session_store ${sql( | ||
input, | ||
"expires", | ||
"data", | ||
"created_at", | ||
"updated_at", | ||
)} RETURNING id as "id", expires as "expires", data as "data", created_at as "createdAt", updated_at as "updatedAt"`; | ||
// Remove trailing comma | ||
query = query.substring(0, query.length - 1); | ||
query += ` RETURNING "id", "expires", "data", "createdAt", "updatedAt"`; | ||
return sql.unsafe(query, argList); | ||
}, | ||
@@ -275,34 +335,54 @@ | ||
sessionStoreUpdate: (sql, value, where) => { | ||
const updateValue = {}; | ||
let query = `UPDATE "sessionStore" ss SET `; | ||
const argList = []; | ||
let idx = 1; | ||
if (value["expires"] !== undefined) { | ||
updateValue["expires"] = value["expires"]; | ||
query += `"expires" = $${idx++}, `; | ||
argList.push(value["expires"]); | ||
} | ||
if (value["data"] !== undefined) { | ||
updateValue["data"] = JSON.stringify(value["data"]); | ||
query += `"data" = $${idx++}, `; | ||
argList.push(JSON.stringify(value["data"])); | ||
} | ||
if (value["createdAt"] !== undefined) { | ||
updateValue["created_at"] = value["createdAt"]; | ||
query += `"createdAt" = $${idx++}, `; | ||
argList.push(value["createdAt"]); | ||
} | ||
if (value["updatedAt"] !== undefined) { | ||
updateValue["updated_at"] = value["updatedAt"]; | ||
query += `"updatedAt" = $${idx++}, `; | ||
argList.push(new Date()); | ||
query = query.substring(0, query.length - 2); | ||
query += ` WHERE `; | ||
if (where.id !== undefined) { | ||
query += `ss."id" `; | ||
query += `= $${idx++}`; | ||
argList.push(where.id); | ||
query += " AND "; | ||
} | ||
updateValue.updated_at = new Date(); | ||
return sql`UPDATE session_store ss set ${sql( | ||
updateValue, | ||
...Object.keys(updateValue), | ||
)} | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR ss.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idIn}, NULL) IS NULL OR ss.id = ANY (${sql.array( | ||
where?.idIn ?? [], | ||
)}::uuid[])) AND (COALESCE(${ | ||
where.expires | ||
}, NULL) IS NULL OR ss.expires = ${where.expires}) AND (COALESCE(${ | ||
where.expiresGreaterThan | ||
}, NULL) IS NULL OR ss.expires > ${ | ||
where.expiresGreaterThan | ||
}) AND (COALESCE(${where.expiresLowerThan}, NULL) IS NULL OR ss.expires < ${ | ||
where.expiresLowerThan | ||
}) | ||
RETURNING ss.id as "id", ss.expires as "expires", ss.data as "data", ss.created_at as "createdAt", ss.updated_at as "updatedAt"`; | ||
if (where.idIn !== undefined) { | ||
query += `ss."id" `; | ||
query += `= ANY $${idx++}::uuid[]`; | ||
argList.push(where.idIn); | ||
query += " AND "; | ||
} | ||
if (where.expires !== undefined) { | ||
query += `ss."expires" `; | ||
query += `= $${idx++}`; | ||
argList.push(where.expires); | ||
query += " AND "; | ||
} | ||
if (where.expiresGreaterThan !== undefined) { | ||
query += `ss."expires" `; | ||
query += `> $${idx++}`; | ||
argList.push(where.expiresGreaterThan); | ||
query += " AND "; | ||
} | ||
if (where.expiresLowerThan !== undefined) { | ||
query += `ss."expires" `; | ||
query += `< $${idx++}`; | ||
argList.push(where.expiresLowerThan); | ||
query += " AND "; | ||
} | ||
query = query.substring(0, query.length - 4); | ||
query += ` RETURNING ss."id", ss."expires", ss."data", ss."createdAt", ss."updatedAt"`; | ||
return sql.unsafe(query, argList); | ||
}, | ||
@@ -317,17 +397,12 @@ | ||
sessionStoreUpsert: (sql, it) => { | ||
const data = { | ||
expires: it.expires ?? undefined, | ||
data: JSON.stringify(it.data ?? {}), | ||
created_at: it.createdAt ?? new Date(), | ||
updated_at: it.updatedAt ?? new Date(), | ||
}; | ||
data.id = it.id || uuid(); | ||
return sql`INSERT INTO session_store ${sql( | ||
data, | ||
"id", | ||
"expires", | ||
"data", | ||
"created_at", | ||
"updated_at", | ||
)} ON CONFLICT (id) DO UPDATE SET expires = EXCLUDED.expires, data = EXCLUDED.data, updated_at = EXCLUDED.updated_at RETURNING id as "id", expires as "expires", data as "data", created_at as "createdAt", updated_at as "updatedAt"`; | ||
return sql` | ||
INSERT INTO "sessionStore" ("id", "expires", "data", "createdAt", "updatedAt" | ||
) VALUES ( | ||
${it.id ?? uuid()}, ${it.expires ?? undefined}, ${JSON.stringify( | ||
it.data ?? {}, | ||
)}, ${it.createdAt ?? new Date()}, ${it.updatedAt ?? new Date()} | ||
) ON CONFLICT("id") DO UPDATE SET | ||
"expires" = EXCLUDED."expires", "data" = EXCLUDED."data", "updatedAt" = EXCLUDED."updatedAt" | ||
RETURNING "id", "expires", "data", "createdAt", "updatedAt" | ||
`; | ||
}, | ||
@@ -342,17 +417,12 @@ | ||
sessionStoreUpsertByExpires: (sql, it) => { | ||
const data = { | ||
expires: it.expires ?? undefined, | ||
data: JSON.stringify(it.data ?? {}), | ||
created_at: it.createdAt ?? new Date(), | ||
updated_at: it.updatedAt ?? new Date(), | ||
}; | ||
data.id = it.id || uuid(); | ||
return sql`INSERT INTO session_store ${sql( | ||
data, | ||
"id", | ||
"expires", | ||
"data", | ||
"created_at", | ||
"updated_at", | ||
)} ON CONFLICT (expires) DO UPDATE SET data = EXCLUDED.data, updated_at = EXCLUDED.updated_at RETURNING id as "id", expires as "expires", data as "data", created_at as "createdAt", updated_at as "updatedAt"`; | ||
return sql` | ||
INSERT INTO "sessionStore" ("id", "expires", "data", "createdAt", "updatedAt" | ||
) VALUES ( | ||
${it.id ?? uuid()}, ${it.expires ?? undefined}, ${JSON.stringify( | ||
it.data ?? {}, | ||
)}, ${it.createdAt ?? new Date()}, ${it.updatedAt ?? new Date()} | ||
) ON CONFLICT("expires") DO UPDATE SET | ||
"data" = EXCLUDED."data", "updatedAt" = EXCLUDED."updatedAt" | ||
RETURNING "id", "expires", "data", "createdAt", "updatedAt" | ||
`; | ||
}, | ||
@@ -368,12 +438,12 @@ | ||
where, | ||
) => sql`SELECT jq.id as "id", jq.is_complete as "isComplete", jq.priority as "priority", jq.scheduled_at as "scheduledAt", jq.name as "name", jq.data as "data", jq.created_at as "createdAt", jq.updated_at as "updatedAt" FROM job_queue jq | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR jq.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idGreaterThan}, NULL) IS NULL OR jq.id > ${ | ||
where.idGreaterThan | ||
}) AND (COALESCE(${where.idLowerThan}, NULL) IS NULL OR jq.id < ${ | ||
where.idLowerThan | ||
}) AND (COALESCE(${where.name}, NULL) IS NULL OR jq.name = ${ | ||
where.name | ||
}) AND (COALESCE(${where.nameLike}, NULL) IS NULL OR jq.name LIKE ${ | ||
) => sql`SELECT jq."id", jq."isComplete", jq."priority", jq."scheduledAt", jq."name", jq."data", jq."createdAt", jq."updatedAt" FROM "jobQueue" jq | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR jq."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${where.idGreaterThan ?? null}, NULL) IS NULL OR jq."id" > ${ | ||
where.idGreaterThan ?? null | ||
}) AND (COALESCE(${where.idLowerThan ?? null}, NULL) IS NULL OR jq."id" < ${ | ||
where.idLowerThan ?? null | ||
}) AND (COALESCE(${where.name ?? null}, NULL) IS NULL OR jq."name" = ${ | ||
where.name ?? null | ||
}) AND (COALESCE(${where.nameLike ?? null}, NULL) IS NULL OR jq."name" LIKE ${ | ||
"%" + where.nameLike + "%" | ||
@@ -389,16 +459,18 @@ }) | ||
jobQueueCount: async (sql, where) => { | ||
const result = await sql`SELECT count(*) as gen_count FROM job_queue jq | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR jq.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idGreaterThan}, NULL) IS NULL OR jq.id > ${ | ||
where.idGreaterThan | ||
}) AND (COALESCE(${where.idLowerThan}, NULL) IS NULL OR jq.id < ${ | ||
where.idLowerThan | ||
}) AND (COALESCE(${where.name}, NULL) IS NULL OR jq.name = ${ | ||
where.name | ||
}) AND (COALESCE(${where.nameLike}, NULL) IS NULL OR jq.name LIKE ${ | ||
"%" + where.nameLike + "%" | ||
}) | ||
const result = await sql`SELECT count(*) as "genCount" FROM "jobQueue" jq | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR jq."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${ | ||
where.idGreaterThan ?? null | ||
}, NULL) IS NULL OR jq."id" > ${ | ||
where.idGreaterThan ?? null | ||
}) AND (COALESCE(${where.idLowerThan ?? null}, NULL) IS NULL OR jq."id" < ${ | ||
where.idLowerThan ?? null | ||
}) AND (COALESCE(${where.name ?? null}, NULL) IS NULL OR jq."name" = ${ | ||
where.name ?? null | ||
}) AND (COALESCE(${ | ||
where.nameLike ?? null | ||
}, NULL) IS NULL OR jq."name" LIKE ${"%" + where.nameLike + "%"}) | ||
`; | ||
return result?.[0]?.gen_count ?? 0; | ||
return result?.[0]?.genCount ?? 0; | ||
}, | ||
@@ -411,12 +483,12 @@ | ||
*/ | ||
jobQueueDelete: (sql, where) => sql`DELETE FROM job_queue jq | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR jq.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idGreaterThan}, NULL) IS NULL OR jq.id > ${ | ||
where.idGreaterThan | ||
}) AND (COALESCE(${where.idLowerThan}, NULL) IS NULL OR jq.id < ${ | ||
where.idLowerThan | ||
}) AND (COALESCE(${where.name}, NULL) IS NULL OR jq.name = ${ | ||
where.name | ||
}) AND (COALESCE(${where.nameLike}, NULL) IS NULL OR jq.name LIKE ${ | ||
jobQueueDelete: (sql, where) => sql`DELETE FROM "jobQueue" jq | ||
WHERE (COALESCE(${where.id ?? null}, NULL) IS NULL OR jq."id" = ${ | ||
where.id ?? null | ||
}) AND (COALESCE(${where.idGreaterThan ?? null}, NULL) IS NULL OR jq."id" > ${ | ||
where.idGreaterThan ?? null | ||
}) AND (COALESCE(${where.idLowerThan ?? null}, NULL) IS NULL OR jq."id" < ${ | ||
where.idLowerThan ?? null | ||
}) AND (COALESCE(${where.name ?? null}, NULL) IS NULL OR jq."name" = ${ | ||
where.name ?? null | ||
}) AND (COALESCE(${where.nameLike ?? null}, NULL) IS NULL OR jq."name" LIKE ${ | ||
"%" + where.nameLike + "%" | ||
@@ -433,24 +505,24 @@ }) | ||
const data = Array.isArray(insert) ? insert : [insert]; | ||
const input = []; | ||
if (data.length === 0) { | ||
return []; | ||
} | ||
let query = `INSERT INTO "jobQueue" ("isComplete", "priority", "scheduledAt", "name", "data", "createdAt", "updatedAt") VALUES `; | ||
const argList = []; | ||
let idx = 1; | ||
for (const it of data) { | ||
input.push({ | ||
is_complete: it.isComplete ?? false, | ||
priority: it.priority ?? 0, | ||
scheduled_at: it.scheduledAt ?? new Date(), | ||
name: it.name ?? undefined, | ||
data: JSON.stringify(it.data ?? {}), | ||
created_at: it.createdAt ?? new Date(), | ||
updated_at: it.updatedAt ?? new Date(), | ||
}); | ||
argList.push( | ||
it.isComplete ?? false, | ||
it.priority ?? 0, | ||
it.scheduledAt ?? new Date(), | ||
it.name ?? undefined, | ||
JSON.stringify(it.data ?? {}), | ||
it.createdAt ?? new Date(), | ||
it.updatedAt ?? new Date(), | ||
); | ||
query += `($${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}),`; | ||
} | ||
return sql`INSERT INTO job_queue ${sql( | ||
input, | ||
"is_complete", | ||
"priority", | ||
"scheduled_at", | ||
"name", | ||
"data", | ||
"created_at", | ||
"updated_at", | ||
)} RETURNING id as "id", is_complete as "isComplete", priority as "priority", scheduled_at as "scheduledAt", name as "name", data as "data", created_at as "createdAt", updated_at as "updatedAt"`; | ||
// Remove trailing comma | ||
query = query.substring(0, query.length - 1); | ||
query += ` RETURNING "id", "isComplete", "priority", "scheduledAt", "name", "data", "createdAt", "updatedAt"`; | ||
return sql.unsafe(query, argList); | ||
}, | ||
@@ -465,41 +537,66 @@ | ||
jobQueueUpdate: (sql, value, where) => { | ||
const updateValue = {}; | ||
let query = `UPDATE "jobQueue" jq SET `; | ||
const argList = []; | ||
let idx = 1; | ||
if (value["isComplete"] !== undefined) { | ||
updateValue["is_complete"] = value["isComplete"]; | ||
query += `"isComplete" = $${idx++}, `; | ||
argList.push(value["isComplete"]); | ||
} | ||
if (value["priority"] !== undefined) { | ||
updateValue["priority"] = value["priority"]; | ||
query += `"priority" = $${idx++}, `; | ||
argList.push(value["priority"]); | ||
} | ||
if (value["scheduledAt"] !== undefined) { | ||
updateValue["scheduled_at"] = value["scheduledAt"]; | ||
query += `"scheduledAt" = $${idx++}, `; | ||
argList.push(value["scheduledAt"]); | ||
} | ||
if (value["name"] !== undefined) { | ||
updateValue["name"] = value["name"]; | ||
query += `"name" = $${idx++}, `; | ||
argList.push(value["name"]); | ||
} | ||
if (value["data"] !== undefined) { | ||
updateValue["data"] = JSON.stringify(value["data"]); | ||
query += `"data" = $${idx++}, `; | ||
argList.push(JSON.stringify(value["data"])); | ||
} | ||
if (value["createdAt"] !== undefined) { | ||
updateValue["created_at"] = value["createdAt"]; | ||
query += `"createdAt" = $${idx++}, `; | ||
argList.push(value["createdAt"]); | ||
} | ||
if (value["updatedAt"] !== undefined) { | ||
updateValue["updated_at"] = value["updatedAt"]; | ||
query += `"updatedAt" = $${idx++}, `; | ||
argList.push(new Date()); | ||
query = query.substring(0, query.length - 2); | ||
query += ` WHERE `; | ||
if (where.id !== undefined) { | ||
query += `jq."id" `; | ||
query += `= $${idx++}`; | ||
argList.push(where.id); | ||
query += " AND "; | ||
} | ||
updateValue.updated_at = new Date(); | ||
return sql`UPDATE job_queue jq set ${sql( | ||
updateValue, | ||
...Object.keys(updateValue), | ||
)} | ||
WHERE (COALESCE(${where.id}, NULL) IS NULL OR jq.id = ${ | ||
where.id | ||
}) AND (COALESCE(${where.idGreaterThan}, NULL) IS NULL OR jq.id > ${ | ||
where.idGreaterThan | ||
}) AND (COALESCE(${where.idLowerThan}, NULL) IS NULL OR jq.id < ${ | ||
where.idLowerThan | ||
}) AND (COALESCE(${where.name}, NULL) IS NULL OR jq.name = ${ | ||
where.name | ||
}) AND (COALESCE(${where.nameLike}, NULL) IS NULL OR jq.name LIKE ${ | ||
"%" + where.nameLike + "%" | ||
}) | ||
RETURNING jq.id as "id", jq.is_complete as "isComplete", jq.priority as "priority", jq.scheduled_at as "scheduledAt", jq.name as "name", jq.data as "data", jq.created_at as "createdAt", jq.updated_at as "updatedAt"`; | ||
if (where.idGreaterThan !== undefined) { | ||
query += `jq."id" `; | ||
query += `> $${idx++}`; | ||
argList.push(where.idGreaterThan); | ||
query += " AND "; | ||
} | ||
if (where.idLowerThan !== undefined) { | ||
query += `jq."id" `; | ||
query += `< $${idx++}`; | ||
argList.push(where.idLowerThan); | ||
query += " AND "; | ||
} | ||
if (where.name !== undefined) { | ||
query += `jq."name" `; | ||
query += `= $${idx++}`; | ||
argList.push(where.name); | ||
query += " AND "; | ||
} | ||
if (where.nameLike !== undefined) { | ||
query += `jq."name" `; | ||
query += `LIKE $${idx++}`; | ||
argList.push(`%${where.nameLike}%`); | ||
query += " AND "; | ||
} | ||
query = query.substring(0, query.length - 4); | ||
query += ` RETURNING jq."id", jq."isComplete", jq."priority", jq."scheduledAt", jq."name", jq."data", jq."createdAt", jq."updatedAt"`; | ||
return sql.unsafe(query, argList); | ||
}, | ||
@@ -514,23 +611,14 @@ | ||
jobQueueUpsert: (sql, it) => { | ||
const data = { | ||
is_complete: it.isComplete ?? false, | ||
priority: it.priority ?? 0, | ||
scheduled_at: it.scheduledAt ?? new Date(), | ||
name: it.name ?? undefined, | ||
data: JSON.stringify(it.data ?? {}), | ||
created_at: it.createdAt ?? new Date(), | ||
updated_at: it.updatedAt ?? new Date(), | ||
}; | ||
data.id = it.id || uuid(); | ||
return sql`INSERT INTO job_queue ${sql( | ||
data, | ||
"id", | ||
"is_complete", | ||
"priority", | ||
"scheduled_at", | ||
"name", | ||
"data", | ||
"created_at", | ||
"updated_at", | ||
)} ON CONFLICT (id) DO UPDATE SET is_complete = EXCLUDED.is_complete, priority = EXCLUDED.priority, scheduled_at = EXCLUDED.scheduled_at, name = EXCLUDED.name, data = EXCLUDED.data, updated_at = EXCLUDED.updated_at RETURNING id as "id", is_complete as "isComplete", priority as "priority", scheduled_at as "scheduledAt", name as "name", data as "data", created_at as "createdAt", updated_at as "updatedAt"`; | ||
return sql` | ||
INSERT INTO "jobQueue" ("id", "isComplete", "priority", "scheduledAt", "name", "data", "createdAt", "updatedAt" | ||
) VALUES ( | ||
${it.id ?? uuid()}, ${it.isComplete ?? false}, ${it.priority ?? 0}, ${ | ||
it.scheduledAt ?? new Date() | ||
}, ${it.name ?? undefined}, ${JSON.stringify(it.data ?? {})}, ${ | ||
it.createdAt ?? new Date() | ||
}, ${it.updatedAt ?? new Date()} | ||
) ON CONFLICT("id") DO UPDATE SET | ||
"isComplete" = EXCLUDED."isComplete", "priority" = EXCLUDED."priority", "scheduledAt" = EXCLUDED."scheduledAt", "name" = EXCLUDED."name", "data" = EXCLUDED."data", "updatedAt" = EXCLUDED."updatedAt" | ||
RETURNING "id", "isComplete", "priority", "scheduledAt", "name", "data", "createdAt", "updatedAt" | ||
`; | ||
}, | ||
@@ -545,24 +633,15 @@ | ||
jobQueueUpsertByName: (sql, it) => { | ||
const data = { | ||
is_complete: it.isComplete ?? false, | ||
priority: it.priority ?? 0, | ||
scheduled_at: it.scheduledAt ?? new Date(), | ||
name: it.name ?? undefined, | ||
data: JSON.stringify(it.data ?? {}), | ||
created_at: it.createdAt ?? new Date(), | ||
updated_at: it.updatedAt ?? new Date(), | ||
}; | ||
data.id = it.id || uuid(); | ||
return sql`INSERT INTO job_queue ${sql( | ||
data, | ||
"id", | ||
"is_complete", | ||
"priority", | ||
"scheduled_at", | ||
"name", | ||
"data", | ||
"created_at", | ||
"updated_at", | ||
)} ON CONFLICT (name) DO UPDATE SET is_complete = EXCLUDED.is_complete, priority = EXCLUDED.priority, scheduled_at = EXCLUDED.scheduled_at, data = EXCLUDED.data, updated_at = EXCLUDED.updated_at RETURNING id as "id", is_complete as "isComplete", priority as "priority", scheduled_at as "scheduledAt", name as "name", data as "data", created_at as "createdAt", updated_at as "updatedAt"`; | ||
return sql` | ||
INSERT INTO "jobQueue" ("id", "isComplete", "priority", "scheduledAt", "name", "data", "createdAt", "updatedAt" | ||
) VALUES ( | ||
${it.id ?? uuid()}, ${it.isComplete ?? false}, ${it.priority ?? 0}, ${ | ||
it.scheduledAt ?? new Date() | ||
}, ${it.name ?? undefined}, ${JSON.stringify(it.data ?? {})}, ${ | ||
it.createdAt ?? new Date() | ||
}, ${it.updatedAt ?? new Date()} | ||
) ON CONFLICT("name") DO UPDATE SET | ||
"isComplete" = EXCLUDED."isComplete", "priority" = EXCLUDED."priority", "scheduledAt" = EXCLUDED."scheduledAt", "data" = EXCLUDED."data", "updatedAt" = EXCLUDED."updatedAt" | ||
RETURNING "id", "isComplete", "priority", "scheduledAt", "name", "data", "createdAt", "updatedAt" | ||
`; | ||
}, | ||
}; |
@@ -1,5 +0,5 @@ | ||
import { dirnameForModule } from "@lbu/stdlib"; | ||
import { createHash } from "crypto"; | ||
import { existsSync, promises as fs } from "fs"; | ||
import path from "path"; | ||
import { dirnameForModule } from "@lbu/stdlib"; | ||
@@ -168,3 +168,3 @@ /** | ||
FROM migrations | ||
ORDER BY namespace, number, created_at DESC`; | ||
ORDER BY namespace, number, "createdAt" DESC`; | ||
} catch (e) { | ||
@@ -251,6 +251,6 @@ if ((e.message ?? "").indexOf(`"migrations" does not exist`) === -1) { | ||
const exports = await import(sub); | ||
if (exports && exports.migrations) { | ||
const exportedItems = await import(sub); | ||
if (exportedItems && exportedItems.migrations) { | ||
const subResult = await readMigrationsDir( | ||
exports.migrations, | ||
exportedItems.migrations, | ||
sub, | ||
@@ -257,0 +257,0 @@ namespaces, |
@@ -5,10 +5,10 @@ import { storeQueries } from "./generated/queries.js"; | ||
// Should only run in a transaction | ||
getAnyJob: (sql) => sql`UPDATE job_queue | ||
SET is_complete = TRUE, | ||
updated_at = now() | ||
WHERE id = (SELECT id | ||
FROM job_queue | ||
WHERE NOT is_complete | ||
AND scheduled_at < now() | ||
ORDER BY scheduled_at, priority | ||
getAnyJob: (sql) => sql`UPDATE "jobQueue" | ||
SET "isComplete" = TRUE, | ||
"updatedAt" = now() | ||
WHERE id = (SELECT "id" | ||
FROM "jobQueue" | ||
WHERE NOT "isComplete" | ||
AND "scheduledAt" < now() | ||
ORDER BY "scheduledAt", "priority" | ||
FOR UPDATE SKIP LOCKED | ||
@@ -19,14 +19,14 @@ LIMIT 1) | ||
// Should only run in a transaction | ||
getJobByName: (name, sql) => sql`UPDATE job_queue | ||
SET is_complete = TRUE, | ||
updated_at = now() | ||
WHERE id = (SELECT id | ||
FROM job_queue | ||
WHERE NOT is_complete | ||
AND scheduled_at < now() | ||
AND name = ${name} | ||
ORDER BY scheduled_at, priority | ||
getJobByName: (name, sql) => sql`UPDATE jobQueue | ||
SET "isComplete" = TRUE, | ||
"updatedAt" = now() | ||
WHERE id = (SELECT "id" | ||
FROM "jobQueue" | ||
WHERE NOT "isComplete" | ||
AND "scheduledAt" < now() | ||
AND "name" = ${name} | ||
ORDER BY "scheduledAt", "priority" | ||
FOR UPDATE SKIP LOCKED | ||
LIMIT 1) | ||
RETURNING id`, | ||
RETURNING "id"`, | ||
@@ -36,6 +36,6 @@ // Alternatively use COUNT with a WHERE and UNION all to calculate the same | ||
sql, | ||
) => sql`SELECT sum(CASE WHEN scheduled_at < now() THEN 1 ELSE 0 END) AS pending_count, | ||
sum(CASE WHEN scheduled_at >= now() THEN 1 ELSE 0 END) AS scheduled_count | ||
FROM job_queue | ||
WHERE NOT is_complete`, | ||
) => sql`SELECT sum(CASE WHEN "scheduledAt" < now() THEN 1 ELSE 0 END) AS "pendingCount", | ||
sum(CASE WHEN "scheduledAt" >= now() THEN 1 ELSE 0 END) AS "scheduledCount" | ||
FROM "jobQueue" | ||
WHERE NOT "isComplete"`, | ||
@@ -46,14 +46,14 @@ // Alternatively use COUNT with a WHERE and UNION all to calculate the same | ||
name, | ||
) => sql`SELECT sum(CASE WHEN scheduled_at < now() THEN 1 ELSE 0 END) AS pending_count, | ||
sum(CASE WHEN scheduled_at >= now() THEN 1 ELSE 0 END) AS scheduled_count | ||
FROM job_queue | ||
WHERE NOT is_complete AND name = ${name}`, | ||
) => sql`SELECT sum(CASE WHEN "scheduledAt" < now() THEN 1 ELSE 0 END) AS "pendingCount", | ||
sum(CASE WHEN "scheduledAt" >= now() THEN 1 ELSE 0 END) AS "scheduledCount" | ||
FROM "jobQueue" | ||
WHERE NOT "isComplete" AND "name" = ${name}`, | ||
// Returns time in milliseconds | ||
getAverageJobTime: (sql, dateStart, dateEnd) => | ||
sql`SELECT avg((EXTRACT(EPOCH FROM updated_at AT TIME ZONE 'UTC') * 1000) - (EXTRACT(EPOCH FROM scheduled_at AT TIME ZONE 'UTC') * 1000)) AS completion_time FROM job_queue WHERE is_complete AND updated_at > ${dateStart} AND updated_at <= ${dateEnd};`, | ||
sql`SELECT avg((EXTRACT(EPOCH FROM "updatedAt" AT TIME ZONE 'UTC') * 1000) - (EXTRACT(EPOCH FROM "scheduledAt" AT TIME ZONE 'UTC') * 1000)) AS "completionTime" FROM "jobQueue" WHERE "isComplete" AND "updatedAt" > ${dateStart} AND "updatedAt" <= ${dateEnd};`, | ||
// Returns time in milliseconds | ||
getAverageJobTimeForName: (sql, name, dateStart, dateEnd) => | ||
sql`SELECT avg((EXTRACT(EPOCH FROM updated_at AT TIME ZONE 'UTC') * 1000) - (EXTRACT(EPOCH FROM scheduled_at AT TIME ZONE 'UTC') * 1000)) AS completion_time FROM job_queue WHERE is_complete AND name = ${name} AND updated_at > ${dateStart} AND updated_at <= ${dateEnd};`, | ||
sql`SELECT avg((EXTRACT(EPOCH FROM "updatedAt" AT TIME ZONE 'UTC') * 1000) - (EXTRACT(EPOCH FROM "scheduledAt" AT TIME ZONE 'UTC') * 1000)) AS "completionTime" FROM "jobQueue" WHERE "isComplete" AND name = ${name} AND "updatedAt" > ${dateStart} AND "updatedAt" <= ${dateEnd};`, | ||
}; | ||
@@ -68,4 +68,4 @@ | ||
* @property {number} id | ||
* @property {Date} created_at | ||
* @property {Date} scheduled_at | ||
* @property {Date} createdAt | ||
* @property {Date} scheduledAt | ||
* @property {string} name | ||
@@ -175,3 +175,3 @@ * @property {object} data | ||
* @public | ||
* @returns {Promise<{pending_count: number, scheduled_count: number}|undefined>} | ||
* @returns {Promise<{pendingCount: number, scheduledCount: number}|undefined>} | ||
*/ | ||
@@ -321,4 +321,4 @@ pendingQueueSize() { | ||
return { | ||
pendingCount: result?.pending_count ?? 0, | ||
scheduledCount: result?.scheduled_count ?? 0, | ||
pendingCount: parseInt(result?.pendingCount ?? 0, 10), | ||
scheduledCount: parseInt(result?.scheduledCount ?? 0, 10), | ||
}; | ||
@@ -339,4 +339,4 @@ } | ||
return { | ||
pendingCount: result?.pending_count ?? 0, | ||
scheduledCount: result?.scheduled_count ?? 0, | ||
pendingCount: parseInt(result?.pendingCount ?? 0, 10), | ||
scheduledCount: parseInt(result?.scheduledCount ?? 0, 10), | ||
}; | ||
@@ -357,3 +357,3 @@ } | ||
return result?.completion_time ?? 0; | ||
return parseFloat(result?.completionTime ?? 0); | ||
} | ||
@@ -383,3 +383,3 @@ | ||
); | ||
return result?.completion_time ?? 0; | ||
return result?.completionTime ?? 0; | ||
} |
@@ -38,7 +38,7 @@ import { log } from "@lbu/insight"; | ||
const tableNames = schemas | ||
.map((it) => it.table_name) | ||
.filter((it) => it !== "migrations"); | ||
.filter((it) => it.table_name !== "migrations") | ||
.map((it) => `"${it.table_name}"`); | ||
if (tableNames.length > 0) { | ||
await sql`TRUNCATE ${sql(tableNames)} CASCADE `; | ||
await sql.unsafe(`TRUNCATE ${tableNames.join(", ")} CASCADE`); | ||
} else { | ||
@@ -45,0 +45,0 @@ // Just a query to initialize the connection |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
70440
1939
19
113
+ Added@lbu/insight@0.0.44(transitive)
+ Added@lbu/stdlib@0.0.44(transitive)
+ Added@types/node@14.0.23(transitive)
+ Addedpostgres@2.0.0-beta.0(transitive)
- Removed@lbu/insight@0.0.43(transitive)
- Removed@lbu/stdlib@0.0.43(transitive)
- Removed@types/node@14.0.22(transitive)
- Removedend-of-stream@1.4.4(transitive)
- Removedonce@1.4.0(transitive)
- Removedpostgres@1.0.2(transitive)
- Removedpump@3.0.0(transitive)
- Removedsplit2@3.1.1(transitive)
- Removedwrappy@1.0.2(transitive)
Updated@lbu/insight@0.0.44
Updated@lbu/stdlib@0.0.44
Updatedpostgres@2.0.0-beta.0