
harmony-plugin-manager
A type-safe, dependency-aware plugin system built on Harmony Pipeline
Transform your applications into extensible, modular powerhouses with a plugin architecture that just works. Built with TypeScript-first design, comprehensive lifecycle management, and zero-configuration dependency resolution.
- Type-Safe by Design: Full TypeScript support with generic type constraints
- Smart Dependency Resolution: Automatic topological sorting with circular dependency detection
- Zero Configuration: Works out of the box with sensible defaults
- Flexible Architecture: Builder patterns, factories, and middleware support
- Production Ready: Comprehensive error handling and validation
- Rich Execution Context: Shared state, logging, and metadata management
npm install harmony-plugin-manager harmony-pipeline
import { createPluginManager, simplePlugin } from 'harmony-plugin-manager';
// Create a plugin manager
const manager = createPluginManager<string, string>()
.register(simplePlugin({ name: 'hello', version: '1.0.0' }, input => `Hello, ${input}!`))
.register(simplePlugin({ name: 'exclaim', version: '1.0.0' }, input => `${input}!!!`))
.build();
// Execute the pipeline
const result = await manager.execute('World', {});
console.log(result.results[1].output); // "Hello, World!!!"
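The dependency resolution feature works by topologically sorting plugins before execution. A minimal standalone sketch of the idea (illustrative only; the library's internals may differ):

```typescript
// Illustrative sketch of dependency-aware ordering: plugins declare
// `dependencies`, and a depth-first topological sort recovers a safe
// execution order, throwing on cycles.
interface PluginMeta {
  name: string;
  dependencies?: string[];
}

function resolveOrder(plugins: PluginMeta[]): string[] {
  const byName = new Map(plugins.map(p => [p.name, p]));
  const state = new Map<string, 'visiting' | 'done'>();
  const order: string[] = [];

  const visit = (name: string): void => {
    if (state.get(name) === 'done') return;
    if (state.get(name) === 'visiting') {
      throw new Error(`Circular dependency detected at "${name}"`);
    }
    state.set(name, 'visiting');
    // Unknown dependency names are treated as leaves in this sketch.
    for (const dep of byName.get(name)?.dependencies ?? []) visit(dep);
    state.set(name, 'done');
    order.push(name);
  };

  for (const p of plugins) visit(p.name);
  return order;
}

// Registered out of order; dependencies still come first.
const executionOrder = resolveOrder([
  { name: 'saver', dependencies: ['normalizer'] },
  { name: 'normalizer', dependencies: ['validator'] },
  { name: 'validator' },
]);
// executionOrder: ['validator', 'normalizer', 'saver']
```

Registering a plugin before its dependencies is fine; the sort recovers a valid order and surfaces cycles as errors instead of hanging.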
The library offers multiple plugin creation patterns to suit different development styles and requirements:
Quick and functional approach for straightforward transformations:
const validationPlugin = simplePlugin(
{ name: 'validate-email', version: '1.0.0' },
(email: string) => {
if (!email.includes('@')) throw new Error('Invalid email');
return email;
},
);
Benefits: minimal boilerplate, full type inference from the handler signature, and no class machinery for simple, stateless transformations.
For complex plugins that need state management and lifecycle hooks, inherit from BasePlugin to
access the full plugin infrastructure:
The BasePlugin abstract class provides the foundation for all class-based plugins:
abstract class BasePlugin<
TInput = unknown,
TOutput = unknown,
TContext extends PluginContext = PluginContext,
> implements IPlugin<TInput, TOutput, TContext>
{
abstract readonly metadata: PluginMetadata;
abstract execute(input: TInput, context: TContext): Promise<TOutput> | TOutput;
// Lifecycle hooks with default implementations
async initialize(context: TContext): Promise<void> {
/* logging */
}
async cleanup(context: TContext): Promise<void> {
/* logging */
}
async onError(error: Error, context: TContext): Promise<void> {
/* error logging */
}
// Utility methods for consistent logging
protected logInfo(context: TContext, message: string, data?: unknown): void;
protected logWarning(context: TContext, message: string, data?: unknown): void;
protected logError(context: TContext, message: string, data?: unknown): void;
}
1. Consistent Lifecycle Management
// ✅ Automatic resource setup and teardown
class DatabasePlugin extends BasePlugin<Query, Result> {
private connection?: DatabaseConnection;
async initialize(context: PluginContext): Promise<void> {
await super.initialize(context); // Gets consistent logging
this.connection = await createConnection();
// Resources automatically cleaned up in cleanup()
}
async cleanup(context: PluginContext): Promise<void> {
await this.connection?.close();
await super.cleanup(context); // Consistent cleanup logging
}
}
2. Built-in Error Handling Infrastructure
// ✅ Structured error handling with context
class PaymentPlugin extends BasePlugin<Payment, ProcessedPayment> {
async onError(error: Error, context: PluginContext): Promise<void> {
// Custom error handling
if (error instanceof PaymentGatewayError) {
await this.handlePaymentFailure(error, context);
}
// Parent handles standard error logging and context updates
await super.onError(error, context);
}
}
3. Consistent Logging with Plugin Context
// ✅ Structured logging that includes plugin metadata
class ValidationPlugin extends BasePlugin<Data, ValidatedData> {
async execute(data: Data, context: PluginContext): Promise<ValidatedData> {
// Logs automatically include plugin name and execution context
this.logInfo(context, 'Starting validation', { dataSize: data.items.length });
if (someWarningCondition) {
// Warnings are automatically aggregated in context
this.logWarning(context, 'Validation warning', { issue: 'minor format issue' });
}
return validatedData;
}
}
4. State Management and Resource Sharing
// ✅ Clean state management with proper encapsulation
class CachingPlugin extends BasePlugin<CacheableData, CachedData> {
private cache = new Map<string, CachedData>();
private readonly MAX_CACHE_SIZE = 1000;
async execute(data: CacheableData, context: PluginContext): Promise<CachedData> {
// Instance state is isolated per plugin
const cached = this.cache.get(data.key);
if (cached) return cached;
const processed = await this.processData(data);
// Manage cache size
if (this.cache.size >= this.MAX_CACHE_SIZE) {
this.evictOldestEntries();
}
this.cache.set(data.key, processed);
return processed;
}
async cleanup(context: PluginContext): Promise<void> {
this.cache.clear(); // Automatic cleanup
await super.cleanup(context);
}
}
Benefits: consistent lifecycle management, built-in error handling infrastructure, structured logging with plugin metadata, and encapsulated per-plugin state.
Understanding how data flows through the plugin pipeline is crucial for building reliable, predictable applications. The plugin system provides clear patterns for data transformation while maintaining state integrity.
Input data should be treated as immutable. Plugins receive read-only input and produce new output rather than modifying existing data.
// ✅ GOOD: Immutable transformation
class UserNormalizerPlugin extends BasePlugin<RawUser, NormalizedUser> {
execute(input: RawUser): NormalizedUser {
// Create new object instead of modifying input
return {
id: input.user_id,
name: input.full_name.trim(),
email: input.email_address.toLowerCase(),
createdAt: new Date(input.created_timestamp),
};
}
}
// ❌ BAD: Mutating input data
class BadUserPlugin extends BasePlugin<RawUser, RawUser> {
execute(input: RawUser): RawUser {
input.email_address = input.email_address.toLowerCase(); // Mutates input!
input.processed = true; // Side effect!
return input;
}
}
Shared state mutations should only occur through the plugin context's controlled mechanisms.
class StatefulProcessorPlugin extends BasePlugin<Data, ProcessedData> {
execute(input: Data, context: PluginContext): ProcessedData {
// ✅ GOOD: Controlled shared state mutation
const statistics = context.retrieve<ProcessingStats>('stats') || {
totalProcessed: 0,
errors: 0,
};
statistics.totalProcessed++;
context.share('stats', statistics);
// ✅ GOOD: Immutable data transformation
return {
...input,
processedAt: Date.now(),
status: 'processed',
};
}
}
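To make the "controlled mechanisms" concrete, here is a minimal sketch of a share/retrieve store. MiniContext is a hypothetical stand-in; the real PluginContext offers more (logging, settings, warnings):

```typescript
// Hypothetical MiniContext: all shared state lives behind share()/retrieve(),
// so plugins never reach for globals or mutate each other's internals.
class MiniContext {
  private shared = new Map<string, unknown>();

  share(key: string, value: unknown): void {
    this.shared.set(key, value);
  }

  retrieve<T>(key: string): T | undefined {
    return this.shared.get(key) as T | undefined;
  }
}

interface ProcessingStats {
  totalProcessed: number;
  errors: number;
}

const ctx = new MiniContext();

// The read-modify-write pattern from StatefulProcessorPlugin above:
const stats = ctx.retrieve<ProcessingStats>('stats') ?? { totalProcessed: 0, errors: 0 };
stats.totalProcessed++;
ctx.share('stats', stats);
```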
Each plugin receives the output of the previous plugin as its input, creating a clear data transformation pipeline.
interface UserRegistrationFlow {
// Stage 1: Raw form data
formData: FormData;
// Stage 2: After validation
validatedData: ValidatedUserData;
// Stage 3: After normalization
normalizedData: NormalizedUserData;
// Stage 4: After enrichment
enrichedData: EnrichedUserData;
// Stage 5: Final result
savedUser: SavedUser;
}
const registrationPipeline = createPluginManager<FormData, SavedUser>()
// Stage 1 → 2: Validation
.register(
simplePlugin(
{ name: 'input-validator', version: '1.0.0' },
(formData: FormData): ValidatedUserData => {
if (!formData.email || !formData.password) {
throw new Error('Missing required fields');
}
return {
email: formData.email,
password: formData.password,
name: formData.name || '',
validatedAt: Date.now(),
};
},
),
)
// Stage 2 → 3: Normalization
.register(
simplePlugin(
{
name: 'data-normalizer',
version: '1.0.0',
dependencies: ['input-validator'],
},
(validated: ValidatedUserData): NormalizedUserData => ({
email: validated.email.toLowerCase().trim(),
password: hashPassword(validated.password),
name: validated.name.trim(),
normalizedAt: Date.now(),
}),
),
)
// Stage 3 → 4: Enrichment
.register(
simplePlugin(
{
name: 'user-enricher',
version: '1.0.0',
dependencies: ['data-normalizer'],
},
async (normalized: NormalizedUserData): Promise<EnrichedUserData> => {
const profile = await fetchUserProfile(normalized.email);
return {
...normalized,
profile,
enrichedAt: Date.now(),
};
},
),
)
// Stage 4 → 5: Persistence
.register(
simplePlugin(
{
name: 'user-saver',
version: '1.0.0',
dependencies: ['user-enricher'],
},
async (enriched: EnrichedUserData, context): Promise<SavedUser> => {
const db = context.retrieve<Database>('database');
const savedUser = await db.users.create(enriched);
context.share('user-registration-result', {
userId: savedUser.id,
registeredAt: Date.now(),
});
return savedUser;
},
),
)
.build();
Handle complex data flows where processing branches and merges.
// Parallel processing with result aggregation
const dataAnalysisPipeline = createPluginManager<DataSet, AnalysisResult>()
.register(
simplePlugin(
{ name: 'data-validator', version: '1.0.0' },
(input: DataSet): ValidatedDataSet => validateAndCleanData(input),
),
)
// Branch 1: Statistical Analysis
.register(
simplePlugin(
{
name: 'statistical-analyzer',
version: '1.0.0',
dependencies: ['data-validator'],
},
async (data: ValidatedDataSet, context): Promise<ValidatedDataSet> => {
const stats = await performStatisticalAnalysis(data);
context.share('statistical-results', stats);
return data; // Pass through unchanged
},
),
)
// Branch 2: Trend Analysis
.register(
simplePlugin(
{
name: 'trend-analyzer',
version: '1.0.0',
dependencies: ['data-validator'],
},
async (data: ValidatedDataSet, context): Promise<ValidatedDataSet> => {
const trends = await performTrendAnalysis(data);
context.share('trend-results', trends);
return data; // Pass through unchanged
},
),
)
// Merge: Combine Results
.register(
simplePlugin(
{
name: 'result-aggregator',
version: '1.0.0',
dependencies: ['statistical-analyzer', 'trend-analyzer'],
},
(data: ValidatedDataSet, context): AnalysisResult => {
const stats = context.retrieve<StatisticalResults>('statistical-results');
const trends = context.retrieve<TrendResults>('trend-results');
return {
dataset: data,
statistics: stats,
trends: trends,
aggregatedAt: Date.now(),
};
},
),
)
.build();
State that belongs to a single plugin and doesn't need sharing.
class CachingPlugin extends BasePlugin<CacheableData, CacheableData> {
private cache = new Map<string, any>();
private readonly maxSize = 1000;
execute(input: CacheableData, context: PluginContext): CacheableData {
const key = this.generateCacheKey(input);
// Check plugin-local cache
if (this.cache.has(key)) {
context.logger.debug('Cache hit', { key });
return this.cache.get(key);
}
// Process and cache
const result = this.processData(input);
this.addToCache(key, result);
return result;
}
private addToCache(key: string, value: any): void {
if (this.cache.size >= this.maxSize) {
const firstKey = this.cache.keys().next().value;
if (firstKey !== undefined) this.cache.delete(firstKey);
}
this.cache.set(key, value);
}
}
State that needs to be shared between plugins in the same execution.
class DatabaseConnectionPlugin extends BasePlugin<any, any> {
async initialize(context: PluginContext): Promise<void> {
const settings = context.getSettings(this.metadata.name);
const connectionString = settings?.config.connectionString as string;
const db = await createDatabaseConnection(connectionString);
// Share connection for other plugins
context.share('database-connection', db);
context.share('connection-stats', {
connectedAt: Date.now(),
queriesExecuted: 0,
});
}
async cleanup(context: PluginContext): Promise<void> {
const db = context.retrieve<Database>('database-connection');
await db?.close();
}
}
class QueryExecutorPlugin extends BasePlugin<Query, QueryResult> {
async execute(input: Query, context: PluginContext): Promise<QueryResult> {
const db = context.retrieve<Database>('database-connection');
if (!db) {
throw new Error('Database connection not available');
}
const result = await db.execute(input);
// Update shared statistics
const stats = context.retrieve<ConnectionStats>('connection-stats');
if (stats) {
stats.queriesExecuted++;
context.share('connection-stats', stats);
}
return result;
}
}
Building up state across multiple plugin executions.
class MetricsCollectorPlugin extends BasePlugin<ProcessableData, ProcessableData> {
execute(input: ProcessableData, context: PluginContext): ProcessableData {
// Get or initialize metrics accumulator
const metrics = context.retrieve<ProcessingMetrics>('processing-metrics') || {
totalItems: 0,
successfulItems: 0,
failedItems: 0,
processingTimes: [],
errors: [],
};
const startTime = Date.now();
try {
const result = this.processItem(input);
// Update success metrics
metrics.totalItems++;
metrics.successfulItems++;
metrics.processingTimes.push(Date.now() - startTime);
context.share('processing-metrics', metrics);
return result;
} catch (error) {
// Update failure metrics
metrics.totalItems++;
metrics.failedItems++;
metrics.errors.push({
item: input.id,
error: error.message,
timestamp: Date.now(),
});
context.share('processing-metrics', metrics);
throw error;
}
}
}
class MetricsReporterPlugin extends BasePlugin<any, ProcessingReport> {
execute(input: any, context: PluginContext): ProcessingReport {
const metrics = context.retrieve<ProcessingMetrics>('processing-metrics');
if (!metrics) {
throw new Error('No processing metrics available');
}
const avgProcessingTime =
metrics.processingTimes.length > 0
? metrics.processingTimes.reduce((a, b) => a + b, 0) / metrics.processingTimes.length
: 0;
return {
summary: {
totalItems: metrics.totalItems,
successRate:
metrics.totalItems > 0 ? (metrics.successfulItems / metrics.totalItems) * 100 : 0,
averageProcessingTime: avgProcessingTime,
},
details: {
successfulItems: metrics.successfulItems,
failedItems: metrics.failedItems,
errors: metrics.errors,
processingTimes: metrics.processingTimes,
},
generatedAt: Date.now(),
};
}
}
Define explicit input/output types for each transformation stage.
// ✅ GOOD: Clear type progression
interface PipelineStages {
raw: RawUserInput; // From form submission
validated: ValidatedUser; // After input validation
normalized: NormalizedUser; // After data normalization
enriched: EnrichedUser; // After external data enrichment
persisted: PersistedUser; // After database save
}
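The payoff of explicit stage types is that a mismatched stage fails at compile time. A standalone sketch of typed stage composition (Stage and chain are illustrative helpers, not library APIs):

```typescript
// A stage is a typed function; chain() only compiles when one stage's output
// type matches the next stage's input type.
type Stage<I, O> = (input: I) => O;

function chain<A, B, C>(first: Stage<A, B>, second: Stage<B, C>): Stage<A, C> {
  return input => second(first(input));
}

interface RawUserInput {
  email: string;
}
interface ValidatedUser {
  email: string;
  validatedAt: number;
}
interface NormalizedUser {
  email: string;
  normalizedAt: number;
}

const validate: Stage<RawUserInput, ValidatedUser> = raw => {
  if (!raw.email.includes('@')) throw new Error('Invalid email');
  return { email: raw.email, validatedAt: Date.now() };
};

const normalize: Stage<ValidatedUser, NormalizedUser> = v => ({
  email: v.email.toLowerCase().trim(),
  normalizedAt: Date.now(),
});

// chain(normalize, validate) would be a compile error: the types don't line up.
const pipeline = chain(validate, normalize);
const user = pipeline({ email: '  Ada@Example.COM ' });
// user.email: 'ada@example.com'
```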
Design plugins as pure functions when possible.
// ✅ GOOD: Pure transformation
const emailNormalizer = simplePlugin(
{ name: 'email-normalizer', version: '1.0.0' },
(user: UserWithEmail): UserWithNormalizedEmail => ({
...user,
email: user.email.toLowerCase().trim(),
}),
);
// ✅ GOOD: Composable transformations
const userProcessingPipeline = createPluginManager<RawUser, ProcessedUser>()
.register(emailNormalizer)
.register(nameNormalizer)
.register(phoneNormalizer)
.build();
Ensure errors in one plugin don't corrupt shared state.
class ResilientProcessorPlugin extends BasePlugin<Data, ProcessedData> {
async execute(input: Data, context: PluginContext): Promise<ProcessedData> {
const processingId = `process-${Date.now()}`;
try {
// Create isolated processing scope
context.share(`${processingId}-status`, 'processing');
const result = await this.processData(input);
// Mark as successful
context.share(`${processingId}-status`, 'completed');
return result;
} catch (error) {
// Clean up any partial state
this.cleanupProcessingState(context, processingId);
context.share(`${processingId}-status`, 'failed');
throw error;
}
}
private cleanupProcessingState(context: PluginContext, processingId: string): void {
// Remove any partial state created during processing
context.deleteShared(`${processingId}-temp-data`);
context.deleteShared(`${processingId}-partial-results`);
}
}
class StateInspectorPlugin extends BasePlugin<any, any> {
execute(input: any, context: PluginContext): any {
if (process.env.NODE_ENV === 'development') {
this.logContextState(context);
}
return input; // Pass-through plugin
}
private logContextState(context: PluginContext): void {
const sharedKeys = context.getSharedKeys();
context.logger.debug('Current context state', {
executionId: context.executionId,
sharedKeys: sharedKeys,
sharedValues: sharedKeys.reduce(
(acc, key) => {
acc[key] = this.sanitizeForLogging(context.retrieve(key));
return acc;
},
{} as Record<string, any>,
),
});
}
private sanitizeForLogging(value: any): any {
// Remove sensitive data before logging
if (value && typeof value === 'object') {
const sanitized = { ...value };
delete sanitized.password;
delete sanitized.token;
delete sanitized.secret;
return sanitized;
}
return value;
}
}
class MemoryEfficientPlugin extends BasePlugin<LargeDataSet, ProcessedDataSet> {
async execute(input: LargeDataSet, context: PluginContext): Promise<ProcessedDataSet> {
// Process in chunks to avoid memory issues
const chunkSize = 1000;
const results: ProcessedItem[] = [];
for (let i = 0; i < input.items.length; i += chunkSize) {
const chunk = input.items.slice(i, i + chunkSize);
const processedChunk = await this.processChunk(chunk);
results.push(...processedChunk);
// Allow garbage collection between chunks
if (i % (chunkSize * 10) === 0) {
await new Promise(resolve => setImmediate(resolve));
}
}
return { items: results, processedAt: Date.now() };
}
}
Our factory methods solve specific architectural challenges by providing opinionated, battle-tested patterns:
Traditional validation often scatters throughout codebases, making it hard to maintain and test. The validation factory centralizes validation logic with consistent error handling.
// ✅ GOOD: Dedicated validation plugin
const emailValidator = PluginFactory.validation(
'email-check',
(email: string, context) => {
// Comprehensive validation with context awareness
if (!email || typeof email !== 'string') {
throw new Error('Email must be a non-empty string');
}
if (!email.includes('@')) {
throw new Error('Email must contain @ symbol');
}
if (!email.includes('.')) {
throw new Error('Email must contain a domain');
}
if (email.length > 254) {
throw new Error('Email too long (max 254 characters)');
}
// Context-aware logging
context.logger.debug('Email validation passed', { email: email.substring(0, 3) + '***' });
return email.toLowerCase().trim(); // Normalize on successful validation
},
{
stopOnError: true, // Halt pipeline on validation failure
description: 'RFC-compliant email validation with normalization',
version: '1.2.0',
},
);
Lifecycle Execution: the validator function runs during execute; on success its (optionally normalized) return value flows downstream, and on failure the thrown error halts the pipeline when stopOnError is set.
Best Practices:
// ✅ DO: Use descriptive validation names
const userAgeValidator = PluginFactory.validation('user-age-range', validateAge);
const passwordStrengthValidator = PluginFactory.validation('password-strength', validatePassword);
// ✅ DO: Return normalized data
const phoneValidator = PluginFactory.validation('phone-format', phone => {
if (!isValidPhone(phone)) throw new Error('Invalid phone format');
return normalizePhoneNumber(phone); // Remove spaces, add country code
});
// ✅ DO: Use context for conditional validation
const businessHourValidator = PluginFactory.validation('business-hours', (request, context) => {
const userTimezone = context.metadata.timezone;
const currentHour = getCurrentHour(userTimezone);
if (currentHour < 9 || currentHour > 17) {
throw new Error('Service only available during business hours (9 AM - 5 PM)');
}
return request;
});
Common Mistakes to Avoid:
// ❌ AVOID: Generic validation names
const validator1 = PluginFactory.validation('validate', someFunction);
const checker = PluginFactory.validation('check', someOtherFunction);
// ❌ AVOID: Side effects in validation
const badValidator = PluginFactory.validation('user-check', user => {
if (!user.email) throw new Error('No email');
// BAD: Don't modify external state in validation
updateUserLastSeen(user.id); // This is a side effect!
sendWelcomeEmail(user.email); // This is business logic!
return user;
});
// ❌ AVOID: Swallowing validation errors
const silentValidator = PluginFactory.validation('silent-check', data => {
try {
validateData(data);
return data;
} catch (error) {
// BAD: Don't hide validation failures
console.log('Validation failed, but continuing anyway');
return data; // This defeats the purpose of validation
}
});
Naming Convention: {domain}-{validation-type}-validator, e.g. email-format-validator, user-age-range-validator, password-strength-validator.

Data transformation is one of the most common operations in pipelines. The transform factory provides a clean pattern for shape changes, normalization, and enrichment.
// ✅ GOOD: Comprehensive transformation with error handling
const userTransformer = PluginFactory.transform(
'user-normalizer',
(rawUser: RawUserData, context) => {
// Log transformation start
context.logger.debug('Transforming user data', { userId: rawUser.id });
// Get transformation settings
const settings = context.getSettings('user-normalizer');
const includeMetadata = settings?.config?.includeMetadata ?? true;
const transformed: NormalizedUser = {
// Required fields with fallbacks
id: rawUser.user_id || rawUser.id,
name: (rawUser.full_name || rawUser.name || '').trim(),
email: (rawUser.email_address || rawUser.email || '').toLowerCase(),
// Conditional transformations
phone: rawUser.phone_number ? normalizePhoneNumber(rawUser.phone_number) : undefined,
// Date transformations with validation
createdAt: rawUser.created_timestamp ? new Date(rawUser.created_timestamp) : new Date(),
// Nested object transformation
profile: {
firstName: extractFirstName(rawUser.full_name),
lastName: extractLastName(rawUser.full_name),
avatar: rawUser.profile_picture_url,
preferences: transformUserPreferences(rawUser.settings),
},
};
// Add metadata if enabled
if (includeMetadata) {
transformed.metadata = {
transformedAt: new Date().toISOString(),
transformVersion: '2.1.0',
sourceFormat: rawUser._format || 'unknown',
};
}
// Validate transformed data
if (!transformed.id || !transformed.email) {
throw new Error('Transformation failed: missing required fields');
}
context.logger.debug('User transformation completed', {
userId: transformed.id,
fieldsTransformed: Object.keys(transformed).length,
});
return transformed;
},
{
dependencies: ['user-validator'], // Only transform valid data
description: 'Transforms raw user data to normalized application format',
version: '2.1.0',
},
);
Lifecycle Execution: the transform function runs during execute, receiving the previous plugin's output and producing the value handed to the next plugin.
Best Practices:
// ✅ DO: Make transformations pure and predictable
const priceCalculator = PluginFactory.transform('price-calculator', order => ({
...order,
subtotal: calculateSubtotal(order.items),
tax: calculateTax(order.items, order.region),
total: calculateTotal(order.items, order.region),
}));
// ✅ DO: Handle missing/invalid data gracefully
const addressNormalizer = PluginFactory.transform('address-normalizer', address => ({
street: address.street || '',
city: address.city || '',
state: address.state || address.province || '',
country: normalizeCountryCode(address.country || 'US'),
postalCode: normalizePostalCode(address.zip || address.postal_code || ''),
}));
// ✅ DO: Use configuration for behavior customization
const currencyConverter = PluginFactory.transform(
'currency-converter',
(amount, context) => {
const targetCurrency =
context.getSettings('currency-converter')?.config?.targetCurrency || 'USD';
const rate = getExchangeRate(amount.currency, targetCurrency);
return {
...amount,
originalValue: amount.value,
originalCurrency: amount.currency,
value: amount.value * rate,
currency: targetCurrency,
convertedAt: new Date().toISOString(),
};
},
{ description: 'Converts monetary amounts between currencies' },
);
Common Mistakes to Avoid:
// ❌ AVOID: Mutating input data
const badTransformer = PluginFactory.transform('bad-transform', user => {
user.email = user.email.toLowerCase(); // BAD: Modifying input
user.processedAt = new Date(); // BAD: Side effects
return user; // BAD: Returning mutated input
});
// ❌ AVOID: Complex business logic in transformers
const overloadedTransformer = PluginFactory.transform('overloaded', order => {
// BAD: This should be separate plugins
sendConfirmationEmail(order.customerEmail); // Business logic
updateInventory(order.items); // Side effect
chargePaymentMethod(order.payment); // Business logic
return { ...order, status: 'processed' };
});
// ❌ AVOID: Ignoring transformation errors
const unsafeTransformer = PluginFactory.transform('unsafe', data => {
try {
return transformComplexData(data);
} catch (error) {
// BAD: Returning partial/corrupted data
return { error: 'transformation failed' };
}
});
Naming Convention: {domain}-{transformation-type}, e.g. user-normalizer, price-calculator, address-formatter, data-enricher.

Cross-cutting concerns like logging, caching, authentication, and monitoring shouldn't be mixed with business logic. Middleware provides a clean separation.
// ✅ GOOD: Comprehensive middleware with proper concern separation
const performanceMiddleware = PluginFactory.middleware(
'performance-monitor',
async (data, context, next) => {
const startTime = process.hrtime.bigint();
const memoryBefore = process.memoryUsage();
// Pre-processing: Setup monitoring
const requestId = context.executionId;
const dataSize = JSON.stringify(data).length;
context.logger.info('Processing started', {
requestId,
dataSize,
timestamp: new Date().toISOString(),
});
try {
// Execute the next plugin in the chain
const result = await next();
// Post-processing: Collect metrics
const endTime = process.hrtime.bigint();
const memoryAfter = process.memoryUsage();
const duration = Number(endTime - startTime) / 1_000_000; // Convert to milliseconds
const metrics = {
requestId,
duration,
memoryDelta: memoryAfter.heapUsed - memoryBefore.heapUsed,
inputSize: dataSize,
outputSize: JSON.stringify(result).length,
success: true,
};
// Store metrics for other plugins
const allMetrics = context.retrieve<any[]>('performance-metrics') || [];
allMetrics.push(metrics);
context.share('performance-metrics', allMetrics);
// Add performance warning if slow
if (duration > 1000) {
context.addWarning('SLOW_PROCESSING', `Processing took ${duration}ms`, metrics);
}
context.logger.info('Processing completed', metrics);
return result;
} catch (error) {
// Error handling with metrics
const endTime = process.hrtime.bigint();
const duration = Number(endTime - startTime) / 1_000_000;
context.logger.error('Processing failed', {
requestId,
duration,
error: error.message,
stack: error.stack,
});
// Re-throw to maintain error flow
throw error;
}
},
{
description: 'Monitors performance metrics and memory usage',
version: '1.3.0',
},
);
Lifecycle Execution: middleware runs around the rest of the chain, performing its pre-processing, calling next() to execute wrapped plugins, then post-processing the result.

Best Practices:
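The wrap-and-call-next() behavior can be sketched standalone. compose, Handler, and Middleware below are illustrative helpers, not the library's API:

```typescript
// Illustrative onion composition: each middleware gets the data plus a next()
// continuation; composing right-to-left yields outer pre → inner → outer post.
type Handler<T> = (data: T) => Promise<T>;
type Middleware<T> = (data: T, next: () => Promise<T>) => Promise<T>;

function compose<T>(middlewares: Middleware<T>[], core: Handler<T>): Handler<T> {
  return middlewares.reduceRight<Handler<T>>(
    (next, mw) => data => mw(data, () => next(data)),
    core,
  );
}

const trace: string[] = [];

const logging: Middleware<string> = async (_data, next) => {
  trace.push('log:before');
  const result = await next();
  trace.push('log:after');
  return result;
};

const timing: Middleware<string> = async (_data, next) => {
  trace.push('time:before');
  const result = await next();
  trace.push('time:after');
  return result;
};

const handler = compose([logging, timing], async s => {
  trace.push('core');
  return s.toUpperCase();
});
```

Calling handler runs the logging pre-step, then the timing pre-step, then the core handler, then the post-steps in reverse order; skipping next() in any middleware short-circuits everything inside it.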
// ✅ DO: Keep middleware focused on single concerns
const authMiddleware = PluginFactory.middleware('auth-guard', async (request, context, next) => {
// Only handle authentication
const user = await authenticateRequest(request);
context.share('authenticated-user', user);
return next();
});
const cacheMiddleware = PluginFactory.middleware('cache-layer', async (input, context, next) => {
// Only handle caching
const cacheKey = generateCacheKey(input);
const cached = getFromCache(cacheKey);
if (cached) return cached;
const result = await next();
setInCache(cacheKey, result);
return result;
});
// ✅ DO: Always call next() unless intentionally stopping the pipeline
const rateLimitMiddleware = PluginFactory.middleware(
'rate-limiter',
async (request, context, next) => {
const userId = request.userId;
const isAllowed = await checkRateLimit(userId);
if (!isAllowed) {
throw new Error('Rate limit exceeded');
}
// Always continue to next plugin
return next();
},
);
// ✅ DO: Use middleware for monitoring and observability
const tracingMiddleware = PluginFactory.middleware(
'request-tracer',
async (data, context, next) => {
const span = startSpan('plugin-execution', {
pluginName: context.metadata.currentPlugin,
executionId: context.executionId,
});
try {
const result = await next();
span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (error) {
span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
throw error;
} finally {
span.end();
}
},
);
Common Mistakes to Avoid:
// ❌ AVOID: Forgetting to call next()
const brokenMiddleware = PluginFactory.middleware('broken', async (data, context, next) => {
console.log('Processing data');
// BAD: Missing next() call - pipeline stops here
return data; // This bypasses all subsequent plugins
});
// ❌ AVOID: Modifying data in middleware (use transforms instead)
const mutatingMiddleware = PluginFactory.middleware('mutating', async (data, context, next) => {
data.processedAt = new Date(); // BAD: Modifying input
const result = await next();
result.completedAt = new Date(); // BAD: Modifying output
return result;
});
// ❌ AVOID: Heavy business logic in middleware
const businessLogicMiddleware = PluginFactory.middleware(
'business',
async (order, context, next) => {
// BAD: This should be separate plugins
await calculateTaxes(order);
await validateInventory(order);
await processPayment(order);
return next(); // Middleware should wrap, not replace business logic
},
);
// ❌ AVOID: Swallowing errors without re-throwing
const silentMiddleware = PluginFactory.middleware('silent', async (data, context, next) => {
try {
return await next();
} catch (error) {
console.log('Error occurred:', error.message);
// BAD: Not re-throwing - hides errors from pipeline
return { error: 'Something went wrong' };
}
});
Naming Convention: {concern}-{type} or {domain}-{concern}, e.g. auth-guard, cache-layer, request-logger, performance-monitor, error-tracker.

Why factory methods matter:
1. Code Organization: Prevents plugin logic from being scattered across the codebase
2. Consistency: Ensures similar plugins follow the same patterns
3. Testability: Each factory creates easily testable, isolated units
4. Reusability: Common patterns become reusable building blocks
5. Maintainability: Changes to patterns only require factory updates
6. Type Safety: Factory methods provide better TypeScript inference
7. Best Practices: Built-in patterns prevent common anti-patterns
The fluent builder API supports priorities, conditional registration, and grouping:
const manager = createPluginBuilder<UserData, ProcessedUser>()
.plugin(validationPlugin, { priority: 100 })
.plugin(transformPlugin, { priority: 50 })
.when(process.env.NODE_ENV === 'development', debugPlugin)
.group({
tag: 'formatters',
items: [{ plugin: dateFormatterPlugin }, { plugin: currencyFormatterPlugin }],
})
.withPriority(200, criticalPlugin)
.validate()
.build();
The plugin system is extensible through custom factory methods. You can create specialized plugin factories that encapsulate domain-specific patterns, provide pre-configured behaviors, or implement complex plugin compositions.
Custom factories solve common development challenges by packaging domain-specific patterns behind a small, typed surface.
Create utility classes with static factory methods for related plugin types.
/**
* Database plugin factory for standardized database operations
*/
export class DatabasePluginFactory {
/**
* Creates a database query plugin with connection management
*/
static query<TQuery, TResult>(
name: string,
queryHandler: (query: TQuery, db: Database) => Promise<TResult>,
options?: {
version?: string;
timeout?: number;
retryAttempts?: number;
enableQueryLogging?: boolean;
},
): IPlugin<TQuery, TResult> {
return PluginFactory.create(
{
name: `${name}-query`,
version: options?.version || '1.0.0',
description: `Database query plugin: ${name}`,
tags: ['database', 'query'],
},
async (query, context) => {
const db = context.retrieve<Database>('database-connection');
if (!db) {
throw new Error('Database connection not available');
}
const timeout = options?.timeout || 30000;
const enableLogging = options?.enableQueryLogging ?? false;
if (enableLogging) {
context.logger.debug(`Executing query: ${name}`, { query });
}
const startTime = Date.now();
try {
const result = await Promise.race([
queryHandler(query, db),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('Query timeout')), timeout),
),
]);
if (enableLogging) {
const duration = Date.now() - startTime;
context.logger.debug(`Query completed: ${name}`, { duration });
}
return result;
} catch (error) {
const duration = Date.now() - startTime;
context.logger.error(`Query failed: ${name}`, {
error: error.message,
duration,
});
throw error;
}
},
);
}
/**
* Creates a database transaction plugin with automatic rollback
*/
static transaction<TInput, TOutput>(
name: string,
transactionHandler: (input: TInput, tx: DatabaseTransaction) => Promise<TOutput>,
options?: {
version?: string;
isolationLevel?: 'READ_COMMITTED' | 'REPEATABLE_READ' | 'SERIALIZABLE';
timeout?: number;
},
): IPlugin<TInput, TOutput> {
return PluginFactory.create(
{
name: `${name}-transaction`,
version: options?.version || '1.0.0',
description: `Database transaction plugin: ${name}`,
tags: ['database', 'transaction'],
},
async (input, context) => {
const db = context.retrieve<Database>('database-connection');
if (!db) {
throw new Error('Database connection not available');
}
const transaction = await db.beginTransaction({
isolationLevel: options?.isolationLevel || 'READ_COMMITTED',
timeout: options?.timeout || 60000,
});
try {
context.logger.debug(`Starting transaction: ${name}`);
const result = await transactionHandler(input, transaction);
await transaction.commit();
context.logger.debug(`Transaction committed: ${name}`);
return result;
} catch (error) {
await transaction.rollback();
context.logger.error(`Transaction rolled back: ${name}`, {
error: error.message,
});
throw error;
}
},
);
}
/**
* Creates a database migration plugin
*/
static migration(
name: string,
migrations: Array<{
version: string;
up: (db: Database) => Promise<void>;
down: (db: Database) => Promise<void>;
}>,
options?: {
version?: string;
checkOnly?: boolean;
},
): IPlugin<void, MigrationResult> {
return PluginFactory.create(
{
name: `${name}-migration`,
version: options?.version || '1.0.0',
description: `Database migration plugin: ${name}`,
tags: ['database', 'migration', 'setup'],
},
async (_, context) => {
const db = context.retrieve<Database>('database-connection');
if (!db) {
throw new Error('Database connection not available');
}
const migrationResults: Array<{
version: string;
status: 'applied' | 'skipped' | 'failed';
error?: string;
}> = [];
for (const migration of migrations) {
try {
const isApplied = await db.isMigrationApplied(migration.version);
if (isApplied) {
migrationResults.push({
version: migration.version,
status: 'skipped',
});
continue;
}
if (options?.checkOnly) {
migrationResults.push({
version: migration.version,
status: 'skipped',
});
continue;
}
context.logger.info(`Applying migration: ${migration.version}`);
await migration.up(db);
await db.markMigrationApplied(migration.version);
migrationResults.push({
version: migration.version,
status: 'applied',
});
} catch (error) {
context.logger.error(`Migration failed: ${migration.version}`, {
error: error.message,
});
migrationResults.push({
version: migration.version,
status: 'failed',
error: error.message,
});
if (!options?.checkOnly) {
throw error; // Fail fast on migration errors
}
}
}
return {
totalMigrations: migrations.length,
applied: migrationResults.filter(r => r.status === 'applied').length,
skipped: migrationResults.filter(r => r.status === 'skipped').length,
failed: migrationResults.filter(r => r.status === 'failed').length,
results: migrationResults,
};
},
);
}
}
Create factories for specific business domains with rich functionality.
/**
* API client factory for consistent external service integration
*/
export class ApiClientFactory {
/**
* Creates a RESTful API client plugin with built-in retry and error handling
*/
static restClient<TRequest, TResponse>(
name: string,
config: {
baseUrl: string;
defaultHeaders?: Record<string, string>;
timeout?: number;
retryAttempts?: number;
retryDelay?: number;
authProvider?: (context: PluginContext) => Promise<string>;
},
): IPlugin<TRequest & { endpoint: string; method: string }, TResponse> {
return PluginFactory.withRetry(
name,
async (request, context) => {
const headers = { ...config.defaultHeaders };
// Add authentication if provider exists
if (config.authProvider) {
const authToken = await config.authProvider(context);
headers['Authorization'] = `Bearer ${authToken}`;
}
const url = `${config.baseUrl}${request.endpoint}`;
const startTime = Date.now();
context.logger.debug(`API Request: ${request.method} ${url}`);
const response = await fetch(url, {
method: request.method,
headers: {
'Content-Type': 'application/json',
...headers,
},
body: request.method !== 'GET' ? JSON.stringify(request) : undefined,
signal: AbortSignal.timeout(config.timeout || 30000),
});
const duration = Date.now() - startTime;
if (!response.ok) {
const error = new Error(`API Error: ${response.status} ${response.statusText}`);
context.logger.error(`API Request failed: ${request.method} ${url}`, {
status: response.status,
duration,
});
throw error;
}
const result = await response.json();
context.logger.debug(`API Request completed: ${request.method} ${url}`, {
status: response.status,
duration,
});
// Track API metrics
const metrics = context.retrieve<ApiMetrics>('api-metrics') || {
totalRequests: 0,
successfulRequests: 0,
failedRequests: 0,
totalDuration: 0,
};
metrics.totalRequests++;
metrics.successfulRequests++;
metrics.totalDuration += duration;
context.share('api-metrics', metrics);
return result;
},
{
maxAttempts: config.retryAttempts || 3,
backoffMs: config.retryDelay || 1000,
retryOn: error => {
// Retry on network errors and 5xx responses, but not on client errors (4xx)
return !/API Error: 4\d\d/.test(error.message);
},
},
);
}
/**
* Creates a GraphQL client plugin
*/
static graphqlClient<TVariables, TResponse>(
name: string,
config: {
endpoint: string;
defaultHeaders?: Record<string, string>;
timeout?: number;
authProvider?: (context: PluginContext) => Promise<string>;
},
): IPlugin<{ query: string; variables?: TVariables }, TResponse> {
return PluginFactory.create(
{
name: `${name}-graphql`,
version: '1.0.0',
description: `GraphQL client plugin: ${name}`,
tags: ['api', 'graphql', 'client'],
},
async (request, context) => {
const headers = { ...config.defaultHeaders };
if (config.authProvider) {
const authToken = await config.authProvider(context);
headers['Authorization'] = `Bearer ${authToken}`;
}
const response = await fetch(config.endpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...headers,
},
body: JSON.stringify({
query: request.query,
variables: request.variables,
}),
signal: AbortSignal.timeout(config.timeout || 30000),
});
const result = await response.json();
if (result.errors) {
throw new Error(`GraphQL errors: ${JSON.stringify(result.errors)}`);
}
return result.data;
},
);
}
}
Create factories that combine multiple plugins into complete workflows.
/**
* Data processing workflow factory
*/
export class DataWorkflowFactory {
/**
* Creates a complete ETL (Extract, Transform, Load) workflow
*/
static etlWorkflow<TSource, TTransformed, TDestination>(
name: string,
config: {
extractor: (source: TSource, context: PluginContext) => Promise<any[]>;
transformer: (item: any, context: PluginContext) => Promise<TTransformed>;
loader: (items: TTransformed[], context: PluginContext) => Promise<TDestination>;
batchSize?: number;
enableParallel?: boolean;
errorHandling?: 'fail-fast' | 'continue' | 'collect-errors';
},
): IPlugin<TSource, TDestination> {
return PluginFactory.create(
{
name: `${name}-etl`,
version: '1.0.0',
description: `ETL workflow plugin: ${name}`,
tags: ['etl', 'workflow', 'data-processing'],
},
async (source, context) => {
const batchSize = config.batchSize || 100;
const enableParallel = config.enableParallel ?? true;
const errorHandling = config.errorHandling || 'fail-fast';
context.logger.info(`Starting ETL workflow: ${name}`);
// Extract phase
const extractStartTime = Date.now();
const rawData = await config.extractor(source, context);
const extractDuration = Date.now() - extractStartTime;
context.logger.info(`Extraction completed: ${rawData.length} items`, {
duration: extractDuration,
});
// Transform phase
const transformStartTime = Date.now();
const transformedItems: TTransformed[] = [];
const errors: Array<{ item: any; error: string }> = [];
for (let i = 0; i < rawData.length; i += batchSize) {
const batch = rawData.slice(i, i + batchSize);
const batchPromises = enableParallel
? batch.map(async item => {
try {
return await config.transformer(item, context);
} catch (error) {
if (errorHandling === 'fail-fast') {
throw error;
}
errors.push({ item, error: error.message });
return null;
}
})
: [];
if (enableParallel) {
const batchResults = await Promise.all(batchPromises);
transformedItems.push(...batchResults.filter(item => item !== null));
} else {
for (const item of batch) {
try {
const transformed = await config.transformer(item, context);
transformedItems.push(transformed);
} catch (error) {
if (errorHandling === 'fail-fast') {
throw error;
}
errors.push({ item, error: error.message });
}
}
}
}
const transformDuration = Date.now() - transformStartTime;
context.logger.info(`Transformation completed: ${transformedItems.length} items`, {
duration: transformDuration,
errors: errors.length,
});
if (errors.length > 0) {
context.addWarning(
'TRANSFORMATION_ERRORS',
`${errors.length} items failed transformation`,
{ errors: errors.slice(0, 10) }, // Log first 10 errors
);
}
// Load phase
const loadStartTime = Date.now();
const result = await config.loader(transformedItems, context);
const loadDuration = Date.now() - loadStartTime;
context.logger.info(`Loading completed`, {
duration: loadDuration,
});
// Store workflow metrics
const workflowMetrics = {
extractDuration,
transformDuration,
loadDuration,
totalDuration: extractDuration + transformDuration + loadDuration,
itemsExtracted: rawData.length,
itemsTransformed: transformedItems.length,
itemsLoaded: transformedItems.length,
errors: errors.length,
};
context.share(`${name}-etl-metrics`, workflowMetrics);
return result;
},
);
}
/**
* Creates a data validation and cleansing workflow
*/
static dataQualityWorkflow<TInput, TOutput>(
name: string,
config: {
validators: Array<{
name: string;
validate: (item: TInput) => ValidationResult;
}>;
cleaners: Array<{
name: string;
clean: (item: TInput) => TInput;
}>;
qualityThreshold?: number; // Percentage of items that must pass validation
outputInvalidItems?: boolean;
},
): IPlugin<TInput[], { valid: TOutput[]; invalid: TInput[]; metrics: QualityMetrics }> {
return PluginFactory.create(
{
name: `${name}-data-quality`,
version: '1.0.0',
description: `Data quality workflow plugin: ${name}`,
tags: ['data-quality', 'validation', 'cleansing'],
},
async (input, context) => {
const qualityThreshold = config.qualityThreshold || 95;
const validItems: TOutput[] = [];
const invalidItems: TInput[] = [];
const validationResults: Record<string, number> = {};
context.logger.info(`Starting data quality workflow: ${name}`, {
totalItems: input.length,
validators: config.validators.length,
cleaners: config.cleaners.length,
});
for (const item of input) {
let currentItem = item;
let isValid = true;
const itemValidationResults: string[] = [];
// Apply validators
for (const validator of config.validators) {
const result = validator.validate(currentItem);
validationResults[validator.name] = (validationResults[validator.name] || 0) + 1;
if (!result.isValid) {
isValid = false;
itemValidationResults.push(`${validator.name}: ${result.errors.join(', ')}`);
}
}
if (isValid) {
// Apply cleaners to valid items
for (const cleaner of config.cleaners) {
currentItem = cleaner.clean(currentItem);
}
validItems.push(currentItem as TOutput);
} else {
invalidItems.push(currentItem);
context.logger.debug(`Item failed validation`, {
item: currentItem,
failures: itemValidationResults,
});
}
}
const qualityScore = (validItems.length / input.length) * 100;
const metrics: QualityMetrics = {
totalItems: input.length,
validItems: validItems.length,
invalidItems: invalidItems.length,
qualityScore,
validationResults,
passedThreshold: qualityScore >= qualityThreshold,
};
context.logger.info(`Data quality workflow completed`, metrics);
if (qualityScore < qualityThreshold) {
context.addWarning(
'QUALITY_THRESHOLD_NOT_MET',
`Data quality score ${qualityScore.toFixed(2)}% below threshold ${qualityThreshold}%`,
metrics,
);
}
return {
valid: validItems,
invalid: config.outputInvalidItems ? invalidItems : [],
metrics,
};
},
);
}
}
// Create user management plugins using the database factory
const userQueryPlugin = DatabasePluginFactory.query(
'user-finder',
async (query: { email: string }, db) => {
return await db.users.findByEmail(query.email);
},
{
timeout: 5000,
enableQueryLogging: true,
},
);
const userUpdatePlugin = DatabasePluginFactory.transaction(
'user-updater',
async (update: UserUpdate, tx) => {
const user = await tx.users.findById(update.userId);
if (!user) {
throw new Error('User not found');
}
return await tx.users.update(update.userId, update.data);
},
{
isolationLevel: 'REPEATABLE_READ',
},
);
const migrationPlugin = DatabasePluginFactory.migration('user-schema', [
{
version: '001',
up: async db => {
await db.exec(`
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`);
},
down: async db => {
await db.exec('DROP TABLE users');
},
},
]);
// Create API plugins using the client factory
const userServiceClient = ApiClientFactory.restClient<UserRequest, UserResponse>('user-service', {
baseUrl: 'https://api.userservice.com/v1',
timeout: 10000,
retryAttempts: 3,
authProvider: async context => {
const authService = context.retrieve<AuthService>('auth-service');
return await authService.getToken();
},
});
const analyticsClient = ApiClientFactory.graphqlClient<AnalyticsVariables, AnalyticsData>(
'analytics',
{
endpoint: 'https://analytics.example.com/graphql',
timeout: 15000,
},
);
// Create ETL workflow for user data processing
const userDataETL = DataWorkflowFactory.etlWorkflow<UserDataSource, ProcessedUser, ETLResult>(
'user-processing',
{
extractor: async (source, context) => {
const db = context.retrieve<Database>('database');
return await db.users.findAll({ active: true });
},
transformer: async (user, context) => {
return {
id: user.id,
name: user.name.trim(),
email: user.email.toLowerCase(),
processedAt: new Date().toISOString(),
};
},
loader: async (users, context) => {
const api = context.retrieve<ApiClient>('external-api');
return await api.bulkCreate(users);
},
batchSize: 50,
enableParallel: true,
errorHandling: 'collect-errors',
},
);
// Data quality workflow
const dataQualityPlugin = DataWorkflowFactory.dataQualityWorkflow<RawUserData, CleanUserData>(
'user-data-quality',
{
validators: [
{
name: 'email-format',
validate: user => {
const isValid = /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(user.email);
return { isValid, errors: isValid ? [] : ['Invalid email format'] };
},
},
{
name: 'required-fields',
validate: user => ({
isValid: !!(user.name && user.email),
errors: [!user.name && 'Name is required', !user.email && 'Email is required'].filter(
Boolean,
),
}),
},
],
cleaners: [
{
name: 'normalize-email',
clean: user => ({
...user,
email: user.email.toLowerCase().trim(),
}),
},
{
name: 'normalize-name',
clean: user => ({
...user,
name: user.name.trim(),
}),
},
],
qualityThreshold: 90,
},
);
// factories/index.ts
export { DatabasePluginFactory } from './database.factory';
export { ApiClientFactory } from './api-client.factory';
export { DataWorkflowFactory } from './data-workflow.factory';
// Export types for consumers
export type { DatabaseOptions, ApiClientConfig, ETLConfig, DataQualityConfig } from './types';
// Create domain-specific factory collections
export class ECommercePluginFactory {
static paymentProcessor = PaymentPluginFactory;
static inventory = InventoryPluginFactory;
static order = OrderPluginFactory;
static shipping = ShippingPluginFactory;
}
export class AnalyticsPluginFactory {
static dataCollection = DataCollectionFactory;
static reporting = ReportingFactory;
static visualization = VisualizationFactory;
}
Custom factory plugins provide a powerful way to standardize plugin development patterns within your organization while maintaining the flexibility and type safety of the core plugin system.
Customize behavior through plugin settings:
const manager = createPluginManager()
.register(plugin, {
enabled: true,
priority: 50,
config: {
timeout: 5000,
retries: 3,
cache: true,
},
})
.build();
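Inside a plugin, these values are typically merged with defaults before use. A minimal sketch of that merge (the `PluginConfig` shape mirrors the `config` object above, but the helper itself is an assumption, not library code):

```typescript
// Sketch: merging per-plugin config with defaults, as a plugin might do
// after reading its settings from the execution context.
interface PluginConfig {
  timeout?: number;
  retries?: number;
  cache?: boolean;
}

const DEFAULTS: Required<PluginConfig> = { timeout: 30000, retries: 1, cache: false };

function effectiveConfig(config?: PluginConfig): Required<PluginConfig> {
  // Later spreads win, so caller-supplied values override the defaults.
  return { ...DEFAULTS, ...config };
}
```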
Understanding how plugins are executed is crucial for building efficient, reliable pipelines:
class LifecycleAwarePlugin extends BasePlugin<Input, Output> {
readonly metadata = {
name: 'lifecycle-example',
version: '1.0.0',
dependencies: ['validator-plugin'],
};
// Phase 1: Initialization (once per pipeline build)
async initialize(context: PluginContext): Promise<void> {
await super.initialize(context); // Call parent for logging
// Setup resources, connections, caches
this.setupDatabaseConnection();
this.initializeCache();
this.registerEventHandlers();
// Store cleanup references
context.share(`cleanup-${this.metadata.name}`, this.cleanup.bind(this));
this.logInfo(context, 'Plugin initialized successfully');
}
// Phase 2: Execution (once per pipeline run)
async execute(input: Input, context: PluginContext): Promise<Output> {
// This is your main business logic
this.logInfo(context, 'Starting execution', { inputSize: input.data?.length });
// Access shared state from other plugins
const validationResults = context.retrieve<ValidationResults>('validation-results');
// Perform the main work
const result = await this.processData(input, validationResults);
// Share results for downstream plugins
context.share('processing-results', result);
this.logInfo(context, 'Execution completed', { outputSize: result.data?.length });
return result;
}
// Phase 3: Cleanup (once per pipeline completion/error)
async cleanup(context: PluginContext): Promise<void> {
// Close connections, clear caches, release resources
await this.closeDatabaseConnection();
this.clearCache();
this.unregisterEventHandlers();
this.logInfo(context, 'Plugin cleanup completed');
await super.cleanup(context); // Call parent for logging
}
// Phase 4: Error Handling (when errors occur)
async onError(error: Error, context: PluginContext): Promise<void> {
// Custom error handling, reporting, recovery
this.logError(context, 'Plugin execution failed', {
errorType: error.constructor.name,
errorMessage: error.message,
stackTrace: error.stack,
});
// Report to monitoring systems
await this.reportError(error, context);
// Attempt cleanup of partial state
await this.cleanupPartialState();
await super.onError(error, context);
}
}
The plugin manager resolves execution order through multiple factors:
// Example: E-commerce Order Processing
const orderPipeline = createPluginManager<Order, ProcessedOrder>()
.register(inventoryCheckPlugin, {
// No dependencies - can run first
priority: 100,
})
.register(priceCalculationPlugin, {
dependencies: ['inventory-check'], // Must run after inventory
priority: 200, // Higher priority, but dependency overrides
})
.register(taxCalculationPlugin, {
dependencies: ['price-calculation'], // Must run after pricing
priority: 50, // Lower priority, but dependency chain determines order
})
.register(shippingCalculationPlugin, {
dependencies: ['inventory-check'], // Parallel with price calculation
priority: 150,
})
.register(finalTotalPlugin, {
dependencies: ['price-calculation', 'tax-calculation', 'shipping-calculation'],
priority: 10, // Runs last due to dependencies
})
.build();
// Actual execution order:
// 1. inventory-check (no dependencies)
// 2. price-calculation, shipping-calculation (parallel - both depend only on inventory-check)
// 3. tax-calculation (depends on price-calculation)
// 4. final-total (depends on all calculations)
const authPipeline = createPluginManager<AuthRequest, AuthResult>()
// All run in parallel (no dependencies), but priority determines order
.register(rateLimitPlugin, { priority: 1000 }) // Highest priority - runs first
.register(ipWhitelistPlugin, { priority: 900 })
.register(basicValidationPlugin, { priority: 800 })
.register(bruteForceProtectionPlugin, { priority: 700 }) // Lowest priority - runs last
.build();
// Execution order: rate-limit → ip-whitelist → basic-validation → brute-force-protection
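Among plugins with no dependencies, ordering reduces to a descending sort on priority. A sketch of that rule (the `Registration` type is illustrative, not the library's internal representation):

```typescript
// Illustrative only: among plugins with no dependencies, the manager
// runs higher-priority registrations first.
interface Registration {
  name: string;
  priority: number;
}

function executionOrder(regs: Registration[]): string[] {
  // Copy before sorting so the caller's array is untouched.
  return [...regs].sort((a, b) => b.priority - a.priority).map(r => r.name);
}

const order = executionOrder([
  { name: 'rate-limit', priority: 1000 },
  { name: 'ip-whitelist', priority: 900 },
  { name: 'basic-validation', priority: 800 },
  { name: 'brute-force-protection', priority: 700 },
]);
// order: ['rate-limit', 'ip-whitelist', 'basic-validation', 'brute-force-protection']
```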
// ✅ GOOD: Design for parallel execution
const dataEnrichmentPipeline = createPluginManager<UserData, EnrichedUserData>()
.register(baseValidationPlugin) // Must run first
// These can run in parallel (all depend only on validation)
.register(userProfileEnricherPlugin, { dependencies: ['base-validation'] })
.register(preferencesLoaderPlugin, { dependencies: ['base-validation'] })
.register(activityHistoryPlugin, { dependencies: ['base-validation'] })
.register(recommendationsPlugin, { dependencies: ['base-validation'] })
// Final aggregation (depends on all enrichers)
.register(dataAggregatorPlugin, {
dependencies: [
'user-profile-enricher',
'preferences-loader',
'activity-history',
'recommendations',
],
})
.build();
class DatabaseConnectionPlugin extends BasePlugin<any, any> {
private static connectionPool: DatabasePool;
async initialize(context: PluginContext): Promise<void> {
// Share database connection across all plugins
if (!DatabaseConnectionPlugin.connectionPool) {
DatabaseConnectionPlugin.connectionPool = await createConnectionPool();
}
context.share('db-connection', DatabaseConnectionPlugin.connectionPool);
}
}
class DataReaderPlugin extends BasePlugin<Query, Data> {
async execute(query: Query, context: PluginContext): Promise<Data> {
// Reuse shared connection
const db = context.retrieve<DatabasePool>('db-connection');
return db.query(query.sql, query.params);
}
}
// ❌ BAD: Creates unnecessary blocking
const badPipeline = createPluginManager()
.register(pluginA) // No dependencies
.register(pluginB, { dependencies: ['plugin-a'] }) // Must wait for A
.register(pluginC, { dependencies: ['plugin-b'] }) // Must wait for B
.register(pluginD, { dependencies: ['plugin-c'] }) // Must wait for C
.build();
// Result: Completely serial execution
// ✅ GOOD: Minimize blocking dependencies
const goodPipeline = createPluginManager()
.register(pluginA) // No dependencies
.register(pluginB) // No dependencies - can run parallel with A
.register(pluginC) // No dependencies - can run parallel with A & B
.register(pluginD, { dependencies: ['plugin-a', 'plugin-b', 'plugin-c'] }) // Only final aggregation depends on all
.build();
// Result: A, B, C run in parallel, then D
// ❌ BAD: Each plugin creates its own resources
class BadDatabasePlugin extends BasePlugin<Input, Output> {
async execute(input: Input, context: PluginContext): Promise<Output> {
const db = await createDatabaseConnection(); // New connection each time!
const result = await db.query(input.query);
await db.close();
return result;
}
}
// ✅ GOOD: Shared resource pool
class GoodDatabasePlugin extends BasePlugin<Input, Output> {
async execute(input: Input, context: PluginContext): Promise<Output> {
const dbPool = context.retrieve<DatabasePool>('shared-db-pool');
const connection = await dbPool.getConnection();
try {
const result = await connection.query(input.query);
return result;
} finally {
dbPool.releaseConnection(connection);
}
}
}
// ❌ BAD: Accumulates memory over time
class LeakyPlugin extends BasePlugin<Input, Output> {
private cache = new Map(); // Never cleared!
async execute(input: Input, context: PluginContext): Promise<Output> {
this.cache.set(input.id, input); // Keeps growing
return this.process(input);
}
}
// ✅ GOOD: Proper cleanup and size limits
class CleanPlugin extends BasePlugin<Input, Output> {
private cache = new Map();
private readonly MAX_CACHE_SIZE = 1000;
async execute(input: Input, context: PluginContext): Promise<Output> {
// Implement LRU or TTL cleanup
if (this.cache.size >= this.MAX_CACHE_SIZE) {
this.cleanupOldEntries();
}
this.cache.set(input.id, input);
return this.process(input);
}
async cleanup(context: PluginContext): Promise<void> {
this.cache.clear(); // Explicit cleanup
await super.cleanup(context);
}
}
// ✅ Make plugins stateless where possible
class StatelessPlugin extends BasePlugin<Input, Output> {
async execute(input: Input, context: PluginContext): Promise<Output> {
// No instance variables - purely functional
return this.transform(input, context.getSettings(this.metadata.name));
}
}
// ✅ Use context for state sharing instead of instance variables
class ContextAwarePlugin extends BasePlugin<Input, Output> {
async execute(input: Input, context: PluginContext): Promise<Output> {
const state =
context.retrieve<ProcessingState>('processing-state') || this.createInitialState();
const newState = this.updateState(state, input);
context.share('processing-state', newState);
return this.processWithState(input, newState);
}
}
class ResilientPlugin extends BasePlugin<Input, Output> {
async execute(input: Input, context: PluginContext): Promise<Output> {
const maxRetries = 3;
let attempt = 0;
while (attempt < maxRetries) {
try {
return await this.processData(input);
} catch (error) {
attempt++;
this.logWarning(context, `Attempt ${attempt}/${maxRetries} failed: ${error.message}`);
if (attempt === maxRetries) {
// Try fallback strategy
return this.fallbackProcess(input, context);
}
// Exponential backoff
await this.delay(Math.pow(2, attempt) * 1000);
}
}
}
}
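The `delay` helper and the backoff math used by `ResilientPlugin` are assumed rather than shown; a minimal sketch:

```typescript
// Hypothetical helpers assumed by the ResilientPlugin example above.
const delay = (ms: number): Promise<void> =>
  new Promise(resolve => setTimeout(resolve, ms));

// Exponential backoff: attempt 1 waits 2s, attempt 2 waits 4s, attempt 3 waits 8s.
const backoffMs = (attempt: number, baseMs = 1000): number =>
  Math.pow(2, attempt) * baseMs;
```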
class MeasuredPlugin extends BasePlugin<Input, Output> {
async execute(input: Input, context: PluginContext): Promise<Output> {
const startTime = process.hrtime.bigint();
try {
const result = await this.processData(input);
const duration = Number(process.hrtime.bigint() - startTime) / 1_000_000;
// Store performance metrics
const metrics = context.retrieve<PerformanceMetric[]>('metrics') || [];
metrics.push({
plugin: this.metadata.name,
duration,
inputSize: this.calculateSize(input),
outputSize: this.calculateSize(result),
timestamp: Date.now(),
});
context.share('metrics', metrics);
// Warning for slow operations
if (duration > 1000) {
context.addWarning('SLOW_PLUGIN', `Plugin took ${duration}ms to execute`);
}
return result;
} catch (error) {
const duration = Number(process.hrtime.bigint() - startTime) / 1_000_000;
this.logError(context, `Plugin failed after ${duration}ms`, { error: error.message });
throw error;
}
}
}
One of the most powerful features - intelligent dependency management that eliminates configuration headaches:
const plugins = [
simplePlugin(
{
name: 'base-validator',
version: '1.0.0',
},
validateInput,
),
simplePlugin(
{
name: 'enhanced-validator',
version: '1.0.0',
dependencies: ['base-validator'], // Will run after base-validator
},
enhancedValidation,
),
simplePlugin(
{
name: 'finalizer',
version: '1.0.0',
dependencies: ['enhanced-validator'], // Will run last
},
finalizeData,
),
];
// Execution order automatically: base-validator → enhanced-validator → finalizer
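Under the hood, this ordering amounts to a topological sort of the dependency graph. A simplified sketch with cycle and missing-dependency detection (illustrative only, not the library's actual implementation):

```typescript
// Simplified dependency resolution: depth-first topological sort.
interface PluginMeta {
  name: string;
  dependencies?: string[];
}

function resolveOrder(plugins: PluginMeta[]): string[] {
  const byName = new Map(plugins.map(p => [p.name, p]));
  const order: string[] = [];
  const visiting = new Set<string>(); // nodes on the current DFS path
  const done = new Set<string>();     // fully resolved nodes

  const visit = (name: string): void => {
    if (done.has(name)) return;
    if (visiting.has(name)) throw new Error(`Circular dependency detected at '${name}'`);
    const plugin = byName.get(name);
    if (!plugin) throw new Error(`Dependency '${name}' is not registered`);
    visiting.add(name);
    for (const dep of plugin.dependencies ?? []) visit(dep);
    visiting.delete(name);
    done.add(name);
    order.push(name); // dependencies are pushed before dependents
  };

  plugins.forEach(p => visit(p.name));
  return order;
}
```

Running this over the three validators above yields `base-validator`, `enhanced-validator`, `finalizer` regardless of registration order, matching the behavior the comment describes.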
Handle sophisticated dependency relationships:
const authPipeline = createPluginManager<AuthRequest, AuthResponse>()
// No dependencies - can run first
.register(simplePlugin({ name: 'rate-limiter', version: '1.0.0' }, checkRateLimit))
// Depends on rate limiting
.register(
simplePlugin(
{
name: 'user-lookup',
version: '1.0.0',
dependencies: ['rate-limiter'],
},
lookupUser,
),
)
// Depends on user lookup
.register(
simplePlugin(
{
name: 'password-validator',
version: '1.0.0',
dependencies: ['user-lookup'],
},
validatePassword,
),
)
// Depends on user lookup (parallel with password validation)
.register(
simplePlugin(
{
name: 'session-checker',
version: '1.0.0',
dependencies: ['user-lookup'],
},
checkExistingSessions,
),
)
// Depends on both password validation and session checking
.register(
simplePlugin(
{
name: 'token-generator',
version: '1.0.0',
dependencies: ['password-validator', 'session-checker'],
},
generateAuthToken,
),
)
.build();
Execution Flow:
1. rate-limiter (no dependencies)
2. user-lookup (after rate-limiter)
3. password-validator and session-checker (parallel, after user-lookup)
4. token-generator (after both validators complete)

The system automatically validates your dependency configuration:
// ❌ This will throw an error during build
const invalidManager = createPluginManager()
.register(
simplePlugin(
{
name: 'dependent-plugin',
dependencies: ['non-existent-plugin'], // Error: dependency not found
},
someFunction,
),
)
.build(); // Throws: Plugin 'dependent-plugin' depends on 'non-existent-plugin' which is not registered
// ❌ Circular dependencies are detected
const circularManager = createPluginManager()
.register(simplePlugin({ name: 'plugin-a', dependencies: ['plugin-b'] }, funcA))
.register(
simplePlugin(
{ name: 'plugin-b', dependencies: ['plugin-a'] }, // Circular!
funcB,
),
)
.build(); // Throws: Circular dependency detected
// ❌ Disabled dependencies are caught
const disabledDepManager = createPluginManager()
.register(requiredPlugin, { enabled: false })
.register(simplePlugin({ name: 'dependent', dependencies: ['required-plugin'] }, func))
.build(); // Throws: depends on 'required-plugin' which is disabled
Priority + Dependencies: Dependencies override priority when needed
const manager = createPluginManager()
.register(dependentPlugin, {
priority: 1000, // High priority
dependencies: ['base-plugin'], // But still runs after base-plugin
})
.register(basePlugin, { priority: 1 }) // Low priority, but runs first due to dependency
.build();
Conditional Dependencies: Dependencies respect enabled/disabled state
const manager = createPluginBuilder()
.plugin(basePlugin)
.when(config.advancedMode, advancedPlugin, { dependencies: ['base-plugin'] })
.when(config.debugMode, debugPlugin, { dependencies: ['advanced-plugin'] })
.build();
// If advancedMode is false, debugPlugin won't have unmet dependencies
The execution context is the nerve center of your plugin pipeline, providing comprehensive state management and inter-plugin communication:
class DataProcessor extends BasePlugin<Data, ProcessedData> {
readonly metadata = { name: 'processor', version: '1.0.0' };
async execute(data: Data, context: PluginContext): Promise<ProcessedData> {
// Access execution metadata
this.logInfo(context, `Processing batch ${context.executionId}`);
this.logInfo(context, `Started at ${new Date(context.startTime).toISOString()}`);
// Shared state management
const cache = context.retrieve<Map<string, any>>('cache') || new Map();
const processingStats = context.retrieve<ProcessingStats>('stats') || {
totalItems: 0,
processedItems: 0,
};
// Plugin-specific configuration
const settings = context.getSettings('processor');
const batchSize = (settings?.config?.batchSize as number) || 100;
const enableCaching = (settings?.config?.enableCaching as boolean) || false;
// Processing with context awareness
const processed = await this.processInBatches(data, batchSize, cache, enableCaching);
// Update shared statistics
processingStats.totalItems += data.items.length;
processingStats.processedItems += processed.items.length;
context.share('stats', processingStats);
// ⚠️ Add warnings for monitoring
if (processed.items.length < data.items.length) {
context.addWarning(
'PARTIAL_PROCESSING',
`Only ${processed.items.length}/${data.items.length} items processed`,
{ failedItems: data.items.length - processed.items.length },
);
}
return processed;
}
private async processInBatches(
data: Data,
batchSize: number,
cache: Map<string, any>,
enableCaching: boolean,
): Promise<ProcessedData> {
// Implementation with caching logic
// ...
}
}
Data Sharing Between Plugins:
// Plugin A: Data Producer
class DataCollectorPlugin extends BasePlugin<Request, EnrichedRequest> {
async execute(request: Request, context: PluginContext): Promise<EnrichedRequest> {
const userData = await this.fetchUserData(request.userId);
// Share data for downstream plugins
context.share('user-data', userData);
context.share('collection-timestamp', Date.now());
return { ...request, userData };
}
}
// Plugin B: Data Consumer
class DataProcessorPlugin extends BasePlugin<EnrichedRequest, ProcessedRequest> {
async execute(request: EnrichedRequest, context: PluginContext): Promise<ProcessedRequest> {
// Retrieve shared data
const userData = context.retrieve<UserData>('user-data');
const collectionTime = context.retrieve<number>('collection-timestamp');
if (!userData || collectionTime === undefined) {
throw new Error('Shared data not available from previous plugin');
}
const processingDelay = Date.now() - collectionTime;
this.logInfo(context, `Processing delay: ${processingDelay}ms`);
return this.processWithUserContext(request, userData);
}
}
State Accumulation Pattern:
class MetricsCollectorPlugin extends BasePlugin<any, any> {
async execute(input: any, context: PluginContext): Promise<any> {
// Get or create metrics accumulator
const metrics = context.retrieve<PluginMetrics>('metrics') || {
pluginsExecuted: [],
totalDuration: 0,
warnings: 0,
errors: 0,
};
// Update metrics
metrics.pluginsExecuted.push(this.metadata.name);
metrics.totalDuration += Date.now() - context.startTime;
// Store updated metrics
context.share('metrics', metrics);
return input; // Pass-through plugin
}
}
Conditional Logic Based on Context:
class AdaptiveProcessorPlugin extends BasePlugin<Data, ProcessedData> {
async execute(data: Data, context: PluginContext): Promise<ProcessedData> {
// Adapt behavior based on pipeline state
const failedPlugins = context.retrieve<string[]>('failed-plugins') || [];
const isHighPriority = context.metadata.priority === 'high';
const processingMode = failedPlugins.length > 0 ? 'safe' : 'optimized';
this.logInfo(context, `Using ${processingMode} processing mode`);
if (processingMode === 'safe') {
return this.safeProcess(data, context);
} else {
return this.optimizedProcess(data, context);
}
}
}
Context Cleanup and Resource Management:
class ResourceManagedPlugin extends BasePlugin<Input, Output> {
private connections: DatabaseConnection[] = [];
async initialize(context: PluginContext): Promise<void> {
await super.initialize(context);
// Store cleanup reference in context
context.share(`cleanup-${this.metadata.name}`, () => {
this.connections.forEach(conn => conn.close());
this.connections = [];
});
}
async cleanup(context: PluginContext): Promise<void> {
// Retrieve and execute cleanup
const cleanupFn = context.retrieve<() => void>(`cleanup-${this.metadata.name}`);
if (cleanupFn) {
cleanupFn();
context.deleteShared(`cleanup-${this.metadata.name}`);
}
await super.cleanup(context);
}
}
interface PluginContext<T extends Record<string, unknown>> {
// Execution Information
readonly executionId: string; // Unique execution identifier
readonly startTime: number; // Pipeline start timestamp
readonly metadata: Readonly<T>; // User-provided metadata
readonly logger: Logger; // Structured logging
// Plugin Configuration
readonly settings: ReadonlyMap<string, PluginSettings>;
getSettings(pluginName: string): PluginSettings | undefined;
// Shared State Management
readonly shared: Map<string, unknown>;
share<V>(key: string, value: V): void; // Store data
retrieve<V>(key: string): V | undefined; // Retrieve data
hasShared(key: string): boolean; // Check existence
deleteShared(key: string): boolean; // Remove data
clearShared(): void; // Clear all shared data
getSharedKeys(): readonly string[]; // List all keys
// Warning System
addWarning(code: string, message: string, details?: unknown): void;
}
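The shared-state methods are essentially a typed wrapper around a Map. The following self-contained sketch mirrors that contract; it illustrates the semantics and is not the library's implementation:

```typescript
// Minimal stand-in for the PluginContext shared-state contract
// (illustration only, not the library's actual implementation).
class SharedState {
  private readonly shared = new Map<string, unknown>();

  share<V>(key: string, value: V): void {
    this.shared.set(key, value);
  }

  // The cast is unchecked: callers are responsible for using the
  // same type they stored under the key.
  retrieve<V>(key: string): V | undefined {
    return this.shared.get(key) as V | undefined;
  }

  hasShared(key: string): boolean {
    return this.shared.has(key);
  }

  deleteShared(key: string): boolean {
    return this.shared.delete(key);
  }

  getSharedKeys(): readonly string[] {
    return [...this.shared.keys()];
  }
}

const ctx = new SharedState();
ctx.share('user-data', { id: 42 });
console.log(ctx.hasShared('user-data')); // true
console.log(ctx.retrieve<{ id: number }>('user-data')?.id); // 42
console.log(ctx.retrieve('missing')); // undefined
```

Because retrieve returns V | undefined, downstream plugins should always handle the missing-key case, as the consumer examples above do.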
Middleware Support:
const loggingMiddleware = PluginFactory.middleware(
'request-logger',
async (data, context, next) => {
console.log('Before processing:', data);
const result = await next();
console.log('After processing:', result);
return result;
},
);
const manager = createPluginBuilder<ApiRequest, ApiResponse>()
.when(config.enableCaching, cachingPlugin)
.when(config.enableAuth, authPlugin)
.when(process.env.NODE_ENV === 'production', compressionPlugin)
.build();
class ResilientPlugin extends BasePlugin<Input, Output> {
readonly metadata = { name: 'resilient', version: '1.0.0' };
async execute(input: Input, context: PluginContext): Promise<Output> {
try {
return await this.processData(input);
} catch (error) {
this.logWarning(context, 'Processing failed, using fallback');
return this.fallbackProcess(input);
}
}
async onError(error: Error, context: PluginContext): Promise<void> {
// Custom error handling
context.share('error-count', (context.retrieve<number>('error-count') ?? 0) + 1);
}
}
Understanding how data flows through the plugin pipeline is crucial for building reliable, predictable applications. The plugin system provides clear patterns for data transformation while maintaining state integrity.
Input data should be treated as immutable. Plugins receive read-only input and produce new output rather than modifying existing data.
// GOOD: Immutable transformation
class UserNormalizerPlugin extends BasePlugin<RawUser, NormalizedUser> {
execute(input: RawUser): NormalizedUser {
// Create new object instead of modifying input
return {
id: input.user_id,
name: input.full_name.trim(),
email: input.email_address.toLowerCase(),
createdAt: new Date(input.created_timestamp),
};
}
}
// BAD: Mutating input data
class BadUserPlugin extends BasePlugin<RawUser, RawUser> {
execute(input: RawUser): RawUser {
input.email_address = input.email_address.toLowerCase(); // Mutates input!
input.processed = true; // Side effect!
return input;
}
}
Shared state mutations should only occur through the plugin context's controlled mechanisms.
class StatefulProcessorPlugin extends BasePlugin<Data, ProcessedData> {
execute(input: Data, context: PluginContext): ProcessedData {
// GOOD: Controlled shared state mutation
const statistics = context.retrieve<ProcessingStats>('stats') || {
totalProcessed: 0,
errors: 0,
};
statistics.totalProcessed++;
context.share('stats', statistics);
// GOOD: Immutable data transformation
return {
...input,
processedAt: Date.now(),
status: 'processed',
};
}
}
Each plugin receives the output of the previous plugin as its input, creating a clear data transformation pipeline.
interface UserRegistrationFlow {
// Stage 1: Raw form data
formData: FormData;
// Stage 2: After validation
validatedData: ValidatedUserData;
// Stage 3: After normalization
normalizedData: NormalizedUserData;
// Stage 4: After enrichment
enrichedData: EnrichedUserData;
// Stage 5: Final result
savedUser: SavedUser;
}
const registrationPipeline = createPluginManager<FormData, SavedUser>()
// Stage 1 โ 2: Validation
.register(
simplePlugin(
{ name: 'input-validator', version: '1.0.0' },
(formData: FormData): ValidatedUserData => {
if (!formData.email || !formData.password) {
throw new Error('Missing required fields');
}
return {
email: formData.email,
password: formData.password,
name: formData.name || '',
validatedAt: Date.now(),
};
},
),
)
// Stage 2 โ 3: Normalization
.register(
simplePlugin(
{
name: 'data-normalizer',
version: '1.0.0',
dependencies: ['input-validator'],
},
(validated: ValidatedUserData): NormalizedUserData => ({
email: validated.email.toLowerCase().trim(),
password: hashPassword(validated.password),
name: validated.name.trim(),
normalizedAt: Date.now(),
}),
),
)
// Stage 3 โ 4: Enrichment
.register(
simplePlugin(
{
name: 'user-enricher',
version: '1.0.0',
dependencies: ['data-normalizer'],
},
async (normalized: NormalizedUserData): Promise<EnrichedUserData> => {
const profile = await fetchUserProfile(normalized.email);
return {
...normalized,
profile,
enrichedAt: Date.now(),
};
},
),
)
// Stage 4 โ 5: Persistence
.register(
simplePlugin(
{
name: 'user-saver',
version: '1.0.0',
dependencies: ['user-enricher'],
},
async (enriched: EnrichedUserData, context): Promise<SavedUser> => {
const db = context.retrieve<Database>('database');
const savedUser = await db.users.create(enriched);
context.share('user-registration-result', {
userId: savedUser.id,
registeredAt: Date.now(),
});
return savedUser;
},
),
)
.build();
Handle complex data flows where processing branches and merges.
// Parallel processing with result aggregation
const dataAnalysisPipeline = createPluginManager<DataSet, AnalysisResult>()
.register(
simplePlugin(
{ name: 'data-validator', version: '1.0.0' },
(input: DataSet): ValidatedDataSet => validateAndCleanData(input),
),
)
// Branch 1: Statistical Analysis
.register(
simplePlugin(
{
name: 'statistical-analyzer',
version: '1.0.0',
dependencies: ['data-validator'],
},
async (data: ValidatedDataSet, context): Promise<ValidatedDataSet> => {
const stats = await performStatisticalAnalysis(data);
context.share('statistical-results', stats);
return data; // Pass through unchanged
},
),
)
// Branch 2: Trend Analysis
.register(
simplePlugin(
{
name: 'trend-analyzer',
version: '1.0.0',
dependencies: ['data-validator'],
},
async (data: ValidatedDataSet, context): Promise<ValidatedDataSet> => {
const trends = await performTrendAnalysis(data);
context.share('trend-results', trends);
return data; // Pass through unchanged
},
),
)
// Merge: Combine Results
.register(
simplePlugin(
{
name: 'result-aggregator',
version: '1.0.0',
dependencies: ['statistical-analyzer', 'trend-analyzer'],
},
(data: ValidatedDataSet, context): AnalysisResult => {
const stats = context.retrieve<StatisticalResults>('statistical-results');
const trends = context.retrieve<TrendResults>('trend-results');
if (!stats || !trends) {
throw new Error('Analysis results missing from shared context');
}
return {
dataset: data,
statistics: stats,
trends: trends,
aggregatedAt: Date.now(),
};
),
)
.build();
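The dependencies declared above determine execution order: build() resolves them by topological sorting and rejects circular dependencies. A self-contained sketch of that resolution using Kahn's algorithm follows; it is illustrative only, not the library's internals:

```typescript
// Resolve an execution order from a name -> dependencies map using
// Kahn's algorithm. Every plugin must appear as a key in `deps`.
function topoSort(deps: Record<string, string[]>): string[] {
  const inDegree = new Map<string, number>();
  const dependents = new Map<string, string[]>();

  for (const name of Object.keys(deps)) {
    inDegree.set(name, deps[name].length);
    for (const dep of deps[name]) {
      dependents.set(dep, [...(dependents.get(dep) ?? []), name]);
    }
  }

  // Start with plugins that have no unmet dependencies.
  const queue = [...inDegree.entries()].filter(([, d]) => d === 0).map(([n]) => n);
  const order: string[] = [];

  while (queue.length > 0) {
    const name = queue.shift()!;
    order.push(name);
    // Releasing this plugin may unblock its dependents.
    for (const dependent of dependents.get(name) ?? []) {
      const remaining = inDegree.get(dependent)! - 1;
      inDegree.set(dependent, remaining);
      if (remaining === 0) queue.push(dependent);
    }
  }

  // Any leftover node is part of a cycle.
  if (order.length !== Object.keys(deps).length) {
    throw new Error('Circular dependency detected');
  }
  return order;
}

console.log(
  topoSort({
    'data-validator': [],
    'statistical-analyzer': ['data-validator'],
    'trend-analyzer': ['data-validator'],
    'result-aggregator': ['statistical-analyzer', 'trend-analyzer'],
  }),
);
```

For the branch-and-merge pipeline above, this yields the validator first, both analyzers next, and the aggregator last, matching the registration intent regardless of registration order.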
State that belongs to a single plugin and doesn't need sharing.
class CachingPlugin extends BasePlugin<CacheableData, CacheableData> {
private cache = new Map<string, any>();
private readonly maxSize = 1000;
execute(input: CacheableData, context: PluginContext): CacheableData {
const key = this.generateCacheKey(input);
// Check plugin-local cache
if (this.cache.has(key)) {
context.logger.debug('Cache hit', { key });
return this.cache.get(key);
}
// Process and cache
const result = this.processData(input);
this.addToCache(key, result);
return result;
}
private addToCache(key: string, value: any): void {
if (this.cache.size >= this.maxSize) {
const firstKey = this.cache.keys().next().value;
if (firstKey !== undefined) {
this.cache.delete(firstKey);
}
}
this.cache.set(key, value);
}
}
State that needs to be shared between plugins in the same execution.
class DatabaseConnectionPlugin extends BasePlugin<any, any> {
async initialize(context: PluginContext): Promise<void> {
const settings = context.getSettings(this.metadata.name);
const connectionString = settings?.config.connectionString as string;
const db = await createDatabaseConnection(connectionString);
// Share connection for other plugins
context.share('database-connection', db);
context.share('connection-stats', {
connectedAt: Date.now(),
queriesExecuted: 0,
});
}
async cleanup(context: PluginContext): Promise<void> {
const db = context.retrieve<Database>('database-connection');
await db?.close();
}
}
class QueryExecutorPlugin extends BasePlugin<Query, QueryResult> {
async execute(input: Query, context: PluginContext): Promise<QueryResult> {
const db = context.retrieve<Database>('database-connection');
if (!db) {
throw new Error('Database connection not available');
}
const result = await db.execute(input);
// Update shared statistics
const stats = context.retrieve<ConnectionStats>('connection-stats');
if (stats) {
stats.queriesExecuted++;
context.share('connection-stats', stats);
}
return result;
}
}
Building up state across multiple plugin executions.
class MetricsCollectorPlugin extends BasePlugin<ProcessableData, ProcessableData> {
execute(input: ProcessableData, context: PluginContext): ProcessableData {
// Get or initialize metrics accumulator
const metrics = context.retrieve<ProcessingMetrics>('processing-metrics') || {
totalItems: 0,
successfulItems: 0,
failedItems: 0,
processingTimes: [],
errors: [],
};
const startTime = Date.now();
try {
const result = this.processItem(input);
// Update success metrics
metrics.totalItems++;
metrics.successfulItems++;
metrics.processingTimes.push(Date.now() - startTime);
context.share('processing-metrics', metrics);
return result;
} catch (error) {
// Update failure metrics
metrics.totalItems++;
metrics.failedItems++;
metrics.errors.push({
item: input.id,
error: error instanceof Error ? error.message : String(error),
timestamp: Date.now(),
});
context.share('processing-metrics', metrics);
throw error;
}
}
}
class MetricsReporterPlugin extends BasePlugin<any, ProcessingReport> {
execute(input: any, context: PluginContext): ProcessingReport {
const metrics = context.retrieve<ProcessingMetrics>('processing-metrics');
if (!metrics) {
throw new Error('No processing metrics available');
}
const avgProcessingTime =
metrics.processingTimes.length > 0
? metrics.processingTimes.reduce((a, b) => a + b, 0) / metrics.processingTimes.length
: 0;
return {
summary: {
totalItems: metrics.totalItems,
successRate:
metrics.totalItems > 0 ? (metrics.successfulItems / metrics.totalItems) * 100 : 0,
averageProcessingTime: avgProcessingTime,
},
details: {
successfulItems: metrics.successfulItems,
failedItems: metrics.failedItems,
errors: metrics.errors,
processingTimes: metrics.processingTimes,
},
generatedAt: Date.now(),
};
}
}
Define explicit input/output types for each transformation stage.
// GOOD: Clear type progression
interface PipelineStages {
raw: RawUserInput; // From form submission
validated: ValidatedUser; // After input validation
normalized: NormalizedUser; // After data normalization
enriched: EnrichedUser; // After external data enrichment
persisted: PersistedUser; // After database save
}
Design plugins as pure functions when possible.
// GOOD: Pure transformation
const emailNormalizer = simplePlugin(
{ name: 'email-normalizer', version: '1.0.0' },
(user: UserWithEmail): UserWithNormalizedEmail => ({
...user,
email: user.email.toLowerCase().trim(),
}),
);
// GOOD: Composable transformations
const userProcessingPipeline = createPluginManager<RawUser, ProcessedUser>()
.register(emailNormalizer)
.register(nameNormalizer)
.register(phoneNormalizer)
.build();
Ensure errors in one plugin don't corrupt shared state.
class ResilientProcessorPlugin extends BasePlugin<Data, ProcessedData> {
async execute(input: Data, context: PluginContext): Promise<ProcessedData> {
const processingId = `process-${Date.now()}`;
try {
// Create isolated processing scope
context.share(`${processingId}-status`, 'processing');
const result = await this.processData(input);
// Mark as successful
context.share(`${processingId}-status`, 'completed');
return result;
} catch (error) {
// Clean up any partial state
this.cleanupProcessingState(context, processingId);
context.share(`${processingId}-status`, 'failed');
throw error;
}
}
private cleanupProcessingState(context: PluginContext, processingId: string): void {
// Remove any partial state created during processing
context.deleteShared(`${processingId}-temp-data`);
context.deleteShared(`${processingId}-partial-results`);
}
}
class StateInspectorPlugin extends BasePlugin<any, any> {
execute(input: any, context: PluginContext): any {
if (process.env.NODE_ENV === 'development') {
this.logContextState(context);
}
return input; // Pass-through plugin
}
private logContextState(context: PluginContext): void {
const sharedKeys = context.getSharedKeys();
context.logger.debug('Current context state', {
executionId: context.executionId,
sharedKeys: sharedKeys,
sharedValues: sharedKeys.reduce(
(acc, key) => {
acc[key] = this.sanitizeForLogging(context.retrieve(key));
return acc;
},
{} as Record<string, any>,
),
});
}
private sanitizeForLogging(value: any): any {
// Remove sensitive data before logging
if (value && typeof value === 'object') {
const sanitized = { ...value };
delete sanitized.password;
delete sanitized.token;
delete sanitized.secret;
return sanitized;
}
return value;
}
}
class MemoryEfficientPlugin extends BasePlugin<LargeDataSet, ProcessedDataSet> {
async execute(input: LargeDataSet, context: PluginContext): Promise<ProcessedDataSet> {
// Process in chunks to avoid memory issues
const chunkSize = 1000;
const results: ProcessedItem[] = [];
for (let i = 0; i < input.items.length; i += chunkSize) {
const chunk = input.items.slice(i, i + chunkSize);
const processedChunk = await this.processChunk(chunk);
results.push(...processedChunk);
// Allow garbage collection between chunks
if (i % (chunkSize * 10) === 0) {
await new Promise(resolve => setImmediate(resolve));
}
}
return { items: results, processedAt: Date.now() };
}
}
Customize behavior through plugin settings:
const manager = createPluginManager()
.register(plugin, {
enabled: true,
priority: 50,
config: {
timeout: 5000,
retries: 3,
cache: true,
},
})
.build();
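Inside a plugin, these values come back through context.getSettings(name) as an untyped config record, so reads are typically guarded with a cast and a fallback, as in the batch-processor example earlier. A small self-contained helper sketching that pattern (configValue and the maxBatch key are hypothetical, not part of the library):

```typescript
// Mirrors the PluginSettings shape from the API reference.
interface PluginSettings {
  readonly enabled: boolean;
  readonly priority: number;
  readonly config: Record<string, unknown>;
}

// Hypothetical helper: read a typed config value with a fallback,
// the same pattern as `(settings?.config?.timeout as number) || 5000`.
function configValue<V>(
  settings: PluginSettings | undefined,
  key: string,
  fallback: V,
): V {
  const value = settings?.config[key];
  return value === undefined ? fallback : (value as V);
}

const settings: PluginSettings = {
  enabled: true,
  priority: 50,
  config: { timeout: 5000, retries: 3, cache: true },
};

console.log(configValue<number>(settings, 'timeout', 1000)); // 5000
console.log(configValue<number>(settings, 'maxBatch', 100)); // 100 (fallback)
```

Using an explicit undefined check rather than || avoids silently replacing legitimate falsy values such as 0 or false.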
PluginManager<TInput, TOutput, TMetadata>
The main orchestrator for plugin execution.
interface IPluginManager<TInput, TOutput, TMetadata> {
register(plugin: IPlugin, settings?: Partial<PluginSettings>): this;
unregister(pluginName: string): this;
configure(pluginName: string, settings: Partial<PluginSettings>): this;
hasPlugin(name: string): boolean;
getPluginNames(): readonly string[];
build(): this;
execute(
input: TInput,
metadata: TMetadata,
options?: ExecutionOptions,
): Promise<ManagerResult<TOutput>>;
}
BasePlugin<TInput, TOutput, TContext>
Abstract base class for creating plugins.
abstract class BasePlugin<TInput, TOutput, TContext extends PluginContext> {
abstract readonly metadata: PluginMetadata;
abstract execute(input: TInput, context: TContext): Promise<TOutput> | TOutput;
async initialize(context: TContext): Promise<void>;
async cleanup(context: TContext): Promise<void>;
async onError(error: Error, context: TContext): Promise<void>;
}
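A minimal concrete subclass looks like this. To keep the example self-contained and runnable, PluginMetadata, PluginContext, and BasePlugin are pared-down local stand-ins; in a real project they are imported from 'harmony-plugin-manager':

```typescript
// Pared-down local stand-ins for the library types (assumption:
// only the members needed for this example are reproduced).
interface PluginMetadata {
  readonly name: string;
  readonly version: string;
}

interface PluginContext {
  readonly executionId: string;
}

abstract class BasePlugin<TInput, TOutput> {
  abstract readonly metadata: PluginMetadata;
  abstract execute(input: TInput, context: PluginContext): Promise<TOutput> | TOutput;
  // Lifecycle hooks default to no-ops; subclasses override as needed.
  async initialize(_context: PluginContext): Promise<void> {}
  async cleanup(_context: PluginContext): Promise<void> {}
}

// A concrete plugin only has to supply metadata and execute.
class UppercasePlugin extends BasePlugin<string, string> {
  readonly metadata = { name: 'uppercase', version: '1.0.0' };

  execute(input: string): string {
    return input.toUpperCase();
  }
}

const plugin = new UppercasePlugin();
console.log(plugin.execute('hello')); // "HELLO"
```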
// Create a plugin manager
createPluginManager<TInput, TOutput, TMetadata>(): PluginManager
// Create a builder
createPluginBuilder<TInput, TOutput, TMetadata>(): PluginManagerBuilder
// Create simple plugins
simplePlugin<TInput, TOutput>(
metadata: PluginMetadata,
execute: (input: TInput, context: PluginContext) => Promise<TOutput> | TOutput,
hooks?: PluginHooks
): IPlugin
// Specialized plugin factories
PluginFactory.validation(name, validator, options?)
PluginFactory.transform(name, transformer, options?)
PluginFactory.middleware(name, middleware, options?)
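As a sketch of what a transform-style factory plausibly returns (a plugin whose execute applies a pure function), the following is self-contained. makeTransform and TransformPlugin are hypothetical stand-ins; the real internals of PluginFactory.transform may differ:

```typescript
interface PluginMetadata {
  readonly name: string;
  readonly version: string;
}

// Hypothetical minimal plugin shape for a pure transformation.
interface TransformPlugin<TInput, TOutput> {
  readonly metadata: PluginMetadata;
  execute(input: TInput): TOutput;
}

// Hypothetical factory: wraps a pure function in the plugin shape.
function makeTransform<TInput, TOutput>(
  name: string,
  transformer: (input: TInput) => TOutput,
): TransformPlugin<TInput, TOutput> {
  return { metadata: { name, version: '1.0.0' }, execute: transformer };
}

const emailNormalizer = makeTransform('email-normalizer', (email: string) =>
  email.trim().toLowerCase(),
);

console.log(emailNormalizer.execute('  Alice@Example.COM ')); // "alice@example.com"
```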
interface PluginMetadata {
readonly name: string;
readonly version: string;
readonly description?: string;
readonly dependencies?: readonly string[];
readonly tags?: readonly string[];
}
interface PluginSettings {
readonly enabled: boolean;
readonly priority: number;
readonly config: Record<string, unknown>;
}
interface PluginResult<T> {
readonly pluginName: string;
readonly success: boolean;
readonly output?: T;
readonly error?: Error;
readonly duration: number;
readonly warnings: readonly string[];
}
interface ManagerResult<T> {
readonly success: boolean;
readonly results: readonly PluginResult<T>[];
readonly duration: number;
readonly errors: readonly Error[];
readonly warnings: readonly string[];
}
interface ApiRequest {
url: string;
method: string;
body?: any;
}
interface ApiResponse {
status: number;
data: any;
headers: Record<string, string>;
}
const apiManager = createPluginBuilder<ApiRequest, ApiResponse>()
.plugin(PluginFactory.validation('request-validator', validateRequest))
.plugin(authenticationPlugin, { priority: 90 })
.plugin(rateLimitPlugin, { priority: 80 })
.plugin(cachePlugin, { priority: 70 })
.plugin(requestProcessorPlugin, { priority: 50 })
.plugin(responseFormatterPlugin, { priority: 30 })
.plugin(loggingPlugin, { priority: 10 })
.build();
const response = await apiManager.execute(request, { userId: '123' });
const dataProcessor = createPluginManager<RawData, ProcessedData>()
.register(dataValidationPlugin)
.register(dataCleaningPlugin, {
config: { removeEmptyFields: true },
})
.register(dataTransformPlugin, { priority: 50 })
.register(dataEnrichmentPlugin, { priority: 30 })
.build();
const result = await dataProcessor.execute(rawData, {
processingId: uuid(),
timestamp: Date.now(),
});
interface FormData {
email: string;
password: string;
confirmPassword: string;
}
interface ValidationResult {
isValid: boolean;
errors: string[];
warnings: string[];
}
const formValidator = createPluginBuilder<FormData, ValidationResult>()
.plugin(PluginFactory.validation('email', validateEmail))
.plugin(PluginFactory.validation('password-strength', validatePasswordStrength))
.plugin(PluginFactory.validation('password-match', validatePasswordMatch))
.plugin(sanitizationPlugin)
.build();
The plugin manager provides comprehensive error handling:
const result = await manager.execute(input, metadata);
if (!result.success) {
// Handle global errors
result.errors.forEach(error => {
console.error('Pipeline error:', error.message);
});
// Handle individual plugin failures
result.results.forEach(pluginResult => {
if (!pluginResult.success) {
console.error(`Plugin ${pluginResult.pluginName} failed:`, pluginResult.error);
}
});
}
// Check for warnings
if (result.warnings.length > 0) {
console.warn('Pipeline warnings:', result.warnings);
}
The library is built with testing in mind:
// Test individual plugins
const mockContext = ContextFactory.create({}, new Map());
const result = await plugin.execute(testInput, mockContext);
// Test plugin manager
const testManager = createPluginManager<string, string>().register(testPlugin).build();
const result = await testManager.execute('test', {});
expect(result.success).toBe(true);
We welcome contributions! Please see our Contributing Guide for details.
MIT © Khaled Sameer
Ready to build your next plugin-powered application?
npm install harmony-plugin-manager harmony-pipeline
Start building modular, extensible applications today!