@rotorsoft/act-sse

Incremental state broadcast over SSE for act apps

Latest version: 1.2.1 (npm), 1 maintainer

Incremental state broadcast over SSE for @rotorsoft/act event-sourced apps. Zero dependencies.

Instead of sending full aggregate state after each action, act-sse forwards the domain patches that event handlers already compute — sending only what changed as version-keyed partials.

Installation

npm install @rotorsoft/act-sse
# or
pnpm add @rotorsoft/act-sse

Requirements: Node.js >= 22.18.0

Architecture

  app.do() → Snapshot[] (each carries its event's domain patch)
      │
      ▼
  deriveState(snap)              ← app-specific (overlay presence, deadlines, etc.)
  state._v = snap.event.version  ← event store version is the single source of truth
      │
      ▼
  broadcast.publish(streamId, state, patches)
      │
      ├── version-key each patch → { "5": { count: 3 }, "6": { name: "updated" } }
      └── push to all SSE subscribers
      │
      ▼
  Client: applyPatchMessage(msg, cached)
      │
      ├── contiguous → deep merge patches in version order
      ├── stale      → skip (client already ahead)
      └── behind     → invalidate + refetch (client missed versions)

Wire Format

// Version-keyed domain patches
// Keys = state version after that patch is applied
// Values = domain patch (deep partial of state)
{
  "5": { territories: { brazil: { armies: 3 } } },
  "6": { currentPlayerIndex: 2, phase: "reinforce" }
}

A commit that emits several events produces one version-keyed entry per event. A version gap triggers a full state refetch on the client.
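
As a rough illustration of the wire format, the sketch below builds a version-keyed message from the patches of one commit. The helper name versionKey is hypothetical and not part of the act-sse API, and it assumes the last patch of the commit produced the final stream version, with earlier patches counting back from it.

```typescript
// Hypothetical helper, not the library's implementation.
type DomainPatch = Record<string, unknown>;

function versionKey(
  finalVersion: number,
  patches: DomainPatch[],
): Record<number, DomainPatch> {
  const keyed: Record<number, DomainPatch> = {};
  // Assumption: the last patch produced finalVersion, so the first patch
  // of the commit produced finalVersion - patches.length + 1.
  const firstVersion = finalVersion - patches.length + 1;
  patches.forEach((p, i) => {
    keyed[firstVersion + i] = p;
  });
  return keyed;
}

// A two-event commit that ends at stream version 6.
const keyed = versionKey(6, [
  { territories: { brazil: { armies: 3 } } },
  { currentPlayerIndex: 2, phase: "reinforce" },
]);
console.log(Object.keys(keyed)); // the keys are the versions 5 and 6
```

Keying by version lets the client detect a gap by comparing the lowest key with its cached version, without any extra metadata in the message.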

Server Usage

import { BroadcastChannel, type PatchMessage } from "@rotorsoft/act-sse";

const broadcast = new BroadcastChannel<MyAppState>();

// After every app.do():
const snaps = await app.do(action, target, payload);
const snap = snaps.at(-1)!;
const patches = snaps.map(s => s.patch).filter(Boolean);
const state = deriveState(snap);
broadcast.publish(streamId, state, patches);

// For non-event state changes (e.g. presence overlay):
broadcast.publishOverlay(streamId, { players: { pid: { connected: true } } });

// SSE subscription (tRPC example):
onStateChange: publicProcedure
  .input(z.object({ streamId: z.string() }))
  .subscription(async function* ({ input, signal }) {
    let resolve: (() => void) | null = null;
    let pending: PatchMessage<MyAppState> | null = null;

    const cleanup = broadcast.subscribe(input.streamId, (msg) => {
      pending = msg;
      if (resolve) { resolve(); resolve = null; }
    });

    try {
      while (!signal?.aborted) {
        if (!pending) {
          await new Promise<void>((r) => {
            resolve = r;
            signal?.addEventListener("abort", () => r(), { once: true });
          });
        }
        if (signal?.aborted) break;
        if (pending) { const msg = pending; pending = null; yield msg; }
      }
    } finally {
      cleanup();
    }
  }),

Client Usage

import { applyPatchMessage } from "@rotorsoft/act-sse";

// In your SSE onData handler (React Query example):
onData: (msg) => {
  const cached = utils.getState.getData({ streamId });
  const result = applyPatchMessage(msg, cached);

  if (result.ok) {
    utils.getState.setData({ streamId }, result.state);
  } else if (result.reason === "behind") {
    // Client missed versions — trigger full refetch
    utils.getState.invalidate({ streamId });
  }
  // "stale" → no-op (client already has newer state from mutation response)
}

API

BroadcastChannel<S>

Server-side broadcast manager with per-stream subscriber channels and LRU state cache.

| Method | Description |
| --- | --- |
| publish(streamId, state, patches?) | Cache state, version-key patches, push to subscribers |
| publishOverlay(streamId, patch) | Same-version update (e.g. presence change) |
| subscribe(streamId, cb) | Register subscriber, returns cleanup function |
| getState(streamId) | Get cached state (for reconnects) |
| getSubscriberCount(streamId) | Number of active subscribers |
| cache | Direct access to StateCache instance |
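
To make the subscribe/cleanup contract concrete, here is a minimal in-memory sketch of per-stream subscriber channels. TinyChannel is a hypothetical stand-in written for this example; it omits the state cache and version keying and is not how BroadcastChannel is actually implemented.

```typescript
// Illustrative stand-in only: per-stream listener sets with a cleanup function.
type Listener<M> = (msg: M) => void;

class TinyChannel<M> {
  private streams = new Map<string, Set<Listener<M>>>();

  subscribe(streamId: string, cb: Listener<M>): () => void {
    let subs = this.streams.get(streamId);
    if (!subs) {
      subs = new Set();
      this.streams.set(streamId, subs);
    }
    subs.add(cb);
    // The returned cleanup removes the listener and drops empty streams.
    return () => {
      const current = this.streams.get(streamId);
      if (!current) return;
      current.delete(cb);
      if (current.size === 0) this.streams.delete(streamId);
    };
  }

  publish(streamId: string, msg: M): void {
    const subs = this.streams.get(streamId);
    if (!subs) return;
    for (const cb of subs) cb(msg);
  }

  getSubscriberCount(streamId: string): number {
    return this.streams.get(streamId)?.size ?? 0;
  }
}

const channel = new TinyChannel<Record<number, unknown>>();
const received: Record<number, unknown>[] = [];
const cleanup = channel.subscribe("game-1", (m) => {
  received.push(m);
});
channel.publish("game-1", { 5: { count: 3 } });
cleanup();
channel.publish("game-1", { 6: { count: 4 } }); // no subscribers left
```

Returning the cleanup function from subscribe keeps the unsubscribe logic next to the subscription, which is what the finally block in the tRPC example relies on.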

applyPatchMessage(msg, cached)

Client-side patch applicator. Returns { ok: true, state } or { ok: false, reason }.

Reasons: "stale" (skip), "behind" (resync).
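
The three outcomes can be sketched as a version comparison against the cached state. The function applySketch below is an illustration under stated assumptions, not the library's code: it treats a missing cache as "behind", uses a shallow merge in place of the deep merge, and ignores same-version overlay messages.

```typescript
type SketchState = Record<string, unknown> & { _v: number };
type SketchMessage = Record<number, Record<string, unknown>>;
type SketchResult =
  | { ok: true; state: SketchState }
  | { ok: false; reason: "stale" | "behind" };

function applySketch(
  msg: SketchMessage,
  cached: SketchState | undefined,
): SketchResult {
  // Assumption: with nothing cached there is no base to patch, so resync.
  if (!cached) return { ok: false, reason: "behind" };

  const versions = Object.keys(msg)
    .map(Number)
    .sort((a, b) => a - b);
  // Only versions newer than the cache are candidates.
  const newer = versions.filter((v) => v > cached._v);
  if (newer.length === 0) return { ok: false, reason: "stale" };

  let state: SketchState = { ...cached };
  for (const v of newer) {
    // Any gap means the client missed a version and must refetch.
    if (v !== state._v + 1) return { ok: false, reason: "behind" };
    // Shallow merge for brevity; the real utility deep merges.
    state = { ...state, ...msg[v], _v: v };
  }
  return { ok: true, state };
}

const incoming: SketchMessage = { 5: { count: 3 }, 6: { label: "updated" } };
const contiguous = applySketch(incoming, { _v: 4, count: 1 });
const stale = applySketch(incoming, { _v: 6, count: 3 });
const behind = applySketch(incoming, { _v: 2, count: 0 });
console.log(contiguous.ok, stale.ok, behind.ok);
```

Patches are applied in ascending version order so that a later patch in the same commit can overwrite a field set by an earlier one.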

patch(original, patches)

Browser-safe deep merge utility. Deep merges plain objects, replaces arrays and other non-mergeable types (Date, Map, Set, RegExp, TypedArrays). Deletes keys set to null or undefined.
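
The merge rules described above can be approximated in a few lines. The function mergeSketch is a hypothetical single-patch version written for illustration; the real patch utility may accept different arguments and handle more cases.

```typescript
type Plain = Record<string, unknown>;

// Only object literals and null-prototype objects count as mergeable.
function isPlainObject(v: unknown): v is Plain {
  if (v === null || typeof v !== "object") return false;
  const proto = Object.getPrototypeOf(v);
  return proto === Object.prototype || proto === null;
}

function mergeSketch(original: Plain, patch: Plain): Plain {
  const out: Plain = { ...original };
  for (const [key, value] of Object.entries(patch)) {
    const current = out[key];
    if (value === null || value === undefined) {
      // null or undefined in a patch deletes the key.
      delete out[key];
    } else if (isPlainObject(value) && isPlainObject(current)) {
      // Plain objects on both sides are merged recursively.
      out[key] = mergeSketch(current, value);
    } else {
      // Arrays, Date, Map, Set, primitives, etc. replace the old value.
      out[key] = value;
    }
  }
  return out;
}

const merged = mergeSketch(
  { a: { b: 1, c: 2 }, list: [1, 2], gone: 1 },
  { a: { b: 5 }, list: [9], gone: null },
);
console.log(JSON.stringify(merged));
```

Replacing arrays instead of merging them avoids ambiguity about whether a shorter array in a patch means truncation or a partial update.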

StateCache<S>

Generic LRU cache. Methods: get, set, delete, has, size, entries.
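
For readers unfamiliar with LRU eviction, the following sketch shows one common way to build it on top of Map insertion order. LruSketch and its maxSize parameter are assumptions made for this example; the constructor options and internals of StateCache are not documented here.

```typescript
class LruSketch<V> {
  private map = new Map<string, V>();
  private maxSize: number;

  constructor(maxSize: number) {
    this.maxSize = maxSize;
  }

  get(key: string): V | undefined {
    if (!this.map.has(key)) return undefined;
    const value = this.map.get(key) as V;
    // Re-insert so the key becomes the most recently used entry.
    this.map.delete(key);
    this.map.set(key, value);
    return value;
  }

  set(key: string, value: V): void {
    this.map.delete(key);
    this.map.set(key, value);
    if (this.map.size > this.maxSize) {
      // Map iterates in insertion order, so the first key is the oldest.
      const oldest = this.map.keys().next().value;
      if (oldest !== undefined) this.map.delete(oldest);
    }
  }

  has(key: string): boolean {
    return this.map.has(key);
  }

  delete(key: string): boolean {
    return this.map.delete(key);
  }

  get size(): number {
    return this.map.size;
  }
}

const lru = new LruSketch<number>(2);
lru.set("a", 1);
lru.set("b", 2);
lru.get("a"); // touch "a" so "b" becomes the oldest entry
lru.set("c", 3); // exceeds maxSize and evicts "b"
```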

Types

type BroadcastState = Record<string, unknown> & { _v: number };
type PatchMessage<S extends BroadcastState> = Record<number, DeepPartial<S>>;
type Subscriber<S extends BroadcastState> = (msg: PatchMessage<S>) => void;

Version Contract

The _v field must be set from snap.event.version, the event store's monotonic stream version. It is the single source of truth for ordering, so do not maintain a separate version counter.
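
A deriveState function that honors this contract might look like the sketch below. The SnapshotLike shape is a simplified assumption for illustration (in particular the state property), and deriveStateSketch is a hypothetical name; only event.version comes from the text above.

```typescript
// Assumed, simplified snapshot shape for this example.
interface SnapshotLike<S> {
  state: S;
  event: { version: number };
}

function deriveStateSketch<S extends Record<string, unknown>>(
  snap: SnapshotLike<S>,
): S & { _v: number } {
  // _v is copied from the event store version, never computed locally.
  return { ...snap.state, _v: snap.event.version };
}

const derived = deriveStateSketch({
  state: { count: 3 },
  event: { version: 7 },
});
console.log(derived._v);
```

App-specific overlays such as presence or deadlines can be added in the same function, as long as _v itself is left untouched.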

Bandwidth Savings

Typical savings for a game/collaborative app with ~5KB state:

| Action | Full (bytes) | Patch (bytes) | Savings |
| --- | --- | --- | --- |
| Single field change | ~5,000 | ~150 | 97% |
| Multi-field update | ~5,000 | ~400 | 92% |
| Presence toggle | ~5,000 | ~80 | 98% |
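
To estimate savings for your own state shape, you can compare the serialized size of the full state with that of a patch message. The helpers jsonBytes and savingsPercent and the sample data below are made up for this sketch; actual numbers depend on your state and on any transport compression.

```typescript
// UTF-8 byte length of the JSON encoding of a value.
function jsonBytes(value: unknown): number {
  return new TextEncoder().encode(JSON.stringify(value)).length;
}

// Percentage of bytes saved by sending the patch instead of the full state.
function savingsPercent(full: unknown, patchMsg: unknown): number {
  return Math.round((1 - jsonBytes(patchMsg) / jsonBytes(full)) * 100);
}

// Made-up sample state: 42 territories with an owner and an army count.
const fullState = {
  _v: 5,
  territories: Object.fromEntries(
    Array.from({ length: 42 }, (_, i) => [`t${i}`, { owner: "p1", armies: 3 }]),
  ),
};
const patchMessage = { 5: { territories: { t0: { armies: 4 } } } };

console.log(
  jsonBytes(fullState),
  jsonBytes(patchMessage),
  savingsPercent(fullState, patchMessage),
);
```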

License

MIT

Package last updated on 14 Mar 2026
