@alepha/batch
Advanced tools
Efficiently process operations in groups by size or time.
This package is part of the Alepha framework and can be installed via the all-in-one package:
npm install alepha
This module allows you to group multiple asynchronous operations into a single "batch," which is then processed together. This is an essential pattern for improving performance, reducing I/O, and interacting efficiently with rate-limited APIs or databases.
import { Alepha, $hook, run, t } from "alepha";
import { $batch } from "alepha/batch";

class LoggingService {
  // define the batch processor
  logBatch = $batch({
    schema: t.string(),
    maxSize: 10,
    maxDuration: [5, "seconds"],
    handler: async (items) => {
      console.log(`[BATCH LOG] Processing ${items.length} events:`, items);
    },
  });

  // example of how to use it
  onReady = $hook({
    on: "ready",
    handler: async () => {
      this.logBatch.push("Application started.");
      this.logBatch.push("User authenticated.");
      // ... more events pushed from elsewhere in the app
    },
  });
}
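To make the "by size or time" idea concrete outside the framework, here is a minimal standalone sketch of a batcher that flushes when either threshold is hit. It is an illustration only, not Alepha's implementation: `SimpleBatcher`, `maxDurationMs`, and `flush` are hypothetical names chosen for this example.

```typescript
// Illustrative size-or-time batcher. NOT Alepha's implementation;
// all names here are hypothetical.
type BatchHandler<T> = (items: T[]) => void | Promise<void>;

class SimpleBatcher<T> {
  private buffer: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;
  private maxSize: number;
  private maxDurationMs: number;
  private handler: BatchHandler<T>;

  constructor(maxSize: number, maxDurationMs: number, handler: BatchHandler<T>) {
    this.maxSize = maxSize;
    this.maxDurationMs = maxDurationMs;
    this.handler = handler;
  }

  push(item: T): void {
    this.buffer.push(item);
    if (this.buffer.length >= this.maxSize) {
      // Size threshold reached: flush right away.
      this.flush();
    } else if (this.timer === undefined) {
      // First item of a new batch: start the time window.
      this.timer = setTimeout(() => this.flush(), this.maxDurationMs);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.buffer.length === 0) return;
    const items = this.buffer;
    this.buffer = [];
    void this.handler(items);
  }
}

// Usage: the second push reaches maxSize, so the handler runs immediately.
const demo = new SimpleBatcher<string>(2, 5000, (items) => console.log("flush:", items));
demo.push("Application started.");
demo.push("User authenticated.");
```

The timer starts with the first item of each batch rather than running continuously, so an idle batcher does no work.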
This module can be imported and used as follows:
import { Alepha, run } from "alepha";
import { AlephaBatch } from "alepha/batch";
const alepha = Alepha.create().with(AlephaBatch);
run(alepha);
Descriptors are functions that define and configure various aspects of your application. By convention, their names start with $ and they return configured descriptor instances.
For more details, see the Descriptors documentation.
Creates a batch processing descriptor for efficient grouping and processing of multiple operations.
This descriptor provides a powerful batching mechanism that collects multiple individual items and processes them together in groups, significantly improving performance by reducing overhead and enabling bulk operations. It supports partitioning, concurrent processing, automatic flushing, and intelligent retry mechanisms for robust batch processing workflows.
Key Features
Use Cases
Perfect for optimizing high-throughput operations:
Basic database batch operations:
import { $batch } from "alepha/batch";
import { t } from "alepha";

// Note: `this.database` and `generateId()` are placeholders that are assumed
// to be provided elsewhere in the application.
class UserService {
  userBatch = $batch({
    schema: t.object({
      id: t.string(),
      name: t.string(),
      email: t.string(),
      createdAt: t.optional(t.string())
    }),
    maxSize: 50, // Process up to 50 users at once
    maxDuration: [5, "seconds"], // Or flush every 5 seconds
    handler: async (users) => {
      // Bulk insert users - much faster than individual inserts
      console.log(`Processing batch of ${users.length} users`);
      const result = await this.database.users.insertMany(users.map(user => ({
        ...user,
        createdAt: user.createdAt || new Date().toISOString()
      })));
      console.log(`Successfully inserted ${result.length} users`);
      return { inserted: result.length, userIds: result.map(r => r.id) };
    }
  });

  async createUser(userData: { name: string; email: string }) {
    // Individual calls are automatically batched
    const result = await this.userBatch.push({
      id: generateId(),
      name: userData.name,
      email: userData.email
    });
    return result; // Returns the batch result once batch is processed
  }
}
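The comment on `createUser` says that `push` resolves with the batch result once the batch has been processed. One way to picture that behaviour is to keep one promise per pushed item and settle them all when the handler finishes. The sketch below is an assumption about the general pattern, not Alepha's code; `AwaitableBatch`, `Waiter`, and `flush` are hypothetical names.

```typescript
// Illustrative only: every caller of push() receives the result of the
// batch that contained its item. Names are hypothetical.
type Waiter<R> = { resolve: (value: R) => void; reject: (reason: unknown) => void };

class AwaitableBatch<T, R> {
  private items: T[] = [];
  private waiters: Waiter<R>[] = [];
  private handler: (items: T[]) => Promise<R> | R;

  constructor(handler: (items: T[]) => Promise<R> | R) {
    this.handler = handler;
  }

  // Queue an item; the promise settles when its batch has been handled.
  push(item: T): Promise<R> {
    this.items.push(item);
    return new Promise<R>((resolve, reject) => {
      this.waiters.push({ resolve, reject });
    });
  }

  // Run the handler once for everything queued so far, then settle every waiter.
  async flush(): Promise<void> {
    const items = this.items;
    const waiters = this.waiters;
    this.items = [];
    this.waiters = [];
    if (items.length === 0) return;
    try {
      const result = await this.handler(items);
      for (const w of waiters) w.resolve(result);
    } catch (error) {
      // A failed batch rejects every caller that contributed to it.
      for (const w of waiters) w.reject(error);
    }
  }
}
```

In this sketch a handler failure rejects every pending `push`, so callers can handle errors with ordinary `try`/`catch` around `await`.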
API call batching with partitioning:
class NotificationService {
  notificationBatch = $batch({
    schema: t.object({
      userId: t.string(),
      type: t.enum(["email", "sms", "push"]),
      message: t.string(),
      priority: t.enum(["high", "normal", "low"])
    }),
    maxSize: 100,
    maxDuration: [10, "seconds"],
    // Partition by notification type for different processing
    partitionBy: (notification) => notification.type,
    concurrency: 3, // Process up to 3 different types simultaneously
    handler: async (notifications) => {
      const type = notifications[0].type; // All items in batch have same type
      console.log(`Processing ${notifications.length} ${type} notifications`);
      switch (type) {
        case 'email':
          return await this.emailProvider.sendBulk(notifications.map(n => ({
            to: n.userId,
            subject: 'Notification',
            body: n.message,
            priority: n.priority
          })));
        case 'sms':
          return await this.smsProvider.sendBulk(notifications.map(n => ({
            to: n.userId,
            message: n.message
          })));
        case 'push':
          return await this.pushProvider.sendBulk(notifications.map(n => ({
            userId: n.userId,
            title: 'Notification',
            body: n.message,
            priority: n.priority
          })));
      }
    }
  });

  async sendNotification(userId: string, type: 'email' | 'sms' | 'push', message: string, priority: 'high' | 'normal' | 'low' = 'normal') {
    // Notifications are automatically batched by type
    return await this.notificationBatch.push({
      userId,
      type,
      message,
      priority
    });
  }
}
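The `partitionBy` option above routes each notification into a group keyed by its type, which is why the handler can read the type from the first item. As a rough picture of that grouping step, here is a standalone sketch; `partition` and `keyOf` are hypothetical names, and this is not Alepha's internal code.

```typescript
// Illustrative grouping of items by a partition key (hypothetical helper).
function partition<T>(items: T[], keyOf: (item: T) => string): Map<string, T[]> {
  const groups = new Map<string, T[]>();
  for (const item of items) {
    const key = keyOf(item);
    const group = groups.get(key);
    if (group) {
      group.push(item);
    } else {
      groups.set(key, [item]);
    }
  }
  return groups;
}

// Usage: three notifications fall into two partitions, "email" and "sms".
const sampleGroups = partition(
  [
    { userId: "u1", type: "email" },
    { userId: "u2", type: "sms" },
    { userId: "u3", type: "email" },
  ],
  (n) => n.type,
);
console.log([...sampleGroups.keys()]);
```

Each group can then be handed to the handler separately, so a single handler call only ever sees items that share a key.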
Log aggregation with retry logic:
class LoggingService {
  logBatch = $batch({
    schema: t.object({
      timestamp: t.number(),
      level: t.enum(["info", "warn", "error"]),
      message: t.string(),
      metadata: t.optional(t.record(t.string(), t.any())),
      source: t.string()
    }),
    maxSize: 1000, // Large batches for log efficiency
    maxDuration: [30, "seconds"], // Longer duration for log aggregation
    concurrency: 2, // Limit concurrent log shipments
    retry: {
      maxAttempts: 5,
      delay: [2, "seconds"],
      backoff: "exponential"
    },
    handler: async (logEntries) => {
      console.log(`Shipping ${logEntries.length} log entries`);
      try {
        // Ship logs to external service (e.g., Elasticsearch, Splunk)
        const response = await this.logShipper.bulkIndex({
          index: 'application-logs',
          // Each entry becomes an action line followed by its document
          body: logEntries.flatMap(entry => [
            { index: { _index: 'application-logs' } },
            {
              ...entry,
              '@timestamp': new Date(entry.timestamp).toISOString()
            }
          ])
        });
        if (response.errors) {
          console.error(`Some log entries failed to index`, response.errors);
          // Retry will be triggered by throwing.
          // `errors` is only treated as a truthy failure flag here; its exact
          // shape depends on the log shipper client, so no count is reported.
          throw new Error(`Failed to index some log entries`);
        }
        console.log(`Successfully shipped ${logEntries.length} log entries`);
        return { shipped: logEntries.length, indexedAt: Date.now() };
      } catch (error) {
        console.error(`Failed to ship logs batch`, error);
        throw error; // Trigger retry mechanism
      }
    }
  });

  async log(level: 'info' | 'warn' | 'error', message: string, metadata?: Record<string, any>, source: string = 'application') {
    // Individual log calls are batched and shipped efficiently
    return await this.logBatch.push({
      timestamp: Date.now(),
      level,
      message,
      metadata,
      source
    });
  }
}
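The retry block above asks for up to five attempts, a two-second delay, and exponential backoff. The exact schedule Alepha applies is not stated here. Assuming that `maxAttempts` counts the first try and that the delay doubles after each failure (both are assumptions, as is the helper name `backoffDelays`), the waits between attempts could be computed like this:

```typescript
// Hypothetical helper: waits (in ms) between attempts under a plain
// exponential backoff. The doubling factor and the "first try counts as an
// attempt" rule are assumptions, not documented Alepha behaviour.
function backoffDelays(maxAttempts: number, baseDelayMs: number, factor = 2): number[] {
  const delays: number[] = [];
  // maxAttempts tries leave maxAttempts - 1 gaps to wait in.
  for (let retry = 0; retry < maxAttempts - 1; retry++) {
    delays.push(baseDelayMs * factor ** retry);
  }
  return delays;
}

// With maxAttempts = 5 and a 2-second base delay:
console.log(backoffDelays(5, 2000)); // [2000, 4000, 8000, 16000]
```

Under these assumptions a batch that keeps failing would be abandoned after roughly 30 seconds of cumulative waiting, which is worth comparing against `maxDuration` when sizing the retry policy.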
File processing with dynamic partitioning:
class FileProcessingService {
  fileProcessingBatch = $batch({
    schema: t.object({
      filePath: t.string(),
      fileType: t.enum(["image", "video", "document"]),
      processingOptions: t.object({
        quality: t.optional(t.enum(["low", "medium", "high"])),
        format: t.optional(t.string()),
        compress: t.optional(t.boolean())
      }),
      priority: t.enum(["urgent", "normal", "background"])
    }),
    maxSize: 20, // Smaller batches for file processing
    maxDuration: [2, "minutes"], // Reasonable time for file accumulation
    // Partition by file type and priority for optimal resource usage
    partitionBy: (file) => `${file.fileType}-${file.priority}`,
    concurrency: 4, // Multiple concurrent processing pipelines
    retry: {
      maxAttempts: 3,
      delay: [5, "seconds"]
    },
    handler: async (files) => {
      const fileType = files[0].fileType;
      const priority = files[0].priority;
      console.log(`Processing ${files.length} ${fileType} files with ${priority} priority`);
      try {
        const results = [];
        for (const file of files) {
          // `runFileProcessor` is a hypothetical worker method that does the
          // actual processing. It must not be the public processFile() below,
          // which only enqueues and would push the file back into this batch.
          const result = await this.runFileProcessor(file.filePath, file.fileType, file.processingOptions);
          results.push({
            originalPath: file.filePath,
            processedPath: result.outputPath,
            size: result.size,
            duration: result.processingTime
          });
        }
        // Update database with batch results
        await this.updateProcessingStatus(results);
        console.log(`Successfully processed ${files.length} ${fileType} files`);
        return {
          processed: files.length,
          fileType,
          priority,
          totalSize: results.reduce((sum, r) => sum + r.size, 0),
          results
        };
      } catch (error) {
        console.error(`Batch file processing failed for ${fileType} files`, error);
        throw error;
      }
    }
  });

  async processFile(filePath: string, fileType: 'image' | 'video' | 'document', options: any, priority: 'urgent' | 'normal' | 'background' = 'normal') {
    // Files are automatically batched by type and priority
    return await this.fileProcessingBatch.push({
      filePath,
      fileType,
      processingOptions: options,
      priority
    });
  }
}
FAQs
We found that @alepha/batch demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.