New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

@rotorsoft/act-pg

Package Overview
Dependencies
Maintainers
1
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@rotorsoft/act-pg

act pg adapters

latest
Source
npmnpm
Version
0.14.2
Version published
Maintainers
1
Created
Source

@rotorsoft/act-pg

NPM Version NPM Downloads Build Status License: MIT

PostgreSQL event store adapter for @rotorsoft/act. Provides persistent, production-ready event storage with ACID guarantees, connection pooling, and distributed stream processing.

Installation

npm install @rotorsoft/act @rotorsoft/act-pg
# or
pnpm add @rotorsoft/act @rotorsoft/act-pg

Requirements: Node.js >= 22.18.0, PostgreSQL >= 14

Usage

import { act, state, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
import { z } from "zod";

// Inject the PostgreSQL store before building your app
store(new PostgresStore({
  host: "localhost",
  port: 5432,
  database: "myapp",
  user: "postgres",
  password: "secret",
}));

// Initialize tables (creates schema, events table, streams table, and indexes)
await store().seed();

// Build and use your app as normal
const Counter = state({ Counter: z.object({ count: z.number() }) })
  .init(() => ({ count: 0 }))
  .emits({ Incremented: z.object({ amount: z.number() }) })
  .patch({ Incremented: ({ data }, s) => ({ count: s.count + data.amount }) })  // optional — only for custom reducers
  .on({ increment: z.object({ by: z.number() }) })
    .emit((action) => ["Incremented", { amount: action.by }])
  .build();

const app = act().withState(Counter).build();
await app.do("increment", { stream: "counter1", actor: { id: "1", name: "User" } }, { by: 1 });

Configuration

All configuration fields are optional and have sensible defaults:

OptionDefaultDescription
hostlocalhostPostgreSQL host
port5432PostgreSQL port
databasepostgresDatabase name
userpostgresDatabase user
passwordpostgresDatabase password
schemapublicSchema for event tables
tableeventsBase name for event tables

Custom Schema and Table Names

const pgStore = new PostgresStore({
  host: "db.example.com",
  database: "production",
  user: "app_user",
  password: process.env.DB_PASSWORD,
  schema: "events",       // custom schema
  table: "act_events",    // creates act_events and act_events_streams tables
});

Environment-Based Configuration

if (process.env.NODE_ENV === "production") {
  store(new PostgresStore({
    host: process.env.DB_HOST,
    port: parseInt(process.env.DB_PORT || "5432"),
    database: process.env.DB_NAME,
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD,
  }));
}
// In development, the default InMemoryStore is used

Features

  • ACID Transactions - Events are committed atomically within PostgreSQL transactions
  • Optimistic Concurrency - Version-based conflict detection prevents lost updates
  • Connection Pooling - Uses node-postgres Pool for efficient connection management
  • Atomic Stream Claiming - Zero-contention competing consumers via FOR UPDATE SKIP LOCKED
  • Auto Schema Setup - seed() creates all required tables, indexes, and schema
  • NOTIFY/LISTEN - Real-time event notifications via PostgreSQL channels
  • Multi-Tenant - Isolate tenants using separate schemas

Database Schema

Calling seed() creates two tables:

Events table ({schema}.{table}) - stores all committed events:

  • id (serial) - global event sequence
  • name - event type name
  • data (jsonb) - event payload
  • stream - stream identifier
  • version - per-stream sequence number
  • created (timestamptz) - event timestamp
  • meta (jsonb) - correlation, causation, and actor metadata

Streams table ({schema}.{table}_streams) - tracks stream processing state for reactions:

  • stream - stream identifier
  • at - last processed event position
  • leased_by / leased_until - distributed processing claim info
  • blocked / error - error tracking for failed streams

Competing Consumer Pattern

The PostgreSQL adapter uses FOR UPDATE SKIP LOCKED for atomic stream claiming — the idiomatic PostgreSQL competing consumer pattern. The claim() method discovers streams with pending events and locks them in a single query:

  • Workers never block each other — locked rows are silently skipped
  • No race between discovery and locking (unlike a separate poll + lease)
  • Same pattern used by pgBoss, Graphile Worker, and other production job queues
  • Enables horizontal scaling by simply adding more workers

This replaces the previous two-step poll/lease approach, eliminating contention and simplifying the drain cycle.

License

MIT

FAQs

Package last updated on 29 Mar 2026

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts