A TypeScript framework for building composable LLM workflows with handlers. Inspired by the Graph + Shared Store architecture pattern and designed for type safety, modularity, and ease of use.
```shell
npm install boba-t
```
```typescript
import { Handler, Pipeline, SharedData } from 'boba-t';

interface WorkflowData extends SharedData {
  input: string;
  output?: string;
}

// Create a simple handler
class DoubleHandler extends Handler<string, string, WorkflowData> {
  protected prepareInputs(sharedData: Readonly<WorkflowData>): string {
    return sharedData.input;
  }

  protected handleRequest(input: string): string {
    return input + input; // Double the string
  }

  protected processResults(
    sharedData: WorkflowData,
    inputs: string,
    outputs: string,
  ): string {
    sharedData.output = outputs;
    return 'success';
  }
}

// Create a pipeline
const handler = new DoubleHandler();
const pipeline = new Pipeline(handler);

// Execute
const sharedData: WorkflowData = { input: 'hello' };
const action = await pipeline.run(sharedData);
console.log(sharedData.output); // "hellohello"
```
Handlers are the building blocks of your workflow. They follow a 3-phase lifecycle: `prepareInputs` reads what the handler needs from the shared store, `handleRequest` runs the core logic, and `processResults` writes the outputs back and returns an action string.
```typescript
import { Handler, SharedData } from 'boba-t';

// `User` is your own domain type
interface UserData extends SharedData {
  userId: string;
  userData?: User;
}

class FetchUserHandler extends Handler<string, User, UserData> {
  protected prepareInputs(sharedData: Readonly<UserData>): string {
    return sharedData.userId;
  }

  protected async handleRequest(userId: string): Promise<User> {
    const response = await fetch(`/api/users/${userId}`);
    return response.json();
  }

  protected processResults(
    sharedData: UserData,
    inputs: string,
    outputs: User,
  ): string {
    sharedData.userData = outputs;
    return 'user_fetched';
  }
}
```
Pipelines orchestrate connected handlers through action-based transitions:
```typescript
import { Pipeline } from 'boba-t';

// Create handlers
const validator = new ValidationHandler();
const processor = new ProcessingHandler();
const formatter = new FormattingHandler();

// Connect handlers with actions
validator.connectTo(processor, 'valid');
validator.connectTo(new ErrorHandler(), 'invalid');
processor.connectTo(formatter, 'success');

// Create and run pipeline
const pipeline = new Pipeline(validator);
const result = await pipeline.run(sharedData);
```
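Under the hood, an action-based transition amounts to a simple loop: run a handler, take the action string it returns, and follow the matching connection. The standalone sketch below illustrates that mechanic; `MiniHandler` and `runPipeline` are illustrative names, not boba-t's internals.

```typescript
type Action = string;

// A minimal handler: runs against shared data and routes by action string
interface MiniHandler<S> {
  run(shared: S): Action;
  next: Map<Action, MiniHandler<S>>;
}

function runPipeline<S>(start: MiniHandler<S>, shared: S): Action {
  let current: MiniHandler<S> | undefined = start;
  let action: Action = '';
  while (current) {
    action = current.run(shared);       // execute the handler
    current = current.next.get(action); // follow the matching connection, if any
  }
  return action; // the last action, once no route matches
}

// Usage: a two-step graph over a shared store
const double: MiniHandler<{ value: number }> = {
  run: (s) => { s.value *= 2; return 'complete'; },
  next: new Map(),
};
const classify: MiniHandler<{ value: number }> = {
  run: (s) => (s.value > 0 ? 'positive' : 'negative'),
  next: new Map([['positive', double]]),
};

const shared = { value: 3 };
runPipeline(classify, shared); // -> 'complete'; shared.value is now 6
```

When a handler returns an action with no outgoing connection, the walk simply ends, which is how a pipeline terminates.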
All handlers share a common data store for communication:
```typescript
interface WorkflowData extends SharedData {
  userQuery: string;
  searchResults?: any[];
  response?: string;
}

const sharedData: WorkflowData = {
  userQuery: 'What is TypeScript?'
};

// Handlers can read from and write to shared data
await pipeline.run(sharedData);
console.log(sharedData.response); // Final result
```
Process collections of items through the same handler logic:
```typescript
import { BatchHandler } from 'boba-t';

class FileBatchProcessor extends BatchHandler<string, ProcessedFile, FileData> {
  protected prepareBatchInputs(sharedData: Readonly<FileData>): string[] {
    return sharedData.filenames;
  }

  protected async processSingleItem(filename: string): Promise<ProcessedFile> {
    const content = await readFile(filename);
    return { filename, content, processed: true };
  }

  protected processBatchResults(
    sharedData: FileData,
    inputs: string[],
    outputs: ProcessedFile[],
  ): string {
    sharedData.processedFiles = outputs;
    return 'batch_complete';
  }
}
```
Execute workflows concurrently for better performance:
```typescript
import { ParallelBatchPipeline, Pipeline } from 'boba-t';

// Create a workflow for processing a single file
// (readHandler and processHandler are handlers you have defined)
readHandler.connectTo(processHandler, 'success');
const fileWorkflow = new Pipeline(readHandler);

// Process multiple files in parallel
class ParallelFileProcessor extends ParallelBatchPipeline<FileData> {
  protected prepareBatchParams(sharedData: Readonly<FileData>): HandlerParams[] {
    return sharedData.filenames.map(filename => ({ filename }));
  }

  protected processBatchResults(
    sharedData: FileData,
    inputs: HandlerParams[],
    outputs: void,
  ): string {
    return 'parallel_batch_complete';
  }
}

const parallelProcessor = new ParallelFileProcessor(fileWorkflow);
await parallelProcessor.run({ filenames: ['file1.txt', 'file2.txt'] });
```
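Conceptually, the parallel runner maps each parameter set onto its own pipeline run and awaits all of them with `Promise.all`. The following standalone sketch shows that shape; `runParallelBatch` and `runOne` are illustrative names, not boba-t's implementation.

```typescript
type Params = Record<string, unknown>;

async function runParallelBatch<P extends Params, R>(
  paramSets: P[],
  runOne: (params: P) => Promise<R>,
): Promise<R[]> {
  // Promise.all starts every run before awaiting any of them, so the
  // batch finishes in roughly the time of the slowest single item.
  return Promise.all(paramSets.map((params) => runOne(params)));
}

// Usage: pretend each "pipeline run" uppercases a filename
const results = await runParallelBatch(
  [{ filename: 'a.txt' }, { filename: 'b.txt' }],
  async ({ filename }) => filename.toUpperCase(),
);
console.log(results); // ['A.TXT', 'B.TXT']
```

Note that if any single run rejects, `Promise.all` rejects the whole batch, so per-item error handling belongs inside each run.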
Handle asynchronous operations with built-in retry logic:
```typescript
import { AsyncHandler } from 'boba-t';

class APIHandler extends AsyncHandler<APIRequest, APIResponse, APIData> {
  constructor() {
    super({ maxRetries: 3, retryDelayMs: 1000 });
  }

  protected async prepareInputsAsync(sharedData: Readonly<APIData>): Promise<APIRequest> {
    return { endpoint: sharedData.endpoint, apiKey: sharedData.apiKey };
  }

  protected async handleRequestAsync(request: APIRequest): Promise<APIResponse> {
    const response = await fetch(request.endpoint, {
      headers: { 'Authorization': `Bearer ${request.apiKey}` }
    });
    return response.json();
  }

  protected async processResultsAsync(
    sharedData: APIData,
    inputs: APIRequest,
    outputs: APIResponse,
  ): Promise<string> {
    sharedData.apiResponse = outputs;
    return 'api_success';
  }
}
```
Handlers support robust error handling with fallback mechanisms:
```typescript
class RobustHandler extends Handler<Input, Output, Data> {
  constructor() {
    super({ maxRetries: 3, retryDelayMs: 1000 });
  }

  protected handleRequest(input: Input): Output {
    // This might fail
    return processInput(input);
  }

  protected handleError(input: Input, error: Error): Output {
    console.warn('Processing failed, using fallback:', error.message);
    return getFallbackResult(input);
  }

  protected processResults(
    sharedData: Data,
    inputs: Input,
    outputs: Output,
  ): string {
    return outputs.isFallback ? 'fallback_used' : 'success';
  }
}
```
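The retry loop itself is not shown above, so here is a standalone sketch of the retry-then-fallback semantics. `withRetry` is an illustrative helper, not boba-t source, and treating `maxRetries` as the total number of attempts is an assumption.

```typescript
interface RetryConfig {
  maxRetries?: number;
  retryDelayMs?: number;
}

async function withRetry<I, O>(
  input: I,
  attempt: (input: I) => Promise<O>,
  fallback: (input: I, error: Error) => O,
  { maxRetries = 1, retryDelayMs = 0 }: RetryConfig = {},
): Promise<O> {
  let lastError = new Error('no attempts made');
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await attempt(input); // success short-circuits the loop
    } catch (err) {
      lastError = err as Error;
      if (retryDelayMs > 0) {
        // Wait before the next attempt
        await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
      }
    }
  }
  // All attempts exhausted: hand the last error to the fallback
  return fallback(input, lastError);
}

// Usage: an always-failing attempt falls through to the fallback
let calls = 0;
const out = await withRetry(
  'ping',
  async () => { calls++; throw new Error('unreachable'); },
  () => 'fallback-result',
  { maxRetries: 3 },
);
console.log(calls, out); // 3 'fallback-result'
```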
Build complex workflows with flexible routing:
```typescript
// Conditional routing
const analyzer = new QueryAnalyzer();
const webSearch = new WebSearchHandler();
const calculator = new CalculatorHandler();
const generator = new ResponseGenerator();

// Connect based on analysis results
analyzer.connectTo(webSearch, 'needs_search');
analyzer.connectTo(calculator, 'needs_calculation');
analyzer.connectTo(generator, 'direct_answer');

// All paths lead to response generation
webSearch.connectTo(generator, 'search_complete');
calculator.connectTo(generator, 'calculation_complete');
```
Configure handlers with parameters:
```typescript
const handler = new LLMHandler();
handler.setParams({
  model: 'gpt-4',
  temperature: 0.7,
  maxTokens: 1000
});

// Parameters are automatically merged into shared data
await handler.run(sharedData);
// sharedData now contains the parameters
```
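The merge step can be pictured as a shallow object merge into the shared store. The sketch below is an assumption about the mechanics (including params winning over same-named shared keys), not boba-t's actual implementation; `mergeParamsIntoShared` is a hypothetical helper.

```typescript
type HandlerParams = Record<string, unknown>;

function mergeParamsIntoShared<S extends Record<string, unknown>>(
  sharedData: S,
  params: HandlerParams,
): S & HandlerParams {
  // Shallow merge: handler params are written onto the shared store,
  // overwriting any same-named keys (an assumption, not documented behaviour)
  return Object.assign(sharedData, params);
}

// Usage
const shared: Record<string, unknown> = { userQuery: 'hi' };
mergeParamsIntoShared(shared, { model: 'gpt-4' });
console.log(shared.model); // 'gpt-4'
```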
BaseHandler
The foundation class for all handlers.

Key methods:
- setParams(params: HandlerParams): this - Set handler parameters
- connectTo(handler: BaseHandler, action?: string): BaseHandler - Connect to the next handler
- run(sharedData: TSharedData): Promise<ActionResult> - Execute the handler

Lifecycle methods (override these):
- prepareInputs(sharedData: Readonly<TSharedData>): TInput - Extract inputs
- handleRequest(inputs: TInput): TOutput | Promise<TOutput> - Core logic
- processResults(sharedData: TSharedData, inputs: TInput, outputs: TOutput): ActionResult - Update shared data

Handler
Enhanced handler with retry logic and error handling.

Constructor:
- config?: { maxRetries?: number; retryDelayMs?: number }

Additional methods:
- handleError(inputs: TInput, error: Error): TOutput - Error fallback

AsyncHandler
Async-optimized handler for I/O-intensive operations.

Async lifecycle methods:
- prepareInputsAsync(sharedData: Readonly<TSharedData>): Promise<TInput>
- handleRequestAsync(inputs: TInput): Promise<TOutput>
- processResultsAsync(sharedData: TSharedData, inputs: TInput, outputs: TOutput): Promise<ActionResult>
- handleErrorAsync(inputs: TInput, error: Error): Promise<TOutput>

BatchHandler
Processes collections of items sequentially.

Abstract methods:
- prepareBatchInputs(sharedData: Readonly<TSharedData>): TInput[]
- processSingleItem(item: TInput): TOutput | Promise<TOutput>
- processBatchResults(sharedData: TSharedData, inputs: TInput[], outputs: TOutput[]): ActionResult

Pipeline
Orchestrates connected handlers through action-based transitions.

Constructor:
- startHandler: TStartHandler - The first handler in the workflow

Methods:
- run(sharedData: SharedData): Promise<ActionResult> - Execute the pipeline
- getStartHandler(): BaseHandler - Get the starting handler

BatchPipeline
Runs a complete pipeline multiple times with different parameter sets.

Abstract methods:
- prepareBatchParams(sharedData: Readonly<TSharedData>): HandlerParams[]
- processBatchResults(sharedData: TSharedData, inputs: HandlerParams[], outputs: void): ActionResult

ParallelBatchPipeline
Concurrent version of BatchPipeline using Promise.all(). Same API as BatchPipeline, but executes pipeline instances in parallel.
```typescript
interface LLMWorkflowData extends SharedData {
  userQuery: string;
  searchResults?: any[];
  response?: string;
}

interface AnalysisResult {
  needsSearch: boolean;
}

interface GenerateInput {
  query: string;
  context?: any[];
}

class QueryAnalyzer extends Handler<string, AnalysisResult, LLMWorkflowData> {
  protected prepareInputs(sharedData: Readonly<LLMWorkflowData>): string {
    return sharedData.userQuery;
  }

  protected async handleRequest(query: string): Promise<AnalysisResult> {
    // Analyze whether the query needs a web search
    return { needsSearch: query.includes('latest') || query.includes('current') };
  }

  protected processResults(
    sharedData: LLMWorkflowData,
    inputs: string,
    outputs: AnalysisResult,
  ): string {
    return outputs.needsSearch ? 'search' : 'direct_answer';
  }
}

class WebSearchHandler extends Handler<string, any[], LLMWorkflowData> {
  protected prepareInputs(sharedData: Readonly<LLMWorkflowData>): string {
    return sharedData.userQuery;
  }

  protected async handleRequest(query: string): Promise<any[]> {
    // Perform web search (searchWeb is your own helper)
    return await searchWeb(query);
  }

  protected processResults(
    sharedData: LLMWorkflowData,
    inputs: string,
    outputs: any[],
  ): string {
    sharedData.searchResults = outputs;
    return 'generate_response';
  }
}

class ResponseGenerator extends Handler<GenerateInput, string, LLMWorkflowData> {
  protected prepareInputs(sharedData: Readonly<LLMWorkflowData>): GenerateInput {
    return {
      query: sharedData.userQuery,
      context: sharedData.searchResults
    };
  }

  protected async handleRequest(inputs: GenerateInput): Promise<string> {
    // Generate response using an LLM (generateResponse is your own helper)
    return await generateResponse(inputs.query, inputs.context);
  }

  protected processResults(
    sharedData: LLMWorkflowData,
    inputs: GenerateInput,
    outputs: string,
  ): string {
    sharedData.response = outputs;
    return 'complete';
  }
}

// Build the workflow
const analyzer = new QueryAnalyzer();
const searcher = new WebSearchHandler();
const generator = new ResponseGenerator();

analyzer.connectTo(generator, 'direct_answer');
analyzer.connectTo(searcher, 'search');
searcher.connectTo(generator, 'generate_response');

const llmWorkflow = new Pipeline(analyzer);

// Use the workflow
const result = await llmWorkflow.run({
  userQuery: "What's the latest news about TypeScript?"
});
```
```typescript
import { BatchPipeline, HandlerParams, Pipeline } from 'boba-t';

class FileProcessor extends BatchPipeline<FileProcessingData> {
  protected prepareBatchParams(sharedData: Readonly<FileProcessingData>): HandlerParams[] {
    return sharedData.filenames.map(filename => ({ filename }));
  }

  protected processBatchResults(
    sharedData: FileProcessingData,
    inputs: HandlerParams[],
    outputs: void,
  ): string {
    return 'all_files_processed';
  }
}

// Create file processing workflow
const readFile = new ReadFileHandler();
const analyzeContent = new AnalyzeContentHandler();
const saveResults = new SaveResultsHandler();

readFile.connectTo(analyzeContent, 'file_read');
analyzeContent.connectTo(saveResults, 'analysis_complete');

const fileWorkflow = new Pipeline(readFile);
const batchProcessor = new FileProcessor(fileWorkflow);

await batchProcessor.run({
  filenames: ['doc1.txt', 'doc2.txt', 'doc3.txt']
});
```
Contributions are welcome! Please feel free to submit a Pull Request.
Apache License - see the LICENSE file for details.