@effect/workflow

Durable workflows for Effect

Version: 0.6.0
Source: npm
Weekly downloads: 139K (-19.36%)
Maintainers: 3

Build and run durable workflows in TypeScript with Effect.

Example

import { ClusterWorkflowEngine } from "@effect/cluster"
import { NodeClusterRunnerSocket, NodeRuntime } from "@effect/platform-node"
import { PgClient } from "@effect/sql-pg"
import {
  Activity,
  DurableClock,
  DurableDeferred,
  Workflow
} from "@effect/workflow"
import { Effect, Layer, Redacted, Schema } from "effect"

// Define a custom error for the SendEmail activity
class SendEmailError extends Schema.TaggedError<SendEmailError>(
  "SendEmailError"
)("SendEmailError", {
  message: Schema.String
}) {}

// Define a workflow using the `Workflow.make` API.
const EmailWorkflow = Workflow.make({
  // Every workflow needs a unique name
  name: "EmailWorkflow",
  // Add a success schema. You can omit this to use the default value `Schema.Void`
  success: Schema.Void,
  // Add an error schema. You can omit this to use the default value `Schema.Never`
  error: SendEmailError,
  // Define the payload for the workflow
  payload: {
    id: Schema.String,
    to: Schema.String
  },
  // Define the idempotency key for the workflow. This is used to ensure that
  // the workflow is not duplicated if it is retried.
  idempotencyKey: ({ id }) => id
})

// Once you have defined the workflow, you can create a layer for it by
// providing the implementation.
const EmailWorkflowLayer = EmailWorkflow.toLayer(
  Effect.fn(function* (payload, executionId) {
    // An `Activity` represents a unit of work in the workflow.
    // Each activity is executed only once, unless you use `Activity.retry`.
    yield* Activity.make({
      name: "SendEmail",
      error: SendEmailError,
      execute: Effect.gen(function* () {
        // You can access the current attempt number of the activity.
        const attempt = yield* Activity.CurrentAttempt

        yield* Effect.annotateLogs(Effect.log(`Sending email`), {
          id: payload.id,
          executionId,
          attempt
        })

        // Simulate transient failures: fail on every attempt except the fifth.
        if (attempt !== 5) {
          return yield* new SendEmailError({
            message: `Failed to send email for ${payload.id} on attempt ${attempt}`
          })
        }
      })
    }).pipe(
      Activity.retry({ times: 5 }),
      EmailWorkflow.withCompensation(
        Effect.fn(function* (value, cause) {
          // This is a compensation finalizer that will be executed if the workflow
          // fails.
          //
          // You can use the success `value` of the wrapped effect, as well as the
          // Cause of the workflow failure.
          yield* Effect.log(`Compensating activity SendEmail`)
        })
      )
    )

    // Use the `DurableClock` to sleep for a specified duration.
    // The workflow will pause execution for the specified duration.
    //
    // You can sleep for as long as you want: while the workflow is paused, it
    // consumes no resources.
    yield* Effect.log("Sleeping for 10 seconds")
    yield* DurableClock.sleep({
      name: "Some sleep",
      duration: "10 seconds"
    })
    yield* Effect.log("Woke up")

    // You can use `DurableDeferred` to create a signal that can be awaited later.
    const EmailTrigger = DurableDeferred.make("EmailTrigger")

    // You can use the `DurableDeferred.token` API to acquire the token that can
    // later be used with `DurableDeferred.done`, `succeed`, or `fail`.
    const token = yield* DurableDeferred.token(EmailTrigger)

    // You then use the token to send a result to the deferred.
    //
    // This doesn't need to be done inside the workflow, it just needs access to
    // the `WorkflowEngine` service.
    yield* DurableDeferred.succeed(EmailTrigger, {
      token,
      value: void 0
    }).pipe(
      Effect.delay("1 second"), // Simulate some delay before completing the deferred
      Effect.forkDaemon
    )

    // Finally, you can await the deferred to get the result.
    //
    // It will pause the workflow until the deferred is completed.
    yield* DurableDeferred.await(EmailTrigger)
  })
)

// To integrate with @effect/cluster, you can use the
// `ClusterWorkflowEngine.layer` Layer, and provide it with your cluster Runner
// layer.
const WorkflowEngineLayer = ClusterWorkflowEngine.layer.pipe(
  Layer.provideMerge(NodeClusterRunnerSocket.layer({ storage: "sql" })),
  Layer.provideMerge(
    PgClient.layer({
      database: "effect_cluster",
      username: "cluster",
      password: Redacted.make("cluster")
    })
  )
)

const EnvLayer = Layer.mergeAll(
  EmailWorkflowLayer
  // You can add any other cluster entities or workflow layers here
).pipe(Layer.provide(WorkflowEngineLayer))

// Finally, you can execute a workflow using the `.execute` method.
EmailWorkflow.execute({ id: "123", to: "hello@timsmart.co" }).pipe(
  Effect.provide(EnvLayer),
  NodeRuntime.runMain
)
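
The comment on `idempotencyKey` in the example says the key keeps a workflow from being duplicated when it is retried. The sketch below illustrates that idea with a plain in-memory map. It is a conceptual model only, not how `@effect/workflow` or `@effect/cluster` implement it; `executeOnce`, `results`, `runs`, and `sendEmail` are hypothetical names introduced for illustration.

```typescript
// Hypothetical in-memory model of idempotent execution.
// This is NOT the @effect/workflow API; it only illustrates the idea.
type Payload = { id: string; to: string }

// Mirrors `idempotencyKey: ({ id }) => id` from the example above.
const idempotencyKey = (payload: Payload): string => payload.id

// Results of executions that have already run, keyed by idempotency key.
const results = new Map<string, string>()

// Counts how many times the underlying work actually ran.
let runs = 0

// Runs `work` only if no result is stored for the payload's key;
// otherwise returns the stored result without running the work again.
function executeOnce(payload: Payload, work: (p: Payload) => string): string {
  const key = idempotencyKey(payload)
  const existing = results.get(key)
  if (existing !== undefined) return existing
  runs += 1
  const result = work(payload)
  results.set(key, result)
  return result
}

// Stand-in for the real work (assumption: a synchronous function for brevity).
const sendEmail = (p: Payload): string => `sent to ${p.to}`

const first = executeOnce({ id: "123", to: "a@example.com" }, sendEmail)
const second = executeOnce({ id: "123", to: "a@example.com" }, sendEmail)
```

Both calls share the key `"123"`, so the second call returns the stored result and the work runs once. A durable engine would keep this record in persistent storage rather than in process memory.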

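
The `SendEmail` activity in the example fails on every attempt except the fifth and is wrapped in `Activity.retry({ times: 5 })`. The sketch below models that interaction in plain TypeScript under two assumptions that the example does not state: attempt numbers start at 1, and `times` counts retries after the first attempt. The `retry` helper here is hypothetical and is not the library's implementation.

```typescript
// Hypothetical retry helper, NOT the @effect/workflow implementation.
// Assumptions: attempts are 1-based, and `times` is the number of retries
// allowed after the first attempt.
function retry<A>(times: number, run: (attempt: number) => A): A {
  let attempt = 1
  for (;;) {
    try {
      return run(attempt)
    } catch (error) {
      // Give up once the retry budget is exhausted.
      if (attempt > times) throw error
      attempt += 1
    }
  }
}

// Records every attempt number the activity body observed.
const seenAttempts: number[] = []

// Same failure rule as the example: only the fifth attempt succeeds.
const outcome = retry(5, (attempt) => {
  seenAttempts.push(attempt)
  if (attempt !== 5) {
    throw new Error(`Failed to send email on attempt ${attempt}`)
  }
  return "sent"
})
```

Under these assumptions the body runs five times and the fifth attempt succeeds, so the retry budget is not exhausted. A body that always failed would run six times (one initial attempt plus five retries) before the error propagates.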
Package last updated on 21 Jul 2025
