
@alepha/batch
Efficiently process operations in groups by size or time.
This package is part of the Alepha framework and can be installed via the all-in-one package:
npm install alepha
This module allows you to group multiple asynchronous operations into a single "batch," which is then processed together. This is an essential pattern for improving performance, reducing I/O, and interacting efficiently with rate-limited APIs or databases.
import { Alepha, $hook, run, t } from "alepha";
import { $batch } from "alepha/batch";

class LoggingService {
  // define the batch processor
  logBatch = $batch({
    schema: t.string(),
    maxSize: 10,
    maxDuration: [5, "seconds"],
    handler: async (items) => {
      console.log(`[BATCH LOG] Processing ${items.length} events:`, items);
    },
  });

  // example of how to use it
  onReady = $hook({
    on: "ready",
    handler: async () => {
      this.logBatch.push("Application started.");
      this.logBatch.push("User authenticated.");
      // ... more events pushed from elsewhere in the app
    },
  });
}
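To make the size-or-time rule concrete, here is a minimal, framework-free sketch of the idea. `MiniBatcher` and its members are hypothetical names used only for illustration; this is not how `$batch` is implemented, just one way the "flush when full or when the timer fires" behavior could work.

```typescript
// Hypothetical helper, not part of alepha: flushes when the buffer is full
// or when the time limit since the first buffered item has elapsed.
type BatchHandler<T> = (items: T[]) => Promise<void> | void;

class MiniBatcher<T> {
  private buffer: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;
  private maxSize: number;
  private maxDurationMs: number;
  private handler: BatchHandler<T>;

  constructor(maxSize: number, maxDurationMs: number, handler: BatchHandler<T>) {
    this.maxSize = maxSize;
    this.maxDurationMs = maxDurationMs;
    this.handler = handler;
  }

  push(item: T): void {
    this.buffer.push(item);
    if (this.buffer.length >= this.maxSize) {
      // Size limit reached: flush right away.
      this.flush().catch((err) => console.error(err));
    } else if (this.timer === undefined) {
      // First item of a new batch: start the time limit.
      this.timer = setTimeout(() => {
        this.flush().catch((err) => console.error(err));
      }, this.maxDurationMs);
    }
  }

  async flush(): Promise<void> {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.buffer.length === 0) return;
    // Swap the buffer before awaiting so new pushes start a fresh batch.
    const items = this.buffer;
    this.buffer = [];
    await this.handler(items);
  }
}

const logBatcher = new MiniBatcher<string>(10, 5000, (items) => {
  console.log(`Processing ${items.length} events:`, items);
});
logBatcher.push("Application started.");
logBatcher.push("User authenticated.");
// Flush manually here instead of waiting for the 5-second limit.
logBatcher.flush().catch((err) => console.error(err));
```

The buffer is swapped out before the handler is awaited, so items pushed while a batch is being processed land in the next batch rather than being lost or processed twice.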
This module can be imported and used as follows:
import { Alepha, run } from "alepha";
import { AlephaBatch } from "alepha/batch";

const alepha = Alepha.create()
  .with(AlephaBatch);

run(alepha);
Descriptors are functions that define and configure various aspects of your application. They follow the convention of starting with $ and return configured descriptor instances.
For more details, see the Descriptors documentation.
Creates a batch processing descriptor for efficient grouping and processing of multiple operations.
This descriptor provides a powerful batching mechanism that collects multiple individual items and processes them together in groups, significantly improving performance by reducing overhead and enabling bulk operations. It supports partitioning, concurrent processing, automatic flushing, and intelligent retry mechanisms for robust batch processing workflows.
Key Features
Use Cases
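Well suited to high-throughput operations such as bulk database writes, batched API calls, log aggregation, and file processing. The examples below cover each in turn.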
Basic database batch operations:
import { $batch } from "alepha/batch";
import { t } from "alepha";

class UserService {
  // `this.database` is assumed to be a data-access client provided by the application (not shown)
  userBatch = $batch({
    schema: t.object({
      id: t.string(),
      name: t.string(),
      email: t.string(),
      createdAt: t.optional(t.string())
    }),
    maxSize: 50, // Process up to 50 users at once
    maxDuration: [5, "seconds"], // Or flush every 5 seconds
    handler: async (users) => {
      // Bulk insert users - much faster than individual inserts
      console.log(`Processing batch of ${users.length} users`);
      const result = await this.database.users.insertMany(users.map(user => ({
        ...user,
        createdAt: user.createdAt || new Date().toISOString()
      })));
      console.log(`Successfully inserted ${result.length} users`);
      return { inserted: result.length, userIds: result.map(r => r.id) };
    }
  });

  async createUser(userData: { name: string; email: string }) {
    // Individual calls are automatically batched
    const result = await this.userBatch.push({
      id: generateId(), // generateId() is an application-provided ID helper (not shown)
      name: userData.name,
      email: userData.email
    });
    return result; // Returns the batch result once batch is processed
  }
}
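The example above awaits `push()` and receives the batch result once the batch has been processed. One way such behavior could be built is to store a promise resolver next to each queued item and settle them all when the handler finishes. The sketch below is an assumption about the general technique, not a description of alepha's internals; `ResultBatcher` and `Pending` are hypothetical names.

```typescript
// Hypothetical sketch: every push() returns a promise that settles with the
// result (or error) of the batch the item ends up in.
type Pending<T, R> = {
  item: T;
  resolve: (result: R) => void;
  reject: (error: unknown) => void;
};

class ResultBatcher<T, R> {
  private pending: Pending<T, R>[] = [];
  private maxSize: number;
  private handler: (items: T[]) => Promise<R>;

  constructor(maxSize: number, handler: (items: T[]) => Promise<R>) {
    this.maxSize = maxSize;
    this.handler = handler;
  }

  push(item: T): Promise<R> {
    return new Promise<R>((resolve, reject) => {
      this.pending.push({ item, resolve, reject });
      if (this.pending.length >= this.maxSize) {
        // flush() never rejects: failures are routed to each caller instead.
        void this.flush();
      }
    });
  }

  async flush(): Promise<void> {
    const batch = this.pending;
    this.pending = [];
    if (batch.length === 0) return;
    try {
      const result = await this.handler(batch.map((p) => p.item));
      // Every caller in this batch receives the same batch-level result.
      for (const p of batch) p.resolve(result);
    } catch (error) {
      // A failed batch rejects every caller that contributed to it.
      for (const p of batch) p.reject(error);
    }
  }
}
```

A time limit like the one in `maxDuration` is left out here to keep the promise bookkeeping visible; in this sketch a partially filled batch only completes when `flush()` is called.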
API call batching with partitioning:
class NotificationService {
  notificationBatch = $batch({
    schema: t.object({
      userId: t.string(),
      type: t.enum(["email", "sms", "push"]),
      message: t.string(),
      priority: t.enum(["high", "normal", "low"])
    }),
    maxSize: 100,
    maxDuration: [10, "seconds"],
    // Partition by notification type for different processing
    partitionBy: (notification) => notification.type,
    concurrency: 3, // Process up to 3 different types simultaneously
    handler: async (notifications) => {
      const type = notifications[0].type; // All items in batch have same type
      console.log(`Processing ${notifications.length} ${type} notifications`);
      switch (type) {
        case 'email':
          return await this.emailProvider.sendBulk(notifications.map(n => ({
            to: n.userId,
            subject: 'Notification',
            body: n.message,
            priority: n.priority
          })));
        case 'sms':
          return await this.smsProvider.sendBulk(notifications.map(n => ({
            to: n.userId,
            message: n.message
          })));
        case 'push':
          return await this.pushProvider.sendBulk(notifications.map(n => ({
            userId: n.userId,
            title: 'Notification',
            body: n.message,
            priority: n.priority
          })));
      }
    }
  });

  async sendNotification(
    userId: string,
    type: 'email' | 'sms' | 'push',
    message: string,
    priority: 'high' | 'normal' | 'low' = 'normal'
  ) {
    // Notifications are automatically batched by type
    return await this.notificationBatch.push({
      userId,
      type,
      message,
      priority
    });
  }
}
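The effect of `partitionBy` in the example above can be pictured as grouping items by a string key so that each group becomes its own batch. The following standalone sketch shows that grouping step only; `partitionItems` is a hypothetical helper written for illustration and is not part of the alepha API.

```typescript
// Hypothetical helper: group items by the key returned from partitionBy.
// Items keep their original order within each partition.
function partitionItems<T>(items: T[], partitionBy: (item: T) => string): Map<string, T[]> {
  const partitions = new Map<string, T[]>();
  for (const item of items) {
    const key = partitionBy(item);
    const group = partitions.get(key);
    if (group) {
      group.push(item);
    } else {
      partitions.set(key, [item]);
    }
  }
  return partitions;
}

const queued = [
  { userId: "u1", type: "email", message: "Welcome" },
  { userId: "u2", type: "sms", message: "Code: 1234" },
  { userId: "u3", type: "email", message: "Invoice ready" },
];

const byType = partitionItems(queued, (n) => n.type);
console.log(byType.get("email")?.length); // number of queued email notifications
console.log(byType.get("sms")?.length); // number of queued sms notifications
```

Because every item in a partition shares the same key, a handler can safely read the key from the first item, as the notification handler does with `notifications[0].type`.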
Log aggregation with retry logic:
class LoggingService {
  logBatch = $batch({
    schema: t.object({
      timestamp: t.number(),
      level: t.enum(["info", "warn", "error"]),
      message: t.string(),
      metadata: t.optional(t.record(t.string(), t.any())),
      source: t.string()
    }),
    maxSize: 1000, // Large batches for log efficiency
    maxDuration: [30, "seconds"], // Longer duration for log aggregation
    concurrency: 2, // Limit concurrent log shipments
    retry: {
      maxAttempts: 5,
      delay: [2, "seconds"],
      backoff: "exponential"
    },
    handler: async (logEntries) => {
      console.log(`Shipping ${logEntries.length} log entries`);
      try {
        // Ship logs to external service (e.g., Elasticsearch, Splunk)
        const response = await this.logShipper.bulkIndex({
          index: 'application-logs',
          // Each entry becomes an action line followed by its document
          body: logEntries.flatMap(entry => [
            { index: { _index: 'application-logs' } },
            {
              ...entry,
              '@timestamp': new Date(entry.timestamp).toISOString()
            }
          ])
        });
        // Assumes `response.errors` is an array of failed entries;
        // an empty array means every entry was indexed
        if (response.errors && response.errors.length > 0) {
          console.error(`Some log entries failed to index`, response.errors);
          // Retry will be triggered by throwing
          throw new Error(`Failed to index ${response.errors.length} log entries`);
        }
        console.log(`Successfully shipped ${logEntries.length} log entries`);
        return { shipped: logEntries.length, indexedAt: Date.now() };
      } catch (error) {
        console.error(`Failed to ship logs batch`, error);
        throw error; // Trigger retry mechanism
      }
    }
  });

  async log(
    level: 'info' | 'warn' | 'error',
    message: string,
    metadata?: Record<string, any>,
    source: string = 'application'
  ) {
    // Individual log calls are batched and shipped efficiently
    return await this.logBatch.push({
      timestamp: Date.now(),
      level,
      message,
      metadata,
      source
    });
  }
}
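The `retry` block above asks for up to five attempts with an exponential backoff. As a rough illustration of what that policy means, the sketch below retries a failing task and doubles the wait after each failure. `retryWithBackoff`, `RetryOptions`, and `sleep` are hypothetical names, and the exact delay schedule alepha uses is not stated in this README, so the doubling here is an assumption.

```typescript
// Hypothetical sketch of retry with exponential backoff.
interface RetryOptions {
  maxAttempts: number; // total attempts, including the first one
  baseDelayMs: number; // wait before the second attempt
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function retryWithBackoff<R>(task: () => Promise<R>, options: RetryOptions): Promise<R> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < options.maxAttempts) {
        // Assumed schedule: base, 2 * base, 4 * base, ...
        await sleep(options.baseDelayMs * Math.pow(2, attempt - 1));
      }
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}
```

With `maxAttempts: 5` and a 2-second base delay, this schedule would wait 2, 4, 8, and 16 seconds between attempts before giving up.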
File processing with dynamic partitioning:
class FileProcessingService {
  fileProcessingBatch = $batch({
    schema: t.object({
      filePath: t.string(),
      fileType: t.enum(["image", "video", "document"]),
      processingOptions: t.object({
        quality: t.optional(t.enum(["low", "medium", "high"])),
        format: t.optional(t.string()),
        compress: t.optional(t.boolean())
      }),
      priority: t.enum(["urgent", "normal", "background"])
    }),
    maxSize: 20, // Smaller batches for file processing
    maxDuration: [2, "minutes"], // Reasonable time for file accumulation
    // Partition by file type and priority for optimal resource usage
    partitionBy: (file) => `${file.fileType}-${file.priority}`,
    concurrency: 4, // Multiple concurrent processing pipelines
    retry: {
      maxAttempts: 3,
      delay: [5, "seconds"]
    },
    handler: async (files) => {
      const fileType = files[0].fileType;
      const priority = files[0].priority;
      console.log(`Processing ${files.length} ${fileType} files with ${priority} priority`);
      try {
        const results = [];
        for (const file of files) {
          // transformFile() is a hypothetical per-file worker (not shown).
          // Calling the public processFile() here would push the file back into this batch.
          const result = await this.transformFile(file.filePath, file.fileType, file.processingOptions);
          results.push({
            originalPath: file.filePath,
            processedPath: result.outputPath,
            size: result.size,
            duration: result.processingTime
          });
        }
        // Update database with batch results
        await this.updateProcessingStatus(results);
        console.log(`Successfully processed ${files.length} ${fileType} files`);
        return {
          processed: files.length,
          fileType,
          priority,
          totalSize: results.reduce((sum, r) => sum + r.size, 0),
          results
        };
      } catch (error) {
        console.error(`Batch file processing failed for ${fileType} files`, error);
        throw error;
      }
    }
  });

  async processFile(
    filePath: string,
    fileType: 'image' | 'video' | 'document',
    options: any,
    priority: 'urgent' | 'normal' | 'background' = 'normal'
  ) {
    // Files are automatically batched by type and priority
    return await this.fileProcessingBatch.push({
      filePath,
      fileType,
      processingOptions: options,
      priority
    });
  }
}
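The `concurrency` option in the examples above caps how many batches are processed at the same time. A common way to enforce such a cap is a small pool of workers that pull tasks from a shared queue, sketched below. `runWithConcurrency` is a hypothetical helper for illustration; how alepha schedules batches internally is not documented here.

```typescript
// Hypothetical sketch: run async tasks with at most `limit` in flight.
// Results are returned in the same order as the input tasks.
async function runWithConcurrency<R>(
  tasks: Array<() => Promise<R>>,
  limit: number,
): Promise<R[]> {
  const results: R[] = new Array(tasks.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unclaimed task until none remain.
  async function worker(): Promise<void> {
    while (nextIndex < tasks.length) {
      const index = nextIndex++;
      results[index] = await tasks[index]();
    }
  }

  const workerCount = Math.min(limit, tasks.length);
  const workers: Promise<void>[] = [];
  for (let i = 0; i < workerCount; i++) {
    workers.push(worker());
  }
  await Promise.all(workers);
  return results;
}
```

Claiming an index and incrementing the counter happen in one synchronous step, so two workers never pick up the same task even though they interleave at every `await`.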
FAQs
The npm package @alepha/batch receives a total of 193 weekly downloads. As such, @alepha/batch popularity was classified as not popular.
We found that @alepha/batch demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.