@effect/sql
Advanced tools
| "use strict"; | ||
| Object.defineProperty(exports, "__esModule", { | ||
| value: true | ||
| }); | ||
| exports.make = exports.layerStore = void 0; | ||
| var PersistedQueue = _interopRequireWildcard(require("@effect/experimental/PersistedQueue")); | ||
| var Cause = _interopRequireWildcard(require("effect/Cause")); | ||
| var Data = _interopRequireWildcard(require("effect/Data")); | ||
| var Duration = _interopRequireWildcard(require("effect/Duration")); | ||
| var Effect = _interopRequireWildcard(require("effect/Effect")); | ||
| var Exit = _interopRequireWildcard(require("effect/Exit")); | ||
| var Layer = _interopRequireWildcard(require("effect/Layer")); | ||
| var Mailbox = _interopRequireWildcard(require("effect/Mailbox")); | ||
| var RcMap = _interopRequireWildcard(require("effect/RcMap")); | ||
| var Schedule = _interopRequireWildcard(require("effect/Schedule")); | ||
| var SqlClient = _interopRequireWildcard(require("./SqlClient.js")); | ||
| function _interopRequireWildcard(e, t) { if ("function" == typeof WeakMap) var r = new WeakMap(), n = new WeakMap(); return (_interopRequireWildcard = function (e, t) { if (!t && e && e.__esModule) return e; var o, i, f = { __proto__: null, default: e }; if (null === e || "object" != typeof e && "function" != typeof e) return f; if (o = t ? n : r) { if (o.has(e)) return o.get(e); o.set(e, f); } for (const t in e) "default" !== t && {}.hasOwnProperty.call(e, t) && ((i = (o = Object.defineProperty) && Object.getOwnPropertyDescriptor(e, t)) && (i.get || i.set) ? o(f, t, i) : f[t] = e[t]); return f; })(e, t); } | ||
| /** | ||
| * @since 1.0.0 | ||
| */ | ||
| /** | ||
| * @since 1.0.0 | ||
| * @category constructors | ||
| */ | ||
| const make = exports.make = /*#__PURE__*/Effect.fnUntraced(function* (options) { | ||
| const sql = (yield* SqlClient.SqlClient).withoutTransforms(); | ||
| const tableName = options?.tableName ?? "effect_queue"; | ||
| const tableNameSql = sql(tableName); | ||
| const pollInterval = options?.pollInterval ? Duration.decode(options.pollInterval) : Duration.millis(1000); | ||
| const pollBatchSize = options?.pollBatchSize ?? 1; | ||
| const pollBatchSizeSql = sql.literal(pollBatchSize.toString()); | ||
| const lockRefreshInterval = options?.lockRefreshInterval ? Duration.decode(options.lockRefreshInterval) : Duration.seconds(30); | ||
| const lockExpiration = options?.lockExpiration ? Duration.decode(options.lockExpiration) : Duration.minutes(2); | ||
| const lockExpirationSql = sql.literal(Math.ceil(Duration.toSeconds(lockExpiration)).toString()); | ||
| const workerId = crypto.randomUUID(); | ||
| const sqlNow = sql.onDialectOrElse({ | ||
| mssql: () => sql.literal("GETDATE()"), | ||
| mysql: () => sql.literal("NOW()"), | ||
| pg: () => sql.literal("NOW()"), | ||
| // sqlite | ||
| orElse: () => sql.literal("CURRENT_TIMESTAMP") | ||
| }); | ||
| const expiresAt = sql.onDialectOrElse({ | ||
| pg: () => sql`${sqlNow} - INTERVAL '${lockExpirationSql} seconds'`, | ||
| mysql: () => sql`DATE_SUB(${sqlNow}, INTERVAL ${lockExpirationSql} SECOND)`, | ||
| mssql: () => sql`DATEADD(SECOND, -${lockExpirationSql}, ${sqlNow})`, | ||
| orElse: () => sql`datetime(${sqlNow}, '-${lockExpirationSql} seconds')` | ||
| }); | ||
| yield* sql.onDialectOrElse({ | ||
| mysql: () => sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( | ||
| id VARCHAR(36) PRIMARY KEY, | ||
| queue_name VARCHAR(100) NOT NULL, | ||
| element TEXT NOT NULL, | ||
| completed BOOLEAN NOT NULL, | ||
| attempts INT NOT NULL DEFAULT 0, | ||
| last_failure TEXT NULL, | ||
| acquired_at DATETIME NULL, | ||
| acquired_by VARCHAR(36) NULL, | ||
| created_at DATETIME NOT NULL, | ||
| updated_at DATETIME NOT NULL | ||
| )`, | ||
| pg: () => sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( | ||
| id UUID PRIMARY KEY, | ||
| queue_name VARCHAR(100) NOT NULL, | ||
| element TEXT NOT NULL, | ||
| completed BOOLEAN NOT NULL, | ||
| attempts INTEGER NOT NULL DEFAULT 0, | ||
| last_failure TEXT NULL, | ||
| acquired_at TIMESTAMP NULL, | ||
| acquired_by UUID NULL, | ||
| created_at TIMESTAMP NOT NULL, | ||
| updated_at TIMESTAMP NOT NULL | ||
| )`, | ||
| mssql: () => sql`IF NOT EXISTS (SELECT * FROM sysobjects WHERE name=${tableNameSql} AND xtype='U') | ||
| CREATE TABLE ${tableNameSql} ( | ||
| id UNIQUEIDENTIFIER PRIMARY KEY, | ||
| queue_name NVARCHAR(100) NOT NULL, | ||
| element NVARCHAR(MAX) NOT NULL, | ||
| completed BIT NOT NULL, | ||
| attempts INT NOT NULL DEFAULT 0, | ||
| last_failure NVARCHAR(MAX) NULL, | ||
| acquired_at DATETIME2 NULL, | ||
| acquired_by UNIQUEIDENTIFIER NULL, | ||
| created_at DATETIME2 NOT NULL, | ||
| updated_at DATETIME2 NOT NULL | ||
| )`, | ||
| // sqlite | ||
| orElse: () => sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( | ||
| id TEXT PRIMARY KEY, | ||
| queue_name TEXT NOT NULL, | ||
| element TEXT NOT NULL, | ||
| completed BOOLEAN NOT NULL, | ||
| attempts INTEGER NOT NULL DEFAULT 0, | ||
| last_failure TEXT NULL, | ||
| acquired_at DATETIME NULL, | ||
| acquired_by TEXT NULL, | ||
| created_at DATETIME NOT NULL, | ||
| updated_at DATETIME NOT NULL | ||
| )` | ||
| }); | ||
| yield* sql.onDialectOrElse({ | ||
| mssql: () => sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_take') | ||
| CREATE INDEX idx_${tableNameSql}_take ON ${tableNameSql} (queue_name, completed, attempts, acquired_at)`, | ||
| mysql: () => sql`CREATE INDEX ${sql(`idx_${tableName}_take`)} ON ${tableNameSql} (queue_name, completed, attempts, acquired_at)`.pipe(Effect.ignore), | ||
| orElse: () => sql`CREATE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_take`)} ON ${tableNameSql} (queue_name, completed, attempts, acquired_at)` | ||
| }); | ||
| yield* sql.onDialectOrElse({ | ||
| mssql: () => sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_update') | ||
| CREATE INDEX ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (id, acquired_by)`, | ||
| mysql: () => sql`CREATE INDEX ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (id, acquired_by)`.pipe(Effect.ignore), | ||
| orElse: () => sql`CREATE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (id, acquired_by)` | ||
| }); | ||
| const wrapString = sql.onDialectOrElse({ | ||
| mssql: () => s => `N'${s}'`, | ||
| orElse: () => s => `'${s}'` | ||
| }); | ||
| const stringLiteral = s => sql.literal(wrapString(s)); | ||
| const stringLiteralArr = arr => sql.literal(`(${arr.map(wrapString).join(",")})`); | ||
| const sqlTrue = sql.onDialectOrElse({ | ||
| sqlite: () => sql.literal("1"), | ||
| orElse: () => sql.literal("TRUE") | ||
| }); | ||
| const sqlFalse = sql.onDialectOrElse({ | ||
| sqlite: () => sql.literal("0"), | ||
| orElse: () => sql.literal("FALSE") | ||
| }); | ||
| const workerIdSql = stringLiteral(workerId); | ||
| const elementIds = new Set(); | ||
| const refreshLocks = () => { | ||
| if (elementIds.size === 0) { | ||
| return Effect.void; | ||
| } | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow} | ||
| WHERE acquired_by = ${workerIdSql} | ||
| `; | ||
| }; | ||
| const complete = (id, attempts) => { | ||
| elementIds.delete(id); | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = NULL, acquired_by = NULL, updated_at = ${sqlNow}, completed = ${sqlTrue}, attempts = ${attempts} | ||
| WHERE id = ${id} | ||
| AND acquired_by = ${workerIdSql} | ||
| `.pipe(Effect.retry({ | ||
| times: 5, | ||
| schedule: Schedule.exponential(100, 1.5) | ||
| }), Effect.orDie); | ||
| }; | ||
| const retry = (id, attempts, cause) => { | ||
| elementIds.delete(id); | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = NULL, acquired_by = NULL, updated_at = ${sqlNow}, attempts = ${attempts}, last_failure = ${Cause.pretty(cause, { | ||
| renderErrorCause: true | ||
| })} | ||
| WHERE id = ${id} | ||
| AND acquired_by = ${workerIdSql} | ||
| `.pipe(Effect.retry({ | ||
| times: 5, | ||
| schedule: Schedule.exponential(100, 1.5) | ||
| }), Effect.orDie); | ||
| }; | ||
| const interrupt = ids => { | ||
| for (const id of ids) { | ||
| elementIds.delete(id); | ||
| } | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = NULL, acquired_by = NULL | ||
| WHERE id IN ${stringLiteralArr(ids)} | ||
| AND acquired_by = ${workerIdSql} | ||
| `.pipe(Effect.retry({ | ||
| times: 5, | ||
| schedule: Schedule.exponential(100, 1.5) | ||
| }), Effect.orDie); | ||
| }; | ||
| yield* Effect.suspend(refreshLocks).pipe(Effect.tapErrorCause(Effect.logWarning), Effect.retry(Schedule.spaced(500)), Effect.scheduleForked(Schedule.fixed(lockRefreshInterval)), Effect.annotateLogs({ | ||
| package: "@effect/sql", | ||
| module: "SqlPersistedQueue", | ||
| fiber: "refreshLocks" | ||
| }), Effect.interruptible); | ||
| const mailboxes = yield* RcMap.make({ | ||
| lookup: Effect.fnUntraced(function* ({ | ||
| maxAttempts, | ||
| name | ||
| }) { | ||
| const mailbox = yield* Mailbox.make({ | ||
| capacity: 0 | ||
| }); | ||
| const poll = sql.onDialectOrElse({ | ||
| pg: () => sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| WHERE id IN ( | ||
| SELECT id FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = FALSE | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| FOR UPDATE SKIP LOCKED | ||
| LIMIT ${pollBatchSizeSql} | ||
| ) | ||
| RETURNING id, queue_name, element, attempts | ||
| `, | ||
| mysql: () => sql` | ||
| SELECT id, queue_name, element, attempts FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = FALSE | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| LIMIT ${pollBatchSizeSql} | ||
| FOR UPDATE SKIP LOCKED | ||
| `.pipe(Effect.tap(rows => { | ||
| if (rows.length === 0) return Effect.void; | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| WHERE id IN ${stringLiteralArr(rows.map(r => r.id))} | ||
| `.unprepared; | ||
| }), sql.withTransaction), | ||
| mssql: () => sql` | ||
| WITH cte AS ( | ||
| SELECT TOP ${pollBatchSizeSql} id FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = 0 | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| ) | ||
| UPDATE q | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| OUTPUT inserted.id, inserted.queue_name, inserted.element, inserted.attempts | ||
| FROM ${tableNameSql} AS q | ||
| INNER JOIN cte ON q.id = cte.id | ||
| `, | ||
| // sqlite | ||
| orElse: () => sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| WHERE id IN ( | ||
| SELECT id FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = 0 | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| LIMIT ${pollBatchSizeSql} | ||
| ) | ||
| RETURNING id, queue_name, element, attempts | ||
| ` | ||
| }); | ||
| yield* Effect.gen(function* () { | ||
| while (true) { | ||
| const results = yield* poll; | ||
| if (results.length > 0) { | ||
| const toOffer = new Set(results); | ||
| yield* Effect.forEach(toOffer, element => { | ||
| element.element = JSON.parse(element.element); | ||
| return mailbox.offer(element).pipe(Effect.tap(() => { | ||
| toOffer.delete(element); | ||
| })); | ||
| }).pipe(Effect.onInterrupt(() => interrupt(Array.from(toOffer, e => e.id)))); | ||
| yield* Effect.yieldNow(); | ||
| } else { | ||
| // TODO: use listen/notify or equivalent to avoid polling | ||
| yield* Effect.sleep(pollInterval); | ||
| } | ||
| } | ||
| }).pipe(Effect.sandbox, Effect.retry(Schedule.spaced(500)), Effect.forkScoped, Effect.interruptible); | ||
| return mailbox; | ||
| }) | ||
| }); | ||
| return PersistedQueue.PersistedQueueStore.of({ | ||
| offer: (name, id, element) => Effect.suspend(() => sql` | ||
| INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at) | ||
| VALUES (${id}, ${name}, ${JSON.stringify(element)}, ${sqlFalse}, 0, ${sqlNow}, ${sqlNow}) | ||
| `).pipe(Effect.catchAllCause(cause => Effect.fail(new PersistedQueue.PersistedQueueError({ | ||
| message: "Failed to offer element to persisted queue", | ||
| cause: Cause.squash(cause) | ||
| })))), | ||
| take: ({ | ||
| maxAttempts, | ||
| name | ||
| }) => Effect.uninterruptibleMask(restore => RcMap.get(mailboxes, new QueueKey({ | ||
| name, | ||
| maxAttempts | ||
| })).pipe(Effect.flatMap(m => Effect.orDie(m.take)), Effect.zipLeft(Effect.yieldNow()), Effect.scoped, restore, Effect.tap(element => Effect.addFinalizer(Exit.match({ | ||
| onFailure: cause => Cause.isInterruptedOnly(cause) ? interrupt([element.id]) : retry(element.id, element.attempts + 1, cause), | ||
| onSuccess: () => complete(element.id, element.attempts + 1) | ||
| }))))) | ||
| }); | ||
| }); | ||
| class QueueKey extends Data.Class {} | ||
| /** | ||
| * @since 1.0.0 | ||
| * @category layers | ||
| */ | ||
| const layerStore = options => Layer.scoped(PersistedQueue.PersistedQueueStore, make(options)); | ||
| exports.layerStore = layerStore; | ||
| //# sourceMappingURL=SqlPersistedQueue.js.map |
| {"version":3,"file":"SqlPersistedQueue.js","names":["PersistedQueue","_interopRequireWildcard","require","Cause","Data","Duration","Effect","Exit","Layer","Mailbox","RcMap","Schedule","SqlClient","e","t","WeakMap","r","n","__esModule","o","i","f","__proto__","default","has","get","set","hasOwnProperty","call","Object","defineProperty","getOwnPropertyDescriptor","make","exports","fnUntraced","options","sql","withoutTransforms","tableName","tableNameSql","pollInterval","decode","millis","pollBatchSize","pollBatchSizeSql","literal","toString","lockRefreshInterval","seconds","lockExpiration","minutes","lockExpirationSql","Math","ceil","toSeconds","workerId","crypto","randomUUID","sqlNow","onDialectOrElse","mssql","mysql","pg","orElse","expiresAt","pipe","ignore","wrapString","s","stringLiteral","stringLiteralArr","arr","map","join","sqlTrue","sqlite","sqlFalse","workerIdSql","elementIds","Set","refreshLocks","size","void","complete","id","attempts","delete","retry","times","schedule","exponential","orDie","cause","pretty","renderErrorCause","interrupt","ids","suspend","tapErrorCause","logWarning","spaced","scheduleForked","fixed","annotateLogs","package","module","fiber","interruptible","mailboxes","lookup","maxAttempts","name","mailbox","capacity","poll","tap","rows","length","unprepared","withTransaction","gen","results","toOffer","forEach","element","JSON","parse","offer","onInterrupt","Array","from","yieldNow","sleep","sandbox","forkScoped","PersistedQueueStore","of","stringify","catchAllCause","fail","PersistedQueueError","message","squash","take","uninterruptibleMask","restore","QueueKey","flatMap","m","zipLeft","scoped","addFinalizer","match","onFailure","isInterruptedOnly","onSuccess","Class","layerStore"],"sources":["../../src/SqlPersistedQueue.ts"],"sourcesContent":[null],"mappings":";;;;;;AAGA,IAAAA,cAAA,GAAAC,uBAAA,CAAAC,OAAA;AACA,IAAAC,KAAA,GAAAF,uBAAA,CAAAC,OAAA;AACA,IAAAE,IAAA,GAAAH,uBAAA,CAAAC,OAAA;AACA,IAAAG,QAAA,GAAAJ,uBAAA,CAAAC,OAAA;AACA,IAAAI,MAAA,GAAAL,uBAAA,CAAAC,OAAA;AACA,IAAAK,IAAA,GAAAN,uBAAA,CAAAC,OAAA;AACA,IAAAM,KAAA,GAAAP,uBAAA,CAAAC,OAAA;AACA,IAAAO,OAAA,GAAAR,uBAAA,CAAAC,OAAA;AACA,IAAAQ,KAAA,GAAAT,uBAAA,CAAAC,OAAA;AACA,IAAAS,QAAA,GAAAV,uBAAA,CAAAC,OAAA;AAEA,IAAAU,SAAA,GAAAX,uBAAA,CAAAC,OAAA;AAA2C,SAAAD,wBAAAY,CAAA,EAAAC,CAAA,6BAAAC,OAAA,MAAAC,CAAA,OAAAD,OAAA,IAAAE,CAAA,OAAAF,OAAA,YAAAd,uBAAA,YAAAA,CAAAY,CAAA,EAAAC,CAAA,SAAAA,CAAA,IAAAD,CAAA,IAAAA,CAAA,CAAAK,UAAA,SAAAL,CAAA,MAAAM,CAAA,EAAAC,CAAA,EAAAC,CAAA,KAAAC,SAAA,QAAAC,OAAA,EAAAV,CAAA,iBAAAA,CAAA,uBAAAA,CAAA,yBAAAA,CAAA,SAAAQ,CAAA,MAAAF,CAAA,GAAAL,CAAA,GAAAG,CAAA,GAAAD,CAAA,QAAAG,CAAA,CAAAK,GAAA,CAAAX,CAAA,UAAAM,CAAA,CAAAM,GAAA,CAAAZ,CAAA,GAAAM,CAAA,CAAAO,GAAA,CAAAb,CAAA,EAAAQ,CAAA,gBAAAP,CAAA,IAAAD,CAAA,gBAAAC,CAAA,OAAAa,cAAA,CAAAC,IAAA,CAAAf,CAAA,EAAAC,CAAA,OAAAM,CAAA,IAAAD,CAAA,GAAAU,MAAA,CAAAC,cAAA,KAAAD,MAAA,CAAAE,wBAAA,CAAAlB,CAAA,EAAAC,CAAA,OAAAM,CAAA,CAAAK,GAAA,IAAAL,CAAA,CAAAM,GAAA,IAAAP,CAAA,CAAAE,CAAA,EAAAP,CAAA,EAAAM,CAAA,IAAAC,CAAA,CAAAP,CAAA,IAAAD,CAAA,CAAAC,CAAA,WAAAO,CAAA,KAAAR,CAAA,EAAAC,CAAA;AAd3C;;;;AAiBA;;;;AAIO,MAAMkB,IAAI,GAAAC,OAAA,CAAAD,IAAA,gBAYb1B,MAAM,CAAC4B,UAAU,CAAC,WAAUC,OAAO;EACrC,MAAMC,GAAG,GAAG,CAAC,OAAOxB,SAAS,CAACA,SAAS,EAAEyB,iBAAiB,EAAE;EAC5D,MAAMC,SAAS,GAAGH,OAAO,EAAEG,SAAS,IAAI,cAAc;EACtD,MAAMC,YAAY,GAAGH,GAAG,CAACE,SAAS,CAAC;EACnC,MAAME,YAAY,GAAGL,OAAO,EAAEK,YAAY,GACtCnC,QAAQ,CAACoC,MAAM,CAACN,OAAO,CAACK,YAAY,CAAC,GACrCnC,QAAQ,CAACqC,MAAM,CAAC,IAAI,CAAC;EACzB,MAAMC,aAAa,GAAGR,OAAO,EAAEQ,aAAa,IAAI,CAAC;EACjD,MAAMC,gBAAgB,GAAGR,GAAG,CAACS,OAAO,CAACF,aAAa,CAACG,QAAQ,EAAE,CAAC;EAC9D,MAAMC,mBAAmB,GAAGZ,OAAO,EAAEY,mBAAmB,GACpD1C,QAAQ,CAACoC,MAAM,CAACN,OAAO,CAACY,mBAAmB,CAAC,GAC5C1C,QAAQ,CAAC2C,OAAO,CAAC,EAAE,CAAC;EACxB,MAAMC,cAAc,GAAGd,OAAO,EAAEc,cAAc,GAAG5C,QAAQ,CAACoC,MAAM,CAACN,OAAO,CAACc,cAAc,CAAC,GAAG5C,QAAQ,CAAC6C,OAAO,CAAC,CAAC,CAAC;EAC9G,MAAMC,iBAAiB,GAAGf,GAAG,CAACS,OAAO,CAACO,IAAI,CAACC,IAAI,CAAChD,QAAQ,CAACiD,SAAS,CAACL,cAAc,CAAC,CAAC,CAACH,QAAQ,EAAE,CAAC;EAC/F,MAAMS,QAAQ,GAAGC,MAAM,CAACC,UAAU,EAAE;EAEpC,MAAMC,MAAM,GAAGtB,GAAG,CAACuB,eAAe,CAAC;IACjCC,KAAK,EAAEA,CAAA,KAAMxB,GAAG,CAACS,OAAO,CAAC,WAAW,CAAC;IACrCgB,KAAK,EAAEA,CAAA,KAAMzB,GAAG,CAACS,OAAO,CAAC,OAAO,CAAC;IACjCiB,EAAE,EAAEA,CAAA,KAAM1B,GAAG,CAACS,OAAO,CAAC,OAAO,CAAC;IAC9B;IACAkB,MAAM,EAAEA,CAAA,KAAM3B,GAAG,CAACS,OAAO,CAAC,mBAAmB;GAC9C,CAAC;EAEF,MAAMmB,SAAS,GAAG5B,GAAG,CAACuB,eAAe,CAAC;IACpCG,EAAE,EAAEA,CAAA,KAAM1B,GAAG,GAAGsB,MAAM,gBAAgBP,iBAAiB,WAAW;IAClEU,KAAK,EAAEA,CAAA,KAAMzB,GAAG,YAAYsB,MAAM,cAAcP,iBAAiB,UAAU;IAC3ES,KAAK,EAAEA,CAAA,KAAMxB,GAAG,oBAAoBe,iBAAiB,KAAKO,MAAM,GAAG;IACnEK,MAAM,EAAEA,CAAA,KAAM3B,GAAG,YAAYsB,MAAM,OAAOP,iBAAiB;GAC5D,CAAC;EAEF,OAAOf,GAAG,CAACuB,eAAe,CAAC;IACzBE,KAAK,EAAEA,CAAA,KACLzB,GAAG,8BAA8BG,YAAY;;;;;;;;;;;QAW3C;IACJuB,EAAE,EAAEA,CAAA,KACF1B,GAAG,8BAA8BG,YAAY;;;;;;;;;;;QAW3C;IACJqB,KAAK,EAAEA,CAAA,KACLxB,GAAG,sDAAsDG,YAAY;qBACtDA,YAAY;;;;;;;;;;;QAWzB;IACJ;IACAwB,MAAM,EAAEA,CAAA,KACN3B,GAAG,8BAA8BG,YAAY;;;;;;;;;;;;GAYhD,CAAC;EAEF,OAAOH,GAAG,CAACuB,eAAe,CAAC;IACzBC,KAAK,EAAEA,CAAA,KACLxB,GAAG,+DAA+DE,SAAS;2BACtDC,YAAY,YAAYA,YAAY,iDAAiD;IAC5GsB,KAAK,EAAEA,CAAA,KACLzB,GAAG,gBACDA,GAAG,CAAC,OAAOE,SAAS,OAAO,CAC7B,OAAOC,YAAY,iDAAiD,CACjE0B,IAAI,CAAC3D,MAAM,CAAC4D,MAAM,CAAC;IACxBH,MAAM,EAAEA,CAAA,KACN3B,GAAG,8BACDA,GAAG,CAAC,OAAOE,SAAS,OAAO,CAC7B,OAAOC,YAAY;GACtB,CAAC;EAEF,OAAOH,GAAG,CAACuB,eAAe,CAAC;IACzBC,KAAK,EAAEA,CAAA,KACLxB,GAAG,+DAA+DE,SAAS;uBAC1DF,GAAG,CAAC,OAAOE,SAAS,SAAS,CAAC,OAAOC,YAAY,oBAAoB;IACxFsB,KAAK,EAAEA,CAAA,KACLzB,GAAG,gBAAgBA,GAAG,CAAC,OAAOE,SAAS,SAAS,CAAC,OAAOC,YAAY,oBAAoB,CAAC0B,IAAI,CAAC3D,MAAM,CAAC4D,MAAM,CAAC;IAC9GH,MAAM,EAAEA,CAAA,KAAM3B,GAAG,8BAA8BA,GAAG,CAAC,OAAOE,SAAS,SAAS,CAAC,OAAOC,YAAY;GACjG,CAAC;EAEF,MAAM4B,UAAU,GAAG/B,GAAG,CAACuB,eAAe,CAAC;IACrCC,KAAK,EAAEA,CAAA,KAAOQ,CAAS,IAAK,KAAKA,CAAC,GAAG;IACrCL,MAAM,EAAEA,CAAA,KAAOK,CAAS,IAAK,IAAIA,CAAC;GACnC,CAAC;EACF,MAAMC,aAAa,GAAID,CAAS,IAAKhC,GAAG,CAACS,OAAO,CAACsB,UAAU,CAACC,CAAC,CAAC,CAAC;EAC/D,MAAME,gBAAgB,GAAIC,GAA0B,IAAKnC,GAAG,CAACS,OAAO,CAAC,IAAI0B,GAAG,CAACC,GAAG,CAACL,UAAU,CAAC,CAACM,IAAI,CAAC,GAAG,CAAC,GAAG,CAAC;EAE1G,MAAMC,OAAO,GAAGtC,GAAG,CAACuB,eAAe,CAAC;IAClCgB,MAAM,EAAEA,CAAA,KAAMvC,GAAG,CAACS,OAAO,CAAC,GAAG,CAAC;IAC9BkB,MAAM,EAAEA,CAAA,KAAM3B,GAAG,CAACS,OAAO,CAAC,MAAM;GACjC,CAAC;EACF,MAAM+B,QAAQ,GAAGxC,GAAG,CAACuB,eAAe,CAAC;IACnCgB,MAAM,EAAEA,CAAA,KAAMvC,GAAG,CAACS,OAAO,CAAC,GAAG,CAAC;IAC9BkB,MAAM,EAAEA,CAAA,KAAM3B,GAAG,CAACS,OAAO,CAAC,OAAO;GAClC,CAAC;EAEF,MAAMgC,WAAW,GAAGR,aAAa,CAACd,QAAQ,CAAC;EAC3C,MAAMuB,UAAU,GAAG,IAAIC,GAAG,EAAU;EACpC,MAAMC,YAAY,GAAGA,CAAA,KAAoC;IACvD,IAAIF,UAAU,CAACG,IAAI,KAAK,CAAC,EAAE;MACzB,OAAO3E,MAAM,CAAC4E,IAAI;IACpB;IACA,OAAO9C,GAAG;eACCG,YAAY;0BACDmB,MAAM;4BACJmB,WAAW;KAClC;EACH,CAAC;EACD,MAAMM,QAAQ,GAAGA,CAACC,EAAU,EAAEC,QAAgB,KAAI;IAChDP,UAAU,CAACQ,MAAM,CAACF,EAAE,CAAC;IACrB,OAAOhD,GAAG;eACCG,YAAY;iEACsCmB,MAAM,iBAAiBgB,OAAO,gBAAgBW,QAAQ;mBACpGD,EAAE;0BACKP,WAAW;KAChC,CAACZ,IAAI,CACJ3D,MAAM,CAACiF,KAAK,CAAC;MACXC,KAAK,EAAE,CAAC;MACRC,QAAQ,EAAE9E,QAAQ,CAAC+E,WAAW,CAAC,GAAG,EAAE,GAAG;KACxC,CAAC,EACFpF,MAAM,CAACqF,KAAK,CACb;EACH,CAAC;EACD,MAAMJ,KAAK,GAAGA,CAACH,EAAU,EAAEC,QAAgB,EAAEO,KAAuB,KAAI;IACtEd,UAAU,CAACQ,MAAM,CAACF,EAAE,CAAC;IACrB,OAAOhD,GAAG;eACCG,YAAY;iEACsCmB,MAAM,gBAAgB2B,QAAQ,oBACzFlF,KAAK,CAAC0F,MAAM,CAACD,KAAK,EAAE;MAAEE,gBAAgB,EAAE;IAAI,CAAE,CAChD;mBACeV,EAAE;0BACKP,WAAW;KAChC,CAACZ,IAAI,CACJ3D,MAAM,CAACiF,KAAK,CAAC;MACXC,KAAK,EAAE,CAAC;MACRC,QAAQ,EAAE9E,QAAQ,CAAC+E,WAAW,CAAC,GAAG,EAAE,GAAG;KACxC,CAAC,EACFpF,MAAM,CAACqF,KAAK,CACb;EACH,CAAC;EACD,MAAMI,SAAS,GAAIC,GAAkB,IAAI;IACvC,KAAK,MAAMZ,EAAE,IAAIY,GAAG,EAAE;MACpBlB,UAAU,CAACQ,MAAM,CAACF,EAAE,CAAC;IACvB;IACA,OAAOhD,GAAG;eACCG,YAAY;;oBAEP+B,gBAAgB,CAAC0B,GAAG,CAAC;0BACfnB,WAAW;KAChC,CAACZ,IAAI,CACJ3D,MAAM,CAACiF,KAAK,CAAC;MACXC,KAAK,EAAE,CAAC;MACRC,QAAQ,EAAE9E,QAAQ,CAAC+E,WAAW,CAAC,GAAG,EAAE,GAAG;KACxC,CAAC,EACFpF,MAAM,CAACqF,KAAK,CACb;EACH,CAAC;EAED,OAAOrF,MAAM,CAAC2F,OAAO,CAACjB,YAAY,CAAC,CAACf,IAAI,CACtC3D,MAAM,CAAC4F,aAAa,CAAC5F,MAAM,CAAC6F,UAAU,CAAC,EACvC7F,MAAM,CAACiF,KAAK,CAAC5E,QAAQ,CAACyF,MAAM,CAAC,GAAG,CAAC,CAAC,EAClC9F,MAAM,CAAC+F,cAAc,CAAC1F,QAAQ,CAAC2F,KAAK,CAACvD,mBAAmB,CAAC,CAAC,EAC1DzC,MAAM,CAACiG,YAAY,CAAC;IAClBC,OAAO,EAAE,aAAa;IACtBC,MAAM,EAAE,mBAAmB;IAC3BC,KAAK,EAAE;GACR,CAAC,EACFpG,MAAM,CAACqG,aAAa,CACrB;EAQD,MAAMC,SAAS,GAAG,OAAOlG,KAAK,CAACsB,IAAI,CAAC;IAClC6E,MAAM,EAAEvG,MAAM,CAAC4B,UAAU,CAAC,WAAU;MAAE4E,WAAW;MAAEC;IAAI,CAAY;MACjE,MAAMC,OAAO,GAAG,OAAOvG,OAAO,CAACuB,IAAI,CAAU;QAAEiF,QAAQ,EAAE;MAAC,CAAE,CAAC;MAE7D,MAAMC,IAAI,GAAG9E,GAAG,CAACuB,eAAe,CAAC;QAC/BG,EAAE,EAAEA,CAAA,KACF1B,GAAY;qBACDG,YAAY;gCACDmB,MAAM,mBAAmBmB,WAAW;;+BAErCtC,YAAY;mCACRwE,IAAI;;+BAERD,WAAW;0DACgB9C,SAAS;;;sBAG7CpB,gBAAgB;;;WAG3B;QACHiB,KAAK,EAAEA,CAAA,KACLzB,GAAY;4DACsCG,YAAY;iCACvCwE,IAAI;;6BAERD,WAAW;wDACgB9C,SAAS;;oBAE7CpB,gBAAgB;;WAEzB,CAACqB,IAAI,CACJ3D,MAAM,CAAC6G,GAAG,CAAEC,IAAI,IAAI;UAClB,IAAIA,IAAI,CAACC,MAAM,KAAK,CAAC,EAAE,OAAO/G,MAAM,CAAC4E,IAAI;UACzC,OAAO9C,GAAG;yBACCG,YAAY;oCACDmB,MAAM,mBAAmBmB,WAAW;8BAC1CP,gBAAgB,CAAC8C,IAAI,CAAC5C,GAAG,CAAExD,CAAC,IAAKA,CAAC,CAACoE,EAAE,CAAC,CAAC;eACtD,CAACkC,UAAU;QACd,CAAC,CAAC,EACFlF,GAAG,CAACmF,eAAe,CACpB;QACH3D,KAAK,EAAEA,CAAA,KACLxB,GAAY;;2BAEKQ,gBAAgB,YAAYL,YAAY;mCAChCwE,IAAI;;+BAERD,WAAW;0DACgB9C,SAAS;;;;gCAInCN,MAAM,mBAAmBmB,WAAW;;mBAEjDtC,YAAY;;WAEpB;QACH;QACAwB,MAAM,EAAEA,CAAA,KACN3B,GAAY;qBACDG,YAAY;gCACDmB,MAAM,mBAAmBmB,WAAW;;+BAErCtC,YAAY;mCACRwE,IAAI;;+BAERD,WAAW;0DACgB9C,SAAS;;sBAE7CpB,gBAAgB;;;;OAI/B,CAAC;MAEF,OAAOtC,MAAM,CAACkH,GAAG,CAAC,aAAS;QACzB,OAAO,IAAI,EAAE;UACX,MAAMC,OAAO,GAAG,OAAOP,IAAI;UAC3B,IAAIO,OAAO,CAACJ,MAAM,GAAG,CAAC,EAAE;YACtB,MAAMK,OAAO,GAAG,IAAI3C,GAAG,CAAC0C,OAAO,CAAC;YAChC,OAAOnH,MAAM,CAACqH,OAAO,CAACD,OAAO,EAAGE,OAAO,IAAI;cACzCA,OAAO,CAACA,OAAO,GAAGC,IAAI,CAACC,KAAK,CAACF,OAAO,CAACA,OAAO,CAAC;cAC7C,OAAOZ,OAAO,CAACe,KAAK,CAACH,OAAO,CAAC,CAAC3D,IAAI,CAChC3D,MAAM,CAAC6G,GAAG,CAAC,MAAK;gBACdO,OAAO,CAACpC,MAAM,CAACsC,OAAO,CAAC;cACzB,CAAC,CAAC,CACH;YACH,CAAC,CAAC,CAAC3D,IAAI,CACL3D,MAAM,CAAC0H,WAAW,CAAC,MAAMjC,SAAS,CAACkC,KAAK,CAACC,IAAI,CAACR,OAAO,EAAG7G,CAAC,IAAKA,CAAC,CAACuE,EAAE,CAAC,CAAC,CAAC,CACtE;YACD,OAAO9E,MAAM,CAAC6H,QAAQ,EAAE;UAC1B,CAAC,MAAM;YACL;YACA,OAAO7H,MAAM,CAAC8H,KAAK,CAAC5F,YAAY,CAAC;UACnC;QACF;MACF,CAAC,CAAC,CAACyB,IAAI,CACL3D,MAAM,CAAC+H,OAAO,EACd/H,MAAM,CAACiF,KAAK,CAAC5E,QAAQ,CAACyF,MAAM,CAAC,GAAG,CAAC,CAAC,EAClC9F,MAAM,CAACgI,UAAU,EACjBhI,MAAM,CAACqG,aAAa,CACrB;MAED,OAAOK,OAAO;IAChB,CAAC;GACF,CAAC;EAEF,OAAOhH,cAAc,CAACuI,mBAAmB,CAACC,EAAE,CAAC;IAC3CT,KAAK,EAAEA,CAAChB,IAAI,EAAE3B,EAAE,EAAEwC,OAAO,KACvBtH,MAAM,CAAC2F,OAAO,CAAC,MACb7D,GAAG;wBACaG,YAAY;oBAChB6C,EAAE,KAAK2B,IAAI,KAAKc,IAAI,CAACY,SAAS,CAACb,OAAO,CAAC,KAAKhD,QAAQ,QAAQlB,MAAM,KAAKA,MAAM;SACxF,CACF,CAACO,IAAI,CACJ3D,MAAM,CAACoI,aAAa,CAAE9C,KAAK,IACzBtF,MAAM,CAACqI,IAAI,CACT,IAAI3I,cAAc,CAAC4I,mBAAmB,CAAC;MACrCC,OAAO,EAAE,4CAA4C;MACrDjD,KAAK,EAAEzF,KAAK,CAAC2I,MAAM,CAAClD,KAAK;KAC1B,CAAC,CACH,CACF,CACF;IACHmD,IAAI,EAAEA,CAAC;MAAEjC,WAAW;MAAEC;IAAI,CAAE,KAC1BzG,MAAM,CAAC0I,mBAAmB,CAAEC,OAAO,IACjCvI,KAAK,CAACe,GAAG,CAACmF,SAAS,EAAE,IAAIsC,QAAQ,CAAC;MAAEnC,IAAI;MAAED;IAAW,CAAE,CAAC,CAAC,CAAC7C,IAAI,CAC5D3D,MAAM,CAAC6I,OAAO,CAAEC,CAAC,IAAK9I,MAAM,CAACqF,KAAK,CAACyD,CAAC,CAACL,IAAI,CAAC,CAAC,EAC3CzI,MAAM,CAAC+I,OAAO,CAAC/I,MAAM,CAAC6H,QAAQ,EAAE,CAAC,EACjC7H,MAAM,CAACgJ,MAAM,EACbL,OAAO,EACP3I,MAAM,CAAC6G,GAAG,CAAES,OAAO,IACjBtH,MAAM,CAACiJ,YAAY,CAAChJ,IAAI,CAACiJ,KAAK,CAAC;MAC7BC,SAAS,EAAG7D,KAAK,IACfzF,KAAK,CAACuJ,iBAAiB,CAAC9D,KAAK,CAAC,GAC1BG,SAAS,CAAC,CAAC6B,OAAO,CAACxC,EAAE,CAAC,CAAC,GACvBG,KAAK,CAACqC,OAAO,CAACxC,EAAE,EAAEwC,OAAO,CAACvC,QAAQ,GAAG,CAAC,EAAEO,KAAK,CAAC;MACpD+D,SAAS,EAAEA,CAAA,KAAMxE,QAAQ,CAACyC,OAAO,CAACxC,EAAE,EAAEwC,OAAO,CAACvC,QAAQ,GAAG,CAAC;KAC3D,CAAC,CAAC,CACJ,CACF;GAEN,CAAC;AACJ,CAAC,CAAC;AAEF,MAAM6D,QAAS,SAAQ9I,IAAI,CAACwJ,KAG1B;AAEF;;;;AAIO,MAAMC,UAAU,GAAI1H,OAAY,IAIlC3B,KAAK,CAAC8I,MAAM,CAACtJ,cAAc,CAACuI,mBAAmB,EAAEvG,IAAI,CAACG,OAAO,CAAC,CAAC;AAAAF,OAAA,CAAA4H,UAAA,GAAAA,UAAA","ignoreList":[]} |
| /** | ||
| * @since 1.0.0 | ||
| */ | ||
| import * as PersistedQueue from "@effect/experimental/PersistedQueue"; | ||
| import * as Duration from "effect/Duration"; | ||
| import * as Effect from "effect/Effect"; | ||
| import * as Layer from "effect/Layer"; | ||
| import type * as Scope from "effect/Scope"; | ||
| import * as SqlClient from "./SqlClient.js"; | ||
| import type { SqlError } from "./SqlError.js"; | ||
| /** | ||
| * @since 1.0.0 | ||
| * @category constructors | ||
| */ | ||
| export declare const make: (options?: { | ||
| readonly tableName?: string | undefined; | ||
| readonly pollInterval?: Duration.DurationInput | undefined; | ||
| readonly pollBatchSize?: number | undefined; | ||
| readonly lockRefreshInterval?: Duration.DurationInput | undefined; | ||
| readonly lockExpiration?: Duration.DurationInput | undefined; | ||
| } | undefined) => Effect.Effect<PersistedQueue.PersistedQueueStore["Type"], SqlError, SqlClient.SqlClient | Scope.Scope>; | ||
| /** | ||
| * @since 1.0.0 | ||
| * @category layers | ||
| */ | ||
| export declare const layerStore: (options?: {}) => Layer.Layer<PersistedQueue.PersistedQueueStore, SqlError, SqlClient.SqlClient>; | ||
| //# sourceMappingURL=SqlPersistedQueue.d.ts.map |
| {"version":3,"file":"SqlPersistedQueue.d.ts","sourceRoot":"","sources":["../../src/SqlPersistedQueue.ts"],"names":[],"mappings":"AAAA;;GAEG;AACH,OAAO,KAAK,cAAc,MAAM,qCAAqC,CAAA;AAGrE,OAAO,KAAK,QAAQ,MAAM,iBAAiB,CAAA;AAC3C,OAAO,KAAK,MAAM,MAAM,eAAe,CAAA;AAEvC,OAAO,KAAK,KAAK,MAAM,cAAc,CAAA;AAIrC,OAAO,KAAK,KAAK,KAAK,MAAM,cAAc,CAAA;AAC1C,OAAO,KAAK,SAAS,MAAM,gBAAgB,CAAA;AAC3C,OAAO,KAAK,EAAE,QAAQ,EAAE,MAAM,eAAe,CAAA;AAE7C;;;GAGG;AACH,eAAO,MAAM,IAAI,EAAE,CACjB,OAAO,CAAC,EAAE;IACR,QAAQ,CAAC,SAAS,CAAC,EAAE,MAAM,GAAG,SAAS,CAAA;IACvC,QAAQ,CAAC,YAAY,CAAC,EAAE,QAAQ,CAAC,aAAa,GAAG,SAAS,CAAA;IAC1D,QAAQ,CAAC,aAAa,CAAC,EAAE,MAAM,GAAG,SAAS,CAAA;IAC3C,QAAQ,CAAC,mBAAmB,CAAC,EAAE,QAAQ,CAAC,aAAa,GAAG,SAAS,CAAA;IACjE,QAAQ,CAAC,cAAc,CAAC,EAAE,QAAQ,CAAC,aAAa,GAAG,SAAS,CAAA;CAC7D,GAAG,SAAS,KACV,MAAM,CAAC,MAAM,CAChB,cAAc,CAAC,mBAAmB,CAAC,MAAM,CAAC,EAC1C,QAAQ,EACR,SAAS,CAAC,SAAS,GAAG,KAAK,CAAC,KAAK,CAiWjC,CAAA;AAOF;;;GAGG;AACH,eAAO,MAAM,UAAU,GAAI,UAAU,EAAE,KAAG,KAAK,CAAC,KAAK,CACnD,cAAc,CAAC,mBAAmB,EAClC,QAAQ,EACR,SAAS,CAAC,SAAS,CAC+C,CAAA"} |
| /** | ||
| * @since 1.0.0 | ||
| */ | ||
| import * as PersistedQueue from "@effect/experimental/PersistedQueue"; | ||
| import * as Cause from "effect/Cause"; | ||
| import * as Data from "effect/Data"; | ||
| import * as Duration from "effect/Duration"; | ||
| import * as Effect from "effect/Effect"; | ||
| import * as Exit from "effect/Exit"; | ||
| import * as Layer from "effect/Layer"; | ||
| import * as Mailbox from "effect/Mailbox"; | ||
| import * as RcMap from "effect/RcMap"; | ||
| import * as Schedule from "effect/Schedule"; | ||
| import * as SqlClient from "./SqlClient.js"; | ||
| /** | ||
| * @since 1.0.0 | ||
| * @category constructors | ||
| */ | ||
| export const make = /*#__PURE__*/Effect.fnUntraced(function* (options) { | ||
| const sql = (yield* SqlClient.SqlClient).withoutTransforms(); | ||
| const tableName = options?.tableName ?? "effect_queue"; | ||
| const tableNameSql = sql(tableName); | ||
| const pollInterval = options?.pollInterval ? Duration.decode(options.pollInterval) : Duration.millis(1000); | ||
| const pollBatchSize = options?.pollBatchSize ?? 1; | ||
| const pollBatchSizeSql = sql.literal(pollBatchSize.toString()); | ||
| const lockRefreshInterval = options?.lockRefreshInterval ? Duration.decode(options.lockRefreshInterval) : Duration.seconds(30); | ||
| const lockExpiration = options?.lockExpiration ? Duration.decode(options.lockExpiration) : Duration.minutes(2); | ||
| const lockExpirationSql = sql.literal(Math.ceil(Duration.toSeconds(lockExpiration)).toString()); | ||
| const workerId = crypto.randomUUID(); | ||
| const sqlNow = sql.onDialectOrElse({ | ||
| mssql: () => sql.literal("GETDATE()"), | ||
| mysql: () => sql.literal("NOW()"), | ||
| pg: () => sql.literal("NOW()"), | ||
| // sqlite | ||
| orElse: () => sql.literal("CURRENT_TIMESTAMP") | ||
| }); | ||
| const expiresAt = sql.onDialectOrElse({ | ||
| pg: () => sql`${sqlNow} - INTERVAL '${lockExpirationSql} seconds'`, | ||
| mysql: () => sql`DATE_SUB(${sqlNow}, INTERVAL ${lockExpirationSql} SECOND)`, | ||
| mssql: () => sql`DATEADD(SECOND, -${lockExpirationSql}, ${sqlNow})`, | ||
| orElse: () => sql`datetime(${sqlNow}, '-${lockExpirationSql} seconds')` | ||
| }); | ||
| yield* sql.onDialectOrElse({ | ||
| mysql: () => sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( | ||
| id VARCHAR(36) PRIMARY KEY, | ||
| queue_name VARCHAR(100) NOT NULL, | ||
| element TEXT NOT NULL, | ||
| completed BOOLEAN NOT NULL, | ||
| attempts INT NOT NULL DEFAULT 0, | ||
| last_failure TEXT NULL, | ||
| acquired_at DATETIME NULL, | ||
| acquired_by VARCHAR(36) NULL, | ||
| created_at DATETIME NOT NULL, | ||
| updated_at DATETIME NOT NULL | ||
| )`, | ||
| pg: () => sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( | ||
| id UUID PRIMARY KEY, | ||
| queue_name VARCHAR(100) NOT NULL, | ||
| element TEXT NOT NULL, | ||
| completed BOOLEAN NOT NULL, | ||
| attempts INTEGER NOT NULL DEFAULT 0, | ||
| last_failure TEXT NULL, | ||
| acquired_at TIMESTAMP NULL, | ||
| acquired_by UUID NULL, | ||
| created_at TIMESTAMP NOT NULL, | ||
| updated_at TIMESTAMP NOT NULL | ||
| )`, | ||
| mssql: () => sql`IF NOT EXISTS (SELECT * FROM sysobjects WHERE name=${tableNameSql} AND xtype='U') | ||
| CREATE TABLE ${tableNameSql} ( | ||
| id UNIQUEIDENTIFIER PRIMARY KEY, | ||
| queue_name NVARCHAR(100) NOT NULL, | ||
| element NVARCHAR(MAX) NOT NULL, | ||
| completed BIT NOT NULL, | ||
| attempts INT NOT NULL DEFAULT 0, | ||
| last_failure NVARCHAR(MAX) NULL, | ||
| acquired_at DATETIME2 NULL, | ||
| acquired_by UNIQUEIDENTIFIER NULL, | ||
| created_at DATETIME2 NOT NULL, | ||
| updated_at DATETIME2 NOT NULL | ||
| )`, | ||
| // sqlite | ||
| orElse: () => sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( | ||
| id TEXT PRIMARY KEY, | ||
| queue_name TEXT NOT NULL, | ||
| element TEXT NOT NULL, | ||
| completed BOOLEAN NOT NULL, | ||
| attempts INTEGER NOT NULL DEFAULT 0, | ||
| last_failure TEXT NULL, | ||
| acquired_at DATETIME NULL, | ||
| acquired_by TEXT NULL, | ||
| created_at DATETIME NOT NULL, | ||
| updated_at DATETIME NOT NULL | ||
| )` | ||
| }); | ||
| yield* sql.onDialectOrElse({ | ||
| mssql: () => sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_take') | ||
| CREATE INDEX idx_${tableNameSql}_take ON ${tableNameSql} (queue_name, completed, attempts, acquired_at)`, | ||
| mysql: () => sql`CREATE INDEX ${sql(`idx_${tableName}_take`)} ON ${tableNameSql} (queue_name, completed, attempts, acquired_at)`.pipe(Effect.ignore), | ||
| orElse: () => sql`CREATE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_take`)} ON ${tableNameSql} (queue_name, completed, attempts, acquired_at)` | ||
| }); | ||
| yield* sql.onDialectOrElse({ | ||
| mssql: () => sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_update') | ||
| CREATE INDEX ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (id, acquired_by)`, | ||
| mysql: () => sql`CREATE INDEX ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (id, acquired_by)`.pipe(Effect.ignore), | ||
| orElse: () => sql`CREATE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (id, acquired_by)` | ||
| }); | ||
| const wrapString = sql.onDialectOrElse({ | ||
| mssql: () => s => `N'${s}'`, | ||
| orElse: () => s => `'${s}'` | ||
| }); | ||
| const stringLiteral = s => sql.literal(wrapString(s)); | ||
| const stringLiteralArr = arr => sql.literal(`(${arr.map(wrapString).join(",")})`); | ||
| const sqlTrue = sql.onDialectOrElse({ | ||
| sqlite: () => sql.literal("1"), | ||
| orElse: () => sql.literal("TRUE") | ||
| }); | ||
| const sqlFalse = sql.onDialectOrElse({ | ||
| sqlite: () => sql.literal("0"), | ||
| orElse: () => sql.literal("FALSE") | ||
| }); | ||
| const workerIdSql = stringLiteral(workerId); | ||
| const elementIds = new Set(); | ||
| const refreshLocks = () => { | ||
| if (elementIds.size === 0) { | ||
| return Effect.void; | ||
| } | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow} | ||
| WHERE acquired_by = ${workerIdSql} | ||
| `; | ||
| }; | ||
| const complete = (id, attempts) => { | ||
| elementIds.delete(id); | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = NULL, acquired_by = NULL, updated_at = ${sqlNow}, completed = ${sqlTrue}, attempts = ${attempts} | ||
| WHERE id = ${id} | ||
| AND acquired_by = ${workerIdSql} | ||
| `.pipe(Effect.retry({ | ||
| times: 5, | ||
| schedule: Schedule.exponential(100, 1.5) | ||
| }), Effect.orDie); | ||
| }; | ||
| const retry = (id, attempts, cause) => { | ||
| elementIds.delete(id); | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = NULL, acquired_by = NULL, updated_at = ${sqlNow}, attempts = ${attempts}, last_failure = ${Cause.pretty(cause, { | ||
| renderErrorCause: true | ||
| })} | ||
| WHERE id = ${id} | ||
| AND acquired_by = ${workerIdSql} | ||
| `.pipe(Effect.retry({ | ||
| times: 5, | ||
| schedule: Schedule.exponential(100, 1.5) | ||
| }), Effect.orDie); | ||
| }; | ||
| const interrupt = ids => { | ||
| for (const id of ids) { | ||
| elementIds.delete(id); | ||
| } | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = NULL, acquired_by = NULL | ||
| WHERE id IN ${stringLiteralArr(ids)} | ||
| AND acquired_by = ${workerIdSql} | ||
| `.pipe(Effect.retry({ | ||
| times: 5, | ||
| schedule: Schedule.exponential(100, 1.5) | ||
| }), Effect.orDie); | ||
| }; | ||
| yield* Effect.suspend(refreshLocks).pipe(Effect.tapErrorCause(Effect.logWarning), Effect.retry(Schedule.spaced(500)), Effect.scheduleForked(Schedule.fixed(lockRefreshInterval)), Effect.annotateLogs({ | ||
| package: "@effect/sql", | ||
| module: "SqlPersistedQueue", | ||
| fiber: "refreshLocks" | ||
| }), Effect.interruptible); | ||
| const mailboxes = yield* RcMap.make({ | ||
| lookup: Effect.fnUntraced(function* ({ | ||
| maxAttempts, | ||
| name | ||
| }) { | ||
| const mailbox = yield* Mailbox.make({ | ||
| capacity: 0 | ||
| }); | ||
| const poll = sql.onDialectOrElse({ | ||
| pg: () => sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| WHERE id IN ( | ||
| SELECT id FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = FALSE | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| FOR UPDATE SKIP LOCKED | ||
| LIMIT ${pollBatchSizeSql} | ||
| ) | ||
| RETURNING id, queue_name, element, attempts | ||
| `, | ||
| mysql: () => sql` | ||
| SELECT id, queue_name, element, attempts FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = FALSE | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| LIMIT ${pollBatchSizeSql} | ||
| FOR UPDATE SKIP LOCKED | ||
| `.pipe(Effect.tap(rows => { | ||
| if (rows.length === 0) return Effect.void; | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| WHERE id IN ${stringLiteralArr(rows.map(r => r.id))} | ||
| `.unprepared; | ||
| }), sql.withTransaction), | ||
| mssql: () => sql` | ||
| WITH cte AS ( | ||
| SELECT TOP ${pollBatchSizeSql} id FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = 0 | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| ) | ||
| UPDATE q | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| OUTPUT inserted.id, inserted.queue_name, inserted.element, inserted.attempts | ||
| FROM ${tableNameSql} AS q | ||
| INNER JOIN cte ON q.id = cte.id | ||
| `, | ||
| // sqlite | ||
| orElse: () => sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| WHERE id IN ( | ||
| SELECT id FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = 0 | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| LIMIT ${pollBatchSizeSql} | ||
| ) | ||
| RETURNING id, queue_name, element, attempts | ||
| ` | ||
| }); | ||
| yield* Effect.gen(function* () { | ||
| while (true) { | ||
| const results = yield* poll; | ||
| if (results.length > 0) { | ||
| const toOffer = new Set(results); | ||
| yield* Effect.forEach(toOffer, element => { | ||
| element.element = JSON.parse(element.element); | ||
| return mailbox.offer(element).pipe(Effect.tap(() => { | ||
| toOffer.delete(element); | ||
| })); | ||
| }).pipe(Effect.onInterrupt(() => interrupt(Array.from(toOffer, e => e.id)))); | ||
| yield* Effect.yieldNow(); | ||
| } else { | ||
| // TODO: use listen/notify or equivalent to avoid polling | ||
| yield* Effect.sleep(pollInterval); | ||
| } | ||
| } | ||
| }).pipe(Effect.sandbox, Effect.retry(Schedule.spaced(500)), Effect.forkScoped, Effect.interruptible); | ||
| return mailbox; | ||
| }) | ||
| }); | ||
| return PersistedQueue.PersistedQueueStore.of({ | ||
| offer: (name, id, element) => Effect.suspend(() => sql` | ||
| INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at) | ||
| VALUES (${id}, ${name}, ${JSON.stringify(element)}, ${sqlFalse}, 0, ${sqlNow}, ${sqlNow}) | ||
| `).pipe(Effect.catchAllCause(cause => Effect.fail(new PersistedQueue.PersistedQueueError({ | ||
| message: "Failed to offer element to persisted queue", | ||
| cause: Cause.squash(cause) | ||
| })))), | ||
| take: ({ | ||
| maxAttempts, | ||
| name | ||
| }) => Effect.uninterruptibleMask(restore => RcMap.get(mailboxes, new QueueKey({ | ||
| name, | ||
| maxAttempts | ||
| })).pipe(Effect.flatMap(m => Effect.orDie(m.take)), Effect.zipLeft(Effect.yieldNow()), Effect.scoped, restore, Effect.tap(element => Effect.addFinalizer(Exit.match({ | ||
| onFailure: cause => Cause.isInterruptedOnly(cause) ? interrupt([element.id]) : retry(element.id, element.attempts + 1, cause), | ||
| onSuccess: () => complete(element.id, element.attempts + 1) | ||
| }))))) | ||
| }); | ||
| }); | ||
| class QueueKey extends Data.Class {} | ||
| /** | ||
| * @since 1.0.0 | ||
| * @category layers | ||
| */ | ||
| export const layerStore = options => Layer.scoped(PersistedQueue.PersistedQueueStore, make(options)); | ||
| //# sourceMappingURL=SqlPersistedQueue.js.map |
| {"version":3,"file":"SqlPersistedQueue.js","names":["PersistedQueue","Cause","Data","Duration","Effect","Exit","Layer","Mailbox","RcMap","Schedule","SqlClient","make","fnUntraced","options","sql","withoutTransforms","tableName","tableNameSql","pollInterval","decode","millis","pollBatchSize","pollBatchSizeSql","literal","toString","lockRefreshInterval","seconds","lockExpiration","minutes","lockExpirationSql","Math","ceil","toSeconds","workerId","crypto","randomUUID","sqlNow","onDialectOrElse","mssql","mysql","pg","orElse","expiresAt","pipe","ignore","wrapString","s","stringLiteral","stringLiteralArr","arr","map","join","sqlTrue","sqlite","sqlFalse","workerIdSql","elementIds","Set","refreshLocks","size","void","complete","id","attempts","delete","retry","times","schedule","exponential","orDie","cause","pretty","renderErrorCause","interrupt","ids","suspend","tapErrorCause","logWarning","spaced","scheduleForked","fixed","annotateLogs","package","module","fiber","interruptible","mailboxes","lookup","maxAttempts","name","mailbox","capacity","poll","tap","rows","length","r","unprepared","withTransaction","gen","results","toOffer","forEach","element","JSON","parse","offer","onInterrupt","Array","from","e","yieldNow","sleep","sandbox","forkScoped","PersistedQueueStore","of","stringify","catchAllCause","fail","PersistedQueueError","message","squash","take","uninterruptibleMask","restore","get","QueueKey","flatMap","m","zipLeft","scoped","addFinalizer","match","onFailure","isInterruptedOnly","onSuccess","Class","layerStore"],"sources":["../../src/SqlPersistedQueue.ts"],"sourcesContent":[null],"mappings":"AAAA;;;AAGA,OAAO,KAAKA,cAAc,MAAM,qCAAqC;AACrE,OAAO,KAAKC,KAAK,MAAM,cAAc;AACrC,OAAO,KAAKC,IAAI,MAAM,aAAa;AACnC,OAAO,KAAKC,QAAQ,MAAM,iBAAiB;AAC3C,OAAO,KAAKC,MAAM,MAAM,eAAe;AACvC,OAAO,KAAKC,IAAI,MAAM,aAAa;AACnC,OAAO,KAAKC,KAAK,MAAM,cAAc;AACrC,OAAO,KAAKC,OAAO,MAAM,gBAAgB;AACzC,OAAO,KAAKC,KAAK,MAAM,cAAc;AACrC,OAAO,KAAKC,QAAQ,MAAM,iBAAiB;AAE3C,OAAO,KAAKC,SAAS,MAAM,gBAAgB;AAG3C;;;;AAIA,OAAO,MAAMC,IAAI,gBAYbP,MAAM,CAACQ,UAAU,CAAC,WAAUC,OAAO;EACrC,MAAMC,GAAG,GAAG,CAAC,OAAOJ,SAAS,CAACA,SAAS,EAAEK,iBAAiB,EAAE;EAC5D,MAAMC,SAAS,GAAGH,OAAO,EAAEG,SAAS,IAAI,cAAc;EACtD,MAAMC,YAAY,GAAGH,GAAG,CAACE,SAAS,CAAC;EACnC,MAAME,YAAY,GAAGL,OAAO,EAAEK,YAAY,GACtCf,QAAQ,CAACgB,MAAM,CAACN,OAAO,CAACK,YAAY,CAAC,GACrCf,QAAQ,CAACiB,MAAM,CAAC,IAAI,CAAC;EACzB,MAAMC,aAAa,GAAGR,OAAO,EAAEQ,aAAa,IAAI,CAAC;EACjD,MAAMC,gBAAgB,GAAGR,GAAG,CAACS,OAAO,CAACF,aAAa,CAACG,QAAQ,EAAE,CAAC;EAC9D,MAAMC,mBAAmB,GAAGZ,OAAO,EAAEY,mBAAmB,GACpDtB,QAAQ,CAACgB,MAAM,CAACN,OAAO,CAACY,mBAAmB,CAAC,GAC5CtB,QAAQ,CAACuB,OAAO,CAAC,EAAE,CAAC;EACxB,MAAMC,cAAc,GAAGd,OAAO,EAAEc,cAAc,GAAGxB,QAAQ,CAACgB,MAAM,CAACN,OAAO,CAACc,cAAc,CAAC,GAAGxB,QAAQ,CAACyB,OAAO,CAAC,CAAC,CAAC;EAC9G,MAAMC,iBAAiB,GAAGf,GAAG,CAACS,OAAO,CAACO,IAAI,CAACC,IAAI,CAAC5B,QAAQ,CAAC6B,SAAS,CAACL,cAAc,CAAC,CAAC,CAACH,QAAQ,EAAE,CAAC;EAC/F,MAAMS,QAAQ,GAAGC,MAAM,CAACC,UAAU,EAAE;EAEpC,MAAMC,MAAM,GAAGtB,GAAG,CAACuB,eAAe,CAAC;IACjCC,KAAK,EAAEA,CAAA,KAAMxB,GAAG,CAACS,OAAO,CAAC,WAAW,CAAC;IACrCgB,KAAK,EAAEA,CAAA,KAAMzB,GAAG,CAACS,OAAO,CAAC,OAAO,CAAC;IACjCiB,EAAE,EAAEA,CAAA,KAAM1B,GAAG,CAACS,OAAO,CAAC,OAAO,CAAC;IAC9B;IACAkB,MAAM,EAAEA,CAAA,KAAM3B,GAAG,CAACS,OAAO,CAAC,mBAAmB;GAC9C,CAAC;EAEF,MAAMmB,SAAS,GAAG5B,GAAG,CAACuB,eAAe,CAAC;IACpCG,EAAE,EAAEA,CAAA,KAAM1B,GAAG,GAAGsB,MAAM,gBAAgBP,iBAAiB,WAAW;IAClEU,KAAK,EAAEA,CAAA,KAAMzB,GAAG,YAAYsB,MAAM,cAAcP,iBAAiB,UAAU;IAC3ES,KAAK,EAAEA,CAAA,KAAMxB,GAAG,oBAAoBe,iBAAiB,KAAKO,MAAM,GAAG;IACnEK,MAAM,EAAEA,CAAA,KAAM3B,GAAG,YAAYsB,MAAM,OAAOP,iBAAiB;GAC5D,CAAC;EAEF,OAAOf,GAAG,CAACuB,eAAe,CAAC;IACzBE,KAAK,EAAEA,CAAA,KACLzB,GAAG,8BAA8BG,YAAY;;;;;;;;;;;QAW3C;IACJuB,EAAE,EAAEA,CAAA,KACF1B,GAAG,8BAA8BG,YAAY;;;;;;;;;;;QAW3C;IACJqB,KAAK,EAAEA,CAAA,KACLxB,GAAG,sDAAsDG,YAAY;qBACtDA,YAAY;;;;;;;;;;;QAWzB;IACJ;IACAwB,MAAM,EAAEA,CAAA,KACN3B,GAAG,8BAA8BG,YAAY;;;;;;;;;;;;GAYhD,CAAC;EAEF,OAAOH,GAAG,CAACuB,eAAe,CAAC;IACzBC,KAAK,EAAEA,CAAA,KACLxB,GAAG,+DAA+DE,SAAS;2BACtDC,YAAY,YAAYA,YAAY,iDAAiD;IAC5GsB,KAAK,EAAEA,CAAA,KACLzB,GAAG,gBACDA,GAAG,CAAC,OAAOE,SAAS,OAAO,CAC7B,OAAOC,YAAY,iDAAiD,CACjE0B,IAAI,CAACvC,MAAM,CAACwC,MAAM,CAAC;IACxBH,MAAM,EAAEA,CAAA,KACN3B,GAAG,8BACDA,GAAG,CAAC,OAAOE,SAAS,OAAO,CAC7B,OAAOC,YAAY;GACtB,CAAC;EAEF,OAAOH,GAAG,CAACuB,eAAe,CAAC;IACzBC,KAAK,EAAEA,CAAA,KACLxB,GAAG,+DAA+DE,SAAS;uBAC1DF,GAAG,CAAC,OAAOE,SAAS,SAAS,CAAC,OAAOC,YAAY,oBAAoB;IACxFsB,KAAK,EAAEA,CAAA,KACLzB,GAAG,gBAAgBA,GAAG,CAAC,OAAOE,SAAS,SAAS,CAAC,OAAOC,YAAY,oBAAoB,CAAC0B,IAAI,CAACvC,MAAM,CAACwC,MAAM,CAAC;IAC9GH,MAAM,EAAEA,CAAA,KAAM3B,GAAG,8BAA8BA,GAAG,CAAC,OAAOE,SAAS,SAAS,CAAC,OAAOC,YAAY;GACjG,CAAC;EAEF,MAAM4B,UAAU,GAAG/B,GAAG,CAACuB,eAAe,CAAC;IACrCC,KAAK,EAAEA,CAAA,KAAOQ,CAAS,IAAK,KAAKA,CAAC,GAAG;IACrCL,MAAM,EAAEA,CAAA,KAAOK,CAAS,IAAK,IAAIA,CAAC;GACnC,CAAC;EACF,MAAMC,aAAa,GAAID,CAAS,IAAKhC,GAAG,CAACS,OAAO,CAACsB,UAAU,CAACC,CAAC,CAAC,CAAC;EAC/D,MAAME,gBAAgB,GAAIC,GAA0B,IAAKnC,GAAG,CAACS,OAAO,CAAC,IAAI0B,GAAG,CAACC,GAAG,CAACL,UAAU,CAAC,CAACM,IAAI,CAAC,GAAG,CAAC,GAAG,CAAC;EAE1G,MAAMC,OAAO,GAAGtC,GAAG,CAACuB,eAAe,CAAC;IAClCgB,MAAM,EAAEA,CAAA,KAAMvC,GAAG,CAACS,OAAO,CAAC,GAAG,CAAC;IAC9BkB,MAAM,EAAEA,CAAA,KAAM3B,GAAG,CAACS,OAAO,CAAC,MAAM;GACjC,CAAC;EACF,MAAM+B,QAAQ,GAAGxC,GAAG,CAACuB,eAAe,CAAC;IACnCgB,MAAM,EAAEA,CAAA,KAAMvC,GAAG,CAACS,OAAO,CAAC,GAAG,CAAC;IAC9BkB,MAAM,EAAEA,CAAA,KAAM3B,GAAG,CAACS,OAAO,CAAC,OAAO;GAClC,CAAC;EAEF,MAAMgC,WAAW,GAAGR,aAAa,CAACd,QAAQ,CAAC;EAC3C,MAAMuB,UAAU,GAAG,IAAIC,GAAG,EAAU;EACpC,MAAMC,YAAY,GAAGA,CAAA,KAAoC;IACvD,IAAIF,UAAU,CAACG,IAAI,KAAK,CAAC,EAAE;MACzB,OAAOvD,MAAM,CAACwD,IAAI;IACpB;IACA,OAAO9C,GAAG;eACCG,YAAY;0BACDmB,MAAM;4BACJmB,WAAW;KAClC;EACH,CAAC;EACD,MAAMM,QAAQ,GAAGA,CAACC,EAAU,EAAEC,QAAgB,KAAI;IAChDP,UAAU,CAACQ,MAAM,CAACF,EAAE,CAAC;IACrB,OAAOhD,GAAG;eACCG,YAAY;iEACsCmB,MAAM,iBAAiBgB,OAAO,gBAAgBW,QAAQ;mBACpGD,EAAE;0BACKP,WAAW;KAChC,CAACZ,IAAI,CACJvC,MAAM,CAAC6D,KAAK,CAAC;MACXC,KAAK,EAAE,CAAC;MACRC,QAAQ,EAAE1D,QAAQ,CAAC2D,WAAW,CAAC,GAAG,EAAE,GAAG;KACxC,CAAC,EACFhE,MAAM,CAACiE,KAAK,CACb;EACH,CAAC;EACD,MAAMJ,KAAK,GAAGA,CAACH,EAAU,EAAEC,QAAgB,EAAEO,KAAuB,KAAI;IACtEd,UAAU,CAACQ,MAAM,CAACF,EAAE,CAAC;IACrB,OAAOhD,GAAG;eACCG,YAAY;iEACsCmB,MAAM,gBAAgB2B,QAAQ,oBACzF9D,KAAK,CAACsE,MAAM,CAACD,KAAK,EAAE;MAAEE,gBAAgB,EAAE;IAAI,CAAE,CAChD;mBACeV,EAAE;0BACKP,WAAW;KAChC,CAACZ,IAAI,CACJvC,MAAM,CAAC6D,KAAK,CAAC;MACXC,KAAK,EAAE,CAAC;MACRC,QAAQ,EAAE1D,QAAQ,CAAC2D,WAAW,CAAC,GAAG,EAAE,GAAG;KACxC,CAAC,EACFhE,MAAM,CAACiE,KAAK,CACb;EACH,CAAC;EACD,MAAMI,SAAS,GAAIC,GAAkB,IAAI;IACvC,KAAK,MAAMZ,EAAE,IAAIY,GAAG,EAAE;MACpBlB,UAAU,CAACQ,MAAM,CAACF,EAAE,CAAC;IACvB;IACA,OAAOhD,GAAG;eACCG,YAAY;;oBAEP+B,gBAAgB,CAAC0B,GAAG,CAAC;0BACfnB,WAAW;KAChC,CAACZ,IAAI,CACJvC,MAAM,CAAC6D,KAAK,CAAC;MACXC,KAAK,EAAE,CAAC;MACRC,QAAQ,EAAE1D,QAAQ,CAAC2D,WAAW,CAAC,GAAG,EAAE,GAAG;KACxC,CAAC,EACFhE,MAAM,CAACiE,KAAK,CACb;EACH,CAAC;EAED,OAAOjE,MAAM,CAACuE,OAAO,CAACjB,YAAY,CAAC,CAACf,IAAI,CACtCvC,MAAM,CAACwE,aAAa,CAACxE,MAAM,CAACyE,UAAU,CAAC,EACvCzE,MAAM,CAAC6D,KAAK,CAACxD,QAAQ,CAACqE,MAAM,CAAC,GAAG,CAAC,CAAC,EAClC1E,MAAM,CAAC2E,cAAc,CAACtE,QAAQ,CAACuE,KAAK,CAACvD,mBAAmB,CAAC,CAAC,EAC1DrB,MAAM,CAAC6E,YAAY,CAAC;IAClBC,OAAO,EAAE,aAAa;IACtBC,MAAM,EAAE,mBAAmB;IAC3BC,KAAK,EAAE;GACR,CAAC,EACFhF,MAAM,CAACiF,aAAa,CACrB;EAQD,MAAMC,SAAS,GAAG,OAAO9E,KAAK,CAACG,IAAI,CAAC;IAClC4E,MAAM,EAAEnF,MAAM,CAACQ,UAAU,CAAC,WAAU;MAAE4E,WAAW;MAAEC;IAAI,CAAY;MACjE,MAAMC,OAAO,GAAG,OAAOnF,OAAO,CAACI,IAAI,CAAU;QAAEgF,QAAQ,EAAE;MAAC,CAAE,CAAC;MAE7D,MAAMC,IAAI,GAAG9E,GAAG,CAACuB,eAAe,CAAC;QAC/BG,EAAE,EAAEA,CAAA,KACF1B,GAAY;qBACDG,YAAY;gCACDmB,MAAM,mBAAmBmB,WAAW;;+BAErCtC,YAAY;mCACRwE,IAAI;;+BAERD,WAAW;0DACgB9C,SAAS;;;sBAG7CpB,gBAAgB;;;WAG3B;QACHiB,KAAK,EAAEA,CAAA,KACLzB,GAAY;4DACsCG,YAAY;iCACvCwE,IAAI;;6BAERD,WAAW;wDACgB9C,SAAS;;oBAE7CpB,gBAAgB;;WAEzB,CAACqB,IAAI,CACJvC,MAAM,CAACyF,GAAG,CAAEC,IAAI,IAAI;UAClB,IAAIA,IAAI,CAACC,MAAM,KAAK,CAAC,EAAE,OAAO3F,MAAM,CAACwD,IAAI;UACzC,OAAO9C,GAAG;yBACCG,YAAY;oCACDmB,MAAM,mBAAmBmB,WAAW;8BAC1CP,gBAAgB,CAAC8C,IAAI,CAAC5C,GAAG,CAAE8C,CAAC,IAAKA,CAAC,CAAClC,EAAE,CAAC,CAAC;eACtD,CAACmC,UAAU;QACd,CAAC,CAAC,EACFnF,GAAG,CAACoF,eAAe,CACpB;QACH5D,KAAK,EAAEA,CAAA,KACLxB,GAAY;;2BAEKQ,gBAAgB,YAAYL,YAAY;mCAChCwE,IAAI;;+BAERD,WAAW;0DACgB9C,SAAS;;;;gCAInCN,MAAM,mBAAmBmB,WAAW;;mBAEjDtC,YAAY;;WAEpB;QACH;QACAwB,MAAM,EAAEA,CAAA,KACN3B,GAAY;qBACDG,YAAY;gCACDmB,MAAM,mBAAmBmB,WAAW;;+BAErCtC,YAAY;mCACRwE,IAAI;;+BAERD,WAAW;0DACgB9C,SAAS;;sBAE7CpB,gBAAgB;;;;OAI/B,CAAC;MAEF,OAAOlB,MAAM,CAAC+F,GAAG,CAAC,aAAS;QACzB,OAAO,IAAI,EAAE;UACX,MAAMC,OAAO,GAAG,OAAOR,IAAI;UAC3B,IAAIQ,OAAO,CAACL,MAAM,GAAG,CAAC,EAAE;YACtB,MAAMM,OAAO,GAAG,IAAI5C,GAAG,CAAC2C,OAAO,CAAC;YAChC,OAAOhG,MAAM,CAACkG,OAAO,CAACD,OAAO,EAAGE,OAAO,IAAI;cACzCA,OAAO,CAACA,OAAO,GAAGC,IAAI,CAACC,KAAK,CAACF,OAAO,CAACA,OAAO,CAAC;cAC7C,OAAOb,OAAO,CAACgB,KAAK,CAACH,OAAO,CAAC,CAAC5D,IAAI,CAChCvC,MAAM,CAACyF,GAAG,CAAC,MAAK;gBACdQ,OAAO,CAACrC,MAAM,CAACuC,OAAO,CAAC;cACzB,CAAC,CAAC,CACH;YACH,CAAC,CAAC,CAAC5D,IAAI,CACLvC,MAAM,CAACuG,WAAW,CAAC,MAAMlC,SAAS,CAACmC,KAAK,CAACC,IAAI,CAACR,OAAO,EAAGS,CAAC,IAAKA,CAAC,CAAChD,EAAE,CAAC,CAAC,CAAC,CACtE;YACD,OAAO1D,MAAM,CAAC2G,QAAQ,EAAE;UAC1B,CAAC,MAAM;YACL;YACA,OAAO3G,MAAM,CAAC4G,KAAK,CAAC9F,YAAY,CAAC;UACnC;QACF;MACF,CAAC,CAAC,CAACyB,IAAI,CACLvC,MAAM,CAAC6G,OAAO,EACd7G,MAAM,CAAC6D,KAAK,CAACxD,QAAQ,CAACqE,MAAM,CAAC,GAAG,CAAC,CAAC,EAClC1E,MAAM,CAAC8G,UAAU,EACjB9G,MAAM,CAACiF,aAAa,CACrB;MAED,OAAOK,OAAO;IAChB,CAAC;GACF,CAAC;EAEF,OAAO1F,cAAc,CAACmH,mBAAmB,CAACC,EAAE,CAAC;IAC3CV,KAAK,EAAEA,CAACjB,IAAI,EAAE3B,EAAE,EAAEyC,OAAO,KACvBnG,MAAM,CAACuE,OAAO,CAAC,MACb7D,GAAG;wBACaG,YAAY;oBAChB6C,EAAE,KAAK2B,IAAI,KAAKe,IAAI,CAACa,SAAS,CAACd,OAAO,CAAC,KAAKjD,QAAQ,QAAQlB,MAAM,KAAKA,MAAM;SACxF,CACF,CAACO,IAAI,CACJvC,MAAM,CAACkH,aAAa,CAAEhD,KAAK,IACzBlE,MAAM,CAACmH,IAAI,CACT,IAAIvH,cAAc,CAACwH,mBAAmB,CAAC;MACrCC,OAAO,EAAE,4CAA4C;MACrDnD,KAAK,EAAErE,KAAK,CAACyH,MAAM,CAACpD,KAAK;KAC1B,CAAC,CACH,CACF,CACF;IACHqD,IAAI,EAAEA,CAAC;MAAEnC,WAAW;MAAEC;IAAI,CAAE,KAC1BrF,MAAM,CAACwH,mBAAmB,CAAEC,OAAO,IACjCrH,KAAK,CAACsH,GAAG,CAACxC,SAAS,EAAE,IAAIyC,QAAQ,CAAC;MAAEtC,IAAI;MAAED;IAAW,CAAE,CAAC,CAAC,CAAC7C,IAAI,CAC5DvC,MAAM,CAAC4H,OAAO,CAAEC,CAAC,IAAK7H,MAAM,CAACiE,KAAK,CAAC4D,CAAC,CAACN,IAAI,CAAC,CAAC,EAC3CvH,MAAM,CAAC8H,OAAO,CAAC9H,MAAM,CAAC2G,QAAQ,EAAE,CAAC,EACjC3G,MAAM,CAAC+H,MAAM,EACbN,OAAO,EACPzH,MAAM,CAACyF,GAAG,CAAEU,OAAO,IACjBnG,MAAM,CAACgI,YAAY,CAAC/H,IAAI,CAACgI,KAAK,CAAC;MAC7BC,SAAS,EAAGhE,KAAK,IACfrE,KAAK,CAACsI,iBAAiB,CAACjE,KAAK,CAAC,GAC1BG,SAAS,CAAC,CAAC8B,OAAO,CAACzC,EAAE,CAAC,CAAC,GACvBG,KAAK,CAACsC,OAAO,CAACzC,EAAE,EAAEyC,OAAO,CAACxC,QAAQ,GAAG,CAAC,EAAEO,KAAK,CAAC;MACpDkE,SAAS,EAAEA,CAAA,KAAM3E,QAAQ,CAAC0C,OAAO,CAACzC,EAAE,EAAEyC,OAAO,CAACxC,QAAQ,GAAG,CAAC;KAC3D,CAAC,CAAC,CACJ,CACF;GAEN,CAAC;AACJ,CAAC,CAAC;AAEF,MAAMgE,QAAS,SAAQ7H,IAAI,CAACuI,KAG1B;AAEF;;;;AAIA,OAAO,MAAMC,UAAU,GAAI7H,OAAY,IAIlCP,KAAK,CAAC6H,MAAM,CAACnI,cAAc,CAACmH,mBAAmB,EAAExG,IAAI,CAACE,OAAO,CAAC,CAAC","ignoreList":[]} |
| { | ||
| "sideEffects": [], | ||
| "main": "../dist/cjs/SqlPersistedQueue.js", | ||
| "module": "../dist/esm/SqlPersistedQueue.js", | ||
| "types": "../dist/dts/SqlPersistedQueue.d.ts" | ||
| } |
| /** | ||
| * @since 1.0.0 | ||
| */ | ||
| import * as PersistedQueue from "@effect/experimental/PersistedQueue" | ||
| import * as Cause from "effect/Cause" | ||
| import * as Data from "effect/Data" | ||
| import * as Duration from "effect/Duration" | ||
| import * as Effect from "effect/Effect" | ||
| import * as Exit from "effect/Exit" | ||
| import * as Layer from "effect/Layer" | ||
| import * as Mailbox from "effect/Mailbox" | ||
| import * as RcMap from "effect/RcMap" | ||
| import * as Schedule from "effect/Schedule" | ||
| import type * as Scope from "effect/Scope" | ||
| import * as SqlClient from "./SqlClient.js" | ||
| import type { SqlError } from "./SqlError.js" | ||
| /** | ||
| * @since 1.0.0 | ||
| * @category constructors | ||
| */ | ||
| export const make: ( | ||
| options?: { | ||
| readonly tableName?: string | undefined | ||
| readonly pollInterval?: Duration.DurationInput | undefined | ||
| readonly pollBatchSize?: number | undefined | ||
| readonly lockRefreshInterval?: Duration.DurationInput | undefined | ||
| readonly lockExpiration?: Duration.DurationInput | undefined | ||
| } | undefined | ||
| ) => Effect.Effect< | ||
| PersistedQueue.PersistedQueueStore["Type"], | ||
| SqlError, | ||
| SqlClient.SqlClient | Scope.Scope | ||
| > = Effect.fnUntraced(function*(options) { | ||
| const sql = (yield* SqlClient.SqlClient).withoutTransforms() | ||
| const tableName = options?.tableName ?? "effect_queue" | ||
| const tableNameSql = sql(tableName) | ||
| const pollInterval = options?.pollInterval | ||
| ? Duration.decode(options.pollInterval) | ||
| : Duration.millis(1000) | ||
| const pollBatchSize = options?.pollBatchSize ?? 1 | ||
| const pollBatchSizeSql = sql.literal(pollBatchSize.toString()) | ||
| const lockRefreshInterval = options?.lockRefreshInterval | ||
| ? Duration.decode(options.lockRefreshInterval) | ||
| : Duration.seconds(30) | ||
| const lockExpiration = options?.lockExpiration ? Duration.decode(options.lockExpiration) : Duration.minutes(2) | ||
| const lockExpirationSql = sql.literal(Math.ceil(Duration.toSeconds(lockExpiration)).toString()) | ||
| const workerId = crypto.randomUUID() | ||
| const sqlNow = sql.onDialectOrElse({ | ||
| mssql: () => sql.literal("GETDATE()"), | ||
| mysql: () => sql.literal("NOW()"), | ||
| pg: () => sql.literal("NOW()"), | ||
| // sqlite | ||
| orElse: () => sql.literal("CURRENT_TIMESTAMP") | ||
| }) | ||
| const expiresAt = sql.onDialectOrElse({ | ||
| pg: () => sql`${sqlNow} - INTERVAL '${lockExpirationSql} seconds'`, | ||
| mysql: () => sql`DATE_SUB(${sqlNow}, INTERVAL ${lockExpirationSql} SECOND)`, | ||
| mssql: () => sql`DATEADD(SECOND, -${lockExpirationSql}, ${sqlNow})`, | ||
| orElse: () => sql`datetime(${sqlNow}, '-${lockExpirationSql} seconds')` | ||
| }) | ||
| yield* sql.onDialectOrElse({ | ||
| mysql: () => | ||
| sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( | ||
| id VARCHAR(36) PRIMARY KEY, | ||
| queue_name VARCHAR(100) NOT NULL, | ||
| element TEXT NOT NULL, | ||
| completed BOOLEAN NOT NULL, | ||
| attempts INT NOT NULL DEFAULT 0, | ||
| last_failure TEXT NULL, | ||
| acquired_at DATETIME NULL, | ||
| acquired_by VARCHAR(36) NULL, | ||
| created_at DATETIME NOT NULL, | ||
| updated_at DATETIME NOT NULL | ||
| )`, | ||
| pg: () => | ||
| sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( | ||
| id UUID PRIMARY KEY, | ||
| queue_name VARCHAR(100) NOT NULL, | ||
| element TEXT NOT NULL, | ||
| completed BOOLEAN NOT NULL, | ||
| attempts INTEGER NOT NULL DEFAULT 0, | ||
| last_failure TEXT NULL, | ||
| acquired_at TIMESTAMP NULL, | ||
| acquired_by UUID NULL, | ||
| created_at TIMESTAMP NOT NULL, | ||
| updated_at TIMESTAMP NOT NULL | ||
| )`, | ||
| mssql: () => | ||
| sql`IF NOT EXISTS (SELECT * FROM sysobjects WHERE name=${tableNameSql} AND xtype='U') | ||
| CREATE TABLE ${tableNameSql} ( | ||
| id UNIQUEIDENTIFIER PRIMARY KEY, | ||
| queue_name NVARCHAR(100) NOT NULL, | ||
| element NVARCHAR(MAX) NOT NULL, | ||
| completed BIT NOT NULL, | ||
| attempts INT NOT NULL DEFAULT 0, | ||
| last_failure NVARCHAR(MAX) NULL, | ||
| acquired_at DATETIME2 NULL, | ||
| acquired_by UNIQUEIDENTIFIER NULL, | ||
| created_at DATETIME2 NOT NULL, | ||
| updated_at DATETIME2 NOT NULL | ||
| )`, | ||
| // sqlite | ||
| orElse: () => | ||
| sql`CREATE TABLE IF NOT EXISTS ${tableNameSql} ( | ||
| id TEXT PRIMARY KEY, | ||
| queue_name TEXT NOT NULL, | ||
| element TEXT NOT NULL, | ||
| completed BOOLEAN NOT NULL, | ||
| attempts INTEGER NOT NULL DEFAULT 0, | ||
| last_failure TEXT NULL, | ||
| acquired_at DATETIME NULL, | ||
| acquired_by TEXT NULL, | ||
| created_at DATETIME NOT NULL, | ||
| updated_at DATETIME NOT NULL | ||
| )` | ||
| }) | ||
| yield* sql.onDialectOrElse({ | ||
| mssql: () => | ||
| sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_take') | ||
| CREATE INDEX idx_${tableNameSql}_take ON ${tableNameSql} (queue_name, completed, attempts, acquired_at)`, | ||
| mysql: () => | ||
| sql`CREATE INDEX ${ | ||
| sql(`idx_${tableName}_take`) | ||
| } ON ${tableNameSql} (queue_name, completed, attempts, acquired_at)` | ||
| .pipe(Effect.ignore), | ||
| orElse: () => | ||
| sql`CREATE INDEX IF NOT EXISTS ${ | ||
| sql(`idx_${tableName}_take`) | ||
| } ON ${tableNameSql} (queue_name, completed, attempts, acquired_at)` | ||
| }) | ||
| yield* sql.onDialectOrElse({ | ||
| mssql: () => | ||
| sql`IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = N'idx_${tableName}_update') | ||
| CREATE INDEX ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (id, acquired_by)`, | ||
| mysql: () => | ||
| sql`CREATE INDEX ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (id, acquired_by)`.pipe(Effect.ignore), | ||
| orElse: () => sql`CREATE INDEX IF NOT EXISTS ${sql(`idx_${tableName}_update`)} ON ${tableNameSql} (id, acquired_by)` | ||
| }) | ||
| const wrapString = sql.onDialectOrElse({ | ||
| mssql: () => (s: string) => `N'${s}'`, | ||
| orElse: () => (s: string) => `'${s}'` | ||
| }) | ||
| const stringLiteral = (s: string) => sql.literal(wrapString(s)) | ||
| const stringLiteralArr = (arr: ReadonlyArray<string>) => sql.literal(`(${arr.map(wrapString).join(",")})`) | ||
| const sqlTrue = sql.onDialectOrElse({ | ||
| sqlite: () => sql.literal("1"), | ||
| orElse: () => sql.literal("TRUE") | ||
| }) | ||
| const sqlFalse = sql.onDialectOrElse({ | ||
| sqlite: () => sql.literal("0"), | ||
| orElse: () => sql.literal("FALSE") | ||
| }) | ||
| const workerIdSql = stringLiteral(workerId) | ||
| const elementIds = new Set<string>() | ||
| const refreshLocks = (): Effect.Effect<void, SqlError> => { | ||
| if (elementIds.size === 0) { | ||
| return Effect.void | ||
| } | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow} | ||
| WHERE acquired_by = ${workerIdSql} | ||
| ` | ||
| } | ||
| const complete = (id: string, attempts: number) => { | ||
| elementIds.delete(id) | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = NULL, acquired_by = NULL, updated_at = ${sqlNow}, completed = ${sqlTrue}, attempts = ${attempts} | ||
| WHERE id = ${id} | ||
| AND acquired_by = ${workerIdSql} | ||
| `.pipe( | ||
| Effect.retry({ | ||
| times: 5, | ||
| schedule: Schedule.exponential(100, 1.5) | ||
| }), | ||
| Effect.orDie | ||
| ) | ||
| } | ||
| const retry = (id: string, attempts: number, cause: Cause.Cause<any>) => { | ||
| elementIds.delete(id) | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = NULL, acquired_by = NULL, updated_at = ${sqlNow}, attempts = ${attempts}, last_failure = ${ | ||
| Cause.pretty(cause, { renderErrorCause: true }) | ||
| } | ||
| WHERE id = ${id} | ||
| AND acquired_by = ${workerIdSql} | ||
| `.pipe( | ||
| Effect.retry({ | ||
| times: 5, | ||
| schedule: Schedule.exponential(100, 1.5) | ||
| }), | ||
| Effect.orDie | ||
| ) | ||
| } | ||
| const interrupt = (ids: Array<string>) => { | ||
| for (const id of ids) { | ||
| elementIds.delete(id) | ||
| } | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = NULL, acquired_by = NULL | ||
| WHERE id IN ${stringLiteralArr(ids)} | ||
| AND acquired_by = ${workerIdSql} | ||
| `.pipe( | ||
| Effect.retry({ | ||
| times: 5, | ||
| schedule: Schedule.exponential(100, 1.5) | ||
| }), | ||
| Effect.orDie | ||
| ) | ||
| } | ||
| yield* Effect.suspend(refreshLocks).pipe( | ||
| Effect.tapErrorCause(Effect.logWarning), | ||
| Effect.retry(Schedule.spaced(500)), | ||
| Effect.scheduleForked(Schedule.fixed(lockRefreshInterval)), | ||
| Effect.annotateLogs({ | ||
| package: "@effect/sql", | ||
| module: "SqlPersistedQueue", | ||
| fiber: "refreshLocks" | ||
| }), | ||
| Effect.interruptible | ||
| ) | ||
| type Element = { | ||
| readonly id: string | ||
| readonly queue_name: string | ||
| element: string | ||
| readonly attempts: number | ||
| } | ||
| const mailboxes = yield* RcMap.make({ | ||
| lookup: Effect.fnUntraced(function*({ maxAttempts, name }: QueueKey) { | ||
| const mailbox = yield* Mailbox.make<Element>({ capacity: 0 }) | ||
| const poll = sql.onDialectOrElse({ | ||
| pg: () => | ||
| sql<Element>` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| WHERE id IN ( | ||
| SELECT id FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = FALSE | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| FOR UPDATE SKIP LOCKED | ||
| LIMIT ${pollBatchSizeSql} | ||
| ) | ||
| RETURNING id, queue_name, element, attempts | ||
| `, | ||
| mysql: () => | ||
| sql<Element>` | ||
| SELECT id, queue_name, element, attempts FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = FALSE | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| LIMIT ${pollBatchSizeSql} | ||
| FOR UPDATE SKIP LOCKED | ||
| `.pipe( | ||
| Effect.tap((rows) => { | ||
| if (rows.length === 0) return Effect.void | ||
| return sql` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| WHERE id IN ${stringLiteralArr(rows.map((r) => r.id))} | ||
| `.unprepared | ||
| }), | ||
| sql.withTransaction | ||
| ), | ||
| mssql: () => | ||
| sql<Element>` | ||
| WITH cte AS ( | ||
| SELECT TOP ${pollBatchSizeSql} id FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = 0 | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| ) | ||
| UPDATE q | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| OUTPUT inserted.id, inserted.queue_name, inserted.element, inserted.attempts | ||
| FROM ${tableNameSql} AS q | ||
| INNER JOIN cte ON q.id = cte.id | ||
| `, | ||
| // sqlite | ||
| orElse: () => | ||
| sql<Element>` | ||
| UPDATE ${tableNameSql} | ||
| SET acquired_at = ${sqlNow}, acquired_by = ${workerIdSql} | ||
| WHERE id IN ( | ||
| SELECT id FROM ${tableNameSql} | ||
| WHERE queue_name = ${name} | ||
| AND completed = 0 | ||
| AND attempts < ${maxAttempts} | ||
| AND (acquired_at IS NULL OR acquired_at < ${expiresAt}) | ||
| ORDER BY updated_at ASC | ||
| LIMIT ${pollBatchSizeSql} | ||
| ) | ||
| RETURNING id, queue_name, element, attempts | ||
| ` | ||
| }) | ||
| yield* Effect.gen(function*() { | ||
| while (true) { | ||
| const results = yield* poll | ||
| if (results.length > 0) { | ||
| const toOffer = new Set(results) | ||
| yield* Effect.forEach(toOffer, (element) => { | ||
| element.element = JSON.parse(element.element) | ||
| return mailbox.offer(element).pipe( | ||
| Effect.tap(() => { | ||
| toOffer.delete(element) | ||
| }) | ||
| ) | ||
| }).pipe( | ||
| Effect.onInterrupt(() => interrupt(Array.from(toOffer, (e) => e.id))) | ||
| ) | ||
| yield* Effect.yieldNow() | ||
| } else { | ||
| // TODO: use listen/notify or equivalent to avoid polling | ||
| yield* Effect.sleep(pollInterval) | ||
| } | ||
| } | ||
| }).pipe( | ||
| Effect.sandbox, | ||
| Effect.retry(Schedule.spaced(500)), | ||
| Effect.forkScoped, | ||
| Effect.interruptible | ||
| ) | ||
| return mailbox | ||
| }) | ||
| }) | ||
| return PersistedQueue.PersistedQueueStore.of({ | ||
| offer: (name, id, element) => | ||
| Effect.suspend(() => | ||
| sql` | ||
| INSERT INTO ${tableNameSql} (id, queue_name, element, completed, attempts, created_at, updated_at) | ||
| VALUES (${id}, ${name}, ${JSON.stringify(element)}, ${sqlFalse}, 0, ${sqlNow}, ${sqlNow}) | ||
| ` | ||
| ).pipe( | ||
| Effect.catchAllCause((cause) => | ||
| Effect.fail( | ||
| new PersistedQueue.PersistedQueueError({ | ||
| message: "Failed to offer element to persisted queue", | ||
| cause: Cause.squash(cause) | ||
| }) | ||
| ) | ||
| ) | ||
| ), | ||
| take: ({ maxAttempts, name }) => | ||
| Effect.uninterruptibleMask((restore) => | ||
| RcMap.get(mailboxes, new QueueKey({ name, maxAttempts })).pipe( | ||
| Effect.flatMap((m) => Effect.orDie(m.take)), | ||
| Effect.zipLeft(Effect.yieldNow()), | ||
| Effect.scoped, | ||
| restore, | ||
| Effect.tap((element) => | ||
| Effect.addFinalizer(Exit.match({ | ||
| onFailure: (cause) => | ||
| Cause.isInterruptedOnly(cause) | ||
| ? interrupt([element.id]) | ||
| : retry(element.id, element.attempts + 1, cause), | ||
| onSuccess: () => complete(element.id, element.attempts + 1) | ||
| })) | ||
| ) | ||
| ) | ||
| ) | ||
| }) | ||
| }) | ||
| class QueueKey extends Data.Class<{ | ||
| readonly name: string | ||
| readonly maxAttempts: number | ||
| }> {} | ||
| /** | ||
| * @since 1.0.0 | ||
| * @category layers | ||
| */ | ||
| export const layerStore = (options?: {}): Layer.Layer< | ||
| PersistedQueue.PersistedQueueStore, | ||
| SqlError, | ||
| SqlClient.SqlClient | ||
| > => Layer.scoped(PersistedQueue.PersistedQueueStore, make(options)) |
@@ -6,3 +6,3 @@ "use strict"; | ||
| }); | ||
| exports.Statement = exports.SqlStream = exports.SqlSchema = exports.SqlResolver = exports.SqlEventLogServer = exports.SqlEventJournal = exports.SqlError = exports.SqlConnection = exports.SqlClient = exports.Model = exports.Migrator = void 0; | ||
| exports.Statement = exports.SqlStream = exports.SqlSchema = exports.SqlResolver = exports.SqlPersistedQueue = exports.SqlEventLogServer = exports.SqlEventJournal = exports.SqlError = exports.SqlConnection = exports.SqlClient = exports.Model = exports.Migrator = void 0; | ||
| var _Migrator = _interopRequireWildcard(require("./Migrator.js")); | ||
@@ -22,2 +22,4 @@ exports.Migrator = _Migrator; | ||
| exports.SqlEventLogServer = _SqlEventLogServer; | ||
| var _SqlPersistedQueue = _interopRequireWildcard(require("./SqlPersistedQueue.js")); | ||
| exports.SqlPersistedQueue = _SqlPersistedQueue; | ||
| var _SqlResolver = _interopRequireWildcard(require("./SqlResolver.js")); | ||
@@ -24,0 +26,0 @@ exports.SqlResolver = _SqlResolver; |
@@ -32,2 +32,6 @@ /** | ||
| */ | ||
| export * as SqlPersistedQueue from "./SqlPersistedQueue.js"; | ||
| /** | ||
| * @since 1.0.0 | ||
| */ | ||
| export * as SqlResolver from "./SqlResolver.js"; | ||
@@ -34,0 +38,0 @@ /** |
@@ -1,1 +0,1 @@ | ||
| {"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AAAA;;GAEG;AACH,OAAO,KAAK,QAAQ,MAAM,eAAe,CAAA;AAEzC;;GAEG;AACH,OAAO,KAAK,KAAK,MAAM,YAAY,CAAA;AAEnC;;GAEG;AACH,OAAO,KAAK,SAAS,MAAM,gBAAgB,CAAA;AAE3C;;GAEG;AACH,OAAO,KAAK,aAAa,MAAM,oBAAoB,CAAA;AAEnD;;GAEG;AACH,OAAO,KAAK,QAAQ,MAAM,eAAe,CAAA;AAEzC;;GAEG;AACH,OAAO,KAAK,eAAe,MAAM,sBAAsB,CAAA;AAEvD;;GAEG;AACH,OAAO,KAAK,iBAAiB,MAAM,wBAAwB,CAAA;AAE3D;;GAEG;AACH,OAAO,KAAK,WAAW,MAAM,kBAAkB,CAAA;AAE/C;;GAEG;AACH,OAAO,KAAK,SAAS,MAAM,gBAAgB,CAAA;AAE3C;;GAEG;AACH,OAAO,KAAK,SAAS,MAAM,gBAAgB,CAAA;AAE3C;;GAEG;AACH,OAAO,KAAK,SAAS,MAAM,gBAAgB,CAAA"} | ||
| {"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../../src/index.ts"],"names":[],"mappings":"AAAA;;GAEG;AACH,OAAO,KAAK,QAAQ,MAAM,eAAe,CAAA;AAEzC;;GAEG;AACH,OAAO,KAAK,KAAK,MAAM,YAAY,CAAA;AAEnC;;GAEG;AACH,OAAO,KAAK,SAAS,MAAM,gBAAgB,CAAA;AAE3C;;GAEG;AACH,OAAO,KAAK,aAAa,MAAM,oBAAoB,CAAA;AAEnD;;GAEG;AACH,OAAO,KAAK,QAAQ,MAAM,eAAe,CAAA;AAEzC;;GAEG;AACH,OAAO,KAAK,eAAe,MAAM,sBAAsB,CAAA;AAEvD;;GAEG;AACH,OAAO,KAAK,iBAAiB,MAAM,wBAAwB,CAAA;AAE3D;;GAEG;AACH,OAAO,KAAK,iBAAiB,MAAM,wBAAwB,CAAA;AAE3D;;GAEG;AACH,OAAO,KAAK,WAAW,MAAM,kBAAkB,CAAA;AAE/C;;GAEG;AACH,OAAO,KAAK,SAAS,MAAM,gBAAgB,CAAA;AAE3C;;GAEG;AACH,OAAO,KAAK,SAAS,MAAM,gBAAgB,CAAA;AAE3C;;GAEG;AACH,OAAO,KAAK,SAAS,MAAM,gBAAgB,CAAA"} |
@@ -16,3 +16,3 @@ /** | ||
| export declare class SqlError extends SqlError_base<{ | ||
| cause: unknown; | ||
| cause?: unknown; | ||
| message?: string; | ||
@@ -19,0 +19,0 @@ }> { |
@@ -1,1 +0,1 @@ | ||
| {"version":3,"file":"SqlError.d.ts","sourceRoot":"","sources":["../../src/SqlError.ts"],"names":[],"mappings":"AAKA;;GAEG;AACH,eAAO,MAAM,cAAc,EAAE,OAAO,MAA2C,CAAA;AAE/E;;GAEG;AACH,MAAM,MAAM,cAAc,GAAG,OAAO,cAAc,CAAA;;;;AAElD;;GAEG;AACH,qBAAa,QAAS,SAAQ,cAAwC;IACpE,KAAK,EAAE,OAAO,CAAA;IACd,OAAO,CAAC,EAAE,MAAM,CAAA;CACjB,CAAC;CAAG;;;;AAEL;;GAEG;AACH,qBAAa,oBAAqB,SAAQ,0BAAoD;IAC5F,QAAQ,CAAC,QAAQ,EAAE,MAAM,CAAA;IACzB,QAAQ,CAAC,MAAM,EAAE,MAAM,CAAA;CACxB,CAAC;IACA,IAAI,OAAO,WAEV;CACF"} | ||
| {"version":3,"file":"SqlError.d.ts","sourceRoot":"","sources":["../../src/SqlError.ts"],"names":[],"mappings":"AAKA;;GAEG;AACH,eAAO,MAAM,cAAc,EAAE,OAAO,MAA2C,CAAA;AAE/E;;GAEG;AACH,MAAM,MAAM,cAAc,GAAG,OAAO,cAAc,CAAA;;;;AAElD;;GAEG;AACH,qBAAa,QAAS,SAAQ,cAAwC;IACpE,KAAK,CAAC,EAAE,OAAO,CAAA;IACf,OAAO,CAAC,EAAE,MAAM,CAAA;CACjB,CAAC;CAAG;;;;AAEL;;GAEG;AACH,qBAAa,oBAAqB,SAAQ,0BAAoD;IAC5F,QAAQ,CAAC,QAAQ,EAAE,MAAM,CAAA;IACzB,QAAQ,CAAC,MAAM,EAAE,MAAM,CAAA;CACxB,CAAC;IACA,IAAI,OAAO,WAEV;CACF"} |
@@ -32,2 +32,6 @@ /** | ||
| */ | ||
| export * as SqlPersistedQueue from "./SqlPersistedQueue.js"; | ||
| /** | ||
| * @since 1.0.0 | ||
| */ | ||
| export * as SqlResolver from "./SqlResolver.js"; | ||
@@ -34,0 +38,0 @@ /** |
@@ -1,1 +0,1 @@ | ||
| {"version":3,"file":"index.js","names":["Migrator","Model","SqlClient","SqlConnection","SqlError","SqlEventJournal","SqlEventLogServer","SqlResolver","SqlSchema","SqlStream","Statement"],"sources":["../../src/index.ts"],"sourcesContent":[null],"mappings":"AAAA;;;AAGA,OAAO,KAAKA,QAAQ,MAAM,eAAe;AAEzC;;;AAGA,OAAO,KAAKC,KAAK,MAAM,YAAY;AAEnC;;;AAGA,OAAO,KAAKC,SAAS,MAAM,gBAAgB;AAE3C;;;AAGA,OAAO,KAAKC,aAAa,MAAM,oBAAoB;AAEnD;;;AAGA,OAAO,KAAKC,QAAQ,MAAM,eAAe;AAEzC;;;AAGA,OAAO,KAAKC,eAAe,MAAM,sBAAsB;AAEvD;;;AAGA,OAAO,KAAKC,iBAAiB,MAAM,wBAAwB;AAE3D;;;AAGA,OAAO,KAAKC,WAAW,MAAM,kBAAkB;AAE/C;;;AAGA,OAAO,KAAKC,SAAS,MAAM,gBAAgB;AAE3C;;;AAGA,OAAO,KAAKC,SAAS,MAAM,gBAAgB;AAE3C;;;AAGA,OAAO,KAAKC,SAAS,MAAM,gBAAgB","ignoreList":[]} | ||
| {"version":3,"file":"index.js","names":["Migrator","Model","SqlClient","SqlConnection","SqlError","SqlEventJournal","SqlEventLogServer","SqlPersistedQueue","SqlResolver","SqlSchema","SqlStream","Statement"],"sources":["../../src/index.ts"],"sourcesContent":[null],"mappings":"AAAA;;;AAGA,OAAO,KAAKA,QAAQ,MAAM,eAAe;AAEzC;;;AAGA,OAAO,KAAKC,KAAK,MAAM,YAAY;AAEnC;;;AAGA,OAAO,KAAKC,SAAS,MAAM,gBAAgB;AAE3C;;;AAGA,OAAO,KAAKC,aAAa,MAAM,oBAAoB;AAEnD;;;AAGA,OAAO,KAAKC,QAAQ,MAAM,eAAe;AAEzC;;;AAGA,OAAO,KAAKC,eAAe,MAAM,sBAAsB;AAEvD;;;AAGA,OAAO,KAAKC,iBAAiB,MAAM,wBAAwB;AAE3D;;;AAGA,OAAO,KAAKC,iBAAiB,MAAM,wBAAwB;AAE3D;;;AAGA,OAAO,KAAKC,WAAW,MAAM,kBAAkB;AAE/C;;;AAGA,OAAO,KAAKC,SAAS,MAAM,gBAAgB;AAE3C;;;AAGA,OAAO,KAAKC,SAAS,MAAM,gBAAgB;AAE3C;;;AAGA,OAAO,KAAKC,SAAS,MAAM,gBAAgB","ignoreList":[]} |
+12
-4
| { | ||
| "name": "@effect/sql", | ||
| "version": "0.48.0", | ||
| "version": "0.48.1", | ||
| "description": "A SQL toolkit for Effect", | ||
@@ -17,5 +17,5 @@ "license": "MIT", | ||
| "peerDependencies": { | ||
| "@effect/experimental": "^0.57.0", | ||
| "@effect/platform": "^0.93.0", | ||
| "effect": "^3.19.0" | ||
| "@effect/experimental": "^0.57.5", | ||
| "@effect/platform": "^0.93.4", | ||
| "effect": "^3.19.6" | ||
| }, | ||
@@ -75,2 +75,7 @@ "publishConfig": { | ||
| }, | ||
| "./SqlPersistedQueue": { | ||
| "types": "./dist/dts/SqlPersistedQueue.d.ts", | ||
| "import": "./dist/esm/SqlPersistedQueue.js", | ||
| "default": "./dist/cjs/SqlPersistedQueue.js" | ||
| }, | ||
| "./SqlResolver": { | ||
@@ -128,2 +133,5 @@ "types": "./dist/dts/SqlResolver.d.ts", | ||
| ], | ||
| "SqlPersistedQueue": [ | ||
| "./dist/dts/SqlPersistedQueue.d.ts" | ||
| ], | ||
| "SqlResolver": [ | ||
@@ -130,0 +138,0 @@ "./dist/dts/SqlResolver.d.ts" |
+5
-0
@@ -39,2 +39,7 @@ /** | ||
| */ | ||
| export * as SqlPersistedQueue from "./SqlPersistedQueue.js" | ||
| /** | ||
| * @since 1.0.0 | ||
| */ | ||
| export * as SqlResolver from "./SqlResolver.js" | ||
@@ -41,0 +46,0 @@ |
+1
-1
@@ -20,3 +20,3 @@ /** | ||
| export class SqlError extends TypeIdError(SqlErrorTypeId, "SqlError")<{ | ||
| cause: unknown | ||
| cause?: unknown | ||
| message?: string | ||
@@ -23,0 +23,0 @@ }> {} |
722538
9.86%130
6.56%11721
9.57%