@webblackbox/pipeline

Chunking, indexing, blob storage, and archive export pipeline for WebBlackbox recording sessions.

latest
Source
npmnpm
Version
0.5.0
Version published
Maintainers
1
Created
Source


The event processing pipeline for WebBlackbox. Handles chunking, indexing, blob storage, and archive export for recorded sessions.

Overview

  • FlightRecorderPipeline — Main pipeline orchestrating the full event processing lifecycle
  • EventChunker — Groups events into size-bounded chunks with codec support
  • EventIndexer — Builds time-based, request-based, and inverted text search indexes on demand from stored chunks
  • Codec — NDJSON chunk codec support for none, gzip, br, and zst
  • Archive Export — Creates .webblackbox ZIP archives with optional AES-GCM encryption
  • PipelineStorage — Abstract storage interface with in-memory implementation
  • IndexedDB Quota Recovery — Indexed storage evicts oldest sessions on quota pressure (best-effort)

Usage

Basic Pipeline

import { FlightRecorderPipeline, MemoryPipelineStorage } from "@webblackbox/pipeline";
import type { SessionMetadata } from "@webblackbox/protocol";

const session: SessionMetadata = {
  sid: "S-1706000000000-abc",
  tabId: 123,
  startedAt: Date.now(),
  mode: "lite",
  url: "https://example.com",
  tags: ["debug"]
};

const pipeline = new FlightRecorderPipeline({
  session,
  storage: new MemoryPipelineStorage(),
  maxChunkBytes: 512 * 1024, // 512KB per chunk
  chunkCodec: "gzip" // supported codecs: none | gzip | br | zst
});

// Start the pipeline
await pipeline.start();

// Ingest events (`events` is your own array of recorded WebBlackboxEvent objects)
for (const event of events) {
  await pipeline.ingest(event);
}

// Flush remaining events
await pipeline.flush();

// Build search indexes on demand from persisted chunks
const indexes = await pipeline.finalizeIndexes();

// Export as archive
const result = await pipeline.exportBundle({
  passphrase: "optional-encryption-key",
  includeScreenshots: true,
  maxArchiveBytes: 100 * 1024 * 1024,
  recentWindowMs: 20 * 60 * 1000
});

console.log(`Exported: ${result.fileName} (${result.bytes.length} bytes)`);

includeScreenshots, maxArchiveBytes, and recentWindowMs are optional export filters. If they are omitted, the export includes the full retained session.
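
To make the filters concrete, here is a minimal sketch of how a time window and a byte budget could narrow a list of chunks before export. The `ChunkInfo` shape and the `selectChunks` helper are hypothetical, and the selection order (window first, then dropping the oldest chunks until the budget fits) is an assumption for illustration, not the documented behavior of `exportBundle`.

```typescript
// Hypothetical chunk summary; field names mirror the time index described in this README.
type ChunkInfo = { chunkId: string; tEnd: number; byteLength: number };

// Assumed semantics: keep chunks ending within `recentWindowMs` of the newest chunk,
// then drop the oldest survivors until the total size fits `maxArchiveBytes`.
function selectChunks(
  chunks: ChunkInfo[],
  opts: { recentWindowMs?: number; maxArchiveBytes?: number } = {}
): ChunkInfo[] {
  if (chunks.length === 0) return [];
  const sorted = [...chunks].sort((a, b) => a.tEnd - b.tEnd);
  const newest = sorted[sorted.length - 1].tEnd;

  let kept = sorted;
  if (opts.recentWindowMs !== undefined) {
    const cutoff = newest - opts.recentWindowMs;
    kept = kept.filter((c) => c.tEnd >= cutoff);
  }

  if (opts.maxArchiveBytes !== undefined) {
    let total = kept.reduce((sum, c) => sum + c.byteLength, 0);
    while (kept.length > 0 && total > opts.maxArchiveBytes) {
      total -= kept[0].byteLength;
      kept = kept.slice(1);
    }
  }
  return kept;
}

const sample: ChunkInfo[] = [
  { chunkId: "C-000001", tEnd: 1_000, byteLength: 400 },
  { chunkId: "C-000002", tEnd: 2_000, byteLength: 400 },
  { chunkId: "C-000003", tEnd: 3_000, byteLength: 400 }
];

// The window keeps the two newest chunks; the byte budget then leaves only the newest one.
const selected = selectChunks(sample, { recentWindowMs: 1_500, maxArchiveBytes: 500 });
console.log(selected.map((c) => c.chunkId));
```

With no options, the helper returns every chunk, which matches the "full retained session" default described above.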

Optional At-Rest Storage Encryption

EncryptedPipelineStorage encrypts chunk and blob payload bytes before they are persisted (for example, when using IndexedDB storage).

import {
  EncryptedPipelineStorage,
  IndexedDbPipelineStorage,
  derivePipelineStorageKey
} from "@webblackbox/pipeline";

const derived = await derivePipelineStorageKey("cache-passphrase");

const storage = new EncryptedPipelineStorage(
  new IndexedDbPipelineStorage("webblackbox-flight-recorder"),
  {
    key: derived.key
  }
);

// Persist derived.salt + derived.iterations with your own secure key policy.

Note: this protects event/blob payload bytes at rest; indexes and session metadata remain plaintext for queryability.
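
Persisting the salt and iteration count matters because a passphrase-derived key can only be reproduced with the same inputs. The sketch below shows that property using Node's built-in `crypto` module. `deriveKey` is a hypothetical helper, and the PBKDF2-SHA-256 parameters are assumptions for illustration; this is not the internals of `derivePipelineStorageKey`.

```typescript
import { pbkdf2Sync, randomBytes } from "node:crypto";

// Hypothetical helper: derive a 256-bit key from a passphrase with PBKDF2-SHA-256.
function deriveKey(passphrase: string, salt: Uint8Array, iterations: number): Buffer {
  return pbkdf2Sync(passphrase, salt, iterations, 32, "sha256");
}

// First run: generate a random salt and store it (with the iteration count) next to the data.
const salt = randomBytes(16);
const iterations = 100_000; // assumed value for this sketch
const firstKey = deriveKey("cache-passphrase", salt, iterations);

// Later run: the stored salt and iteration count reproduce the same key.
const secondKey = deriveKey("cache-passphrase", salt, iterations);
console.log(firstKey.equals(secondKey)); // true

// A different salt yields an unrelated key, so a lost salt makes the cache unreadable.
const otherKey = deriveKey("cache-passphrase", randomBytes(16), iterations);
console.log(firstKey.equals(otherKey));
```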

Blob Storage

// Store binary data (screenshots, DOM snapshots, response bodies)
const hash = await pipeline.putBlob("image/webp", screenshotBytes);
// Returns SHA-256 hash for content-addressable retrieval

Event Chunking

The EventChunker groups events into size-bounded chunks:

import { EventChunker } from "@webblackbox/pipeline";

const chunker = new EventChunker(
  512 * 1024, // Max 512KB per chunk
  "gzip" // Codec: none | gzip | br | zst
);

// Append events; returns a finalized chunk when size threshold is reached
const chunk = await chunker.append(event);
if (chunk) {
  // chunk.meta: ChunkTimeIndexEntry (timestamps, size, hash)
  // chunk.bytes: Uint8Array (encoded NDJSON)
  // chunk.events: WebBlackboxEvent[] (original events)
}

// Flush remaining events
const remaining = await chunker.flush();

FinalizedChunk

type FinalizedChunk = {
  meta: ChunkTimeIndexEntry; // Chunk metadata for indexing
  bytes: Uint8Array; // Encoded event data
  events: WebBlackboxEvent[]; // Original events in this chunk
};
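
As a rough illustration of size-bounded chunking, the sketch below buffers events and emits a chunk once the buffered NDJSON size reaches a limit. `SketchEvent`, `SketchChunk`, and `SketchChunker` are hypothetical names, the sketch is synchronous, and it skips compression and chunk metadata, so treat it as a simplified model rather than the behavior of `EventChunker`.

```typescript
// Hypothetical shapes for this sketch; real events follow the WebBlackbox protocol types.
type SketchEvent = { id: string; t: number; data: string };
type SketchChunk = { bytes: Uint8Array; events: SketchEvent[] };

class SketchChunker {
  private pending: SketchEvent[] = [];
  private pendingBytes = 0;
  private readonly maxChunkBytes: number;
  private readonly encoder = new TextEncoder();

  constructor(maxChunkBytes: number) {
    this.maxChunkBytes = maxChunkBytes;
  }

  // Buffer the event; once the buffered NDJSON size reaches the limit, return a finalized chunk.
  append(event: SketchEvent): SketchChunk | undefined {
    this.pending.push(event);
    this.pendingBytes += this.encoder.encode(JSON.stringify(event) + "\n").length;
    return this.pendingBytes >= this.maxChunkBytes ? this.flush() : undefined;
  }

  // Emit whatever is buffered, or undefined if nothing is pending.
  flush(): SketchChunk | undefined {
    if (this.pending.length === 0) return undefined;
    const events = this.pending;
    const ndjson = events.map((e) => JSON.stringify(e)).join("\n") + "\n";
    this.pending = [];
    this.pendingBytes = 0;
    return { bytes: this.encoder.encode(ndjson), events };
  }
}

// Each event below serializes to 39 bytes including the newline, so a 64-byte limit
// closes a chunk after every second event.
const sketchChunker = new SketchChunker(64);
const finished: SketchChunk[] = [];
for (let i = 0; i < 5; i++) {
  const chunk = sketchChunker.append({ id: `E-${i}`, t: i, data: "x".repeat(10) });
  if (chunk) finished.push(chunk);
}
const tail = sketchChunker.flush();
console.log(finished.length, tail?.events.length); // 2 1
```

Note that a chunk can exceed the limit by up to one event in this model, because the size check runs after the event is buffered.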

Indexing

FlightRecorderPipeline does not retain full request/text indexes in memory while recording. It rebuilds them from persisted chunks when finalizeIndexes() or exportBundle() runs, which keeps long-running extension sessions memory-bounded.

The EventIndexer builds three types of indexes:

import { EventIndexer } from "@webblackbox/pipeline";

const indexer = new EventIndexer();

// Add chunk metadata for time-based indexing
indexer.addChunk(chunkMeta);

// Add events for request and text indexing
indexer.addEvents(events);

// Get all indexes
const { time, request, inverted } = indexer.snapshot();

Index Types

| Index | Purpose | Structure |
| --- | --- | --- |
| Time Index | Locate chunks by timestamp | { chunkId, seq, tStart, tEnd, monoStart, monoEnd, eventCount, byteLength, codec, sha256 } |
| Request Index | Map request IDs to event IDs | { reqId, eventIds[] } |
| Inverted Index | Full-text search | { term, eventIds[] } |
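
The request and inverted index shapes above can be illustrated with a small sketch. The `IndexableEvent` shape, the `buildIndexes` helper, and the tokenizer are all assumptions made for this example; the real `EventIndexer` may extract request IDs and search terms differently.

```typescript
// Hypothetical event shape; only the fields needed for indexing are modeled here.
type IndexableEvent = { id: string; reqId?: string; text?: string };

function buildIndexes(events: IndexableEvent[]) {
  const byRequest = new Map<string, string[]>();
  const byTerm = new Map<string, string[]>();

  for (const event of events) {
    // Request index: group event IDs under the request they belong to.
    if (event.reqId !== undefined) {
      const ids = byRequest.get(event.reqId) ?? [];
      ids.push(event.id);
      byRequest.set(event.reqId, ids);
    }
    // Assumed tokenizer: lowercase, split on non-alphanumerics, de-duplicate per event.
    const tokens = (event.text ?? "").toLowerCase().split(/[^a-z0-9]+/).filter(Boolean);
    for (const term of new Set(tokens)) {
      const ids = byTerm.get(term) ?? [];
      ids.push(event.id);
      byTerm.set(term, ids);
    }
  }

  return {
    request: [...byRequest].map(([reqId, eventIds]) => ({ reqId, eventIds })),
    inverted: [...byTerm].map(([term, eventIds]) => ({ term, eventIds }))
  };
}

const built = buildIndexes([
  { id: "E-1", reqId: "R-1", text: "GET /api/users" },
  { id: "E-2", reqId: "R-1", text: "200 OK users" },
  { id: "E-3", text: "console error" }
]);

console.log(built.request); // one entry: R-1 -> ["E-1", "E-2"]
console.log(built.inverted.find((entry) => entry.term === "users"));
```

Looking up a term is then a scan (or a Map lookup) over `inverted`, which returns the IDs of matching events without decoding every chunk.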

Codec

import { encodeEventsNdjson, decodeEventsNdjson } from "@webblackbox/pipeline";

// Encode events as NDJSON
const bytes = encodeEventsNdjson(events);

// Decode NDJSON back to events
const decoded = decodeEventsNdjson(bytes);
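
For readers unfamiliar with NDJSON, the sketch below shows the general idea: one JSON document per line, optionally compressed afterwards. `encodeNdjson` and `decodeNdjson` are hypothetical stand-ins written for this example, and pairing them with Node's `zlib` gzip functions is an assumption about how a `gzip` codec could be layered on top, not a description of the package's internals.

```typescript
import { gunzipSync, gzipSync } from "node:zlib";

// One JSON document per line, terminated by a newline.
function encodeNdjson(events: unknown[]): Uint8Array {
  const text = events.map((e) => JSON.stringify(e)).join("\n") + "\n";
  return new TextEncoder().encode(text);
}

// Split on newlines, skip empty lines, and parse each remaining line.
function decodeNdjson(bytes: Uint8Array): unknown[] {
  return new TextDecoder()
    .decode(bytes)
    .split("\n")
    .filter((line) => line.length > 0)
    .map((line) => JSON.parse(line));
}

const sampleEvents = [
  { id: "E-1", type: "click" },
  { id: "E-2", type: "scroll" }
];

const raw = encodeNdjson(sampleEvents);
const compressed = gzipSync(raw); // stand-in for the "gzip" codec
const restored = decodeNdjson(gunzipSync(compressed));

console.log(JSON.stringify(restored) === JSON.stringify(sampleEvents)); // true
```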

SHA-256 Hashing

import { sha256Hex } from "@webblackbox/pipeline";

const hash = await sha256Hex(data); // Returns hex string

Storage Interface

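import type { PipelineStorage } from "@webblackbox/pipeline";
import type { SessionMetadata } from "@webblackbox/protocol";

// Implement a custom storage backend.
// StoredChunk, StoredBlob, and HashesManifest are assumed to be imported from the package that defines them.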
class CustomStorage implements PipelineStorage {
  async putSession(metadata: SessionMetadata): Promise<void> {
    /* ... */
  }
  async getSession(sid: string): Promise<SessionMetadata | undefined> {
    /* ... */
  }
  async putChunk(chunk: StoredChunk): Promise<void> {
    /* ... */
  }
  async listChunks(sid: string): Promise<StoredChunk[]> {
    /* ... */
  }
  async getChunk(sid: string, chunkId: string): Promise<StoredChunk | undefined> {
    /* ... */
  }
  async putBlob(blob: StoredBlob, sidHint?: string): Promise<void> {
    /* ... */
  }
  async getBlob(hash: string): Promise<StoredBlob | undefined> {
    /* ... */
  }
  async listBlobs(): Promise<StoredBlob[]> {
    /* ... */
  }
  async putIndexes(sid: string, indexes: object): Promise<void> {
    /* ... */
  }
  async getIndexes(sid: string): Promise<object> {
    /* ... */
  }
  async putIntegrity(sid: string, manifest: HashesManifest): Promise<void> {
    /* ... */
  }
  async getIntegrity(sid: string): Promise<HashesManifest | undefined> {
    /* ... */
  }
}

MemoryPipelineStorage

In-memory implementation using Maps. Features:

  • Blob deduplication by SHA-256 hash
  • Reference counting for shared blobs
  • Suitable for extension offscreen documents and testing
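
A minimal sketch of content-addressed blob storage with deduplication and reference counting is shown below. `SketchBlobStore` and its `release` method are hypothetical and use Node's `crypto` module for hashing; `MemoryPipelineStorage` may track references differently.

```typescript
import { createHash } from "node:crypto";

class SketchBlobStore {
  private blobs = new Map<string, { mime: string; bytes: Uint8Array; refs: number }>();

  // Store bytes under their SHA-256 hash; identical content only bumps the reference count.
  put(mime: string, bytes: Uint8Array): string {
    const hash = createHash("sha256").update(bytes).digest("hex");
    const existing = this.blobs.get(hash);
    if (existing) {
      existing.refs += 1;
    } else {
      this.blobs.set(hash, { mime, bytes, refs: 1 });
    }
    return hash;
  }

  // Drop one reference; the blob is deleted when nothing refers to it any more.
  release(hash: string): void {
    const entry = this.blobs.get(hash);
    if (!entry) return;
    entry.refs -= 1;
    if (entry.refs === 0) this.blobs.delete(hash);
  }

  get size(): number {
    return this.blobs.size;
  }
}

const blobStore = new SketchBlobStore();
const screenshot = new TextEncoder().encode("fake screenshot bytes");

const hashA = blobStore.put("image/webp", screenshot);
const hashB = blobStore.put("image/webp", screenshot); // same content, same hash
console.log(hashA === hashB, blobStore.size); // true 1

blobStore.release(hashA);
console.log(blobStore.size); // 1 (one reference remains)
blobStore.release(hashB);
console.log(blobStore.size); // 0
```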

Archive Format

Structure

session.webblackbox (ZIP)
├── manifest.json           # Export metadata
├── events/
│   ├── C-000001.ndjson     # Event chunks (NDJSON)
│   └── ...
├── index/
│   ├── time.json           # Time-based chunk index
│   ├── req.json            # Request ID mapping
│   └── inv.json            # Full-text search index
├── blobs/
│   ├── sha256-<hash>.webp  # Binary blobs (screenshots, etc.)
│   └── ...
└── integrity/
    └── hashes.json         # SHA-256 hashes

Encryption

const result = await pipeline.exportBundle({
  passphrase: "my-secret-key"
});

When a passphrase is provided:

  • KDF: PBKDF2 with SHA-256, 120,000 iterations, random salt
  • Encryption: AES-GCM with per-file random IVs
  • Scope: Event chunks, indexes, and blobs are encrypted
  • Manifest: Remains unencrypted (contains encryption metadata)
  • Integrity: SHA-256 hashes computed on encrypted content
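
The scheme above can be sketched with Node's built-in `crypto` module. The 120,000 PBKDF2-SHA-256 iterations come from the list above; the 256-bit key size, 16-byte salt, 12-byte IV, and the `encryptFile` and `decryptFile` helpers are assumptions made for this example and are not taken from the package source.

```typescript
import { createCipheriv, createDecipheriv, createHash, pbkdf2Sync, randomBytes } from "node:crypto";

// Derive one archive key from the passphrase and a random salt.
const archiveSalt = randomBytes(16);
const archiveKey = pbkdf2Sync("my-secret-key", archiveSalt, 120_000, 32, "sha256");

type EncryptedFile = { iv: Uint8Array; data: Uint8Array; tag: Uint8Array };

// Encrypt one archive entry with AES-GCM, using a fresh random IV for every file.
function encryptFile(plain: Uint8Array): EncryptedFile {
  const iv = randomBytes(12);
  const cipher = createCipheriv("aes-256-gcm", archiveKey, iv);
  const data = Buffer.concat([cipher.update(plain), cipher.final()]);
  return { iv, data, tag: cipher.getAuthTag() };
}

// Decrypt an entry; final() throws if the data or tag was tampered with.
function decryptFile(file: EncryptedFile): Buffer {
  const decipher = createDecipheriv("aes-256-gcm", archiveKey, file.iv);
  decipher.setAuthTag(file.tag);
  return Buffer.concat([decipher.update(file.data), decipher.final()]);
}

const plainChunk = new TextEncoder().encode('{"id":"E-1"}\n');
const first = encryptFile(plainChunk);
const second = encryptFile(plainChunk);

// The integrity hash is computed over the encrypted bytes, not the plaintext.
const integrityHash = createHash("sha256").update(first.data).digest("hex");

console.log(decryptFile(first).toString("utf8") === '{"id":"E-1"}\n'); // true
console.log(Buffer.from(first.iv).equals(Buffer.from(second.iv)));
console.log(integrityHash.length); // 64
```

Because each file gets its own IV, encrypting the same plaintext twice produces different ciphertext, which avoids leaking which archive entries are identical.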

Archive Creation

import { createWebBlackboxArchive } from "@webblackbox/pipeline";

const { bytes, integrity } = await createWebBlackboxArchive(
  {
    manifest,
    chunks, // StoredChunk[]
    timeIndex, // ChunkTimeIndexEntry[]
    requestIndex, // RequestIndexEntry[]
    invertedIndex, // InvertedIndexEntry[]
    blobs // StoredBlob[]
  },
  {
    passphrase: "optional"
  }
);

License

MIT

Keywords

pipeline


Package last updated on 14 May 2026
