
Security News
Axios Supply Chain Attack Reaches OpenAI macOS Signing Pipeline, Forces Certificate Rotation
OpenAI rotated macOS signing certificates after a malicious Axios package reached its CI pipeline in a broader software supply chain attack.
nuxt-queue
Advanced tools
Event-sourced queue and flow orchestration for Nuxt. Built on BullMQ with integrated real-time monitoring and multi-step workflow support.
Version: v0.4.0
Status: ✅ Current Implementation
Last Updated: 2025-11-07
✅ Core queue and flow functionality
✅ Event sourcing with Redis Streams
✅ Real-time monitoring UI with Vue Flow diagrams
✅ Flow scheduling (cron patterns and delays)
✅ Worker context with state, logging, and events
✅ Auto-discovery and flow analysis
🚧 Comprehensive trigger system (planned v0.5)
🚧 Python workers (planned v0.5)
🚧 Postgres adapters (planned v0.6)
All flow operations are event-sourced and stored in Redis Streams (nq:flow:<runId>). Events are immutable, type-safe, and provide a complete audit trail.
Event types:
flow.start, flow.completed, flow.failedstep.started, step.completed, step.failed, step.retrylog, emit, stateSee Event Schema for full details and field definitions.
ctx.flow.emit() to trigger downstream stepsctx.logger.log()on-complete state cleanup for automatic state managementnpx nuxi@latest module add nuxt-queue
// nuxt.config.ts
export default defineNuxtConfig({
modules: ['nuxt-queue'],
queue: {
ui: true, // Enable dev UI
// Shortcut: Configure all backends with one setting
store: {
adapter: 'redis',
redis: {
host: '127.0.0.1',
port: 6379,
},
},
// Or configure individually:
// queue: {
// adapter: 'redis',
// redis: { host: '127.0.0.1', port: 6379 },
// defaultConfig: { concurrency: 2 }
// },
// state: {
// adapter: 'redis',
// redis: { host: '127.0.0.1', port: 6379 }
// },
// eventStore: {
// adapter: 'memory' // Use memory for events
// },
},
})
// server/queues/example/process.ts
export default defineQueueWorker(async (job, ctx) => {
// Access job data
const { message } = job.data
// Log to stream
ctx.logger.log('info', 'Processing message', { message })
// Store state
await ctx.state.set('processedAt', new Date().toISOString())
// Return result
return { success: true, processed: message }
})
export const config = defineQueueConfig({
concurrency: 5,
})
// API route or wherever
const queueProvider = useQueueProvider()
await queueProvider.enqueue('process', {
name: 'process',
data: { message: 'Hello World' }
})
Multi-step workflows with event-driven orchestration:
// server/queues/my-flow/start.ts
export default defineQueueWorker(async (job, ctx) => {
ctx.logger.log('info', 'Flow started')
const prepared = { step: 1, data: job.data }
// Emit event to trigger next steps
ctx.flow.emit('data.prepared', prepared)
return prepared
})
export const config = defineQueueConfig({
flow: {
names: ['my-flow'],
role: 'entry',
step: 'start',
emits: ['data.prepared']
}
})
// server/queues/my-flow/process.ts
export default defineQueueWorker(async (job, ctx) => {
const result = await processData(job.data)
// Emit to trigger next step
ctx.flow.emit('data.processed', result)
return result
})
export const config = defineQueueConfig({
flow: {
names: ['my-flow'],
role: 'step',
step: 'process',
subscribes: ['data.prepared'], // Triggered by start
emits: ['data.processed']
}
})
// server/queues/my-flow/validate.ts
export default defineQueueWorker(async (job, ctx) => {
const validated = await validate(job.data)
ctx.flow.emit('validation.complete', validated)
return validated
})
export const config = defineQueueConfig({
flow: {
names: ['my-flow'],
role: 'step',
step: 'validate',
subscribes: ['data.prepared'], // Also triggered by start (parallel with process)
emits: ['validation.complete']
}
})
Start the flow:
const { startFlow } = useFlowEngine()
await startFlow('my-flow', { input: 'data' })
Flow execution: Entry step emits data.prepared → Both process and validate steps run in parallel (they both subscribe to data.prepared) → Each emits its own completion event for downstream steps.
Schedule flows to run automatically with cron patterns or delays:
// Schedule a flow to run daily at 2 AM
await $fetch('/api/_flows/my-flow/schedule', {
method: 'POST',
body: {
cron: '0 2 * * *',
input: { retentionDays: 30 },
metadata: {
description: 'Daily cleanup job'
}
}
})
// Schedule a one-time delayed execution (5 minutes)
await $fetch('/api/_flows/reminder-flow/schedule', {
method: 'POST',
body: {
delay: 300000, // milliseconds
input: { userId: '123', message: 'Check your email' }
}
})
// List all schedules for a flow
const schedules = await $fetch('/api/_flows/my-flow/schedules')
// Delete a schedule
await $fetch('/api/_flows/my-flow/schedules/schedule-id', {
method: 'DELETE'
})
Common cron patterns:
* * * * * - Every minute*/5 * * * * - Every 5 minutes0 * * * * - Every hour0 2 * * * - Daily at 2 AM0 9 * * 1 - Every Monday at 9 AM0 0 1 * * - First day of month at midnightAccess the built-in UI as <QueueApp /> component:
Every flow operation is stored as an event in Redis Streams:
nq:flow:<runId>
├─ flow.start
├─ step.started
├─ log
├─ step.completed
├─ step.started
├─ log
├─ step.completed
└─ flow.completed
Events are broadcast via Redis Pub/Sub for instant UI updates (<100ms latency).
Every worker receives a rich context:
{
jobId: string // BullMQ job ID
queue: string // Queue name
flowId: string // Flow run UUID
flowName: string // Flow definition name
stepName: string // Current step name
logger: {
log(level, msg, meta) // Structured logging
},
state: {
get(key) // Get flow-scoped state
set(key, value, opts) // Set with optional TTL
delete(key) // Delete state
},
flow: {
emit(eventName, data) // Emit flow event to trigger subscribed steps
startFlow(name, input) // Start nested flow
}
}
✅ Core queue and flow orchestration
✅ Event sourcing with Redis Streams
✅ Real-time monitoring UI
✅ Flow scheduling (cron and delays)
✅ Worker context with state and logging
See specs/roadmap.md for complete details.
Contributions welcome! Please read our architecture docs first:
# Install dependencies
yarn install
# Start playground with dev UI
cd playground
yarn dev
# Run tests
yarn test
MIT License - Copyright (c) DevJoghurt
FAQs
Nuxt queue service based on Bullmq
We found that nuxt-queue demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
OpenAI rotated macOS signing certificates after a malicious Axios package reached its CI pipeline in a broader software supply chain attack.

Security News
Open source is under attack because of how much value it creates. It has been the foundation of every major software innovation for the last three decades. This is not the time to walk away from it.

Security News
Socket CEO Feross Aboukhadijeh breaks down how North Korea hijacked Axios and what it means for the future of software supply chain security.