raft-logic

Maintainers: 1
Versions: 17
Node.js wrapper around a WASM build of tikv/raft-rs (via wasm-bindgen).

Version: 0.3.14 (latest on npm)

raft-logic

raft-logic is a Node.js library that wraps tikv/raft-rs (a Rust Raft implementation ported from etcd) via WebAssembly, exposing a small, promise-based ES module API for running Raft nodes in JavaScript.

Use it to:

  • Embed consensus in a service without running a separate Raft daemon.
  • Build replicated state machines for metadata, coordination, or job scheduling.
  • Prototype or test Raft behaviors with deterministic, controllable ticks.

Highlights:

  • ESM-only (Node 18+)
  • Minimal in-memory transport and storage for tests/examples
  • Bundled WASM artifacts (no external download needed)
  • Deterministic helpers: transferLeadership, stepDown, runUntilStableLeader
  • Event signals and waiters: onRoleChange, onCommitAdvanced, onQuorumActive; waitForLeaderStable(), waitForHeartbeatRound(), waitForFirstStableLeader()

Looking for runnable snippets? Skip to the Examples section.

Installation

  • npm install raft-logic

Requirements

  • Node.js >= 18
  • ESM environment (either use .mjs files or set "type": "module" in your package.json)

Quick start (single node)

import { RaftNode, InMemoryTransport, InMemoryStorage } from 'raft-logic';

async function run() {
  const transport = new InMemoryTransport();
  const storage = new InMemoryStorage();

  const applied = [];
  const node = new RaftNode({
    id: '1',
    peers: ['1'],          // single-node cluster
    electionTick: 10,
    heartbeatTick: 1,
    transport,
    storage,
    apply: async (entry) => {
      if (entry.data) {
        const data = Buffer.from(entry.data, 'base64').toString('utf8');
        applied.push(data);
        console.log('[apply] committed entry:', data);
      }
    },
    tickIntervalMs: 50,
  });

  await node.start();

  // Wait until a leader exists (for single node this should be quick)
  await node.waitForLeader(5000);

  // Ergonomic client request: auto-forward (if follower), and optionally wait for local apply
  await node.clientRequest('hello-from-node', { waitFor: 'apply', timeout: 2000 });

  console.log('Applied entries:', applied);
  await node.stop();
}

run().catch((e) => {
  console.error(e);
  process.exit(1);
});
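
The tick options in the quick start translate into wall-clock timings by multiplication with tickIntervalMs. The sketch below makes that arithmetic explicit. It assumes the raft-rs default of drawing each election timeout from the range [electionTick, 2 × electionTick) ticks, which raft-logic may or may not override; tickTimings is a hypothetical helper, not part of the package.

```javascript
// Hypothetical helper (not part of raft-logic): converts tick-based options
// into approximate wall-clock durations in milliseconds.
function tickTimings({ electionTick, heartbeatTick, tickIntervalMs }) {
  return {
    // A leader sends heartbeats every heartbeatTick ticks.
    heartbeatMs: heartbeatTick * tickIntervalMs,
    // Assumption: raft-rs defaults, timeout randomized in
    // [electionTick, 2 * electionTick) ticks (upper bound exclusive).
    electionTimeoutMinMs: electionTick * tickIntervalMs,
    electionTimeoutMaxMs: 2 * electionTick * tickIntervalMs,
  };
}

// Values from the quick start: electionTick 10, heartbeatTick 1, 50 ms per tick.
const timings = tickTimings({ electionTick: 10, heartbeatTick: 1, tickIntervalMs: 50 });
console.log(timings.heartbeatMs, timings.electionTimeoutMinMs, timings.electionTimeoutMaxMs);
// prints: 50 500 1000
```

Under these assumptions, a follower in the quick start would wait roughly 0.5 to 1 second without heartbeats before campaigning.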

Threaded quick start (worker thread)

import { ThreadedRaftNode, InMemoryTransport } from 'raft-logic';

async function run() {
  const transport = new InMemoryTransport();

  const applied = [];
  const node = new ThreadedRaftNode({
    id: '1',
    peers: ['1'],
    electionTick: 10,
    heartbeatTick: 1,
    transport,
    apply: async (entry) => {
      if (entry.data) {
        const data = Buffer.from(entry.data, 'base64').toString('utf8');
        applied.push(data);
        console.log('[apply/main-thread] committed entry:', data);
      }
    },
    tickIntervalMs: 50,
    preVote: true,
  });

  await node.start();
  await node.waitForLeader(5000);

  // Propose via clientRequest and wait for local apply
  const res = await node.clientRequest('hello-from-threaded', { waitFor: 'apply', timeout: 2000 });
  console.log('Proposed at index', res.index, 'term', res.term);

  await node.stop();
}

run().catch((e) => { console.error(e); process.exit(1); });

Threaded worker apply/storage (optional)

import { ThreadedRaftNode, InMemoryTransport } from 'raft-logic';
import { fileURLToPath } from 'node:url';

const applyModule = fileURLToPath(new URL('./apply.mjs', import.meta.url));

const node = new ThreadedRaftNode({
  id: '1',
  peers: ['1'],
  electionTick: 10,
  heartbeatTick: 1,
  transport: new InMemoryTransport(),
  workerApply: { module: applyModule, export: 'apply' }, // runs apply inside the worker
  workerStorage: { kind: 'sqlite', options: { file: './data/node-1.sqlite' } },
});
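
The contents of ./apply.mjs are not shown here. A minimal sketch of what such a module might contain follows. It assumes entries carry base64-encoded data (as in the quick start's apply callback) and that each payload is a JSON command of the hypothetical form { op, key, value }; the key-value state machine is illustrative only.

```javascript
// Hypothetical state machine: a key-value map updated by committed entries.
const state = new Map();

// Assumption: entry.data is a base64 string, as in the quick start's apply callback.
function apply(entry) {
  if (!entry.data) return; // entries without data carry no command
  const text = Buffer.from(entry.data, 'base64').toString('utf8');
  const cmd = JSON.parse(text); // hypothetical command shape: { op, key, value }
  if (cmd.op === 'set') {
    state.set(cmd.key, cmd.value);
  } else if (cmd.op === 'delete') {
    state.delete(cmd.key);
  }
}

// In a real apply.mjs the function would be exported under the name given
// in workerApply.export, for example: export { apply };
```

Because the function runs inside the worker when workerApply is set, any state it builds (such as the map above) lives in the worker, not on the main thread.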

API overview (high level)

  • class RaftNode(options)

    • options:
      • id: string (stringified u64)
      • peers: string[] (initial voter set, stringified u64)
      • electionTick: number
      • heartbeatTick: number
      • transport: { send(fromId: string, msgs: object[]): Promise, register?: (node: RaftNode) => void }
      • storage?: { initialState?(id: string): Promise, persistReady?(id: string, ready: any): Promise }
      • apply(entry): function called for each committed entry
      • tickIntervalMs?: number (default ~100ms)
      • preVote?: boolean (default false)
      • checkQuorum?: boolean (default false)
      • onStateUpdate?(snapshot): optional callback to observe internal state cache updates
      • onDrain?(): optional callback when the Ready queue is drained
      • metrics?: { onPropose?(): void, onRejected?(reason: string, meta?: object): void, onRoleChange?(prev: string, next: string): void }
    • methods:
      • start(): Promise
      • stop(opts?): Promise
        • opts: { drainApply?: boolean, drainTicks?: boolean } to quiesce before stopping
      • status(): Promise<{ role, term, lead, commitIndex, lastApplied, lastLogIndex, raft_state } | null>
      • propose(data: Uint8Array | ArrayBuffer | string): Promise
        • Throws NotLeaderError on follower/candidate; safe to call on single-node or when leader hint points to self
      • schedulePropose(data, opts?): Promise<{ index, term }>
        • Defers propose out of apply() context; safe alternative to propose() in re-entrant scenarios
      • clientRequest(data, opts?): Promise<{ index, term }>
        • opts: { autoForward?: boolean=true, waitFor?: 'none'|'commit'|'apply'='none', timeout?: number, abortSignal?: AbortSignal }
        • Auto-forwards to current leader if called on a follower (when transport supports it). Optionally waits for commit or local apply.
      • waitForLeader(timeoutMs?): Promise. Resolves with leaderId when a stable leader exists.
      • waitForLeaderStable(options?): Promise. Event-driven waiter; options { requireQuorum=true, minTermResidencyTicks=0, signal?: AbortSignal }
      • waitForHeartbeatRound(n=1, options?): Promise. Resolves after n majority heartbeat rounds in the current term.
      • waitForCommit(index, { localApply?: boolean, timeout?: number, abortSignal?: AbortSignal }): Promise
        • Waits for index to be committed cluster-wide and (optionally) applied locally
      • readIndex(opts?): Promise. Leader-gated lease read; returns a safe commit index.
      • barrier(): Promise
        • Leader: resolves when all prior proposals are committed
        • Follower: resolves when all prior proposals are committed and applied locally
      • manualTick(): Promise, advanceTicks(n): Promise
        • Deterministic testing helpers to manually drive ticks
      • step(msg: object): Promise
      • campaign(): Promise
      • transferLeadership(targetId, timeoutMs?): Promise. Deterministic leader handoff to a target peer (test helper).
      • stepDown(timeoutMs?): Promise. Relinquishes leadership and triggers re-election/transfer.
      • runUntilStableLeader(timeoutMs?): Promise. Waits for a stable leader and a short stability window.
  • class ThreadedRaftNode(options)

    • Same high-level behavior as RaftNode but runs the raft core (WASM + storage + drain loop) inside a worker thread.
    • Options: same as RaftNode; apply() is required unless workerApply/applyMode 'worker' is set.
      • applyMode: 'main' | 'worker' (workerApply implies 'worker')
      • workerApply: { module: string, export?: string, options?: any } to run apply inside the worker.
      • workerStorage: { kind: 'inmemory'|'sqlite', options?: object } or { module: string, export?: string, options?: object } for worker storage.
    • Additional methods and signals:
      • onStateChange(cb), onBecameLeader(cb), onBecameFollower(cb): subscribe to state updates
      • transferLeadership(targetId, timeoutMs?): Promise. Test helper via MsgTransferLeader.
      • stepDown(timeoutMs?): Promise. Test helper via MsgTimeoutNow.
      • runUntilStableLeader(timeoutMs?): Promise. Waits for a leader and a stability window (no role/lead changes).
      • readIndex(opts?): Promise. Leader-gated lease read; returns a safe commit index.
      • stop(opts?): Promise. Accepts { drainApply?: boolean, drainTicks?: boolean }.
  • Adapters

    • InMemoryTransport: routes messages within the same process for testing.
      • sendClientRequest(leaderId, data, opts): in-memory convenience used by clientRequest auto-forwarding
    • Utilities:
      • leaderForwarder(transport, getLeaderId): returns async function to route clientRequest to the current leader
      • waitForFirstStableLeader(nodes, options?): Promise. Uses Promise.any over the nodes' waitForLeaderStable() to pick the first stable leader.
      • waitForReadyLeader(node, options?): Promise. Composite helper that waits for a leader, drains backlog, and (optionally) enforces a lease-based read barrier before treating the node as ready for client operations.
    • InMemoryStorage: demonstrates the persist-before-advance contract (not durable)
    • SqliteStorage: production-oriented durable storage
      • getDb(): returns the underlying better-sqlite3 Database handle for co-located app tables
      • onOpen(db): optional callback invoked when the DB is opened to initialize co-located tables
  • Typed errors

    • NotLeaderError: { code: 'NotLeader', term: number, leaderId?: string, forwardHint?: { id?: string } }
    • TimeoutError: { code: 'Timeout' }
    • RejectedError: { code: 'Rejected', reason?: 'Reentrancy' | 'NotLeader' | ... }
  • Status/observability

    • status(): returns a snapshot: { role: 'leader'|'follower'|'candidate', term, lead, commitIndex, lastApplied, lastLogIndex, raft_state }
    • Event-style callbacks on ThreadedRaftNode: onStateChange, onBecameLeader, onBecameFollower
    • Barrier and waiter APIs for deterministic sequencing in tests and app logic
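
When propose() fails with NotLeaderError, the error fields listed under "Typed errors" can guide a retry against another node. The sketch below shows one way to pick the retry target. The function name pickRetryTarget is hypothetical, and the error is modeled as a plain object with the documented fields rather than the library's actual error class.

```javascript
// Hypothetical helper: decide which node id to retry against after a failure.
// Returns null when the error is not a NotLeader error or carries no hint.
function pickRetryTarget(err) {
  if (!err || err.code !== 'NotLeader') return null;
  // Prefer the explicit leaderId, then fall back to forwardHint.id.
  return err.leaderId ?? err.forwardHint?.id ?? null;
}

console.log(pickRetryTarget({ code: 'NotLeader', term: 3, leaderId: '2' })); // prints 2
console.log(pickRetryTarget({ code: 'NotLeader', term: 3, forwardHint: { id: '3' } })); // prints 3
console.log(pickRetryTarget({ code: 'Timeout' })); // prints null
```

A null result means no leader is known yet, in which case waiting with waitForLeader() before retrying is one reasonable option.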

Examples: metrics and readIndex

import { RaftNode, InMemoryTransport, SimpleMetrics } from 'raft-logic';

const transport = new InMemoryTransport();
const metrics = new SimpleMetrics();

const node = new RaftNode({
  id: '1',
  peers: ['1'],
  electionTick: 10,
  heartbeatTick: 1,
  transport,
  apply: () => {},
  metrics, // enable metrics hooks
});

await node.start();
await node.waitForLeader(5000);

// Metrics counters increment on proposals and rejections
await node.clientRequest('ex', { waitFor: 'commit', timeout: 2000 });
console.log('metrics snapshot', metrics.snapshot());

// Linearizable read (lease-based): returns a safe commit index
const safeIndex = await node.readIndex({ timeout: 2000 });
// perform your application read knowing state ≥ safeIndex

await node.stop({ drainApply: true, drainTicks: true });
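
The index returned by readIndex() is only useful once the local state machine has caught up to it. One way to combine it with waitForCommit() is sketched below. The names linearizableRead and readFn are hypothetical, and the sketch assumes waitForCommit() resolves promptly when the index has already been applied locally.

```javascript
// Hypothetical helper: perform a read only after the local state machine
// has applied everything up to the safe index returned by readIndex().
async function linearizableRead(node, readFn, timeout = 2000) {
  // Step 1: ask the leader for a safe commit index.
  const safeIndex = await node.readIndex({ timeout });
  // Step 2: wait until that index is committed and applied locally.
  await node.waitForCommit(safeIndex, { localApply: true, timeout });
  // Step 3: the application read now observes state at or beyond safeIndex.
  return readFn();
}
```

A call might look like linearizableRead(node, () => kv.get('x')), where kv stands for whatever structure your apply callback maintains.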
        

Deterministic testing helpers

  • Set tickIntervalMs in RaftNode to 0 (or use ThreadedRaftNode helpers) and drive ticks via manualTick()/advanceTicks(n) to remove wall-clock coupling.
  • transferLeadership(targetId) and stepDown() allow deterministic failover in tests.
  • runUntilStableLeader(timeout) blocks until a stable leader is observed (and holds through a short stability window).

Example: deterministic leadership transfer and step down

import { ThreadedRaftNode, InMemoryTransport } from 'raft-logic';

const transport = new InMemoryTransport();
const peers = ['1','2','3'];

const common = {
  peers,
  electionTick: 10,
  heartbeatTick: 1,
  preVote: true,
  checkQuorum: false,
  transport,
  apply: async () => {},
  tickIntervalMs: 0 // manual ticking for determinism
};

const n1 = new ThreadedRaftNode({ id: '1', ...common });
const n2 = new ThreadedRaftNode({ id: '2', ...common });
const n3 = new ThreadedRaftNode({ id: '3', ...common });

await Promise.all([n1.start(), n2.start(), n3.start()]);

// helper to drive ticks across the cluster
async function driveTicks(rounds = 100) {
  for (let i = 0; i < rounds; i++) {
    await Promise.all([n1.manualTick(), n2.manualTick(), n3.manualTick()]);
  }
}

// elect a leader deterministically
await driveTicks(200);
const leaderId = await n1.runUntilStableLeader(5000);
console.log('Leader elected:', leaderId);

// transfer leadership to node 2
if (leaderId !== '2') {
  // inside this branch the leader can only be node 1 or node 3
  const leaderNode = leaderId === '1' ? n1 : n3;
  await leaderNode.transferLeadership('2', 5000);
  await driveTicks(200);
}

// ask node 2 to step down and observe a new leader
await n2.stepDown(5000);
await driveTicks(200);
const nextLeader = await n1.runUntilStableLeader(5000);
console.log('New leader:', nextLeader);

await Promise.all([n1.stop(), n2.stop(), n3.stop()]);
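
The fixed driveTicks(200) calls in the example may tick more often than a given test needs. A possible alternative is to tick until a condition holds, as sketched here. driveUntil and hasSingleLeader are hypothetical helpers that rely only on manualTick() and status() from the API overview.

```javascript
// Hypothetical helper: tick every node once per round until `predicate`
// returns true for the collected status snapshots, or give up after maxRounds.
async function driveUntil(nodes, predicate, maxRounds = 500) {
  for (let round = 1; round <= maxRounds; round++) {
    await Promise.all(nodes.map((n) => n.manualTick()));
    const statuses = await Promise.all(nodes.map((n) => n.status()));
    if (predicate(statuses)) return round; // number of rounds that were needed
  }
  throw new Error(`condition not met after ${maxRounds} rounds`);
}

// Example predicate: exactly one node reports the leader role.
// status() may return null, so null snapshots are skipped.
const hasSingleLeader = (statuses) =>
  statuses.filter((s) => s && s.role === 'leader').length === 1;
```

In the example above, a call such as driveUntil([n1, n2, n3], hasSingleLeader) could stand in for the first driveTicks(200).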
        

Durable storage: SqliteStorage

  • Upholds persist-before-advance:
    • snapshot (compaction: delete entries <= snapshot.index)
    • entries (tail replacement: delete entries >= firstNew, then insert)
    • hardState (term, vote, commit)
  • Durable PRAGMAs on open: WAL + synchronous=FULL by default (configurable).
  • getDb(): gives access to the same database handle for co-located app tables.
  • onOpen(db): optional hook to set up your co-located schema.
  • Optional applyWithDb(entry, db): if provided, called during apply with the same handle used by raft-logic. This enables efficient, consistent side-effects (best-effort; user manages app-level transactions).
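
The two log rules above (tail replacement and compaction) can be illustrated on a plain in-memory array. This is a sketch of the described semantics only, not SqliteStorage's actual implementation; the function names and the entry shape { index, term, data } are assumptions made for the illustration.

```javascript
// Entries are { index, term, data } objects kept sorted by index.

// Tail replacement: drop every stored entry at or after the first new index,
// then append the new entries.
function appendEntries(log, newEntries) {
  if (newEntries.length === 0) return log;
  const firstNew = newEntries[0].index;
  return log.filter((e) => e.index < firstNew).concat(newEntries);
}

// Compaction: drop every entry already covered by the snapshot.
function compact(log, snapshotIndex) {
  return log.filter((e) => e.index > snapshotIndex);
}

let log = [
  { index: 1, term: 1, data: 'a' },
  { index: 2, term: 1, data: 'b' },
  { index: 3, term: 1, data: 'c' },
];

// A leader in term 2 overwrites the conflicting tail starting at index 2.
log = appendEntries(log, [{ index: 2, term: 2, data: 'x' }]);
console.log(log.map((e) => `${e.index}@${e.term}`)); // prints [ '1@1', '2@2' ]

// A snapshot at index 1 makes entry 1 redundant.
log = compact(log, 1);
console.log(log.map((e) => e.index)); // prints [ 2 ]
```

Note that tail replacement also removes index 3: once index 2 is rewritten, everything after it belongs to the old, conflicting history.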

Usage (durable)

import { RaftNode, InMemoryTransport, SqliteStorage } from 'raft-logic';

const transport = new InMemoryTransport();
const storage = new SqliteStorage({
  file: './data/node-1.sqlite',
  onOpen(db) {
    db.exec('CREATE TABLE IF NOT EXISTS outbox(id INTEGER PRIMARY KEY, payload TEXT)');
  }
});
storage.open();

const node = new RaftNode({
  id: '1',
  // Single-node voter set so this snippet can elect a leader on its own.
  // In a real cluster, list every voter (e.g. ['1','2','3']) and start each node.
  peers: ['1'],
  electionTick: 10,
  heartbeatTick: 1,
  transport,
  storage,
  apply: async (entry) => { /* your state machine */ },
});
await node.start();
await node.waitForLeader(5000);

const { index, term } = await node.clientRequest('do-something', { waitFor: 'commit', timeout: 2000 });
console.log('Committed at index', index, 'term', term);

await node.stop();
storage.close();
        

Notes on WASM

  • The package bundles the wasm-bindgen output under ./wasm and the loader automatically initializes it in Node.js.
  • You normally do not need to call loadWasm() directly; RaftNode.start() will initialize it if needed.
  • Low-level helpers:
    • loadWasm(customUrl?): Promise
    • wasmReady(): Exports
    • ready(customUrl?): Promise

Examples

  • Single node (threaded):
    • npm run example:single-threaded
  • Three nodes (in-memory):
    • npm run example:three
  • Three nodes + SQLite:
    • npm run example:three-sqlite
  • Restart + recovery demo:
    • npm run example:three-sqlite-restart

Implementation notes

  • Schema avoids reserved SQLite keywords:
    • hard_state.commit_index (not "commit")
    • snapshot.snap_index (not "index")
  • Entries persist the entry type as etype; the host JSON boundary uses entryType.

Changelog (recent)

  • New event signals: onRoleChange, onCommitAdvanced, onQuorumActive
  • New waiters: waitForLeaderStable(options), waitForHeartbeatRound(n), and waitForFirstStableLeader(nodes, options)
  • loader.ready(): returns a Promise that resolves once WASM is initialized, so callers no longer need to poll
  • Added clientRequest(entry, opts) with auto-forwarding, waitFor=commit/apply, timeout/abort.
  • propose() throws NotLeaderError on followers (with leaderId, term, forwardHint).
  • Leadership signals and barriers:
    • onBecameLeader/onBecameFollower/onStateChange
    • waitForLeader(), barrier()
  • Waiters: waitForCommit(index, { localApply, timeout, abortSignal })
  • Deterministic testing: manualTick(), advanceTicks(), transferLeadership(), stepDown(), runUntilStableLeader()
  • SqliteStorage: getDb(), onOpen(db), optional applyWithDb(entry, db)
  • Richer status(): role, term, lead, commitIndex, lastApplied, lastLogIndex
  • Typed errors: NotLeaderError, TimeoutError, RejectedError
  • Metrics hooks: metrics.onPropose(), metrics.onRejected(reason, meta), metrics.onRoleChange(prev, next)
  • readIndex(): leader-gated lease-based linearizable read; returns commit index
  • stop(opts): { drainApply, drainTicks } to quiesce before freeing
  • Utilities: leaderForwarder(transport, getLeaderId)

WASM Lifecycle Control and Diagnostics

New APIs have been added to give developers explicit control and visibility over the WebAssembly runtime lifecycle.
These are especially useful in long-lived or test-driven environments where multiple Raft clusters are created and destroyed in one process.

Prevent premature freeing

import { disableAutoFree } from "raft-logic/loader.mjs";
disableAutoFree(); // Keeps the WASM runtime alive for the entire process

or equivalently:

import { retainWasm } from "raft-logic/loader.mjs";
retainWasm(true);

Global singleton loader

Ensures all RaftNodes share the same WASM instance:

import { getWasmInstance } from "raft-logic/loader.mjs";
const wasm = await getWasmInstance();

Diagnostics

Inspect the current WASM runtime state:

import { getWasmStatus } from "raft-logic/loader.mjs";
console.log(getWasmStatus()); // { refCount: 0, freed: false, autoFreeDisabled: true }

Controlled shutdown

When auto-free is disabled, the runtime will not be freed automatically:

import { controlledShutdownWasm } from "raft-logic/loader.mjs";
await controlledShutdownWasm(); // Skips freeing if disableAutoFree() was called

These APIs make raft-logic more robust for frameworks, test suites, and long-lived processes.
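
The interaction between reference counting, auto-free, and controlled shutdown described in this section can be modeled in a few lines. The class below is purely illustrative: its name and methods are hypothetical, it only mirrors the three status fields shown by getWasmStatus(), and it assumes the runtime is freed when the last user releases it unless auto-free is disabled. The real loader may behave differently.

```javascript
// Illustrative model only; this is not raft-logic's loader code.
class RuntimeLifecycleModel {
  constructor() {
    this.refCount = 0;
    this.freed = false;
    this.autoFreeDisabled = false;
  }
  disableAutoFree() {
    this.autoFreeDisabled = true;
  }
  acquire() {
    this.refCount++; // e.g. a node starts and uses the runtime
  }
  release() {
    if (this.refCount > 0) this.refCount--;
    // Assumption: dropping to zero users triggers an automatic free attempt.
    if (this.refCount === 0) this.shutdown();
  }
  shutdown() {
    if (this.autoFreeDisabled) return false; // freeing is skipped
    this.freed = true;
    return true;
  }
  status() {
    const { refCount, freed, autoFreeDisabled } = this;
    return { refCount, freed, autoFreeDisabled };
  }
}

const model = new RuntimeLifecycleModel();
model.disableAutoFree();
model.acquire(); // first cluster starts
model.release(); // first cluster stops; runtime stays alive
console.log(model.status()); // prints { refCount: 0, freed: false, autoFreeDisabled: true }
```

In this model, a test suite that creates and destroys several clusters in sequence never observes a freed runtime between clusters, which is the situation the disableAutoFree() call is described as addressing.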

Diagnostic Tests and Optional Logging

A new diagnostic test has been added to help verify multi-cluster Raft behavior and isolate issues such as worker lifecycle or election stalls.

Running the diagnostic test

npm test --silent -- test/multi-instance-diagnostics.test.mjs

This test creates two independent Raft clusters and drives them manually using manualTick().
It verifies that both clusters elect leaders independently and remain isolated.

Optional logging

The diagnostic test supports an enableLogs option to toggle detailed Raft logs:

const clusterA = await makeCluster('A', { enableLogs: true });
const clusterB = await makeCluster('B', { enableLogs: false });

When enableLogs is true, detailed [apply ...] and Raft debug logs are printed.
When false, the test runs silently except for high-level diagnostic messages.

License

  • MIT OR Apache-2.0

Keywords

raft

Package last updated on 02 Jan 2026
