
Product
Introducing Repository Access Permissions and Custom Roles
Socket now supports Custom Roles and Repository Access Permissions so organizations can control who can access specific repositories and actions.
@fluojs/cqrs
Advanced tools
Command/query buses with bootstrap-time handler discovery, saga support, and event-bus delegation for Fluo.
English 한국어
CQRS primitives for fluo applications with bootstrap-time handler discovery, command/query dispatch, and event publishing delegation through @fluojs/event-bus.
npm install @fluojs/cqrs
Register the CqrsModule and define your first command and handler.
Use CqrsModule.forRoot(...) to wire CQRS buses and handler discovery.
import { Inject, Module } from '@fluojs/core';
import {
CqrsModule,
CommandHandler,
ICommand,
ICommandHandler,
CommandBusLifecycleService,
} from '@fluojs/cqrs';
// 1. Define a Command
class CreateUserCommand implements ICommand {
constructor(public readonly name: string) {}
}
// 2. Implement the Handler
@CommandHandler(CreateUserCommand)
class CreateUserHandler implements ICommandHandler<CreateUserCommand, string> {
async execute(command: CreateUserCommand): Promise<string> {
console.log(`Creating user: ${command.name}`);
return 'user-id-123';
}
}
// 3. Use the Command Bus
@Inject(CommandBusLifecycleService)
class UserService {
constructor(private readonly commandBus: CommandBusLifecycleService) {}
async create(name: string) {
return this.commandBus.execute(new CreateUserCommand(name));
}
}
@Module({
imports: [CqrsModule.forRoot()],
providers: [CreateUserHandler, UserService],
})
class AppModule {}
Read projections keep query-shaped data separate from the write model. Publish a domain event after the write succeeds, update the projection from an @EventHandler(...), and serve that denormalized view from a @QueryHandler(...).
import { Inject } from '@fluojs/core';
import {
EventHandler,
IEvent,
IEventHandler,
IQuery,
IQueryHandler,
QueryHandler,
} from '@fluojs/cqrs';
interface OrderSummaryView {
id: string;
customerId: string;
status: 'placed';
}
class OrderPlacedEvent implements IEvent {
constructor(
public readonly orderId: string,
public readonly customerId: string,
) {}
}
class GetOrderSummaryQuery implements IQuery<OrderSummaryView | undefined> {
constructor(public readonly orderId: string) {}
}
@Inject(OrderSummaryProjectionStore)
@EventHandler(OrderPlacedEvent)
class OrderSummaryProjectionHandler implements IEventHandler<OrderPlacedEvent> {
constructor(private readonly store: OrderSummaryProjectionStore) {}
async handle(event: OrderPlacedEvent): Promise<void> {
await this.store.upsert({
id: event.orderId,
customerId: event.customerId,
status: 'placed',
});
}
}
@Inject(OrderSummaryProjectionStore)
@QueryHandler(GetOrderSummaryQuery)
class GetOrderSummaryHandler
implements IQueryHandler<GetOrderSummaryQuery, OrderSummaryView | undefined>
{
constructor(private readonly store: OrderSummaryProjectionStore) {}
async execute(query: GetOrderSummaryQuery): Promise<OrderSummaryView | undefined> {
return this.store.findById(query.orderId);
}
}
Register the projection handler, query handler, and projection store as singleton providers in the same application module that imports CqrsModule.forRoot(...). CqrsEventBusService.publish(new OrderPlacedEvent(...)) runs matching @EventHandler(...) providers before sagas and delegated @fluojs/event-bus publication, so the read model observes the write-side fact through the documented CQRS event pipeline. Keep projection handlers idempotent because event replay, retries, or external transports can deliver the same business fact more than once.
Sagas allow you to listen for events and trigger new commands, enabling complex long-running workflows.
import { Inject } from '@fluojs/core';
import { Saga, ISaga, IEvent, ICommand, CqrsDispatchContext, CommandBusLifecycleService } from '@fluojs/cqrs';
class UserCreatedEvent implements IEvent {
constructor(public readonly userId: string) {}
}
class SendWelcomeEmailCommand implements ICommand {
constructor(public readonly userId: string) {}
}
@Inject(CommandBusLifecycleService)
@Saga(UserCreatedEvent)
class UserSaga implements ISaga<UserCreatedEvent> {
constructor(private readonly commandBus: CommandBusLifecycleService) {}
async handle(event: UserCreatedEvent, context?: CqrsDispatchContext): Promise<void> {
await this.commandBus.execute(new SendWelcomeEmailCommand(event.userId), context);
}
}
Saga execution fails fast with SagaTopologyError when an in-process publish chain re-enters the same saga route cyclically or exceeds 32 nested saga hops. Multi-stage sagas may still react to different event types in sequence, but in-process saga graphs must stay acyclic overall; move intentionally cyclic or long-running feedback loops behind an external transport, scheduler, or other bounded boundary.
When a saga, command handler, query handler, or event handler performs another CQRS execute(...), publish(...), or publishAll(...) call, pass the optional CqrsDispatchContext argument through unchanged. CQRS uses this explicit runtime-agnostic context to keep saga topology checks intact across nested dispatch without relying on Node.js async-local APIs. The context is opaque: do not construct it, inspect it, or depend on topology fields because those fields are internal runtime state.
CqrsEventBusService.publish(event) runs the CQRS event pipeline in a fixed order: matching @EventHandler(...) providers first, matching @Saga(...) providers second, and delegated @fluojs/event-bus publication last. publishAll(events) preserves the input order by awaiting each event's CQRS handlers, sagas, and delegated publication call before publishing the next event. During application shutdown, the CQRS event bus waits for active publish(...) pipelines, publishAll(...) sequences, and saga execution chains to settle before marking itself stopped. Once shutdown starts, new publish(...), publishAll(...), and direct saga dispatch calls are rejected; already active publish and saga work continues draining inside the bounded shutdown window. Shutdown drain is bounded by CqrsModule.forRoot({ shutdown: { drainTimeoutMs } }), which defaults to 5000ms; if a CQRS handler, saga, or delegated publish chain is still stuck after the bound, CQRS records degraded status diagnostics, logs a warning, and lets application close continue instead of hanging indefinitely. When CqrsModule.forRoot({ eventBus: { publish: { waitForHandlers: false } } }) is configured, the delegated publication call can resolve before matching @OnEvent(...) subscribers finish, so publish(...), publishAll(...), and shutdown drain completion do not imply subscriber completion in that mode.
Each CQRS event handler and saga receives an isolated event copy with the matched event prototype restored. Mutating that copy is local to the current handler or saga route; those mutations are not visible to other CQRS handlers, sagas, the original event object, or delegated @fluojs/event-bus subscribers. The delegated event-bus publication receives the original event after CQRS side effects complete, so @OnEvent(...) projections and transports observe the caller-owned payload rather than a CQRS handler's mutated copy.
Event classes should keep their payload state cloneable and enumerable. String-keyed and symbol-keyed enumerable payload fields are preserved by the shared core clone fallback, while intentionally non-cloneable resources such as open sockets, functions, or process-local handles should be represented by IDs or other serializable boundaries before publishing.
CQRS handlers, event handlers, and sagas are discovered only on singleton providers. Non-singleton registrations are skipped with warnings.
Use these exports when you want explicit symbol tokens for the CQRS buses:
import { Inject } from '@fluojs/core';
import { COMMAND_BUS, QUERY_BUS, EVENT_BUS } from '@fluojs/cqrs';
@Inject(COMMAND_BUS, QUERY_BUS, EVENT_BUS)
class TokenInjectedService {
constructor(commandBus, queryBus, eventBus) {}
}
CqrsModule.forRoot(options): Main entry point. Registers buses and starts discovery.commandHandlers, queryHandlers, eventHandlers, sagas, and delegated eventBus options.CommandBusLifecycleService: Primary service for executing commands.QueryBusLifecycleService: Primary service for executing queries.CqrsEventBusService: Primary service for publishing events.@CommandHandler(Command): Associates a class with a Command.@QueryHandler(Query): Associates a class with a Query.@EventHandler(Event): Associates a class with an Event.@Saga(Event | Event[]): Marks a class as a Saga listener.ICommand, IQuery<T>, IEvent: Marker interfaces for messages.ICommandHandler<C, R>, IQueryHandler<Q, R>, IEventHandler<E>, ISaga<E>: Handler contracts.CqrsDispatchContext: Opaque optional context value to pass through nested CQRS dispatch from handlers and sagas. It exposes no public fields; provider assembly remains behind CqrsModule.forRoot(...) rather than a public createCqrsProviders(...) helper.CommandHandlerNotFoundException, QueryHandlerNotFoundException: Raised when a bus has no matching handler.DuplicateCommandHandlerError, DuplicateQueryHandlerError: Raised when different singleton providers claim the same command or query type.DuplicateEventHandlerError: Exported for conflicting event-handler discovery failures; ordinary multiple @EventHandler(...) providers for the same event are valid and fan out in discovery order.SagaExecutionError: Wraps unexpected non-Fluo saga failures.SagaTopologyError: Raised when saga orchestration detects a self-triggering, cyclic, or over-deep in-process saga graph.createCqrsPlatformStatusSnapshot(...): Creates CQRS status snapshots for diagnostics and health surfaces.@fluojs/event-bus: Underlying event distribution used by CqrsEventBusService.@fluojs/core: Required for @Module and @Inject decorators.packages/cqrs/src/module.test.ts: Module registration and basic bus usage.packages/cqrs/src/public-api.test.ts: Root-barrel public API contract coverage.packages/cqrs/src/status.test.ts: CQRS status snapshot behavior.packages/cqrs/src/event-clone.test.ts: Event clone fallback behavior.FAQs
Command/query buses with bootstrap-time handler discovery, saga support, and event-bus delegation for Fluo.
We found that @fluojs/cqrs demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Product
Socket now supports Custom Roles and Repository Access Permissions so organizations can control who can access specific repositories and actions.

Product
Socket MCP now lets AI assistants review org alerts, investigate threats using the Socket threat feed, and inspect package files in addition to dependency scoring.

Product
Socket Firewall blocks malicious VS Code and Open VSX extensions before install, protecting developers from compromised editor marketplaces.