ETL-Gun
ETL-Gun is a platform that employs RxJs observables, allowing developers to build stream-based ETL (Extract, Transform, Load) pipelines complete with buffering, error handling and many useful features.
Why / when would I need this?
ETL-Gun is a simple ETL glue, implemented as an extension to the RxJs library.
Typically, you'd use ETL-Gun to help with ETL processes. It can extract data from one or more sources, transform it, and load it to one or more destinations in the needed order.
You can use it with both JavaScript and TypeScript.
ETL-Gun will NOT help you with "big data" - it runs on a single computer and does not support clustering out of the box.
Here are some ways to use it:
- Read data from a database and export it to a .csv file, and vice versa
- Create file converters
- Filter or sort the contents of files
- Run queries against a database
- Create Telegram bots with Telegram.Endpoint
You can find many examples of using ETL-Gun in the API Reference section of this file.
Installation
npm install etl-gun
or
yarn add etl-gun
Usage
Import the ETL-Gun library in the desired file to make it accessible.
Warning: Since version 2.0.4 this library is native ESM and no longer provides a CommonJS export. If your project uses CommonJS, you will have to convert to ESM or use the dynamic import() function.
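For example, a CommonJS project could load the library like this (a minimal sketch; the async wrapper is only needed because CommonJS has no top-level await):
// CommonJS module: load the ESM-only library via dynamic import()
async function main() {
const etl = await import("etl-gun");
// ... use etl here
}
main();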
Introductory example: PostgreSQL -> .csv
import { map } from "rxjs";
import { Csv, GuiManager, Header, Postgres, log, push, run } from "etl-gun";
const postgres = new Postgres.Endpoint("postgres://user:password@127.0.0.1:5432/database");
const source = postgres.getTable('users');
const csv = new Csv.Endpoint('./dest-folder');
const dest = csv.getFile('users.csv');
const header = new Header("id", "name", "login", "email");
const sourceToDest$ = source.select().pipe(
log(),
map(v => header.objToArr(v)),
push(dest)
);
await run(sourceToDest$);
Concept
ETL-Gun contains several main concepts:
- Endpoints - sources and destinations of data; an endpoint holds the connection to one system instance and the other parameters of this system, and groups methods to get the collections related to this system
- Collections - the data object types that exist in the endpoint system
- Pipelines (or streams) - routes of data transformation and delivery, based on RxJs streams
Using this library consists of 3 steps:
- Define your endpoints and collections for sources and destinations
- Define data transformation pipelines using the pipe() method of the input streams of your source endpoints
- Run the transformation pipelines in the needed order and wait for completion
ETL process:
- Extract: Data extraction from the source collection is performed with the select() method, which returns an RxJs stream
- Transform: Use any RxJs and ETL-Gun operators inside the pipe() method of the input stream to transform the input data. For complex data transformations you can use the Memory.Endpoint class, which can store data and whose collections have forEach() and some other methods to manipulate the data in them
- Load: Loading data to the destination endpoint is performed with the push() collection operator
Chaining:
Chaining of data transformations is performed with the pipe() method of the input data stream.
Chaining of several streams is performed by using await with the run() procedure.
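The following minimal sketch shows these three steps end to end, using the Memory endpoint described below (collection names and values are illustrative):
import * as etl from "etl-gun";
// Step 1: define endpoints and collections
const memory = new etl.Memory.Endpoint();
const src = memory.getBuffer('src', [1, 2, 3]);
const dest = memory.getBuffer('dest', []);
// Step 2: define the transformation pipeline
const srcToDest$ = src.select().pipe(
etl.log(),
etl.push(dest)
);
// Step 3: run the pipeline and wait for completion
await etl.run(srcToDest$);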
Features
- Simple to use! It consists of only 3 steps:
- Create endpoints and get all the collections you need
- Create pipelines to process collection data (via the select method of the source collections)
- Run the pipelines in the order you want (with the run operator)
- This library contains an embedded debug console. It is created as a console application and works in any terminal. It supports step-by-step debugging with watching of the processed values. If you want to use this GUI, you simply need to create an instance of the GuiManager class before creating any endpoints and collections (see GUI)
- The library is written in TypeScript, contains type information for the end systems, and fully supports type checking. But you can use it in JavaScript applications too
- Fully compatible with the RxJs library, its observables, operators, etc.
- Contains many kinds of sources and destinations, for example many relational databases (Postgres, MySQL, ...), file formats (csv, json, xml), business applications (Magento, Trello, ZenDesk, ...), etc.
- Works with any types of input/output data, including arrays and hierarchical data structures (json, xml)
- With the endpoint events mechanism you can handle different stream events, for example stream start/end, errors and others (see Endpoint)
- Supports a validation and error handling mechanism:
- Data validation with the expect operator
- A special endpoint type for errors, based on a queue
- Every collection has an errors property with an endpoint which collects all errors that occurred while processing the collection. This endpoint is created automatically with the collection, but you can change its value to collect errors in a place of your choice
- Every collection has a selectErrors method to create a processing pipeline for the collection's errors
- The console GUI displays all error collections, their statistics and their errors
- Contains some ready-to-use helpers and integrations; for example, you can translate data to another language with GoogleTranslateHelper
- Contains a business rules integration which allows you to extract analysis and transformation logic from the etl program sources and then change it at runtime without modifying and redeploying the application (see rools)
GUI
- Simple to use: you only need to create an instance of the GuiManager class before creating any endpoints (at the beginning of the program)
- You can pause the ETL process and resume it with 'space' on the keyboard
- With 'enter' you can execute the ETL process step-by-step in pause mode
- With 'esc' you can quit the program
- The GUI displays the full list of created endpoints and collections, their statuses and the last values received from (or pushed to) them
- Logs are displayed in the footer part of the console window
- You can select the log window with 'tab' and scroll it with the up/down arrows
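A minimal sketch of enabling the GUI (assuming the default GuiManager constructor; the key point is creating it before any endpoints):
import * as etl from "etl-gun";
// create the GUI manager first, before any endpoints or collections
new etl.GuiManager();
const memory = new etl.Memory.Endpoint();
const buffer = memory.getBuffer('buffer 1', [1, 2, 3]);
const logBuffer$ = buffer.select().pipe(
etl.log()
);
await etl.run(logBuffer$);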
Examples (how to)
Export rows from Postgres table to csv-file (postgresql -> .csv)
import { Postgres, Csv, Header, log, push, run } from "etl-gun";
import { map } from "rxjs";
const postgres = new Postgres.Endpoint("postgres://user:password@127.0.0.1:5432/database");
const source = postgres.getTable('users');
const csv = new Csv.Endpoint('./dest-folder');
const dest = csv.getFile('users.csv');
const header = new Header("id", "name", "login", "email");
const sourceToDest$ = source.select().pipe(
log(),
map(v => header.objToArr(v)),
push(dest)
);
await run(sourceToDest$);
Sort rows in csv-file by the first column (.csv -> .csv)
import * as etl from "etl-gun";
const csvEndpoint = new etl.Csv.Endpoint();
const csv = csvEndpoint.getFile('users.csv');
const memory = new etl.Memory.Endpoint();
const buffer = memory.getBuffer('buffer 1');
const csvToBuffer$ = csv.select().pipe(
etl.push(buffer)
);
const bufferToCsv$ = buffer.select().pipe(
etl.push(csv)
);
await etl.run(csvToBuffer$);
buffer.sort((row1, row2) => row1[0] > row2[0]);
await csv.delete();
await etl.run(bufferToCsv$)
Create a telegram bot with translation functionality
import * as etl from "etl-gun";
const telegram = new etl.Telegram.Endpoint();
const bot = telegram.startBot('bot 1', process.env.TELEGRAM_BOT_TOKEN!);
const translator = new etl.GoogleTranslateHelper(process.env.GOOGLE_CLOUD_API_KEY!, 'en', 'ru');
const startTelegramBot$ = bot.select().pipe(
etl.log(),
translator.operator([], ['message']),
etl.push(bot)
);
etl.run(startTelegramBot$);
API Reference
Core
BaseCollection
Base class for all collections. Declares public interface of collection and implements event mechanism.
Methods:
select(where?: any): Observable<any>;
async insert(value: any);
async update(where: any, value: any);
async delete(where?: any);
on(event: CollectionEvent, listener: (...data: any[]) => void);
errors: Errors.ErrorsQueue;
selectErrors(stopOnEmpty: boolean = false): BaseObservable<EtlError>;
Types:
export type CollectionEvent =
"select.start" |
"select.end" |
"select.recive" |
"select.error" |
"select.skip" |
"select.up" |
"select.down" |
"pipe.start" |
"pipe.end" |
"insert" |
"update" |
"delete";
Endpoints and their collections
Errors
Store and process etl errors. Every collection by default has an errors property which contains a collection of errors from the collection's etl process. You can cancel the default errors collection creation for any collection, and specify your own manually created errors collection instead.
Endpoint
getCollection(collectionName: string, options: CollectionOptions<EtlError> = {}): ErrorsQueue;
releaseCollection(collectionName: string);
ErrorsQueue
Queue in memory to store etl errors and process them. Should be created with the getCollection method of Errors.Endpoint
Methods:
select(stopOnEmpty: boolean = false): BaseObservable<EtlError>;
async insert(error: EtlError);
async delete();
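A sketch of draining an errors queue (the 'all' collection name is arbitrary; getEndpoint() is used as in the expect example below):
import * as etl from "etl-gun";
const errorsEndpoint = etl.Errors.getEndpoint();
const errors = errorsEndpoint.getCollection('all');
// print every collected error; stopOnEmpty = true ends the stream when the queue is drained
const logErrors$ = errors.select(true).pipe(
etl.log()
);
await etl.run(logErrors$);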
Memory
Create and manipulate collections of objects in memory.
Endpoint
getBuffer<T>(collectionName: string, values: T[] = [], guiOptions: CollectionGuiOptions<T> = {}): BufferCollection;
releaseBuffer(collectionName: string);
getQueue<T>(collectionName: string, values: T[] = [], guiOptions: CollectionGuiOptions<T> = {}): QueueCollection;
releaseQueue(collectionName: string);
BufferCollection
Buffer to store values in memory and perform complex operations on them. Should be created with the getBuffer method of Memory.Endpoint
Methods:
select(): Observable<T>;
async insert(value: T);
async delete();
sort(compareFn: (v1: T, v2: T) => number | boolean);
forEach(callbackfn: (value: T, index: number, array: T[]) => void);
Example:
import * as etl from "etl-gun";
const csvEndpoint = new etl.Csv.Endpoint();
const csv = csvEndpoint.getFile('users.csv');
const memory = new etl.Memory.Endpoint();
const buffer = memory.getBuffer('buffer 1');
const csvToBuffer$ = csv.select().pipe(
etl.push(buffer)
);
const bufferToCsv$ = buffer.select().pipe(
etl.push(csv)
);
await etl.run(csvToBuffer$);
buffer.sort((row1, row2) => row1[0] > row2[0]);
await csv.delete();
await etl.run(bufferToCsv$);
QueueCollection
Queue to store values in memory and perform ordered processing of them. Should be created with the getQueue method of Memory.Endpoint
Methods:
select(dontStopOnEmpty: boolean = false, interval: number = 0): Observable<T>;
async insert(value: T);
async delete();
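A minimal sketch of ordered processing through a queue (values are illustrative):
import * as etl from "etl-gun";
const memory = new etl.Memory.Endpoint();
const queue = memory.getQueue('tasks', ['task 1', 'task 2', 'task 3']);
// by default the stream stops when the queue becomes empty
const processTasks$ = queue.select().pipe(
etl.log()
);
await etl.run(processTasks$);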
Local Filesystem
Search for files and folders with standard unix shell wildcards (see the glob documentation for details).
Endpoint
Methods:
constructor(rootFolder: string);
getFolder(folderName: string = '.', guiOptions: CollectionGuiOptions<PathDetails> = {}): Collection;
releaseFolder(folderName: string);
Collection
Methods:
select(mask: string = '*', options?: ReadOptions): BaseObservable<PathDetails>;
async insert(pathDetails: PathDetails, data?: string | NodeJS.ArrayBufferView | Iterable<string | NodeJS.ArrayBufferView> | AsyncIterable<string | NodeJS.ArrayBufferView> | internal.Stream);
async insert(filePath: string, data?: string | NodeJS.ArrayBufferView | Iterable<string | NodeJS.ArrayBufferView> | AsyncIterable<string | NodeJS.ArrayBufferView> | internal.Stream, isFolder?: boolean);
async delete(mask: string = '*', options?: ReadOptions);
Types:
type ReadOptions = {
includeRootDir?: boolean;
objectsToSearch?:
'filesOnly' |
'foldersOnly' |
'all';
}
type PathDetails = {
isFolder: boolean;
name: string;
relativePath: string;
fullPath: string;
parentFolderRelativePath: string;
parentFolderFullPath: string;
}
Example:
import * as etl from "etl-gun";
import * as rxjs from "rxjs";
const fs = new etl.Filesystem.Endpoint('~');
const scripts = fs.getFolder('scripts');
const printAllJsFileNames$ = scripts.select('**/*.js').pipe(
rxjs.map(v => v.name),
etl.log()
);
await etl.run(printAllJsFileNames$);
FTP
Endpoint to access files on ftp and ftps servers. The implementation is based on the Basic ftp package.
Endpoint
Methods:
constructor(options: AccessOptions, verbose: boolean = false);
getFolder(folderPath: string = '.', options: CollectionOptions<FileInfo> = {}): Collection;
releaseFolder(folderPath: string);
Collection
Methods:
select(): BaseObservable<FileInfo>;
async insertFolder(remoteFolderPath: string);
async insertFile(remoteFilePath: string, localFilePath: string);
async insertFile(remoteFilePath: string, sourceStream: Readable);
async insertFileWithContents(remoteFilePath: string, fileContents: string);
async insert(remotePath: string, contents: { isFolder: boolean, localFilePath?: string, sourceStream?: Readable, contents?: string });
async deleteFolder(remoteFolderPath: string);
async deleteEmptyFolder(remoteFolderPath: string);
async deleteFile(remoteFilePath: string);
async delete(remotePath: string);
Example:
import * as etl from "etl-gun";
import * as rxjs from "rxjs";
const ftp = new etl.filesystems.Ftp.Endpoint({host: process.env.FTP_HOST, user: process.env.FTP_USER, password: process.env.FTP_PASSWORD});
const folder = ftp.getFolder('/var/logs');
const PrintFolderContents$ = folder.select().pipe(
etl.log()
)
await etl.run(PrintFolderContents$);
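Continuing the example above, a sketch of uploading with the documented insert methods (all remote and local paths are illustrative):
// create a remote folder, upload a local file into it, then create a file from string contents
await folder.insertFolder('/var/logs/archive');
await folder.insertFile('/var/logs/archive/app.log', './local/app.log');
await folder.insertFileWithContents('/var/logs/archive/readme.txt', 'archived logs');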
Csv
Parses a source csv file into individual records, or writes records to the end of a destination csv file. Every record is a csv string, represented as an array of values.
Endpoint
Methods:
getFile(filename: string, delimiter: string = ",", guiOptions: CollectionGuiOptions<string[]> = {}): Collection;
releaseFile(filename: string);
Collection
Methods:
select(skipFirstLine: boolean = false, skipEmptyLines = false): Observable<string[]>;
async insert(value: string[]);
async delete();
Example:
import * as etl from "etl-gun";
const csv = new etl.Csv.Endpoint('~');
const testFile = csv.getFile('test.csv')
const logTestFileRows$ = testFile.select().pipe(
etl.log()
);
etl.run(logTestFileRows$)
Json
Read and write a json file, buffering it in memory. You can get objects from the json by specifying a path in JSONPath format or in the lodash simple path manner (see the lodash 'get' function documentation).
Endpoint
Methods:
getFile(filename: string, autosave: boolean = true, autoload: boolean = false, encoding?: BufferEncoding, guiOptions: CollectionGuiOptions<number> = {}): Collection;
releaseFile(filename: string);
Collection
Methods:
select(path: string, options?: JsonReadOptions): Observable<any>;
selectByJsonPath(jsonPath: string | string[], options?: JsonReadOptions): Observable<any>;
get(path: string): any;
getByJsonPath(jsonPath: string): any;
async insert(value: any, path?: string, fieldname?: string);
async delete();
load();
save();
Types:
type JsonReadOptions = {
searchReturns?: 'foundedOnly'
| 'foundedImmediateChildrenOnly'
| 'foundedWithDescendants';
addRelativePathAsField?: string;
}
Example:
import * as etl from "etl-gun";
import { tap } from "rxjs";
const json = new etl.Json.Endpoint('~');
const testFile = json.getFile('test.json');
const printJsonBookNames$ = testFile.select('store.book').pipe(
tap(book => console.log(book.name))
);
const printJsonAuthors$ = testFile.selectByJsonPath('$.store.book[*].author', {searchReturns: 'foundedOnly', addRelativePathAsField: "path"}).pipe(
etl.log()
);
await etl.run(printJsonAuthors$, printJsonBookNames$);
Xml
# etl.XmlEndpoint(filename, autosave?, autoload?, encoding?)
Read and write an XML document, buffering it in memory. You can get nodes from the XML by specifying a path in XPath format.
Endpoint
Methods:
getFile(filename: string, autosave: boolean = true, autoload: boolean = false, encoding?: BufferEncoding, guiOptions: CollectionGuiOptions<string[]> = {}): Collection;
releaseFile(filename: string);
Collection
Methods:
select(xpath: string = '', options: XmlReadOptions = {}): EtlObservable<Node>;
get(xpath: string = ''): XPath.SelectedValue
async insert(value: any, xpath: string = '', attribute: string = '');
async delete();
load();
save();
Types:
export type XmlReadOptions = {
searchReturns?: 'foundedOnly'
| 'foundedImmediateChildrenOnly'
| 'foundedWithDescendants';
addRelativePathAsAttribute?: string;
}
Example
import * as etl from "etl-gun";
import { map } from "rxjs";
const xml = new etl.Xml.Endpoint('/tmp');
const testFile = xml.getFile('test.xml');
const printXmlAuthors$ = testFile.select('/store/book/author').pipe(
map(v => v.firstChild.nodeValue),
etl.log()
);
await etl.run(printXmlAuthors$);
Knex
Represents a generic Knex database. Based on the knex engine.
KnexEndpoint
Methods:
constructor(client: ClientType, connectionString: string, pool?: PoolConfig);
constructor(client: ClientType, connectionConfig: ConnectionConfig, pool?: PoolConfig);
constructor(knexConfig: pkg.Knex.Config);
getTable<T = Record<string, any>>(table: string, options: CollectionOptions<string[]> = {}): KnexTableCollection<T>;
getQuery<T = Record<string, any>>(collectionName: string, query: string, options: CollectionOptions<string[]> = {}): KnexQueryCollection<T>;
releaseCollection(collectionName: string);
async releaseEndpoint();
KnexTableCollection
Presents the table from the database.
Methods:
select(where: SqlCondition<T>, fields?: string[]): BaseObservable<T>;
select(whereSql?: string, whereParams?: any[], fields?: string[]): BaseObservable<T>;
async insert(value: T): Promise<number[]>;
async insert(values: T[]): Promise<number[]>;
async update(value: T, where: SqlCondition<T>): Promise<number>;
async update(value: T, whereSql?: string, whereParams?: any[]): Promise<number>;
async upsert(value: T): Promise<number[]>;
async delete(where: SqlCondition<T>): Promise<number>;
async delete(whereSql?: string, whereParams?: any[]): Promise<number>;
KnexQueryCollection
Readonly collection of sql query results.
Methods:
select(params?: any[]): BaseObservable<T>;
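A hedged sketch of using KnexEndpoint directly (the etl.databases.Knex namespace path and the 'pg' client identifier are assumptions; see the constructors above):
import * as etl from "etl-gun";
// namespace path and client type are assumptions for this sketch
const db = new etl.databases.Knex.Endpoint('pg', process.env.PG_CONNECTION_STRING!);
const adults = db.getQuery('adults', 'select * from users where age >= ?');
const logAdults$ = adults.select([18]).pipe(
etl.log()
);
await etl.run(logAdults$);
await db.releaseEndpoint();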
CockroachDB
Represents a CockroachDB database. The endpoint implementation is based on KnexEndpoint.
You should install the node-postgres (aka 'pg') package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all of its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.CockroachDb.Endpoint('postgres://user:password@127.0.0.1:5432/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
MariaDB
Represents a MariaDB database. The endpoint implementation is based on KnexEndpoint.
You should install the mysql package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all of its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.MariaDb.Endpoint('mysql://user:password@127.0.0.1:3306/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
MS SQL Server
Represents an MS SQL Server database. The endpoint implementation is based on KnexEndpoint.
You should install the tedious package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all of its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.SqlServer.Endpoint('mssql://user:password@127.0.0.1:1433/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
MySQL
Represents a MySQL database. The endpoint implementation is based on KnexEndpoint.
You should install the mysql package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all of its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig, driver?: 'mysql' | 'mysql2');
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.MySql.Endpoint('mysql://user:password@127.0.0.1:3306/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Oracle DB
Represents an Oracle database. The endpoint implementation is based on KnexEndpoint.
You should install the oracledb package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all of its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const oracle = new etl.databases.OracleDb.Endpoint({
host: process.env.ORACLE_HOST,
user: process.env.ORACLE_USER,
password: process.env.ORACLE_PASSWORD,
database: process.env.ORACLE_DATABASE,
});
const table = oracle.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Postgres
Represents a PostgreSQL database. The endpoint implementation is based on KnexEndpoint.
You should install the node-postgres (aka 'pg') package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all of its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.Postgres.Endpoint('postgres://user:password@127.0.0.1:5432/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Amazon Redshift
Represents an Amazon Redshift database. The endpoint implementation is based on KnexEndpoint.
You should install the node-postgres (aka 'pg') package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all of its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const pg = new etl.databases.Redshift.Endpoint('postgres://user:password@127.0.0.1:5439/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
SQLite
Represents an SQLite3 database. The endpoint implementation is based on KnexEndpoint.
You should install the sqlite3 package to use this endpoint!
Endpoint
Extends KnexEndpoint and contains all of its methods.
Constructors:
constructor(connectionString: string, pool?: PoolConfig);
constructor(connectionConfig: ConnectionConfig, pool?: PoolConfig);
Example:
import * as etl from "etl-gun";
const db = new etl.databases.SqlLite.Endpoint({
filename: "./mydb.sqlite"
});
const table = db.getTable('users');
const logUsers$ = table.select().pipe(
etl.log()
);
etl.run(logUsers$)
Magento
Presents Magento CMS objects.
Go to https://meetanshi.com/blog/create-update-product-using-rest-api-in-magento-2/ for details on how to configure the Magento integration to get access to its API.
Endpoint
Methods:
constructor(magentoUrl: string, login: string, password: string, rejectUnauthorized: boolean = true);
getProducts(guiOptions: CollectionGuiOptions<Partial<Product>> = {}): ProductsCollection;
releaseProducts();
ProductsCollection
Presents Magento CMS products.
Methods:
select(where: Partial<Product> = {}, fields: (keyof Product)[] = null): BaseObservable<Partial<Product>> ;
async insert(value: NewProductAttributes);
async uploadImage(product: {sku: string} | string, imageContents: Blob, filename: string, label: string, type: "image/png" | "image/jpeg" | string): Promise<number>;
uploadImageOperator<T>(func: (value: T) => {product: {sku: string} | string, imageContents: Blob, filename: string, label: string, type: "image/png" | "image/jpeg" | string}): OperatorFunction<T, T>;
static async getProducts(endpoint: Endpoint, where: Partial<Product> = {}, fields: (keyof Product)[] = null): Promise<Partial<Product>[]>;
Example:
import * as etl from "etl-gun";
const magento = new etl.Magento.Endpoint('https://magento.test', process.env.MAGENTO_LOGIN!, process.env.MAGENTO_PASSWORD!);
const products = magento.getProducts();
const logProductsWithPrice100$ = products.select({price: 100}).pipe(
etl.log()
);
etl.run(logProductsWithPrice100$)
StockCollection
Presents Magento CMS stock items. Stock items are the products in stock.
Methods:
select(sku: string): BaseObservable<StockItem>;
select(product: Partial<Product>): BaseObservable<StockItem>;
public async getStockItem(sku: string): Promise<StockItem>;
public async getStockItem(product: {sku: string}): Promise<StockItem>;
public async updateStockQuantity(sku: string, quantity: number);
public async updateStockQuantity(product: {sku: string}, quantity: number);
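Continuing the Magento example above, a sketch of working with stock items (the getStockCollection() accessor name is hypothetical - only the collection methods above are documented):
// hypothetical accessor name; check the endpoint API for the real one
const stock = magento.getStockCollection();
// read the stock item for a SKU, then update its quantity
const item = await stock.getStockItem('SKU-001');
console.log(item);
await stock.updateStockQuantity('SKU-001', 42);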
Trello
Presents Trello task tracking system objects.
For details on how to get an API key and authorization token, please read the Trello documentation.
Endpoint
Methods:
constructor(apiKey: string, authToken: string, url: string = "https://trello.com", rejectUnauthorized: boolean = true);
getUserBoards(username: string = 'me', collectionName: string = 'Boards', guiOptions: CollectionGuiOptions<Partial<Board>> = {}): BoardsCollection;
getBoardLists(boardId: string, collectionName: string = 'Lists', guiOptions: CollectionGuiOptions<Partial<List>> = {}): ListsCollection;
getListCards(listId: string, collectionName: string = 'Cards', guiOptions: CollectionGuiOptions<Partial<Card>> = {}): CardsCollection;
getCardComments(cardId: string, collectionName: string = 'Comments', guiOptions: CollectionGuiOptions<Partial<Comment>> = {}): CommentsCollection;
releaseCollection(collectionName: string);
BoardsCollection
Presents Trello boards accessible to the user specified at collection creation.
Methods:
select(where: Partial<Board> = {}, fields: (keyof Board)[] = null): EtlObservable<Partial<Board>>;
async insert(value: Omit<Partial<Board>, 'id'>);
async update(boardId: string, value: Omit<Partial<Board>, 'id'>);
async get(): Promise<Board[]>;
async get(boardId?: string): Promise<Board>;
async getByBrowserUrl(url: string): Promise<Board>;
ListsCollection
Presents Trello lists on the board specified at collection creation.
Methods:
select(where: Partial<List> = {}, fields: (keyof List)[] = null): EtlObservable<Partial<List>>;
async insert(value: Omit<Partial<List>, 'id'>);
async update(listId: string, value: Omit<Partial<List>, 'id'>);
async get(): Promise<List[]>;
async get(listId?: string): Promise<List>;
async switchClosed(listId: string);
async move(listId: string, destBoardId: string);
async getActions(listId: string);
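For example, a sketch of moving the first list of a board to another board (board ids are illustrative):
import * as etl from 'etl-gun';
const trello = new etl.Trello.Endpoint(process.env.TRELLO_API_KEY!, process.env.TRELLO_AUTH_TOKEN!);
const lists = trello.getBoardLists('<source-board-id>');
const list = (await lists.get())[0];
// move the first list to the destination board
await lists.move(list.id, '<dest-board-id>');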
CardsCollection
Presents Trello cards in the list specified at collection creation.
Methods:
select(where: Partial<Card> = {}, fields: (keyof Card)[] = null): EtlObservable<Partial<Card>>;
async insert(value: Omit<Partial<Card>, 'id'>);
async update(cardId: string, value: Omit<Partial<Card>, 'id'>);
async get(): Promise<Card[]>;
async get(cardId?: string): Promise<Card>;
async archiveListCards();
async moveListCards(destBoardId: string, destListId: string);
CommentsCollection
Presents Trello comments on the card specified at collection creation.
Methods:
select(where: Partial<Comment> = {}, fields: (keyof Comment)[] = null): EtlObservable<Partial<Comment>>;
async insert(text: string);
async update(commentId: string, value: Omit<Partial<Comment>, 'id'>);
async get(): Promise<Comment[]>;
async get(commentId?: string): Promise<Comment>;
Example:
import * as rx from 'rxjs';
import * as etl from 'etl-gun';
const trello = new etl.Trello.Endpoint(process.env.TRELLO_API_KEY!, process.env.TRELLO_AUTH_TOKEN!);
const boards = trello.getUserBoards();
const board = await boards.getByBrowserUrl('https://trello.com/b/C9zegsyz/board1');
const lists = trello.getBoardLists(board.id);
const list = (await lists.get())[0];
const cards = trello.getListCards(list.id);
const addCommentToAllCards$ = cards.select().pipe(
rx.tap(card => {
const comments = trello.getCardComments(card.id, 'cards');
comments.insert('New comment');
trello.releaseCollection('cards');
})
);
etl.run(addCommentToAllCards$)
Zendesk
Presents Zendesk task tracking system objects.
For details on how to get an API key and authorization token, please read the Zendesk documentation.
Endpoint
Methods:
constructor(zendeskUrl: string, username: string, token: string, rejectUnauthorized: boolean = true);
getTickets(collectionName: string = 'Tickets', options: CollectionOptions<Partial<Ticket>> = {}): TicketsCollection;
getTicketFields(collectionName: string = 'TicketFields', options: CollectionOptions<Partial<Field>> = {}): TicketFieldsCollection;
releaseCollection(collectionName: string);
TicketsCollection
Presents all Zendesk tickets.
Methods:
select(where: Partial<Ticket> = {}): BaseObservable<Partial<Ticket>>;
async insert(value: Omit<Partial<Ticket>, 'id'>);
async update(ticketId: number, value: Omit<Partial<Ticket>, 'id'>);
async get(): Promise<Ticket[]>;
async get(ticketId: number): Promise<Ticket>;
TicketFieldsCollection
Presents all Zendesk ticket fields.
Methods:
select(): BaseObservable<Partial<Field>>;
async insert(value: Omit<Partial<Field>, 'id'>);
async update(fieldId: number, value: Omit<Partial<Field>, 'id'>);
async get(): Promise<Field[]>;
async get(fieldId: number): Promise<Field>;
Example:
import * as rx from 'rxjs';
import * as etl from 'etl-gun';
const zendesk = new etl.Zendesk.Endpoint(process.env.ZENDESK_URL!, process.env.ZENDESK_USERNAME!, process.env.ZENDESK_TOKEN!);
const tickets = zendesk.getTickets();
const PrintAllOpenedTickets$ = tickets.select().pipe(
etl.where({status: 'open'}),
etl.log()
)
etl.run(PrintAllOpenedTickets$);
Telegram
With this endpoint you can create telegram bots and chat with users. It can listen for user messages and send response messages.
It can also set the user keyboard for the chat.
Endpoint
Methods:
startBot(collectionName: string, token: string, keyboard?: any, guiOptions: CollectionGuiOptions<TelegramInputMessage> = {}): Collection;
releaseBot(collectionName: string);
Collection
Presents all chat bot messages.
Methods:
select(): Observable<T>;
async stop();
async insert(value: TelegramInputMessage);
async insert(chatId: string, message: string);
setKeyboard(keyboard: any)
Example:
import * as etl from "etl-gun";
const telegram = new etl.Telegram.Endpoint();
const bot = telegram.startBot('bot 1', '**********');
const startTelegramBot$ = bot.select().pipe(
etl.log(),
etl.push(bot)
);
etl.run(startTelegramBot$);
Interval
This endpoint is an analog of the RxJs interval() operator, with GUI support. It emits a simple counter, which increments every interval.
Endpoint
Methods:
getSequence(collectionName: string, interval: number, guiOptions: CollectionGuiOptions<number> = {}): Collection;
releaseSequence(collectionName: string);
Collection
Methods:
select(): Observable<number>;
async stop();
async insert(value: number);
async delete();
Example:
import * as etl from "etl-gun";
const timer = new etl.Interval.Endpoint();
const seq = timer.getSequence('every 500 ms', 500);
const startTimer$ = seq.select().pipe(
etl.log()
);
etl.run(startTimer$);
Operators
Apart from the operators from this library, you can use any operators of the RxJs library.
run
This function runs one or several streams and returns a promise that resolves when all the streams are complete.
import * as etl from "etl-gun";
let memory = new etl.Memory.Endpoint();
let buffer = memory.getBuffer('test buffer', [1, 2, 3, 4, 5]);
let stream$ = buffer.select().pipe(
etl.log()
);
etl.run(stream$);
log
# etl.log([options])
Prints the value from the stream to the console.
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
let stream$ = rx.interval(1000).pipe(
etl.log()
);
etl.run(stream$);
expect
This operator checks a condition and, if it is false, writes an error to the errors collection.
import * as etl from "etl-gun";
let memory = new etl.Memory.Endpoint();
let buffer = memory.getBuffer('test buffer', [{count: 1}, {count: 2}]);
const errorsEndpoint = etl.Errors.getEndpoint();
const errors = errorsEndpoint.getCollection('all');
let stream$ = buffer.select().pipe(
etl.expect('count = 1', { count: 1 }, errors),
etl.expect('count one of [1,2,3]', { count: etl.VALUE.of([1,2,3]) }),
etl.expect('count not > 3', { count: etl.VALUE.not['>'](3) }),
etl.expect('count check function', { count: v => v < 5 }),
etl.log()
);
etl.run(stream$);
where
Variants:
function where<T>(condition: Condition<T>): OperatorFunction<T, T>;
This operator is an analog of the where operation in SQL and a synonym of the filter operator from the RxJs library, but with improvements. It can skip values from the input stream by the specified condition. You can specify a predicate function to determine the filter conditions, or you can specify a map object as the condition (like the typeorm 'where' parameter in the find() method).
Example:
import * as etl from "etl-gun";
let memory = new etl.Memory.Endpoint();
let buffer = memory.getBuffer('test buffer', [{count: 1}, {count: 2}, {count: 4}]);
let stream$ = buffer.select().pipe(
etl.where(v => v.count % 2 === 0),
etl.where({ count: etl.VALUE.of([2, 4]) }),
etl.log()
);
etl.run(stream$);
push
# etl.push([options])
This operator pushes values from the stream to the specified collection.
Variants:
function push<S, T=S>(collection: BaseCollection<T>, options?: PushOptions<S, T> | null): OperatorFunction<S, S>;
function pushAndWait<S, T=S>(collection: BaseCollection<T>, options?: PushOptions<S, T> | null): OperatorFunction<S, S>;
function pushAndLog<S, T=S>(collection: BaseCollection<T>, options?: PushOptions<S, T> | null): OperatorFunction<S, S>;
function pushAndGet<S, T, R>(collection: BaseCollection<T>, options?: PushOptions<S, T> & {toProperty: string} | null): OperatorFunction<S, R>;
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
let csv = new etl.Csv.Endpoint();
let dest = csv.getFile('test.csv');
let stream$ = rx.interval(1000).pipe(
etl.push(dest)
);
etl.run(stream$);
rools
# etl.rools([options])
This operator integrates the etl engine with the Rools business rules engine. It calls the specified rule set, puts the current stream value into it as a fact, and then uses the result of the rules execution as the new stream value.
It allows you to separate the business logic of data analysis and transformation from the rest of the etl code. You can load rules from any source at runtime and let end users modify them.
In rules you can:
- Analyse and change any properties of the stream value
- Call any async methods from the 'then' section (the 'when' section is sync - this is a rules engine specific - but 'then' is fully async compatible)
- Add an 'etl' property to the value with control flow instructions for the etl engine:
- value.etl.skip - set to true to skip further processing of this value
- value.etl.stop - set to true to stop processing of all remaining collection values
- value.etl.error - set to an error message if any error was found (it raises an exception and stops further processing)
- Specify priority of rules
- Use rules inheritance
You can write your rules in any order; it does not matter to the rules engine. See the rools documentation for details.
Specification:
type EtlRoolsResult = {
etl?: {
skip?: boolean;
stop?: boolean;
error?: string;
}
}
function rools<T, R = T>(rools: Rools): rx.OperatorFunction<T, R>;
Example:
import * as etl from "etl-gun";
import * as rx from "rxjs";
import { Rools, Rule } from 'rools';
type DbProduct = {
id: number,
name: string,
price: number,
tax_class_id?: number
}
const ruleSkipCheapProducts = new Rule({
name: 'skip products with price <= 1000',
when: (product: DbProduct) => product.price! <= 1000,
then: (product: DbProduct & EtlRoolsResult) => {
product.etl = {skip: true};
},
});
const ruleSetProductTaxClass = new Rule({
name: 'update product tax class',
when: (product: DbProduct) => product.price! > 1000,
then: (product: DbProduct & EtlRoolsResult) => {
product.tax_class_id = 10;
},
});
const rules = new Rools();
await rules.register([ruleSkipCheapProducts, ruleSetProductTaxClass]);
const db = new etl.databases.MySql.Endpoint(process.env.MYSQL_CONNECTION_STRING!, undefined, 'mysql2');
const table = db.getTable<DbProduct>('products');
const PrintRichProducts$ = table.select().pipe(
etl.rools(rules),
etl.log()
)
await etl.run(PrintRichProducts$);
move
# etl.move([options])
This operator moves the whole stream value, or one of its properties, to the specified property. Lodash paths are supported.
Example:
import * as etl from "etl-gun";
const memory = new etl.Memory.Endpoint();
const buf = memory.getBuffer<number>('buf', [1,2,3,4,5]);
let stream$ = buf.select().pipe(
etl.move<{ nn: number }>({to: 'nn'}),
etl.move<{ num: number }>({from: 'nn', to: 'num'}),
etl.copy<{ num: number, kk: {pp: number} }>('num', 'kk.pp'),
etl.log()
);
etl.run(stream$);
copy
# etl.copy([options])
This operator copies the specified property of the stream value to another property. Lodash paths are supported.
Example:
import * as etl from "etl-gun";
const memory = new etl.Memory.Endpoint();
const buf = memory.getBuffer<number>('buf', [1,2,3,4,5]);
let stream$ = buf.select().pipe(
etl.move<{ nn: number }>({to: 'nn'}),
etl.move<{ num: number }>({from: 'nn', to: 'num'}),
etl.copy<{ num: number, kk: {pp: number} }>('num', 'kk.pp'),
etl.log()
);
etl.run(stream$);
numerate
# etl.numerate([options])
This operator enumerates the input values and adds an index field to the value if it is an object, or an index column if the value is an array. If the input stream values are objects, you should specify the index field name as the second parameter of the operator.
Example:
import * as etl from "etl-gun";
let csv = new etl.Csv.Endpoint();
let src = csv.getFile('test.csv');
let stream$ = src.select().pipe(
etl.numerate(10),
etl.log()
);
etl.run(stream$);
addField
# etl.addField([options])
This operator is applicable to a stream of objects. It calls the callback function and adds the result as a new field to the input stream value.
Example:
import * as etl from "etl-gun";
const pg = new etl.Postgres.Endpoint('postgres://user:password@127.0.0.1:5432/database');
const table = pg.getTable('users');
const logUsers$ = table.select().pipe(
etl.addField('NAME_IN_UPPERCASE', value => value.name.toUpperCase()),
etl.log()
);
etl.run(logUsers$);
addColumn
# etl.addColumn([options])
This operator is applicable to a stream of arrays. It calls the callback function and adds the result as a new column to the input stream value.
Example:
import * as etl from "etl-gun";
let csv = new etl.Csv.Endpoint();
let src = csv.getFile('test.csv');
const stream$ = src.select().pipe(
etl.addColumn(value => value[2].toUpperCase()),
etl.log()
);
etl.run(stream$);
join
# etl.join([options])
This operator is an analog of the join operation in SQL. It takes the second input stream as a parameter, and gets all values from this second input stream for every value of the main input stream. Then it merges both values into one object (if the values are objects) or into one array (if at least one of the values is an array), and puts the result into the main stream.
Example:
import * as etl from "etl-gun";
let csv = new etl.Csv.Endpoint();
let src = csv.getFile('test.csv');
let mem = new etl.Memory.Endpoint();
let buffer = mem.getBuffer('buffer 1', [1, 2, 3, 4, 5]);
let stream$ = src.select().pipe(
etl.join(buffer),
etl.log()
);
etl.run(stream$);
mapAsync
# etl.mapAsync([options])
This operator is an analog of the rxjs map operator for async callback functions. It calls and waits for the callback, and then uses its result as the new stream item.
Example:
import * as etl from "etl-gun";
let mem = new etl.Memory.Endpoint();
let buffer = mem.getBuffer('urls', ['1.json', '2.json', '3.json']);
const mySite = new etl.HttpClientHelper('http://www.mysite.com/jsons');
let stream$ = buffer.select().pipe(
etl.mapAsync(async (url) => await mySite.getJson(url)),
etl.log()
);
etl.run(stream$);
Misc
GoogleTranslateHelper
This class helps you use the Google Translate service.
import { Csv, GoogleTranslateHelper, log, run } from "etl-gun";
let csv = new Csv.Endpoint();
let src = csv.getFile('products.csv');
const translator = new GoogleTranslateHelper(process.env.GOOGLE_CLOUD_API_KEY!, 'en', 'ru');
let translateProducts$ = src.select().pipe(
translator.operator(),
log()
);
await run(translateProducts$);
HttpClientHelper
This class helps you make requests to http and https resources and get data from them.
Methods:
constructor(baseUrl?: string, headers?: Record<string, string>);
async get(url?: string, headers?: Record<string, string>): Promise<Response>;
async getJson(url?: string, headers?: Record<string, string>): Promise<any>;
async getText(url?: string, headers?: Record<string, string>): Promise<string>;
async getBlob(url?: string, headers?: Record<string, string>): Promise<Blob>;
async getFileContents(url?: string, headers?: Record<string, string>): Promise<Blob>;
getJsonOperator<T, R = T>(): OperatorFunction<T, R>;
getJsonOperator<T, R = T>(url: string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getJsonOperator<T, R = T>(getUrl: (value: T) => string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getTextOperator<T>(): OperatorFunction<T, string>;
getTextOperator<T>(url: string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, string>;
getTextOperator<T>(getUrl: (value: T) => string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, string>;
getBlobOperator<T>(): OperatorFunction<T, Blob>;
getBlobOperator<T, R = T>(url: string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getBlobOperator<T, R = T>(getUrl: (value: T) => string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getFileContentsOperator<T>(): OperatorFunction<T, Blob>;
getFileContentsOperator<T, R = T>(url: string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
getFileContentsOperator<T, R = T>(getUrl: (value: T) => string, toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
async post(body: string, url?: string, headers?: Record<string, string>): Promise<Response>;
async postJson(body: any, url?: string, headers?: Record<string, string>): Promise<any>;
async postText(body: string, url?: string, headers?: Record<string, string>): Promise<string>;
postJsonOperator<T, R = T>(bodyParam?: any | ((value: T) => any), urlParam?: string | ((value: T) => string), toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
async put(body: string, url?: string, headers?: Record<string, string>): Promise<Response>;
async putJson(body: any, url?: string, headers?: Record<string, string>): Promise<any>;
async putText(body: string, url?: string, headers?: Record<string, string>): Promise<string>;
putJsonOperator<T, R = T>(bodyParam?: any | ((value: T) => any), urlParam?: string | ((value: T) => string), toProperty?: string, headers?: Record<string, string>): OperatorFunction<T, R>;
async fetch(url: string): Promise<Response>;
async fetch(url: string, method: 'GET' | 'POST' | 'PUT' | 'DELETE', headers: Record<string, string>): Promise<Response>;
async fetch(url: string, init: RequestInit): Promise<Response>;
Example:
import { Csv, HttpClientHelper, mapAsync, run } from "etl-gun";
let csv = new Csv.Endpoint();
let src = csv.getFile('products.csv');
const mySite = new HttpClientHelper('http://www.mysite.com');
let sendProductsToSite$ = src.select().pipe(
mapAsync(async p => await mySite.post(p)),
);
await run(sendProductsToSite$);
Header
This class can store an array of column names and convert objects to arrays or arrays back to objects.
import { Postgres, Csv, Header, log, push, run } from "etl-gun";
import { map } from "rxjs";
const pg = new Postgres.Endpoint("postgres://user:password@127.0.0.1:5432/database");
const source = pg.getTable("users");
let csv = new Csv.Endpoint();
const dest = csv.getFile("users.csv");
const header = new Header([{"id": "number"}, "name", {"login": "string", nullValue: "-null-"}, "email"]);
let sourceToDest$ = source.select().pipe(
map(v => header.objToArr(v)),
push(dest)
);
await run(sourceToDest$);
Utility functions
These functions implement some useful operations for manipulating data.
async function wait(delay: number): Promise<void>;
function pathJoin(parts: string[], sep: string = '/'): string;
function extractFileName(path: string): string;
function extractParentFolderPath(path: string): string;
function getByJsonPath(obj: {}, jsonPath?: string): any;
function dumpObject(obj: any, deep: number = 1): string;
function getChildByPropVal(obj: {} | [], propName: string, propVal?: any): any;
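A few hedged usage examples (assuming these helpers are exported from the package root; the commented results are what the names suggest):
import { wait, pathJoin, extractFileName, extractParentFolderPath } from "etl-gun";
await wait(1000); // pause for one second
const path = pathJoin(['folder', 'file.txt']); // 'folder/file.txt'
const name = extractFileName('/tmp/file.txt'); // 'file.txt'
const parent = extractParentFolderPath('/tmp/file.txt'); // '/tmp'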
License
This library is provided under the MIT license.