
Research
/Security News
Toptal’s GitHub Organization Hijacked: 10 Malicious Packages Published
Threat actors hijacked Toptal’s GitHub org, publishing npm packages with malicious payloads that steal tokens and attempt to wipe victim systems.
A strongly-typed graph-based workflow engine for building flexible, composable data processing pipelines with TypeScript
English | 한국어
A lightweight workflow engine for TypeScript that lets you create graph-based execution flows with type safety and minimal complexity.
npm install ts-edge
Type-safe workflows in ts-edge ensure type compatibility between connected nodes:
import { createGraph } from 'ts-edge';
// Each node receives the output of the previous node as its input
// TypeScript validates type compatibility between connected nodes at compile time
const workflow = createGraph()
.addNode({
name: 'number to string',
execute: (input: number) => {
// Convert number to string
return `Input received: ${input}`;
},
})
.addNode({
name: 'string to boolean',
execute: (input: string) => {
// Convert string to boolean
return input !== '';
},
})
.addNode({
name: 'boolean to array',
execute: (input: boolean) => {
// Convert boolean to array
return input ? [] : [1, 2, 3];
},
})
.edge('number to string', 'string to boolean') // Type check passes
// .edge('number to string', 'boolean to array') // ❌ Type error
.edge('string to boolean', 'boolean to array'); // Type check passes
// Compile and run the workflow
const app = workflow.compile('number to string');
const result = await app.run(100);
console.log(result.output); // [1,2,3]
State-based workflows in ts-edge allow nodes to share and modify a common state:
import { createStateGraph, graphStore } from 'ts-edge';
// Define counter state type
type CounterState = {
count: number;
increment: () => void;
decrement: () => void;
updateCount: (count: number) => void;
};
// Create a state store using graphStore
const store = graphStore<CounterState>((set, get) => {
return {
count: 0,
increment: () =>
set((prev) => {
return { count: prev.count + 1 };
}),
decrement: () => set({ count: get().count - 1 }),
updateCount: (count: number) => set({ count }),
};
});
// Create a state-based workflow
// In state-based workflows, nodes share and modify common state
// Note: Return values from state nodes are ignored
const workflow = createStateGraph(store)
.addNode({
name: 'increment',
execute: (state) => {
// Access state
console.log(state.count); // 0
state.increment();
},
})
.addNode({
name: 'checkCount',
execute: (state) => {
console.log(`Current count: ${state.count}`);
},
})
.addNode({
name: 'reset',
execute: (state) => {
// Reset state
state.updateCount(0);
},
})
.edge('increment', 'checkCount')
.dynamicEdge('checkCount', (state) => {
// Determine next node based on state
return state.count > 10 ? 'reset' : 'increment';
});
// Compile and run the workflow
const app = workflow.compile('increment');
const result = await app.run(); // Start with initial state
// Or start with partial state: await app.run({ count: 10 });
Nodes process input and produce output. Edges define the flow between nodes. Nodes can include optional metadata for documentation or visualization purposes.
const workflow = createGraph()
.addNode({
name: 'nodeA',
execute: (input: number) => ({ value: input * 2 }),
metadata: { description: 'Doubles the input value', category: 'math' }
})
.addNode({
name: 'nodeB',
execute: (input: { value: number }) => ({ result: input.value + 10 }),
metadata: { description: 'Adds 10 to the value' }
})
.edge('nodeA', 'nodeB');
Each node's execute function can receive a context object as a second argument:
addNode({
name: 'streamingNode',
metadata: { version: 1, role: 'processor' },
execute: (input, context) => {
// Access node metadata
console.log(context.metadata); // { version: 1, role: 'processor' }
// Emit stream events (useful for reporting progress during execution)
context.stream('Processing started...');
// Perform work
context.stream('50% complete');
// Final result
return { result: 'Completed' };
}
});
Make execution decisions based on node outputs:
workflow.dynamicEdge('processData', (data) => {
if (data.value > 100) return ['highValueProcess', 'standardProcess']; // Route to multiple nodes
if (data.value < 0) return 'errorHandler'; // Route to a single node
return 'standardProcess'; // Default path
});
For better visualization and documentation, you can specify possible targets:
workflow.dynamicEdge('processData', {
possibleTargets: ['highValueProcess', 'errorHandler', 'standardProcess'],
router: (data) => {
if (data.value > 100) return ['highValueProcess', 'standardProcess'];
if (data.value < 0) return 'errorHandler';
return 'standardProcess';
}
});
Process data in parallel branches and merge the results:
const workflow = createGraph()
.addNode({
name: 'fetchData',
execute: (query) => ({ query }),
})
.addNode({
name: 'processBranch1',
execute: (data) => ({ summary: summarize(data.query) }),
})
.addNode({
name: 'processBranch2',
execute: (data) => ({ details: getDetails(data.query) }),
})
.addMergeNode({
name: 'combineResults',
branch: ['processBranch1', 'processBranch2'], // Branches to merge
execute: (inputs) => ({
// inputs object contains outputs from each branch node
result: {
summary: inputs.processBranch1.summary,
details: inputs.processBranch2.details,
},
}),
})
.edge('fetchData', ['processBranch1', 'processBranch2']); // One node to many nodes
Control the behavior of your workflows:
// Basic execution
const result = await app.run(input);
// Execution with options
const resultWithOptions = await app.run(input, {
timeout: 5000, // Maximum execution time in ms
maxNodeVisits: 50, // Prevent infinite loops
});
// State graph initialization
const stateResult = await stateApp.run({ count: 10, name: 'test' }); // Initialize with partial state
// Prevent state reset
const noResetResult = await stateApp.run(undefined, {
noResetState: true // Don't reset state before execution
});
When compiling a workflow, you can specify:
// Only specify start node - runs until a node with no outgoing edges
const app = workflow.compile('inputNode');
// Specify both start and end nodes - terminates at end node
const appWithEnd = workflow.compile('inputNode', 'outputNode');
Monitor workflow execution with events:
app.subscribe((event) => {
// Workflow start event
if (event.eventType === 'WORKFLOW_START') {
console.log(`Workflow started with input:`, event.input);
}
// Node start event
else if (event.eventType === 'NODE_START') {
console.log(`Node started: ${event.node.name}, input:`, event.node.input);
}
// Node stream event (triggered by context.stream calls)
else if (event.eventType === 'NODE_STREAM') {
console.log(`Stream from node ${event.node.name}: ${event.node.chunk}`);
}
// Node end event
else if (event.eventType === 'NODE_END') {
if (event.isOk) {
console.log(`Node completed: ${event.node.name}, output:`, event.node.output);
} else {
console.error(`Node error: ${event.node.name}, error:`, event.error);
}
}
// Workflow end event
else if (event.eventType === 'WORKFLOW_END') {
if (event.isOk) {
console.log(`Workflow completed with output:`, event.output);
} else {
console.error(`Workflow error:`, event.error);
}
}
});
Add middleware to intercept, modify, or redirect node execution:
const app = workflow.compile('startNode');
// Add middleware
app.use((node, next) => {
console.log(`About to execute node: ${node.name}, input:`, node.input);
// Modify input and continue with same node
if (node.name === 'validation') {
next({ name: node.name, input: { ...node.input, validated: true } });
}
// Redirect execution flow to a different node
else if (node.name === 'router' && node.input.special) {
next({ name: 'specialHandler', input: node.input });
}
// Continue normal execution flow
else {
next();
}
// Not calling next() would stop execution
});
ts-edge provides a robust error handling system:
try {
const result = await app.run(input);
if (result.isOk) {
console.log('Success:', result.output);
} else {
console.error('Execution error:', result.error);
}
} catch (error) {
console.error('Unexpected error:', error);
}
These helpers let you define nodes separately for better organization and reusability across files.
graphNode
- Create nodesimport { graphNode } from 'ts-edge';
// Create a node
const userNode = graphNode({
name: 'getUser',
execute: (id: string) => fetchUser(id),
metadata: { description: 'Fetches user data' }
});
// Infer types
type UserNodeType = graphNode.infer<typeof userNode>;
// { name: 'getUser', input: string, output: User }
// Use in graph
graph.addNode(userNode);
graphStateNode
- Create state nodesimport { graphStateNode, graphStore } from 'ts-edge';
// Define state and create store
type CounterState = {
count: number;
name: string;
updateCount: (count: number) => void;
updateName: (name: string) => void;
};
const store = graphStore<CounterState>((set) => {
return {
count: 0,
name: '',
updateName(name) {
set({ name });
},
updateCount(count) {
set({ count });
},
};
});
// Define node in separate file/module
const countNode = graphStateNode({
name: 'processCount',
execute: ({ count, updateCount }: CounterState) => {
if (count < 10) {
updateCount(10);
}
},
});
// Use in state graph
const stateGraph = createStateGraph(store).addNode(countNode);
graphMergeNode
- Create merge nodesimport { graphMergeNode } from 'ts-edge';
// Create a merge node
const mergeNode = graphMergeNode({
name: 'combine',
branch: ['userData', 'userStats'],
execute: (inputs) => ({ ...inputs.userData, stats: inputs.userStats }),
});
// Use in graph
graph.addMergeNode(mergeNode);
graphNodeRouter
- Create routersimport { graphNodeRouter } from 'ts-edge';
// Create a simple router
const simpleRouter = graphNodeRouter((data) => (
data.isValid ? 'success' : 'error'
));
// Create a router with explicit targets
const complexRouter = graphNodeRouter(
['success', 'warning', 'error'],
(data) => {
if (data.score > 90) return 'success';
if (data.score > 50) return 'warning';
return 'error';
}
);
// Use in graph
graph.dynamicEdge('validate', simpleRouter);
MIT
FAQs
A strongly-typed graph-based workflow engine for building flexible, composable data processing pipelines with TypeScript
We found that ts-edge demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
/Security News
Threat actors hijacked Toptal’s GitHub org, publishing npm packages with malicious payloads that steal tokens and attempt to wipe victim systems.
Research
/Security News
Socket researchers investigate 4 malicious npm and PyPI packages with 56,000+ downloads that install surveillance malware.
Security News
The ongoing npm phishing campaign escalates as attackers hijack the popular 'is' package, embedding malware in multiple versions.