nuxt-queue — Nuxt queue service based on BullMQ
npm · latest · Version 0.4.4 · Maintainers: 1

Nuxt Queue

Event-sourced queue and flow orchestration for Nuxt. Built on BullMQ with integrated real-time monitoring and multi-step workflow support.

✨ Features

  • 🔄 Queue Management: Reliable job processing with BullMQ
  • 🎭 Flow Orchestration: Multi-step workflows with event sourcing
  • ⏰ Flow Scheduling: Cron-based and delayed flow execution
  • ⚡ Real-time Updates: Redis Pub/Sub for <100ms latency monitoring
  • 📊 Event Sourcing: Complete audit trail of all flow operations
  • 🎨 Development UI: Visual flow diagrams, timeline, and scheduling
  • 🔌 Worker Context: Rich runtime with state, logging, and events
  • 📦 Auto-discovery: Filesystem-based worker registry
  • 🚀 Horizontal Scaling: Stateless architecture for easy scaling
  • 🔍 Full Observability: Real-time logs, metrics, and event streams

Version: v0.4.0
Status: ✅ Current Implementation
Last Updated: 2025-11-07

✅ Core queue and flow functionality
✅ Event sourcing with Redis Streams
✅ Real-time monitoring UI with Vue Flow diagrams
✅ Flow scheduling (cron patterns and delays)
✅ Worker context with state, logging, and events
✅ Auto-discovery and flow analysis
🚧 Comprehensive trigger system (planned v0.5)
🚧 Python workers (planned v0.5)
🚧 Postgres adapters (planned v0.6)

🗃️ Event Schema & Storage

All flow operations are event-sourced and stored in Redis Streams (nq:flow:<runId>). Events are immutable, type-safe, and provide a complete audit trail.

Event types:

  • flow.start, flow.completed, flow.failed
  • step.started, step.completed, step.failed, step.retry
  • log, emit, state

See Event Schema for full details and field definitions.
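The event types listed above can be modeled as a TypeScript union with a narrowing guard; a minimal sketch (type names taken from the list above, payload fields omitted — this is not the module's published typing):

```typescript
// Flow event types as listed above (payload fields omitted).
const FLOW_EVENT_TYPES = [
  'flow.start', 'flow.completed', 'flow.failed',
  'step.started', 'step.completed', 'step.failed', 'step.retry',
  'log', 'emit', 'state',
] as const

type FlowEventType = typeof FLOW_EVENT_TYPES[number]

// Narrowing guard for raw type strings read back from the stream.
function isFlowEventType(value: string): value is FlowEventType {
  return (FLOW_EVENT_TYPES as readonly string[]).includes(value)
}
```

A guard like this keeps consumers of the stream type-safe without trusting raw Redis strings.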

🏆 Best Practices

  • Keep steps small and focused
  • Use state for shared data between steps
  • Use ctx.flow.emit() to trigger downstream steps
  • Log with context using ctx.logger.log()
  • Set concurrency based on resource needs
  • Use on-complete state cleanup for automatic state management
  • Document schedules with metadata for maintainability

⚠️ Limitations (v0.4)

  • TypeScript only: Python workers not yet implemented (planned for v0.5)
  • No complex triggers: Only basic scheduling available (v0.5 will add triggers)
  • No await patterns: Pausing flows for time/events planned for v0.5
  • Redis only: No Postgres adapter yet (planned for v0.6)
  • State separate from events: Not unified with stream store (planned for v0.6)
  • Basic logging: No advanced logger adapters (planned for v0.7)
  • No schedule editing: Must delete and recreate schedules (v0.5 will add full trigger management)

🚀 Quick Start

Installation

npx nuxi@latest module add nuxt-queue

Configuration

// nuxt.config.ts
export default defineNuxtConfig({
  modules: ['nuxt-queue'],
  queue: {
    ui: true,  // Enable dev UI
    // Shortcut: Configure all backends with one setting
    store: {
      adapter: 'redis',
      redis: {
        host: '127.0.0.1',
        port: 6379,
      },
    },
    // Or configure individually:
    // queue: {
    //   adapter: 'redis',
    //   redis: { host: '127.0.0.1', port: 6379 },
    //   defaultConfig: { concurrency: 2 }
    // },
    // state: {
    //   adapter: 'redis',
    //   redis: { host: '127.0.0.1', port: 6379 }
    // },
    // eventStore: {
    //   adapter: 'memory'  // Use memory for events
    // },
  },
})

Create Your First Worker

// server/queues/example/process.ts
export default defineQueueWorker(async (job, ctx) => {
  // Access job data
  const { message } = job.data
  
  // Log to stream
  ctx.logger.log('info', 'Processing message', { message })
  
  // Store state
  await ctx.state.set('processedAt', new Date().toISOString())
  
  // Return result
  return { success: true, processed: message }
})

export const config = defineQueueConfig({
  concurrency: 5,
})

Enqueue a Job

// API route or wherever
const queueProvider = useQueueProvider()
await queueProvider.enqueue('process', {
  name: 'process',
  data: { message: 'Hello World' }
})

Create a Flow

Multi-step workflows with event-driven orchestration:

// server/queues/my-flow/start.ts
export default defineQueueWorker(async (job, ctx) => {
  ctx.logger.log('info', 'Flow started')
  const prepared = { step: 1, data: job.data }
  
  // Emit event to trigger next steps
  ctx.flow.emit('data.prepared', prepared)
  
  return prepared
})

export const config = defineQueueConfig({
  flow: {
    names: ['my-flow'],
    role: 'entry',
    step: 'start',
    emits: ['data.prepared']
  }
})

// server/queues/my-flow/process.ts
export default defineQueueWorker(async (job, ctx) => {
  const result = await processData(job.data)
  
  // Emit to trigger next step
  ctx.flow.emit('data.processed', result)
  
  return result
})

export const config = defineQueueConfig({
  flow: {
    names: ['my-flow'],
    role: 'step',
    step: 'process',
    subscribes: ['data.prepared'],  // Triggered by start
    emits: ['data.processed']
  }
})

// server/queues/my-flow/validate.ts
export default defineQueueWorker(async (job, ctx) => {
  const validated = await validate(job.data)
  ctx.flow.emit('validation.complete', validated)
  return validated
})

export const config = defineQueueConfig({
  flow: {
    names: ['my-flow'],
    role: 'step',
    step: 'validate',
    subscribes: ['data.prepared'],  // Also triggered by start (parallel with process)
    emits: ['validation.complete']
  }
})

Start the flow:

const { startFlow } = useFlowEngine()
await startFlow('my-flow', { input: 'data' })

Flow execution: Entry step emits data.prepared → Both process and validate steps run in parallel (they both subscribe to data.prepared) → Each emits its own completion event for downstream steps.
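A downstream step can then pick up one of those completion events; for example, a hypothetical finalize step (the step name and file path are illustrative, not part of the example flow above):

```typescript
// server/queues/my-flow/finalize.ts (hypothetical step)
export default defineQueueWorker(async (job, ctx) => {
  ctx.logger.log('info', 'Finalizing', { input: job.data })
  return { done: true }
})

export const config = defineQueueConfig({
  flow: {
    names: ['my-flow'],
    role: 'step',
    step: 'finalize',
    subscribes: ['data.processed'],  // Triggered when process emits
  },
})
```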

Schedule a Flow

Schedule flows to run automatically with cron patterns or delays:

// Schedule a flow to run daily at 2 AM
await $fetch('/api/_flows/my-flow/schedule', {
  method: 'POST',
  body: {
    cron: '0 2 * * *',
    input: { retentionDays: 30 },
    metadata: {
      description: 'Daily cleanup job'
    }
  }
})

// Schedule a one-time delayed execution (5 minutes)
await $fetch('/api/_flows/reminder-flow/schedule', {
  method: 'POST',
  body: {
    delay: 300000,  // milliseconds
    input: { userId: '123', message: 'Check your email' }
  }
})

// List all schedules for a flow
const schedules = await $fetch('/api/_flows/my-flow/schedules')

// Delete a schedule
await $fetch('/api/_flows/my-flow/schedules/schedule-id', {
  method: 'DELETE'
})

Common cron patterns:

  • * * * * * - Every minute
  • */5 * * * * - Every 5 minutes
  • 0 * * * * - Every hour
  • 0 2 * * * - Daily at 2 AM
  • 0 9 * * 1 - Every Monday at 9 AM
  • 0 0 1 * * - First day of month at midnight
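The five fields of these patterns read left to right as minute, hour, day of month, month, and day of week; a minimal sketch for splitting them out (a reading aid, not the module's cron parser):

```typescript
// Split a five-field cron expression into named fields.
function cronFields(expr: string) {
  const [minute, hour, dayOfMonth, month, dayOfWeek] = expr.trim().split(/\s+/)
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}
```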

🎨 Development UI

Access the built-in UI via the <QueueApp /> component:

  • 📊 Dashboard: Overview of queues and flows
  • 🔄 Flow Diagrams: Visual representation with Vue Flow
  • ⏰ Flow Scheduling: Create and manage cron-based or delayed schedules
  • 📝 Event Timeline: Real-time event stream with step details
  • 📋 Logs: Filtered logging by flow/step
  • 📈 Metrics: Queue statistics and performance
  • 🔍 Flow Runs: Complete history with status tracking
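Mounting the UI is a matter of dropping the component into a page; a sketch (the route path is an example, not prescribed by the module):

```vue
<!-- pages/admin/queue.vue (route path is an example) -->
<template>
  <QueueApp />
</template>
```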

🏗️ Architecture

Event Sourcing

Every flow operation is stored as an event in Redis Streams:

nq:flow:<runId>
├─ flow.start
├─ step.started
├─ log
├─ step.completed
├─ step.started
├─ log
├─ step.completed
└─ flow.completed
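Because the terminal events are part of the stream, a run's status can be derived by folding over its event types; a sketch of reading the stream back (assumed logic, not the module's implementation):

```typescript
type RunStatus = 'running' | 'completed' | 'failed'

// Derive a run's status from the event types recorded in its stream.
function runStatus(eventTypes: string[]): RunStatus {
  if (eventTypes.includes('flow.completed')) return 'completed'
  if (eventTypes.includes('flow.failed')) return 'failed'
  return 'running'
}
```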

Real-time Distribution

Events are broadcast via Redis Pub/Sub for instant UI updates (<100ms latency).
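The broadcast pattern can be illustrated locally; here Redis Pub/Sub is deliberately swapped for Node's EventEmitter to keep the sketch self-contained, and the channel name is an assumption:

```typescript
import { EventEmitter } from 'node:events'

// Local stand-in for Redis Pub/Sub: a publisher broadcasts flow events,
// and any number of UI subscribers receive them without polling.
const bus = new EventEmitter()
const channel = 'nq:events'  // channel name is an assumption

const received: string[] = []
bus.on(channel, (event: { type: string }) => received.push(event.type))

bus.emit(channel, { type: 'step.completed' })
```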

Worker Context

Every worker receives a rich context:

{
  jobId: string              // BullMQ job ID
  queue: string              // Queue name
  flowId: string             // Flow run UUID
  flowName: string           // Flow definition name
  stepName: string           // Current step name
  logger: {
    log(level, msg, meta)    // Structured logging
  },
  state: {
    get(key)                 // Get flow-scoped state
    set(key, value, opts)    // Set with optional TTL
    delete(key)              // Delete state
  },
  flow: {
    emit(eventName, data)    // Emit flow event to trigger subscribed steps
    startFlow(name, input)   // Start nested flow
  }
}
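The shape above can be written out as a TypeScript interface; the field types below are inferred from the comments, not the module's published types:

```typescript
// Worker context shape, types inferred from the documented comments.
interface WorkerContext {
  jobId: string      // BullMQ job ID
  queue: string      // Queue name
  flowId: string     // Flow run UUID
  flowName: string   // Flow definition name
  stepName: string   // Current step name
  logger: {
    log: (level: string, msg: string, meta?: Record<string, unknown>) => void
  }
  state: {
    get: (key: string) => Promise<unknown>
    set: (key: string, value: unknown, opts?: { ttl?: number }) => Promise<void>
    delete: (key: string) => Promise<void>
  }
  flow: {
    emit: (eventName: string, data: unknown) => void
    startFlow: (name: string, input: unknown) => Promise<void>
  }
}
```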

📚 Documentation

v0.4 Documentation

API & Advanced

Roadmap & Future

🔮 Roadmap

v0.4 (Current - November 2025)

✅ Core queue and flow orchestration
✅ Event sourcing with Redis Streams
✅ Real-time monitoring UI
✅ Flow scheduling (cron and delays)
✅ Worker context with state and logging

v0.5

  • 🎯 Comprehensive trigger system (schedule, webhook, event, manual)
  • ⏱️ Await patterns (time, event, condition)
  • 🐍 Python worker support with RPC bridge
  • 🔗 Webhook triggers with auto-setup

v0.6

  • 🐘 PgBoss queue provider option
  • 🗄️ Postgres stream store adapter
  • 🔄 Unified state and event storage
  • 📊 Advanced state management

v0.7

  • 📊 Enhanced logger with multiple adapters
  • 🌐 HTTP mode for workers (REST/gRPC)
  • 🔌 External service hooks
  • 🎨 Pluggable worker execution modes

See specs/roadmap.md for complete details.

🤝 Contributing

Contributions are welcome! Please read the architecture docs first.

Development Setup

# Install dependencies
yarn install

# Start playground with dev UI
cd playground
yarn dev

# Run tests
yarn test

📄 License

MIT License - Copyright (c) DevJoghurt

Package last updated on 10 Nov 2025
