An ETL toolkit that supports RxJS streams, error handling, business rules, and more
ETL-Gun is a platform that employs RxJS observables, allowing developers to build stream-based ETL (Extract, Transform, Load) pipelines complete with buffering, error handling and many other useful features.
ETL-Gun is a simple ETL glue implemented as an extension to the RxJS library. Typically, you'd use ETL-Gun to help with ETL processes. It can extract data from one or more sources, transform it, and load it into one or more destinations in the needed order.
You can use it with JavaScript and TypeScript.
ETL-Gun will NOT help you with "big data": it runs on a single computer and does not support clustering out of the box.
Here's some ways to use it:
You can find many examples of using ETL-Gun in the API Reference section of this file.
npm install etl-gun
or
yarn add etl-gun
Info: A ready-to-use blank example project is available in the example-project folder of this repository.
Warning: Since version 2.0.4 this library is native ESM and no longer provides a CommonJS export. If your project uses CommonJS, you will have to convert it to ESM or use the dynamic import() function.
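For CommonJS projects, the dynamic import() route mentioned in the warning can look roughly like the sketch below. The helper name `loadEsmModule` is hypothetical, and the usage line loads the built-in `node:os` module only so that the snippet runs without extra packages; in a real project the specifier would be "etl-gun". Keep in mind that TypeScript rewrites import() into require() when it compiles with the classic `module: "commonjs"` setting, so a setting such as `module: "node16"` is typically needed for ESM-only packages.

```typescript
// Hypothetical helper: loads an ES module from CommonJS code via dynamic import().
async function loadEsmModule(specifier: string): Promise<Record<string, unknown>> {
  const loaded = await import(specifier);
  return loaded as Record<string, unknown>;
}

// Usage: replace "node:os" with "etl-gun" in a project that has it installed.
const osModulePromise = loadEsmModule("node:os");
osModulePromise.then(os => {
  // The loaded namespace object exposes the module's exports.
  console.log(typeof os.platform);
});
```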
Introductory example: PostgreSQL -> .csv
import { map } from "rxjs";
import { Csv, GuiManager, Header, Postgres, log, push, run } from "etl-gun";
// If you want to view GUI, uncomment the next line of code
// new GuiManager();
// Step 1: endpoint creation
const postgres = new Postgres.Endpoint("postgres://user:password@127.0.0.1:5432/database");
const source = postgres.getTable('users');
const csv = new Csv.Endpoint('./dest-folder');
const dest = csv.getFile('users.csv');
const header = new Header("id", "name", "login", "email");
// Step 2: transformation streams creation
const sourceToDest$ = source.select().pipe(
log(),
map(v => header.objToArr(v)),
push(dest)
);
// Step 3: running transformations (and waiting until they finish, if necessary)
await run(sourceToDest$);
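The map step in the example converts each row object into an array ordered like the CSV columns. The sketch below shows that idea with a hypothetical stand-in class, `SimpleHeader`. It is an assumption about what `Header.objToArr` does conceptually, not the library's implementation.

```typescript
// Hypothetical stand-in for the Header helper: keeps an ordered list of column names.
class SimpleHeader {
  private readonly columns: string[];

  constructor(...columns: string[]) {
    this.columns = columns;
  }

  // Pick the fields in column order; fields missing from the object become undefined.
  objToArr(obj: Record<string, unknown>): unknown[] {
    return this.columns.map(column => obj[column]);
  }
}

const simpleHeader = new SimpleHeader("id", "name", "login", "email");
// The property order of the input object does not matter, only the header order does.
const userRow = simpleHeader.objToArr({ name: "Ann", id: 1, email: "ann@example.com", login: "ann" });
console.log(userRow);
```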
ETL-Gun contains several main concepts:
Using this library consists of 3 steps:
ETL process:
Chaining:
Chaining of data transformations is performed with the pipe() method of the input data stream. Chaining of several streams is performed by using await with the run() procedure.
import { Postgres, Csv, Header, log, push, run } from "etl-gun";
import { map } from "rxjs";
const postgres = new Postgres.Endpoint("postgres://user:password@127.0.0.1:5432/database");
const source = postgres.getTable('users');
const csv = new Csv.Endpoint('./dest-folder');
const dest = csv.getFile('users.csv');
const header = new Header("id", "name", "login", "email");
const sourceToDest$ = source.select().pipe(
log(),
map(v => header.objToArr(v)),
push(dest)
);
await run(sourceToDest$);
import * as etl from "etl-gun";
const csvEndpoint = new etl.Csv.Endpoint();
const csv = csvEndpoint.getFile('users.csv');
const memory = new etl.Memory.Endpoint();
const buffer = memory.getBuffer('buffer 1');
const scvToBuffer$ = csv.select().pipe(
etl.push(buffer)
);
const bufferToCsv$ = buffer.select().pipe(
etl.push(csv)
);
await etl.run(scvToBuffer$);
buffer.sort((row1, row2) => row1[0] > row2[0]);
await csv.delete();
await etl.run(bufferToCsv$)
import * as etl from "etl-gun";
const telegram = new etl.Telegram.Endpoint();
const bot = telegram.startBot('bot 1', process.env.TELEGRAM_BOT_TOKEN!);
const translator = new etl.GoogleTranslateHelper(process.env.GOOGLE_CLOUD_API_KEY!, 'en', 'ru');
const startTelegramBot$ = bot.select().pipe(
etl.log(), // log user messages to the console
translator.operator([], ['message']), // translate 'message' field
etl.push(bot) // echo input message back to the user
);
etl.run(startTelegramBot$);
Base class for all collections. Declares public interface of collection and implements event mechanism.
Methods:
// Read elements from the collection and create data stream to process it
// where: condition of the element selection
select(where?: any): Observable<any>;
// Add value to the collection (usually to the end of stream)
// value: what will be added to the collection
async insert(value: any);
// Update collection elements
// where: condition of the element selection
// value: new value for the selected elements
async update(where: any, value: any);
// Clear data of the collection
// where: condition of the element selection
async delete(where?: any);
// Add listener of specified event
// event: which event we want to listen, see below
// listener: callback function to handle events
on(event: CollectionEvent, listener: (...data: any[]) => void);
// Readable/writable property which contains the errors collection instance for this collection
errors: Errors.ErrorsQueue;
// Calls select() method of errors collection
selectErrors(stopOnEmpty: boolean = false): BaseObservable<EtlError>;
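The event mechanism can be pictured with the following sketch. `TinyCollection` and its two event names are a hypothetical stand-in written for illustration, not the library's base class; the sketch only mirrors the `on(event, listener)` signature shown above.

```typescript
// Hypothetical subset of event names used by this sketch.
type TinyEvent = "insert" | "delete";
type TinyListener = (...data: unknown[]) => void;

// Illustrative stand-in, NOT the etl-gun base collection.
class TinyCollection<T> {
  private readonly listeners: Record<TinyEvent, TinyListener[]> = { insert: [], delete: [] };
  private items: T[] = [];

  // Register a callback for the given event.
  on(event: TinyEvent, listener: TinyListener): this {
    this.listeners[event].push(listener);
    return this;
  }

  // Store the value, then notify "insert" listeners.
  async insert(value: T): Promise<void> {
    this.items.push(value);
    this.emit("insert", value);
  }

  // Remove all values, then notify "delete" listeners.
  async delete(): Promise<void> {
    this.items = [];
    this.emit("delete");
  }

  private emit(event: TinyEvent, ...data: unknown[]): void {
    for (const listener of this.listeners[event]) listener(...data);
  }
}

const eventLog: string[] = [];
const tiny = new TinyCollection<number>();
tiny.on("insert", value => eventLog.push(`insert:${String(value)}`));
tiny.on("delete", () => eventLog.push("delete"));

// Listeners fire in the order the operations complete.
const eventsDone = tiny.insert(42).then(() => tiny.delete());
```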
Types:
export type CollectionEvent =
"select.start" | // fires at the start of stream
"select.end" | // at the end of stream
"select.recive" | // for every data value in the stream
"select.error" | // on error
"select.skip" | // when the collection skip some data
"select.up" | // when the collection go to the parent element while the tree data processing
"select.down" | // when the collection go to the child element while the tree data processing
"pipe.start" | // when processing of any collection element was started
"pipe.end" | // when processing of one collection element was ended
"insert" | // when data is inserted to the collection
"update" | // when data is updated in the collection
"delete"; // when data is deleted from the collection
Stores and processes ETL errors. By default, every collection has an errors property which contains the collection of errors from that collection's ETL process. You can cancel the default errors collection creation for any collection and specify your own manually created errors collection.
// Creates new errors collection
// collectionName: identifier of the collection object being created
// options: Some options how to display this endpoint
getCollection(collectionName: string, options: CollectionOptions<EtlError> = {}): ErrorsQueue;
// Release errors collection object
// collectionName: identifier of the collection object being released
releaseCollection(collectionName: string);
Queue in memory to store ETL errors and process them. Should be created with the getCollection method of Errors.Endpoint.
Methods:
// Create the observable object and send errors data from the queue to it
// stopOnEmpty: whether errors processing stops when the queue is empty
select(stopOnEmpty: boolean = false): BaseObservable<EtlError>;
// Pushes the error to the queue
// error: what will be added to the queue
async insert(error: EtlError);
// Clear queue
async delete();
Create and manipulate collections of objects in memory.
// Creates new memory buffer. This is a generic method, so you can specify the type of data which will be stored in it
// collectionName: identifier of the collection object being created
// values: initial data
// guiOptions: Some options how to display this endpoint
getBuffer<T>(collectionName: string, values: T[] = [], guiOptions: CollectionGuiOptions<T> = {}): BufferCollection;
// Release buffer data
// collectionName: identifier of the collection object being released
releaseBuffer(collectionName: string);
getQueue<T>(collectionName: string, values: T[] = [], guiOptions: CollectionGuiOptions<T> = {}): QueueCollection;
releaseQueue(collectionName: string);
Buffer to store values in memory and perform complex operations on it. Should be created with getBuffer method of MemoryEndpoint
Methods:
// Create the observable object and send data from the buffer to it
select(): Observable<T>;
// Pushes the value to the buffer
// value: what will be added to the buffer
async insert(value: T);
// Clear endpoint data buffer
async delete();
// Sort buffer data
// compareFn: You can specify a comparison function which returns a number
// (for example (v1, v2) => v1 - v2, which behaves like the Array.sort() comparator)
// or which returns a boolean (for example (v1, v2) => v1 > v2)
sort(compareFn: (v1: T, v2: T) => number | boolean);
// This function is equivalent to Array.forEach
forEach(callbackfn: (value: T, index: number, array: T[]) => void);
Example:
import * as etl from "etl-gun";
const csvEndpoint = new etl.Csv.Endpoint();
const csv = csvEndpoint.getFile('users.csv');
const memory = new etl.Memory.Endpoint();
const buffer = memory.getBuffer('buffer 1');
const scvToBuffer$ = csv.select().pipe(
etl.push(buffer)
);
const bufferToCsv$ = buffer.select().pipe(
etl.push(csv)
)
await etl.run(scvToBuffer$);
buffer.sort((row1, row2) => row1[0] > row2[0]);
await csv.delete();
etl.run(bufferToCsv$)
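Since sort() accepts a comparison function that returns either a number or a boolean, it may help to see how a boolean result could be mapped onto the numeric contract of Array.sort(). The helper `toNumericCompare` below is hypothetical and only illustrates one possible mapping; it is not taken from the library.

```typescript
// Hypothetical helper: adapts a number-or-boolean comparator to the numeric form Array.sort() expects.
function toNumericCompare<T>(
  compareFn: (v1: T, v2: T) => number | boolean
): (v1: T, v2: T) => number {
  return (v1, v2) => {
    const result = compareFn(v1, v2);
    if (typeof result === "number") return result; // already in Array.sort() form
    if (result) return 1;                          // true: v1 should come after v2
    return compareFn(v2, v1) ? -1 : 0;             // check the opposite order to detect equality
  };
}

// Both comparator styles produce the same ascending order.
const byNumber = [3, 1, 2].sort(toNumericCompare((a: number, b: number) => a - b));
const byBoolean = [3, 1, 2].sort(toNumericCompare((a: number, b: number) => a > b));
console.log(byNumber, byBoolean);
```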
Queue to store values in memory and perform ordered processing of it. Should be created with getQueue method of MemoryEndpoint
Methods:
// Create the observable object which processes queue elements one by one and removes each processed element from the queue
// dontStopOnEmpty: if true, queue processing is not stopped (unsubscribed) when the queue becomes empty
// interval: pause between processing of elements, in milliseconds
select(dontStopOnEmpty: boolean = false, interval: number = 0): Observable<T>;
// Pushes the value to the queue
// value: what will be added to the queue
async insert(value: T);
// Clear queue
async delete();
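The sketch below illustrates the one-by-one processing with an optional pause that select() describes. The function `drainQueue` and its parameters are hypothetical and operate on a plain array; only the stop-on-empty mode is shown.

```typescript
// Hypothetical helper: handles queue elements one by one, removing each processed element.
async function drainQueue<T>(
  queue: T[],
  handle: (value: T) => void,
  intervalMs: number = 0
): Promise<number> {
  let processed = 0;
  while (queue.length > 0) {
    const value = queue.shift() as T; // take the oldest element off the queue
    handle(value);
    processed++;
    if (intervalMs > 0 && queue.length > 0) {
      // pause between elements
      await new Promise<void>(resolve => setTimeout(resolve, intervalMs));
    }
  }
  return processed; // processing stops once the queue is empty
}

const pending = ["a", "b", "c"];
const seen: string[] = [];
const drained = drainQueue(pending, value => seen.push(value), 5);
```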
Search for files and folders with standard Unix shell wildcards; see the glob documentation for details.
Methods:
// rootFolder: full or relative path to the folder of interest
constructor(rootFolder: string);
// Creates new FilesystemCollection
// folderName: subfolder of the root folder and identifier of the collection object being created
// guiOptions: Some options how to display this endpoint
getFolder(folderName: string = '.', guiOptions: CollectionGuiOptions<PathDetails> = {}): Collection;
// Release FilesystemCollection
// folderName: identifier of the collection object being released
releaseFolder(folderName: string);
Methods:
// Create the observable object and send files and folders information to it
// mask: search path mask in glob format (see glob documentation)
// for example:
// *.js - all js files in root folder
// **/*.png - all png files in root folder and subfolders
// options: Search options, see below
select(mask: string = '*', options?: ReadOptions): BaseObservable<PathDetails>;
// Create folder or file
// pathDetails: Information about path, which returns from select() method
// filePath: File or folder path
// isFolder: Is it file or folder
// data: What will be added to the file, if it is a file, ignore for folders
async insert(pathDetails: PathDetails, data?: string | NodeJS.ArrayBufferView | Iterable<string | NodeJS.ArrayBufferView> | AsyncIterable<string | NodeJS.ArrayBufferView> | internal.Stream);
async insert(filePath: string, data?: string | NodeJS.ArrayBufferView | Iterable<string | NodeJS.ArrayBufferView> | AsyncIterable<string | NodeJS.ArrayBufferView> | internal.Stream, isFolder?: boolean);
// Clear the root folder by mask
// mask: Which files and folders we need to delete
// options: Search options, see below
// IMPORTANT! Be careful with the includeRootDir option: if it is true and objectsToSearch is not 'filesOnly',
// then the root folder itself will be deleted along with all its contents!
async delete(mask: string = '*', options?: ReadOptions);
Types:
type ReadOptions = {
includeRootDir?: boolean; // Whether the root folder itself is included in the search results
// false by default
objectsToSearch?: // Which object types will be included to the search results
'filesOnly' | // Only files
'foldersOnly' | // Only folders
'all'; // Both files and folders
// all is default option
}
type PathDetails = {
isFolder: boolean
name: string;
relativePath: string; // Empty for root folder
fullPath: string;
parentFolderRelativePath: string; // '..' for root folder
parentFolderFullPath: string;
}
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
const fs = new etl.Filesystem.Endpoint('~');
const scripts = fs.getFolder('scripts');
const printAllJsFileNames$ = scripts.select('**/*.js').pipe(
rx.map(v => v.name),
etl.log()
);
etl.run(printAllJsFileNames$)
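To make the ReadOptions defaults concrete, the sketch below decides whether a found path would be part of the results. The `Sketch*` types and the `isIncluded` function are hypothetical and only restate the documented defaults (includeRootDir is false, objectsToSearch is 'all'); the library's own matching logic may differ.

```typescript
type SketchReadOptions = {
  includeRootDir?: boolean;
  objectsToSearch?: "filesOnly" | "foldersOnly" | "all";
};

type SketchPathDetails = {
  isFolder: boolean;
  name: string;
  relativePath: string; // empty for the root folder
};

// Hypothetical filter: decides whether a found path belongs in the results.
function isIncluded(path: SketchPathDetails, options: SketchReadOptions = {}): boolean {
  const includeRootDir = options.includeRootDir ?? false;   // documented default
  const objectsToSearch = options.objectsToSearch ?? "all"; // documented default

  const isRoot = path.relativePath === "";
  if (isRoot && !includeRootDir) return false;
  if (objectsToSearch === "filesOnly") return !path.isFolder;
  if (objectsToSearch === "foldersOnly") return path.isFolder;
  return true;
}

const rootDir: SketchPathDetails = { isFolder: true, name: "scripts", relativePath: "" };
const jsFile: SketchPathDetails = { isFolder: false, name: "a.js", relativePath: "a.js" };

console.log(isIncluded(rootDir));                                    // false
console.log(isIncluded(rootDir, { includeRootDir: true }));          // true
console.log(isIncluded(jsFile, { objectsToSearch: "foldersOnly" })); // false
```

With includeRootDir set to true and objectsToSearch set to 'filesOnly', the root folder is filtered out again because it is a folder, which matches the warning attached to delete().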
Endpoint to access files on FTP and FTPS servers. The implementation is based on the basic-ftp package.
Methods:
// options: specify connection parameters
constructor(options: AccessOptions, verbose: boolean = false);
// Creates new Collection object to get remote folder contents
// folderPath: remote path to the ftp folder and identifier of the collection object being created
// options: Some options how to display this endpoint
getFolder(folderPath: string = '.', options: CollectionOptions<FileInfo> = {}): Collection;
// Release FilesystemCollection
// folderPath: identifier of the collection object being released
releaseFolder(folderPath: string);
Methods:
// Create the observable object and send files and folders information to it
select(): BaseObservable<FileInfo>;
// Create folder or file.
// remoteFolderPath, remoteFilePath, remotePath: remote path to be created
// localFilePath: Local source file path
// sourceStream: Source stream
// fileContents: String as file contents
async insertFolder(remoteFolderPath: string);
async insertFile(remoteFilePath: string, localFilePath: string);
async insertFile(remoteFilePath: string, sourceStream: Readable);
async insertFileWithContents(remoteFilePath: string, fileContents: string);
// isFolder: flag to indicate whether we want to add a folder or a file
// Only one of localFilePath, sourceStream, contents can be specified here
async insert(remotePath: string, contents: { isFolder: boolean, localFilePath?: string, sourceStream?: Readable, contents?: string });
// Delete file or folder with all its contents
// remoteFolderPath, remoteFilePath, remotePath: Remote path to file or folder we want to delete
async deleteFolder(remoteFolderPath: string);
async deleteEmptyFolder(remoteFolderPath: string); // raise the exception if the specified folder is not empty
async deleteFile(remoteFilePath: string);
async delete(remotePath: string);
Example:
import * as etl from "etl-gun";
import * as rxjs from "rxjs";
const ftp = new etl.filesystems.Ftp.Endpoint({host: process.env.FTP_HOST, user: process.env.FTP_USER, password: process.env.FTP_PASSWORD});
const folder = ftp.getFolder('/var/logs');
const PrintFolderContents$ = folder.select().pipe(
etl.log()
)
await etl.run(PrintFolderContents$);
Endpoint to access files by sftp. Implementation based on ssh2-sftp-client package.
Endpoint to access remote filesystem via WebDAV protocol. Implementation based on webdav package.
Parses a source CSV file into individual records or writes a record to the end of a destination CSV file. Every record is a CSV string represented as an array of values.
Methods:
// Create collection object for the specified file
// filename: full or relative name of the csv file and identifier of the collection object being created
// delimiter: delimiter of values in one string of file data, equals to ',' by default
// guiOptions: Some options how to display this endpoint
getFile(filename: string, delimiter: string = ",", guiOptions: CollectionGuiOptions<string[]> = {}): Collection;
// Release collection object
// filename: identifier of the collection object being released
releaseFile(filename: string);
Methods:
// Create the observable object and send file data to it line by line
// skipFirstLine: skip the first line in the file, useful for skipping the header
// skipEmptyLines: skip all empty lines in file
select(skipFirstLine: boolean = false, skipEmptyLines = false): Observable<string[]>;
// Add row to the end of file with specified value
// value: what will be added to the file
async insert(value: string[]);
// Clear the csv file
async delete();
Example:
import * as etl from "etl-gun";
const csv = new etl.Csv.Endpoint('~');
const testFile = csv.getFile('test.csv')
const logTestFileRows$ = testFile.select().pipe(
etl.log()
);
etl.run(logTestFileRows$)
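To make the delimiter, skipFirstLine and skipEmptyLines parameters concrete, here is a naive sketch that splits CSV text into arrays of strings. `parseCsvText` is a hypothetical helper; it does not handle quoted values and is not how the library parses files.

```typescript
// Hypothetical, deliberately naive CSV splitter (no support for quoted values).
function parseCsvText(
  text: string,
  delimiter: string = ",",
  skipFirstLine: boolean = false,
  skipEmptyLines: boolean = false
): string[][] {
  let lines = text.split(/\r?\n/);
  if (skipFirstLine) lines = lines.slice(1); // drop the header line
  if (skipEmptyLines) lines = lines.filter(line => line.trim() !== "");
  return lines.map(line => line.split(delimiter)); // every record becomes an array of values
}

const csvText = "id,name\n1,Ann\n\n2,Bob";
const rows = parseCsvText(csvText, ",", true, true);
console.log(rows);
```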
Reads and writes a JSON file, buffering it in memory. You can get objects from the JSON by a path specified in JSONPath format or in lodash simple path manner (see the lodash 'get' function documentation).
Methods:
// Create collection object for the specified file
// filename: full or relative name of the json file and identifier of the collection object being created
// autosave: save json from memory to the file after every change
// autoload: load json from the file to memory before every get or search operation
// encoding: file encoding
// guiOptions: Some options how to display this endpoint
getFile(filename: string, autosave: boolean = true, autoload: boolean = false, encoding?: BufferEncoding, guiOptions: CollectionGuiOptions<number> = {}): Collection;
// Release collection object
// filename: identifier of the collection object being released
releaseFile(filename: string);
Methods:
// Find and send to observable child objects by specified path
// path: search path in lodash simple path manner
// jsonPath: search path in JSONPath format
// options: see below
select(path: string, options?: ReadOptions): Observable<any>;
selectByJsonPath(jsonPath: string | string[], options?: ReadOptions): Observable<any>;
// Find and return child object by specified path
// path: search path in lodash simple path manner
// jsonPath: search path in JSONPath format
get(path: string): any;
getByJsonPath(jsonPath: string): any;
// If fieldname is specified, the function finds the object by path and adds value as its field
// If fieldname is not specified, the function finds the array by path and pushes value to it
// value: what will be added to the json
// path: where value will be added as child, specified in lodash simple path manner
// fieldname: name of the field to which the value will be added,
// and a flag indicating whether the value is added to an object or to an array
async insert(value: any, path?: string, fieldname?: string);
// Clear the json file and write an empty object to it
async delete();
// Reload the json to the memory from the file
load();
// Save the json from the memory to the file
save();
Types:
type JsonReadOptions = {
searchReturns?: 'foundedOnly' // Default value, means that only the search result objects are sent to the observable
| 'foundedImmediateChildrenOnly' // Only the immediate children of the search result objects are sent to the observable
| 'foundedWithDescendants'; // Recursively sends all objects from the object tree of every search result, including the search result object itself
addRelativePathAsField?: string; // If specified, the relative path is added to the sent objects as a field with this name
}
Example:
import * as etl from "etl-gun";
import { tap } from "rxjs";
const json = new etl.Json.Endpoint('~');
const testFile = json.getFile('test.json');
const printJsonBookNames$ = testFile.select('store.book').pipe(
tap(book => console.log(book.name))
);
const printJsonAuthors$ = testFile.selectByJsonPath('$.store.book[*].author', {searchReturns: 'foundedOnly', addRelativePathAsField: "path"}).pipe(
etl.log()
);
await etl.run(printJsonAuthors$, printJsonBookNames$);
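The "lodash simple path manner" used by select() and get() refers to dotted paths with optional array indexes, such as 'store.book[0].author'. The sketch below resolves such a path against an in-memory object. `getByPath` is a hypothetical helper written for illustration; it is neither lodash's implementation nor the library's.

```typescript
// Hypothetical helper: resolves a simple path such as "store.book[0].author".
function getByPath(root: unknown, path: string): unknown {
  const keys = path
    .replace(/\[(\d+)\]/g, ".$1") // turn "[0]" into ".0"
    .split(".")
    .filter(key => key.length > 0);

  let current: unknown = root;
  for (const key of keys) {
    // Stop early when the path leads through a missing or primitive value.
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

const doc = { store: { book: [{ name: "Dune", author: "Frank Herbert" }] } };
console.log(getByPath(doc, "store.book[0].author")); // Frank Herbert
console.log(getByPath(doc, "store.magazine.title")); // undefined
```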
# etl.XmlEndpoint(filename, autosave?, autoload?, encoding?)
Reads and writes an XML document, buffering it in memory. You can get nodes from the XML by a path specified in XPath format.
Methods:
// Create collection object for the specified file
// filename: full or relative name of the xml file and identifier of the collection object being created
// autosave: save xml from memory to the file after every change
// autoload: load xml from the file to memory before every get or search operation
// encoding: file encoding
// guiOptions: Some options how to display this endpoint
getFile(filename: string, autosave: boolean = true, autoload: boolean = false, encoding?: BufferEncoding, guiOptions: CollectionGuiOptions<string[]> = {}): Collection;
// Release collection object
// filename: identifier of the collection object being released
releaseFile(filename: string);
Methods:
// Find and send to observable child objects by specified xpath
// xpath: xpath to search
// options: see below
select(xpath: string = '', options: XmlReadOptions = {}): EtlObservable<Node>;
// Find and return child node by specified path
// xpath: search path
get(xpath: string = ''): XPath.SelectedValue
// If attribute is specified, the function finds the node by xpath and sets value as its attribute
// If attribute is not specified, the function finds the node by xpath and adds value as its child node
// value: what will be added to the xml
// xpath: where value will be added as child, specified in XPath format
// attribute: name of the attribute whose value will be set,
// and a flag indicating whether the value is added as an attribute or as a node
async insert(value: any, xpath: string = '', attribute: string = '');
// Clear the xml file and write an empty object to it
async delete();
// Reload the xml to the memory from the file
load();
// Save the xml from the memory to the file
save();
Types:
export type XmlReadOptions = {
searchReturns?: 'foundedOnly' // Default value, means that only the search result nodes are sent to the observable
| 'foundedImmediateChildrenOnly' // Only the immediate children of the search result nodes are sent to the observable
| 'foundedWithDescendants'; // Recursively sends all nodes from the tree of every search result, including the search result node itself
addRelativePathAsAttribute?: string; // If specified, the relative path is added to the sent nodes as an attribute with this name
}
Example:
import * as etl from "etl-gun";
import { map } from "rxjs";
const xml = new etl.Xml.Endpoint('/tmp');
const testFile = xml.getFile('test.xml');
const printXmlAuthors$ = testFile.select('/store/book/author').pipe(
map(v => v.firstChild.nodeValue),
etl.log()
);
await etl.run(printXmlAuthors$);
Represents common Knex database. Based on knex engine.
Methods:
constructor(client: ClientType, connectionString: string, pool?: PoolConfig);
constructor(client: ClientType, connectionConfig: ConnectionConfig, pool?: PoolConfig);
constructor(knexConfig: pkg.Knex.Config);
// Create collection object for the specified database table
// table: name of database table and identifier of the collection object being created
// options: Some options how to display this endpoint
getTable<T = Record<string, any>>(table: string, options: CollectionOptions<string[]> = {}): KnexTableCollection<T>;
// Create collection object for the specified sql query result
// collectionName: identifier of the collection object being created
// query: sql query
// options: Some options how to display this endpoint
getQuery<T = Record<string, any>>(collectionName: string, query: string, options: CollectionOptions<string[]> = {}): KnexQueryCollection<T>;
// Release collection object
// collectionName: identifier of the collection object being released
releaseCollection(collectionName: string);
// Release all collection objects, endpoint object and release connections to database.
async releaseEndpoint();
Presents the table from the database.
Methods:
// Create the observable object and send data from the database table to it
// where: you can filter incoming data by this parameter
// it can be SQL where clause
// or object with fields as column names
// and its values as needed column values
select(where: SqlCondition<T>, fields?: string[]): BaseObservable<T>;
select(whereSql?: string, whereParams?: any[], fields?: string[]): BaseObservable<T>;
// Insert value to the database table
// value: what will be added to the database
async insert(value: T): Promise<number[]>;
async insert(values: T[]): Promise<number[]>;
// Update all rows in the database table which match the specified condition
// where: you can filter the table rows to update by this parameter
// it can be SQL where clause
// or object with fields as column names
// and its values as needed column values
// value: what will be set as new value for updated rows
async update(value: T, where: SqlCondition<T>): Promise<number>;
async update(value: T, whereSql?: string, whereParams?: any[]): Promise<number>;
// Insert the value into the database table, or update the existing row if it already exists
// value: what will be inserted or used as the new value of the existing row
async upsert(value: T): Promise<number[]>;
// Delete rows from the database table by condition
// where: you can filter the table rows to delete by this parameter
// it can be SQL where clause
// or object with fields as column names
// and its values as needed column values
async delete(where: SqlCondition<T>): Promise<number>;
async delete(whereSql?: string, whereParams?: any[]): Promise<number>;
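The two forms of the where parameter, an object of column values or an SQL clause with parameters, can express the same filter. The sketch below converts the object form into the clause form. `conditionToSql` is a hypothetical helper, and the `?` placeholder style is an assumption borrowed from Knex raw bindings rather than something this document specifies.

```typescript
// Hypothetical helper: builds an "a = ? and b = ?" clause from an object of column values.
// Column names are used as-is (no escaping), so this is for illustration only.
function conditionToSql(where: Record<string, unknown>): { sql: string; params: unknown[] } {
  const columns = Object.keys(where);
  return {
    sql: columns.map(column => `${column} = ?`).join(" and "),
    params: columns.map(column => where[column]),
  };
}

const condition = conditionToSql({ login: "ann", active: true });
console.log(condition.sql);    // login = ? and active = ?
console.log(condition.params);
```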
Readonly collection of sql query results.
Methods:
// Create the observable object and send the sql query results to it
// params: values for the query parameters
select(params?: any[]): BaseObservable<T>;
Represents CockroachDB database. Endpoint implementation based on KnexEndpoint. You should install node-postgres (aka 'pg') package module to use this endpoint!
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.CockroachDb.Endpoint('postgres://user:password@127.0.0.1:5432/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Represents MariaDB database. Endpoint implementation based on KnexEndpoint. You should install mysql package module to use this endpoint!
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.MariaDb.Endpoint('mysql://user:password@127.0.0.1:3306/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Represents MS SQL Server database. Endpoint implementation based on KnexEndpoint. You should install tedious package module to use this endpoint!
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.SqlServer.Endpoint('mssql://user:password@127.0.0.1:1433/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Represents MySQL database. Endpoint implementation based on KnexEndpoint. You should install mysql package module to use this endpoint!
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.MySql.Endpoint('mysql://user:password@127.0.0.1:3306/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Represents Oracle database. Endpoint implementation based on KnexEndpoint. You should install oracledb package module to use this endpoint!
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.OracleDb.Endpoint({
host: config.oracle.host,
user: config.oracle.user,
password: config.oracle.password,
database: config.oracle.database,
});
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Represents PostgreSQL database. Endpoint implementation based on KnexEndpoint. You should install node-postgres (aka 'pg') package module to use this endpoint!
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.Postgres.Endpoint('postgres://user:password@127.0.0.1:5432/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Represents Amazon Redshift database. Endpoint implementation based on KnexEndpoint. You should install node-postgres (aka 'pg') package module to use this endpoint!
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.Redshift.Endpoint('postgres://user:password@127.0.0.1:5439/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Represents SQLite3 database. Endpoint implementation based on KnexEndpoint. You should install sqlite3 package module to use this endpoint!
Extends KnexEndpoint and contains all its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.SqlLite.Endpoint({
filename: "./mydb.sqlite"
});
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Endpoint to send and receive emails. Implements the SMTP protocol and is based on the nodemailer library.
Methods:
constructor(connectionOptions: ConnectionOptions);
async releaseEndpoint();
async send(to: string[] | string, subject: string, body: string, cc?: string[] | string, bcc?: string[] | string, from?: string): Promise<SendError | undefined>;
async send(value: EMail): Promise<SendError | undefined>;
getInbox(options: CollectionOptions<EMail> = {}): Collection;
releaseInbox();
getMailbox(mailBox: string, options: CollectionOptions<EMail> = {}): Collection;
releaseMailbox(mailBox: string);
Presents emails mailbox.
Methods:
// Create the observable object and send emails from the mailbox to it
select(): BaseObservable<EMail>;
select(searchOptions: SearchOptions, markSeen?: boolean): BaseObservable<EMail>;
select(range: string, markSeen?: boolean): BaseObservable<EMail>;
select(searchCriteria: any[], markSeen?: boolean ): BaseObservable<EMail>;
async get(UID: string | number, markSeen: boolean = false): Promise<EMail>;
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
const gmail = new etl.messangers.Gmail.Endpoint(process.env.GMAIL_USER!, process.env.GMAIL_PASSWORD!);
const inbox = gmail.getInbox();
const PrintMails$ = inbox.select({seen: false}).pipe(
rx.take(10),
etl.log()
)
await etl.run(PrintMails$);
const mail = await inbox.get(1463);
console.log(mail);
Endpoint to work with Gmail service. Based on GeneralEmail endpoint.
You need to get an application password from the Gmail service to use this endpoint. Follow these steps to get it:
Methods:
constructor(userEmail: string, appPassword: string);
async releaseEndpoint();
async send(to: string[] | string, subject: string, body: string, cc?: string[] | string, bcc?: string[] | string, from?: string): Promise<SendError | undefined>;
async send(value: EMail): Promise<SendError | undefined>;
getInbox(options: CollectionOptions<EMail> = {}): Collection;
releaseInbox();
getMailbox(mailBox: string, options: CollectionOptions<EMail> = {}): Collection;
releaseMailbox(mailBox: string);
Represents an email mailbox.
Methods:
// Create the observable object and send emails from the mailbox to it
select(): BaseObservable<EMail>;
select(searchOptions: SearchOptions, markSeen?: boolean): BaseObservable<EMail>;
select(range: string, markSeen?: boolean): BaseObservable<EMail>;
select(searchCriteria: any[], markSeen?: boolean ): BaseObservable<EMail>;
async get(UID: string | number, markSeen: boolean = false): Promise<EMail>;
Example:
import * as rx from "rxjs";
import * as etl from "etl-gun";
const gmail = new etl.messangers.Gmail.Endpoint(process.env.GMAIL_USER!, process.env.GMAIL_PASSWORD!);
const inbox = gmail.getInbox();
const PrintMails$ = inbox.select({seen: false}).pipe(
rx.take(10),
etl.log()
)
await etl.run(PrintMails$);
const mail = await inbox.get(1463);
console.log(mail);
Endpoint to work with the http://sms.ru service.
You need an account on this service to use it.
Methods:
constructor(apiId: string);
async send(message: string, toPhone: string, from?: string): Promise<SendError | undefined>;
async send(message: string, toPhones: string[], from?: string): Promise<SendError | undefined>;
Example:
import * as etl from "etl-gun";
// The constructor expects the API id of your sms.ru account
const sms = new etl.messangers.SmsRu(process.env.SMS_RU_API_ID!);
const err = await sms.send('hello', '123-45-67');
if (err) console.log(err); // log the error, if any
Represents Magento CMS objects. See https://meetanshi.com/blog/create-update-product-using-rest-api-in-magento-2/ for details on how to configure the Magento integration to get access to its API.
Methods:
// magentoUrl: Url of Magento
// login: admin login
// password: admin password
// rejectUnauthorized: You can set it to false to ignore SSL certificate problems during development.
constructor(magentoUrl: string, login: string, password: string, rejectUnauthorized: boolean = true);
// Create collection object for the Magento products
// guiOptions: Some options how to display this endpoint
getProducts(guiOptions: CollectionGuiOptions<Partial<Product>> = {}): ProductsCollection;
// Release products collection object
releaseProducts();
Represents Magento CMS products.
Methods:
// Create the observable object and send product data from the Magento to it
// where: you can filter products by specifying an object with column names as fields and the values to match as field values
// fields: you can select which products fields will be returned (null means 'all fields')
select(where: Partial<Product> = {}, fields: (keyof Product)[] = null): BaseObservable<Partial<Product>> ;
// Add new product to the Magento
// value: product fields values
async insert(value: NewProductAttributes);
// Upload an image to Magento, set it as the image of the specified product, and return the total count of images for this product
// product: product sku
// imageContents: binary form of the image file
// filename: name of the file in which Magento will store the image
// label: label of the product image
// type: mime type of the image
async uploadImage(product: {sku: string} | string, imageContents: Blob, filename: string, label: string, type: "image/png" | "image/jpeg" | string): Promise<number>;
// Operator to upload product image from the pipe
uploadImageOperator<T>(func: (value: T) => {product: {sku: string} | string, imageContents: Blob, filename: string, label: string, type: "image/png" | "image/jpeg" | string}): OperatorFunction<T, T>;
// Utility static function to get products as array
static async getProducts(endpoint: Endpoint, where: Partial<Product> = {}, fields: (keyof Product)[] = null): Promise<Partial<Product>[]>;
Example:
import * as etl from "etl-gun";
const magento = new etl.Magento.Endpoint('https://magento.test', process.env.MAGENTO_LOGIN!, process.env.MAGENTO_PASSWORD!);
const products = magento.getProducts();
const logProductsWithPrice100$ = products.select({price: 100}).pipe(
etl.log()
);
etl.run(logProductsWithPrice100$)
Represents Magento CMS stock items. Stock items are products in stock.
Methods:
// Create the observable object and send stock items data from the Magento to it
// sku, product: you can filter stock items by product attributes
select(sku: string): BaseObservable<StockItem>;
select(product: Partial<Product>): BaseObservable<StockItem>;
// Get stock item for specified product
// sku, product: the product whose stock item we need to get
public async getStockItem(sku: string): Promise<StockItem>;
public async getStockItem(product: {sku: string}): Promise<StockItem>;
// Update product stock quantity
public async updateStockQuantity(sku: string, quantity: number);
public async updateStockQuantity(product: {sku: string}, quantity: number);
Represents Trello task tracking system objects. For details on how to get an API key and authorization token, please read the Trello documentation.
Methods:
// url: Trello web url
// apiKey: Trello API key
// authToken: Trello authorization token
// rejectUnauthorized: You can set it to false to ignore SSL certificate problems during development.
constructor(apiKey: string, authToken: string, url: string = "https://trello.com", rejectUnauthorized: boolean = true);
// Create collection object for the Trello user boards
// username: the user whose boards we need to get; by default, the owner of the Trello authorization token
// collectionName: identifier of the collection object to create
// guiOptions: options that control how this endpoint is displayed
getUserBoards(username: string = 'me', collectionName: string = 'Boards', guiOptions: CollectionGuiOptions<Partial<Board>> = {}): BoardsCollection;
// Create collection object for the Trello board lists
// boardId: board id
// collectionName: identifier of the collection object to create
// guiOptions: options that control how this endpoint is displayed
getBoardLists(boardId: string, collectionName: string = 'Lists', guiOptions: CollectionGuiOptions<Partial<List>> = {}): ListsCollection;
// Create collection object for the Trello list cards
// listId: list id
// collectionName: identifier of the collection object to create
// guiOptions: options that control how this endpoint is displayed
getListCards(listId: string, collectionName: string = 'Cards', guiOptions: CollectionGuiOptions<Partial<Card>> = {}): CardsCollection;
// Create collection object for the Trello card comments
// cardId: card id
// collectionName: identifier of the collection object to create
// guiOptions: options that control how this endpoint is displayed
getCardComments(cardId: string, collectionName: string = 'Comments', guiOptions: CollectionGuiOptions<Partial<Comment>> = {}): CommentsCollection;
// Release collection data
// collectionName: identifier of the collection object to release
releaseCollection(collectionName: string);
Represents the Trello boards accessible to the user specified when the collection was created.
Methods:
// Create the observable object and send boards data from the Trello to it
// where (not working yet!): you can filter boards by specifying an object with column names as fields and the values to match as field values
// fields (not working yet!): you can select which board fields will be returned (null means 'all fields')
select(where: Partial<Board> = {}, fields: (keyof Board)[] = null): EtlObservable<Partial<Board>>;
// Add new board to the Trello
// value: board fields values
async insert(value: Omit<Partial<Board>, 'id'>);
// Update board fields values by board id
// boardId: board id
// value: new board fields values as hash object
async update(boardId: string, value: Omit<Partial<Board>, 'id'>);
// Get all user boards
async get(): Promise<Board[]>;
// Get board by id
// boardId: board id
async get(boardId?: string): Promise<Board>;
// Get board by url from browser
async getByBrowserUrl(url: string): Promise<Board>;
Represents the Trello lists on the board specified when the collection was created.
Methods:
// Create the observable object and send lists data from the Trello to it
// where (not working yet!): you can filter lists by specifying an object with column names as fields and the values to match as field values
// fields (not working yet!): you can select which list fields will be returned (null means 'all fields')
select(where: Partial<List> = {}, fields: (keyof List)[] = null): EtlObservable<Partial<List>>;
// Add new list to the Trello
// value: list fields values
async insert(value: Omit<Partial<List>, 'id'>);
// Update list fields values by list id
// listId: list id
// value: new list fields values as hash object
async update(listId: string, value: Omit<Partial<List>, 'id'>);
// Get all lists
async get(): Promise<List[]>;
// Get list by id
// listId: list id
async get(listId?: string): Promise<List>;
// Archive or unarchive a list
// listId: list id
async switchClosed(listId: string);
// Move list to another board
// listId: list id
// destBoardId: destination board id
async move(listId: string, destBoardId: string);
// Get list actions
// listId: list id
async getActions(listId: string);
Represents the Trello cards in the list specified when the collection was created.
Methods:
// Create the observable object and send cards data from the Trello to it
// where (not working yet!): you can filter cards by specifying an object with column names as fields and the values to match as field values
// fields (not working yet!): you can select which card fields will be returned (null means 'all fields')
select(where: Partial<Card> = {}, fields: (keyof Card)[] = null): EtlObservable<Partial<Card>>;
// Add new card to the Trello
// value: card fields values
async insert(value: Omit<Partial<Card>, 'id'>);
// Update card fields values by card id
// cardId: card id
// value: new card fields values as hash object
async update(cardId: string, value: Omit<Partial<Card>, 'id'>);
// Get all cards
async get(): Promise<Card[]>;
// Get card by id
// cardId: card id
async get(cardId?: string): Promise<Card>;
// Archive all cards in current list
async archiveListCards();
// Move all cards from the current list to another board and list
// destBoardId: destination board id
// destListId: destination list id
async moveListCards(destBoardId: string, destListId: string);
Represents the Trello comments on the card specified when the collection was created.
Methods:
// Create the observable object and send comments from the Trello to it
// where (not working yet!): you can filter comments by specifying an object with column names as fields and the values to match as field values
// fields (not working yet!): you can select which comment fields will be returned (null means 'all fields')
select(where: Partial<Comment> = {}, fields: (keyof Comment)[] = null): EtlObservable<Partial<Comment>>;
// Add new comment to the Trello card
// text: comment text
async insert(text: string);
// Update comment fields values by comment id
// commentId: comment id
// value: new comment fields values as hash object
async update(commentId: string, value: Omit<Partial<Comment>, 'id'>);
// Get all comments
async get(): Promise<Comment[]>;
// Get comment by id
// commentId: comment id
async get(commentId?: string): Promise<Comment>;
Example:
import * as rx from 'rxjs';
import * as etl from 'etl-gun';
const trello = new etl.Trello.Endpoint(process.env.TRELLO_API_KEY!, process.env.TRELLO_AUTH_TOKEN!);
const boards = trello.getUserBoards();
const board = await boards.getByBrowserUrl('https://trello.com/b/C9zegsyz/board1');
const lists = trello.getBoardLists(board.id);
const list = (await lists.get())[0];
const cards = trello.getListCards(list.id);
const addCommentToAllCards$ = cards.select().pipe(
    rx.tap(async card => {
        // Open the comments collection of the current card
        const comments = trello.getCardComments(card.id, 'comments');
        await comments.insert('New comment');
        trello.releaseCollection('comments');
    })
);
etl.run(addCommentToAllCards$)
Represents Zendesk task tracking system objects. For details on how to get an API key and authorization token, please read the Zendesk documentation.
Methods:
// zendeskUrl: Zendesk web url
// username: user login
// token: Zendesk authorization token
// rejectUnauthorized: You can set it to false to ignore SSL certificate problems during development.
constructor(zendeskUrl: string, username: string, token: string, rejectUnauthorized: boolean = true);
// Create collection object for the Zendesk tickets
// collectionName: identifier of the collection object to create
// options: options that control how this endpoint is displayed
getTickets(collectionName: string = 'Tickets', options: CollectionOptions<Partial<Ticket>> = {}): TicketsCollection;
// Create collection object for the Zendesk ticket fields
// collectionName: identifier of the collection object to create
// options: options that control how this endpoint is displayed
getTicketFields(collectionName: string = 'TicketFields', options: CollectionOptions<Partial<Field>> = {}): TicketFieldsCollection;
// Release collection data
// collectionName: identifier of the collection object to release
releaseCollection(collectionName: string);
Represents all Zendesk tickets.
Methods:
// Create the observable object and send tickets data from the Zendesk to it
// where: you can filter tickets by specifying an object with column names as fields and the values to match as field values
select(where: Partial<Ticket> = {}): BaseObservable<Partial<Ticket>>;
// Add new ticket to the Zendesk
// value: ticket fields values
async insert(value: Omit<Partial<Ticket>, 'id'>);
// Update ticket fields values by ticket id
// ticketId: ticket id
// value: new ticket fields values as hash object
async update(ticketId: number, value: Omit<Partial<Ticket>, 'id'>);
// Get all tickets
async get(): Promise<Ticket[]>;
// Get ticket by id
// ticketId: ticket id
async get(ticketId: number): Promise<Ticket>;
Represents all Zendesk ticket fields.
Methods:
// Create the observable object and send fields description data from the Zendesk to it
select(): BaseObservable<Partial<Field>>;
// Add new field to the Zendesk
// value: field attributes values
async insert(value: Omit<Partial<Field>, 'id'>);
// Update field attributes by field id
// fieldId: field id
// value: new field attributes values as hash object
async update(fieldId: number, value: Omit<Partial<Field>, 'id'>);
// Get all fields
async get(): Promise<Field[]>;
// Get field by id
// fieldId: field id
async get(fieldId: number): Promise<Field>;
Example:
import * as rx from 'rxjs';
import * as etl from 'etl-gun';
const zendesk = new etl.Zendesk.Endpoint(process.env.ZENDESK_URL!, process.env.ZENDESK_USERNAME!, process.env.ZENDESK_TOKEN!);
const tickets = zendesk.getTickets();
const PrintAllOpenedTickets$ = tickets.select().pipe(
etl.where({status: 'open'}),
etl.log()
)
etl.run(PrintAllOpenedTickets$);
With this endpoint you can create Telegram bots and chat with users. It can listen for user messages and send response messages. It can also set the user keyboard for the chat.
Methods:
// Start bot and return collection object for the bot messages
// collectionName: identifier of the collection object to create
// token: Bot token
// keyboard: JSON keyboard description, see the node-telegram-bot-api for details
// Keyboard example: [["Text for command 1", "Text for command 2"], ["Text for command 3"]]
// guiOptions: options that control how this endpoint is displayed
startBot(collectionName: string, token: string, keyboard?: any, guiOptions: CollectionGuiOptions<TelegramInputMessage> = {}): Collection;
// Stop bot
// collectionName: identifier of the collection object to release
releaseBot(collectionName: string);
Represents all chat bot messages.
Methods:
// Start receiving all user messages
select(): Observable<T>;
// Stop receiving user messages
async stop();
// Pushes message to the chat
// value: Message in TelegramInputMessage type
// chatId: id of the destination chat, get it from input user messages
// message: Message to send
async insert(value: TelegramInputMessage);
async insert(chatId: string, message: string);
// Replace the keyboard structure with the specified one
// keyboard: JSON keyboard description, see startBot for details
setKeyboard(keyboard: any);
Example:
import * as etl from "etl-gun";
const telegram = new etl.Telegram.Endpoint();
const bot = telegram.startBot('bot 1', '**********');
const startTelegramBot$ = bot.select().pipe(
etl.log(), // log user messages to the console
etl.push(bot) // echo input message back to the user
);
etl.run(startTelegramBot$);
This endpoint is an analog of the RxJs interval() operator, with GUI support. It emits a simple counter, which increments every interval.
Methods:
// Create new interval collection object
// collectionName: identifier of the collection object to create
// interval: Time interval in milliseconds between two emitted values
// guiOptions: options that control how this endpoint is displayed
getSequence(collectionName: string, interval: number, guiOptions: CollectionGuiOptions<number> = {}): Collection;
// Stop interval
// collectionName: identifier of the collection object to release
releaseSequence(collectionName: string);
Methods:
// Start interval generation, create observable and emit counter of intervals to it
select(): Observable<number>;
// Stop endpoint reading
async stop();
// Set value of interval counter
// value: new value of the interval counter
async insert(value: number);
// Set interval counter to 0
async delete();
Example:
import * as etl from "etl-gun";
const timer = new etl.Interval.Endpoint();
const seq = timer.getSequence('every 500 ms', 500);
const startTimer$ = seq.select().pipe(
etl.log() // log counter
);
etl.run(startTimer$);
Apart from the operators of this library, you can use any operators of the RxJs library.
This function runs one or more streams and returns a promise that resolves when all of the streams have completed.
import * as etl from "etl-gun";
let memory = new etl.Memory.Endpoint();
let buffer = memory.getBuffer('test buffer', [1, 2, 3, 4, 5]);
let stream$ = buffer.select().pipe(
etl.log()
);
etl.run(stream$);
# etl.log([options])
Prints the value from the stream to the console.
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
let stream$ = rx.interval(1000).pipe(
etl.log()
);
etl.run(stream$);
This function checks a condition and, if it is false, throws an error to the error collection.
import * as etl from "etl-gun";
let memory = new etl.Memory.Endpoint();
let buffer = memory.getBuffer('test buffer', [{count: 1}, {count: 2}]);
const errorsEndpoint = etl.Errors.getEndpoint();
const errors = errorsEndpoint.getCollection('all');
let stream$ = buffer.select().pipe(
etl.expect('count = 1', { count: 1 }, errors),
etl.expect('count one of [1,2,3]', { count: etl.VALUE.of([1,2,3]) }),
etl.expect('count not > 3', { count: etl.VALUE.not['>'](3) }),
etl.expect('count check function', { count: v => v < 5 }),
etl.log()
);
etl.run(stream$);
Variants:
function where<T>(condition: Condition<T>): OperatorFunction<T, T>;
This operator is an analog of the WHERE clause in SQL and a synonym of the filter operator from the RxJS library, with some improvements. It skips values from the input stream that do not satisfy the specified condition. You can specify a predicate function as the condition, or a map object (like the typeorm 'where' parameter of the find() method).
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
let stream$ = rx.interval(1000).pipe(
etl.where(v => v % 2 === 0),
// Map-object conditions take a single argument and apply to streams of objects
etl.where({ count: 1 }),
etl.where({ count: etl.VALUE.of([1,2,3]) }),
etl.where({ count: etl.VALUE.not['>'](3) }),
etl.log()
);
etl.run(stream$);
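The map-object form of a condition can be read as "every listed field must match". The following is a minimal sketch of that idea in plain TypeScript, without etl-gun or RxJS. The names `matches`, `Row` and `ObjectCondition` are hypothetical and are not part of the library; the real `Condition<T>` type may support more, such as the `etl.VALUE` helpers.

```typescript
// Hypothetical sketch of object-style filtering; not the etl-gun implementation.
type Row = Record<string, unknown>;
// Each field holds either a literal to compare with === or a predicate function.
type ObjectCondition = Record<string, unknown>;

function matches(item: Row, condition: ObjectCondition): boolean {
  return Object.keys(condition).every(key => {
    const check = condition[key];
    const actual = item[key];
    return typeof check === "function" ? Boolean(check(actual)) : actual === check;
  });
}

const rows = [{ count: 1 }, { count: 2 }, { count: 7 }];
const exact = rows.filter(r => matches(r, { count: 1 }));
const small = rows.filter(r => matches(r, { count: (v: number) => v < 5 }));
console.log(exact, small);
```

A predicate function stays the most flexible option, while the object form keeps simple equality checks declarative.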
# etl.push([options])
This operator calls the Endpoint.push method to push a value from the stream to the specified endpoint.
Variants:
// Push value to collection with no wait for result
function push<S, T=S>(collection: BaseCollection<T>, options?: PushOptions<S, T> | null): OperatorFunction<S, S>;
// Push value to collection with waiting for result
function pushAndWait<S, T=S>(collection: BaseCollection<T>, options?: PushOptions<S, T> | null): OperatorFunction<S, S>;
// Push value to collection, wait for result, send result to log
function pushAndLog<S, T=S>(collection: BaseCollection<T>, options?: PushOptions<S, T> | null): OperatorFunction<S, S>;
// Push value to collection, get the result and put it as stream value or as property of stream value
function pushAndGet<S, T, R>(collection: BaseCollection<T>, options?: PushOptions<S, T> & {toProperty: string} | null): OperatorFunction<S, R>;
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
let csv = new etl.Csv.Endpoint();
let dest = csv.getFile('test.csv');
let stream$ = rx.interval(1000).pipe(
etl.push(dest)
);
etl.run(stream$);
# etl.rools([options])
This operator integrates the etl engine with the Rools business rules engine. It calls the specified rule set, passes the current stream value to it as a fact, and then uses the result of the rules execution as the new stream value.
It lets you separate the business logic of data analysis and transformation from the rest of the etl code. You can load rules from any source at runtime and let end users modify them.
In rules you can:
You can write your rules in any order; it does not matter to the rules engine. See the Rools documentation for details.
Specification:
type EtlRoolsResult = {
etl?: {
skip?: boolean;
stop?: boolean;
error?: string;
}
}
function rools<T, R = T>(rools: Rools): rx.OperatorFunction<T, R>;
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
import { Rools, Rule } from 'rools';
type DbProduct = {
id: number,
name: string,
price: number,
tax_class_id?: number
}
const ruleSkipCheapProducts = new Rule({
name: 'skip products with price <= 1000',
when: (product: DbProduct) => product.price! <= 1000,
then: (product: DbProduct & EtlRoolsResult) => {
product.etl = {skip: true};
},
});
const ruleSetProductTaxClass = new Rule({
name: 'update product tax class',
when: (product: DbProduct) => product.price! > 1000,
then: (product: DbProduct & EtlRoolsResult) => {
product.tax_class_id = 10;
},
});
const rules = new Rools();
await rules.register([ruleSkipCheapProducts, ruleSetProductTaxClass]);
const db = new etl.databases.MySql.Endpoint(process.env.MYSQL_CONNECTION_STRING!, undefined, 'mysql2');
const table = db.getTable<DbProduct>('products');
const PrintRichProducts$ = table.select().pipe(
etl.rools(rules),
etl.log()
)
await etl.run(PrintRichProducts$);
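The `etl` field of `EtlRoolsResult` lets a rule signal how the pipeline should treat the current value. The sketch below shows one plausible way such flags could be read after the rules have run. It is an illustration under assumptions: `interpretResult`, `Decision` and `Product` are hypothetical names, and the precedence used here (error, then stop, then skip) is assumed, not taken from the library.

```typescript
// Hypothetical interpretation of the EtlRoolsResult control flags.
type EtlRoolsResult = {
  etl?: { skip?: boolean; stop?: boolean; error?: string };
};
type Product = { id: number; price: number; tax_class_id?: number };

type Decision =
  | { kind: "error"; message: string }
  | { kind: "stop" }
  | { kind: "skip" }
  | { kind: "emit" };

// Assumed precedence: error first, then stop, then skip, otherwise emit.
function interpretResult(fact: EtlRoolsResult): Decision {
  const flags = fact.etl ?? {};
  if (flags.error !== undefined) return { kind: "error", message: flags.error };
  if (flags.stop) return { kind: "stop" };
  if (flags.skip) return { kind: "skip" };
  return { kind: "emit" };
}

const cheap: Product & EtlRoolsResult = { id: 1, price: 500, etl: { skip: true } };
const rich: Product & EtlRoolsResult = { id: 2, price: 5000, tax_class_id: 10 };
console.log(interpretResult(cheap).kind); // "skip"
console.log(interpretResult(rich).kind);  // "emit"
```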
# etl.move([options])
This operator moves the whole stream value, or one of its properties, to the specified property. Lodash paths are supported.
Example:
import * as etl from "etl-gun";
const memory = etl.Memory.getEndpoint();
const buf = memory.getBuffer<number>('buf', [1,2,3,4,5]);
let stream$ = buf.select().pipe(
etl.move<{ nn: number }>({to: 'nn'}), // 1 -> { nn: 1 }
etl.move<{ num: number }>({from: 'nn', to: 'num'}), // { nn: 1 } -> { num: 1 }
etl.copy<{ num: number, kk: {pp: number} }>('num', 'kk.pp'), // { num: 1 } -> { num: 1, kk: {pp: 1} }
etl.log()
);
etl.run(stream$);
# etl.copy([options])
This operator copies the specified property of the stream value to another property. Lodash paths are supported.
Example:
import * as etl from "etl-gun";
const memory = etl.Memory.getEndpoint();
const buf = memory.getBuffer<number>('buf', [1,2,3,4,5]);
let stream$ = buf.select().pipe(
etl.move<{ nn: number }>({to: 'nn'}), // 1 -> { nn: 1 }
etl.move<{ num: number }>({from: 'nn', to: 'num'}), // { nn: 1 } -> { num: 1 }
etl.copy<{ num: number, kk: {pp: number} }>('num', 'kk.pp'), // { num: 1 } -> { num: 1, kk: {pp: 1} }
etl.log()
);
etl.run(stream$);
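To make the path notation used by the move and copy operators more concrete, here is a small sketch of copying a value to a nested path such as 'kk.pp'. It is written in plain TypeScript and handles only simple dotted paths, not the full Lodash path syntax. `getPath`, `setPath` and `copyProperty` are hypothetical helpers, not etl-gun or Lodash functions.

```typescript
// Hypothetical helpers illustrating dotted-path copy; not etl-gun or Lodash code.
type Obj = Record<string, unknown>;

function getPath(obj: Obj, path: string): unknown {
  let current: unknown = obj;
  for (const key of path.split(".")) {
    if (typeof current !== "object" || current === null) return undefined;
    current = (current as Obj)[key];
  }
  return current;
}

function setPath(obj: Obj, path: string, value: unknown): void {
  const keys = path.split(".");
  let current: Obj = obj;
  for (const key of keys.slice(0, -1)) {
    const next = current[key];
    // Create intermediate objects that do not exist yet
    if (typeof next !== "object" || next === null) current[key] = {};
    current = current[key] as Obj;
  }
  current[keys[keys.length - 1]] = value;
}

// Copy the value at `from` to `to` and return a new object (plain JSON data only).
function copyProperty(value: Obj, from: string, to: string): Obj {
  const result: Obj = JSON.parse(JSON.stringify(value));
  setPath(result, to, getPath(value, from));
  return result;
}

console.log(copyProperty({ num: 1 }, "num", "kk.pp")); // { num: 1, kk: { pp: 1 } }
```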
# etl.numerate([options])
This operator enumerates input values: it adds an index field to the value if it is an object, or an index column if it is an array. If the input stream values are objects, you should specify the index field name as the second parameter of the operator.
Example:
import * as etl from "etl-gun";
let csv = new etl.Csv.Endpoint();
let src = csv.getFile('test.csv');
let stream$ = src.select().pipe(
etl.numerate(10), // 10 is the first value for numeration
etl.log()
);
etl.run(stream$);
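The difference between numbering arrays and numbering objects can be sketched in plain TypeScript as follows. `numerateAll` is a hypothetical helper that works on an in-memory array instead of a stream; appending the index as the last column and the default field name "index" are assumptions of this sketch.

```typescript
// Hypothetical sketch of numbering values; not the etl-gun implementation.
type Row = unknown[] | Record<string, unknown>;

function numerateAll(rows: Row[], start = 0, fieldName = "index"): Row[] {
  return rows.map((row, i) =>
    Array.isArray(row)
      ? [...row, start + i]                // arrays get an extra column
      : { ...row, [fieldName]: start + i } // objects get an extra field
  );
}

console.log(numerateAll([["a"], ["b"]], 10));
console.log(numerateAll([{ name: "x" }], 1, "idx"));
```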
# etl.addField([options])
This operator is applicable to a stream of objects. It calls the callback function and adds the result as a new field to the input stream value.
Example:
import * as etl from "etl-gun";
const pg = new etl.Postgres.Endpoint('postgres://user:password@127.0.0.1:5432/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.addField('NAME_IN_UPPERCASE', value => value.name.toUpperCase()),
etl.log()
);
etl.run(logUsers$);
# etl.addColumn([options])
This operator is applicable to a stream of arrays. It calls the callback function and adds the result as a new column to the input stream value.
Example:
import * as etl from "etl-gun";
let csv = new etl.Csv.Endpoint();
let src = csv.getFile('test.csv');
const stream$ = src.select().pipe(
etl.addColumn(value => value[2].toUpperCase()),
etl.log()
);
etl.run(stream$);
# etl.join([options])
This operator is an analog of the JOIN operation in SQL. It takes a second input stream as its parameter and, for every value from the main input stream, gets all values from the second stream. It then merges each pair of values into one object (if both values are objects) or into one array (if at least one of the values is an array), and puts the result into the main stream.
Example:
import * as etl from "etl-gun";
let csv = new etl.Csv.Endpoint();
let src = csv.getFile('test.csv');
let mem = new etl.Memory.Endpoint();
let buffer = mem.getBuffer('buffer 1', [1, 2, 3, 4, 5]);
let stream$ = src.select().pipe(
etl.join(buffer),
etl.log()
);
etl.run(stream$);
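The merge rule described for join can be illustrated on plain arrays. In the sketch below, `mergeValues` and `joinAll` are hypothetical names. Treating scalars as one-element arrays, and keeping an object as a single element when the other side is an array, are assumptions of this sketch rather than documented behavior.

```typescript
// Hypothetical sketch of the join merge rule; not the etl-gun implementation.
function mergeValues(left: unknown, right: unknown): unknown {
  const isPlainObject = (v: unknown): v is Record<string, unknown> =>
    typeof v === "object" && v !== null && !Array.isArray(v);
  // Two objects are merged into one object
  if (isPlainObject(left) && isPlainObject(right)) return { ...left, ...right };
  // Otherwise both sides are flattened into one array
  const toArray = (v: unknown): unknown[] => (Array.isArray(v) ? v : [v]);
  return [...toArray(left), ...toArray(right)];
}

// Pair every value of the main input with every value of the second input.
function joinAll(main: unknown[], second: unknown[]): unknown[] {
  const result: unknown[] = [];
  for (const m of main) {
    for (const s of second) result.push(mergeValues(m, s));
  }
  return result;
}

console.log(joinAll([["a", "b"]], [1, 2]));
console.log(joinAll([{ id: 1 }], [{ tag: "x" }, { tag: "y" }]));
```

As in SQL, the result size is the product of both input sizes, so the second input should stay small.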
# etl.mapAsync([options])
This operator is an analog of the rxjs map operator for an async callback function. It calls the callback, waits for it, and then uses its result as the new stream item.
Example:
import * as etl from "etl-gun";
let mem = new etl.Memory.Endpoint();
let buffer = mem.getBuffer('urls', ['1.json', '2.json', '3.json']);
const mySite = new etl.HttpClientHelper('http://www.mysite.com/jsons');
let stream$ = buffer.select().pipe(
etl.mapAsync(async (url) => await mySite.getJson(url)),
etl.log()
);
etl.run(stream$);
This class helps you to use the Google Translate service.
import { Csv, GoogleTranslateHelper, log, run } from "etl-gun";
let csv = new Csv.Endpoint();
let src = csv.getFile('products.csv');
const translator = new GoogleTranslateHelper(process.env.GOOGLE_CLOUD_API_KEY!, 'en', 'ru');
let translateProducts$ = src.select().pipe(
translator.operator(),
log()
);
await run(translateProducts$);
This class helps you to make requests to http and https resources and get data from them.
Methods:
// Create helper object
// baseUrl: this string will be used as the starting part of urls in all helper methods
// headers: will be added to the headers of all requests made with this helper instance
constructor(baseUrl?: string, headers?: Record<string, string>);
// GET request
async get(url?: string, headers?: Record<string, string>): Promise<Response>;
async getJson(url?: string, headers?: Record<string, string>): Promise<any>;
async getText(url?: string, headers?: Record<string, string>): Promise<string>;
async getBlob(url?: string, headers?: Record<string, string>): Promise<Blob>;
async getFileContents(url?: string, headers?: Record<string, string>): Promise<Blob>;
getJsonOperator<T, R = T>(): OperatorFunction<T, R>;
getJsonOperator<T, R = T>(url: string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getJsonOperator<T, R = T>(getUrl: (value: T) => string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getTextOperator<T>(): OperatorFunction<T, string>;
getTextOperator<T>(url: string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, string>;
getTextOperator<T>(getUrl: (value: T) => string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, string>;
getBlobOperator<T>(): OperatorFunction<T, Blob>;
getBlobOperator<T, R = T>(url: string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getBlobOperator<T, R = T>(getUrl: (value: T) => string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getFileContentsOperator<T>(): OperatorFunction<T, Blob>;
getFileContentsOperator<T, R = T>(url: string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getFileContentsOperator<T, R = T>(getUrl: (value: T) => string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
// POST request
async post(body: string, url?: string, headers?: Record<string, string>): Promise<Response>;
async postJson(body: any, url?: string, headers?: Record<string, string>): Promise<any>;
async postText(body: string, url?: string, headers?: Record<string, string>): Promise<string>;
postJsonOperator<T, R = T>(bodyParam?: any | ((value: T) => any), urlParam?: string | ((value: T) => string), toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
// PUT request
async put(body: string, url?: string, headers?: Record<string, string>): Promise<Response>;
async putJson(body: any, url?: string, headers?: Record<string, string>): Promise<any>;
async putText(body: string, url?: string, headers?: Record<string, string>): Promise<string>;
putJsonOperator<T, R = T>(bodyParam?: any | ((value: T) => any), urlParam?: string | ((value: T) => string), toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
// Simple fetch method
async fetch(url: string): Promise<Response>;
async fetch(url: string, method: 'GET' | 'POST' | 'PUT' | 'DELETE', headers: Record<string, string>): Promise<Response>;
async fetch(url: string, init: RequestInit): Promise<Response>;
Example:
import { Csv, HttpClientHelper, mapAsync, run } from "etl-gun";
let csv = new Csv.Endpoint();
let src = csv.getFile('products.csv');
const mySite = new HttpClientHelper('http://www.mysite.com');
let sendProductsToSite$ = src.select().pipe(
    // Send each row as JSON and wait for the request to finish
    mapAsync(async p => await mySite.postJson(p))
);
await run(sendProductsToSite$);
This class stores an array of column names and can convert an object to its array representation, or an array to its object representation.
import { Postgres, Csv, Header, log, push, run } from "etl-gun";
import { map } from "rxjs";
const pg = new Postgres.Endpoint("postgres://user:password@127.0.0.1:5432/database");
const source = pg.getTable("users");
let csv = new Csv.Endpoint();
const dest = csv.getFile("users.csv");
const header = new Header([{"id": "number"}, "name", {"login": "string", nullValue: "-null-"}, "email"]);
let sourceToDest$ = source.select().pipe(
map(v => header.objToArr(v)),
push(dest)
);
await run(sourceToDest$);
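The conversion between the object and array representations can be pictured with a much simpler stand-in class. `SimpleHeader` and its `arrToObj` method are hypothetical and exist only for this illustration; the real `Header` class also handles column types and null values, which this sketch ignores.

```typescript
// Hypothetical, simplified stand-in for the Header idea; not the etl-gun class.
class SimpleHeader {
  private readonly columns: string[];

  constructor(columns: string[]) {
    this.columns = columns;
  }

  // Pick the values in column order, so every row has the same layout.
  objToArr(obj: Record<string, unknown>): unknown[] {
    return this.columns.map(name => obj[name]);
  }

  // Rebuild an object by pairing each column name with the value at its position.
  arrToObj(arr: unknown[]): Record<string, unknown> {
    const obj: Record<string, unknown> = {};
    this.columns.forEach((name, i) => { obj[name] = arr[i]; });
    return obj;
  }
}

const simpleHeader = new SimpleHeader(["id", "name", "login", "email"]);
const row = simpleHeader.objToArr({ name: "Ann", id: 7, email: "ann@example.com", login: "ann" });
console.log(row);
console.log(simpleHeader.arrToObj(row));
```

Fixing the column order in one place is what makes rows from an object source safe to write to a positional format such as CSV.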
These functions implement some useful things to manipulate data.
// Pause execution for the specified delay, in milliseconds.
async function wait(delay: number): Promise<void>;
// Join url parts (or path parts) into a full url (or path) with the delimiter
function pathJoin(parts: string[], sep: string = '/'): string;
// Extract 'html' from '/var/www/html'
function extractFileName(path: string): string;
// Extract '/var/www' from '/var/www/html'
function extractParentFolderPath(path: string): string;
// Get object part by json path
function getByJsonPath(obj: {}, jsonPath?: string): any;
// Get child element of array or object by element property value
function getChildByPropVal(obj: {}, propName: string, propVal?: any): any;
// Convert object to string
function dumpObject(obj: any, deep: number = 1): string;
// Get child by its property value
// For example getChildByPropVal([{prop1: 'val1'}, {prop1: 'val2'}], 'prop1', 'val1') -> returns {prop1: 'val1'}
function getChildByPropVal(obj: {} | [], propName: string, propVal?: any): any;
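The path helpers listed above can be approximated in a few lines. The functions below are hypothetical re-implementations with a `Sketch` suffix, written only to illustrate the documented results ('html' and '/var/www' for '/var/www/html'). How the library treats duplicate separators or edge cases is an assumption here, so prefer the library's own helpers in real code.

```typescript
// Hypothetical re-implementations for illustration only; not the etl-gun code.
function pathJoinSketch(parts: string[], sep: string = "/"): string {
  if (sep === "") return parts.join("");
  const trimmed = parts.map((part, i) => {
    let p = part;
    // Drop leading separators on every part except the first
    while (i > 0 && p.startsWith(sep)) p = p.slice(sep.length);
    // Drop trailing separators on every part except the last
    while (i < parts.length - 1 && p.endsWith(sep)) p = p.slice(0, -sep.length);
    return p;
  });
  return trimmed.filter(p => p !== "").join(sep);
}

function extractFileNameSketch(path: string): string {
  return path.slice(path.lastIndexOf("/") + 1);
}

function extractParentFolderPathSketch(path: string): string {
  const i = path.lastIndexOf("/");
  return i > 0 ? path.slice(0, i) : "";
}

console.log(pathJoinSketch(["http://site.com/", "/api/", "users"]));
console.log(extractFileNameSketch("/var/www/html"));
console.log(extractParentFolderPathSketch("/var/www/html"));
```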
This library is provided under the MIT license.