@effect/workflow
Build and run durable workflows in TypeScript with Effect.
Example
import { ClusterWorkflowEngine } from "@effect/cluster"
import { NodeClusterSocket, NodeRuntime } from "@effect/platform-node"
import { PgClient } from "@effect/sql-pg"
import {
Activity,
DurableClock,
DurableDeferred,
Workflow
} from "@effect/workflow"
import { Effect, Layer, Redacted, Schema } from "effect"
/**
 * Tagged error (tag `"SendEmailError"`) carrying a single string `message`.
 *
 * In this example it is used as the error schema of both `EmailWorkflow`
 * and the `SendEmail` activity.
 */
class SendEmailError extends Schema.TaggedError<SendEmailError>(
  "SendEmailError"
)("SendEmailError", {
  message: Schema.String
}) {}
/**
 * Definition of the `EmailWorkflow` workflow.
 *
 * - `payload`: an `id` and a `to` address, both strings.
 * - `success`: no value (`Schema.Void`).
 * - `error`: `SendEmailError`.
 */
const EmailWorkflow = Workflow.make({
  name: "EmailWorkflow",
  success: Schema.Void,
  error: SendEmailError,
  payload: { id: Schema.String, to: Schema.String },
  // The payload's `id` doubles as the idempotency key of an execution.
  idempotencyKey: (payload) => payload.id
})
/**
 * Layer providing the implementation of `EmailWorkflow`.
 *
 * The handler receives the workflow payload and the execution id, then:
 * 1. runs the `SendEmail` activity (with retry and a compensation handler),
 * 2. sleeps for 10 seconds using `DurableClock`,
 * 3. creates a `DurableDeferred`, completes it from a forked daemon fiber
 *    after a one second delay, and waits for it.
 */
const EmailWorkflowLayer = EmailWorkflow.toLayer(
  Effect.fn(function* (payload, executionId) {
    // The activity logs the attempt and deliberately fails with
    // `SendEmailError` on every attempt except the fifth.
    const sendEmail = Activity.make({
      name: "SendEmail",
      error: SendEmailError,
      execute: Effect.gen(function* () {
        const currentAttempt = yield* Activity.CurrentAttempt

        yield* Effect.log(`Sending email`).pipe(
          Effect.annotateLogs({
            id: payload.id,
            executionId,
            attempt: currentAttempt
          })
        )

        if (currentAttempt !== 5) {
          const message = `Failed to send email for ${payload.id} on attempt ${currentAttempt}`
          return yield* new SendEmailError({ message })
        }
      })
    })

    yield* sendEmail.pipe(
      Activity.retry({ times: 5 }),
      // Compensation handler: here it only logs.
      // NOTE(review): presumably invoked when the workflow fails after this
      // activity has succeeded — confirm against the @effect/workflow docs.
      EmailWorkflow.withCompensation(
        Effect.fn(function* (_value, _cause) {
          yield* Effect.log(`Compensating activity SendEmail`)
        })
      )
    )

    yield* Effect.log("Sleeping for 10 seconds")
    yield* DurableClock.sleep({ name: "Some sleep", duration: "10 seconds" })
    yield* Effect.log("Woke up")

    const emailTrigger = DurableDeferred.make("EmailTrigger")
    const triggerToken = yield* DurableDeferred.token(emailTrigger)

    // Complete the deferred from a daemon fiber after one second, using the
    // token obtained above, so the `await` below can resume.
    const completeTrigger = DurableDeferred.succeed(emailTrigger, {
      token: triggerToken,
      value: undefined
    })
    yield* Effect.forkDaemon(Effect.delay(completeTrigger, "1 second"))

    yield* DurableDeferred.await(emailTrigger)
  })
)
/**
 * Workflow engine layer: `ClusterWorkflowEngine.layer` combined (via
 * `Layer.provideMerge`) with the Node cluster socket layer and a `PgClient`
 * layer. Connection settings are hard-coded for this example.
 */
const WorkflowEngineLayer = Layer.provideMerge(
  Layer.provideMerge(ClusterWorkflowEngine.layer, NodeClusterSocket.layer()),
  PgClient.layer({
    database: "effect_cluster",
    username: "cluster",
    password: Redacted.make("cluster")
  })
)
/**
 * Application environment: the workflow layers (currently only
 * `EmailWorkflowLayer`) with `WorkflowEngineLayer` provided to them.
 */
const EnvLayer = Layer.provide(
  Layer.mergeAll(EmailWorkflowLayer),
  WorkflowEngineLayer
)
// Program entry point: execute `EmailWorkflow` with a sample payload, provide
// `EnvLayer` to it, and hand the resulting effect to `NodeRuntime.runMain`.
NodeRuntime.runMain(
  Effect.provide(
    EmailWorkflow.execute({ id: "123", to: "hello@timsmart.co" }),
    EnvLayer
  )
)