simple-async-tasks
Advanced tools
Comparing version 1.3.4 to 1.3.5
import { HasMetadata } from 'type-fns'; | ||
import { AsyncTask } from './objects/AsyncTask'; | ||
export type AsyncTaskDaoDatabaseConnection = Record<string, any>; | ||
export type AsyncTaskDaoContext = Record<string, any> | void; | ||
/** | ||
@@ -11,10 +11,7 @@ * a dao that can be used to persist the async-task | ||
*/ | ||
export interface AsyncTaskDao<T extends AsyncTask, U extends Partial<T>, D extends AsyncTaskDaoDatabaseConnection | undefined> { | ||
findByUnique: (args: U & { | ||
dbConnection?: D; | ||
}) => Promise<HasMetadata<T> | null>; | ||
upsert: (args: { | ||
export interface AsyncTaskDao<T extends AsyncTask, U extends Partial<T>, C extends AsyncTaskDaoContext> { | ||
findByUnique: (input: U, context: C) => Promise<HasMetadata<T> | null>; | ||
upsert: (input: { | ||
task: T; | ||
dbConnection?: D; | ||
}) => Promise<HasMetadata<T>>; | ||
}, context: C) => Promise<HasMetadata<T>>; | ||
} |
@@ -5,2 +5,2 @@ export { extractTaskFromSqsEvent } from './logic/extractTaskFromSqsEvent'; | ||
export { AsyncTask, AsyncTaskStatus } from './domain/objects/AsyncTask'; | ||
export { AsyncTaskDao, AsyncTaskDaoDatabaseConnection, } from './domain/constants'; | ||
export { AsyncTaskDao, AsyncTaskDaoContext } from './domain/constants'; |
import type { LogMethods } from 'simple-leveled-log-methods'; | ||
import { HasMetadata } from 'type-fns'; | ||
import { AsyncTaskDao, AsyncTaskDaoDatabaseConnection } from '../domain/constants'; | ||
import { AsyncTaskDao, AsyncTaskDaoContext } from '../domain/constants'; | ||
import { AsyncTask } from '../domain/objects/AsyncTask'; | ||
@@ -51,9 +51,7 @@ /** | ||
*/ | ||
export declare const withAsyncTaskExecutionLifecycleEnqueue: <T extends AsyncTask, U extends Partial<T>, D extends AsyncTaskDaoDatabaseConnection | undefined, P extends { | ||
dbConnection?: D | undefined; | ||
} & U>({ getNew, dao, log, queue, }: { | ||
getNew: (args: P) => T | Promise<T>; | ||
dao: AsyncTaskDao<T, U, D>; | ||
export declare const withAsyncTaskExecutionLifecycleEnqueue: <T extends AsyncTask, U extends Partial<T>, C extends AsyncTaskDaoContext>({ getNew, dao, log, queue, }: { | ||
getNew: (input: U, context: C) => T | Promise<T>; | ||
dao: AsyncTaskDao<T, U, C>; | ||
log: LogMethods; | ||
queue: SimpleAsyncTaskSqsQueueContract | SimpleAsyncTaskAnyQueueContract<T>; | ||
}) => (args: P) => Promise<HasMetadata<T, "id" | "uuid" | "createdAt" | "updatedAt" | "deletedAt" | "effectiveAt">>; | ||
}) => (input: U, context: C) => Promise<HasMetadata<T, "id" | "uuid" | "createdAt" | "updatedAt" | "deletedAt" | "effectiveAt">>; |
@@ -36,5 +36,5 @@ "use strict"; | ||
const withAsyncTaskExecutionLifecycleEnqueue = ({ getNew, dao, log, queue, }) => { | ||
return (args) => __awaiter(void 0, void 0, void 0, function* () { | ||
return (input, context) => __awaiter(void 0, void 0, void 0, function* () { | ||
// try to find the task by unique | ||
const taskFound = yield dao.findByUnique(Object.assign(Object.assign({}, args), { dbConnection: args.dbConnection })); | ||
const taskFound = yield dao.findByUnique(Object.assign({}, input), context); | ||
// if the task already exists, check that its in a queueable state | ||
@@ -66,3 +66,3 @@ if ((taskFound === null || taskFound === void 0 ? void 0 : taskFound.status) === AsyncTask_1.AsyncTaskStatus.QUEUED) { | ||
// if the task does not exist, create the task with the new initial state | ||
const taskReadyToQueue = taskFound !== null && taskFound !== void 0 ? taskFound : (yield getNew(args)); // note: we dont save to the database yet to prevent duplicate calls but also because if this is called within a transaction, the effective_at time is the same for the initial version and the queued version, leading to duplicate-key violations on the version table | ||
const taskReadyToQueue = taskFound !== null && taskFound !== void 0 ? taskFound : (yield getNew(input, context)); // note: we dont save to the database yet to prevent duplicate calls but also because if this is called within a transaction, the effective_at time is the same for the initial version and the queued version, leading to duplicate-key violations on the version table | ||
// now queue the task into sqs | ||
@@ -88,5 +88,4 @@ const taskToQueue = Object.assign(Object.assign({}, taskReadyToQueue), { status: AsyncTask_1.AsyncTaskStatus.QUEUED }); | ||
return yield dao.upsert({ | ||
dbConnection: args.dbConnection, | ||
task: taskToQueue, | ||
}); | ||
}, context); | ||
}); | ||
@@ -93,0 +92,0 @@ }; |
import { HelpfulError } from '@ehmpathy/error-fns'; | ||
import type { LogMethods } from 'simple-leveled-log-methods'; | ||
import { HasMetadata } from 'type-fns'; | ||
import { AsyncTaskDao, AsyncTaskDaoDatabaseConnection } from '../domain/constants'; | ||
import { AsyncTaskDao, AsyncTaskDaoContext } from '../domain/constants'; | ||
import { AsyncTask } from '../domain/objects/AsyncTask'; | ||
@@ -26,9 +26,8 @@ export declare class SimpleAsyncTaskRetryLaterError extends HelpfulError { | ||
*/ | ||
export declare const withAsyncTaskExecutionLifecycleExecute: <T extends AsyncTask, U extends Partial<T>, D extends AsyncTaskDaoDatabaseConnection | undefined, P extends { | ||
dbConnection?: D | undefined; | ||
export declare const withAsyncTaskExecutionLifecycleExecute: <T extends AsyncTask, U extends Partial<T>, I extends { | ||
task: T; | ||
}, R extends Record<string, any>>(logic: (args: P & { | ||
}, C extends AsyncTaskDaoContext, O extends Record<string, any>>(logic: (input: I & { | ||
task: HasMetadata<T, "id" | "uuid" | "createdAt" | "updatedAt" | "deletedAt" | "effectiveAt">; | ||
}) => R | Promise<R>, { dao, log, options, }: { | ||
dao: AsyncTaskDao<T, U, D>; | ||
}, context: C) => O | Promise<O>, { dao, log, options, }: { | ||
dao: AsyncTaskDao<T, U, C>; | ||
log: LogMethods; | ||
@@ -47,3 +46,3 @@ options?: { | ||
} | undefined; | ||
}) => (args: P) => Promise<(R & { | ||
}) => (input: I, context: C) => Promise<(O & { | ||
task: T; | ||
@@ -50,0 +49,0 @@ }) | { |
@@ -40,8 +40,8 @@ "use strict"; | ||
const withAsyncTaskExecutionLifecycleExecute = (logic, { dao, log, options, }) => { | ||
return (args) => __awaiter(void 0, void 0, void 0, function* () { | ||
return (input, context) => __awaiter(void 0, void 0, void 0, function* () { | ||
var _a, _b; | ||
// try to find the task by unique; it must be defined in db by now | ||
const foundTask = yield dao.findByUnique(Object.assign(Object.assign({}, args.task), { dbConnection: args.dbConnection })); | ||
const foundTask = yield dao.findByUnique(Object.assign({}, input.task), context); | ||
if (!foundTask) | ||
throw new error_fns_1.BadRequestError(`task not found by unique: '${JSON.stringify(args.task)}'`); | ||
throw new error_fns_1.BadRequestError(`task not found by unique: '${JSON.stringify(input.task)}'`); | ||
// check that the task is not already being attempted | ||
@@ -70,10 +70,9 @@ if (foundTask.status === AsyncTask_1.AsyncTaskStatus.ATTEMPTED) { | ||
const attemptedTask = yield dao.upsert({ | ||
dbConnection: args.dbConnection, | ||
task: Object.assign(Object.assign({}, foundTask), { status: AsyncTask_1.AsyncTaskStatus.ATTEMPTED }), | ||
}); | ||
}, context); | ||
// try and run the logic with db connection in a txn | ||
try { | ||
const result = yield logic(Object.assign(Object.assign({}, args), { task: attemptedTask })); // execute the task | ||
const result = yield logic(Object.assign(Object.assign({}, input), { task: attemptedTask }), context); // execute the task | ||
// if the status of the task was not changed from attempted, then throw an error; its the obligation of the execute function to specify what status the task is in now | ||
const taskNow = yield dao.findByUnique(Object.assign(Object.assign({}, args.task), { dbConnection: args.dbConnection })); | ||
const taskNow = yield dao.findByUnique(Object.assign({}, input.task), context); | ||
if (!taskNow) | ||
@@ -88,5 +87,4 @@ throw new Error('task can no longer be found by unique. this should not be possible'); // fail fast | ||
yield dao.upsert({ | ||
dbConnection: args.dbConnection, | ||
task: Object.assign(Object.assign({}, attemptedTask), { status: AsyncTask_1.AsyncTaskStatus.FAILED }), | ||
}); // record that it failed | ||
}, context); // record that it failed | ||
throw error; // and pass the error back up | ||
@@ -93,0 +91,0 @@ } |
@@ -5,3 +5,3 @@ { | ||
"description": "A simple in-memory queue, for nodejs and the browser, with consumers for common usecases.", | ||
"version": "1.3.4", | ||
"version": "1.3.5", | ||
"repository": "ehmpathy/simple-async-tasks", | ||
@@ -8,0 +8,0 @@ "homepage": "https://github.com/ehmpathy/simple-async-tasks", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
47423
579