
@llamaindex/workflow-core
⚠️ DEPRECATED: This package is no longer maintained. Please use LlamaAgents (Python Workflows) instead.
@llamaindex/workflow-core 🌊 is a simple, lightweight workflow engine in TypeScript.
npm i @llamaindex/workflow-core
yarn add @llamaindex/workflow-core
pnpm add @llamaindex/workflow-core
bun add @llamaindex/workflow-core
deno add npm:@llamaindex/workflow-core
For examples, check out the demo folder.
import { workflowEvent } from "@llamaindex/workflow-core";
const startEvent = workflowEvent<string>();
const stopEvent = workflowEvent<1 | -1>();
import { createWorkflow } from "@llamaindex/workflow-core";
const convertEvent = workflowEvent<number>();
const workflow = createWorkflow();
// Handlers receive the context first, then the event data.
workflow.handle([startEvent], (context, start) => {
return convertEvent.with(Number.parseInt(start.data, 10));
});
workflow.handle([convertEvent], (context, convert) => {
return stopEvent.with(convert.data > 0 ? 1 : -1);
});
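To make the event and handler model above concrete, here is a minimal, self-contained sketch of the same start, convert, stop chain. It does not use or reproduce the library's implementation: `makeEvent`, `MiniWorkflow`, and the synchronous `run` method are hypothetical stand-ins, and the handlers take only the event (no context) to keep the sketch short.

```typescript
// Hypothetical stand-ins for illustration only; not the library's implementation.
type EventData<T> = { readonly source: symbol; readonly data: T };

type EventDef<T> = {
  readonly id: symbol;
  with(data: T): EventData<T>;
  include(ev: EventData<unknown>): ev is EventData<T>;
};

// Creates an event definition; instances remember which definition made them.
function makeEvent<T>(label: string): EventDef<T> {
  const id = Symbol(label);
  return {
    id,
    with: (data: T) => ({ source: id, data }),
    include: (ev: EventData<unknown>): ev is EventData<T> => ev.source === id,
  };
}

type MiniHandler<T> = (ev: EventData<T>) => EventData<unknown> | void;

class MiniWorkflow {
  private handlers = new Map<symbol, MiniHandler<any>[]>();

  handle<T>(def: EventDef<T>, handler: MiniHandler<T>): void {
    const list = this.handlers.get(def.id) ?? [];
    list.push(handler);
    this.handlers.set(def.id, list);
  }

  // Dispatches events in FIFO order and returns every event seen.
  run(first: EventData<unknown>): EventData<unknown>[] {
    const seen: EventData<unknown>[] = [];
    const queue: EventData<unknown>[] = [first];
    while (queue.length > 0) {
      const ev = queue.shift()!;
      seen.push(ev);
      for (const handler of this.handlers.get(ev.source) ?? []) {
        const out = handler(ev);
        if (out) queue.push(out);
      }
    }
    return seen;
  }
}

const startEv = makeEvent<string>("start");
const convertEv = makeEvent<number>("convert");
const stopEv = makeEvent<1 | -1>("stop");

const mini = new MiniWorkflow();
mini.handle(startEv, (s) => convertEv.with(Number.parseInt(s.data, 10)));
mini.handle(convertEv, (c) => stopEv.with(c.data > 0 ? 1 : -1));

const seenEvents = mini.run(startEv.with("42"));
const lastEvent = seenEvents[seenEvents.length - 1];
const sign = stopEv.include(lastEvent) ? lastEvent.data : undefined;
console.log(seenEvents.length, sign); // 3 1
```

The point of the sketch is the shape of the API: an event definition is both a factory (`with`) and a type guard (`include`), and a handler's return value becomes the next event.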
import { pipeline } from "node:stream/promises";
const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with("42")); // startEvent carries a string payload
const result = await pipeline(stream, async function (source) {
for await (const event of source) {
if (stopEvent.include(event)) {
return "stop received!";
}
}
});
console.log(result); // stop received!
// or
const allEvents = await stream.until(stopEvent).toArray();
There are helper functions to make working with workflows even simpler:
import {
runWorkflow,
runAndCollect,
runWorkflowWithFilter,
} from "@llamaindex/workflow-core/stream/run";
// Run workflow and get final result
const result = await runWorkflow(workflow, startEvent.with("42"), stopEvent);
// Run workflow and collect all events
const allEvents = await runAndCollect(
workflow,
startEvent.with("42"),
stopEvent,
);
By default, we provide a simple fan-out utility to run multiple workflows in parallel:
context.sendEvent will emit a new event to the current workflow
context.stream will return a stream of events emitted by the sub-workflow
let condition = false;
workflow.handle([startEvent], async (context, start) => {
const { sendEvent, stream } = context;
for (let i = 0; i < 10; i++) {
sendEvent(convertEvent.with(i));
}
// You define the condition to stop the workflow
const results = await stream
.until(() => condition)
.filter(convertStopEvent)
.toArray();
console.log(results.length); // 10
return stopEvent.with();
});
workflow.handle([convertEvent], (convert) => {
if (convert.data === 9) {
condition = true;
}
return convertStopEvent.with(/* ... */);
});
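The collect step in the fan-out example (`until`, then `filter`, then `toArray`) can be sketched without the library. This is a synchronous simplification under stated assumptions: the real stream is asynchronous, `takeUntil` and `fanOut` are hypothetical helpers, and treating `until` as inclusive of the item that meets the condition is an assumption made for this sketch.

```typescript
// Hypothetical helper: yields items up to and including the first one
// for which done() returns true (inclusive "until" is an assumption).
function* takeUntil<T>(source: Iterable<T>, done: (item: T) => boolean): Generator<T> {
  for (const item of source) {
    yield item;
    if (done(item)) return;
  }
}

type Tagged = { kind: "convert" | "convertStop"; value: number };

// Simulated event stream: each fanned-out "convert" event is followed by
// its "convertStop" result.
function* fanOut(count: number): Generator<Tagged> {
  for (let i = 0; i < count; i++) {
    yield { kind: "convert", value: i };
    yield { kind: "convertStop", value: i * 2 };
  }
  // Emitted after the stop condition is met, so it must not be collected.
  yield { kind: "convertStop", value: -1 };
}

const collected = Array.from(
  takeUntil(fanOut(10), (ev) => ev.kind === "convertStop" && ev.value === 18),
).filter((ev) => ev.kind === "convertStop");

console.log(collected.length); // 10
```

Stopping on a condition and filtering afterwards keeps the two concerns separate: the stop condition decides when the handler resumes, and the filter decides which events count as results.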
Workflows are event-driven, so you can handle them with any stream API, such as RxJS:
import { from, filter } from "rxjs";
const { stream, sendEvent } = workflow.createContext();
from(stream)
.pipe(filter((ev) => eventSource(ev) === messageEvent))
.subscribe((ev) => {
console.log(ev.data);
});
sendEvent(fileParseWorkflow.startEvent(directory));
Workflow can be used as middleware in any server framework, like express, hono, fastify, etc.
import { Hono } from "hono";
import { serve } from "@hono/node-server";
import { createHonoHandler } from "@llamaindex/workflow-core/interrupter/hono";
import {
agentWorkflow,
startEvent,
stopEvent,
} from "../workflows/tool-call-agent.js";
const app = new Hono();
app.post(
"/workflow",
createHonoHandler(
agentWorkflow,
async (ctx) => startEvent.with(await ctx.req.text()),
stopEvent,
),
);
serve(app, ({ port }) => {
console.log(`Server started at http://localhost:${port}`);
});
You can use signal in the context parameter to handle errors:
workflow.handle([convertEvent], (context) => {
const { signal } = context;
signal.onabort = () => {
console.error("error in convert event:", signal.reason);
};
});
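The handler above relies on standard `AbortSignal` behavior: the abort reason is read from `signal.reason`. A minimal sketch using only the platform's `AbortController`, independent of the workflow library (the variable names are illustrative):

```typescript
// Standard AbortController/AbortSignal only; no workflow library involved.
const controller = new AbortController();
const abortSignal = controller.signal;

let capturedReason: unknown;
abortSignal.addEventListener("abort", () => {
  // The value passed to abort() is exposed as signal.reason.
  capturedReason = abortSignal.reason;
});

// Abort listeners run synchronously during this call.
controller.abort(new Error("convert failed"));

const capturedMessage =
  capturedReason instanceof Error ? capturedReason.message : String(capturedReason);
console.log(abortSignal.aborted, capturedMessage); // true convert failed
```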
Workflow handlers receive the context as the first parameter, providing access to sendEvent, stream, and signal.
workflow.handle([startEvent], async (context) => {
const { sendEvent, stream, signal } = context;
// Use context properties directly
sendEvent(processEvent.with());
});
withState
Adds a state property to the workflow context. It returns a state object, and each state is linked to its own workflow context.
import { createStatefulMiddleware } from "@llamaindex/workflow-core/middleware/state";
const { withState } = createStatefulMiddleware(() => ({
pendingTasks: new Set<Promise<unknown>>(),
}));
const workflow = withState(createWorkflow());
workflow.handle([startEvent], (context) => {
const { state } = context;
state.pendingTasks.add(
new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 100);
}),
);
});
const { state } = workflow.createContext();
You can also create a state with input:
const { withState } = createStatefulMiddleware((input: { id: string }) => ({
id: input.id,
}));
const workflow = withState(createWorkflow());
const { state } = workflow.createContext({ id: "1" });
withState also supports snapshots: use snapshot to save the state of the workflow and resume to restore it.
const { snapshot, resume } = workflow.createContext();
// create snapshot
const snapshotData = await snapshot();
// resume workflow from snapshot
const { stream, sendEvent } = workflow.resume(snapshotData);
sendEvent(humanResponseEvent.with("hello"));
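The idea behind per-context state with snapshot and resume can be sketched in isolation. This is not the library's implementation: `createContextFactory` is a hypothetical helper, and it serializes only the state object with JSON, whereas a real snapshot presumably has to capture more (for example, events still in flight).

```typescript
// Hypothetical helper for illustration; only the state object is snapshotted.
type CounterState = { id: string; processed: number };

function createContextFactory<I, S>(init: (input: I) => S) {
  return {
    // Every context gets a fresh state object built from its own input.
    createContext(input: I) {
      const state = init(input);
      return { state, snapshot: (): string => JSON.stringify(state) };
    },
    // Rebuilds a context from previously serialized state.
    resume(serialized: string) {
      const state = JSON.parse(serialized) as S;
      return { state, snapshot: (): string => JSON.stringify(state) };
    },
  };
}

const factory = createContextFactory(
  (input: { id: string }): CounterState => ({ id: input.id, processed: 0 }),
);

const ctxA = factory.createContext({ id: "a" });
const ctxB = factory.createContext({ id: "b" });
ctxA.state.processed += 2; // does not affect ctxB

const saved = ctxA.snapshot();
const restored = factory.resume(saved);
restored.state.processed += 1; // does not affect ctxA

console.log(ctxA.state.processed, ctxB.state.processed, restored.state.processed); // 2 0 3
```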
withValidation
Makes sendEvent the first parameter of the handler, and makes it both type-safe and runtime-safe, when you create a workflow using withValidation.
// before:
workflow.handle([startEvent], (start) => {});
// after:
workflow.handle([startEvent], (sendEvent, start) => {});
import { withValidation } from "@llamaindex/workflow-core/middleware/validation";
const startEvent = workflowEvent<void, "start">();
const disallowedEvent = workflowEvent<void, "disallowed">({
debugLabel: "disallowed",
});
const parseEvent = workflowEvent<string, "parse">();
const stopEvent = workflowEvent<number, "stop">();
const workflow = withValidation(createWorkflow(), [
[[startEvent], [stopEvent]],
[[startEvent], [parseEvent]],
]);
workflow.strictHandle([startEvent], (sendEvent, start) => {
sendEvent(
disallowedEvent.with(), // <-- ❌ Type Check Failed, Runtime Error
);
sendEvent(parseEvent.with("")); // <-- ✅
sendEvent(stopEvent.with(1)); // <-- ✅
});
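The runtime half of this check can be sketched as a lookup in a table of allowed transitions. The sketch below is an assumption about the general technique, not the library's code: `TransitionRule`, `allowedOutputs`, and `makeCheckedSender` are hypothetical names, events are reduced to plain strings, and the compile-time type check is not covered.

```typescript
// Hypothetical rule table: each entry lists outputs permitted for given inputs.
type TransitionRule = { inputs: string[]; outputs: string[] };

const transitionRules: TransitionRule[] = [
  { inputs: ["start"], outputs: ["stop"] },
  { inputs: ["start"], outputs: ["parse"] },
];

// Union of all outputs allowed for one input event name.
function allowedOutputs(rules: TransitionRule[], input: string): Set<string> {
  const allowed = new Set<string>();
  for (const rule of rules) {
    if (rule.inputs.includes(input)) {
      for (const output of rule.outputs) allowed.add(output);
    }
  }
  return allowed;
}

// Returns a sendEvent-like function that rejects outputs the rules do not list.
function makeCheckedSender(rules: TransitionRule[], input: string, sent: string[]) {
  const allowed = allowedOutputs(rules, input);
  return (output: string): void => {
    if (!allowed.has(output)) {
      throw new Error(`"${output}" is not allowed after "${input}"`);
    }
    sent.push(output);
  };
}

const sentEvents: string[] = [];
const checkedSend = makeCheckedSender(transitionRules, "start", sentEvents);
checkedSend("parse"); // allowed
checkedSend("stop"); // allowed

let rejected = false;
try {
  checkedSend("disallowed"); // not listed for "start"
} catch {
  rejected = true;
}
console.log(sentEvents.join(","), rejected); // parse,stop true
```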
withTraceEvents
Adds tracing capabilities to your workflow, allowing you to monitor and decorate handlers and debug event flows easily.
When enabled, it collects events based on the directed graph of the runtime and provides lifecycle hooks for each handler.
import {
withTraceEvents,
runOnce,
} from "@llamaindex/workflow-core/middleware/trace-events";
const workflow = withTraceEvents(createWorkflow());
workflow.handle(
[messageEvent],
runOnce(() => {
console.log("This message handler will only run once");
}),
);
workflow.handle([startEvent], (context) => {
context.sendEvent(messageEvent.with());
context.sendEvent(messageEvent.with());
});
{
const { sendEvent } = workflow.createContext();
sendEvent(startEvent.with());
sendEvent(messageEvent.with());
// This message handler will only run once!
}
{
const { sendEvent } = workflow.createContext();
// For each new context, the decorator is isolated.
sendEvent(startEvent.with());
sendEvent(messageEvent.with());
// This message handler will only run once!
}
workflow.substream(target, stream)
You can use substream to create a substream from the workflow context, which will only emit events that descend from the target event.
const ev = startEvent.with();
const { sendEvent, stream } = workflow.createContext();
sendEvent(ev);
sendEvent(messageEvent.with()); // <- this will not be included in the substream
const substream = workflow.substream(ev, stream);
This is helpful when you have async requests, and you want to track the events that are emitted by the target event.
For example:
Parallel requests
workflow.handle([startEvent], async (context, { data: uuid }) => {
const { sendEvent, stream } = context;
const ev = networkRequestEvent.with(uuid);
sendEvent(ev);
// you need to pass the uuid through all events to get the correct response
const responses = await collect(
filter(workflow.substream(ev, stream), (ev) => ev.data === uuid),
);
});
sendEvent(startEvent.with(crypto.randomUUID()));
sendEvent(startEvent.with(crypto.randomUUID()));
workflow.handle([startEvent], async (context) => {
const { sendEvent, stream } = context;
const ev = networkRequestEvent.with();
sendEvent(ev);
const responses = await collect(workflow.substream(ev, stream));
});
sendEvent(startEvent.with());
sendEvent(startEvent.with());
createHandlerDecorator
You can create your own handler decorator to modify the behavior of the handler.
import { createHandlerDecorator } from "@llamaindex/workflow-core/middleware/trace-events";
const noop: (...args: any[]) => void = function noop() {};
export const runOnce = createHandlerDecorator({
debugLabel: "onceHook",
getInitialValue: () => false,
onBeforeHandler: (handler, handlerContext, tracked) =>
tracked ? noop : handler,
onAfterHandler: () => true,
});
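How the three hooks could fit together is sketched below. This is a hypothetical reconstruction, not the library's implementation: `makeDecorator` and its `WeakMap` keyed by context are assumptions, and the hook signatures are simplified (the real `onBeforeHandler` also receives a handler context).

```typescript
// Hypothetical reconstruction for illustration; hook signatures are simplified.
type AnyHandler = (...args: unknown[]) => void;

type DecoratorConfig<V> = {
  getInitialValue: () => V;
  onBeforeHandler: (handler: AnyHandler, tracked: V) => AnyHandler;
  onAfterHandler: (tracked: V) => V;
};

function makeDecorator<V>(config: DecoratorConfig<V>) {
  return (handler: AnyHandler) => {
    // One tracked value per context object keeps contexts isolated.
    const values = new WeakMap<object, V>();
    return (context: object, ...args: unknown[]): void => {
      const current = values.has(context)
        ? (values.get(context) as V)
        : config.getInitialValue();
      // The hook may swap in a different handler based on the tracked value.
      const chosen = config.onBeforeHandler(handler, current);
      chosen(context, ...args);
      values.set(context, config.onAfterHandler(current));
    };
  };
}

const runOnceSketch = makeDecorator<boolean>({
  getInitialValue: () => false,
  onBeforeHandler: (handler, alreadyRan) => (alreadyRan ? () => {} : handler),
  onAfterHandler: () => true,
});

let handlerCalls = 0;
const countOnce = runOnceSketch(() => {
  handlerCalls += 1;
});

const firstContext = {};
const secondContext = {};
countOnce(firstContext); // runs
countOnce(firstContext); // skipped: already ran in this context
countOnce(secondContext); // runs: a new context starts from the initial value

console.log(handlerCalls); // 2
```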
HandlerContext
The HandlerContext includes the runtime information of the handler in the directed graph of the workflow.
type BaseHandlerContext = {
// ... some other properties are hidden
handler: Handler<WorkflowEvent<any>[], any>;
inputEvents: WorkflowEvent<any>[];
// events data that are accepted by the handler
inputs: WorkflowEventData<any>[];
// events data that are emitted by the handler
outputs: WorkflowEventData<any>[];
//#region linked list data structure
prev: HandlerContext;
next: Set<HandlerContext>;
root: HandlerContext;
//#endregion
};
type SyncHandlerContext = BaseHandlerContext & {
async: false;
pending: null;
};
type AsyncHandlerContext = BaseHandlerContext & {
async: true;
pending: Promise<WorkflowEventData<any> | void> | null;
};
type HandlerContext = AsyncHandlerContext | SyncHandlerContext;
For example, when you send startEvent twice and messageEvent twice (once from inside the handler and once from the top level), the HandlerContext tree from root to leaf is:
let once = false;
workflow.handle([startEvent], (context) => {
const { sendEvent } = context;
if (once) {
return;
}
once = true;
sendEvent(messageEvent.with());
});
const { sendEvent } = workflow.createContext();
sendEvent(startEvent.with());
sendEvent(startEvent.with());
sendEvent(messageEvent.with());
rootHandlerContext(0)
├── startEventContext(0)
│ └── messageEventContext(0)
├── startEventContext(1)
└── messageEventContext(1)
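The tree above can be modeled with the `prev`, `next`, and `root` links from `BaseHandlerContext`. The sketch below is illustrative only: `CtxNode`, `createRoot`, `addChild`, and `leafPaths` are hypothetical names, and each node keeps just a label instead of the full handler data.

```typescript
// Hypothetical, label-only model of the prev/next/root links.
type CtxNode = {
  label: string;
  prev: CtxNode | null;
  next: Set<CtxNode>;
  root: CtxNode;
};

function createRoot(label: string): CtxNode {
  const node = { label, prev: null, next: new Set<CtxNode>() } as unknown as CtxNode;
  node.root = node; // the root points at itself
  return node;
}

function addChild(parent: CtxNode, label: string): CtxNode {
  const child: CtxNode = { label, prev: parent, next: new Set(), root: parent.root };
  parent.next.add(child);
  return child;
}

// Lists every root-to-leaf path, following insertion order of the children.
function leafPaths(node: CtxNode, prefix: string[] = []): string[] {
  const here = [...prefix, node.label];
  if (node.next.size === 0) return [here.join(" > ")];
  const result: string[] = [];
  for (const child of Array.from(node.next)) {
    result.push(...leafPaths(child, here));
  }
  return result;
}

const rootCtx = createRoot("root(0)");
const firstStart = addChild(rootCtx, "start(0)");
addChild(firstStart, "message(0)"); // sent from inside the handler
addChild(rootCtx, "start(1)"); // the handler returned early, so no children
addChild(rootCtx, "message(1)"); // sent from the top level

const treePaths = leafPaths(rootCtx);
console.log(treePaths);
// [ 'root(0) > start(0) > message(0)', 'root(0) > start(1)', 'root(0) > message(1)' ]
```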
You can use any directed graph library to visualize the directed graph of the workflow.
FAQs
event-based workflow engine
We found that @llamaindex/workflow-core demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 9 open source maintainers collaborating on the project.