
Research
Namastex.ai npm Packages Hit with TeamPCP-Style CanisterWorm Malware
Malicious Namastex.ai npm packages appear to replicate TeamPCP-style Canister Worm tradecraft, including exfiltration and self-propagation.
@dagengine/core
Advanced tools
🚀 Type-Safe DAG Engine for AI Workflows
Define task dependencies. Get automatic parallelization, cost tracking, and 10x speedup.
🚀 Quick Start • 📖 Documentation • 💬 Discussions • 🐛 Issues • 📦 Examples
dagengine is a TypeScript DAG engine that turns sequential AI workflows into parallel ones automatically.
// ❌ What most developers do (sequential, slow, expensive)
for (const item of items) {
const sentiment = await ai.analyze(item); // Wait...
const topics = await ai.extract(item); // Wait...
const summary = await ai.summarize(item); // Wait...
}
// Result: 100 items × 15 seconds = 25 minutes, $15
// ✅ With dagengine (parallel, fast, cheap)
const engine = new DagEngine({ plugin: new MyPlugin() });
const result = await engine.process(items);
// Result: 100 items in 2.5 minutes, $5
10x faster. 67% cheaper. Zero orchestration code.
Define dependencies → get automatic parallelization.
npm install @dagengine/core
Requirements: Node.js ≥ 18.0.0, TypeScript ≥ 5.0 (recommended)
import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';
// Define result types (optional but helps with TypeScript)
interface SentimentResult {
sentiment: "positive" | "negative" | "neutral";
score: number;
}
interface TopicsResult {
topics: string[];
}
// 1. Define your workflow
class ReviewAnalyzer extends Plugin {
constructor() {
super('analyzer', 'Review Analyzer', 'Analyzes reviews');
this.dimensions = ['sentiment', 'topics', 'summary'];
}
defineDependencies(): Record<string, string[]> {
return {
summary: ['sentiment', 'topics']
};
}
createPrompt(context: PromptContext): string {
const content = context.sections[0]?.content || '';
if (context.dimension === 'sentiment') {
return `Analyze sentiment: "${content}"
Return JSON: {"sentiment": "positive|negative|neutral", "score": 0-1}`;
}
if (context.dimension === 'topics') {
return `Extract topics: "${content}"
Return JSON: {"topics": ["topic1", "topic2"]}`;
}
if (context.dimension === 'summary') {
const sentiment = context.dependencies.sentiment?.data as SentimentResult;
const topics = context.dependencies.topics?.data as TopicsResult;
return `Create a ${sentiment.sentiment} summary covering ${topics.topics.join(', ')}:
"${content}"
Return JSON: {"summary": "summary text"}`;
}
throw new Error(`Unknown dimension: ${context.dimension}`);
}
selectProvider(): ProviderSelection {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' }
};
}
}
// 2. Process your data
async function main(): Promise<void> {
// Validate API key
if (!process.env.ANTHROPIC_API_KEY) {
console.error('❌ Missing ANTHROPIC_API_KEY environment variable');
console.error('Set it with: export ANTHROPIC_API_KEY="your-key"');
process.exit(1);
}
// Create engine
const engine = new DagEngine({
plugin: new ReviewAnalyzer(),
providers: {
anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
}
});
// Prepare reviews
const reviews = [
{ content: 'Great product!', metadata: { id: 1 } },
{ content: 'Not good.', metadata: { id: 2 } }
];
// Process
const result = await engine.process(reviews);
// Display results
console.log(JSON.stringify(result.sections[0]?.results, null, 4));
}
// 3. Run with error handling
main().catch((error: Error) => {
console.error('❌ Processing failed:', error.message);
process.exit(1);
});
What just happened?
sentiment and topics ran in parallel (both have no dependencies)summary waited for both to completeNext: Full Documentation • Examples • Production Guide
| Feature | DIY Code | LangChain | dagengine |
|---|---|---|---|
| Setup | Manual loops | Learn LCEL | 2 methods |
| Parallelization | Manual | Manual | Automatic |
| Cost Tracking | Manual calc | Manual calc | Built-in |
| TypeScript | ✅ Full | ⚠️ Partial | ✅ Full |
| Code (100 items) | 150 lines | 80 lines | 25 lines |
| Best For | Small scripts | RAG/Agents | Orchestration |
Use dagengine when:
Skip dagengine when:
🎯 Zero InfrastructureDefine task dependencies once. Engine handles execution order, parallelization, and coordination automatically. No queues, workers, or complex orchestration code. 💰 Cost OptimizedSkip low-value processing with conditional execution. Route tasks to optimal models. Track costs per dimension in real-time with automatic token counting. 🔄 Production ReadyAutomatic retry with exponential backoff. Provider fallback chains. Graceful error recovery with partial results. Battle-tested reliability. |
🌍 Multi-Provider SupportUse Anthropic Claude, OpenAI GPT, Google Gemini with a unified interface. Switch providers per dimension. Mix models in one workflow. 🪝 18 Lifecycle HooksFull async/await support. Integrate databases, caches, APIs at every processing stage. Transform data mid-pipeline. Complete control when you need it. 📊 Real-Time TrackingBuilt-in cost and token tracking per dimension and provider. Progress callbacks with throughput metrics. Detailed breakdowns in results. |
const sections = [
{
content: 'Customer review text here',
metadata: { id: 1, userId: 123, productId: 'SKU-789' }
}
];
Sections are the pieces of data you analyze (reviews, emails, documents, etc.).
this.dimensions = ['sentiment', 'topics', 'summary'];
Dimensions are the analyses you run. Each dimension processes all sections.
defineDependencies() {
return {
sentiment: [], // No dependencies (runs first)
topics: [], // No dependencies (runs first)
summary: ['sentiment', 'topics'] // Waits for both
};
}
Dependencies control execution order. Engine automatically parallelizes independent tasks.
Execution Plan:
sentiment ──┐
├─→ Both run in parallel → summary
topics ─────┘
Section Dimensions (default) - Analyze each item independently:
this.dimensions = ['sentiment']; // Runs once per section
Global Dimensions - Analyze all items together:
this.dimensions = [
{ name: 'categorize', scope: 'global' } // Runs once for all sections
];
class SmartAnalyzer extends Plugin {
dimensions = ['quality_check', 'deep_analysis'];
defineDependencies() {
return { deep_analysis: ['quality_check'] };
}
shouldSkipSectionDimension(context) {
if (context.dimension === 'deep_analysis') {
const quality = context.dependencies.quality_check.data;
return quality.score < 0.7; // Skip low-quality items
}
return false;
}
selectProvider(dimension) {
if (dimension === 'quality_check') {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' } // Cheap model
};
}
return {
provider: 'anthropic',
options: { model: 'claude-3-7-sonnet-20250219' } // Expensive model
};
}
}
Result: 100 items → 40 high-quality → 60% fewer expensive API calls
selectProvider() {
return {
provider: 'anthropic',
options: { model: 'claude-sonnet-4-5-20250929' },
fallbacks: [
{ provider: 'openai', options: { model: 'gpt-4o' } },
{ provider: 'gemini', options: { model: 'gemini-2.5-pro' } }
]
};
}
Automatic failover: If Anthropic fails, automatically tries OpenAI, then Gemini.
class CategoryAnalyzer extends Plugin {
dimensions = [
'classify',
{ name: 'group_by_category', scope: 'global' },
'analyze_category'
];
transformSections(context) {
if (context.dimension === 'group_by_category') {
const categories = context.result.data.categories;
// Transform: 100 sections → 5 category groups
return categories.map(cat => ({
content: cat.items.join('\n\n'),
metadata: { category: cat.name, count: cat.items.length }
}));
}
}
}
Result: Analyze 5 category groups instead of 100 individual items (95% fewer API calls)
class DatabaseIntegratedPlugin extends Plugin {
async beforeProcessStart(context) {
// Initialize connections
await this.db.connect();
}
async shouldSkipSectionDimension(context) {
// Check cache before processing
const cached = await this.redis.get(`${context.section.id}:${context.dimension}`);
if (cached) return true;
return false;
}
async afterDimensionExecute(context) {
// Save results to database
await this.db.results.insert({
section: context.section.id,
dimension: context.dimension,
data: context.result.data
});
}
async afterProcessComplete(context) {
// Cleanup
await this.db.disconnect();
}
}
All 18 hooks support async/await for seamless external service integration.
| Provider | Description | Best For | Docs |
|---|---|---|---|
| Anthropic | Claude models for reasoning and analysis | Complex tasks, deep reasoning | Docs |
| OpenAI | GPT models for general-purpose tasks | Fast responses, versatile workflows | Docs |
| Google Gemini | Gemini models for high-speed processing | High throughput, multimodal inputs | Docs |
Mix and match: Route different dimensions to different providers in the same workflow.
selectProvider(dimension) {
if (dimension === 'quality_check') {
return { provider: 'gemini', options: { model: 'gemini-1.5-flash' } };
}
if (dimension === 'deep_analysis') {
return { provider: 'anthropic', options: { model: 'claude-sonnet-4-5-20250929' } };
}
}
dagengine supports Portkey as a unified AI gateway for advanced features:
| Feature | Direct Mode | With Portkey Gateway |
|---|---|---|
| Automatic Retries | ✅ Engine-level | ✅ Gateway-level with smart backoff |
| Rate Limit Handling | ⚠️ Manual | ✅ Automatic with queuing |
| Semantic Caching | ❌ | ✅ Reduce costs and latency |
| Load Balancing | ❌ | ✅ Multi-provider routing |
| Observability | ✅ Basic | ✅ Full dashboard & analytics |
Enable Portkey:
providers: {
anthropic: {
apiKey: process.env.ANTHROPIC_API_KEY,
gateway: 'portkey',
gatewayApiKey: process.env.PORTKEY_API_KEY,
gatewayConfig: 'pc-my-config-id' // Optional: retry/cache config
}
}
Learn more: Portkey Integration Guide • Portkey Docs
const engine = new DagEngine({
plugin: new MyPlugin(),
// Provider credentials
providers: {
anthropic: { apiKey: process.env.ANTHROPIC_API_KEY },
openai: { apiKey: process.env.OPENAI_API_KEY }
},
// Execution settings
execution: {
concurrency: 10, // Max parallel operations
maxRetries: 3, // Retry attempts
retryDelay: 1000, // Base delay (ms)
timeout: 60000, // Default timeout
continueOnError: true // Process partial results
},
// Cost tracking
pricing: {
models: {
'claude-sonnet-4-5-20250929': {
inputPer1M: 3.00,
outputPer1M: 15.00
}
}
},
// Progress display
progressDisplay: {
display: 'bar', // 'simple' | 'bar' | 'multi' | 'none'
showDimensions: true
}
});
# Clone repository
git clone https://github.com/dagengine/dagengine.git
cd dagengine
# Install dependencies
npm install
# Run tests
npm test
# Run tests with coverage
npm run test:coverage
# Type check
npm run type-check
# Build
npm run build
# Run all checks
npm run validate
We welcome contributions! Here's how to get started:
git clone https://github.com/YOUR_USERNAME/dagengine.gitgit checkout -b feature/your-feature-namenpm run validategit commit -m "feat: add your feature"git push origin feature/your-feature-namenpm run format && npm run lint:fix)npm test)npm run type-check)See CONTRIBUTING.md for detailed guidelines.
We take security seriously. See SECURITY.md for our security policy.
Never report security issues through public GitHub issues.
Use GitHub's private vulnerability reporting or email the maintainers directly.
Apache License 2.0 © dagengine contributors
Licensed under the Apache License, Version 2.0. See LICENSE for the full license text.
This license includes an explicit patent grant (Section 3), protecting users from patent litigation. See LICENSE for details.
Built with:
⭐ Star us on GitHub — it helps the project grow!
Made with ❤️ by the dagengine community
FAQs
Type-safe DAG execution engine for AI workflows
We found that @dagengine/core demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Research
Malicious Namastex.ai npm packages appear to replicate TeamPCP-style Canister Worm tradecraft, including exfiltration and self-propagation.

Product
Explore exportable charts for vulnerabilities, dependencies, and usage with Reports, Socket’s new extensible reporting framework.

Product
Socket for Jira lets teams turn alerts into Jira tickets with manual creation, automated ticketing rules, and two-way sync.