
Highly performant JavaScript data stream ETL engine.
Bellboy streams input data row by row. Every row, in turn, goes through a user-defined function where it can be transformed. When enough data has been collected in a batch, it is loaded into the destination.
Before installing, make sure you are using the latest version of Node.js.
npm install bellboy
If you plan to use bellboy with the native msnodesqlv8 driver, add it as a dependency:
npm install msnodesqlv8
This example shows how bellboy can extract rows from an Excel file, modify them on the fly, load them into a Postgres database, move the processed file to another folder and process the remaining files.
All in six simple steps.
const bellboy = require("bellboy");
const fs = require("fs");
const path = require("path");
(async () => {
const srcPath = `C:/source`;
// 1. create a processor which will process
// Excel files in the folder one by one
const processor = new bellboy.ExcelProcessor({
path: srcPath,
hasHeader: true,
});
// 2. create a destination which will add a new 'status'
// field to each row and load processed data into a Postgres database
const destination = new bellboy.PostgresDestination({
connection: {
user: "user",
password: "password",
host: "localhost",
database: "bellboy",
},
table: "stats",
recordGenerator: async function* (record) {
yield {
...record.raw.obj,
status: "done",
};
},
});
// 3. create a job which will glue the processor and the destination together
const job = new bellboy.Job(processor, [destination]);
// 4. tell bellboy to move the file away as soon as it was processed
job.on("endProcessingStream", async (file) => {
const filePath = path.join(srcPath, file);
const newFilePath = path.join(`./destination`, file);
fs.renameSync(filePath, newFilePath);
});
// 5. Log all error events
job.onAny(async (eventName, ...args) => {
if (eventName.includes("Error")) {
console.log(args);
}
});
// 6. run your job
await job.run();
})();
A job in bellboy is the link between a processor and its destinations. When the job is run, the data processing and loading machinery is started.
To initialize a Job instance, pass a processor and one or more destinations, along with optional job options.
const job = new bellboy.Job(
  processor_instance,
  [destination_instance],
  job_options
);
Job options:
- reporters `Reporter[]` - custom reporters attached to the job.
- jobName `string` - name of the job.

Instance methods:
- run `async function()` - starts data processing.
- on `function(event, async function listener)` - registers a listener for a specific event.
- onAny `function(async function listener)` - registers a listener fired on any event.
- stop `function(errorMessage?)` - stops job execution; if errorMessage is passed, the job will throw an error with this message.
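For instance, a job with a custom name (a minimal sketch; the processor and destination are assumed to exist already):
const job = new bellboy.Job(processor, [destination], {
  jobName: "excel-to-postgres",
});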
Event listeners, which can be registered with the job.on or job.onAny methods, allow you to listen to specific events in the job lifecycle and to interact with them.
Listeners registered with .on will always be executed first, regardless of the order in which they were added compared to .onAny. This ensures that specific event listeners have priority over generic ones. The job.stop() method can be used inside a listener to stop job execution and throw an error if needed.
job.on(
"startProcessing",
async (processor: IProcessor, destinations: IDestination[]) => {
// Job has started execution.
}
);
job.on("startProcessingStream", async (...args: any) => {
// Stream processing has been started.
// Passed parameters may vary based on specific processor.
});
job.on("startProcessingRow", async (row: any) => {
// Row has been received and is about to be processed inside `recordGenerator` method.
});
job.on("rowGenerated", async (destinationIndex: number, generatedRow: any) => {
// Row has been generated using `recordGenerator` method.
});
job.on(
"rowGenerationError",
async (destinationIndex: number, row: any, error: any) => {
// Record generation (`recordGenerator` method) has thrown an error.
}
);
job.on("endProcessingRow", async () => {
// Row has been processed.
});
job.on("transformingBatch", async (destinationIndex: number, rows: any[]) => {
// Batch is about to be transformed inside `batchTransformer` method.
});
job.on(
"transformedBatch",
async (destinationIndex: number, transformedRows: any) => {
// Batch has been transformed using `batchTransformer` method.
}
);
job.on(
"transformingBatchError",
async (destinationIndex: number, rows: any[], error: any) => {
// Batch transformation (`batchTransformer` method) has thrown an error.
}
);
job.on("endTransformingBatch", async (destinationIndex: number) => {
// Batch has been transformed.
});
job.on("loadingBatch", async (destinationIndex: number, data: any[]) => {
// Batch is about to be loaded into destination.
});
job.on(
"loadedBatch",
async (destinationIndex: number, data: any[], result: any) => {
// Batch has been loaded into destination.
}
);
job.on(
"loadingBatchError",
async (destinationIndex: number, data: any[], error: any) => {
// Batch load has failed.
}
);
job.on("endLoadingBatch", async (destinationIndex: number) => {
// Batch load has finished.
});
job.on("endProcessingStream", async (...args: any) => {
// Stream processing has finished.
// Passed parameters may vary based on specific processor.
});
job.on("processingError", async (error: any) => {
// Unexpected error has occurred.
});
job.on("endProcessing", async () => {
// Job has finished execution.
});
A special listener can be registered using the job.onAny method; it will listen for any of the previously mentioned events.
job.onAny(async (eventName: string, ...args: any) => {
// An event has been fired.
});
Sometimes more information about an event is needed, especially if you are building a custom reporter to log or trace fired events.
This information can be obtained by registering an async function as a third parameter of the job.on method or as a second parameter of the job.onAny method.
For example,
job.on("rowGenerated", undefined, async (event: IBellboyEvent) => {
// Row has been generated using `recordGenerator` method.
console.log(
`${event.jobName} has generated row for #${event.eventArguments.destinationIndex} destination`
);
});
or
job.onAny(undefined, async (event: IBellboyEvent) => {
console.log(`${event.jobName} has fired ${event.jobEvent}`);
});
Besides jobName, jobEvent and eventArguments used above, the IBellboyEvent object carries additional metadata about the fired event, such as its timestamp and whether the job has been stopped.

Processors

Each processor in bellboy is a class which has the single responsibility of processing data of a specific type - whether it is an Excel file, an MQTT stream, an HTTP endpoint or a database query.
Options common to all processors:
- rowLimit `number` - number of records to be processed; if 0 is passed, all records will be processed.

MqttProcessor
Listens for messages and processes them one by one. It also handles backpressure by queuing messages, so all messages can be eventually processed.
Options:
- url `string` (required)
- topics `string[]` (required)
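For example (a minimal sketch; the broker URL and topic are hypothetical):
const processor = new bellboy.MqttProcessor({
  url: "mqtt://localhost:1883",
  topics: ["bellboy/readings"],
});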
HttpProcessor
Processes data received from an HTTP call. Can process json and xml as well as delimited data. Can handle pagination by using the nextRequest function. For delimited data it produces rows of the shape described under DelimitedProcessor below.
Options:
- connection `object` (required)
- dataFormat `delimited | json | xml` (required)
- delimiter `string` (required for delimited)
- qualifier `string` (only for delimited)
- hasHeader `boolean` (only for delimited) - if true, the first row will be processed as a header.
- jsonPath `RegExp | string` - see JsonProcessor below for the path format.
- authorizationRequest `object` - authorization request executed beforehand; the obtained token is injected into the header or query of the main request, as specified by its applyTo option.
- nextRequest `async function(header)` - must return the connection for the next request, or null if the next request is not needed.

For example, paginating until pageCount is reached:
const processor = new bellboy.HttpProcessor({
nextRequest: async function () {
if (currentPage < pageCount) {
return {
...connection,
url: `${url}&current_page=${currentPage + 1}`,
};
}
return null;
},
// ...
});
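A complete processor for a JSON endpoint might then look like this (a minimal sketch: the endpoint URL is hypothetical and the dataFormat/jsonPath option names follow the options list above):
const processor = new bellboy.HttpProcessor({
  connection: {
    method: "GET",
    url: "https://api.example.com/dogs?current_page=1",
  },
  dataFormat: "json",
  jsonPath: "dogs.(\\d+)",
  // add nextRequest as in the previous snippet to paginate
});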
Used for streaming text data from files in a directory. There are currently four types of directory processors - ExcelProcessor, JsonProcessor, DelimitedProcessor and TailProcessor. Such processors search for files in the source directory and process them one by one.
The file name (file) and the full file path (filePath) will be passed to the startProcessingStream event.
Options common to all directory processors:
- path `string` - directory to search for files in.
- filePattern `RegExp` - pattern the file names are matched against.
- files `string[]` - if not specified, files are matched against the filePattern regex and processed in alphabetical order.

ExcelProcessor
Processes XLSX files in the directory.
Options:
- hasHeader `boolean | number` - false by default; a 0-based row location can be passed to this option if the header is not located on the first row.
- fillMergedCells `boolean` - if true, merged cells will have the same value (by default, only the first cell of merged cells is filled with a value); false by default.
- ignoreEmpty `boolean` - true by default.
- sheets `(string | number)[] | async function(sheets)` - sheets to be processed, by name or 0-based index.

For example, to process only the last sheet:
const processor = new bellboy.ExcelProcessor({
// process last sheet
sheets: async (sheets) => {
const sheet = sheets[sheets.length - 1];
return [sheet.name];
},
// ...
});
To see what a processed row will look like, refer to the documentation of the xlstream library, which is used for Excel processing.
JsonProcessor
Processes JSON files in the directory.
Options:
- jsonPath `RegExp | string` - path to the data to be streamed. The path of each node in the JSON tree is joined with . as a separator and then tested against the provided regular expression. If not specified, a root array will be streamed. As an example, if you have this JSON object:

{ "animals": { "dogs": ["pug", "bulldog", "poodle"] } }

to stream the dogs array, the path you will need to use is /animals.dogs.(\d+)/ if using a RegExp as jsonPath, and animals.dogs.(\\d+) if a string is used. (\d+) is used here because each index of the array is a number.
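Putting it together, a processor that streams the dogs array from JSON files in a folder might look like this (a minimal sketch; the directory path is hypothetical):
const processor = new bellboy.JsonProcessor({
  path: "./json-files",
  jsonPath: /animals.dogs.(\d+)/,
});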
DelimitedProcessor
Processes files with delimited data in the directory.

Options:
- delimiter `string` (required)
- qualifier `string`
- hasHeader `boolean` - if true, the first row will be processed as a header.

Produced row:
- header `string[]` - if hasHeader is true, the first row will appear here.
- arr `string[]` - the row split by delimiter and qualifier.
- obj `object` - if hasHeader is true, an object with header elements as keys will appear here.
- row `string` - the raw row string.

TailProcessor
Watches for file changes and outputs the last part of the file as soon as new lines are added to it.
Options:
- fromBeginning `boolean` - false by default.

Produced row:
- file `string`
- data `string`

PostgresProcessor
Processes a PostgreSQL SELECT query row by row.
Options:
- query `string` (required)
- connection `object` (required)
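For example (a minimal sketch; the query and connection details are hypothetical):
const processor = new bellboy.PostgresProcessor({
  query: "SELECT * FROM stats",
  connection: {
    user: "user",
    password: "password",
    host: "localhost",
    database: "bellboy",
  },
});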
MySqlProcessor
Processes a MySQL SELECT query row by row.

Options:
- query `string` (required)
- connection `object` (required)
FirebirdProcessor
Processes a Firebird SELECT query row by row.

Options:
- query `string` (required)
- connection `object` (required)
MssqlProcessor
Processes a MSSQL SELECT query row by row.

Options:
- query `string` (required)
- connection `object` (required)
Here is an example of how to configure the MssqlProcessor with a native TDS driver instead of the default pure-JavaScript Tedious driver.
const nativeDriver: ITdsDriver = await import("mssql/msnodesqlv8");
const connection: IMssqlDbConnection = {
user: "user",
password: "password",
server: "server",
database: "database",
driver: nativeDriver,
};
const source = new MssqlProcessor({
connection,
query: "select * from orders",
});
In previous versions of bellboy, connection.driver was a string parameter.
DynamicProcessor
A processor which generates records on the fly. Can be used to define custom data processors.
Options:
- generator `async generator function` (required)

// processor which generates 10 records dynamically
const processor = new bellboy.DynamicProcessor({
generator: async function* () {
for (let i = 0; i < 10; i++) {
yield i;
}
},
});
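Paired with a destination, this is already a complete runnable job (a minimal sketch using the StdoutDestination described below):
(async () => {
  const job = new bellboy.Job(processor, [new bellboy.StdoutDestination()]);
  await job.run();
})();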
Destinations

Every job can have as many destinations (outputs) as needed. For example, one job can load processed data into a database, log the same data to stdout and post it over HTTP simultaneously.
Options common to all destinations:
- disableLoad `boolean` - if true, no data will be loaded to the destination; in combination with reporters, this option can come in handy during testing.
- batchSize `number` - number of records to be accumulated before loading them; if 0 is passed, all records will be processed.
- recordGenerator `async generator function(row)` - generates records to be loaded from each processed row.
- batchTransformer `async function(rows)` - receives a whole batch of generated rows of size batchSize; data is loaded to the destination immediately after this function has been executed.
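As a sketch of how these options combine (the status field is hypothetical, and the StdoutDestination used here is described just below):
const destination = new bellboy.StdoutDestination({
  batchSize: 100,
  recordGenerator: async function* (row) {
    // emit one record per incoming row
    yield { ...row, status: "done" };
  },
  batchTransformer: async function (rows) {
    // reshape the whole batch right before it is loaded
    return rows.filter((row) => row.status === "done");
  },
});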
StdoutDestination
Logs out all data to stdout (console).

Options:
- asTable `boolean` - if true, data will be printed as a table.

HttpDestination
Puts processed data one by one in the request body and executes the specified HTTP request.
Options:
- request `object` (required)
- authorizationRequest `object` - authorization request executed beforehand; the obtained token is injected into the header or query of the main request, as specified by its applyTo option.

PostgresDestination
Inserts data into PostgreSQL.
Options:
- table `string` (required)
- upsertConstraints `string[]` - if provided, an UPSERT command will be executed based on the provided constraints.
- connection `object` (required)
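For example, a destination that upserts rows instead of plainly inserting them (a minimal sketch reusing the connection from the first example; the id constraint column is hypothetical):
const destination = new bellboy.PostgresDestination({
  table: "stats",
  upsertConstraints: ["id"],
  connection: {
    user: "user",
    password: "password",
    host: "localhost",
    database: "bellboy",
  },
});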
MySqlDestination
Inserts data into MySQL.
Options:
- table `string` (required)
- connection `object` (required)
- `boolean` - if true, only the columns present in the source data will be used for the data load; defaults to false, which uses all destination table columns.
- `string` - appears in the result object of the loadedBatch event.

MssqlDestination
Inserts data into MSSQL.
Options:
- table `string` (required)
- connection `object` (required)
Here is an example of how to configure the MssqlDestination with a native TDS driver instead of the default pure-JavaScript Tedious driver.
const nativeDriver: ITdsDriver = await import("mssql/msnodesqlv8");
const connection: IMssqlDbConnection = {
user: "user",
password: "password",
server: "server",
database: "database",
driver: nativeDriver,
};
const sink = new MssqlDestination({
connection,
table: "orders",
batchSize: 1000,
});
Extendability
New processors and destinations can be made by extending existing ones. Feel free to make a pull request if you create something interesting.
To create a new processor, you must extend the Processor class and implement the async process function. This function accepts one parameter:
- processStream `async function(readStream, ...args)` (required) - when called, the job instance will handle the passed stream internally; passed parameters (args) will be emitted with the startProcessingStream event during job execution.

class CustomProcessor extends bellboy.Processor {
async process(processStream) {
// await processStream(readStream, 'hello', 'world');
}
}
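For instance, a processor that streams values from an in-memory array could be sketched like this (Readable comes from Node's stream module; the items option is hypothetical):
const { Readable } = require("stream");

class ArrayProcessor extends bellboy.Processor {
  constructor(options) {
    super(options);
    this.items = options.items;
  }
  async process(processStream) {
    // hand an object-mode read stream to the job;
    // 'array' will be emitted with the startProcessingStream event
    await processStream(Readable.from(this.items), "array");
  }
}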
To create a new destination, you must extend the Destination class and implement the async loadBatch function. This function accepts one parameter:
- data `any[]` (required)

class CustomDestination extends bellboy.Destination {
async loadBatch(data) {
console.log(data);
}
}
A reporter is a job wrapper which can operate on the job instance (for example, listen to events using the job's on method). To create a new reporter, you must extend the Reporter class and implement the report function, which will be executed during job instance initialization.
Reporter event listeners (on, onAny) are added before any other user-defined listeners.
This function accepts one parameter:
- job `Job` (required)

class CustomReporter extends bellboy.Reporter {
report(job) {
job.on("startProcessing", undefined, async ({ jobName }) => {
console.log(`Job ${jobName} has been started.`);
});
}
}
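The reporter is then attached through the reporters job option (a sketch, assuming the processor and destination from the earlier examples):
const job = new bellboy.Job(processor, [destination], {
  reporters: [new CustomReporter()],
});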
Tests can be run using the docker compose up --abort-on-container-exit --exit-code-from test --build test command.